AWS Step Functions to manage a split-process-aggregate workflow.

Dynamic Concurrency with AWS Step Functions & AWS Batch to Orchestrate Split-Process-Aggregate Pattern

Kunal Patil
9 min read · Apr 22, 2021


This article describes how I handled a “Split file-Process-Aggregate file” (or scatter-gather) style dataflow using the AWS Step Functions and AWS Batch services, with concurrent execution of the Processing step.

Use Case

My use case comprises three main steps-

  1. Splitter: Downloads a CSV file from an S3 bucket and splits it into smaller CSV files.
  2. Processor: Processes each CSV file concurrently; each process converts its input CSV file into a JSON document.
  3. Aggregator: Aggregates all the JSON documents into one JSON document.

My Learning Goals

My goal was to learn the following from this exercise-

  1. How to use Apache Camel to split a CSV file into smaller files.
  2. How to use Apache Camel to aggregate a set of JSON files into one.
  3. How to configure AWS Step Functions to manage the steps of this workflow using AWS Batch, so that the Processing step can run concurrently. I also wanted to ensure that the Aggregator step starts only after all the processing child steps complete their execution.

The Solution Architecture looks like this-

Solution Architecture for Split-Process-Aggregate Flow

Requirements, Constraints & Scope

  1. Use Apache Camel with Spring Batch to keep the code to a minimum.
  2. Use AWS Batch to run each step of this workflow as a separate execution so that I can horizontally scale the Processing step independently.
  3. Use AWS Step Functions to bring in parallelism for the Processing step to enable scaling.
  4. Use Amazon Elastic File System (EFS) to store and work on all the files.
  5. Use AWS Lambda functions only if needed (more on this need later in the article).

Things that I haven’t covered in this exercise:

  1. Use of a persistent data source for storing Spring Batch Metadata.
  2. Handle failures and retries in AWS Step Functions.
  3. Tackle failures and restarts in Spring Batch.

Solution Details

I have divided this solution into two parts-

  1. Part-I: AWS Infrastructure Setup
  2. Part-II: Batch Application Development

Part-I: AWS Infrastructure Setup

This solution uses AWS Batch, AWS Step Functions and AWS Lambda to define the workflow. Let’s look at each one of them in detail-

1. AWS Batch

As stated earlier, each step (Split, Process and Aggregate) is an AWS Batch job. Each one has its own Job Definition, but they share the same Job Queue and Compute Environment. These batch jobs use the Fargate launch type. To keep this article focused on using these AWS Batch jobs in a Step Function, I am intentionally skipping the batch setup steps; you can refer to the AWS Batch documentation for more details. The key points worth highlighting in this article are-

  • In all the AWS Batch Job Definitions, mount the same EFS volume so that all the batch jobs get access to the input, intermediate and output files.
Mount EFS in AWS Batch Job Definitions
  • Activate the Spring profile using the AWS Batch Job Definition's environment variables, as shown below. This way, the Spring Batch applications running as AWS Batch jobs will be able to load the AWS-specific Spring property file.
Activate Spring Profile Using AWS Batch Job Definition Environment Variables
  • In the Processor Batch Job Definition, configure Command and Parameters to pass the fileName as a program argument. fileName is the name of the file on which the Processor batch should operate. This configuration is detailed in Part II below.

All these AWS Batch jobs share the same Job Queue and Compute Environment for the sake of simplicity, and these are pretty standard configurations. Here are the complete AWS Batch Job Definitions, for reference:

AWS Batch Job Definitions

2. AWS Lambda Function

Here is the code of the GetFileList Lambda function-

import json
from os import listdir
from os.path import isfile, join

def lambda_handler(event, context):
    path = '/mnt/efs/out/'

    jsonFiles = [f for f in listdir(path) if isfile(join(path, f))]

    print(jsonFiles)

    return {
        'statusCode': 200,
        'body': jsonFiles
    }

Output of this Lambda Function looks like-

{
  "statusCode": 200,
  "body": [
    "customers_2.csv",
    "customers_6.csv",
    "customers_4.csv",
    "customers_8.csv",
    "customers_0.csv",
    "customers_3.csv",
    "customers_7.csv",
    "customers_5.csv",
    "customers_9.csv",
    "customers_1.csv"
  ]
}

Ensure that the same EFS file system is mounted on the AWS Lambda Function as shown below-

AWS Lambda: File System (EFS) Configuration.

The reason I had to use a Lambda function is described in the next point.

3. AWS Step Functions- State Machine

Following is my State Machine definition in JSON and graphical formats. This state machine is configured to run multiple concurrent Process File steps. We can start this state machine manually using the AWS Step Functions console, or configure it to trigger periodically or on an S3 put event using Amazon CloudWatch Events rules.

AWS State Machine Graph for Split-Process-Aggregate Flow
{
  "StartAt": "Split File",
  "States": {
    "Split File": {
      "Type": "Task",
      "Resource": "arn:aws:states:::batch:submitJob.sync",
      "Parameters": {
        "JobDefinition": "arn:aws:batch:us-east-1:<USE-YOUR-AWS-ACCOUNT-NUMBER>:job-definition/splitter-jd:1",
        "JobName": "SplitFile",
        "JobQueue": "arn:aws:batch:us-east-1:<USE-YOUR-AWS-ACCOUNT-NUMBER>:job-queue/split-process-aggregate-job-queue"
      },
      "Next": "Get File List"
    },
    "Get File List": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:<USE-YOUR-AWS-ACCOUNT-NUMBER>:function:GetFileList",
      "ResultPath": "$",
      "Next": "Process Files"
    },
    "Process Files": {
      "Type": "Map",
      "ItemsPath": "$.body",
      "MaxConcurrency": 10,
      "Iterator": {
        "StartAt": "Process File",
        "States": {
          "Process File": {
            "Type": "Task",
            "Resource": "arn:aws:states:::batch:submitJob.sync",
            "Parameters": {
              "JobDefinition": "arn:aws:batch:us-east-1:<USE-YOUR-AWS-ACCOUNT-NUMBER>:job-definition/processor-jd:5",
              "JobName": "processor",
              "JobQueue": "arn:aws:batch:us-east-1:<USE-YOUR-AWS-ACCOUNT-NUMBER>:job-queue/split-process-aggregate-job-queue",
              "Parameters": {
                "FileName.$": "$"
              }
            },
            "End": true
          }
        }
      },
      "Next": "Aggregate Files"
    },
    "Aggregate Files": {
      "Type": "Task",
      "Resource": "arn:aws:states:::batch:submitJob.sync",
      "Parameters": {
        "JobDefinition": "arn:aws:batch:us-east-1:<USE-YOUR-AWS-ACCOUNT-NUMBER>:job-definition/aggregator-jd:2",
        "JobName": "FileAggregator",
        "JobQueue": "arn:aws:batch:us-east-1:<USE-YOUR-AWS-ACCOUNT-NUMBER>:job-queue/split-process-aggregate-job-queue"
      },
      "End": true
    }
  }
}

The key points to note here are-

  • The state machine starts with the Split File step. This step downloads the specified CSV file from S3, splits it into smaller CSV files and saves them to the EFS file system.
  • In Step Functions, the output of the previous step is usually consumed by the subsequent steps. In my use case, after the file splitter batch completes its job, I needed a way to pass the list of child files to the next step in the state machine, Process Files. However, because batch applications, by their nature, do not return values other than an exit code, I had to use an AWS Lambda function named Get File List, which mounts the same EFS volume, fetches the list of files in the specified directory and responds with a JSON array of those file names. I could easily have implemented the first two steps as a single Lambda function; however, I had set the constraint that all the "business" processing logic lives in a batch program.
  • The JSON array of file names returned by the Get File List step is passed on to the next step, Process Files, using "ResultPath": "$".
  • The Process Files step is configured as a Step Functions Map state.

The Map state ("Type": "Map") can be used to run a set of steps for each element of an input array. While the Parallel state executes multiple branches of steps using the same input, a Map state will execute the same steps for multiple entries of an array in the state input.

The Map state iterates over the array of file names supplied via "ItemsPath": "$.body". It runs the steps concurrently, with the maximum number of concurrent steps set to 10 ("MaxConcurrency": 10). It passes the value of one array item (the name of the file to process) to each concurrent execution. The file name is passed to each concurrent batch instance as a Parameters object to AWS Batch. The complete list of AWS Batch parameters supported by Step Functions is here.

"Parameters": {
"FileName.$": "$"
}
  • The AWS Batch application of the last step, Aggregate Files, scans the processed output directory and merges all the JSON documents present in that directory into a single aggregate JSON file. This step does not need the file list as input, and it does not produce any output either, as it is configured as the last step using "End": true.
  • At the end of execution, the output looks like this-
EFS File System Directory Structure Post Successful Run of the State Machine

4. Other AWS Constructs

  • IAM Roles & Policies

The following IAM roles and policies need to be in place for this solution-

  1. StepFunctions-Split-Process-Aggregate-role: This role is attached to the State Machine. The following policies are attached to it:
  • XRayAccessPolicy
  • BatchJobManagementFullAccessPolicy
  • AWSLambdaVPCAccessExecutionRole
  • AWSLambdaRole
  • AmazonElasticFileSystemClientFullAccess

2. LambdaRoleForGetFilesLambda: This role is attached to the GetFileList Lambda function and has the following policies attached to it:

  • AWSLambdaBasicExecutionRole
  • AWSLambdaVPCAccessExecutionRole
  • AmazonElasticFileSystemClientFullAccess

3. ecsTaskExecutionRole: This role is attached to all the AWS Batch Job Definitions and has the following policies:

  • AmazonEC2ContainerRegistryFullAccess
  • AmazonECSTaskExecutionRolePolicy

Part II: Batch Applications Development

I developed three Spring Batch applications-

  1. File Splitter Batch
  2. File Processing Batch
  3. File Aggregator Batch

I will only highlight the key points of each application. You can find the complete source code in the respective GitHub repositories by clicking on the project names listed above.

  1. File Splitter Batch

This Spring Batch application has two steps. Step 1 downloads a specified file from an S3 bucket using spring-cloud-starter-aws. This code is pretty standard and hence not discussed here. Step 2 uses an Apache Camel route to split the CSV file into smaller CSVs, each with 100 records. The Camel route for splitting up a file looks like this-
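The embedded route gist is not reproduced here, so the following is only a minimal sketch of what a Camel split route of this shape could look like. The /mnt/efs/in directory and the customers.csv file name are placeholder assumptions, not the exact values from my setup; only the /mnt/efs/out output directory matches the listing shown earlier.

import org.apache.camel.builder.RouteBuilder;

public class FileSplitterRoute extends RouteBuilder {

    // Assumed EFS locations: the input file downloaded from S3 and the
    // output directory that the GetFileList Lambda later scans.
    private final String inputFilePath = "/mnt/efs/in";
    private final String outputFilePath = "/mnt/efs/out";

    @Override
    public void configure() {
        from("file:" + inputFilePath + "?fileName=customers.csv&noop=true")
            // Group every 100 lines of the CSV into one outgoing exchange.
            .split().tokenize("\n", 100).streaming()
            // CamelSplitIndex numbers the chunks: customers_0.csv, customers_1.csv, ...
            .to("file:" + outputFilePath
                + "?fileName=customers_${exchangeProperty.CamelSplitIndex}.csv")
            .routeId("fileSplitterRoute");
    }
}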

2. File Processing Batch

This AWS Batch job receives the name of the file to process via program arguments. In order to pass the file name from the AWS Step Functions state machine to AWS Batch, and then to the Spring Batch application, it uses the Step Functions Parameters feature. Then, to be able to receive this parameter in AWS Batch, put a Parameter placeholder and Command in the AWS Batch Job Definition as shown below-

Command: Ref::FileName

Parameters: Name: FileName Value: FileNameTempValue

AWS Batch: Job Definition- Command and Parameter

The value of this parameter can then be retrieved in the Spring Boot application using org.springframework.boot.ApplicationArguments, like this-

String[] sourceArgs = applicationArguments.getSourceArgs();
String fileName = null;
for (String arg : sourceArgs)
    fileName = arg;

This application then converts the CSV file whose name it receives into a JSON document and stores it in a specified output directory.
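For illustration only, such a conversion route could be sketched with Camel's CSV and Jackson data formats. This is not my exact implementation; the directory names below are placeholder assumptions, and the choice of camel-csv and camel-jackson is one possible way to do the conversion.

import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.model.dataformat.JsonLibrary;

public class CsvToJsonRoute extends RouteBuilder {

    private final String fileName;                                // e.g. "customers_3.csv", received as the program argument
    private final String inputFilePath = "/mnt/efs/out";          // where the splitter wrote the chunks
    private final String outputFilePath = "/mnt/efs/processed";   // hypothetical processed-output directory

    public CsvToJsonRoute(String fileName) {
        this.fileName = fileName;
    }

    @Override
    public void configure() {
        from("file:" + inputFilePath + "?fileName=" + fileName + "&noop=true")
            .unmarshal().csv()                      // camel-csv: parse rows into lists of fields
            .marshal().json(JsonLibrary.Jackson)    // camel-jackson: serialize the parsed rows as JSON
            .to("file:" + outputFilePath + "?fileName=" + fileName.replace(".csv", ".json"))
            .routeId("csvToJsonRoute");
    }
}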

3. File Aggregator Batch

The File Aggregator Batch application uses the following Apache Camel route to aggregate all the JSON files present in the directory written by the previous step and create a single JSON file-

from("direct:start")
    .noAutoStartup()
    .loop(numberOfFiles)
        .pollEnrich("file:" + inputFilePath + "?noop=true", fileAggregationStrategy)
        .to("file:" + outputFilePath + "?fileName=customers.json")
    .routeId("filesAggregationRoute");
File Aggregation Apache Camel Route Configuration
Aggregation Strategy- Aggregate JSON bodies into a single JSON
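The fileAggregationStrategy used above is a custom AggregationStrategy, and its gist is not reproduced here. As an illustrative sketch only (assuming Camel 3's org.apache.camel.AggregationStrategy interface and a naive string-level merge; a production version would combine the documents with a JSON library such as Jackson), it could look like this-

import org.apache.camel.AggregationStrategy; // in Camel 2 this interface lives under org.apache.camel.processor.aggregate
import org.apache.camel.Exchange;

public class FileAggregationStrategy implements AggregationStrategy {

    @Override
    public Exchange aggregate(Exchange original, Exchange resource) {
        // 'resource' carries the JSON file just polled by pollEnrich;
        // 'original' is the exchange flowing through the loop and accumulates the result.
        String polledJson = resource.getIn().getBody(String.class);
        String aggregated = original.getIn().getBody(String.class);

        if (aggregated == null || aggregated.isEmpty()) {
            // First iteration of the loop: start the aggregate document.
            original.getIn().setBody(polledJson);
        } else {
            // Naive string-level merge: append the next document, comma separated.
            original.getIn().setBody(aggregated + "," + polledJson);
        }
        return original;
    }
}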

Concluding Thoughts

I hope this article helps you answer some questions, as it did for me. It was a great hands-on learning experience, solving a common real-world use case.

AWS Step Functions' Map state is a great way to bring parallelism to your existing batch jobs with little to no code changes. AWS Step Functions also ensures that the Aggregator task does not start until all the child batches spawned by the Map state have completed successfully. Having this orchestration done outside of the program's logic allows for greater modularity, reuse and flexibility when setting up data processing pipelines.

Finally, I must say that Apache Camel greatly simplifies the logic for splitting and aggregating files; however, as I am not well versed with the Apache Camel DSL, it took a fair amount of time to condense the file splitting and aggregation logic into just a few lines of code. Apache Camel is a nice framework for implementing Enterprise Integration Patterns (EIP) in Java and integrates well with the Spring Boot framework. I must use it more often to get the hang of it, though.

I may have missed some details, so please comment below with your questions and suggestions, and I will be happy to respond.

Cheers,

Kunal
