Improve: fix go test race detect

This commit is contained in:
Dreamacro 2020-07-18 20:56:13 +08:00
parent 6c7a8fffe0
commit cf9e1545a4
3 changed files with 19 additions and 12 deletions

View file

@ -1,8 +1,8 @@
package observable package observable
import ( import (
"runtime"
"sync" "sync"
"sync/atomic"
"testing" "testing"
"time" "time"
@ -38,20 +38,20 @@ func TestObservable_MutilSubscribe(t *testing.T) {
src := NewObservable(iter) src := NewObservable(iter)
ch1, _ := src.Subscribe() ch1, _ := src.Subscribe()
ch2, _ := src.Subscribe() ch2, _ := src.Subscribe()
count := 0 var count int32
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(2) wg.Add(2)
waitCh := func(ch <-chan interface{}) { waitCh := func(ch <-chan interface{}) {
for range ch { for range ch {
count++ atomic.AddInt32(&count, 1)
} }
wg.Done() wg.Done()
} }
go waitCh(ch1) go waitCh(ch1)
go waitCh(ch2) go waitCh(ch2)
wg.Wait() wg.Wait()
assert.Equal(t, count, 10) assert.Equal(t, int32(10), count)
} }
func TestObservable_UnSubscribe(t *testing.T) { func TestObservable_UnSubscribe(t *testing.T) {
@ -82,9 +82,6 @@ func TestObservable_UnSubscribeWithNotExistSubscription(t *testing.T) {
} }
func TestObservable_SubscribeGoroutineLeak(t *testing.T) { func TestObservable_SubscribeGoroutineLeak(t *testing.T) {
// waiting for other goroutine recycle
time.Sleep(120 * time.Millisecond)
init := runtime.NumGoroutine()
iter := iterator([]interface{}{1, 2, 3, 4, 5}) iter := iterator([]interface{}{1, 2, 3, 4, 5})
src := NewObservable(iter) src := NewObservable(iter)
max := 100 max := 100
@ -107,6 +104,12 @@ func TestObservable_SubscribeGoroutineLeak(t *testing.T) {
go waitCh(ch) go waitCh(ch)
} }
wg.Wait() wg.Wait()
now := runtime.NumGoroutine()
assert.Equal(t, init, now) for _, sub := range list {
_, more := <-sub
assert.False(t, more)
}
_, more := <-list[0]
assert.False(t, more)
} }

View file

@ -44,9 +44,12 @@ func (s *Single) Do(fn func() (interface{}, error)) (v interface{}, err error, s
s.mux.Unlock() s.mux.Unlock()
call.val, call.err = fn() call.val, call.err = fn()
call.wg.Done() call.wg.Done()
s.mux.Lock()
s.call = nil s.call = nil
s.result = &Result{call.val, call.err} s.result = &Result{call.val, call.err}
s.last = now s.last = now
s.mux.Unlock()
return call.val, call.err, false return call.val, call.err, false
} }

View file

@ -2,6 +2,7 @@ package singledo
import ( import (
"sync" "sync"
"sync/atomic"
"testing" "testing"
"time" "time"
@ -11,7 +12,7 @@ import (
func TestBasic(t *testing.T) { func TestBasic(t *testing.T) {
single := NewSingle(time.Millisecond * 30) single := NewSingle(time.Millisecond * 30)
foo := 0 foo := 0
shardCount := 0 var shardCount int32 = 0
call := func() (interface{}, error) { call := func() (interface{}, error) {
foo++ foo++
time.Sleep(time.Millisecond * 5) time.Sleep(time.Millisecond * 5)
@ -25,7 +26,7 @@ func TestBasic(t *testing.T) {
go func() { go func() {
_, _, shard := single.Do(call) _, _, shard := single.Do(call)
if shard { if shard {
shardCount++ atomic.AddInt32(&shardCount, 1)
} }
wg.Done() wg.Done()
}() }()
@ -33,7 +34,7 @@ func TestBasic(t *testing.T) {
wg.Wait() wg.Wait()
assert.Equal(t, 1, foo) assert.Equal(t, 1, foo)
assert.Equal(t, 4, shardCount) assert.Equal(t, int32(4), shardCount)
} }
func TestTimer(t *testing.T) { func TestTimer(t *testing.T) {