Skip to content

Commit

Permalink
libxc/binance: Fix orderbook sync
Browse files Browse the repository at this point in the history
The orderbook sync did not account for the first update to the orderbook
straddling the last update ID of the snapshot.
  • Loading branch information
martonp committed Aug 11, 2024
1 parent 1efedb4 commit 6844bb7
Showing 1 changed file with 21 additions and 6 deletions.
27 changes: 21 additions & 6 deletions client/mm/libxc/binance.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ func (b *binanceOrderBook) Connect(ctx context.Context) (*sync.WaitGroup, error
syncChan := make(chan struct{})
b.syncChan = syncChan
var updateID uint64 = updateIDUnsynced
acceptedUpdate := false

resyncChan := make(chan struct{}, 1)

Expand All @@ -165,6 +166,7 @@ func (b *binanceOrderBook) Connect(ctx context.Context) (*sync.WaitGroup, error
syncMtx.Lock()
defer syncMtx.Unlock()
syncCache = make([]*bntypes.BookUpdate, 0)
acceptedUpdate = false
if updateID != updateIDUnsynced {
b.synced.Store(false)
updateID = updateIDUnsynced
Expand All @@ -178,11 +180,25 @@ func (b *binanceOrderBook) Connect(ctx context.Context) (*sync.WaitGroup, error
syncCache = append(syncCache, update)
return true
}
if update.FirstUpdateID != updateID+1 {
// Trigger a resync.

if !acceptedUpdate {
// On the first update we receive, the update may straddle the last
// update ID of the snapshot. If the first update ID is greater
// than the snapshot ID + 1, it means we missed something so we
// must resync. If the last update ID is less than or equal to the
// snapshot ID, we can ignore it.
if update.FirstUpdateID > updateID+1 {
return false
} else if update.LastUpdateID <= updateID {
return true
}
// Once we've accepted the first update, the updates must be in
// sequence.
} else if update.FirstUpdateID != updateID+1 {
return false
}
// Properly sequenced.

acceptedUpdate = true
updateID = update.LastUpdateID
bids, asks, err := b.convertBinanceBook(update.Bids, update.Asks)
if err != nil {
Expand All @@ -197,15 +213,14 @@ func (b *binanceOrderBook) Connect(ctx context.Context) (*sync.WaitGroup, error
processSyncCache := func(snapshotID uint64) bool {
syncMtx.Lock()
defer syncMtx.Unlock()

updateID = snapshotID
for _, update := range syncCache {
if update.LastUpdateID <= snapshotID {
continue
}
if !acceptUpdate(update) {
return false
}
}

b.synced.Store(true)
if syncChan != nil {
close(syncChan)
Expand Down

0 comments on commit 6844bb7

Please sign in to comment.