Skip to content

Commit

Permalink
feat(concurrency): adding lo.WaitFor
Browse files Browse the repository at this point in the history
  • Loading branch information
samber committed Jun 28, 2024
1 parent c23c040 commit aaebc3f
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 1 deletion.
25 changes: 25 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ Concurrency helpers:
- [Synchronize](#synchronize)
- [Async](#async)
- [Transaction](#transaction)
- [WaitFor](#waitfor)

Error handling:

Expand Down Expand Up @@ -2837,6 +2838,30 @@ _, _ = transaction.Process(-5)
// rollback 1
```

### WaitFor

Runs periodically until a condition is validated.

```go
alwaysTrue := func(i int) bool { return true }
alwaysFalse := func(i int) bool { return false }
laterTrue := func(i int) bool {
return i > 5
}

ok := lo.WaitFor(alwaysTrue, 10*time.Millisecond, time.Millisecond)
// true

ok := lo.WaitFor(alwaysFalse, 10*time.Millisecond, time.Millisecond)
// false

ok := lo.WaitFor(laterTrue, 10*time.Millisecond, time.Millisecond)
// true

ok := lo.WaitFor(laterTrue, 10*time.Millisecond, 5*time.Millisecond)
// false
```

### Validate

Helper function that creates an error when a condition is not met.
Expand Down
37 changes: 36 additions & 1 deletion concurrency.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package lo

import "sync"
import (
"sync"
"time"
)

type synchronize struct {
locker sync.Locker
Expand Down Expand Up @@ -93,3 +96,35 @@ func Async6[A, B, C, D, E, F any](f func() (A, B, C, D, E, F)) <-chan Tuple6[A,
}()
return ch
}

// WaitFor runs periodically until a condition is validated.
func WaitFor(condition func(i int) bool, maxDuration time.Duration, tick time.Duration) (int, time.Duration, bool) {
if condition(0) {
return 1, 0, true
}

start := time.Now()

timer := time.NewTimer(maxDuration)
ticker := time.NewTicker(tick)

defer func() {
timer.Stop()
ticker.Stop()
}()

i := 1

for {
select {
case <-timer.C:
return i, time.Since(start), false
case <-ticker.C:
if condition(i) {
return i + 1, time.Since(start), true
}

i++
}
}
}
44 changes: 44 additions & 0 deletions concurrency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,3 +212,47 @@ func TestAsyncX(t *testing.T) {
}
}
}

func TestWaitFor(t *testing.T) {
t.Parallel()
testWithTimeout(t, 100*time.Millisecond)
is := assert.New(t)

alwaysTrue := func(i int) bool { return true }
alwaysFalse := func(i int) bool { return false }

iter, duration, ok := WaitFor(alwaysTrue, 10*time.Millisecond, time.Millisecond)
is.Equal(1, iter)
is.Equal(time.Duration(0), duration)
is.True(ok)
iter, duration, ok = WaitFor(alwaysFalse, 10*time.Millisecond, time.Millisecond)
is.Equal(10, iter)
is.InEpsilon(10*time.Millisecond, duration, float64(500*time.Microsecond))
is.False(ok)

laterTrue := func(i int) bool {
return i >= 5
}

iter, duration, ok = WaitFor(laterTrue, 10*time.Millisecond, time.Millisecond)
is.Equal(6, iter)
is.InEpsilon(6*time.Millisecond, duration, float64(500*time.Microsecond))
is.True(ok)
iter, duration, ok = WaitFor(laterTrue, 10*time.Millisecond, 5*time.Millisecond)
is.Equal(2, iter)
is.InEpsilon(10*time.Millisecond, duration, float64(500*time.Microsecond))
is.False(ok)

counter := 0

alwaysFalse = func(i int) bool {
is.Equal(counter, i)
counter++
return false
}

iter, duration, ok = WaitFor(alwaysFalse, 10*time.Millisecond, time.Millisecond)
is.Equal(10, iter)
is.InEpsilon(10*time.Millisecond, duration, float64(500*time.Microsecond))
is.False(ok)
}

0 comments on commit aaebc3f

Please sign in to comment.