
More Concurrency Bugs


More Real-world Usage #

Datadog has excellent software engineers. Nevertheless, it’s easy to find code with race conditions. Let’s examine a simplified version of waitForConfigsFromAD:

package main

import (
	"fmt"
	"runtime"
	"time"

	"go.uber.org/atomic"
)

type Config struct{}

type Component interface {
	AddScheduler(s func([]Config))
}

func waitForConfigsFromAD(discoveryMinInstances int, ac Component) (configs []Config, returnErr error) {
	configChan := make(chan Config)

	// signal to the scheduler when we are no longer waiting, so we do not continue
	// to push items to configChan
	waiting := atomic.NewBool(true)
	defer func() {
		waiting.Store(false)
		// ..and drain any message currently pending in the channel
		select {
		case <-configChan:
		default:
		}
	}()

	// add the scheduler in a goroutine, since it will schedule any "catch-up" immediately,
	// placing items in configChan
	go ac.AddScheduler(func(configs []Config) {
		for _, cfg := range configs {
			if waiting.Load() {
				runtime.Gosched()
				configChan <- cfg
			}
		}
	})

	for len(configs) < discoveryMinInstances {
		cfg := <-configChan
		configs = append(configs, cfg)
	}

	return
}

Side note[1]: this code is used in the Agent Check Status CLI, so you will not notice the resource leak there.

What Are the Issues? #

Using an atomic Boolean as a “finished” indicator, coupled with a deferred function that sets it and then drains the channel, seems clever[2] to me.

Unfortunately, it has a race condition. The scheduler goroutine can check waiting, see true, and yield; in parallel waitForConfigsFromAD returns, and the deferred function stores false and drains at most one pending message. The scheduler goroutine then blocks forever on configChan <- cfg, because nobody will ever read from configChan again, and it leaks.

Try it on the Go Playground.
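
To observe the leak outside the Playground, we can count goroutines around the call. The following driver is only a sketch, meant to sit in the same package main as the snippet above; fakeComponent is a made-up Component that schedules three configs while we wait for two, and whether the goroutine actually leaks depends on the interleaving, so it may take a few runs to show up.

// fakeComponent is a hypothetical Component that schedules three configs at once.
type fakeComponent struct{}

func (fakeComponent) AddScheduler(s func([]Config)) {
	s([]Config{{}, {}, {}})
}

func main() {
	before := runtime.NumGoroutine()

	// wait for only two configs; the scheduler goroutine may still hold the third
	_, _ = waitForConfigsFromAD(2, fakeComponent{})

	// give the scheduler goroutine time to block on the now-unread channel
	time.Sleep(100 * time.Millisecond)
	fmt.Println("goroutines before:", before, "after:", runtime.NumGoroutine())
}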

An Alternative Implementation #

Let's try a synchronous approach. The original is a little more complicated, but assume we simply want to collect configurations until we have either:

  • collected a fixed number,
  • encountered a configuration error, or
  • seen the passed context canceled, for example on a timeout.

We are also interested in the list of encountered errors. On cancellation we simply return what we have collected so far, without signaling an error.

Simply put, we need a collector that allows us to wait until it is finished collecting according to the criteria above and ask what it has collected so far. Something like:

type Collector struct {
	// ...
}

func NewCollector(discoveryMinInstances int) *Collector {
	// ...
}

func (c *Collector) Schedule(configs []Config) {
	// ...
}

func (c *Collector) Done() <-chan struct{} {
	// ...
}

func (c *Collector) Result() ([]Config, error) {
	// ...
}

Given that, we can reimplement waitForConfigsFromAD:

func waitForConfigsFromAD(ctx context.Context, discoveryMinInstances int, ac Component) ([]Config, error) {
	c := NewCollector(discoveryMinInstances)

	ac.AddScheduler(c.Schedule)

	select {
	case <-ctx.Done():
	case <-c.Done():
	}

	// ac.RemoveScheduler(c.Schedule)

	return c.Result()
}

Simple, synchronous code - we could even think about removing the scheduler from the component after it is done.
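
As a usage sketch (the five-second timeout and the ac component are placeholders), a caller simply bounds the wait with a context:

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

// returns as soon as enough configs arrived, a config error occurred, or the timeout fired
configs, err := waitForConfigsFromAD(ctx, discoveryMinInstances, ac)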

Now everything else falls into place:

type Collector struct {
	discoveryMinInstances int
	mu                    sync.Mutex // protects configs, errors
	configs               []Config
	errors                []error
	done                  chan struct{}
	setDone               func()
}

func NewCollector(discoveryMinInstances int) *Collector {
	done := make(chan struct{})
	setDone := sync.OnceFunc(func() { close(done) })
	return &Collector{
		discoveryMinInstances: discoveryMinInstances,
		done:                  done,
		setDone:               setDone,
	}
}

// Schedule is called (possibly from another goroutine) with each batch of
// discovered configs and collects until the criteria above are met.
func (c *Collector) Schedule(configs []Config) {
	for _, cfg := range configs {
		if filterErrors := filterInstances(cfg); len(filterErrors) > 0 {
			// a configuration error finishes the collection
			c.addErrors(filterErrors)
			c.setDone()

			continue
		}

		// addConfig reports whether more configs are still needed
		if !c.addConfig(cfg) {
			c.setDone()
		}
	}
}

func (c *Collector) Done() <-chan struct{} {
	return c.done
}

func (c *Collector) Result() ([]Config, error) {
	c.mu.Lock()
	defer c.mu.Unlock()

	configs := c.configs
	c.configs = nil

	err := errors.Join(c.errors...)
	c.errors = nil

	return configs, err
}

// addConfig appends cfg up to the configured minimum and reports whether
// more configs are still needed.
func (c *Collector) addConfig(cfg Config) bool {
	c.mu.Lock()
	defer c.mu.Unlock()

	if len(c.configs) < c.discoveryMinInstances {
		c.configs = append(c.configs, cfg)
	}

	return len(c.configs) < c.discoveryMinInstances
}

func (c *Collector) addErrors(errs []error) {
	c.mu.Lock()
	defer c.mu.Unlock()

	if len(errs) > 0 {
		c.errors = append(c.errors, errs...)
	}
}

There is certainly room for improvement here, but the key takeaway is that the logic can simply be written down, and it is easy to test.
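
For instance, a happy-path test needs no goroutines at all. This is a sketch that assumes filterInstances accepts the zero-value Config:

func TestCollectorStopsAtMinimum(t *testing.T) {
	c := NewCollector(2)

	// deliver three configs; the collector should keep two and signal completion
	c.Schedule([]Config{{}, {}, {}})

	select {
	case <-c.Done():
	default:
		t.Fatal("collector should be done after two configs")
	}

	configs, err := c.Result()
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if len(configs) != 2 {
		t.Fatalf("expected 2 configs, got %d", len(configs))
	}
}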

Summary #

We replaced asynchronous code containing a race condition with a synchronous, thread-safe implementation. As in previous posts, refactoring and separation of concerns help us structure our tasks and avoid errors.


  1. This blog is not intended to assign blame or irresponsibly publish bugs. We are here to learn. ↩︎

  2. “Clear is better than clever.” Rob Pike, Go Proverbs with Rob Pike, Gopherfest 2015. See also golang.org/doc/effective_go.html#sharing ↩︎