An Alternative Approach

… continued from the previous post.

Do Not Communicate by Sharing Memory; Instead, Share Memory by Communicating1

This approach can be taken too far. […] But as a high-level approach, using channels to control access makes it easier to write clear, correct programs.1

Let’s just for comparison reformulate the scheme in the last post with shared variables and a sync.WaitGroup:

 1package main
 2
 3import (
 4	"context"
 5	"sync"
 6
 7	"fillmore-labs.com/blog/structured/pkg/task"
 8)
 9
10func doWork(ctx context.Context) error {
11	ctx, cancel := context.WithCancelCause(ctx)
12	defer cancel(nil)
13
14	var firstErr error
15	var once sync.Once
16	setErr := func(err error) {
17		if err == nil {
18			return
19		}
20		once.Do(func() {
21			firstErr = err
22			cancel(err)
23		})
24	}
25
26	var wg sync.WaitGroup
27
28	wg.Add(1)
29	go func() {
30		defer wg.Done()
31		err := task.Task(ctx, "task1", processingTime/3, nil)
32		setErr(err)
33	}()
34
35	wg.Add(1)
36	go func() {
37		defer wg.Done()
38		err := task.Task(ctx, "task2", processingTime/2, errFail)
39		setErr(err)
40	}()
41
42	err := task.Task(ctx, "task3", processingTime, nil)
43	setErr(err)
44
45	wg.Wait()
46
47	return firstErr
48}

Here we replace the error channel with a function storing the first error and canceling the context. This works fine:

> go run fillmore-labs.com/blog/structured/cmd/structured2
task1 <nil>
task2 failed
task3 context canceled
Got "failed" error in 501ms

But it is a lot of boilerplate.

Refactor and Separate Concerns

We can easily extract the orchestration part:

 1package main
 2
 3import (
 4	"context"
 5	"sync"
 6)
 7
 8type Group struct {
 9	err    error
10	cancel context.CancelCauseFunc
11	once   sync.Once
12	wg     sync.WaitGroup
13}
14
15func NewGroup(cancel context.CancelCauseFunc) *Group {
16	return &Group{cancel: cancel}
17}
18
19func (g *Group) Do(fn func() error) {
20	err := fn()
21	if err == nil {
22		return
23	}
24	g.once.Do(func() {
25		g.err = err
26		if g.cancel != nil {
27			g.cancel(err)
28		}
29	})
30}
31
32func (g *Group) Go(fn func() error) {
33	g.wg.Add(1)
34	go func() {
35		defer g.wg.Done()
36		g.Do(fn)
37	}()
38}
39
40func (g *Group) Wait() error {
41	g.wg.Wait()
42
43	return g.err
44}

Making our function only:

 1package main
 2
 3import (
 4	"context"
 5
 6	"fillmore-labs.com/blog/structured/pkg/task"
 7)
 8
 9func doWork(ctx context.Context) error {
10	ctx, cancel := context.WithCancelCause(ctx)
11	defer cancel(nil)
12
13	g := NewGroup(cancel)
14
15	g.Go(func() error {
16		return task.Task(ctx, "task1", processingTime/3, nil)
17	})
18
19	g.Go(func() error {
20		return task.Task(ctx, "task2", processingTime/2, errFail)
21	})
22
23	g.Do(func() error {
24		return task.Task(ctx, "task3", processingTime, nil)
25	})
26
27	return g.Wait()
28}

This separates processing and orchestration, which is nice and makes our code much more readable and improves testability.

Summary

Separating orchestration from the processing code, we can reach simplified structured concurrency with improved readability while eliminating some sources of resource leaks.

… continued in the next post.