Skip to content

Commit

Permalink
Drop support for go 1.18
Browse files Browse the repository at this point in the history
Use the new atomic package API introduced in go 1.19:

* Replace atomic.Value with atomic.Pointer
* Replace atomic.Add*/Store*/Load*/etc. with new Int*/Uint* atomic types
* Except the atomic.Value used in filewatcher, as to take full advantage
  of that we would need to make breaking API changes to filewatcher
  • Loading branch information
fishy committed Dec 1, 2022
1 parent 8b9e2c7 commit 2dd3ab8
Show file tree
Hide file tree
Showing 22 changed files with 180 additions and 167 deletions.
1 change: 0 additions & 1 deletion .github/workflows/go-ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ jobs:
strategy:
matrix:
go-version:
- 1.18
- 1.19

container:
Expand Down
8 changes: 4 additions & 4 deletions clientpool/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
type channelPool struct {
pool chan Client
opener ClientOpener
numActive int32
numActive atomic.Int32
initialClients int
maxClients int
}
Expand Down Expand Up @@ -57,7 +57,7 @@ func NewChannelPool(initialClients, maxClients int, opener ClientOpener) (Pool,
func (cp *channelPool) Get() (client Client, err error) {
defer func() {
if err == nil {
atomic.AddInt32(&cp.numActive, 1)
cp.numActive.Add(1)
}
}()

Expand Down Expand Up @@ -96,7 +96,7 @@ func (cp *channelPool) Release(c Client) error {

// As long as c is not nil, we always need to decrease numActive by 1,
// even if we encounter errors here, either due to close or opener.
defer atomic.AddInt32(&cp.numActive, -1)
defer cp.numActive.Add(-1)

if !c.IsOpen() {
// Even when c.IsOpen reported false, still call Close explicitly to avoid
Expand Down Expand Up @@ -134,7 +134,7 @@ func (cp *channelPool) Close() error {

// NumActiveClients returns the number of clients curently given out for use.
func (cp *channelPool) NumActiveClients() int32 {
return atomic.LoadInt32(&cp.numActive)
return cp.numActive.Load()
}

// NumAllocated returns the number of allocated clients in internal pool.
Expand Down
10 changes: 5 additions & 5 deletions clientpool/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,17 @@ func TestChannelPoolInvalidConfig(t *testing.T) {
}

func TestChannelPool(t *testing.T) {
opener := func(called *int32) clientpool.ClientOpener {
opener := func(called *atomic.Int32) clientpool.ClientOpener {
return func() (clientpool.Client, error) {
if called != nil {
atomic.AddInt32(called, 1)
called.Add(1)
}
return &testClient{}, nil
}
}

const min, max = 2, 5
var openerCalled int32
var openerCalled atomic.Int32
pool, err := clientpool.NewChannelPool(min, max, opener(&openerCalled))
if err != nil {
t.Fatal(err)
Expand All @@ -44,10 +44,10 @@ func TestChannelPool(t *testing.T) {
func TestChannelPoolWithOpenerFailure(t *testing.T) {
// In this opener, every other call will fail
opener := func() clientpool.ClientOpener {
var called int32
var called atomic.Int32
failure := errors.New("failed")
return func() (clientpool.Client, error) {
if atomic.AddInt32(&called, 1)%2 == 0 {
if called.Add(1)%2 == 0 {
return nil, failure
}
return &testClient{}, nil
Expand Down
38 changes: 19 additions & 19 deletions clientpool/interface_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func checkActiveAndAllocated(t *testing.T, pool clientpool.Pool, expectedActive,
}
}

func testPool(t *testing.T, pool clientpool.Pool, openerCalled *int32, min, max int) {
func testPool(t *testing.T, pool clientpool.Pool, openerCalled *atomic.Int32, min, max int) {
t.Run(
"drain-the-pool",
func(t *testing.T) {
Expand All @@ -57,7 +57,7 @@ func testPool(t *testing.T, pool clientpool.Pool, openerCalled *int32, min, max

checkActiveAndAllocated(t, pool, min, 0)

t.Logf("opener called %d times", atomic.LoadInt32(openerCalled))
t.Logf("opener called %d times", openerCalled.Load())
},
)

Expand All @@ -71,7 +71,7 @@ func testPool(t *testing.T, pool clientpool.Pool, openerCalled *int32, min, max

checkActiveAndAllocated(t, pool, min+1, 0)

t.Logf("opener called %d times", atomic.LoadInt32(openerCalled))
t.Logf("opener called %d times", openerCalled.Load())
},
)

Expand All @@ -87,7 +87,7 @@ func testPool(t *testing.T, pool clientpool.Pool, openerCalled *int32, min, max

checkActiveAndAllocated(t, pool, max, 0)

t.Logf("opener called %d times", atomic.LoadInt32(openerCalled))
t.Logf("opener called %d times", openerCalled.Load())
},
)

Expand All @@ -103,20 +103,20 @@ func testPool(t *testing.T, pool clientpool.Pool, openerCalled *int32, min, max
)
}

beforeOpenerCalled := atomic.LoadInt32(openerCalled)
beforeOpenerCalled := openerCalled.Load()
_, err := pool.Get()
if err == nil {
t.Error("pool.Get expected error, got nil")
}

diff := atomic.LoadInt32(openerCalled) - beforeOpenerCalled
diff := openerCalled.Load() - beforeOpenerCalled
if diff != 0 {
t.Errorf("pool.Get should not call opener, called %d times", diff)
}

checkActiveAndAllocated(t, pool, max, 0)

t.Logf("opener called %d times", atomic.LoadInt32(openerCalled))
t.Logf("opener called %d times", openerCalled.Load())
},
)

Expand All @@ -135,27 +135,27 @@ func testPool(t *testing.T, pool clientpool.Pool, openerCalled *int32, min, max
// Close the client in the pool
c.closed = true

beforeOpenerCalled := atomic.LoadInt32(openerCalled)
beforeOpenerCalled := openerCalled.Load()
newc, err := pool.Get()
if err != nil {
t.Fatalf("pool.Get returned error: %v", err)
}
if !newc.IsOpen() {
t.Error("pool.Get returned closed client")
}
diff := atomic.LoadInt32(openerCalled) - beforeOpenerCalled
diff := openerCalled.Load() - beforeOpenerCalled
if diff != 1 {
t.Error("opener not called with closed client")
}

t.Logf("opener called %d times", atomic.LoadInt32(openerCalled))
t.Logf("opener called %d times", openerCalled.Load())
},
)

t.Run(
"release-min-closed-clients",
func(t *testing.T) {
beforeOpenerCalled := atomic.LoadInt32(openerCalled)
beforeOpenerCalled := openerCalled.Load()

for i := 0; i < min; i++ {
c := &testClient{
Expand All @@ -166,7 +166,7 @@ func testPool(t *testing.T, pool clientpool.Pool, openerCalled *int32, min, max
}
}

diff := atomic.LoadInt32(openerCalled) - beforeOpenerCalled
diff := openerCalled.Load() - beforeOpenerCalled
if int(diff) != min {
t.Errorf(
"Expected opener to be called %d times, called %d times instead",
Expand All @@ -177,14 +177,14 @@ func testPool(t *testing.T, pool clientpool.Pool, openerCalled *int32, min, max

checkActiveAndAllocated(t, pool, max-min, min)

t.Logf("opener called %d times", atomic.LoadInt32(openerCalled))
t.Logf("opener called %d times", openerCalled.Load())
},
)

t.Run(
"release-to-max-minus-1",
func(t *testing.T) {
beforeOpenerCalled := atomic.LoadInt32(openerCalled)
beforeOpenerCalled := openerCalled.Load()

for i := 0; i < max-min-1; i++ {
c := &testClient{}
Expand All @@ -193,7 +193,7 @@ func testPool(t *testing.T, pool clientpool.Pool, openerCalled *int32, min, max
}
}

diff := atomic.LoadInt32(openerCalled) - beforeOpenerCalled
diff := openerCalled.Load() - beforeOpenerCalled
if diff != 0 {
t.Errorf(
"Didn't expect opener to be called, called %d times instead",
Expand All @@ -203,7 +203,7 @@ func testPool(t *testing.T, pool clientpool.Pool, openerCalled *int32, min, max

checkActiveAndAllocated(t, pool, 1, max-1)

t.Logf("opener called %d times", atomic.LoadInt32(openerCalled))
t.Logf("opener called %d times", openerCalled.Load())
},
)

Expand All @@ -222,7 +222,7 @@ func testPool(t *testing.T, pool clientpool.Pool, openerCalled *int32, min, max

checkActiveAndAllocated(t, pool, 0, max)

t.Logf("opener called %d times", atomic.LoadInt32(openerCalled))
t.Logf("opener called %d times", openerCalled.Load())
},
)

Expand All @@ -240,7 +240,7 @@ func testPool(t *testing.T, pool clientpool.Pool, openerCalled *int32, min, max
t.Error("pool.Release did not close extra released client")
}

t.Logf("opener called %d times", atomic.LoadInt32(openerCalled))
t.Logf("opener called %d times", openerCalled.Load())
},
)

Expand Down Expand Up @@ -280,7 +280,7 @@ func testPool(t *testing.T, pool clientpool.Pool, openerCalled *int32, min, max
t.Error("pool.Close did not close client")
}

t.Logf("opener called %d times", atomic.LoadInt32(openerCalled))
t.Logf("opener called %d times", openerCalled.Load())
},
)
}
6 changes: 3 additions & 3 deletions drainer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ type HealthCheckCloser interface {
}

type drainer struct {
closed int64
closed atomic.Int64
}

func (d *drainer) IsHealthy(_ context.Context) bool {
return atomic.LoadInt64(&d.closed) == 0
return d.closed.Load() == 0
}

func (d *drainer) Close() error {
atomic.StoreInt64(&d.closed, 1)
d.closed.Store(1)
return nil
}

Expand Down
27 changes: 7 additions & 20 deletions ecinterface/global.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,11 @@ var Logger log.Wrapper
// ErrGetBeforeSet is the error returned when Get is called before Set.
var ErrGetBeforeSet = errors.New("ecinterface: Get called before Set is called")

// current is the storage type of global.
//
// atomic.Value requires that the underlying concrete type remain constant.
// If we try to store two different implementations of Interface, we will get a panic,
// because Interface is promoted to any when you call Store.
//
// Thus, we use a `current{}` so that the concrete type is always the same.
type current struct {
Interface
}

// actual type: current
var global atomic.Value
var global atomic.Pointer[Interface]

// Set sets the global edge context implementation.
func Set(impl Interface) {
global.Store(current{impl})
global.Store(&impl)
}

// Get returns the previously Set global edge context implementation.
Expand All @@ -60,13 +48,12 @@ func Set(impl Interface) {
//
// - Its ContextToHeader always return ("", false).
func Get() Interface {
stored := global.Load()
if stored == nil {
Logger.Log(context.Background(), ErrGetBeforeSet.Error())
getBeforeSet.Inc()
return nopImpl
if stored := global.Load(); stored != nil {
return *stored
}
return stored.(current).Interface
Logger.Log(context.Background(), ErrGetBeforeSet.Error())
getBeforeSet.Inc()
return nopImpl
}

type nop struct{}
Expand Down
2 changes: 0 additions & 2 deletions ecinterface/global_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ func BenchmarkAtomics(b *testing.B) {
b.Fatalf("this is just to avoid eliding the call")
}
})
/* If you have go1.19 with the new atomic APIs, you can throw this one in for fun:
b.Run("atomicPointer", func(b *testing.B) {
var global atomic.Pointer[Interface]
var impl Interface = nop{}
Expand All @@ -102,5 +101,4 @@ func BenchmarkAtomics(b *testing.B) {
b.Fatalf("this is just to avoid eliding the call")
}
})
*/
}
12 changes: 6 additions & 6 deletions events/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestV2(t *testing.T) {

// put
var wg sync.WaitGroup
var failed int64
var failed atomic.Int64
const expectedFailures = 1
const n = queueSize + expectedFailures

Expand All @@ -58,7 +58,7 @@ func TestV2(t *testing.T) {
before := time.Now()
if err := v2.Put(ctx, mockTStruct{}); err != nil {
t.Log("Put failed with:", err)
atomic.AddInt64(&failed, 1)
failed.Add(1)
}
elapsed := time.Since(before)
if elapsed > tripleTime {
Expand All @@ -72,7 +72,7 @@ func TestV2(t *testing.T) {
}
wg.Wait()

actualFailures := atomic.LoadInt64(&failed)
actualFailures := failed.Load()
if actualFailures != expectedFailures {
t.Errorf(
"Expected %d failed Put call, actual %d",
Expand All @@ -99,7 +99,7 @@ func TestV2(t *testing.T) {
}

// PutRaw
atomic.StoreInt64(&failed, 0)
failed.Store(0)
const rawData = "hello, world"
wg.Add(n)
for i := 0; i < n; i++ {
Expand All @@ -110,7 +110,7 @@ func TestV2(t *testing.T) {
before := time.Now()
if err := v2.PutRaw(ctx, []byte(rawData)); err != nil {
t.Log("PutRaw failed with:", err)
atomic.AddInt64(&failed, 1)
failed.Add(1)
}
elapsed := time.Since(before)
if elapsed > tripleTime {
Expand All @@ -124,7 +124,7 @@ func TestV2(t *testing.T) {
}
wg.Wait()

actualFailures = atomic.LoadInt64(&failed)
actualFailures = failed.Load()
if actualFailures != expectedFailures {
t.Errorf(
"Expected %d failed Put call, actual %d",
Expand Down
Loading

0 comments on commit 2dd3ab8

Please sign in to comment.