Peer backup #8490

Open · wants to merge 20 commits into base: master
Changes from 1 commit
86 changes: 85 additions & 1 deletion peer/brontide.go
@@ -76,6 +76,8 @@ const (

// ErrorBufferSize is the number of historic peer errors that we store.
ErrorBufferSize = 10

// backupUpdateInterval is the delay between receiving a peer's backup
// blob and persisting it, which rate-limits writes to at most one per
// interval.
backupUpdateInterval = 1 * time.Second
)

var (
@@ -518,6 +520,7 @@ type Brontide struct {
// backupData is an in-memory copy of the latest backup blob that this
// peer has asked us to store.
backupData lnwire.PeerStorageBlob

// bMtx guards backupData and is used to wake the peerStorageWriter
// goroutine whenever new backup data arrives.
bMtx *sync.Cond
}

// A compile-time check to ensure that Brontide satisfies the lnpeer.Peer interface.
@@ -549,6 +552,7 @@ func NewBrontide(cfg Config) *Brontide {
startReady: make(chan struct{}),
quit: make(chan struct{}),
log: build.NewPrefixLog(logPrefix, peerLog),
bMtx: sync.NewCond(&sync.Mutex{}),
}

var (
@@ -752,10 +756,14 @@ func (p *Brontide) Start() error {
return fmt.Errorf("could not start ping manager %w", err)
}

p.wg.Add(4)
p.wg.Add(5)
go p.queueHandler()
go p.writeHandler()
go p.channelManager()

// Start peerStorageWriter before readHandler so that it is ready to
// persist `PeerStorage` messages as soon as they are received.
go p.peerStorageWriter()
go p.readHandler()

// Signal to any external processes that the peer is now active.
@@ -777,6 +785,75 @@ func (p *Brontide) Start() error {
return nil
}

// peerStorageWriter persists the peer's backup data, delaying each write
// by `backupUpdateInterval` from the time the data is received, which
// rate-limits persistence to at most one write per interval.
func (p *Brontide) peerStorageWriter() {
defer p.wg.Done()

var data []byte

Loop:
p.bMtx.L.Lock()
for {
p.bMtx.Wait()

// Copy the data into a local variable as we are about to
// release the lock.
data = p.backupData
p.bMtx.L.Unlock()

// Check if we are to exit, now that we are awake.
select {
case <-p.quit:
if data == nil {
return
}

// Store the data immediately and exit.
err := p.cfg.PeerDataStore.Store(data)
if err != nil {
peerLog.Warnf("Failed to store peer "+
"backup data: %v", err)
}

return
Comment on lines +828 to +840

Collaborator:

As discussed in the main PR thread, this is not the approach we want to take. You shouldn't need a separate peerStorageWriter thread. It is fine to persist it in the main handler thread, or fork a single goroutine for overwriting the data so it doesn't block the main readHandler thread.

This will also alleviate the need to use Conds, which are notoriously difficult to use correctly.

Contributor (Author):

Do you mean in the Store method?


default:
}

break
}

t := time.NewTicker(backupUpdateInterval)

select {
case <-t.C:
// Stop the ticker before jumping back to Loop to avoid leaking it.
t.Stop()

// Store the data.
err := p.cfg.PeerDataStore.Store(data)
if err != nil {
peerLog.Criticalf("Failed to store peer "+
"backup data: %v", err)
}

goto Loop

case <-p.quit:
if data == nil {
return
}

// Store the data immediately and exit.
err := p.cfg.PeerDataStore.Store(data)
if err != nil {
peerLog.Warnf("Failed to store peer backup "+
"data: %v", err)
}

return
}
}
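
For illustration, here is a rough, self-contained sketch of the alternative the reviewer suggests in the thread above: persist the blob from the handler path, forking a single goroutine per update so the readHandler is never blocked and no sync.Cond is needed. The peer and memStore types below are stand-ins invented for this sketch and are not code from this PR.

package main

import (
	"fmt"
	"sync"
)

// memStore is a stand-in for cfg.PeerDataStore; it keeps only the most
// recently stored blob.
type memStore struct {
	mu   sync.Mutex
	blob []byte
}

func (m *memStore) Store(data []byte) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	m.blob = data

	return nil
}

// peer is a stand-in for Brontide with only the fields this sketch needs.
type peer struct {
	wg    sync.WaitGroup
	store *memStore
}

// handlePeerStorage overwrites the stored blob by forking a single
// goroutine per message, so the read loop is never blocked and no
// sync.Cond is required.
func (p *peer) handlePeerStorage(blob []byte) {
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()

		if err := p.store.Store(blob); err != nil {
			fmt.Printf("failed to store peer backup data: %v\n", err)
		}
	}()
}

func main() {
	p := &peer{store: &memStore{}}
	p.handlePeerStorage([]byte{0x01})
	p.wg.Wait()

	fmt.Printf("stored blob: %x\n", p.store.blob)
}

The sketch deliberately ignores write ordering and rate-limiting, which is what backupUpdateInterval addresses in the diff above.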

// initGossipSync initializes either a gossip syncer or an initial routing
// dump, depending on the negotiated synchronization method.
func (p *Brontide) initGossipSync() {
@@ -1286,6 +1363,10 @@ func (p *Brontide) Disconnect(reason error) {

close(p.quit)

// Wake the peerStorageWriter so that it can observe the closed quit
// channel and exit.
p.bMtx.Signal()

if err := p.pingManager.Stop(); err != nil {
p.log.Errorf("couldn't stop pingManager during disconnect: %v",
err)
@@ -4210,7 +4291,10 @@ func (p *Brontide) handlePeerStorageMessage(msg *lnwire.PeerStorage) error {

p.log.Debugf("handling peerbackup for peer(%s)", p)

p.bMtx.L.Lock()
p.backupData = msg.Blob
p.bMtx.Signal()
p.bMtx.L.Unlock()

return nil
}
86 changes: 86 additions & 0 deletions peer/brontide_test.go
@@ -1569,3 +1569,89 @@ func TestHandlePeerStorage(t *testing.T) {
})
}
}

// TestPeerStorageWriter tests the peerStorageWriter function.
func TestPeerStorageWriter(t *testing.T) {
harness := createTestPeer(t)
peer := harness.peer

// Run peerStorageWriter in its own goroutine, just as the peer does
// in Start, so that it is exercised the same way it runs in production.
peer.wg.Add(1)
go peer.peerStorageWriter()

// Update the backupData every quarter of the backupUpdateInterval.
// We do this to check that at most one update per interval is
// persisted, i.e. that persistence really is delayed.
interval := backupUpdateInterval / 4

// We update the backupData eight times, once every quarter of the
// backupUpdateInterval, so the whole sequence spans two
// backupUpdateInterval durations.
for i := 0; i < 8; i++ {
ti := time.NewTicker(interval)
select {
case <-ti.C:
peer.bMtx.L.Lock()
peer.backupData = []byte{byte(i)}
peer.bMtx.Signal()
peer.bMtx.L.Unlock()

case <-time.After(1 * time.Second):
t.Fatalf("did not receive ticker as expected.")
}
}

// We expect the initial backup data at index 0 to be persisted first.
// One full backupUpdateInterval later, the next data to be persisted
// is the one at index 4. Between the update at index 4 and the final
// update at index 7, only 3/4 of the backupUpdateInterval elapses, so
// we sleep for the remaining quarter to let the data at index 4 be
// persisted.
time.Sleep(backupUpdateInterval / 4)

retrievedData, err := peer.cfg.PeerDataStore.Retrieve()
require.NoError(t, err)

// We expect at most one write per backupUpdateInterval. Since the
// updates were sent over a span of two backupUpdateIntervals, only
// two of them should have been persisted.
require.Len(t, retrievedData, 2)

// Convert the data to its corresponding integer values.
convToIntData := func(retrievedData []byte) []int {
return fn.Map(func(b byte) int {
return int(b)
}, retrievedData)
}

// The backup data was updated eight times, each annotated with its
// index. Due to the delay, we expect only the data from the zeroth and
// fourth updates to be persisted in that order.
require.Equal(t, []int{0, 4}, convToIntData(retrievedData))

// Test that we store remaining data on quit.
close(peer.quit)
peer.bMtx.Signal()

// The signal for data at index 7 wasn't picked up because it
// was sent during the storage delay for data at index 4.
// After sending another signal, we expect the
// `peerStorageWriter` to now pick it up, then bypass the
// storage delay and persist it immediately as we have now
// closed the peer's quit channel as well.
//
// Wait a bit to allow for storing before we retrieve.
time.Sleep(time.Second / 2)
retrievedData, err = peer.cfg.PeerDataStore.Retrieve()
require.NoError(t, err)

require.Len(t, retrievedData, 3)
require.Equal(t, []int{0, 4, 7}, convToIntData(retrievedData))

// Wait for the goroutine to exit (good manners).
peer.wg.Wait()
}
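
As an aside, the extra Signal after close(peer.quit) above is needed because sync.Cond.Signal only wakes a goroutine that is currently blocked in Wait; a signal raised while the writer is sleeping through its storage delay is simply dropped. The following standalone sketch, which is illustrative only and not part of this PR, shows that lost-signal behaviour:

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	c := sync.NewCond(&sync.Mutex{})
	woken := make(chan struct{})

	// The waiter only starts waiting after a short delay, emulating the
	// peerStorageWriter being busy inside its storage delay.
	go func() {
		time.Sleep(100 * time.Millisecond)

		c.L.Lock()
		c.Wait() // Only a Signal sent after this point wakes us.
		c.L.Unlock()

		close(woken)
	}()

	// This Signal fires while nobody is waiting yet, so it is dropped.
	c.Signal()

	select {
	case <-woken:
		fmt.Println("woken by the first signal")

	case <-time.After(300 * time.Millisecond):
		fmt.Println("first signal was dropped; signalling again")

		// By now the goroutine is blocked in Wait, so this Signal is
		// delivered and the waiter wakes up.
		c.Signal()
		<-woken

		fmt.Println("woken by the second signal")
	}
}

In typical Cond usage, Wait sits in a loop that re-checks a shared predicate, so a dropped Signal is harmless; the writer here relies on Signal delivery instead, which is why the test has to signal a second time.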
44 changes: 43 additions & 1 deletion peer/test_utils.go
@@ -7,6 +7,7 @@ import (
"io"
"math/rand"
"net"
"sync"
"sync/atomic"
"testing"
"time"
@@ -538,6 +539,46 @@ func (m *mockMessageConn) Close() error {
return nil
}

// mockPeerDataStore is a simple in-memory implementation of the peer
// data store used in tests.
type mockPeerDataStore struct {
data [][]byte
mtx sync.RWMutex
}

func newMockDataStore() *mockPeerDataStore {
return &mockPeerDataStore{}
}

// Store persists the backup data given to us by peers.
func (d *mockPeerDataStore) Store(data []byte) error {
d.mtx.Lock()
defer d.mtx.Unlock()

d.data = append(d.data, data)

return nil
}

// Delete removes the backup data stored for the peer from the storage
// layer.
func (d *mockPeerDataStore) Delete() error {
d.mtx.Lock()
defer d.mtx.Unlock()

d.data = nil

return nil
}

// Retrieve obtains the backup data stored for the peer from the
// storage layer.
func (d *mockPeerDataStore) Retrieve() ([]byte, error) {
d.mtx.RLock()
defer d.mtx.RUnlock()

return fn.Flatten(d.data), nil
}

// createTestPeer creates a new peer for testing and returns a context struct
// containing necessary handles and mock objects for conducting tests on peer
// functionalities.
@@ -708,7 +749,8 @@ func createTestPeer(t *testing.T) *peerTestCtx {

return nil
},
PongBuf: make([]byte, lnwire.MaxPongBytes),
PongBuf: make([]byte, lnwire.MaxPongBytes),
PeerDataStore: newMockDataStore(),
}

alicePeer := NewBrontide(*cfg)