Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Data race due to "Unsynchronized send and close operations" #70

Closed
hongkuancn opened this issue Aug 29, 2024 · 3 comments · Fixed by #72
Closed

Data race due to "Unsynchronized send and close operations" #70

hongkuancn opened this issue Aug 29, 2024 · 3 comments · Fixed by #72

Comments

@hongkuancn
Copy link
Collaborator

Hi!

I came across a data race with the test

func TestPoolWithCustomIdleTimeoutNew(t *testing.T) {

	pool := pond.New(10, 15, pond.IdleTimeout(100*time.Millisecond))

	for i := 0; i < 10; i++ {
		pool.Submit(func() {
			time.Sleep(10 * time.Millisecond)
		})
	}

	assertEqual(t, 10, pool.RunningWorkers())

	time.Sleep(200 * time.Millisecond)

	pool.StopAndWait()
}
➜  pond git:(main) ✗ go test -race -v -coverprofile=coverage.out -covermode=atomic -run TestPoolWithCustomIdleTimeoutNew
=== RUN   TestPoolWithCustomIdleTimeoutNew
==================
WARNING: DATA RACE
Write at 0x00c000128130 by goroutine 6:
  runtime.closechan()
      /Users/whk/go/pkg/mod/golang.org/toolchain@v0.0.1-go1.22.6.darwin-amd64/src/runtime/chan.go:357 +0x0
  github.com/alitto/pond.(*WorkerPool).stop.func1()
      /Users/whk/Documents/code/pond/pond.go:375 +0x56
  sync.(*Once).doSlow()
      /Users/whk/go/pkg/mod/golang.org/toolchain@v0.0.1-go1.22.6.darwin-amd64/src/sync/once.go:74 +0xf0
  sync.(*Once).Do()
      /Users/whk/go/pkg/mod/golang.org/toolchain@v0.0.1-go1.22.6.darwin-amd64/src/sync/once.go:65 +0x44
  github.com/alitto/pond.(*WorkerPool).stop()
      /Users/whk/Documents/code/pond/pond.go:374 +0x153
  github.com/alitto/pond.(*WorkerPool).StopAndWait()
      /Users/whk/Documents/code/pond/pond.go:334 +0x25b
  github.com/alitto/pond_test.TestPoolWithCustomIdleTimeoutNew()
      /Users/whk/Documents/code/pond/pond_blackbox_test.go:412 +0x1fb
  testing.tRunner()
      /Users/whk/go/pkg/mod/golang.org/toolchain@v0.0.1-go1.22.6.darwin-amd64/src/testing/testing.go:1689 +0x21e
  testing.(*T).Run.gowrap1()
      /Users/whk/go/pkg/mod/golang.org/toolchain@v0.0.1-go1.22.6.darwin-amd64/src/testing/testing.go:1742 +0x44

Previous read at 0x00c000128130 by goroutine 7:
  runtime.chansend()
      /Users/whk/go/pkg/mod/golang.org/toolchain@v0.0.1-go1.22.6.darwin-amd64/src/runtime/chan.go:160 +0x0
  github.com/alitto/pond.(*WorkerPool).maybeStopIdleWorker()
      /Users/whk/Documents/code/pond/pond.go:409 +0x355
  github.com/alitto/pond.(*WorkerPool).purge()
      /Users/whk/Documents/code/pond/pond.go:393 +0x356
  github.com/alitto/pond.New.gowrap1()
      /Users/whk/Documents/code/pond/pond.go:144 +0x33

Goroutine 6 (running) created at:
  testing.(*T).Run()
      /Users/whk/go/pkg/mod/golang.org/toolchain@v0.0.1-go1.22.6.darwin-amd64/src/testing/testing.go:1742 +0x825
  testing.runTests.func1()
      /Users/whk/go/pkg/mod/golang.org/toolchain@v0.0.1-go1.22.6.darwin-amd64/src/testing/testing.go:2161 +0x85
  testing.tRunner()
      /Users/whk/go/pkg/mod/golang.org/toolchain@v0.0.1-go1.22.6.darwin-amd64/src/testing/testing.go:1689 +0x21e
  testing.runTests()
      /Users/whk/go/pkg/mod/golang.org/toolchain@v0.0.1-go1.22.6.darwin-amd64/src/testing/testing.go:2159 +0x8be
  testing.(*M).Run()
      /Users/whk/go/pkg/mod/golang.org/toolchain@v0.0.1-go1.22.6.darwin-amd64/src/testing/testing.go:2027 +0xf17
  main.main()
      _testmain.go:159 +0x2e4

Goroutine 7 (finished) created at:
  github.com/alitto/pond.New()
      /Users/whk/Documents/code/pond/pond.go:144 +0x6ad
  github.com/alitto/pond_test.TestPoolWithCustomIdleTimeoutNew()
      /Users/whk/Documents/code/pond/pond_blackbox_test.go:380 +0xe4
  testing.tRunner()
      /Users/whk/go/pkg/mod/golang.org/toolchain@v0.0.1-go1.22.6.darwin-amd64/src/testing/testing.go:1689 +0x21e
  testing.(*T).Run.gowrap1()
      /Users/whk/go/pkg/mod/golang.org/toolchain@v0.0.1-go1.22.6.darwin-amd64/src/testing/testing.go:1742 +0x44
==================
    testing.go:1398: race detected during execution of test
--- FAIL: TestPoolWithCustomIdleTimeoutNew (0.20s)
FAIL

It seems the issue is related to purge() send nil task to the task channel to recycle the workers. Meanwhile, stop() close the task channel. The race detector will report the issue according to Unsynchronized send and close operations | Data Race Detector - The Go Programming Language

@alitto
Copy link
Owner

alitto commented Aug 31, 2024

Hey @hongkuancn, thanks for opening this issue. It looks like this send inside maybeStopIdleWorker() should be skipped if the context has been canceled already. E.g.:

// maybeStopIdleWorker attempts to stop an idle worker by sending it a nil task
func (p *WorkerPool) maybeStopIdleWorker() bool {

	if decremented := p.decrementWorkerCount(); !decremented {
		return false
	}
	
	// If the pool context has been canceled the tasks channel could be closed, so sending a nil to it will panic
	select {
	    case p.context.Done():
	      return false
	    default:
	}

	// Send a nil task to stop an idle worker
	p.tasks <- nil

	return true
}

What do you think? Feel free to open a pull request with these changes and I'll be glad to merge it.

@hongkuancn
Copy link
Collaborator Author

hongkuancn commented Sep 1, 2024

Hey @alitto ! Thanks for the suggestion. I noticed there might still be synchronization issue after your change

   stop               purge     
               │                
               │                
               │    new select  
               │◄───────────────
 cancel context│                
──────────────►│                
               │                
               │                
 close channel │                
──────────────►│   task <- nil  
               │◄───────────────
               │                
               ▼                

I just looked back the history and come up with an idea.

The issue is probably brought in by Pull Request #62. It wants to stop the pool with long running tasks when the context is canceled. However, when draining the tasks, it depends on a closed task channel. Worker goroutines are blocked because of this task channel. So the PR moves close(p.tasks) before p.workersWaitGroup.Wait() in stop() method. But this move can't guarantee the purge goroutine stopped, which leads to the data race.

So I suggest to move close(p.tasks) back and there is another unblocked way to drain the tasks instead of relying on a closed task channel.

func drainTasks(tasks <-chan func(), tasksWaitGroup *sync.WaitGroup) {
	for {
		select {
		case task, ok := <-tasks:
			if task != nil && ok {
				tasksWaitGroup.Done()
			}
		default:
			return
		}
	}
}

So after the change, the stop() method has the following steps:

  1. Wait the tasks done. (If context is canceled explicitly, long running tasks will be drained beforehand. Task waitgroup won't be blocked. All worker and purge goroutines are probably returned here, otherwise goroutines are returned in step 3)
  2. Reset worker count.
  3. Cancel the context.
  4. Wait all worker and purge goroutines done.
  5. Close the task channel.

What do you think? Please let me know if anything I missed out.

@alitto
Copy link
Owner

alitto commented Sep 1, 2024

That makes a lot of sense, yes. I overlooked that change in #62. As a general rule of thumb, a writable channel shared with N goroutines can only be closed after all of them have returned.
I have merged both of your PRs and released them as part of v1.9.2 🚀
Thank you for your contributions!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants