Features - Go SDK feature guide
The Features section of the Temporal Developer's guide provides basic implementation guidance on how to use many of the development features available to Workflows and Activities in the Temporal Platform.
In this section you can find the following:
- How to develop Signals
- How to develop Queries
- How to start a Child Workflow Execution
- How to start a Temporal Cron Job
- How to use Continue-As-New
- How to set Workflow timeouts & retries
- How to set Activity timeouts & retries
- How to Heartbeat an Activity
- How to Asynchronously complete an Activity
- How to register Namespaces
- How to develop with Updates
- How to use Start Delay
How to develop with Signals
A Signal is a message sent to a running Workflow Execution.
Signals are defined in your code and handled in your Workflow Definition. Signals can be sent to Workflow Executions from a Temporal Client or from another Workflow Execution.
How to define a Signal
A Signal has a name and can have arguments.
- The name, also called a Signal type, is a string.
- The arguments must be serializable.
Use structs to define Signals and carry data, as long as the struct is serializable via the Data Converter.
The Receive() method on the Signal Channel uses the Data Converter to decode the data into the struct within the Workflow.
Only public fields are serializable.
type MySignal struct {
Message string // serializable
message string // not serializable
}
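The effect of the exported-field rule can be seen without a Temporal Service. The following is a minimal sketch that assumes the Data Converter in use encodes structs with Go's encoding/json package; the roundTrip helper is hypothetical and only stands in for the encode and decode steps a converter would perform.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// MySignal mirrors the struct above: one exported and one unexported field.
type MySignal struct {
	Message string // exported: included in the encoded payload
	message string // unexported: skipped by encoding/json
}

// roundTrip is a hypothetical helper. It encodes a MySignal to JSON and
// decodes it back, standing in for what a JSON-based converter would do.
func roundTrip(in MySignal) (string, MySignal, error) {
	data, err := json.Marshal(in)
	if err != nil {
		return "", MySignal{}, err
	}
	var out MySignal
	if err := json.Unmarshal(data, &out); err != nil {
		return "", MySignal{}, err
	}
	return string(data), out, nil
}

func main() {
	encoded, decoded, err := roundTrip(MySignal{Message: "hello", message: "hidden"})
	if err != nil {
		panic(err)
	}
	fmt.Println(encoded)                // {"Message":"hello"}
	fmt.Printf("%q\n", decoded.message) // ""
}
```

The unexported field never reaches the payload, so the receiving side sees its zero value.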
How to handle a Signal
Workflows listen for Signals by the Signal's name.
Use the GetSignalChannel() API from the go.temporal.io/sdk/workflow package to get the Signal Channel.
A common use-case is to block a Workflow while waiting for a Signal, like in the following snippet:
func YourWorkflowDefinition(ctx workflow.Context, param YourWorkflowParam) error {
// ...
var signal MySignal
signalChan := workflow.GetSignalChannel(ctx, "your-signal-name")
signalChan.Receive(ctx, &signal)
if len(signal.Message) > 0 && signal.Message != "SOME_VALUE" {
return errors.New("signal")
}
// ...
}
Alternatively, you might want the Workflow to proceed and still be capable of handling external Signals.
func YourWorkflowDefinition(ctx workflow.Context, param YourWorkflowParam) error {
var signal MySignal
signalChan := workflow.GetSignalChannel(ctx, "your-signal-name")
workflow.Go(ctx, func(ctx workflow.Context) {
for {
selector := workflow.NewSelector(ctx)
selector.AddReceive(signalChan, func(c workflow.ReceiveChannel, more bool) {
c.Receive(ctx, &signal)
})
selector.Select(ctx)
}
})
// submit activity one
// signal can be received while activity one is pending
// ...
return nil
}
In the example above, the Workflow code uses workflow.GetSignalChannel to open a workflow.ReceiveChannel for the Signal type (identified by the Signal name).
Before completing the Workflow or using Continue-As-New, make sure to do an asynchronous drain on the Signal channel. Otherwise, the Signals will be lost.
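The shape of an asynchronous drain can be sketched with a plain buffered Go channel. This is an analogy only, not Temporal SDK code: inside a Workflow the equivalent loop would call ReceiveAsync on the Signal channel until it reports that nothing is left. The drain function and the string payloads below are hypothetical.

```go
package main

import "fmt"

// drain empties ch without blocking and returns everything that was buffered.
func drain(ch <-chan string) []string {
	var pending []string
	for {
		select {
		case msg, ok := <-ch:
			if !ok {
				// Channel was closed; nothing more can arrive.
				return pending
			}
			pending = append(pending, msg)
		default:
			// Buffer is empty; stop instead of waiting for more.
			return pending
		}
	}
}

func main() {
	signals := make(chan string, 3)
	signals <- "first"
	signals <- "second"

	fmt.Println(drain(signals))      // [first second]
	fmt.Println(len(drain(signals))) // 0
}
```

The key property is that the loop never blocks: it handles whatever is already buffered and then returns, so the Workflow can complete or Continue-As-New right away.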
How to send a Signal from a Temporal Client
When a Signal is sent successfully from the Temporal Client, the WorkflowExecutionSignaled Event appears in the Event History of the Workflow that receives the Signal.
Use the SignalWorkflow() method on an instance of the Go SDK Temporal Client to send a Signal to a Workflow Execution.
Pass in both the Workflow Id and Run Id to uniquely identify the Workflow Execution. If only the Workflow Id is supplied (provide an empty string as the Run Id param), the Workflow Execution that is Running receives the Signal.
// ...
signal := MySignal {
Message: "Some important data",
}
err = temporalClient.SignalWorkflow(context.Background(), "your-workflow-id", runID, "your-signal-name", signal)
if err != nil {
log.Fatalln("Error sending the Signal", err)
}
// ...
Possible errors:
serviceerror.NotFound
serviceerror.Internal
serviceerror.Unavailable
How to send a Signal from a Workflow
A Workflow can send a Signal to another Workflow, in which case it's called an External Signal.
When an External Signal is sent:
- A SignalExternalWorkflowExecutionInitiated Event appears in the sender's Event History.
- A WorkflowExecutionSignaled Event appears in the recipient's Event History.
A Signal can be sent from within a Workflow to a different Workflow Execution using the SignalExternalWorkflow API from the go.temporal.io/sdk/workflow package.
// ...
func YourWorkflowDefinition(ctx workflow.Context, param YourWorkflowParam) error {
//...
signal := MySignal {
Message: "Some important data",
}
err := workflow.SignalExternalWorkflow(ctx, "some-workflow-id", "", "your-signal-name", signal).Get(ctx, nil)
if err != nil {
// ...
}
// ...
}
How to Signal-With-Start
Signal-With-Start is used from the Client. It takes a Workflow Id, Workflow arguments, a Signal name, and Signal arguments.
If there's a Workflow running with the given Workflow Id, it will be signaled. If there isn't, a new Workflow will be started and immediately signaled.
Use the SignalWithStartWorkflow() API on the Go SDK Temporal Client to start a Workflow Execution (if not already running) and pass it the Signal at the same time.
Because the Workflow Execution might not exist, this API does not take a Run ID as a parameter.
// ...
signal := MySignal {
Message: "Some important data",
}
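// The options, Workflow function, and Workflow arguments are used only if the Workflow has to be started.
workflowOptions := client.StartWorkflowOptions{
TaskQueue: "your-task-queue",
}
_, err = temporalClient.SignalWithStartWorkflow(context.Background(), "your-workflow-id", "your-signal-name", signal, workflowOptions, YourWorkflowDefinition, yourWorkflowParam)
if err != nil {
log.Fatalln("Error sending the Signal", err)
}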
How to develop with Queries
A Query is a synchronous operation that is used to get the state of a Workflow Execution.
How to define a Query
A Query has a name and can have arguments.
- The name, also called a Query type, is a string.
- The arguments must be serializable.
In Go, a Query type, also called a Query name, is a string value.
queryType := "your_query_name"
How to handle a Query
Queries are handled by your Workflow.
Don’t include any logic that causes Command generation within a Query handler (such as executing Activities). Including such logic causes unexpected behavior.
Use the SetQueryHandler API from the go.temporal.io/sdk/workflow package to set a Query Handler that listens for a Query by name.
The handler must be a function that returns two values:
- A serializable result
- An error
The handler function can receive any number of input parameters, but all input parameters must be serializable.
The following sample code sets up a Query Handler that handles the current_state Query type:
func YourWorkflow(ctx workflow.Context, input string) error {
currentState := "started" // This could be any serializable struct.
queryType := "current_state"
err := workflow.SetQueryHandler(ctx, queryType, func() (string, error) {
return currentState, nil
})
if err != nil {
currentState = "failed to register query handler"
return err
}
// Your normal Workflow code begins here, and you update the currentState as the code makes progress.
currentState = "waiting timer"
err = workflow.NewTimer(ctx, time.Hour).Get(ctx, nil)
if err != nil {
currentState = "timer failed"
return err
}
currentState = "waiting activity"
ctx = workflow.WithActivityOptions(ctx, yourActivityOptions)
err = workflow.ExecuteActivity(ctx, YourActivity, "your_input").Get(ctx, nil)
if err != nil {
currentState = "activity failed"
return err
}
currentState = "done"
return nil
}
For example, suppose your query handler function takes two parameters:
err := workflow.SetQueryHandler(ctx, "current_state", func(prefix string, suffix string) (string, error) {
return prefix + currentState + suffix, nil
})
How to send a Query
Queries are sent from a Temporal Client.
Use the QueryWorkflow() API or the QueryWorkflowWithOptions API on the Temporal Client to send a Query to a Workflow Execution.
// ...
response, err := temporalClient.QueryWorkflow(context.Background(), workflowID, runID, queryType)
if err != nil {
// ...
}
// ...
You can pass an arbitrary number of arguments to the QueryWorkflow() function.
// ...
response, err := temporalClient.QueryWorkflow(context.Background(), workflowID, runID, queryType, "foo", "baz")
if err != nil {
// ...
}
// ...
QueryWorkflow() returns the result as an encoded value, so response needs to be decoded into result.
Use Get() on response to decode the result, such as a string in this example.
var result string
if err := response.Get(&result); err != nil {
// ...
}
log.Println("Received Query result. Result: " + result)
How to develop with Updates
An Update is an operation that can mutate the state of a Workflow Execution and return a response.
How to define an Update in Go
In Go, you define an Update type, also known as an Update name, as a string value.
You must ensure the arguments and result are serializable.
When sending and receiving the Update, use the Update name as an identifier.
The name does not link to the data type(s) sent with the Update.
Ensure that every Workflow listening to the same Update name can handle the same Update arguments.
// YourUpdateName holds a string value used to correlate Updates.
const YourUpdateName = "your_update_name"
// ...
func YourUpdatableWorkflow(ctx workflow.Context, param WFParam) (WFResult, error) {
// ...
err := workflow.SetUpdateHandler(ctx, YourUpdateName, func(ctx workflow.Context, arg YourUpdateArg) (YourUpdateResult, error) {
// ...
})
// ...
}
How to handle an Update in Go
Register an Update handler for a given name using the SetUpdateHandler API from the go.temporal.io/sdk/workflow package.
The handler function can accept multiple serializable input parameters, but we recommend using only a single parameter.
This practice enables you to add fields in future versions while maintaining backward compatibility.
You can optionally include a workflow.Context parameter in the first position of the function.
The function can return either a serializable value with an error or just an error.
The Workflow's WorkflowPanicPolicy configuration determines how panics are handled inside the Handler function.
WorkflowPanicPolicy is set in the Worker Options.
Update handlers, unlike Query handlers, can change Workflow state.
// ...
func YourUpdatableWorkflow(ctx workflow.Context, param WFParam) (WFResult, error) {
counter := param.StartCount
err := workflow.SetUpdateHandler(ctx, YourUpdateName, func(ctx workflow.Context, arg YourUpdateArg) (YourUpdateResult, error) {
counter += arg.Add
result := YourUpdateResult{
Total: counter,
}
return result, nil
})
// ...
}
How to set an Update validator function in Go
Validate certain aspects of the data sent to the Workflow using an Update validator function.
For instance, a counter Workflow might never want to accept a non-positive number.
Invoke the SetUpdateHandlerWithOptions API and define a validator function as one of the options.
When you use a Validator function, the Worker receives the Update first, before any Events are written to the Event History.
If the Update is rejected, it's not recorded in the Event History.
If it's accepted, the WorkflowExecutionUpdateAccepted Event occurs.
Afterwards, the Worker executes the accepted Update and, upon completion, a WorkflowExecutionUpdateCompleted Event gets written into the Event History.
The Validator function, unlike the Update Handler, cannot change the state of the Workflow.
The platform treats a panic in the Validator function as a rejection of the Update.
// UpdatableWorkflowWithValidator is a Workflow Definition.
// This Workflow Definition has an Update handler that uses the isPositive() validator function.
// After setting the Update handler it sleeps for 1 minute.
// Updates can be sent to the Workflow during this time.
func UpdatableWorkflowWithValidator(ctx workflow.Context, param WFParam) (WFResult, error) {
counter := param.StartCount
err := workflow.SetUpdateHandlerWithOptions(
ctx, YourValidatedUpdateName,
func(ctx workflow.Context, arg YourUpdateArg) (YourUpdateResult, error) {
// ...
},
// Set the isPositive validator.
workflow.UpdateHandlerOptions{Validator: isPositive},
)
if err != nil {
return WFResult{}, err
}
// ...
}
// isPositive is a validator function.
// It returns an error if the int value is below 1.
// This function cannot change the state of the Workflow.
// workflow.Context can be used to log
func isPositive(ctx workflow.Context, u YourUpdateArg) error {
log := workflow.GetLogger(ctx)
if u.Add < 1 {
log.Debug("Rejecting non-positive number, positive integers only", "UpdateValue", u.Add)
return fmt.Errorf("addend must be a positive integer (%v)", u.Add)
}
log.Debug("Accepting Update", "UpdateValue", u.Add)
return nil
}
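The validate-then-apply ordering described above can be illustrated in plain Go, independent of the SDK. This is a sketch under stated assumptions: the counter type and its update method are hypothetical stand-ins for Workflow state and an Update handler, and the validator here takes a bare int rather than a workflow.Context and an Update argument.

```go
package main

import (
	"errors"
	"fmt"
)

// counter is a hypothetical stand-in for Workflow state guarded by a validator.
type counter struct{ total int }

// isPositive rejects addends below 1 and never touches the counter.
func isPositive(add int) error {
	if add < 1 {
		return errors.New("addend must be a positive integer")
	}
	return nil
}

// update runs the validator first and mutates state only when it accepts.
func (c *counter) update(add int) (int, error) {
	if err := isPositive(add); err != nil {
		return c.total, err // rejected: state is unchanged
	}
	c.total += add
	return c.total, nil
}

func main() {
	c := &counter{total: 10}
	fmt.Println(c.update(5))  // 15 <nil>
	fmt.Println(c.update(-3)) // 15 addend must be a positive integer
}
```

Keeping the validator free of side effects is what makes a rejection safe: nothing has changed, so nothing needs to be recorded or undone.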
How to send an Update from a Temporal Client in Go
Invoke the UpdateWorkflow() method on an instance of the Go SDK Temporal Client to dispatch an Update to a Workflow Execution.
You must provide the Workflow Id, but specifying a Run Id is optional. If you supply only the Workflow Id (and provide an empty string as the Run Id param), the currently running Workflow Execution receives the Update.
func main() {
// ...
// Set the Update argument values.
updateArg := updates.YourUpdateArg{
Add: n,
}
// Call the UpdateWorkflow API.
// A blank RunID means that the Update is routed to the most recent Workflow Run of the specified Workflow ID.
updateHandle, err := temporalClient.UpdateWorkflow(context.Background(), updates.YourUpdateWFID, "", updates.YourUpdateName, updateArg)
if err != nil {
log.Fatalln("Error issuing Update request", err)
}
// Get the result of the Update.
var updateResult updates.YourUpdateResult
err = updateHandle.Get(context.Background(), &updateResult)
if err != nil {
log.Fatalln("Update encountered an error", err)
}
log.Println("Update succeeded, new total: ", updateResult.Total)
}
Workflow timeouts
Each Workflow timeout controls the maximum duration of a different aspect of a Workflow Execution.
Workflow timeouts are set when starting the Workflow Execution.
- Workflow Execution Timeout: restricts the maximum amount of time that a single Workflow Execution can be executed.
- Workflow Run Timeout: restricts the maximum amount of time that a single Workflow Run can last.
- Workflow Task Timeout: restricts the maximum amount of time that a Worker can execute a Workflow Task.
Create an instance of StartWorkflowOptions from the go.temporal.io/sdk/client package, set a timeout, and pass the instance to the ExecuteWorkflow call.
Available timeouts are:
WorkflowExecutionTimeout
WorkflowRunTimeout
WorkflowTaskTimeout
workflowOptions := client.StartWorkflowOptions{
// ...
// Set Workflow Timeout duration
WorkflowExecutionTimeout: time.Hour * 24 * 365 * 10,
// WorkflowRunTimeout: time.Hour * 24 * 365 * 10,
// WorkflowTaskTimeout: time.Second * 10,
// ...
}
workflowRun, err := temporalClient.ExecuteWorkflow(context.Background(), workflowOptions, YourWorkflowDefinition)
if err != nil {
// ...
}
Workflow retries
A Retry Policy can work in cooperation with the timeouts to provide fine controls to optimize the execution experience.
Use a Retry Policy to retry a Workflow Execution in the event of a failure.
Workflow Executions do not retry by default, and Retry Policies should be used with Workflow Executions only in certain situations.
Create an instance of a RetryPolicy from the go.temporal.io/sdk/temporal package and provide it as the value to the RetryPolicy field of the instance of StartWorkflowOptions.
- Type: RetryPolicy
- Default: None
retrypolicy := &temporal.RetryPolicy{
InitialInterval: time.Second,
BackoffCoefficient: 2.0,
MaximumInterval: time.Second * 100,
}
workflowOptions := client.StartWorkflowOptions{
RetryPolicy: retrypolicy,
// ...
}
workflowRun, err := temporalClient.ExecuteWorkflow(context.Background(), workflowOptions, YourWorkflowDefinition)
if err != nil {
// ...
}
How to set Activity timeouts
Each Activity timeout controls the maximum duration of a different aspect of an Activity Execution.
The following timeouts are available in the Activity Options.
- Schedule-To-Close Timeout: the maximum amount of time allowed for the overall Activity Execution.
- Start-To-Close Timeout: the maximum time allowed for a single Activity Task Execution.
- Schedule-To-Start Timeout: the maximum amount of time that is allowed from when an Activity Task is scheduled to when a Worker starts that Activity Task.
An Activity Execution must have either the Start-To-Close or the Schedule-To-Close Timeout set.
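The rule above can be expressed as a small pre-flight check. This is an illustrative sketch only: activityTimeouts is a hypothetical stand-in for the timeout fields of workflow.ActivityOptions, and validate is not part of the SDK.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// activityTimeouts is a hypothetical stand-in for the timeout fields
// of workflow.ActivityOptions.
type activityTimeouts struct {
	ScheduleToClose time.Duration
	StartToClose    time.Duration
	ScheduleToStart time.Duration
}

var errMissingTimeout = errors.New("set StartToClose or ScheduleToClose")

// validate enforces the rule that at least one of Start-To-Close or
// Schedule-To-Close must be set; Schedule-To-Start alone is not enough.
func validate(t activityTimeouts) error {
	if t.StartToClose <= 0 && t.ScheduleToClose <= 0 {
		return errMissingTimeout
	}
	return nil
}

func main() {
	fmt.Println(validate(activityTimeouts{ScheduleToStart: 10 * time.Second})) // set StartToClose or ScheduleToClose
	fmt.Println(validate(activityTimeouts{StartToClose: 10 * time.Second}))    // <nil>
}
```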
To set an Activity Timeout in Go, create an instance of ActivityOptions from the go.temporal.io/sdk/workflow package, set the Activity Timeout field, and then use the WithActivityOptions() API to apply the options to the instance of workflow.Context.
Available timeouts are:
StartToCloseTimeout
ScheduleToCloseTimeout
ScheduleToStartTimeout
activityoptions := workflow.ActivityOptions{
// Set Activity Timeout duration
ScheduleToCloseTimeout: 10 * time.Second,
// StartToCloseTimeout: 10 * time.Second,
// ScheduleToStartTimeout: 10 * time.Second,
}
ctx = workflow.WithActivityOptions(ctx, activityoptions)
var yourActivityResult YourActivityResult
err = workflow.ExecuteActivity(ctx, YourActivityDefinition, yourActivityParam).Get(ctx, &yourActivityResult)
if err != nil {
// ...
}
How to set an Activity Retry Policy
A Retry Policy works in cooperation with the timeouts to provide fine controls to optimize the execution experience.
Activity Executions are automatically associated with a default Retry Policy if a custom one is not provided.
To set a RetryPolicy, create an instance of ActivityOptions from the go.temporal.io/sdk/workflow package, set the RetryPolicy field, and then use the WithActivityOptions() API to apply the options to the instance of workflow.Context.
- Type: RetryPolicy
- Default:
retrypolicy := &temporal.RetryPolicy{
InitialInterval: time.Second,
BackoffCoefficient: 2.0,
MaximumInterval: time.Second * 100, // 100 * InitialInterval
MaximumAttempts: 0, // Unlimited
NonRetryableErrorTypes: []string{}, // empty
}
Providing a Retry Policy here is a customization, and overwrites individual Field defaults.
retrypolicy := &temporal.RetryPolicy{
InitialInterval: time.Second,
BackoffCoefficient: 2.0,
MaximumInterval: time.Second * 100,
}
activityoptions := workflow.ActivityOptions{
RetryPolicy: retrypolicy,
}
ctx = workflow.WithActivityOptions(ctx, activityoptions)
var yourActivityResult YourActivityResult
err = workflow.ExecuteActivity(ctx, YourActivityDefinition, yourActivityParam).Get(ctx, &yourActivityResult)
if err != nil {
// ...
}
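To see how InitialInterval, BackoffCoefficient, and MaximumInterval interact, the sketch below computes the wait before each retry for the policy values shown above. It assumes the usual exponential backoff rule, where each interval is the previous one multiplied by the coefficient and capped at the maximum. The retryPolicy type, examplePolicy, and intervals are hypothetical helpers, not SDK APIs.

```go
package main

import (
	"fmt"
	"time"
)

// retryPolicy is a hypothetical stand-in for three temporal.RetryPolicy fields.
type retryPolicy struct {
	InitialInterval    time.Duration
	BackoffCoefficient float64
	MaximumInterval    time.Duration
}

// examplePolicy returns the values used in the samples above.
func examplePolicy() retryPolicy {
	return retryPolicy{
		InitialInterval:    time.Second,
		BackoffCoefficient: 2.0,
		MaximumInterval:    100 * time.Second,
	}
}

// intervals returns the wait before each of the first n retries,
// assuming exponential backoff capped at MaximumInterval.
func intervals(p retryPolicy, n int) []time.Duration {
	out := make([]time.Duration, 0, n)
	next := float64(p.InitialInterval)
	for i := 0; i < n; i++ {
		if p.MaximumInterval > 0 && next > float64(p.MaximumInterval) {
			next = float64(p.MaximumInterval)
		}
		out = append(out, time.Duration(next))
		next *= p.BackoffCoefficient
	}
	return out
}

func main() {
	fmt.Println(intervals(examplePolicy(), 9))
	// [1s 2s 4s 8s 16s 32s 1m4s 1m40s 1m40s]
}
```

With these values the cap is reached on the eighth retry, after which every wait is 100 seconds.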
How to Heartbeat an Activity
An Activity Heartbeat is a ping from the Worker Process that is executing the Activity to the Temporal Service. Each Heartbeat informs the Temporal Service that the Activity Execution is making progress and the Worker has not crashed. If the Temporal Service does not receive a Heartbeat within a Heartbeat Timeout time period, the Activity will be considered failed and another Activity Task Execution may be scheduled according to the Retry Policy.
Heartbeats may not always be sent to the Temporal Service—they may be throttled by the Worker.
Activity Cancellations are delivered to Activities from the Temporal Service when they Heartbeat. Activities that don't Heartbeat can't receive a Cancellation. Heartbeat throttling may lead to Cancellation getting delivered later than expected.
Heartbeats can contain a details field describing the Activity's current progress.
If an Activity gets retried, the Activity can access the details from the last Heartbeat that was sent to the Temporal Service.
To Heartbeat in an Activity in Go, use the RecordHeartbeat API.
import (
"context"
// ...
"go.temporal.io/sdk/activity"
// ...
)
func YourActivityDefinition(ctx context.Context, param YourActivityDefinitionParam) (YourActivityDefinitionResult, error) {
// ...
activity.RecordHeartbeat(ctx, details)
// ...
}
When an Activity Task Execution times out due to a missed Heartbeat, the last value of the details variable above is returned to the calling Workflow in the details field of TimeoutError with TimeoutType set to Heartbeat.
You can also Heartbeat an Activity from an external source:
// The client is a heavyweight object that should be created once per process.
temporalClient, err := client.Dial(client.Options{})
if err != nil {
// ...
}
// Record heartbeat.
err = temporalClient.RecordActivityHeartbeat(ctx, taskToken, details)
The parameters of the RecordActivityHeartbeat function are:
- taskToken: The value of the binary TaskToken field of the ActivityInfo struct retrieved inside the Activity.
- details: The serializable payload containing progress information.
If an Activity Execution Heartbeats its progress before it failed, the retry attempt will have access to the progress information, so that the Activity Execution can resume from the failed state. Here's an example of how this can be implemented:
func SampleActivity(ctx context.Context, inputArg InputParams) error {
startIdx := inputArg.StartIndex
if activity.HasHeartbeatDetails(ctx) {
// Recover from finished progress.
var finishedIndex int
if err := activity.GetHeartbeatDetails(ctx, &finishedIndex); err == nil {
startIdx = finishedIndex + 1 // Start from next one.
}
}
// Normal Activity logic...
for i := startIdx; i < inputArg.EndIdx; i++ {
// Code for processing item i goes here...
activity.RecordHeartbeat(ctx, i) // Report progress.
}
return nil
}
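The resume behavior can be simulated without a Temporal Service. In the sketch below, all names (attempt, run, the heartbeat callback) are hypothetical: a closure plays the role of the recorded Heartbeat details, the first attempt crashes partway through, and the second attempt starts from the item after the last recorded index.

```go
package main

import (
	"errors"
	"fmt"
)

// attempt processes items [start, end) and reports each finished index
// through heartbeat. failAt simulates a crash just before that item is
// processed; pass -1 for no failure.
func attempt(start, end, failAt int, heartbeat func(int)) error {
	for i := start; i < end; i++ {
		if i == failAt {
			return errors.New("simulated worker crash")
		}
		heartbeat(i)
	}
	return nil
}

// run simulates a failed first attempt followed by one retry. It returns
// every processed index and the index the retry started from (-1 if no
// retry was needed).
func run(end, failAt int) (processed []int, retryStart int) {
	last, has := 0, false
	hb := func(i int) {
		last, has = i, true // stands in for the recorded Heartbeat details
		processed = append(processed, i)
	}
	if err := attempt(0, end, failAt, hb); err == nil {
		return processed, -1
	}
	retryStart = 0
	if has {
		retryStart = last + 1 // resume after the last finished item
	}
	_ = attempt(retryStart, end, -1, hb)
	return processed, retryStart
}

func main() {
	processed, retryStart := run(5, 3)
	fmt.Println(processed, retryStart) // [0 1 2 3 4] 3
}
```

No item is processed twice because the retry begins at the index after the last one that was reported.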
How to set a Heartbeat Timeout
A Heartbeat Timeout works in conjunction with Activity Heartbeats.
To set a Heartbeat Timeout, create an instance of ActivityOptions from the go.temporal.io/sdk/workflow package, set the HeartbeatTimeout field, and then use the WithActivityOptions() API to apply the options to the instance of workflow.Context.
activityoptions := workflow.ActivityOptions{
HeartbeatTimeout: 10 * time.Second,
}
ctx = workflow.WithActivityOptions(ctx, activityoptions)
var yourActivityResult YourActivityResult
err = workflow.ExecuteActivity(ctx, YourActivityDefinition, yourActivityParam).Get(ctx, &yourActivityResult)
if err != nil {
// ...
}
How to asynchronously complete an Activity
Asynchronous Activity Completion enables the Activity Function to return without the Activity Execution completing.
There are three steps to follow:
- The Activity provides the external system with identifying information needed to complete the Activity Execution. Identifying information can be a Task Token, or a combination of Namespace, Workflow Id, and Activity Id.
- The Activity Function completes in a way that identifies it as waiting to be completed by an external system.
- The Temporal Client is used to Heartbeat and complete the Activity.
In Go, these steps look like the following:
- Provide the external system with a Task Token to complete the Activity Execution. To do this, use the GetInfo() API from the go.temporal.io/sdk/activity package.
// Retrieve the Activity information needed to asynchronously complete the Activity.
activityInfo := activity.GetInfo(ctx)
taskToken := activityInfo.TaskToken
// Send the taskToken to the external service that will complete the Activity.
- Return an activity.ErrResultPending error to indicate that the Activity is completing asynchronously.
return "", activity.ErrResultPending
- Use the Temporal Client to complete the Activity using the Task Token.
// Instantiate a Temporal service client.
// The same client can be used to complete or fail any number of Activities.
// The client is a heavyweight object that should be created once per process.
temporalClient, err := client.Dial(client.Options{})
// Complete the Activity.
temporalClient.CompleteActivity(context.Background(), taskToken, result, nil)
The following are the parameters of the CompleteActivity function:
- taskToken: The value of the binary TaskToken field of the ActivityInfo struct retrieved inside the Activity.
- result: The return value to record for the Activity. The type of this value must match the type of the return value declared by the Activity function.
- err: The error to return if the Activity terminates with an error.
If err is not nil, the value of the result field is ignored.
To fail the Activity, you would do the following:
// Fail the Activity.
temporalClient.CompleteActivity(context.Background(), taskToken, nil, err)
How to start a Child Workflow Execution
A Child Workflow Execution is a Workflow Execution that is scheduled from within another Workflow using a Child Workflow API.
When using a Child Workflow API, Child Workflow related Events (StartChildWorkflowExecutionInitiated, ChildWorkflowExecutionStarted, ChildWorkflowExecutionCompleted, etc...) are logged in the Workflow Execution Event History.
Always block progress until the ChildWorkflowExecutionStarted Event is logged to the Event History to ensure the Child Workflow Execution has started. After that, Child Workflow Executions may be abandoned by setting the Abandon Parent Close Policy in the Child Workflow Options; the default policy is Terminate.
To be sure that the Child Workflow Execution has started, first call the Child Workflow Execution method on the instance of Child Workflow future, which returns a different future.
Then get the value of an object that acts as a proxy for a result that is initially unknown, which is what waits until the Child Workflow Execution has spawned.
To spawn a Child Workflow Execution in Go, use the ExecuteChildWorkflow API, which is available from the go.temporal.io/sdk/workflow package.
The ExecuteChildWorkflow call requires an instance of workflow.Context, with an instance of workflow.ChildWorkflowOptions applied to it, the Workflow Type, and any parameters that should be passed to the Child Workflow Execution.
workflow.ChildWorkflowOptions contain the same fields as client.StartWorkflowOptions.
Workflow Option fields automatically inherit their values from the Parent Workflow Options if they are not explicitly set.
If a custom WorkflowID is not set, one is generated when the Child Workflow Execution is spawned.
Use the WithChildOptions API to apply Child Workflow Options to the instance of workflow.Context.
The ExecuteChildWorkflow call returns an instance of a ChildWorkflowFuture.
Call the .Get() method on the instance of ChildWorkflowFuture to wait for the result.
func YourWorkflowDefinition(ctx workflow.Context, params ParentParams) (ParentResp, error) {
childWorkflowOptions := workflow.ChildWorkflowOptions{}
ctx = workflow.WithChildOptions(ctx, childWorkflowOptions)
var result ChildResp
err := workflow.ExecuteChildWorkflow(ctx, YourOtherWorkflowDefinition, ChildParams{}).Get(ctx, &result)
if err != nil {
// ...
}
// ...
return resp, nil
}
func YourOtherWorkflowDefinition(ctx workflow.Context, params ChildParams) (ChildResp, error) {
// ...
return resp, nil
}
To asynchronously spawn a Child Workflow Execution, the Child Workflow must have an "Abandon" Parent Close Policy set in the Child Workflow Options.
Additionally, the Parent Workflow Execution must wait for the ChildWorkflowExecutionStarted Event to appear in its Event History before it completes.
If the Parent makes the ExecuteChildWorkflow call and then immediately completes, the Child Workflow Execution does not spawn.
To be sure that the Child Workflow Execution has started, first call the GetChildWorkflowExecution method on the instance of the ChildWorkflowFuture, which will return a different Future.
Then call the Get() method on that Future, which is what will wait until the Child Workflow Execution has spawned.
import (
// ...
"go.temporal.io/api/enums/v1"
)
func YourWorkflowDefinition(ctx workflow.Context, params ParentParams) (ParentResp, error) {
childWorkflowOptions := workflow.ChildWorkflowOptions{
ParentClosePolicy: enums.PARENT_CLOSE_POLICY_ABANDON,
}
ctx = workflow.WithChildOptions(ctx, childWorkflowOptions)
childWorkflowFuture := workflow.ExecuteChildWorkflow(ctx, YourOtherWorkflowDefinition, ChildParams{})
// Wait for the Child Workflow Execution to spawn
var childWE workflow.Execution
if err := childWorkflowFuture.GetChildWorkflowExecution().Get(ctx, &childWE); err != nil {
return ParentResp{}, err
}
// ...
return resp, nil
}
func YourOtherWorkflowDefinition(ctx workflow.Context, params ChildParams) (ChildResp, error) {
// ...
return resp, nil
}
How to set a Parent Close Policy
A Parent Close Policy determines what happens to a Child Workflow Execution if its Parent changes to a Closed status (Completed, Failed, or Timed Out).
The default Parent Close Policy option is set to terminate the Child Workflow Execution.
In Go, a Parent Close Policy is set on the ParentClosePolicy field of an instance of workflow.ChildWorkflowOptions.
The possible values can be obtained from the go.temporal.io/api/enums/v1 package.
PARENT_CLOSE_POLICY_ABANDON
PARENT_CLOSE_POLICY_TERMINATE
PARENT_CLOSE_POLICY_REQUEST_CANCEL
The Child Workflow Options are then applied to the instance of workflow.Context by using the WithChildOptions API, which is then passed to the ExecuteChildWorkflow() call.
- Type: ParentClosePolicy
- Default: PARENT_CLOSE_POLICY_TERMINATE
import (
// ...
"go.temporal.io/api/enums/v1"
)
func YourWorkflowDefinition(ctx workflow.Context, params ParentParams) (ParentResp, error) {
// ...
childWorkflowOptions := workflow.ChildWorkflowOptions{
// ...
ParentClosePolicy: enums.PARENT_CLOSE_POLICY_ABANDON,
}
ctx = workflow.WithChildOptions(ctx, childWorkflowOptions)
childWorkflowFuture := workflow.ExecuteChildWorkflow(ctx, YourOtherWorkflowDefinition, ChildParams{})
// ...
}
func YourOtherWorkflowDefinition(ctx workflow.Context, params ChildParams) (ChildResp, error) {
// ...
return resp, nil
}
How to Continue-As-New
Continue-As-New enables a Workflow Execution to close successfully and create a new Workflow Execution in a single atomic operation if the number of Events in the Event History is becoming too large. The Workflow Execution spawned from the use of Continue-As-New has the same Workflow Id, a new Run Id, and a fresh Event History and is passed all the appropriate parameters.
To cause a Workflow Execution to Continue-As-New, the Workflow function should return the result of the NewContinueAsNewError() function available from the go.temporal.io/sdk/workflow package.
func SimpleWorkflow(ctx workflow.Context, value string) error {
// ...
return workflow.NewContinueAsNewError(ctx, SimpleWorkflow, value)
}
To check whether a Workflow Execution was spawned as a result of Continue-As-New, you can check if workflow.GetInfo(ctx).ContinuedExecutionRunID is not empty (that is, not "").
Notes
- To prevent Signal loss, be sure to perform an asynchronous drain on the Signal channel. Failure to do so can result in buffered Signals being ignored and lost.
- Make sure that the previous Workflow and the Continue-As-New Workflow are referenced by the same alias. Failure to do so can cause the Workflow to Continue-As-New on an entirely different Workflow.
What is a Timer?
A Workflow can set a durable timer for a fixed time period.
In some SDKs, the function is called sleep(), and in others, it's called timer().
A Workflow can sleep for months.
Timers are persisted, so even if your Worker or Temporal Service is down when the time period completes, as soon as your Worker and Temporal Service are back up, the sleep() call will resolve and your code will continue executing.
Sleeping is a resource-light operation: it does not tie up the process, and you can run millions of Timers off a single Worker.
To set a Timer in Go, use the NewTimer() function and pass the duration you want to wait before continuing.
timer := workflow.NewTimer(timerCtx, duration)
To set a sleep duration in Go, use the Sleep() function and pass the duration you want to wait before continuing.
A zero or negative sleep duration causes the function to return immediately.
err := workflow.Sleep(ctx, 10*time.Second)
For more information, see the Timer example in the Go Samples repository.
How to Schedule a Workflow
Scheduling Workflows is a crucial aspect of any automation process, especially when dealing with time-sensitive tasks. By scheduling a Workflow, you can automate repetitive tasks, reduce the need for manual intervention, and ensure timely execution of your business processes.
Use any of the following actions to Schedule a Workflow Execution and take control over your automation process.
How to create a Schedule in Go
Schedules are initiated with the create
call.
The user generates a unique Schedule ID for each new Schedule.
To create a Schedule in Go, use Create()
on the Client.
Schedules must be initialized with a Schedule ID, Spec, and Action in client.ScheduleOptions{}
.
View the source code
in the context of the rest of the application code.
func main() {
// ...
scheduleID := "schedule_id"
workflowID := "schedule_workflow_id"
// Create the schedule.
scheduleHandle, err := temporalClient.ScheduleClient().Create(ctx, client.ScheduleOptions{
ID: scheduleID,
Spec: client.ScheduleSpec{},
Action: &client.ScheduleWorkflowAction{
ID: workflowID,
Workflow: schedule.ScheduleWorkflow,
TaskQueue: "schedule",
},
})
// ...
}
// ...
How to backfill a Schedule in Go
Backfilling a Schedule executes Workflow Tasks ahead of the Schedule's specified time range. This is useful for executing a missed or delayed Action, or for testing the Workflow ahead of time.
To backfill a Schedule in Go, use Backfill()
on ScheduleHandle
.
Specify the start and end times to execute the Workflow, along with the overlap policy.
func main() {
// ...
err = scheduleHandle.Backfill(ctx, client.ScheduleBackfillOptions{
Backfill: []client.ScheduleBackfill{
{
Start: now.Add(-4 * time.Minute),
End: now.Add(-2 * time.Minute),
Overlap: enums.SCHEDULE_OVERLAP_POLICY_ALLOW_ALL,
},
{
Start: now.Add(-2 * time.Minute),
End: now,
Overlap: enums.SCHEDULE_OVERLAP_POLICY_ALLOW_ALL,
},
},
})
if err != nil {
log.Fatalln("Unable to Backfill Schedule", err)
}
// ...
}
// ...
How to delete a Schedule in Go
Deleting a Schedule erases a Schedule. Deletion does not affect any Workflows started by the Schedule.
To delete a Schedule, use Delete()
on the ScheduleHandle
.
func main() {
// ...
defer func() {
log.Println("Deleting schedule", "ScheduleID", scheduleHandle.GetID())
err = scheduleHandle.Delete(ctx)
if err != nil {
log.Fatalln("Unable to delete schedule", err)
}
}()
// ...
How to describe a Schedule in Go
Describe
retrieves information about the current Schedule configuration.
This can include details about the Schedule Spec (such as Intervals), CronExpressions, and Schedule State.
To describe a Schedule, use Describe()
on the ScheduleHandle.
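func main() {
// ...
description, err := scheduleHandle.Describe(ctx)
if err != nil {
log.Fatalln("Unable to describe Schedule", err)
}
log.Println("Schedule paused:", description.Schedule.State.Paused)
// ...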
How to list Schedules in Go
The List
action returns all available Schedules and their respective Schedule IDs.
To return information on all Schedules, use ScheduleClient.List()
.
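func main() {
// ...
listView, err := temporalClient.ScheduleClient().List(ctx, client.ScheduleListOptions{
PageSize: 1,
})
if err != nil {
log.Fatalln("Unable to list Schedules", err)
}
for listView.HasNext() {
// Next returns the next Schedule entry, or an error if the page fetch fails.
entry, err := listView.Next()
if err != nil {
log.Fatalln("Unable to read Schedule entry", err)
}
log.Println("Schedule ID:", entry.ID)
}
// ...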
How to pause a Schedule in Go
Pausing a Schedule halts all future Workflow Runs; unpausing it lets them resume as planned.
To pause a Schedule, set State.Paused to true, or use Pause() on the ScheduleHandle.
To unpause a Schedule, use Unpause() on the ScheduleHandle.
func main() {
// ...
err = scheduleHandle.Pause(ctx, client.SchedulePauseOptions{
Note: "The Schedule has been paused.",
})
// ...
err = scheduleHandle.Unpause(ctx, client.ScheduleUnpauseOptions{
Note: "The Schedule has been unpaused.",
})
How to trigger a Schedule in Go
Triggering a Schedule immediately executes an Action defined in that Schedule.
By default, trigger
is subject to the Overlap Policy.
To trigger a Scheduled Workflow Execution, use Trigger() on the ScheduleHandle.
func main() {
// ...
for i := 0; i < 5; i++ {
scheduleHandle.Trigger(ctx, client.ScheduleTriggerOptions{
Overlap: enums.SCHEDULE_OVERLAP_POLICY_ALLOW_ALL,
})
time.Sleep(2 * time.Second)
}
// ...
How to update a Schedule in Go
Updating a Schedule changes the configuration of an existing Schedule. These changes can be made to Workflow Actions, Action parameters, Memos, and the Workflow's Cancellation Policy.
Use Update()
on the ScheduleHandle to modify a Schedule.
func main() {
// ...
updateSchedule := func(input client.ScheduleUpdateInput) (*client.ScheduleUpdate, error) {
return &client.ScheduleUpdate{
Schedule: &input.Description.Schedule,
}, nil
}
_ = scheduleHandle.Update(ctx, client.ScheduleUpdateOptions{
DoUpdate: updateSchedule,
})
}
// ...
How to use Temporal Cron Jobs
A Temporal Cron Job is the series of Workflow Executions that occur when a Cron Schedule is provided in the call to spawn a Workflow Execution.
A Cron Schedule is provided as an option when the call to spawn a Workflow Execution is made.
Create an instance of StartWorkflowOptions
from the go.temporal.io/sdk/client
package, set the CronSchedule
field, and pass the instance to the ExecuteWorkflow
call.
- Type: string
- Default: None
workflowOptions := client.StartWorkflowOptions{
CronSchedule: "15 8 * * *",
// ...
}
workflowRun, err := c.ExecuteWorkflow(context.Background(), workflowOptions, YourWorkflowDefinition)
if err != nil {
// ...
}
Side Effects
Side Effects are used to execute non-deterministic code, such as generating a UUID or a random number, without compromising determinism in the Workflow. This is done by storing the non-deterministic results of the Side Effect in the Workflow Event History.
A Side Effect does not re-execute during a Replay. Instead, it returns the recorded result from the Workflow Execution Event History.
Side Effects should not fail. An exception that is thrown from the Side Effect causes failure and retry of the current Workflow Task.
An Activity or a Local Activity may also be used instead of a Side Effect, as its result is also persisted in the Workflow Execution Event History.
Don't modify Workflow state inside a Side Effect function, because the function is not re-executed during Replay. Use a Side Effect function only to return a value.
Use the SideEffect
function from the go.temporal.io/sdk/workflow
package to execute a Side Effect directly in your Workflow.
Pass it an instance of workflow.Context and the function to execute.
The SideEffect API returns an instance of converter.EncodedValue.
Use the Get method on that value to retrieve the result of the Side Effect.
Correct implementation
The following example demonstrates the correct way to use SideEffect
:
encodedRandom := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} {
return rand.Intn(100)
})
var random int
if err := encodedRandom.Get(&random); err != nil {
return err
}
// ...
}
Incorrect implementation
The following example demonstrates how NOT to use SideEffect
:
// Warning: This is an incorrect example.
// This code is non-deterministic.
var random int
workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} {
random = rand.Intn(100)
return nil
})
// random will always be 0 in replay, so this code is non-deterministic.
On replay, the provided function is not executed, so random is always 0, and the Workflow Execution could take a different path, breaking determinism.
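The difference between the two implementations can be reproduced with a toy model of record and replay. The sketch below is an assumption-laden illustration using only the standard library: history and its sideEffect method are hypothetical stand-ins for the Event History and the SDK call, and a fixed value replaces the random number so the output is predictable.

```go
package main

import "fmt"

// history is a hypothetical stand-in for the Workflow Event History. It keeps
// Side Effect results in the order they were recorded.
type history struct {
	recorded []int
	cursor   int
}

// sideEffect runs f on first execution and records its result. During replay
// it skips f entirely and returns the recorded value instead.
func (h *history) sideEffect(replaying bool, f func() int) int {
	if replaying {
		v := h.recorded[h.cursor]
		h.cursor++
		return v
	}
	v := f()
	h.recorded = append(h.recorded, v)
	return v
}

func main() {
	// Correct: use the returned value. Replay yields the recorded result.
	h := &history{}
	first := h.sideEffect(false, func() int { return 42 })
	replayed := h.sideEffect(true, func() int { return 7 })
	fmt.Println(first, replayed) // 42 42

	// Incorrect: assign from inside the function. f never runs on replay.
	h2 := &history{}
	var leaked int
	h2.sideEffect(false, func() int { leaked = 42; return 0 })
	leaked = 0 // a replay starts again with fresh variables
	h2.sideEffect(true, func() int { leaked = 42; return 0 })
	fmt.Println(leaked) // 0
}
```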