I'm playing with some code for learning purposes, and I'm getting a race condition when I run it with the `-race` flag. I want to understand why. The code starts a fixed set of goroutines that act as workers consuming tasks from a channel. There is no fixed number of tasks: as long as the channel receives tasks, the workers must keep working.
I'm getting the race condition when calling the `WaitGroup` methods. From what I understand (looking at the data race report), the race happens when the first `wg.Add` call is executed by one of the spawned goroutines at the same time as the main goroutine calls `wg.Wait`. Is that correct? If it is, does it mean that I must always call `Add` on the main goroutine to avoid this kind of race? But that would also mean I need to know in advance how many tasks the workers will handle, which kind of sucks if I need the code to handle any number of tasks that may arrive once the workers are running...
The code:
```go
package workerpool

import (
	"fmt"
	"sync"
	"testing"
)

func Test(t *testing.T) {
	t.Run("", func(t *testing.T) {
		var wg sync.WaitGroup
		queuedTaskC := make(chan func())
		for i := 0; i < 5; i++ {
			wID := i + 1
			go func(workerID int) {
				for task := range queuedTaskC {
					// Add is called from a worker goroutine, concurrently with wg.Wait below.
					wg.Add(1)
					task()
				}
			}(wID)
		}
		taskFn := func() {
			fmt.Println("executing task...")
			wg.Done()
		}
		queuedTaskC <- taskFn
		queuedTaskC <- taskFn
		queuedTaskC <- taskFn
		queuedTaskC <- taskFn
		queuedTaskC <- taskFn
		queuedTaskC <- taskFn
		queuedTaskC <- taskFn
		queuedTaskC <- taskFn
		queuedTaskC <- taskFn
		wg.Wait()
		close(queuedTaskC)
		fmt.Println(len(queuedTaskC))
	})
}
```
The report:
```
==================
WARNING: DATA RACE
Read at 0x00c0001280d8 by goroutine 11:
  internal/race.Read()
      /src/internal/race/race.go:37 +0x206
  sync.(*WaitGroup).Add()
      /src/sync/waitgroup.go:71 +0x219
  workerpool.Test.func1.1()
      /workerpool/workerpool_test.go:36 +0x64

Previous write at 0x00c0001280d8 by goroutine 8:
  internal/race.Write()
      /src/internal/race/race.go:41 +0x125
  sync.(*WaitGroup).Wait()
      /src/sync/waitgroup.go:128 +0x126
  workerpool.Test.func1()
      /workerpool/workerpool_test.go:57 +0x292
  testing.tRunner()
      /src/testing/testing.go:1123 +0x202

Goroutine 11 (running) created at:
  workerpool.Test.func1()
      /workerpool/workerpool_test.go:34 +0xe4
  testing.tRunner()
      /src/testing/testing.go:1123 +0x202

Goroutine 8 (running) created at:
  testing.(*T).Run()
      /src/testing/testing.go:1168 +0x5bb
  workerpool.Test()
      workerpool_test.go:29 +0x4c
  testing.tRunner()
      /src/testing/testing.go:1123 +0x202
==================
```
The `WaitGroup` implementation is based on an internal counter that is changed by the `Add` and `Done` methods. The `Wait` method will not return until the counter reaches zero. It is also possible to reuse a `WaitGroup`, but only under certain conditions described in the documentation:

> If a WaitGroup is reused to wait for several independent sets of events, new Add calls must happen after all previous Wait calls have returned.

Although your code is not explicitly reusing `wg`, it can bring the `WaitGroup` counter to zero multiple times. This happens whenever no tasks are being processed at a given moment, which is entirely possible in concurrent code. And since your code does not wait for `Wait` to return before calling `Add` again, you get the race condition error.

As everyone suggests in the comments, you should abandon the idea of tracking the tasks with a `WaitGroup` and use it to track the running worker goroutines instead.