I use AWS Step Functions and have the following workflow:
initStep - a Lambda function handler that gets some data and sends it to SQS for an external service.
import json
import os
from datetime import datetime

import boto3

activity = os.getenv('ACTIVITY')
queue_name = os.getenv('QUEUE_NAME')

def lambda_handler(event, context):
    # Include the activity identifier (from the ACTIVITY env var) in the message
    event['my_activity'] = activity
    data = json.dumps(event)
    # Retrieving a queue by its name
    sqs = boto3.resource('sqs')
    queue = sqs.get_queue_by_name(QueueName=queue_name)
    # MessageGroupId is only valid for FIFO queues
    queue.send_message(MessageBody=data, MessageGroupId='messageGroup1' + str(datetime.now().time()))
    return event
validationWaiting - an activity that waits for an answer from the external service, which includes the data.
complete - a Lambda function handler that uses the data from initStep:
import os

import boto3

# Sender address (assumed here to be set via a FROM_ADDRESS environment variable)
from_addresses = os.getenv('FROM_ADDRESS')

def lambda_handler(event, context):
    email = event.get('email')
    data = event.get('data')
    client = boto3.client(service_name='ses')
    # Recipients arrive as one comma-separated string, e.g. "a@x.com, b@y.com"
    to = email.split(', ')
    message_container = {'Subject': {'Data': 'Email from step functions'},
                         'Body': {'Html': {
                             'Charset': "UTF-8",
                             'Data': '<html><body><p>' + data + '</p></body></html>'
                         }}}
    destination = {'ToAddresses': to,
                   'CcAddresses': [],
                   'BccAddresses': []}
    return client.send_email(Source=from_addresses,
                             Destination=destination,
                             Message=message_container)
It works, but the problem is that I'm sending the full data from initStep to the external service just so it can be passed back later to complete. Potentially, more steps could be added.
I believe it would be better to share it as some sort of global data (scoped to the current Step Functions execution); that way I could add or remove steps and the data would still be available to all of them.
You can make use of InputPath and ResultPath. In initStep you would only send the necessary data to the external service (probably along with some unique identifier of the execution). In the ValidationWaiting step you can set the following properties (in the State Machine definition):
InputPath: which part of the state input the activity worker receives as input from GetActivityTask. You probably want to set it to something like $.execution_unique_id, where execution_unique_id is a field in your data that the external service uses to identify the execution (to match it with the specific request made during initStep).
ResultPath: where the output of the ValidationWaiting Activity will be saved in the data. You can set it to $.validation_output, and the JSON result from the external service will be present there.
This way you send the external service only the data it actually needs, and you don't lose access to any data that was in the input before the ValidationWaiting step.
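The way these two fields combine can be modeled in plain Python. This is a simplified sketch, not the Step Functions implementation: it assumes only the paths "$" and "$.key" (top-level keys; real reference paths are richer), models ResultPath null as Python None, and the names select and run_state are made up for illustration.

```python
import copy


def select(data, path):
    """Resolve a simplified reference path: "$" or "$.key" (top-level only)."""
    if path == "$":
        return data
    if not path.startswith("$."):
        raise ValueError(f"unsupported path: {path}")
    return data[path[2:]]


def run_state(state_input, input_path, result_path, task):
    """Model how a Task/Activity state applies InputPath and ResultPath.

    InputPath picks what the task (e.g. an activity worker) sees; ResultPath
    places the task's result back into the *original* state input.
    """
    task_input = select(state_input, input_path)
    result = task(task_input)
    if result_path is None:
        return copy.deepcopy(state_input)  # ResultPath null: discard the result
    if result_path == "$":
        return result  # the result replaces the whole input
    if not result_path.startswith("$."):
        raise ValueError(f"unsupported path: {result_path}")
    output = copy.deepcopy(state_input)  # leave the caller's dict untouched
    output[result_path[2:]] = result
    return output


# Usage: the "external service" only ever sees the execution id
state_input = {"execution_unique_id": "abc-123", "data": "<payload>"}
seen = []


def external_validation(task_input):
    seen.append(task_input)
    return {"status": "ok"}


out = run_state(state_input, "$.execution_unique_id", "$.validation_output",
                external_validation)
```

After this runs, out still contains data next to validation_output, while external_validation received only "abc-123".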
For example, you could have the following State Machine definition:
{
  "StartAt": "initStep",
  "States": {
    "initStep": {
      "Type": "Pass",
      "Result": {
        "executionId": "some:special:id",
        "data": {},
        "someOtherData": {"value": "key"}
      },
      "Next": "ValidationWaiting"
    },
    "ValidationWaiting": {
      "Type": "Pass",
      "InputPath": "$.executionId",
      "ResultPath": "$.validationOutput",
      "Result": {
        "validationMessages": ["a", "b"]
      },
      "Next": "Complete"
    },
    "Complete": {
      "Type": "Pass",
      "End": true
    }
  }
}
I've used Pass states for initStep and ValidationWaiting to simplify the example (I haven't run it, but it should work). The Result field is specific to the Pass state; here it stands in for the result of your Lambda functions or Activity.
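With a real Activity instead of the Pass placeholder, the state's result is whatever the worker reports through SendTaskSuccess. A minimal sketch of one polling iteration, assuming sfn is a boto3 Step Functions client passed in by the caller and validate is a hypothetical stand-in for your call to the external service; poll_once and the worker name are also made up for illustration.

```python
import json


def poll_once(sfn, activity_arn, validate, worker_name="validation-worker"):
    """Handle at most one activity task; return True if a task was processed.

    sfn is expected to be boto3.client("stepfunctions"); validate is a
    hypothetical function that asks the external service and returns a dict.
    """
    # Long poll (up to about 60 s); the token is empty if no task arrived
    task = sfn.get_activity_task(activityArn=activity_arn, workerName=worker_name)
    token = task.get("taskToken")
    if not token:
        return False
    # With InputPath "$.executionId" this is just the id, encoded as JSON
    task_input = json.loads(task["input"])
    try:
        result = validate(task_input)
    except Exception as exc:
        sfn.send_task_failure(taskToken=token, error="ValidationFailed",
                              cause=str(exc)[:32768])
        return True
    # This JSON is the state's result and is stored at ResultPath
    sfn.send_task_success(taskToken=token, output=json.dumps(result))
    return True
```

A real worker would call this in a loop; AWS recommends a client-side socket timeout of at least 65 seconds for GetActivityTask, so the boto3 client's read timeout may need raising.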
In this scenario, the Complete step would get the following input:
{
  "executionId": "some:special:id",
  "data": {},
  "someOtherData": {"value": "key"},
  "validationOutput": {
    "validationMessages": ["a", "b"]
  }
}
So the result of the ValidationWaiting step has been saved into the validationOutput field, alongside the original input.
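Because Complete receives this merged input, its Lambda can read the original data and the external service's answer side by side. A sketch of building the SES Message dict from it, assuming the example's field names (data, validationOutput.validationMessages); build_message is a hypothetical helper whose return value you would pass as Message= to send_email.

```python
import html


def build_message(event):
    """Build an SES Message dict from the merged Step Functions input."""
    data = event.get("data", "")
    messages = event.get("validationOutput", {}).get("validationMessages", [])
    # Escape values before embedding them in HTML
    items = "".join(f"<li>{html.escape(str(m))}</li>" for m in messages)
    body = f"<html><body><p>{html.escape(str(data))}</p><ul>{items}</ul></body></html>"
    return {
        "Subject": {"Data": "Email from step functions"},
        "Body": {"Html": {"Charset": "UTF-8", "Data": body}},
    }
```

With this layout, adding or removing steps only requires that each one writes its result to its own ResultPath field.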