Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sub-orchestration and Terminate SDK support #24

Merged
merged 3 commits into from
Jul 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
[submodule "submodules/durabletask-protobuf"]
path = submodules/durabletask-protobuf
url = https://github.com/microsoft/durabletask-protobuf
branch = distributed_tracing
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,19 @@ Go in an out-of-process worker process. It also contains various minor improveme

- Added `client` package with `TaskHubGrpcClient` and related functions
- Added otel span events for external events, suspend, and resume operations
- Added termination support to task module
- Added sub-orchestration support to task module
- (Tests) Added test suite starter for Go-based orchestration execution logic

### Changed

- Renamed `WithJsonSerializableEventData` to `WithJsonEventPayload`
cgillum marked this conversation as resolved.
Show resolved Hide resolved
- Moved gRPC client and related functions from `api` package to `client` package
- Switched SQLite driver to pure-Go implementation (no CGO dependency) ([#17](https://github.com/microsoft/durabletask-go/pull/17)) - contributed by [@ItalyPaleAle](https://github.com/ItalyPaleAle)
- Orchestration metadata fetching now gets input and output data by default (previously had to opt-in)
- Removed "input" parameter from CallActivity APIs and replaced with options pattern
- Removed "reason" parameter from Termination APIs and replaced with options pattern
- Renamed api.WithJsonEventPayload to api.WithEventPayload
- (Tests) Switched from `assert` to `require` in several tests to simplify code

### Fixed
Expand Down
13 changes: 6 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,15 @@ Activity sequences like the following are the simplest and most common pattern u
// as an array.
func ActivitySequenceOrchestrator(ctx *task.OrchestrationContext) (any, error) {
var helloTokyo string
if err := ctx.CallActivity(SayHelloActivity, "Tokyo").Await(&helloTokyo); err != nil {
if err := ctx.CallActivity(SayHelloActivity, task.WithActivityInput("Tokyo")).Await(&helloTokyo); err != nil {
return nil, err
}
var helloLondon string
if err := ctx.CallActivity(SayHelloActivity, "London").Await(&helloLondon); err != nil {
if err := ctx.CallActivity(SayHelloActivity, task.WithActivityInput("London")).Await(&helloLondon); err != nil {
return nil, err
}
var helloSeattle string
if err := ctx.CallActivity(SayHelloActivity, "Seattle").Await(&helloSeattle); err != nil {
if err := ctx.CallActivity(SayHelloActivity, task.WithActivityInput("Seattle")).Await(&helloSeattle); err != nil {
return nil, err
}
return []string{helloTokyo, helloLondon, helloSeattle}, nil
Expand All @@ -114,14 +114,14 @@ The next most common pattern is "fan-out / fan-in" where multiple activities are
func UpdateDevicesOrchestrator(ctx *task.OrchestrationContext) (any, error) {
// Get a dynamic list of devices to perform updates on
var devices []string
if err := ctx.CallActivity(GetDevicesToUpdate, nil).Await(&devices); err != nil {
if err := ctx.CallActivity(GetDevicesToUpdate).Await(&devices); err != nil {
return nil, err
}

// Start a dynamic number of tasks in parallel, not waiting for any to complete (yet)
tasks := make([]task.Task, 0, len(devices))
for _, id := range devices {
tasks = append(tasks, ctx.CallActivity(UpdateDevice, id))
tasks = append(tasks, ctx.CallActivity(UpdateDevice, task.WithActivityInput(id)))
}

// Now that all are started, wait for them to complete and then return the success rate
Expand Down Expand Up @@ -167,8 +167,7 @@ go func() {
var nameInput string
fmt.Scanln(&nameInput)

opts := api.WithJsonSerializableEventData(nameInput)
client.RaiseEvent(ctx, id, "Name", opts)
client.RaiseEvent(ctx, id, "Name", api.WithEventPayload(nameInput))
}()
```

Expand Down
71 changes: 56 additions & 15 deletions api/orchestration.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
)

var (
ErrInstanceNotFound = errors.New("No such instance exists")
ErrInstanceNotFound = errors.New("no such instance exists")
ErrNotStarted = errors.New("orchestration has not started")
ErrNotCompleted = errors.New("orchestration has not yet completed")
ErrNoFailures = errors.New("orchestration did not report failure details")
Expand All @@ -37,45 +37,53 @@ type OrchestrationMetadata struct {
}

// NewOrchestrationOptions configures options for starting a new orchestration.
type NewOrchestrationOptions func(*protos.CreateInstanceRequest)
type NewOrchestrationOptions func(*protos.CreateInstanceRequest) error

// GetOrchestrationMetadataOptions is a set of options for fetching orchestration metadata.
type FetchOrchestrationMetadataOptions func(*protos.GetInstanceRequest)

// RaiseEventOptions is a set of options for raising an orchestration event.
type RaiseEventOptions func(*protos.RaiseEventRequest)
type RaiseEventOptions func(*protos.RaiseEventRequest) error

// TerminateOptions is a set of options for terminating an orchestration.
type TerminateOptions func(*protos.TerminateRequest) error

// WithInstanceID configures an explicit orchestration instance ID. If not specified,
// a random UUID value will be used for the orchestration instance ID.
func WithInstanceID(id InstanceID) NewOrchestrationOptions {
return func(req *protos.CreateInstanceRequest) {
return func(req *protos.CreateInstanceRequest) error {
req.InstanceId = string(id)
return nil
}
}

// WithInput configures an input for the orchestration. The specified input must be serializable.
func WithInput(input any) NewOrchestrationOptions {
return func(req *protos.CreateInstanceRequest) {
// TODO: Make the encoder configurable
// TODO: Error handling?
bytes, _ := json.Marshal(input)
return func(req *protos.CreateInstanceRequest) error {
bytes, err := json.Marshal(input)
if err != nil {
return err
}
req.Input = wrapperspb.String(string(bytes))
return nil
}
}

// WithRawInput configures an input for the orchestration. The specified input must be a string.
func WithRawInput(rawInput string) NewOrchestrationOptions {
return func(req *protos.CreateInstanceRequest) {
return func(req *protos.CreateInstanceRequest) error {
req.Input = wrapperspb.String(rawInput)
return nil
}
}

// WithStartTime configures a start time at which the orchestration should start running.
// Note that the actual start time could be later than the specified start time if the
// task hub is under load or if the app is not running at the specified start time.
func WithStartTime(startTime time.Time) NewOrchestrationOptions {
return func(req *protos.CreateInstanceRequest) {
return func(req *protos.CreateInstanceRequest) error {
req.ScheduledStartTimestamp = timestamppb.New(startTime)
return nil
}
}

Expand All @@ -86,18 +94,51 @@ func WithFetchPayloads(fetchPayloads bool) FetchOrchestrationMetadataOptions {
}
}

// WithJsonEventPayload configures an event payload that can be serialized to JSON.
func WithJsonEventPayload(data any) RaiseEventOptions {
return func(req *protos.RaiseEventRequest) {
bytes, _ := json.Marshal(data)
// WithEventPayload configures an event payload. The specified payload must be serializable.
func WithEventPayload(data any) RaiseEventOptions {
return func(req *protos.RaiseEventRequest) error {
bytes, err := json.Marshal(data)
if err != nil {
return err
}
req.Input = wrapperspb.String(string(bytes))
return nil
}
}

// WithRawEventData configures an event payload that is a raw, unprocessed string (e.g. JSON data).
func WithRawEventData(data string) RaiseEventOptions {
return func(req *protos.RaiseEventRequest) {
return func(req *protos.RaiseEventRequest) error {
req.Input = wrapperspb.String(data)
return nil
}
}

// WithOutput configures an output for the terminated orchestration. The specified output must be serializable.
func WithOutput(data any) TerminateOptions {
return func(req *protos.TerminateRequest) error {
bytes, err := json.Marshal(data)
if err != nil {
return err
}
req.Output = wrapperspb.String(string(bytes))
return nil
}
}

// WithRawOutput configures a raw, unprocessed output (i.e. pre-serialized) for the terminated orchestration.
func WithRawOutput(data string) TerminateOptions {
return func(req *protos.TerminateRequest) error {
req.Output = wrapperspb.String(data)
return nil
}
}

// WithRecursive configures whether to terminate all sub-orchestrations created by the target orchestration.
func WithRecursive(recursive bool) TerminateOptions {
return func(req *protos.TerminateRequest) error {
req.Recursive = recursive
return nil
}
}

Expand Down
29 changes: 17 additions & 12 deletions backend/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/google/uuid"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"google.golang.org/protobuf/types/known/wrapperspb"

"github.com/microsoft/durabletask-go/api"
"github.com/microsoft/durabletask-go/internal/helpers"
Expand All @@ -21,7 +20,7 @@ type TaskHubClient interface {
FetchOrchestrationMetadata(ctx context.Context, id api.InstanceID) (*api.OrchestrationMetadata, error)
WaitForOrchestrationStart(ctx context.Context, id api.InstanceID) (*api.OrchestrationMetadata, error)
WaitForOrchestrationCompletion(ctx context.Context, id api.InstanceID) (*api.OrchestrationMetadata, error)
TerminateOrchestration(ctx context.Context, id api.InstanceID, reason string) error
TerminateOrchestration(ctx context.Context, id api.InstanceID, opts ...api.TerminateOptions) error
RaiseEvent(ctx context.Context, id api.InstanceID, eventName string, opts ...api.RaiseEventOptions) error
SuspendOrchestration(ctx context.Context, id api.InstanceID, reason string) error
ResumeOrchestration(ctx context.Context, id api.InstanceID, reason string) error
Expand All @@ -42,7 +41,9 @@ func (c *backendClient) ScheduleNewOrchestration(ctx context.Context, orchestrat
name := helpers.GetTaskFunctionName(orchestrator)
req := &protos.CreateInstanceRequest{Name: name}
for _, configure := range opts {
configure(req)
if err := configure(req); err != nil {
return api.EmptyInstanceID, fmt.Errorf("failed to configure create instance request: %w", err)
}
}
if req.InstanceId == "" {
req.InstanceId = uuid.NewString()
Expand Down Expand Up @@ -123,8 +124,15 @@ func (c *backendClient) waitForOrchestrationCondition(ctx context.Context, id ap
// TerminateOrchestration enqueues a message to terminate a running orchestration, causing it to stop receiving new events and
// go directly into the TERMINATED state. This operation is asynchronous. An orchestration worker must
// dequeue the termination event before the orchestration will be terminated.
func (c *backendClient) TerminateOrchestration(ctx context.Context, id api.InstanceID, reason string) error {
e := helpers.NewExecutionTerminatedEvent(wrapperspb.String(reason))
func (c *backendClient) TerminateOrchestration(ctx context.Context, id api.InstanceID, opts ...api.TerminateOptions) error {
req := &protos.TerminateRequest{InstanceId: string(id), Recursive: true}
for _, configure := range opts {
if err := configure(req); err != nil {
return fmt.Errorf("failed to configure termination request: %w", err)
}
}

e := helpers.NewExecutionTerminatedEvent(req.Output, req.Recursive)
if err := c.be.AddNewOrchestrationEvent(ctx, id, e); err != nil {
return fmt.Errorf("failed to add terminate event: %w", err)
}
Expand All @@ -142,15 +150,12 @@ func (c *backendClient) TerminateOrchestration(ctx context.Context, id api.Insta
func (c *backendClient) RaiseEvent(ctx context.Context, id api.InstanceID, eventName string, opts ...api.RaiseEventOptions) error {
req := &protos.RaiseEventRequest{InstanceId: string(id), Name: eventName}
for _, configure := range opts {
configure(req)
}

var rawValue *wrapperspb.StringValue
if req.Input != nil {
rawValue = wrapperspb.String(string(req.Input.Value))
if err := configure(req); err != nil {
return fmt.Errorf("failed to configure raise event request: %w", err)
}
}

e := helpers.NewEventRaisedEvent(eventName, rawValue)
e := helpers.NewEventRaisedEvent(req.Name, req.Input)
if err := c.be.AddNewOrchestrationEvent(ctx, id, e); err != nil {
return fmt.Errorf("failed to raise event: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion backend/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ func (g *grpcExecutor) StartInstance(ctx context.Context, req *protos.CreateInst

// TerminateInstance implements protos.TaskHubSidecarServiceServer
func (g *grpcExecutor) TerminateInstance(ctx context.Context, req *protos.TerminateRequest) (*protos.TerminateResponse, error) {
e := helpers.NewExecutionTerminatedEvent(req.Output)
e := helpers.NewExecutionTerminatedEvent(req.Output, req.Recursive)
if err := g.backend.AddNewOrchestrationEvent(ctx, api.InstanceID(req.InstanceId), e); err != nil {
return nil, err
}
Expand Down
7 changes: 7 additions & 0 deletions backend/runtimestate.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,13 @@ func (s *OrchestrationRuntimeState) ApplyActions(actions []*protos.OrchestratorA
e := helpers.NewSendEventEvent(action.Id, sendEvent.Instance.InstanceId, sendEvent.Name, sendEvent.Data)
s.AddEvent(e)
s.pendingMessages = append(s.pendingMessages, OrchestratorMessage{HistoryEvent: e, TargetInstanceID: sendEvent.Instance.InstanceId})
} else if terminate := action.GetTerminateOrchestration(); terminate != nil {
// Send a message to terminate the target orchestration
msg := OrchestratorMessage{
TargetInstanceID: terminate.InstanceId,
HistoryEvent: helpers.NewExecutionTerminatedEvent(terminate.Reason, terminate.Recurse),
}
s.pendingMessages = append(s.pendingMessages, msg)
} else {
return false, fmt.Errorf("unknown action type: %v", action)
}
Expand Down
20 changes: 14 additions & 6 deletions client/client_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,14 @@ func (c *TaskHubGrpcClient) WaitForOrchestrationCompletion(ctx context.Context,

// TerminateOrchestration terminates a running orchestration by causing it to stop receiving new events and
// putting it directly into the TERMINATED state.
func (c *TaskHubGrpcClient) TerminateOrchestration(ctx context.Context, id api.InstanceID, reason string) error {
req := &protos.TerminateRequest{
InstanceId: string(id),
Output: wrapperspb.String(reason),
func (c *TaskHubGrpcClient) TerminateOrchestration(ctx context.Context, id api.InstanceID, opts ...api.TerminateOptions) error {
req := &protos.TerminateRequest{InstanceId: string(id), Recursive: true}
for _, configure := range opts {
if err := configure(req); err != nil {
return fmt.Errorf("failed to configure termination request: %w", err)
}
}

_, err := c.client.TerminateInstance(ctx, req)
if err != nil {
if ctx.Err() != nil {
Expand All @@ -130,7 +133,9 @@ func (c *TaskHubGrpcClient) TerminateOrchestration(ctx context.Context, id api.I
func (c *TaskHubGrpcClient) RaiseEvent(ctx context.Context, id api.InstanceID, eventName string, opts ...api.RaiseEventOptions) error {
req := &protos.RaiseEventRequest{InstanceId: string(id), Name: eventName}
for _, configure := range opts {
configure(req)
if err := configure(req); err != nil {
return fmt.Errorf("failed to configure raise event request: %w", err)
}
}

if _, err := c.client.RaiseEvent(ctx, req); err != nil {
Expand Down Expand Up @@ -195,7 +200,10 @@ func (c *TaskHubGrpcClient) PurgeOrchestrationState(ctx context.Context, id api.
}

func makeGetInstanceRequest(id api.InstanceID, opts []api.FetchOrchestrationMetadataOptions) *protos.GetInstanceRequest {
req := &protos.GetInstanceRequest{InstanceId: string(id)}
req := &protos.GetInstanceRequest{
InstanceId: string(id),
GetInputsAndOutputs: true,
}
for _, configure := range opts {
configure(req)
}
Expand Down
3 changes: 2 additions & 1 deletion client/worker_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/microsoft/durabletask-go/internal/protos"
"github.com/microsoft/durabletask-go/task"
"google.golang.org/protobuf/types/known/emptypb"
"google.golang.org/protobuf/types/known/wrapperspb"
)

func (c *TaskHubGrpcClient) StartWorkItemListener(ctx context.Context, r *task.TaskRegistry) error {
Expand Down Expand Up @@ -67,7 +68,7 @@ func (c *TaskHubGrpcClient) processOrchestrationWorkItem(
failureAction := helpers.NewCompleteOrchestrationAction(
-1,
protos.OrchestrationStatus_ORCHESTRATION_STATUS_FAILED,
"An internal error occured while executing the orchestration.",
wrapperspb.String("An internal error occured while executing the orchestration."),
nil,
&protos.TaskFailureDetails{
ErrorType: fmt.Sprintf("%T", err),
Expand Down
Loading
Loading