Sleeping, Signals, And Selects In Temporal Workflows

I’m making some changes to a Temporal workflow at the moment. The old workflow was a pretty standard affair: wait two minutes, check the status of a payment, then either proceed with an order or cancel it. I achieved this with a simple workflow built from three activities:

func MyWorkflow(ctx workflow.Context) error {
    ctx = temporal.WithDefaultActivityOptions(ctx)

    var wasPaid bool
    if err := workflow.ExecuteActivity(ctx, CheckPayment).Get(ctx, &wasPaid); err != nil {
        return err
    }

    if wasPaid {
        if err := workflow.ExecuteActivity(ctx, PlaceOrder).Get(ctx, nil); err != nil {
            return err
        }
    } else {
        if err := workflow.ExecuteActivity(ctx, CancelOrder).Get(ctx, nil); err != nil {
            return err
        }
    }

    return nil
}

To achieve the delay, I instructed Temporal to delay the start of the workflow by two minutes:

_, err := p.temporalClient.ExecuteWorkflow(
    ctx,
    client.StartWorkflowOptions{
        ID:         orderID,
        TaskQueue:  workflows.DefaultConfig.TaskQueue,
        StartDelay: 2 * time.Minute,
    },
    workflows.MyWorkflow,
    workflows.MyWorkflowInput{
        // ...
    },
)
if err != nil {
    return err
}

Being somewhat new to Temporal, this was pretty close to the limit of my knowledge. But some new requirements have come through, and I had to learn a few new Temporal tricks in order to meet them. I’m documenting these lessons here for anyone else who needs them.

Timers: Sleeping In Workflows

What I had to do was change the workflow to accept a notification that the payment was cancelled prematurely. If that happens, I had to stop sleeping and fail the order immediately. I’ll step through the changes I made in a piecemeal fashion starting with moving the 2 minute delay into the workflow itself.

Sleeping in a workflow is done with Durable Timers, which come in two forms. The first is effectively a sleep call (workflow.Sleep in the Go SDK) which pauses the workflow for a given amount of time, similar to what time.Sleep() does in Go. This wasn’t exactly useful to me, as I couldn’t find a way to cancel the sleep, so I opted for the second form: a timer (workflow.NewTimer) that returns a future which completes when the timer fires. You’ll see why I chose this approach a little later, but for now, let’s simply move the 2 minute wait into the workflow, thereby replacing the StartDelay option in the ExecuteWorkflow call:

func MyWorkflow(ctx workflow.Context) error {
    ctx = temporal.WithDefaultActivityOptions(ctx)

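    // Durable timer: the future completes once two minutes have passed.
    waitTimeFuture := workflow.NewTimer(ctx, 2*time.Minute)
    if err := waitTimeFuture.Get(ctx, nil); err != nil {
        return err
    }

    var wasPaid bool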
    if err := workflow.ExecuteActivity(ctx, CheckPayment).Get(ctx, &wasPaid); err != nil {
        return err
    }

    if wasPaid {
        if err := workflow.ExecuteActivity(ctx, PlaceOrder).Get(ctx, nil); err != nil {
            return err
        }
    } else {
        if err := workflow.ExecuteActivity(ctx, CancelOrder).Get(ctx, nil); err != nil {
            return err
        }
    }

    return nil
}

Signals: Messaging Running Workflows

The next part was sending the cancellation event to the workflow. This can be achieved using Signal Channels. The workflow can get access to the signal channel, and block until a signal is received by calling Receive. This wouldn’t do for what I’m trying to achieve, but here’s an example of how this looks for the sake of completeness:

func MyWorkflow(ctx workflow.Context) error {
    ctx = temporal.WithDefaultActivityOptions(ctx)

    signalChannel := workflow.GetSignalChannel(ctx, "wasCancelled")

    var orderWasCancelledPrematurely = false

    waitTimeFuture := workflow.NewTimer(ctx, 2 * time.Minute)
    waitTimeFuture.Get(ctx, nil)

    // Blocks until a signal arrives on the channel.
    signalChannel.Receive(ctx, &orderWasCancelledPrematurely)

    if orderWasCancelledPrematurely {
        return workflow.ExecuteActivity(ctx, CancelOrder).Get(ctx, nil)
    }

    var wasPaid bool
    if err := workflow.ExecuteActivity(ctx, CheckPayment).Get(ctx, &wasPaid); err != nil {
        return err
    }

    if wasPaid {
        if err := workflow.ExecuteActivity(ctx, PlaceOrder).Get(ctx, nil); err != nil {
            return err
        }
    } else {
        if err := workflow.ExecuteActivity(ctx, CancelOrder).Get(ctx, nil); err != nil {
            return err
        }
    }

    return nil
}

Sending a signal is done by calling SignalWorkflow. This requires a workflow ID, so it’s probably a good idea to derive workflow IDs from whatever the workflow is operating on. In this example I’m simply using the order ID, but it may be prudent to namespace the workflow ID in some way based on the workflow type. As far as I can tell, a signal is always addressed to a specific workflow by its ID, so a signal name only needs to be unique within that workflow, whereas workflow IDs are shared by every workflow type in the namespace, and that is where the potential for confusion lies.

// An empty run ID targets the currently running execution of the workflow.
err := p.temporalClient.SignalWorkflow(ctx, orderID, "", "wasCancelled", true)
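One way to namespace the workflow ID is a small helper that is used both when starting the workflow and when signalling it. This is only a sketch: the helper name and the "order-payment/" prefix are made up for illustration and are not part of Temporal or of the original code.

```go
package main

// orderWorkflowPrefix is a hypothetical prefix identifying the workflow type.
const orderWorkflowPrefix = "order-payment/"

// orderWorkflowID builds the workflow ID for the given order, so that another
// workflow type keyed on the same order ID gets a different workflow ID.
func orderWorkflowID(orderID string) string {
	return orderWorkflowPrefix + orderID
}
```

The same value would then be passed as the ID in StartWorkflowOptions and as the workflow ID argument to SignalWorkflow.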

The call returns an error, and I think, although I haven’t confirmed this yet, that a serviceerror.NotFound will be returned if there’s no workflow of that ID currently running. If it doesn’t matter to you whether the workflow is running or not when you send the signal, this might be the error type to ignore.

One nice thing about signals is that they’re recorded in the workflow history, allowing one to see them visually when monitoring the execution of a workflow run.

Selector: Adding Races

The last part is to hold execution of the workflow until either the timer fires or the cancellation message comes through. This can be achieved using selectors. A selector works much like Go’s select statement, in that several pending operations can be monitored and execution held until one of them is ready.
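For anyone who hasn’t used select much, here’s the same race written in plain Go, outside of Temporal. It is only an analogy: workflow code should stick to the workflow package’s timers, channels and selectors rather than native channels and time, so that the workflow stays deterministic. The function name waitOrCancel is made up for this sketch.

```go
package main

import "time"

// waitOrCancel blocks until either the wait elapses or a value arrives on
// cancelled, and reports whether the order was cancelled prematurely.
// Plain Go only: inside a workflow, use workflow.NewSelector instead.
func waitOrCancel(wait time.Duration, cancelled <-chan bool) bool {
	timer := time.NewTimer(wait)
	defer timer.Stop()

	select {
	case <-timer.C:
		// The wait elapsed without a cancellation.
		return false
	case wasCancelled := <-cancelled:
		// A cancellation message arrived first.
		return wasCancelled
	}
}
```

Whichever case becomes ready first wins, and the other is simply abandoned. The Temporal selector gives the same behaviour with durable timers and signal channels.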

Here we use a selector to wait for the timer future, or until the signal channel receives a message. The callback associated with each case is executed when its condition is met, which we’re using here to set the value of a boolean indicating whether we can simply cancel the order and exit the workflow. We then call Select to pause execution until one of them fires:

func MyWorkflow(ctx workflow.Context) error {
    ctx = temporal.WithDefaultActivityOptions(ctx)

    waitSelector := workflow.NewSelector(ctx)

    var orderWasCancelledPrematurely = false

    waitTimeFuture := workflow.NewTimer(ctx, 2 * time.Minute)
    waitSelector.AddFuture(waitTimeFuture, func(workflow.Future) {})

    signalChannel := workflow.GetSignalChannel(ctx, "wasCancelled")
    waitSelector.AddReceive(signalChannel, func(c workflow.ReceiveChannel, more bool) {
        c.Receive(ctx, &orderWasCancelledPrematurely)
    })

    // Blocks until the timer fires or a signal arrives, whichever is first.
    waitSelector.Select(ctx)

    if orderWasCancelledPrematurely {
        return workflow.ExecuteActivity(ctx, CancelOrder).Get(ctx, nil)
    }

    var wasPaid bool
    if err := workflow.ExecuteActivity(ctx, CheckPayment).Get(ctx, &wasPaid); err != nil {
        return err
    }

    if wasPaid {
        if err := workflow.ExecuteActivity(ctx, PlaceOrder).Get(ctx, nil); err != nil {
            return err
        }
    } else {
        if err := workflow.ExecuteActivity(ctx, CancelOrder).Get(ctx, nil); err != nil {
            return err
        }
    }

    return nil
}

At this point we’re done. The workflow still waits 2 minutes before checking the payment, but now it can be interrupted to cancel the order immediately.

The End

The people who introduced Temporal to our team swear by it and used it for everything in their previous jobs. I can see now why they think this: it would’ve been a nightmare to code this up using AWS Step Functions, which is the workflow technology I have the most experience in. I have much to learn about Temporal (I still don’t know what’s possible or appropriate in workflows-as-code constructs like this), but I can see the potential for good things here.
