More Concurrency Bugs

More Real-world Usage

Datadog has excellent software engineers. Nevertheless, it’s easy to find code with race conditions. Let’s examine a simplified version of waitForConfigsFromAD:

 1  package main
 2
 3  import (
 4      "fmt"
 5      "runtime"
 6      "time"
 7
 8      "go.uber.org/atomic"
 9  )
10
11  type Config struct{}
12
13  type Component interface {
14      AddScheduler(s func([]Config))
15  }
16
17  func waitForConfigsFromAD(discoveryMinInstances int, ac Component) (configs []Config, returnErr error) {
18      configChan := make(chan Config)
19
20      // signal to the scheduler when we are no longer waiting, so we do not continue
21      // to push items to configChan
22      waiting := atomic.NewBool(true)
23      defer func() {
24          waiting.Store(false)
25          // ..and drain any message currently pending in the channel
26          select {
27          case <-configChan:
28          default:
29          }
30      }()
31
32      // add the scheduler in a goroutine, since it will schedule any "catch-up" immediately,
33      // placing items in configChan
34      go ac.AddScheduler(func(configs []Config) {
35          for _, cfg := range configs {
36              if waiting.Load() {
37                  runtime.Gosched()
38                  configChan <- cfg
39              }
40          }
41      })
42
43      for len(configs) < discoveryMinInstances {
44          cfg := <-configChan
45          configs = append(configs, cfg)
46      }
47
48      return
49  }

Side note:¹ This code is used in the Agent Check Status CLI, so you will not notice resource leaks there.

What Are the Issues?

Using an atomic Boolean as a “finished” flag, coupled with a deferred function that clears it and then drains the channel, seems clever² to me.

Unfortunately, it has a race condition. The scheduler callback can load waiting, see true, and be preempted just before its send (the runtime.Gosched() at line 37 widens exactly this window). In the meantime waitForConfigsFromAD returns: its deferred function stores false and receives at most one pending message with the non-blocking select. After that, nothing ever reads from configChan again, so the scheduler goroutine stays blocked on the send at line 38 and is leaked.

Try it on the Go Playground.
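
A quick way to see the leak without the playground is to append a small driver to the snippet above. mockComponent, the batch size, and the sleep are my own illustration, not part of the original code:

// mockComponent stands in for the real Component: AddScheduler invokes the
// scheduler once with a "catch-up" batch, as autodiscovery would.
type mockComponent struct{ batch []Config }

func (m *mockComponent) AddScheduler(s func([]Config)) { s(m.batch) }

func main() {
    ac := &mockComponent{batch: []Config{{}, {}, {}}}

    // Ask for a single config while the scheduler delivers three.
    configs, _ := waitForConfigsFromAD(1, ac)
    fmt.Println("collected", len(configs), "configs")

    // Give the scheduler goroutine time to block on its send, if it is going to.
    time.Sleep(100 * time.Millisecond)
    fmt.Println("goroutines still running:", runtime.NumGoroutine())
}

When the race strikes, the count stays at two instead of one, because the scheduler goroutine is still blocked on its send to configChan.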

An Alternative Implementation

Let’s try a synchronous approach. The original is a little more complicated, but assume we simply want to collect configurations until one of the following happens:

  • we have collected a fixed number of them,
  • we have encountered a configuration error, or
  • the passed context has been canceled, for example because of a timeout.

We are also interested in the list of errors encountered along the way. On cancellation we just return whatever we have collected so far, without signaling an error.

Put simply, we need a collector that lets us wait until it has finished collecting according to the criteria above, and then ask it what it has collected so far. Something like:

type Collector struct {
	// ...
}

func NewCollector(discoveryMinInstances int) *Collector {
	// ...
}

func (c *Collector) Schedule(configs []Config) {
	// ...
}

func (c *Collector) Done() <-chan struct{} {
	// ...
}

func (c *Collector) Result() ([]Config, error) {
	// ...
}

Given that, we can reimplement waitForConfigsFromAD:

func waitForConfigsFromAD(ctx context.Context, discoveryMinInstances int, ac Component) ([]Config, error) {
    c := NewCollector(discoveryMinInstances)

    ac.AddScheduler(c.Schedule)

    select {
    case <-ctx.Done():
    case <-c.Done():
    }

    // ac.RemoveScheduler(c.Schedule)

    return c.Result()
}

Simple, synchronous code. We could even think about removing the scheduler from the component once we are done, which is what the commented-out RemoveScheduler call hints at.
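
For example, a caller that is only willing to wait for a bounded time can pass a context with a deadline; the five seconds here are just an illustration, with discoveryMinInstances and ac as before:

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

// If the deadline fires first, we get whatever has been collected so far.
configs, err := waitForConfigsFromAD(ctx, discoveryMinInstances, ac)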

Now everything else falls into place:

type Collector struct {
    discoveryMinInstances int
    mu                    sync.Mutex // protects configs, errors
    configs               []Config
    errors                []error
    done                  chan struct{}
    setDone               func() // closes done exactly once
}

func NewCollector(discoveryMinInstances int) *Collector {
    done := make(chan struct{})
    setDone := sync.OnceFunc(func() { close(done) })
    return &Collector{
        discoveryMinInstances: discoveryMinInstances,
        done:                  done,
        setDone:               setDone,
    }
}

// Schedule receives configurations from the component. filterInstances comes
// from the surrounding code and returns any configuration errors for cfg.
func (c *Collector) Schedule(configs []Config) {
    for _, cfg := range configs {
        if filterErrors := filterInstances(cfg); len(filterErrors) > 0 {
            c.addErrors(filterErrors)
            c.setDone()

            continue
        }

        if !c.addConfig(cfg) {
            c.setDone()
        }
    }
}

func (c *Collector) Done() <-chan struct{} {
    return c.done
}

func (c *Collector) Result() ([]Config, error) {
    c.mu.Lock()
    defer c.mu.Unlock()

    configs := c.configs
    c.configs = nil

    err := errors.Join(c.errors...)
    c.errors = nil

    return configs, err
}

// addConfig stores cfg if there is still room and reports whether more
// configs are needed.
func (c *Collector) addConfig(cfg Config) bool {
    c.mu.Lock()
    defer c.mu.Unlock()

    if len(c.configs) < c.discoveryMinInstances {
        c.configs = append(c.configs, cfg)
    }

    return len(c.configs) < c.discoveryMinInstances
}

func (c *Collector) addErrors(errs []error) {
    c.mu.Lock()
    defer c.mu.Unlock()

    if len(errs) > 0 {
        c.errors = append(c.errors, errs...)
    }
}
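
As an aside, sync.OnceFunc only exists since Go 1.21. On older toolchains the same “close exactly once” idea can be expressed with a plain sync.Once; a minimal sketch of that variation (onceClose is just an illustrative name):

// onceClose closes its channel at most once, without sync.OnceFunc.
type onceClose struct {
    done chan struct{}
    once sync.Once
}

func (o *onceClose) setDone() { o.once.Do(func() { close(o.done) }) }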

There is certainly room for improvement here, but the key takeaway is that this logic can simply be written down, and that it is easy to test.
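
For instance, a test can drive Schedule by hand and inspect the result; this assumes filterInstances accepts the zero-value Config without errors:

func TestCollectorStopsAtMinInstances(t *testing.T) {
    c := NewCollector(2)

    // Three configs arrive, but only two are needed.
    c.Schedule([]Config{{}, {}, {}})

    select {
    case <-c.Done():
    default:
        t.Fatal("collector should be done after two configs")
    }

    configs, err := c.Result()
    if err != nil {
        t.Fatalf("unexpected error: %v", err)
    }
    if len(configs) != 2 {
        t.Fatalf("expected 2 configs, got %d", len(configs))
    }
}

No goroutines and no sleeps are needed.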

Summary

We replaced asynchronous code containing a race condition with a synchronous, thread-safe implementation. As in previous posts, refactoring and separation of concerns help us structure our tasks and avoid errors.


  1. This blog is not intended to assign blame or irresponsibly publish bugs. We are here to learn. ↩︎

  2. “Clear is better than clever.” Rob Pike, 2015. Go Proverbs with Rob Pike, Gopherfest 2015. golang.org/doc/effective_go.html#sharing ↩︎