An Alternative Approach
Monday, March 25, 2024
… continued from the previous post.
Do Not Communicate by Sharing Memory; Instead, Share Memory by Communicating
This approach can be taken too far. […] But as a high-level approach, using channels to control access makes it easier to write clear, correct programs.
For comparison, let's reformulate the scheme from the last post using shared variables and a sync.WaitGroup:
package main

import (
	"context"
	"sync"

	"fillmore-labs.com/blog/structured/pkg/task"
)

func doWork(ctx context.Context) error {
	ctx, cancel := context.WithCancelCause(ctx)
	defer cancel(nil)

	var firstErr error
	var once sync.Once

	// setErr stores the first non-nil error and cancels the context with it.
	setErr := func(err error) {
		if err == nil {
			return
		}
		once.Do(func() {
			firstErr = err
			cancel(err)
		})
	}

	var wg sync.WaitGroup

	wg.Add(1)
	go func() {
		defer wg.Done()
		err := task.Task(ctx, "task1", processingTime/3, nil)
		setErr(err)
	}()

	wg.Add(1)
	go func() {
		defer wg.Done()
		err := task.Task(ctx, "task2", processingTime/2, errFail)
		setErr(err)
	}()

	// task3 runs synchronously in the calling goroutine.
	err := task.Task(ctx, "task3", processingTime, nil)
	setErr(err)

	// Wait for the goroutines before reading firstErr.
	wg.Wait()

	return firstErr
}
Here we replace the error channel with a function storing the first error and canceling the context. This works fine:
> go run fillmore-labs.com/blog/structured/cmd/structured2
task1 <nil>
task2 failed
task3 context canceled
Got "failed" error in 501ms
But it is a lot of boilerplate.
Refactor and Separate Concerns
We can easily extract the orchestration part:
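package main

import (
	"context"
	"sync"
)

// Group runs functions, remembers the first error and cancels on failure.
type Group struct {
	err    error
	cancel context.CancelCauseFunc
	once   sync.Once
	wg     sync.WaitGroup
}

// NewGroup creates a Group. cancel may be nil if no cancellation is needed.
func NewGroup(cancel context.CancelCauseFunc) *Group {
	return &Group{cancel: cancel}
}

// Do runs fn synchronously and records its error if it is the first one.
func (g *Group) Do(fn func() error) {
	err := fn()
	if err == nil {
		return
	}

	g.once.Do(func() {
		g.err = err
		if g.cancel != nil {
			g.cancel(err)
		}
	})
}

// Go runs fn in a new goroutine tracked by the Group.
func (g *Group) Go(fn func() error) {
	g.wg.Add(1)
	go func() {
		defer g.wg.Done()
		g.Do(fn)
	}()
}

// Wait blocks until all goroutines started with Go have finished
// and returns the first error, if any.
func (g *Group) Wait() error {
	g.wg.Wait()

	return g.err
}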
This reduces our function to:
package main

import (
	"context"

	"fillmore-labs.com/blog/structured/pkg/task"
)

func doWork(ctx context.Context) error {
	ctx, cancel := context.WithCancelCause(ctx)
	defer cancel(nil)

	g := NewGroup(cancel)

	g.Go(func() error {
		return task.Task(ctx, "task1", processingTime/3, nil)
	})

	g.Go(func() error {
		return task.Task(ctx, "task2", processingTime/2, errFail)
	})

	g.Do(func() error {
		return task.Task(ctx, "task3", processingTime, nil)
	})

	return g.Wait()
}
This separates processing from orchestration, which makes our code much more readable and easier to test.
Summary
By separating orchestration from the processing code, we arrive at a simplified form of structured concurrency with improved readability, while eliminating some sources of resource leaks.
… continued in the next post.