diff --git a/CHANGELOG.md b/CHANGELOG.md index 4b547634..da0308a7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ### Fixed +- default to WorkflowUpdateStageCompleted if update options WaitForStage unspecified + # [1.14.0](https://github.com/cludden/protoc-gen-go-temporal/releases/tag/v1.14.0) - 2024-06-20 @@ -22,12 +24,6 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ### ⚠ BREAKING CHANGES - [#72](https://github.com/cludden/protoc-gen-go-temporal/pull/72) upgrade go.temporal.io/sdk to [v1.27.0](https://github.com/temporalio/sdk-go/releases/tag/v1.27.0) - -### Added - -### Changed - -### Fixed diff --git a/README.md b/README.md index 096e9a08..a829f5b2 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,6 @@ [![GoDoc](https://godoc.org/github.com/cludden/protoc-gen-go-temporal?status.svg)](https://pkg.go.dev/github.com/cludden/protoc-gen-go-temporal) [![Buf](https://img.shields.io/badge/buf-cludden%2Fprotoc--gen--go--temporal-blue)](https://buf.build/cludden/protoc-gen-go-temporal) -https://buf.build/cludden/protoc-gen-go-temporal A protoc plugin for generating typed Temporal clients and workers in Go from protobuf schemas. This plugin allows Workflow authors to configure sensible defaults and guardrails, simplifies the implementation and testing of Temporal workers, and streamlines integration by providing typed client SDKs and a generated CLI application. Inspired by [Chad Retz's](https://github.com/cretz/) awesome [github.com/cretz/temporal-sdk-go-advanced](https://github.com/cretz/temporal-sdk-go-advanced) and [Jacob LeGrone's](https://github.com/jlegrone/) excellent Replay talk on [Temporal @ Datadog](https://youtu.be/LxgkAoTSI8Q) @@ -106,33 +105,52 @@ See examples for more usage: 4. Initialize buf repository ```shell - mkdir proto && cd proto && buf mod init + mkdir proto && && buf config init ``` 5. Add dependency to `buf.yaml` ```yaml - version: v1 + version: v2 + modules: + - path: proto deps: - - buf.build/cludden/protoc-gen-go-temporal: + - buf.build/cludden/protoc-gen-go-temporal:v1.14.0 + lint: + use: + - DEFAULT + breaking: + use: + - FILE ``` 6. Add plugin to `buf.gen.yaml` and exclude it from managed mode go prefix ```yaml - version: v1 + version: v2 managed: enabled: true - go_package_prefix: - default: github.com/foo/bar/gen - except: - - buf.build/cludden/protoc-gen-go-temporal + disable: + - file_option: go_package_prefix + module: buf.build/cludden/protoc-gen-go-temporal + override: + - file_option: go_package_prefix + value: example/gen + inputs: + - directory: proto plugins: - - plugin: go + - local: protoc-gen-go out: gen - opt: paths=source_relative - - plugin: go_temporal + opt: + - paths=source_relative + - local: protoc-gen-go_temporal out: gen - opt: paths=source_relative,cli-enabled=true,cli-categories=true,workflow-update-enabled=true,docs-out=./proto/README.md strategy: all + opt: + - cli-categories=true + - cli-enabled=true + - docs-out=./proto/README.md + - enable-xns=true + - paths=source_relative + - workflow-update-enabled=true ``` 7. Define your service @@ -234,7 +252,7 @@ See examples for more usage: 8. Generate temporal worker, client, and cli types, methods, interfaces, and functions ```shell - buf mod update && buf generate + buf dep update && buf generate ``` 9. Implement the required Workflow and Activity interfaces @@ -242,121 +260,127 @@ See examples for more usage: package main import ( - "context" - "fmt" - - examplev1 "github.com/cludden/protoc-gen-go-temporal/gen/example/v1" - "go.temporal.io/sdk/activity" - "go.temporal.io/sdk/log" - "go.temporal.io/sdk/workflow" + "context" + "fmt" + "log" + "os" + + examplev1 "example/gen/example/v1" + + "github.com/urfave/cli/v2" + "go.temporal.io/sdk/activity" + "go.temporal.io/sdk/client" + tlog "go.temporal.io/sdk/log" + "go.temporal.io/sdk/worker" + "go.temporal.io/sdk/workflow" ) type ( - // Workflows manages shared state for workflow constructors and is used to - // register workflows with a worker - Workflows struct{} + // Workflows manages shared state for workflow constructors and is used to + // register workflows with a worker + Workflows struct{} - // Activities manages shared state for activities and is used to register - // activities with a worker - Activities struct{} + // Activities manages shared state for activities and is used to register + // activities with a worker + Activities struct{} ) // CreateFooWorkflow manages workflow state for a CreateFoo workflow type CreateFooWorkflow struct { - // it embeds the generated workflow Input type that contains the workflow - // input and signal helpers - *examplev1.CreateFooWorkflowInput + // it embeds the generated workflow Input type that contains the workflow + // input and signal helpers + *examplev1.CreateFooWorkflowInput - log log.Logger - progress float32 - status examplev1.Foo_Status + log tlog.Logger + progress float32 + status examplev1.Foo_Status } // CreateFoo initializes a new CreateFooWorkflow value func (w *Workflows) CreateFoo(ctx workflow.Context, input *examplev1.CreateFooWorkflowInput) (examplev1.CreateFooWorkflow, error) { - return &CreateFooWorkflow{ - CreateFooWorkflowInput: input, - log: workflow.GetLogger(ctx), - status: examplev1.Foo_FOO_STATUS_CREATING, - }, nil + return &CreateFooWorkflow{ + CreateFooWorkflowInput: input, + log: workflow.GetLogger(ctx), + status: examplev1.Foo_FOO_STATUS_CREATING, + }, nil } // Execute defines the entrypoint to a CreateFooWorkflow value func (wf *CreateFooWorkflow) Execute(ctx workflow.Context) (*examplev1.CreateFooResponse, error) { // listen for signals workflow.Go(ctx, func(ctx workflow.Context) { - for { - signal, _ := wf.SetFooProgress.Receive(ctx) - wf.UpdateFooProgress(ctx, signal) - } + for { + signal, _ := wf.SetFooProgress.Receive(ctx) + wf.UpdateFooProgress(ctx, signal) + } }) // execute Notify activity using generated helper if err := examplev1.Notify(ctx, &examplev1.NotifyRequest{ - Message: fmt.Sprintf("creating foo resource (%s)", wf.Req.GetName()), + Message: fmt.Sprintf("creating foo resource (%s)", wf.Req.GetName()), }); err != nil { - return nil, fmt.Errorf("error sending notification: %w", err) + return nil, fmt.Errorf("error sending notification: %w", err) } // block until progress has reached 100 via signals and/or updates if err := workflow.Await(ctx, func() bool { - return wf.status == examplev1.Foo_FOO_STATUS_READY + r eturn wf.status == examplev1.Foo_FOO_STATUS_READY }); err != nil { - return nil, fmt.Errorf("error awaiting ready status: %w", err) + return nil, fmt.Errorf("error awaiting ready status: %w", err) } return &examplev1.CreateFooResponse{ - Foo: &examplev1.Foo{ - Name: wf.Req.GetName(), - Status: wf.status, - }, + Foo: &examplev1.Foo{ + Name: wf.Req.GetName(), + Status: wf.status, + }, }, nil } // GetFooProgress defines the handler for a GetFooProgress query func (wf *CreateFooWorkflow) GetFooProgress() (*examplev1.GetFooProgressResponse, error) { - return &examplev1.GetFooProgressResponse{Progress: wf.progress, Status: wf.status}, nil + return &examplev1.GetFooProgressResponse{Progress: wf.progress, Status: wf.status}, nil } // UpdateFooProgress defines the handler for an UpdateFooProgress update func (wf *CreateFooWorkflow) UpdateFooProgress(ctx workflow.Context, req *examplev1.SetFooProgressRequest) (*examplev1.GetFooProgressResponse, error) { - wf.progress = req.GetProgress() - switch { - case wf.progress < 0: - wf.progress, wf.status = 0, examplev1.Foo_FOO_STATUS_CREATING - case wf.progress < 100: - wf.status = examplev1.Foo_FOO_STATUS_CREATING - case wf.progress >= 100: - wf.progress, wf.status = 100, examplev1.Foo_FOO_STATUS_READY - } - return &examplev1.GetFooProgressResponse{Progress: wf.progress, Status: wf.status}, nil + wf.progress = req.GetProgress() + switch { + case wf.progress < 0: + wf.progress, wf.status = 0, examplev1.Foo_FOO_STATUS_CREATING + case wf.progress < 100: + wf.status = examplev1.Foo_FOO_STATUS_CREATING + case wf.progress >= 100: + wf.progress, wf.status = 100, examplev1.Foo_FOO_STATUS_READY + } + return &examplev1.GetFooProgressResponse{Progress: wf.progress, Status: wf.status}, nil } // Notify defines the implementation for a Notify activity func (a *Activities) Notify(ctx context.Context, req *examplev1.NotifyRequest) error { - activity.GetLogger(ctx).Info("notification", "message", req.GetMessage()) - return nil + activity.GetLogger(ctx).Info("notification", "message", req.GetMessage()) + return nil } func main() { - // initialize the generated cli application - app, err := examplev1.NewExampleCli( - examplev1.NewExampleCliOptions().WithWorker(func(cmd *cli.Context, c client.Client) (worker.Worker, error) { - // register activities and workflows using generated helpers - w := worker.New(c, examplev1.ExampleTaskQueue, worker.Options{}) - examplev1.RegisterExampleActivities(w, &example.Activities{}) - examplev1.RegisterExampleWorkflows(w, &example.Workflows{}) - return w, nil - }), - ) - if err != nil { - log.Fatalf("error initializing example cli: %v", err) - } + // initialize the generated cli application + app, err := examplev1.NewExampleCli( + examplev1.NewExampleCliOptions().WithWorker(func(cmd *cli.Context, c client.Client) (worker.Worker, error) { + // register activities and workflows using generated helpers + w := worker.New(c, examplev1.ExampleTaskQueue, worker.Options{}) + examplev1.RegisterExampleActivities(w, &Activities{}) + examplev1.RegisterExampleWorkflows(w, &Workflows{}) + return w, nil + }), + ) + if err != nil { + log.Fatalf("error initializing example cli: %v", err) + } - // run cli - if err := app.Run(os.Args); err != nil { - log.Fatal(err) - } + // run cli + if err := app.Run(os.Args); err != nil { + log.Fatal(err) + } } ``` @@ -371,7 +395,8 @@ See examples for more usage: *start worker* ```shell - go get -u github.com/cludden/protoc-gen-go-temporal@ && go mod tidy + go get -u github.com/cludden/protoc-gen-go-temporal@ + go mod tidy go run main.go worker ``` @@ -382,31 +407,31 @@ See examples for more usage: package main import ( - "context" - "log" + "context" + "log" - examplev1 "github.com/cludden/protoc-gen-go-temporal/gen/example/v1" - "go.temporal.io/sdk/client" + examplev1 "github.com/cludden/protoc-gen-go-temporal/gen/example/v1" + "go.temporal.io/sdk/client" ) func main() { - c, _ := client.Dial(client.Options{}) - client, ctx := examplev1.NewClient(c), context.Background() + c, _ := client.Dial(client.Options{}) + client, ctx := examplev1.NewClient(c), context.Background() - run, _ := client.CreateFooAsync(ctx, &examplev1.CreateFooRequest{Name: "test"}) - log.Printf("started workflow: workflow_id=%s, run_id=%s\n", run.ID(), run.RunID()) + run, _ := client.CreateFooAsync(ctx, &examplev1.CreateFooRequest{Name: "test"}) + log.Printf("started workflow: workflow_id=%s, run_id=%s\n", run.ID(), run.RunID()) - log.Println("signalling progress") - _ = run.SetFooProgress(ctx, &examplev1.SetFooProgressRequest{Progress: 5.7}) + log.Println("signalling progress") + _ = run.SetFooProgress(ctx, &examplev1.SetFooProgressRequest{Progress: 5.7}) - progress, _ := run.GetFooProgress(ctx) - log.Printf("queried progress: %s\n", progress.String()) + progress, _ := run.GetFooProgress(ctx) + log.Printf("queried progress: %s\n", progress.String()) - update, _ := run.UpdateFooProgress(ctx, &examplev1.SetFooProgressRequest{Progress: 100}) - log.Printf("updated progress: %s\n", update.String()) + update, _ := run.UpdateFooProgress(ctx, &examplev1.SetFooProgressRequest{Progress: 100}) + log.Printf("updated progress: %s\n", update.String()) - resp, _ := run.Get(ctx) - log.Printf("workflow completed: %s\n", resp.String()) + resp, _ := run.Get(ctx) + log.Printf("workflow completed: %s\n", resp.String()) } ``` diff --git a/internal/plugin/client.go b/internal/plugin/client.go index 41d11bd0..8ee4047f 100644 --- a/internal/plugin/client.go +++ b/internal/plugin/client.go @@ -1276,20 +1276,18 @@ func (svc *Manifest) genClientUpdateOptions(f *g.File, update protoreflect.FullN if wp == temporalv1.WaitPolicy_WAIT_POLICY_UNSPECIFIED && updateOpts.GetWaitPolicy() != temporalv1.WaitPolicy_WAIT_POLICY_UNSPECIFIED { wp = updateOpts.GetWaitPolicy() } - if wp != temporalv1.WaitPolicy_WAIT_POLICY_UNSPECIFIED { - var stage string - switch wp { - case temporalv1.WaitPolicy_WAIT_POLICY_ACCEPTED: - stage = "WorkflowUpdateStageAccepted" - case temporalv1.WaitPolicy_WAIT_POLICY_ADMITTED: - stage = "WorkflowUpdateStageAdmitted" - case temporalv1.WaitPolicy_WAIT_POLICY_COMPLETED: - stage = "WorkflowUpdateStageCompleted" - } - waitPolicy.Else().If(g.Id("opts").Dot("WaitForStage").Op("==").Qual(clientPkg, "WorkflowUpdateStageUnspecified")).Block( - g.Id("opts").Dot("WaitForStage").Op("=").Qual(clientPkg, stage), - ) + var stage string + switch wp { + case temporalv1.WaitPolicy_WAIT_POLICY_ACCEPTED: + stage = "WorkflowUpdateStageAccepted" + case temporalv1.WaitPolicy_WAIT_POLICY_ADMITTED: + stage = "WorkflowUpdateStageAdmitted" + case temporalv1.WaitPolicy_WAIT_POLICY_COMPLETED, temporalv1.WaitPolicy_WAIT_POLICY_UNSPECIFIED: + stage = "WorkflowUpdateStageCompleted" } + waitPolicy.Else().If(g.Id("opts").Dot("WaitForStage").Op("==").Qual(clientPkg, "WorkflowUpdateStageUnspecified")).Block( + g.Id("opts").Dot("WaitForStage").Op("=").Qual(clientPkg, stage), + ) fn.Return(g.Id("opts"), g.Nil()) })