diff --git a/lib/go/thrift/serializer_test.go b/lib/go/thrift/serializer_test.go index 78b67453bc5..425ce0691bc 100644 --- a/lib/go/thrift/serializer_test.go +++ b/lib/go/thrift/serializer_test.go @@ -243,7 +243,7 @@ func TestSerializer(t *testing.T) { func TestSerializerPoolAsync(t *testing.T) { var wg sync.WaitGroup - var counter int64 + var counter atomic.Int64 s := NewTSerializerPool(NewTSerializer) d := NewTDeserializerPool(NewTDeserializer) f := func(i int64) bool { @@ -251,7 +251,7 @@ func TestSerializerPoolAsync(t *testing.T) { go func() { defer wg.Done() t.Run( - fmt.Sprintf("#%d-%d", atomic.AddInt64(&counter, 1), i), + fmt.Sprintf("#%d-%d", counter.Add(1), i), func(t *testing.T) { m := MyTestStruct{ Int64: i, diff --git a/lib/go/thrift/simple_server.go b/lib/go/thrift/simple_server.go index 31dfa1e6d5d..c5c14feed5f 100644 --- a/lib/go/thrift/simple_server.go +++ b/lib/go/thrift/simple_server.go @@ -54,7 +54,7 @@ var ServerStopTimeout = time.Duration(0) * This will work if golang user implements a conn-pool like thing in client side. */ type TSimpleServer struct { - closed int32 + closed atomic.Int32 wg sync.WaitGroup mu sync.Mutex stopChan chan struct{} @@ -186,7 +186,7 @@ func (p *TSimpleServer) innerAccept() (int32, error) { client, err := p.serverTransport.Accept() p.mu.Lock() defer p.mu.Unlock() - closed := atomic.LoadInt32(&p.closed) + closed := p.closed.Load() if closed != 0 { return closed, nil } @@ -246,10 +246,10 @@ func (p *TSimpleServer) Stop() error { p.mu.Lock() defer p.mu.Unlock() - if atomic.LoadInt32(&p.closed) != 0 { + if !p.closed.CompareAndSwap(0, 1) { + // Already closed return nil } - atomic.StoreInt32(&p.closed, 1) p.serverTransport.Interrupt() ctx, cancel := context.WithCancel(context.Background()) @@ -328,7 +328,7 @@ func (p *TSimpleServer) processRequests(client TTransport) (err error) { defer outputTransport.Close() } for { - if atomic.LoadInt32(&p.closed) != 0 { + if p.closed.Load() != 0 { return nil } diff --git a/lib/go/thrift/socket_conn.go b/lib/go/thrift/socket_conn.go index bbb5b7d15c0..dfd0913abcd 100644 --- a/lib/go/thrift/socket_conn.go +++ b/lib/go/thrift/socket_conn.go @@ -30,7 +30,7 @@ type socketConn struct { net.Conn buffer [1]byte - closed int32 + closed atomic.Int32 } var _ net.Conn = (*socketConn)(nil) @@ -67,7 +67,7 @@ func wrapSocketConn(conn net.Conn) *socketConn { // It's the same as the previous implementation of TSocket.IsOpen and // TSSLSocket.IsOpen before we added connectivity check. func (sc *socketConn) isValid() bool { - return sc != nil && sc.Conn != nil && atomic.LoadInt32(&sc.closed) == 0 + return sc != nil && sc.Conn != nil && sc.closed.Load() == 0 } // IsOpen checks whether the connection is open. @@ -119,6 +119,6 @@ func (sc *socketConn) Close() error { // Already closed return net.ErrClosed } - atomic.StoreInt32(&sc.closed, 1) + sc.closed.Store(1) return sc.Conn.Close() }