Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Actor state client SetWithTTL #383

Merged
merged 8 commits into from
Apr 27, 2023
10 changes: 9 additions & 1 deletion actor/actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package actor
import (
"context"
"sync"
"time"
)

// Client is the interface that should be impl by user's actor client.
Expand Down Expand Up @@ -205,8 +206,15 @@ type StateManagerContext interface {
Add(ctx context.Context, stateName string, value any) error
// Get is to get state store of @stateName with type @reply
Get(ctx context.Context, stateName string, reply any) error
// Set is to set new state store with @stateName and @value
// Set sets a state store with @stateName and @value.
// You should always use SetWithTTL unless you also intend to implement your
// own state expiration logic. This is to prevent the state store from
// growing indefinitely.
Set(ctx context.Context, stateName string, value any) error
// SetWithTTL sets a state store with @stateName and @value, for the given
// TTL. After the TTL has passed, the value will no longer be available with
// `Get`. Always preferred over `Set`.
SetWithTTL(ctx context.Context, stateName string, value any, ttl time.Duration) error
// Remove is to remove state store with @stateName
Remove(ctx context.Context, stateName string) error
// Contains is to check if state store contains @stateName
Expand Down
2 changes: 1 addition & 1 deletion actor/manager/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
// Deprecated: use ActorContainerContext instead.
type ActorContainer interface {
Invoke(methodName string, param []byte) ([]reflect.Value, actorErr.ActorErr)
//nolint:staticcheck
//nolint:staticcheck // SA1019 Deprecated: use ActorContainerContext instead.
GetActor() actor.Server
}

Expand Down
20 changes: 15 additions & 5 deletions actor/state/actor_state_change.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,22 @@ limitations under the License.

package state

import (
"time"
)

type ActorStateChange struct {
stateName string
value interface{}
changeKind ChangeKind
stateName string
value interface{}
changeKind ChangeKind
ttlInSeconds *int64
}

func NewActorStateChange(stateName string, value interface{}, changeKind ChangeKind) *ActorStateChange {
return &ActorStateChange{stateName: stateName, value: value, changeKind: changeKind}
func NewActorStateChange(stateName string, value any, changeKind ChangeKind, ttl *time.Duration) *ActorStateChange {
var ttlF *int64
if ttl != nil && *ttl > 0 {
ttlInSeconds := int64(ttl.Seconds())
ttlF = &ttlInSeconds
}
return &ActorStateChange{stateName: stateName, value: value, changeKind: changeKind, ttlInSeconds: ttlF}
}
46 changes: 25 additions & 21 deletions actor/state/actor_state_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,36 +14,40 @@ limitations under the License.
package state

import (
"reflect"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestNewActorStateChange(t *testing.T) {
type args struct {
secs5 := int64(5)

tests := map[string]struct {
stateName string
value interface{}
value any
changeKind ChangeKind
}
tests := []struct {
name string
args args
want *ActorStateChange
ttl time.Duration
want *ActorStateChange
}{
{
name: "init",
args: args{
stateName: "testStateName",
value: "testValue",
changeKind: Add,
},
want: &ActorStateChange{stateName: "testStateName", value: "testValue", changeKind: Add},
"init": {
stateName: "testStateName",
value: "testValue",
changeKind: Add,
ttl: time.Second*5 + time.Millisecond*400,
want: &ActorStateChange{stateName: "testStateName", value: "testValue", changeKind: Add, ttlInSeconds: &secs5},
},
"no TTL": {
stateName: "testStateName",
value: "testValue",
changeKind: Add,
ttl: 0,
want: &ActorStateChange{stateName: "testStateName", value: "testValue", changeKind: Add, ttlInSeconds: nil},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := NewActorStateChange(tt.args.stateName, tt.args.value, tt.args.changeKind); !reflect.DeepEqual(got, tt.want) {
t.Errorf("NewActorStateChange() = %v, want %v", got, tt.want)
}
for name, test := range tests {
t.Run(name, func(t *testing.T) {
assert.Equal(t, test.want, NewActorStateChange(test.stateName, test.value, test.changeKind, &test.ttl))
})
}
}
1 change: 1 addition & 0 deletions actor/state/state_async_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ func (d *DaprStateAsyncProvider) ApplyContext(ctx context.Context, actorType, ac
OperationType: daprOperationName,
Key: stateChange.stateName,
Value: value,
TTLInSeconds: stateChange.ttlInSeconds,
})
}

Expand Down
12 changes: 10 additions & 2 deletions actor/state/state_change_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ limitations under the License.

package state

import "time"

type ChangeKind string

const (
Expand All @@ -24,12 +26,18 @@ const (

type ChangeMetadata struct {
Kind ChangeKind
Value interface{}
Value any
TTL *time.Duration
}

func NewChangeMetadata(kind ChangeKind, value interface{}) *ChangeMetadata {
func NewChangeMetadata(kind ChangeKind, value any) *ChangeMetadata {
return &ChangeMetadata{
Kind: kind,
Value: value,
}
}

func (c *ChangeMetadata) WithTTL(ttl time.Duration) *ChangeMetadata {
c.TTL = &ttl
return c
}
27 changes: 26 additions & 1 deletion actor/state/state_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"reflect"
"sync"
"time"

"github.com/dapr/go-sdk/actor"
)
Expand Down Expand Up @@ -152,6 +153,30 @@ func (s *stateManagerCtx) Set(_ context.Context, stateName string, value any) er
return nil
}

func (s *stateManagerCtx) SetWithTTL(_ context.Context, stateName string, value any, ttl time.Duration) error {
if stateName == "" {
return errors.New("state name can't be empty")
}

if ttl < 0 {
return errors.New("ttl can't be negative")
}

if val, ok := s.stateChangeTracker.Load(stateName); ok {
metadata := val.(*ChangeMetadata)
if metadata.Kind == None || metadata.Kind == Remove {
metadata.Kind = Update
}
s.stateChangeTracker.Store(stateName, NewChangeMetadata(metadata.Kind, value))
return nil
}
s.stateChangeTracker.Store(stateName, (&ChangeMetadata{
Kind: Add,
Value: value,
}).WithTTL(ttl))
return nil
}

func (s *stateManagerCtx) Remove(ctx context.Context, stateName string) error {
if stateName == "" {
return errors.New("state name can't be empty")
Expand Down Expand Up @@ -200,7 +225,7 @@ func (s *stateManagerCtx) Save(ctx context.Context) error {
s.stateChangeTracker.Range(func(key, value any) bool {
stateName := key.(string)
metadata := value.(*ChangeMetadata)
changes = append(changes, NewActorStateChange(stateName, metadata.Value, metadata.Kind))
changes = append(changes, NewActorStateChange(stateName, metadata.Value, metadata.Kind, metadata.TTL))
return true
})
if err := s.stateAsyncProvider.ApplyContext(ctx, s.actorTypeName, s.actorID, changes); err != nil {
Expand Down
12 changes: 12 additions & 0 deletions client/actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"errors"
"fmt"
"reflect"
"strconv"

anypb "github.com/golang/protobuf/ptypes/any"

Expand All @@ -28,6 +29,10 @@ import (
pb "github.com/dapr/go-sdk/dapr/proto/runtime/v1"
)

const (
metadataKeyTTLInSeconds = "ttlInSeconds"
)

type InvokeActorRequest struct {
ActorType string
ActorID string
Expand Down Expand Up @@ -463,6 +468,7 @@ type ActorStateOperation struct {
OperationType string
Key string
Value []byte
TTLInSeconds *int64
}

func (c *GRPCClient) SaveStateTransactionally(ctx context.Context, actorType, actorID string, operations []*ActorStateOperation) error {
Expand All @@ -477,12 +483,18 @@ func (c *GRPCClient) SaveStateTransactionally(ctx context.Context, actorType, ac
}
grpcOperations := make([]*pb.TransactionalActorStateOperation, 0)
for _, op := range operations {
var metadata map[string]string
if op.TTLInSeconds != nil {
metadata = make(map[string]string)
metadata[metadataKeyTTLInSeconds] = strconv.FormatInt(*op.TTLInSeconds, 10)
}
grpcOperations = append(grpcOperations, &pb.TransactionalActorStateOperation{
OperationType: op.OperationType,
Key: op.Key,
Value: &anypb.Any{
Value: op.Value,
},
Metadata: metadata,
})
}
_, err := c.protoClient.ExecuteActorStateTransaction(c.withAuthToken(ctx), &pb.ExecuteActorStateTransactionRequest{
Expand Down
Loading