Serverless Data Processing with AWS Step Functions, Part II

Jun Fritz

Back in Part I of Deploying a Serverless Data Processing Workflow with AWS Step Functions, Nuatu mentioned one key benefit of using step functions is their visibility into business critical workflows. Outside stakeholders, support staff, and other engineers can look at a state machine execution in AWS or Stackery, and can easily understand the process.

In most cases, teams that seek out the orchestration that state machines provide will also benefit from notifications before, during, or after an execution.

In our payroll processing example, we want to be notified when a state machine execution succeeds, and when it fails. One way to achieve this is by using AWS SDK in each of our functions to publish a message to the appropriate SNS topic when it completes its task.

But for this stack, we’ll take advantage of AWS Step Functions ability to integrate with AWS Simple Notification Service directly in our state machine.

Let’s get started.

The following example code is branched off of payroll-processing-1 and can be found in the payroll-processing-2 branch of our state-machine-examples repo.

The first thing we’ll do is configure two SNS topic resources, one with the Logical ID SuccessNotifications, and the other ErrorNotifications.

To set myself as a subscriber to each SNS topic, I’ll configure an Anything Resource with AWS::SNS::Subscription definitions.

# Subscribes jun@stackery.io to the ErrorNotifications topic ErrorSubscription: Type: AWS::SNS::Subscription Properties: TopicArn: !Ref ErrorNotifications Endpoint: jun@stackery.io Protocol: email # Subscribes jun@stackery.io to the SuccessNotifications topic SuccessSubscription: Type: AWS::SNS::Subscription Properties: TopicArn: !Ref SuccessNotifications Endpoint: jun@stackery.io Protocol: email

Each subscription is configured with the TopicArn of the associate SNS topic, a protocol (in this case email), and an endpoint. When configured in Stackery, the !Ref {Topic} reference value for each TopicArn will be depicted as a service discovery wire (dashed line).

Now that we have our topics and subscribers configured, we’ll update our state machine to publish messages to them when an execution succeeds or fails.

As mentioned before, AWS Step Functions allows us to configure tasks that publish messages to SNS topics. We’ll need to define a new task for each execution result (success or failure), and update our state machine execution role to have sns publishing permissions.

StatesExecutionRole: ... Policies: - PolicyName: StatesExecutionPolicy PolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Action: - lambda:InvokeFunction - sns:Publish Resource: '*' MyStateMachine: Type: AWS::StepFunctions::StateMachine Properties: DefinitionString: !Sub - |- { "Comment": "Payroll processing example.", "StartAt": "processData", "States": { ... "generateReport": { "Type": "Task", "Resource": "${Lambda3}", "Next": "successNotification", "Comment": "Third/final task." }, "successNotification": { "Type": "Task", "Resource": "arn:aws:states:::sns:publish", "Parameters": { "TopicArn": "${Topic1}", "Message": "Payroll process complete." }, "End": true } } } - Lambda1: !GetAtt processData.Arn Lambda2: !GetAtt generatePaystubs.Arn Lambda3: !GetAtt generateReport.Arn Topic1: !Ref SuccessNotifications RoleArn: !GetAtt StatesExecutionRole.Arn

In the code above, we’ve added the sns:Publish policy to our StatesExecutionRole configuration to allow our state machine to publish to SNS. In our generateReport task definition, we previously had the property “End” set to true, which signifies the end of a state machine execution. We’ve replace “End” with a “Next” property, with a value of “successNotification”, the SNS task defined after it.

successNotification is a state machine task that utilizes SNS (arn:aws:states:::sns:publish) as a resource instead of a Lambda function like the other tasks. Since this integration is used to publish messages, the Parameters properties consist of the topic to publish to, and the message its subscribers will receive.

We’ve taken the same approach as our Lambda functions and defined Topic1 with a value of !Ref SuccessNotifications, which resolves to the Amazon Resource Name of SuccessNotifications.

Visualizing the current stack and state machine

When we deploy this stack, email endpoints defined in ErrorSubscription & SuccessSubscription will receive a subscription verification email, and after selecting 'Confirm Subscription', you’ll be redirected to a confirmation page. Be sure to confirm your subscription before starting a new state machine execution.

Now, when we upload a file to our rawData S3 bucket, our state machine will execute the tasks as before, but rather than ending on generateReport, we see the final task successNoticication being run, and an email will be sent to jun@stackery.io with the following email.

Now that we get notified on successful executions, we'll configure our state machine to send notifications when the process fails. We'll enable error-handling on each task and construct the error messages sent to our subscribers using the ErrorNotifications topic.

MyStateMachine: Type: AWS::StepFunctions::StateMachine Properties: DefinitionString: !Sub - |- { "Comment": "Payroll processing example.", "StartAt": "processData", "States": { "processData": { ... "Catch": [ { "ErrorEquals": [ "States.ALL" ], "Next": "errorNotification" } ] }, "generatePaystubs": { ... "Catch": [ { "ErrorEquals": [ "States.ALL" ], "Next": "errorNotification" } ] }, "generateReport": { ... "Retry": [ { "ErrorEquals": [ "States.Timeout" ], "IntervalSeconds": 2, "MaxAttempts": 2 } ], "Catch": [ { "ErrorEquals": [ "States.ALL" ], "Next": "errorNotification" } ] }, "successNotification": { ... }, "errorNotification": { "Type": "Task", "Resource": "arn:aws:states:::sns:publish", "Parameters": { "TopicArn": "${Topic2}", "Subject": "[ERROR]: Task failed", "Message.$": "$.Cause" }, "End": true } } } - Lambda1: !GetAtt processData.Arn Lambda2: !GetAtt generatePaystubs.Arn Lambda3: !GetAtt generateReport.Arn Topic1: !Ref SuccessNotifications Topic2: !Ref ErrorNotifications RoleArn: !GetAtt StatesExecutionRole.Arn

It may look like a lot, but the additions are pretty straightforward. Each task now has a Catch property that defines an ErrorEquals and Next value. If an error occurs in a task, and it matches a predefined error type or custom error type, the task output (which is now an error message) will be redirected to the task defined in Next. So for all tasks, if any error occurs, our state machine will catch that error, and pass it along to the errorNotification task.

I also wanted to squeeze in AWS Step Functions retry functionality for the generateReport task. It consists of the error type to retry the task on, how long to wait in between retries, and the maximum retry attempts. Retries are useful for tasks that you're aware of that may be a bit flaky, but in this example, if the function times out, we'll retry up to 2 times, and if it fails a third, it will redirect to the error catching process defined below it.

You'll notice errorNotification is very similar to successNotification but the message value uses an AWS Step Functions input reference path. We're expecting an input with a Cause key and using that as the message body.

We'll add the following code to the end of our generateReport Lambda function, then deploy the stack to test out our error notifications.

exports.handler = async (event) => { ... // return event; throw new Error(`${event.completedTask} task failed...`); };

If you're following along, be sure to have confirmed a subscription to the ErrorNotifications SNS topic we created earlier.

As we did before, we'll add a new file to the rawData S3 bucket to trigger our state machine, but instead of a success notification, we'll recieve the following email, and our state machine execution in the Step Functions dashboard will illustrate the following

Our state machine will now notify the appropriate team members on when our payroll process ends successfully, or fails throughout any of the tasks.

Here's what the final workflow looks like in Stackery:

Thanks for hanging in there! Nice work. Be sure to check out example-state-definitions/choice.json and example-state-definitions/parallel.json in the state-machine-examples repo for additional state machine templates.

Want to keep building cool stuff with serverless using AWS' growing menu of cloud services + Stackery? Be sure to check out these other tutorials on our docs site.

Related posts

Deploying a serverless data processing workflow with AWS Step Functions
Tutorials & GuidesDeploying a serverless data processing workflow with AWS Step Functions

© 2022 Stackery. All rights reserved.