Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Default to WorkflowUpdateStageCompleted if unspecified #73

Merged
merged 1 commit into from
Jun 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,15 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

### Fixed

- [#73](https://github.com/cludden/protoc-gen-go-temporal/pull/73) default to WorkflowUpdateStageCompleted if update options WaitForStage unspecified



# [1.14.0](https://github.com/cludden/protoc-gen-go-temporal/releases/tag/v1.14.0) - 2024-06-20

### ⚠ BREAKING CHANGES

- [#72](https://github.com/cludden/protoc-gen-go-temporal/pull/72) upgrade go.temporal.io/sdk to [v1.27.0](https://github.com/temporalio/sdk-go/releases/tag/v1.27.0)

### Added

### Changed

### Fixed



Expand Down
219 changes: 122 additions & 97 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
[![GoDoc](https://godoc.org/github.com/cludden/protoc-gen-go-temporal?status.svg)](https://pkg.go.dev/github.com/cludden/protoc-gen-go-temporal)
[![Buf](https://img.shields.io/badge/buf-cludden%2Fprotoc--gen--go--temporal-blue)](https://buf.build/cludden/protoc-gen-go-temporal)

https://buf.build/cludden/protoc-gen-go-temporal
A protoc plugin for generating typed Temporal clients and workers in Go from protobuf schemas. This plugin allows Workflow authors to configure sensible defaults and guardrails, simplifies the implementation and testing of Temporal workers, and streamlines integration by providing typed client SDKs and a generated CLI application.

<small><i>Inspired by [Chad Retz's](https://github.com/cretz/) awesome [github.com/cretz/temporal-sdk-go-advanced](https://github.com/cretz/temporal-sdk-go-advanced) and [Jacob LeGrone's](https://github.com/jlegrone/) excellent Replay talk on [Temporal @ Datadog](https://youtu.be/LxgkAoTSI8Q)</i></small>
Expand Down Expand Up @@ -106,33 +105,52 @@ See examples for more usage:

4. Initialize buf repository
```shell
mkdir proto && cd proto && buf mod init
mkdir proto && && buf config init
```

5. Add dependency to `buf.yaml`
```yaml
version: v1
version: v2
modules:
- path: proto
deps:
- buf.build/cludden/protoc-gen-go-temporal:<version>
- buf.build/cludden/protoc-gen-go-temporal:v1.14.0
lint:
use:
- DEFAULT
breaking:
use:
- FILE
```

6. Add plugin to `buf.gen.yaml` and exclude it from managed mode go prefix
```yaml
version: v1
version: v2
managed:
enabled: true
go_package_prefix:
default: github.com/foo/bar/gen
except:
- buf.build/cludden/protoc-gen-go-temporal
disable:
- file_option: go_package_prefix
module: buf.build/cludden/protoc-gen-go-temporal
override:
- file_option: go_package_prefix
value: example/gen
inputs:
- directory: proto
plugins:
- plugin: go
- local: protoc-gen-go
out: gen
opt: paths=source_relative
- plugin: go_temporal
opt:
- paths=source_relative
- local: protoc-gen-go_temporal
out: gen
opt: paths=source_relative,cli-enabled=true,cli-categories=true,workflow-update-enabled=true,docs-out=./proto/README.md
strategy: all
opt:
- cli-categories=true
- cli-enabled=true
- docs-out=./proto/README.md
- enable-xns=true
- paths=source_relative
- workflow-update-enabled=true
```

7. Define your service
Expand Down Expand Up @@ -234,129 +252,135 @@ See examples for more usage:

8. Generate temporal worker, client, and cli types, methods, interfaces, and functions
```shell
buf mod update && buf generate
buf dep update && buf generate
```

9. Implement the required Workflow and Activity interfaces
```go
package main

import (
"context"
"fmt"

examplev1 "github.com/cludden/protoc-gen-go-temporal/gen/example/v1"
"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/log"
"go.temporal.io/sdk/workflow"
"context"
"fmt"
"log"
"os"

examplev1 "example/gen/example/v1"

"github.com/urfave/cli/v2"
"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/client"
tlog "go.temporal.io/sdk/log"
"go.temporal.io/sdk/worker"
"go.temporal.io/sdk/workflow"
)

type (
// Workflows manages shared state for workflow constructors and is used to
// register workflows with a worker
Workflows struct{}
// Workflows manages shared state for workflow constructors and is used to
// register workflows with a worker
Workflows struct{}

// Activities manages shared state for activities and is used to register
// activities with a worker
Activities struct{}
// Activities manages shared state for activities and is used to register
// activities with a worker
Activities struct{}
)

// CreateFooWorkflow manages workflow state for a CreateFoo workflow
type CreateFooWorkflow struct {
// it embeds the generated workflow Input type that contains the workflow
// input and signal helpers
*examplev1.CreateFooWorkflowInput
// it embeds the generated workflow Input type that contains the workflow
// input and signal helpers
*examplev1.CreateFooWorkflowInput

log log.Logger
progress float32
status examplev1.Foo_Status
log tlog.Logger
progress float32
status examplev1.Foo_Status
}

// CreateFoo initializes a new CreateFooWorkflow value
func (w *Workflows) CreateFoo(ctx workflow.Context, input *examplev1.CreateFooWorkflowInput) (examplev1.CreateFooWorkflow, error) {
return &CreateFooWorkflow{
CreateFooWorkflowInput: input,
log: workflow.GetLogger(ctx),
status: examplev1.Foo_FOO_STATUS_CREATING,
}, nil
return &CreateFooWorkflow{
CreateFooWorkflowInput: input,
log: workflow.GetLogger(ctx),
status: examplev1.Foo_FOO_STATUS_CREATING,
}, nil
}

// Execute defines the entrypoint to a CreateFooWorkflow value
func (wf *CreateFooWorkflow) Execute(ctx workflow.Context) (*examplev1.CreateFooResponse, error) {
// listen for signals
workflow.Go(ctx, func(ctx workflow.Context) {
for {
signal, _ := wf.SetFooProgress.Receive(ctx)
wf.UpdateFooProgress(ctx, signal)
}
for {
signal, _ := wf.SetFooProgress.Receive(ctx)
wf.UpdateFooProgress(ctx, signal)
}
})

// execute Notify activity using generated helper
if err := examplev1.Notify(ctx, &examplev1.NotifyRequest{
Message: fmt.Sprintf("creating foo resource (%s)", wf.Req.GetName()),
Message: fmt.Sprintf("creating foo resource (%s)", wf.Req.GetName()),
}); err != nil {
return nil, fmt.Errorf("error sending notification: %w", err)
return nil, fmt.Errorf("error sending notification: %w", err)
}

// block until progress has reached 100 via signals and/or updates
if err := workflow.Await(ctx, func() bool {
return wf.status == examplev1.Foo_FOO_STATUS_READY
r eturn wf.status == examplev1.Foo_FOO_STATUS_READY
}); err != nil {
return nil, fmt.Errorf("error awaiting ready status: %w", err)
return nil, fmt.Errorf("error awaiting ready status: %w", err)
}

return &examplev1.CreateFooResponse{
Foo: &examplev1.Foo{
Name: wf.Req.GetName(),
Status: wf.status,
},
Foo: &examplev1.Foo{
Name: wf.Req.GetName(),
Status: wf.status,
},
}, nil
}

// GetFooProgress defines the handler for a GetFooProgress query
func (wf *CreateFooWorkflow) GetFooProgress() (*examplev1.GetFooProgressResponse, error) {
return &examplev1.GetFooProgressResponse{Progress: wf.progress, Status: wf.status}, nil
return &examplev1.GetFooProgressResponse{Progress: wf.progress, Status: wf.status}, nil
}

// UpdateFooProgress defines the handler for an UpdateFooProgress update
func (wf *CreateFooWorkflow) UpdateFooProgress(ctx workflow.Context, req *examplev1.SetFooProgressRequest) (*examplev1.GetFooProgressResponse, error) {
wf.progress = req.GetProgress()
switch {
case wf.progress < 0:
wf.progress, wf.status = 0, examplev1.Foo_FOO_STATUS_CREATING
case wf.progress < 100:
wf.status = examplev1.Foo_FOO_STATUS_CREATING
case wf.progress >= 100:
wf.progress, wf.status = 100, examplev1.Foo_FOO_STATUS_READY
}
return &examplev1.GetFooProgressResponse{Progress: wf.progress, Status: wf.status}, nil
wf.progress = req.GetProgress()
switch {
case wf.progress < 0:
wf.progress, wf.status = 0, examplev1.Foo_FOO_STATUS_CREATING
case wf.progress < 100:
wf.status = examplev1.Foo_FOO_STATUS_CREATING
case wf.progress >= 100:
wf.progress, wf.status = 100, examplev1.Foo_FOO_STATUS_READY
}
return &examplev1.GetFooProgressResponse{Progress: wf.progress, Status: wf.status}, nil
}

// Notify defines the implementation for a Notify activity
func (a *Activities) Notify(ctx context.Context, req *examplev1.NotifyRequest) error {
activity.GetLogger(ctx).Info("notification", "message", req.GetMessage())
return nil
activity.GetLogger(ctx).Info("notification", "message", req.GetMessage())
return nil
}

func main() {
// initialize the generated cli application
app, err := examplev1.NewExampleCli(
examplev1.NewExampleCliOptions().WithWorker(func(cmd *cli.Context, c client.Client) (worker.Worker, error) {
// register activities and workflows using generated helpers
w := worker.New(c, examplev1.ExampleTaskQueue, worker.Options{})
examplev1.RegisterExampleActivities(w, &example.Activities{})
examplev1.RegisterExampleWorkflows(w, &example.Workflows{})
return w, nil
}),
)
if err != nil {
log.Fatalf("error initializing example cli: %v", err)
}
// initialize the generated cli application
app, err := examplev1.NewExampleCli(
examplev1.NewExampleCliOptions().WithWorker(func(cmd *cli.Context, c client.Client) (worker.Worker, error) {
// register activities and workflows using generated helpers
w := worker.New(c, examplev1.ExampleTaskQueue, worker.Options{})
examplev1.RegisterExampleActivities(w, &Activities{})
examplev1.RegisterExampleWorkflows(w, &Workflows{})
return w, nil
}),
)
if err != nil {
log.Fatalf("error initializing example cli: %v", err)
}

// run cli
if err := app.Run(os.Args); err != nil {
log.Fatal(err)
}
// run cli
if err := app.Run(os.Args); err != nil {
log.Fatal(err)
}
}
```

Expand All @@ -371,7 +395,8 @@ See examples for more usage:

*start worker*
```shell
go get -u github.com/cludden/protoc-gen-go-temporal@<release> && go mod tidy
go get -u github.com/cludden/protoc-gen-go-temporal@<release>
go mod tidy
go run main.go worker
```

Expand All @@ -382,31 +407,31 @@ See examples for more usage:
package main

import (
"context"
"log"
"context"
"log"

examplev1 "github.com/cludden/protoc-gen-go-temporal/gen/example/v1"
"go.temporal.io/sdk/client"
examplev1 "github.com/cludden/protoc-gen-go-temporal/gen/example/v1"
"go.temporal.io/sdk/client"
)

func main() {
c, _ := client.Dial(client.Options{})
client, ctx := examplev1.NewClient(c), context.Background()
c, _ := client.Dial(client.Options{})
client, ctx := examplev1.NewClient(c), context.Background()

run, _ := client.CreateFooAsync(ctx, &examplev1.CreateFooRequest{Name: "test"})
log.Printf("started workflow: workflow_id=%s, run_id=%s\n", run.ID(), run.RunID())
run, _ := client.CreateFooAsync(ctx, &examplev1.CreateFooRequest{Name: "test"})
log.Printf("started workflow: workflow_id=%s, run_id=%s\n", run.ID(), run.RunID())

log.Println("signalling progress")
_ = run.SetFooProgress(ctx, &examplev1.SetFooProgressRequest{Progress: 5.7})
log.Println("signalling progress")
_ = run.SetFooProgress(ctx, &examplev1.SetFooProgressRequest{Progress: 5.7})

progress, _ := run.GetFooProgress(ctx)
log.Printf("queried progress: %s\n", progress.String())
progress, _ := run.GetFooProgress(ctx)
log.Printf("queried progress: %s\n", progress.String())

update, _ := run.UpdateFooProgress(ctx, &examplev1.SetFooProgressRequest{Progress: 100})
log.Printf("updated progress: %s\n", update.String())
update, _ := run.UpdateFooProgress(ctx, &examplev1.SetFooProgressRequest{Progress: 100})
log.Printf("updated progress: %s\n", update.String())

resp, _ := run.Get(ctx)
log.Printf("workflow completed: %s\n", resp.String())
resp, _ := run.Get(ctx)
log.Printf("workflow completed: %s\n", resp.String())
}
```

Expand Down
24 changes: 11 additions & 13 deletions internal/plugin/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1276,20 +1276,18 @@ func (svc *Manifest) genClientUpdateOptions(f *g.File, update protoreflect.FullN
if wp == temporalv1.WaitPolicy_WAIT_POLICY_UNSPECIFIED && updateOpts.GetWaitPolicy() != temporalv1.WaitPolicy_WAIT_POLICY_UNSPECIFIED {
wp = updateOpts.GetWaitPolicy()
}
if wp != temporalv1.WaitPolicy_WAIT_POLICY_UNSPECIFIED {
var stage string
switch wp {
case temporalv1.WaitPolicy_WAIT_POLICY_ACCEPTED:
stage = "WorkflowUpdateStageAccepted"
case temporalv1.WaitPolicy_WAIT_POLICY_ADMITTED:
stage = "WorkflowUpdateStageAdmitted"
case temporalv1.WaitPolicy_WAIT_POLICY_COMPLETED:
stage = "WorkflowUpdateStageCompleted"
}
waitPolicy.Else().If(g.Id("opts").Dot("WaitForStage").Op("==").Qual(clientPkg, "WorkflowUpdateStageUnspecified")).Block(
g.Id("opts").Dot("WaitForStage").Op("=").Qual(clientPkg, stage),
)
var stage string
switch wp {
case temporalv1.WaitPolicy_WAIT_POLICY_ACCEPTED:
stage = "WorkflowUpdateStageAccepted"
case temporalv1.WaitPolicy_WAIT_POLICY_ADMITTED:
stage = "WorkflowUpdateStageAdmitted"
case temporalv1.WaitPolicy_WAIT_POLICY_COMPLETED, temporalv1.WaitPolicy_WAIT_POLICY_UNSPECIFIED:
stage = "WorkflowUpdateStageCompleted"
}
waitPolicy.Else().If(g.Id("opts").Dot("WaitForStage").Op("==").Qual(clientPkg, "WorkflowUpdateStageUnspecified")).Block(
g.Id("opts").Dot("WaitForStage").Op("=").Qual(clientPkg, stage),
)

fn.Return(g.Id("opts"), g.Nil())
})
Expand Down