An Alternative Approach

25 March 2024

… continued from the previous post.

Do Not Communicate by Sharing Memory; Instead, Share Memory by Communicating [1]

This approach can be taken too far. […] But as a high-level approach, using channels to control access makes it easier to write clear, correct programs. [1]

For comparison, let’s reformulate the scheme from the last post using shared variables and a sync.WaitGroup:

package main

import (
	"context"
	"sync"

	"fillmore-labs.com/blog/structured/pkg/task"
)

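// doWork runs three tasks concurrently and returns the first error.
// processingTime and errFail are declared elsewhere in this package.
func doWork(ctx context.Context) error {
	ctx, cancel := context.WithCancelCause(ctx)
	defer cancel(nil)

	// firstErr holds the first non-nil error; once guards it so that
	// concurrent tasks cannot overwrite it.
	var firstErr error
	var once sync.Once
	setErr := func(err error) {
		if err == nil {
			return
		}
		once.Do(func() {
			firstErr = err
			cancel(err) // stop the remaining tasks, recording err as the cause
		})
	}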

	var wg sync.WaitGroup

	wg.Add(1)
	go func() {
		defer wg.Done()
		err := task.Task(ctx, "task1", processingTime/3, nil)
		setErr(err)
	}()

	wg.Add(1)
	go func() {
		defer wg.Done()
		err := task.Task(ctx, "task2", processingTime/2, errFail)
		setErr(err)
	}()

	err := task.Task(ctx, "task3", processingTime, nil)
	setErr(err)

	wg.Wait()

	return firstErr
}

Here we replace the error channel with a function that stores the first error and cancels the context. This works fine:

> go run fillmore-labs.com/blog/structured/cmd/structured2
task1 <nil>
task2 failed
task3 context canceled
Got "failed" error in 501ms

But it is a lot of boilerplate.
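
In the output above, task3 only sees "context canceled", while doWork returns the real error. Since the context is created with context.WithCancelCause, the original error can also be read back from the context with context.Cause. The following is a minimal sketch of that behavior and not part of the post's code: recorder, set and demo are hypothetical names that mirror the setErr closure.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// recorder is a hypothetical stand-in for the setErr closure: it keeps the
// first non-nil error and cancels the context with that error as the cause.
type recorder struct {
	once   sync.Once
	err    error
	cancel context.CancelCauseFunc
}

// set ignores nil errors and every error after the first non-nil one.
func (r *recorder) set(err error) {
	if err == nil {
		return
	}
	r.once.Do(func() {
		r.err = err
		r.cancel(err)
	})
}

// demo feeds three results into the recorder and reports the recorded
// error, ctx.Err() and context.Cause(ctx).
func demo() (recorded, ctxErr, cause error) {
	ctx, cancel := context.WithCancelCause(context.Background())
	defer cancel(nil) // has no effect once the context is already canceled

	r := &recorder{cancel: cancel}
	r.set(nil)                    // ignored: not an error
	r.set(errors.New("failed"))   // recorded, cancels the context
	r.set(errors.New("too late")) // ignored: an error is already recorded

	return r.err, ctx.Err(), context.Cause(ctx)
}

func main() {
	recorded, ctxErr, cause := demo()
	fmt.Println("recorded: ", recorded)
	fmt.Println("ctx.Err():", ctxErr)
	fmt.Println("cause:    ", cause)
}
```

Here ctx.Err() reports the generic context.Canceled, while both the recorded error and context.Cause(ctx) carry the "failed" error that triggered the cancellation.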

Refactor and Separate Concerns

We can easily extract the orchestration part:

package main

import (
	"context"
	"sync"
)

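// Group runs functions, remembers the first error and cancels on failure.
type Group struct {
	err    error                   // first non-nil error, written at most once
	cancel context.CancelCauseFunc // may be nil
	once   sync.Once               // guards err and the call to cancel
	wg     sync.WaitGroup          // tracks goroutines started by Go
}

// NewGroup returns a Group that calls cancel with the first error.
func NewGroup(cancel context.CancelCauseFunc) *Group {
	return &Group{cancel: cancel}
}

// Do runs fn synchronously and records its error if it is the first one.
func (g *Group) Do(fn func() error) {
	err := fn()
	if err == nil {
		return
	}
	g.once.Do(func() {
		g.err = err
		if g.cancel != nil {
			g.cancel(err)
		}
	})
}

// Go runs fn in a new goroutine; errors are handled as in Do.
func (g *Group) Go(fn func() error) {
	g.wg.Add(1)
	go func() {
		defer g.wg.Done()
		g.Do(fn)
	}()
}

// Wait blocks until all goroutines started by Go have finished and
// returns the first recorded error, if any.
func (g *Group) Wait() error {
	g.wg.Wait()

	return g.err
}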

This reduces our function to:

package main

import (
	"context"

	"fillmore-labs.com/blog/structured/pkg/task"
)

func doWork(ctx context.Context) error {
	ctx, cancel := context.WithCancelCause(ctx)
	defer cancel(nil)

	g := NewGroup(cancel)

	g.Go(func() error {
		return task.Task(ctx, "task1", processingTime/3, nil)
	})

	g.Go(func() error {
		return task.Task(ctx, "task2", processingTime/2, errFail)
	})

	g.Do(func() error {
		return task.Task(ctx, "task3", processingTime, nil)
	})

	return g.Wait()
}

This separates processing from orchestration, which makes our code much more readable and easier to test.
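
As a rough illustration of the testability point, Group can be exercised without any real tasks. The sketch below repeats the Group type from above so that it compiles on its own; checkGroup and errBoom are hypothetical names, and the third function simply waits for cancellation to stand in for a long-running task.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// Group is repeated from the listing above to keep this file self-contained.
type Group struct {
	err    error
	cancel context.CancelCauseFunc
	once   sync.Once
	wg     sync.WaitGroup
}

func NewGroup(cancel context.CancelCauseFunc) *Group {
	return &Group{cancel: cancel}
}

func (g *Group) Do(fn func() error) {
	err := fn()
	if err == nil {
		return
	}
	g.once.Do(func() {
		g.err = err
		if g.cancel != nil {
			g.cancel(err)
		}
	})
}

func (g *Group) Go(fn func() error) {
	g.wg.Add(1)
	go func() {
		defer g.wg.Done()
		g.Do(fn)
	}()
}

func (g *Group) Wait() error {
	g.wg.Wait()

	return g.err
}

// errBoom is a hypothetical error used only for this check.
var errBoom = errors.New("boom")

// checkGroup runs one succeeding, one failing and one cancellation-aware
// function and returns the group's result and the context's cause.
func checkGroup() (result, cause error) {
	ctx, cancel := context.WithCancelCause(context.Background())
	defer cancel(nil)

	g := NewGroup(cancel)

	g.Go(func() error { return nil })
	g.Go(func() error { return errBoom })
	g.Do(func() error {
		<-ctx.Done() // returns only after the failing function canceled ctx
		return ctx.Err()
	})

	return g.Wait(), context.Cause(ctx)
}

func main() {
	result, cause := checkGroup()
	fmt.Println("result:", result)
	fmt.Println("cause: ", cause)
	fmt.Println("first error wins:", errors.Is(result, errBoom))
}
```

Because the third function can only return after the context has been canceled, its "context canceled" error always arrives second and is dropped, so the group reports errBoom. The same check could be moved into a regular Go test function.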

Summary

By separating orchestration from the processing code, we arrive at a simplified form of structured concurrency that is easier to read and eliminates some sources of resource leaks.

… continued in the next post.