Skip to content

Commit

Permalink
Merge pull request #8021 from elena-kolevska/fix/raftnilmap
Browse files Browse the repository at this point in the history
Fixes nil map error when restoring raft state from old version
  • Loading branch information
JoshVanL committed Aug 28, 2024
2 parents 81cb1f5 + 4e347cc commit 5003a28
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 35 deletions.
171 changes: 139 additions & 32 deletions pkg/placement/raft/fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@ package raft
import (
"bytes"
"io"
"sync"
"testing"

"github.com/hashicorp/raft"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/dapr/dapr/pkg/placement/hashing"
)

func TestFSMApply(t *testing.T) {
Expand Down Expand Up @@ -98,38 +101,6 @@ func TestFSMApply(t *testing.T) {
})
}

func TestRestore(t *testing.T) {
fsm := newFSM(DaprHostMemberStateConfig{
replicationFactor: 100,
minAPILevel: 0,
maxAPILevel: 100,
})

s := newDaprHostMemberState(DaprHostMemberStateConfig{
replicationFactor: 100,
minAPILevel: 0,
maxAPILevel: 100,
})
s.upsertMember(&DaprHostMember{
Name: "127.0.0.1:8080",
Namespace: "ns1",
AppID: "FakeID",
Entities: []string{"actorTypeOne", "actorTypeTwo"},
})
buf := bytes.NewBuffer(make([]byte, 0, 256))
err := s.persist(buf)
require.NoError(t, err)

err = fsm.Restore(io.NopCloser(buf))
require.NoError(t, err)

require.Equal(t, 1, fsm.state.MemberCountInNamespace("ns1"))

hashingTable, err := fsm.State().hashingTableMap("ns1")
require.NoError(t, err)
require.Len(t, hashingTable, 2)
}

func TestPlacementStateWithVirtualNodes(t *testing.T) {
fsm := newFSM(DaprHostMemberStateConfig{
replicationFactor: 5,
Expand Down Expand Up @@ -197,3 +168,139 @@ func TestPlacementState(t *testing.T) {
assert.Contains(t, host.GetLoadMap(), "127.0.0.1:3030")
}
}

func TestRestore(t *testing.T) {
fsm := newFSM(DaprHostMemberStateConfig{
replicationFactor: 100,
minAPILevel: 0,
maxAPILevel: 100,
})

s := newDaprHostMemberState(DaprHostMemberStateConfig{
replicationFactor: 100,
minAPILevel: 0,
maxAPILevel: 100,
})
s.upsertMember(&DaprHostMember{
Name: "127.0.0.1:8080",
Namespace: "ns1",
AppID: "FakeID",
Entities: []string{"actorTypeOne", "actorTypeTwo"},
})
buf := bytes.NewBuffer(make([]byte, 0, 256))
err := s.persist(buf)
require.NoError(t, err)

err = fsm.Restore(io.NopCloser(buf))
require.NoError(t, err)

require.Equal(t, 1, fsm.state.MemberCountInNamespace("ns1"))

hashingTable, err := fsm.State().hashingTableMap("ns1")
require.NoError(t, err)
require.Len(t, hashingTable, 2)
}

// DaprHostMemberOld is a test-only host member record that has no Namespace
// field. TestRestoreFromOldVersion uses it to build a snapshot in the
// pre-1.14 layout.
type DaprHostMemberOld struct {
// Name is the unique name of Dapr runtime host.
Name string
// AppID is Dapr runtime app ID.
AppID string
// Entities is the list of Actor Types which this Dapr runtime supports.
Entities []string

// UpdatedAt is the last time when this host member info is updated.
UpdatedAt int64

// Version of the Actor APIs supported by the Dapr runtime
APILevel uint32
}

// DaprHostMemberStateOld is a test-only wrapper around the old-layout state
// data. Its persist method writes the msgpack encoding of data to a writer.
type DaprHostMemberStateOld struct {
// lock is read-locked by persist while data is being marshalled and written.
lock sync.RWMutex

// config is set by the test but is not part of what persist serializes.
config DaprHostMemberStateConfig
// data is the payload that persist serializes.
data DaprHostMemberStateDataOld
}

// persist encodes s.data with marshalMsgPack and writes the resulting bytes
// to w. The read lock is held for the whole call. An error from either the
// encoder or the writer is returned unchanged; nil is returned on success.
func (s *DaprHostMemberStateOld) persist(w io.Writer) error {
	s.lock.RLock()
	defer s.lock.RUnlock()

	encoded, err := marshalMsgPack(s.data)
	if err != nil {
		return err
	}

	_, err = w.Write(encoded)
	return err
}

// DaprHostMemberStateDataOld is the payload serialized by
// DaprHostMemberStateOld.persist. Members is a single flat map rather than
// being grouped per namespace; TestRestoreFromOldVersion keys it by host name.
type DaprHostMemberStateDataOld struct {
// Index is the index number of raft log.
Index uint64
// Members includes Dapr runtime hosts.
Members map[string]*DaprHostMemberOld

// TableGeneration is the generation of hashingTableMap.
// This is increased whenever hashingTableMap is updated.
TableGeneration uint64

// Version of the actor APIs for the cluster
APILevel uint32

// hashingTableMap is the map for storing consistent hashing data
// per Actor types. This will be generated when log entries are replayed.
// While snapshotting the state, this member will not be saved. Instead,
// hashingTableMap will be recovered in snapshot recovery process.
hashingTableMap map[string]*hashing.Consistent
}

// TestRestoreFromOldVersion writes a snapshot in the pre-1.14 layout (members
// not grouped by namespace), restores it into a current FSM, and then checks
// that a namespaced member can still be upserted. The old members are not
// expected to be carried over; the point is that restore and the following
// upsert succeed without panicking.
func TestRestoreFromOldVersion(t *testing.T) {
	cfg := DaprHostMemberStateConfig{
		replicationFactor: 100,
		minAPILevel:       0,
		maxAPILevel:       100,
	}

	// Simulate a snapshot saved by a pre-1.14 version of the state (no namespace).
	legacy := &DaprHostMemberStateOld{
		config: cfg,
		data: DaprHostMemberStateDataOld{
			Members: map[string]*DaprHostMemberOld{
				"127.0.0.1:8080": {
					Name:     "127.0.0.1:8080",
					AppID:    "FakeID",
					Entities: []string{"actorTypeOne", "actorTypeTwo"},
					APILevel: 10,
				},
			},
			hashingTableMap: map[string]*hashing.Consistent{},
		},
	}

	snapshot := bytes.NewBuffer(make([]byte, 0, 256))
	require.NoError(t, legacy.persist(snapshot))

	// Restore with the new version (1.14+).
	fsm := newFSM(cfg)
	require.NoError(t, fsm.Restore(io.NopCloser(snapshot)))

	// Upserting after the restore must work on the restored state.
	inserted := fsm.state.upsertMember(&DaprHostMember{
		Name:      "127.0.0.1:8080",
		Namespace: "ns1",
		AppID:     "FakeID",
		Entities:  []string{"actorTypeOne", "actorTypeTwo"},
	})

	require.True(t, inserted)
}
7 changes: 5 additions & 2 deletions pkg/placement/raft/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,10 +382,10 @@ func (s *DaprHostMemberState) upsertMember(host *DaprHostMember) bool {

ns, ok := s.data.Namespace[host.Namespace]
if !ok {
s.data.Namespace[host.Namespace] = &daprNamespace{
ns = &daprNamespace{
Members: make(map[string]*DaprHostMember),
}
ns = s.data.Namespace[host.Namespace]
s.data.Namespace[host.Namespace] = ns
}

if m, ok := ns.Members[host.Name]; ok {
Expand Down Expand Up @@ -473,6 +473,9 @@ func (s *DaprHostMemberState) restore(r io.Reader) error {
defer s.lock.Unlock()

s.data = data
if s.data.Namespace == nil {
s.data.Namespace = make(map[string]*daprNamespace)
}

s.restoreHashingTables()
s.updateAPILevel()
Expand Down
1 change: 0 additions & 1 deletion pkg/placement/raft/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,6 @@ func TestRestoreHashingTables(t *testing.T) {
maxAPILevel: 100,
})

s.data.Namespace = make(map[string]*daprNamespace)
s.data.Namespace["ns1"] = &daprNamespace{
Members: make(map[string]*DaprHostMember),
}
Expand Down

0 comments on commit 5003a28

Please sign in to comment.