Skip to content

Commit

Permalink
Add a new test and explain logic
Browse files Browse the repository at this point in the history
Signed-off-by: Paschalis Tsilias <paschalis.tsilias@grafana.com>
  • Loading branch information
tpaschalis committed Jun 30, 2024
1 parent 5927af7 commit 8fec037
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 2 deletions.
68 changes: 67 additions & 1 deletion client/internal/httpsender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func TestRequestInstanceUidFlagReset(t *testing.T) {
clientSyncedState := &ClientSyncedState{}
clientSyncedState.SetFlags(protobufs.AgentToServerFlags_AgentToServerFlags_RequestInstanceUid)
capabilities := protobufs.AgentCapabilities_AgentCapabilities_Unspecified
sender.receiveProcessor = newReceivedProcessor(&sharedinternal.NopLogger{}, sender.callbacks, sender, clientSyncedState, nil, capabilities)
sender.receiveProcessor = newReceivedProcessor(&sharedinternal.NopLogger{}, sender.callbacks, sender, clientSyncedState, nil, capabilities, new(sync.Mutex))

// If we process a message with a nil AgentIdentification, or an incorrect NewInstanceUid.
sender.receiveProcessor.ProcessReceivedMessage(ctx,
Expand All @@ -208,3 +208,69 @@ func TestRequestInstanceUidFlagReset(t *testing.T) {
assert.Equal(t, sender.receiveProcessor.clientSyncedState.flags, protobufs.AgentToServerFlags_AgentToServerFlags_Unspecified)
cancel()
}

func TestPackageUpdatesInParallel(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
localPackageState := NewInMemPackagesStore()
sender := NewHTTPSender(&sharedinternal.NopLogger{})

var messages atomic.Int32
var mut sync.Mutex
sender.callbacks = types.CallbacksStruct{
OnMessageFunc: func(ctx context.Context, msg *types.MessageData) {
err := msg.PackageSyncer.Sync(ctx)
assert.NoError(t, err)
messages.Add(1)
},
}

// Set the RequestInstanceUid flag on the tracked state to request the server for a new ID to use.
clientSyncedState := &ClientSyncedState{}
capabilities := protobufs.AgentCapabilities_AgentCapabilities_AcceptsPackages
sender.receiveProcessor = newReceivedProcessor(&sharedinternal.NopLogger{}, sender.callbacks, sender, clientSyncedState, localPackageState, capabilities, &mut)

go func() {
sender.receiveProcessor.ProcessReceivedMessage(ctx,
&protobufs.ServerToAgent{
PackagesAvailable: &protobufs.PackagesAvailable{
Packages: map[string]*protobufs.PackageAvailable{
"package1": {
Type: protobufs.PackageType_PackageType_TopLevel,
Version: "1.0.0",
File: &protobufs.DownloadableFile{
DownloadUrl: "foo",
ContentHash: []byte{4, 5},
},
Hash: []byte{1, 2, 3},
},
},
AllPackagesHash: []byte{1, 2, 3, 4, 5},
},
})
}()
go func() {
sender.receiveProcessor.ProcessReceivedMessage(ctx,
&protobufs.ServerToAgent{
PackagesAvailable: &protobufs.PackagesAvailable{
Packages: map[string]*protobufs.PackageAvailable{
"package22": {
Type: protobufs.PackageType_PackageType_TopLevel,
Version: "1.0.0",
File: &protobufs.DownloadableFile{
DownloadUrl: "bar",
ContentHash: []byte{4, 5},
},
Hash: []byte{1, 2, 3},
},
},
AllPackagesHash: []byte{1, 2, 3, 4, 5},
},
})
}()

assert.Eventually(t, func() bool {
return messages.Load() == 2
}, 2*time.Second, 100*time.Millisecond, "both messages must have been processed successfully")

cancel()
}
7 changes: 6 additions & 1 deletion client/internal/packagessyncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ func (s *packagesSyncer) Sync(ctx context.Context) error {
}()

// Prepare package statuses.
// Grab a lock to make sure that package statuses are not overriden by
// another call to Sync running in parallel.
s.mut.Lock()
if err := s.initStatuses(); err != nil {
return err
Expand All @@ -62,7 +64,8 @@ func (s *packagesSyncer) Sync(ctx context.Context) error {
return err
}

// Now do the actual syncing in the background.
// Now do the actual syncing in the background and release the lock from
// inside of the goroutine.
go s.doSync(ctx)

return nil
Expand Down Expand Up @@ -104,6 +107,8 @@ func (s *packagesSyncer) initStatuses() error {

// doSync performs the actual syncing process.
func (s *packagesSyncer) doSync(ctx context.Context) {
// Once doSync returns in a separate goroutine, make sure to release the
// mutex so that a new syncing process can take place.
defer s.mut.Unlock()

hash, err := s.localState.AllPackagesHash()
Expand Down

0 comments on commit 8fec037

Please sign in to comment.