How to pause a Step Function while waiting on a response from a third party API

3 August 2022

In this post we explore how you can use Task Tokens to pause a Step Function while it waits for an external process to complete.

When building a serverless application we have found AWS Step Functions to be an excellent tool for orchestrating events and services. Step Functions give us a visual way of creating systems which are resilient, observable and simple.

The Problem

Recently we had a feature in which we were required to send a series of requests to a third party API before notifying the user of success and storing some metadata in DynamoDB.

We immediately thought that a Step Function would be the most intuitive and maintainable way of doing this, as it allowed us to visualise the process and break the problem down into small, simple steps. However, some of the API calls would not return a response immediately; instead, the result would be sent to a webhook listener once processing had completed. The challenge we faced was how to tell our Step Function to wait for a third party to complete a process.

The Solution - Task Tokens

The Theory

Using Task Tokens, we can pause a Step Function task until the task token is sent back to Step Functions. We might do this when a task needs to wait for human approval or for a third party to finish some work.

If a task in your state machine definition is set as a callback task by appending .waitForTaskToken to its resource, Step Functions generates a Task Token for that task when it starts and places it in the context object, where it can be read at $$.Task.Token.

It should be noted that Task Tokens are specific to a region and account. They are valid for up to one year; however, if the task state using the callback task token times out, a new task token is generated automatically.
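Because a callback task otherwise waits until its token comes back, it can be worth giving it an explicit timeout. The sketch below builds the paused state as a plain TypeScript object using the standard ASL fields TimeoutSeconds and Catch, routing the States.Timeout error to a fallback state. The function name `buildCallbackTask`, the state names "Store Metadata" and "Handle Timeout", and the one-day timeout are all hypothetical. The resulting object could be serialised into your ASL file or handed to whatever tool deploys your state machine.

```typescript
// Minimal types for the ASL fields used in this sketch (not the full specification)
interface CatchRule {
  ErrorEquals: string[];
  ResultPath?: string;
  Next: string;
}

interface CallbackTaskState {
  Type: "Task";
  Resource: string;
  Parameters: Record<string, unknown>;
  TimeoutSeconds: number;
  Catch: CatchRule[];
  Next: string;
}

// Builds a Lambda callback task that fails with States.Timeout if no token
// comes back within `timeoutSeconds`, and routes that failure to `onTimeout`.
function buildCallbackTask(
  functionName: string,
  next: string,
  onTimeout: string,
  timeoutSeconds: number
): CallbackTaskState {
  return {
    Type: "Task",
    Resource: "arn:aws:states:::lambda:invoke.waitForTaskToken",
    Parameters: {
      FunctionName: functionName,
      Payload: {
        "input.$": "$",
        "taskToken.$": "$$.Task.Token"
      }
    },
    TimeoutSeconds: timeoutSeconds,
    Catch: [
      // Keep the original input and add the {Error, Cause} details under $.error
      {ErrorEquals: ["States.Timeout"], ResultPath: "$.error", Next: onTimeout}
    ],
    Next: next
  };
}

// Example: give the third party one day to call back (state names are hypothetical)
const pausedTask = buildCallbackTask("The Lambda Function Name", "Store Metadata", "Handle Timeout", 24 * 60 * 60);
console.log(JSON.stringify({"My Lambda Task To Pause": pausedTask}, null, 2));
```

States.Timeout is raised both when TimeoutSeconds elapses and when a HeartbeatSeconds interval is missed, so the single Catch rule covers either case if you later add a heartbeat.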

The Implementation

1. Creating the task which will be paused

The first order of business is to create the task that will wait for the task token. As mentioned previously, we have to mark the task as a callback task.

This can be done by ticking the "Wait for callback" option in Workflow Studio for your selected task.

[Image: the "Wait for callback" (wait for task token) option in Workflow Studio]

This simply appends .waitForTaskToken to the Resource field of the state definition in the Step Function's ASL file, like so:

"States": {
  "My Lambda Task To Pause": {
    "Type": "Task",
    "Resource": "arn:aws:states:::lambda:invoke.waitForTaskToken",
    // more fields below
  }
}

2. Accessing the Task Token in the created task

To access the task token inside a Lambda function, we take it from the context object and pass it, along with our input, as the Lambda payload. The state definition looks like this:

"States": {
  "My Lambda Task To Pause": {
    "Type": "Task",
    "Resource": "arn:aws:states:::lambda:invoke.waitForTaskToken",
    "Parameters": {
      "Payload": {
        "input.$": "$",
        "taskToken.$": "$$.Task.Token"
      },
      "FunctionName": "The Lambda Function Name"
    },
    // more fields below
  }
}

For example, the corresponding Lambda handler definition in TypeScript would look like this:

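// Placeholder: replace with the shape of your state input
interface InputBodyType {
  id: string;
}

interface Event {
  input: InputBodyType; // the state input, from "input.$": "$"
  taskToken: string;    // the task token, from "taskToken.$": "$$.Task.Token"
}

export const handler = async (event: Event): Promise<void> => {
  // Do something with the input and task token
};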

my-lambda/src/index.ts
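A common slip when wiring this up is to forget the `.$` suffix on the `taskToken` key. Without it, Step Functions treats the value as static text, so the Lambda receives the literal string "$$.Task.Token" instead of a real token. The following guard is a minimal sketch, not part of the original handler. It fails fast when the token is missing or unresolved. `assertCallbackEvent` is a hypothetical helper name, and `InputBodyType` is again a placeholder.

```typescript
// Placeholder for the shape of your state input
interface InputBodyType {
  id: string;
}

// Payload produced by the Parameters block: {"input.$": "$", "taskToken.$": "$$.Task.Token"}
interface CallbackEvent {
  input: InputBodyType;
  taskToken: string;
}

// Throws unless the event carries a resolved task token (the input shape is not validated here)
function assertCallbackEvent(event: unknown): asserts event is CallbackEvent {
  if (typeof event !== "object" || event === null) {
    throw new Error("Event must be an object");
  }
  const {taskToken} = event as {taskToken?: unknown};
  if (typeof taskToken !== "string" || taskToken.length === 0) {
    throw new Error("Event is missing a taskToken string");
  }
  // Without the ".$" suffix, Step Functions passes "$$.Task.Token" through as literal text
  if (taskToken.startsWith("$$.")) {
    throw new Error(`taskToken was not resolved: received "${taskToken}"`);
  }
}

export const handler = async (event: unknown): Promise<void> => {
  assertCallbackEvent(event);
  // From here on, event.input and event.taskToken are typed
  console.log(`Processing ${event.input.id}`);
};
```

Throwing here fails the Lambda invocation, so the problem shows up in the execution history immediately. Without the guard, the task would sit paused with no usable token.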

3. Resume the Step Function

You should now be able to start an execution and see the Step Function pause at the correct task. Next, you need to resume it. There are two API actions that do this:

SendTaskSuccess

  • This resumes the execution along its normal path.
  • The output (a JSON string) passed to SendTaskSuccess becomes the output of the paused step, so it can be passed on to the next step.

Code example:

import {StepFunctions} from "aws-sdk";

const region = "your-region";
const stepFunctions = new StepFunctions({
  region
});

export async function sendTaskSuccess(taskToken: string, output: object): Promise<void> {
  await stepFunctions.sendTaskSuccess({
    taskToken,
    // The task's result is {"Payload": output}, the same field name lambda:invoke uses
    output: JSON.stringify({
      Payload: output
    })
  }).promise();
}

// usage
await sendTaskSuccess("the-task-token", {foo: 1, bar: "two"});

StepFunctions.ts

You can also test this with the AWS CLI, which we found very helpful while building our flow before the webhook listener was finished:

aws stepfunctions send-task-success \
    --task-token YOUR_TASK_TOKEN \
    --task-output '{"foo": 1, "bar": "two"}'
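SendTaskSuccess expects its output as a JSON string, and Step Functions limits how large that payload can be. The sketch below is a hypothetical helper, `buildSendTaskSuccessInput`, that serialises the output and rejects it locally before any API call is made. It assumes a 256 KB limit, matching the Step Functions payload quota, so check the current quotas before relying on that figure. The returned object has the same `taskToken`/`output` shape that the aws-sdk `sendTaskSuccess` call above accepts.

```typescript
// Request shape for SendTaskSuccess: the token plus a JSON string output
interface SendTaskSuccessInput {
  taskToken: string;
  output: string;
}

// Assumption: 256 KB, the Step Functions payload size quota; check the current quotas
const MAX_OUTPUT_BYTES = 256 * 1024;

// Serialises the output and rejects it locally if it cannot be sent
function buildSendTaskSuccessInput(taskToken: string, output: unknown): SendTaskSuccessInput {
  // JSON.stringify returns undefined for values such as undefined or functions
  const serialized: string | undefined = JSON.stringify(output);
  if (serialized === undefined) {
    throw new Error("Output must be JSON-serialisable");
  }
  // Measure UTF-8 bytes rather than string length, since non-ASCII characters take more space
  const bytes = new TextEncoder().encode(serialized).length;
  if (bytes > MAX_OUTPUT_BYTES) {
    throw new Error(`Output is ${bytes} bytes, over the ${MAX_OUTPUT_BYTES} byte limit`);
  }
  return {taskToken, output: serialized};
}

const params = buildSendTaskSuccessInput("the-task-token", {Payload: {foo: 1, bar: "two"}});
console.log(params.output); // {"Payload":{"foo":1,"bar":"two"}}
```

If an output risks exceeding the limit, a common alternative is to write the large data somewhere else, such as the DynamoDB table already in use, and send back only a key.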

SendTaskFailure

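  • This resumes the execution, but the task is marked as failed. The error can be handled like any other task error, so a matching Catch rule sends the execution to the fallback state specified for the task; if nothing handles the error, the execution fails.
  • You can specify an error and a cause. When the error is caught, they are passed to the fallback state as the Error and Cause fields of the error output.

Code example:
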
export async function sendTaskFailure(taskToken: string, error: string, cause: string): Promise<void> {
  await stepFunctions.sendTaskFailure({
    taskToken,
    error,
    cause
  }).promise();
}

//usage 
await sendTaskFailure("the-task-token", "DummyError", "Something went wrong...");

StepFunctions.ts

Once again, you can also test this with the AWS CLI:

aws stepfunctions send-task-failure \
    --task-token YOUR_TASK_TOKEN \
    --error "DummyError" \
    --cause "Something went wrong..."
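The `error` string sent with SendTaskFailure is what a Catch rule's ErrorEquals is matched against. It can therefore help to derive it consistently from whatever was thrown. The following is a minimal sketch with a hypothetical helper name, `toFailureDetails`. It assumes maximum lengths of 256 characters for `error` and 32,768 for `cause`, so verify these against the SendTaskFailure API reference.

```typescript
// Arguments for SendTaskFailure
interface FailureDetails {
  error: string;
  cause: string;
}

// Assumed maximum lengths for SendTaskFailure's error and cause; verify against the API reference
const MAX_ERROR_LENGTH = 256;
const MAX_CAUSE_LENGTH = 32768;

// Converts anything thrown into an error name (what Catch's ErrorEquals matches) and a cause
function toFailureDetails(thrown: unknown): FailureDetails {
  if (thrown instanceof Error) {
    return {
      error: thrown.name.slice(0, MAX_ERROR_LENGTH),
      cause: thrown.message.slice(0, MAX_CAUSE_LENGTH)
    };
  }
  // Non-Error values (strings, plain objects) get a generic error name
  return {error: "UnknownError", cause: String(thrown).slice(0, MAX_CAUSE_LENGTH)};
}

const details = toFailureDetails(new RangeError("Third party rejected the request"));
console.log(details);
```

In a handler, this would typically sit in a catch block: `const {error, cause} = toFailureDetails(e)` followed by `sendTaskFailure(taskToken, error, cause)`. Custom error classes should set `this.name` in their constructor. Otherwise they report the generic name "Error", which makes Catch rules harder to target.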

4. Integrating with a webhook listener

Now to put all the pieces together…

To tell the Step Function to resume, the webhook listener needs access to the task token. We decided to store the task token in DynamoDB and fetch it again when the webhook is hit. Using DynamoDB as a cache for the task token worked out well because we were already writing data related to the process to DynamoDB. This only works if the webhook request contains something, such as an ID, that you can use to find the table row holding your task token.

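import {DynamoDB} from "aws-sdk";

const dynamoDBDocumentClient = new DynamoDB.DocumentClient();
const PREFIX = "TASK#"; // example value; use whatever fits your table design

// Placeholder: the shape of your state input (it needs an id to key the row)
interface InputBodyType {
  id: string;
}

interface Event {
  input: InputBodyType;
  taskToken: string;
}

export const handler = async (event: Event): Promise<void> => {
  // Do something with the input

  // Store the task token so the webhook listener can look it up later
  await dynamoDBDocumentClient.put({
    TableName: "table",
    Item: {
      partitionKey: PREFIX + event.input.id,
      sortKey: PREFIX,
      taskToken: event.taskToken
    }
  }).promise();
};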

my-lambda/src/index.ts
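Both the paused Lambda and the webhook listener must build exactly the same key. One option is to keep the key layout and the storage calls in a small shared module. The sketch below follows the partitionKey/sortKey layout used above, with "TASK#" as a hypothetical PREFIX value. `taskTokenKey`, `TaskTokenStore` and `InMemoryTaskTokenStore` are illustrative names. A DynamoDB-backed implementation would wrap the `put` and `get` calls shown here, while the in-memory version lets you unit-test the webhook logic without a table.

```typescript
// Hypothetical prefix; it must be identical in the writer and the listener
const PREFIX = "TASK#";

interface TaskTokenKey {
  partitionKey: string;
  sortKey: string;
}

// Single source of truth for the key layout
function taskTokenKey(id: string): TaskTokenKey {
  return {partitionKey: PREFIX + id, sortKey: PREFIX};
}

// Storage abstraction: a DynamoDB version would call DocumentClient put/get with taskTokenKey(id)
interface TaskTokenStore {
  save(id: string, taskToken: string): Promise<void>;
  load(id: string): Promise<string | undefined>;
}

// In-memory implementation for unit tests and local runs
class InMemoryTaskTokenStore implements TaskTokenStore {
  private readonly items = new Map<string, string>();

  private static keyString({partitionKey, sortKey}: TaskTokenKey): string {
    return `${partitionKey}|${sortKey}`;
  }

  async save(id: string, taskToken: string): Promise<void> {
    this.items.set(InMemoryTaskTokenStore.keyString(taskTokenKey(id)), taskToken);
  }

  async load(id: string): Promise<string | undefined> {
    return this.items.get(InMemoryTaskTokenStore.keyString(taskTokenKey(id)));
  }
}
```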

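import {DynamoDB} from "aws-sdk";
// Helpers from StepFunctions.ts above; adjust the import path to your project layout
import {sendTaskSuccess, sendTaskFailure} from "./StepFunctions";

const dynamoDBDocumentClient = new DynamoDB.DocumentClient();
const PREFIX = "TASK#"; // example value; must match the prefix used when the token was stored

interface BaseEventStringParams {
  id: string;
}

interface SuccessEventParams extends BaseEventStringParams {
  status: "SUCCESS";
}

interface FailureEventParams extends BaseEventStringParams {
  status: "FAILURE";
  error: string;
  cause: string;
}

interface Event {
  queryStringParameters: SuccessEventParams | FailureEventParams;
}

export const webhookListener = async ({queryStringParameters}: Event): Promise<void> => {
  // Look up the task token stored by the paused task
  const {Item} = await dynamoDBDocumentClient.get({
    TableName: "table",
    Key: {
      partitionKey: PREFIX + queryStringParameters.id,
      sortKey: PREFIX
    }
  }).promise();

  if (!Item) {
    throw new Error(`No task token found for id ${queryStringParameters.id}`);
  }
  const taskToken: string = Item.taskToken;

  if (queryStringParameters.status === "SUCCESS") {
    await sendTaskSuccess(taskToken, {status: "SUCCESSFUL"});
  } else {
    await sendTaskFailure(taskToken, queryStringParameters.error, queryStringParameters.cause);
  }
};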

my-webhook-listener/src/index.ts
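The listener's Event type assumes the query string always matches one of the two expected shapes. In practice, anything can call a public webhook URL. With API Gateway's Lambda proxy integration, `queryStringParameters` arrives as a map of strings, or `null` when there is no query string. The sketch below uses a hypothetical `parseWebhookParams` function to validate that input before any lookup happens. The fallback values "ThirdPartyFailure" and "No cause provided" are illustrative assumptions.

```typescript
interface SuccessEventParams {
  id: string;
  status: "SUCCESS";
}

interface FailureEventParams {
  id: string;
  status: "FAILURE";
  error: string;
  cause: string;
}

type WebhookParams = SuccessEventParams | FailureEventParams;

// Raw query string as delivered by the API Gateway proxy integration
type RawQuery = Record<string, string | undefined> | null;

// Returns typed parameters, or undefined if the request is not a valid callback
function parseWebhookParams(query: RawQuery): WebhookParams | undefined {
  if (!query || !query.id) {
    return undefined;
  }
  if (query.status === "SUCCESS") {
    return {id: query.id, status: "SUCCESS"};
  }
  if (query.status === "FAILURE") {
    return {
      id: query.id,
      status: "FAILURE",
      // Illustrative defaults in case the third party omits the details
      error: query.error ?? "ThirdPartyFailure",
      cause: query.cause ?? "No cause provided"
    };
  }
  return undefined;
}
```

When this returns `undefined`, the listener can respond with a client error instead of touching DynamoDB or Step Functions. That way, a malformed request never resumes or fails an execution.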

Conclusion

For our use case, we felt that using a webhook with Task Tokens was the most idiomatic, event-driven way of pausing a Step Function execution; however, one size does not fit all. If the task token approach isn't going to work for you, a possible alternative is to create a Step Function (using an Express workflow) that polls for the status of your process. You can then nest this Step Function inside your main one, saving on the cost of state transitions while still keeping the visual benefits of a Standard workflow. You can find more information on Standard vs Express workflows in the AWS Step Functions documentation.
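For comparison, the polling alternative is usually a check/wait/repeat loop. The sketch below expresses one possible definition as a TypeScript object using the standard Task, Choice, Wait, Succeed and Fail states. "CheckStatusFunction" is a hypothetical Lambda assumed to return `{"status": "..."}`, and the "COMPLETE"/"FAILED" values and 30-second interval are illustrative. Express workflows can run for at most five minutes, so this shape only suits processes that finish within that window.

```typescript
// Hypothetical polling loop for an Express workflow: check status, wait, repeat
const pollingDefinition = {
  Comment: "Poll a third party until the process completes",
  StartAt: "Check Status",
  States: {
    "Check Status": {
      Type: "Task",
      Resource: "arn:aws:states:::lambda:invoke",
      Parameters: {
        FunctionName: "CheckStatusFunction",
        "Payload.$": "$"
      },
      // Keep only the status from the Lambda response, stored under $.check
      ResultSelector: {"status.$": "$.Payload.status"},
      ResultPath: "$.check",
      Next: "Is Complete?"
    },
    "Is Complete?": {
      Type: "Choice",
      Choices: [
        {Variable: "$.check.status", StringEquals: "COMPLETE", Next: "Done"},
        {Variable: "$.check.status", StringEquals: "FAILED", Next: "Process Failed"}
      ],
      // Any other status means the process is still running
      Default: "Wait Before Retry"
    },
    "Wait Before Retry": {
      Type: "Wait",
      Seconds: 30,
      Next: "Check Status"
    },
    Done: {Type: "Succeed"},
    "Process Failed": {Type: "Fail", Error: "ProcessFailed", Cause: "The third party reported a failure"}
  }
};

console.log(JSON.stringify(pollingDefinition, null, 2));
```

Using ResultPath keeps the original input, such as an ID, intact across iterations, so each status check receives the same identifying data.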

Thanks for reading! Feel free to browse more of our cloud-related insights, and if you'd like to work on serverless projects like this, please see our careers page.

Article By
Gravatar for caleb.wilson@instil.co

Caleb Wilson

Apprentice Software Engineer