AWS Step Function: Asynchronously Invoking a Dynamic Number of Lambdas and Aggregating the Results

source link: https://yanbin.blog/aws-step-function-async-call-dynamic-lambdas-aggregation/

2023-06-19

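Distributed computing often has this requirement: a main process prepares the input data, dynamically invokes a number of compute processes based on some Items in the input, and aggregates the results once every computation has finished. Ported to AWS, the requirement looks like this: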

lambda-sync-call-1.png

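However, synchronously invoking other Lambdas from inside a Lambda costs both time and money. Even if Lambda2 is invoked through a thread pool, each synchronous call takes a different amount of time, so Lambda1 has to wait for the slowest call to finish before it can aggregate all the results. This is the well-known straggler problem: most of the time Lambda1 is burning your money while waiting for nothing.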
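To make the cost of waiting concrete, here is a minimal local sketch. It does not call AWS at all: `fake_lambda2` is a hypothetical stand-in for a synchronous Lambda2 invocation, and the sleep times are scaled down to fractions of a second. It only shows that the caller stays blocked for at least as long as the slowest call.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_lambda2(task):
    """Hypothetical stand-in for a synchronous Lambda2 call; it only sleeps."""
    time.sleep(task["sleep"])
    return {"id": task["id"], "status": "done"}


def call_all_synchronously(tasks):
    """Fan out with a thread pool, then block until every call has returned."""
    started = time.monotonic()
    with ThreadPoolExecutor(max_workers=len(tasks)) as pool:
        # list() forces the caller to wait for all results, including the slowest.
        results = list(pool.map(fake_lambda2, tasks))
    elapsed = time.monotonic() - started
    return results, elapsed


if __name__ == "__main__":
    demo_tasks = [
        {"id": 100, "sleep": 0.05},
        {"id": 101, "sleep": 0.30},
        {"id": 103, "sleep": 0.10},
    ]
    _, blocked_for = call_all_synchronously(demo_tasks)
    slowest = max(t["sleep"] for t in demo_tasks)
    print(f"caller blocked for {blocked_for:.2f}s; slowest call took {slowest:.2f}s")
```

In a real Lambda1 the blocked time is billed time, which is the motivation for moving the fan-out into the orchestration layer.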

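If the calls to Lambda2 are made asynchronous instead, some notification mechanism is needed so that another Lambda is triggered to collect the results once every Lambda2 has finished. A consistent coordination service such as ZooKeeper or etcd would work, as would Redis with its transaction support. All that matters is that each Lambda2, on finishing, can tell whether it is the last one and, if so, notify the next Lambda to process the results.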
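The "am I the last one?" check can be sketched as an atomic countdown. The version below is an assumption-laden local illustration: `CompletionCounter` is a hypothetical in-memory class that uses a lock where a real deployment would use an atomic decrement in a shared store (for instance Redis or etcd), and threads stand in for Lambda2 invocations.

```python
import threading


class CompletionCounter:
    """Hypothetical in-memory stand-in for an atomic counter in a shared store."""

    def __init__(self, total):
        self._remaining = total
        self._lock = threading.Lock()

    def mark_done(self):
        """Atomically decrement; return True only for the last finisher."""
        with self._lock:
            self._remaining -= 1
            return self._remaining == 0


def run_workers(total):
    """Run `total` simulated workers and record which one triggers aggregation."""
    counter = CompletionCounter(total)
    triggered_by = []

    def worker(worker_id):
        # The real calculation would happen here.
        if counter.mark_done():
            # Only the last worker to finish notifies the aggregation step.
            triggered_by.append(worker_id)

    threads = [threading.Thread(target=worker, args=(i,)) for i in range(total)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return triggered_by


if __name__ == "__main__":
    print("aggregation triggered by worker(s):", run_workers(10))
```

Whatever the order in which the workers finish, exactly one of them sees the counter reach zero, so the aggregation step is triggered once.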

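Given these requirements, AWS Step Function comes in handy. It now supports a Map state that asynchronously invokes a dynamic number of Lambdas, driven for example by an array in the input JSON, an array in a JSON file on S3, or the objects under an S3 prefix.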

Below is the state machine built with AWS Step Function, and how it is started:

step-function-2.png
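  1. Start: the state machine is started by a Lambda that is triggered by SQS
  2. Calculations: Step Function asynchronously invokes a dynamic number of Calculation Lambdas, one per element of items in the input. The dashed box around Calculations indicates that the number of invocations is dynamic
  3. Aggregation: processes all the calculation results once every Calculation has finished
  4. Starter, Calculation and Aggregation are three AWS Lambdas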

Next, Terraform and Python are used to demonstrate the whole Step Function flow. All of the code is included in the post, which makes it considerably longer.

First come the IAM roles: all the Lambdas share one role, and the Step Function has a role of its own.

iam_roles.tf

resource "aws_iam_role" "lambda_role" {
  name = "step_function_lambda_role"
  assume_role_policy = <<-EOF
  {
    "Version": "2012-10-17",
    "Statement": [
      {
        "Effect": "Allow",
        "Principal": {
          "Service": "lambda.amazonaws.com"
        },
        "Action": "sts:AssumeRole"
      }
    ]
  }
  EOF
  managed_policy_arns = ["arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"]
  inline_policy {
    name = "test_inline_policy"
    policy = <<-EOF
    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": [
            "sqs:ReceiveMessage",
            "sqs:DeleteMessage",
            "sqs:GetQueueAttributes"
          ],
          "Resource": ["${aws_sqs_queue.step_function_input.arn}"]
        },
        {
          "Effect": "Allow",
          "Action": ["states:StartExecution"],
          "Resource": ["${local.step_function_arn}"]
        }
      ]
    }
    EOF
  }
}

resource "aws_iam_role" "step_function_role" {
  name               = "step-function-role"
  assume_role_policy = <<-EOF
  {
    "Version": "2012-10-17",
    "Statement": [
      {
        "Effect": "Allow",
        "Action": "sts:AssumeRole",
        "Principal": {
          "Service": "states.amazonaws.com"
        }
      }
    ]
  }
  EOF
  inline_policy {
    name = "call_lambda_policy"
    policy = <<-EOF
    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Action": ["lambda:InvokeFunction"],
          "Effect": "Allow",
          "Resource": [
            "${aws_lambda_function.calculation.arn}",
            "${aws_lambda_function.aggregation.arn}"
          ]
        },
        {
          "Effect": "Allow",
          "Action": ["states:StartExecution"],
          "Resource": ["${local.step_function_arn}"]
        }
      ]
    }
    EOF
  }
}

Then come the definitions of the three Lambdas.

lambdas.tf

data "archive_file" "lambda_package" {
  source_file = "aws_lambdas.py"
  output_path = "aws_lambdas.zip"
  type        = "zip"
}

resource "aws_lambda_function" "start" {
  function_name    = "step_function_start"
  role             = aws_iam_role.lambda_role.arn
  memory_size      = 128
  timeout          = 30
  filename         = data.archive_file.lambda_package.output_path
  source_code_hash = data.archive_file.lambda_package.output_base64sha256
  runtime          = "python3.10"
  handler          = "aws_lambdas.start"
  environment {
    variables = {
      STATE_MACHINE_ARN = aws_sfn_state_machine.demo.arn
    }
  }
}

resource "aws_lambda_event_source_mapping" "event_mapping" {
  event_source_arn = aws_sqs_queue.step_function_input.arn
  function_name    = aws_lambda_function.start.function_name
  batch_size       = 1
}

resource "aws_lambda_function" "calculation" {
  function_name    = "step_function_calculation"
  role             = aws_iam_role.lambda_role.arn
  memory_size      = 128
  timeout          = 60
  filename         = data.archive_file.lambda_package.output_path
  source_code_hash = data.archive_file.lambda_package.output_base64sha256
  runtime          = "python3.10"
  handler          = "aws_lambdas.calculate"
}

resource "aws_lambda_function" "aggregation" {
  function_name    = "step_function_aggregation"
  role             = aws_iam_role.lambda_role.arn
  memory_size      = 128
  timeout          = 30
  filename         = data.archive_file.lambda_package.output_path
  source_code_hash = data.archive_file.lambda_package.output_base64sha256
  runtime          = "python3.10"
  handler          = "aws_lambdas.aggregate"
}

The code for all three Lambdas lives in a single py file, with a different entry function for each.

aws_lambdas.py

import json
import logging
import os
import time

import boto3

logging_format = '%(asctime)s - %(levelname)s - %(funcName)s:%(lineno)d - %(message)s\n'
logger = logging.getLogger()
if logger.handlers:
    # Reuse the handler that the Lambda runtime has already installed.
    logger.handlers[0].setFormatter(logging.Formatter(logging_format))
    logger.setLevel(logging.INFO)

sfn = boto3.client('stepfunctions')


def start(event, context):
    # batch_size is 1, so the event carries exactly one SQS record.
    sqs_message = event['Records'][0]['body']
    logger.info('sqs message: %s', sqs_message)
    response = sfn.start_execution(
        stateMachineArn=os.getenv("STATE_MACHINE_ARN"),
        input=sqs_message,
    )
    logger.info("start response %s", response)


def calculate(event, context):
    logger.info("received message: %s", event)
    # Simulate a failing task.
    if event['id'] == 104:
        raise Exception('calculation failed')
    logger.info("sleep %s seconds", event['sleep'])
    time.sleep(event['sleep'])
    logger.info("done")
    return {f"result{event['id']}": f"processed task: {event['id']}"}


def aggregate(event, context):
    logger.info("received aggregated result: %s", event)
    # When the Map state fails, the input is an error object, not a list.
    if 'Error' in event:
        return "one of calculation failed"
    else:
        return "processed result " + json.dumps(event, default=str)

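The Lambda implementation uses the sleep value in the input to simulate processing time, and simulates a failed invocation when id is 104.

Next, an SQS message queue and the Step Function itself need to be created.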

main.tf

provider "aws" {}

resource "aws_sqs_queue" "step_function_input" {
  name                    = "step_function_input"
  sqs_managed_sse_enabled = false
}

locals {
  # Built by hand instead of reading aws_sfn_state_machine.demo.arn,
  # which avoids a dependency cycle between the IAM roles and the state machine.
  step_function_arn = format("arn:aws:states:%s:%s:stateMachine:step-function-demo",
    data.aws_region.current.id, data.aws_caller_identity.current.account_id)
}

data "aws_caller_identity" "current" {}
data "aws_region" "current" {}

resource "aws_sfn_state_machine" "demo" {
  name       = "step-function-demo"
  role_arn   = aws_iam_role.step_function_role.arn
  definition = templatefile("state_machine.json", {
    CALCULATION_LAMBDA_ARN = aws_lambda_function.calculation.arn
    AGGREGATION_LAMBDA_ARN = aws_lambda_function.aggregation.arn
  })
}

The Step Function state machine definition is kept in a separate state_machine.json file.

{
  "Comment": "A description of my state machine",
  "StartAt": "Map-calculation",
  "States": {
    "Map-calculation": {
      "Type": "Map",
      "ItemProcessor": {
        "ProcessorConfig": {
          "Mode": "DISTRIBUTED",
          "ExecutionType": "STANDARD"
        },
        "StartAt": "Calculations",
        "States": {
          "Calculations": {
            "Type": "Task",
            "Resource": "arn:aws:states:::lambda:invoke",
            "OutputPath": "$.Payload",
            "Parameters": {
              "Payload.$": "$",
              "FunctionName": "${CALCULATION_LAMBDA_ARN}"
            },
            "Retry": [
              {
                "ErrorEquals": [
                  "Lambda.ServiceException",
                  "Lambda.AWSLambdaException",
                  "Lambda.SdkClientException",
                  "Lambda.TooManyRequestsException",
                  "Exception"
                ],
                "IntervalSeconds": 2,
                "MaxAttempts": 3,
                "BackoffRate": 2
              }
            ],
            "End": true
          }
        }
      },
      "Label": "Map-calculation",
      "MaxConcurrency": 1000,
      "InputPath": "$.items",
      "Next": "Aggregation",
      "Catch": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "Next": "Aggregation"
        }
      ]
    },
    "Aggregation": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "OutputPath": "$.Payload",
      "Parameters": {
        "Payload.$": "$",
        "FunctionName": "${AGGREGATION_LAMBDA_ARN}"
      },
      "Retry": [
        {
          "ErrorEquals": [
            "Lambda.ServiceException",
            "Lambda.AWSLambdaException",
            "Lambda.SdkClientException",
            "Lambda.TooManyRequestsException"
          ],
          "IntervalSeconds": 2,
          "MaxAttempts": 3,
          "BackoffRate": 2
        }
      ],
      "End": true
    }
  }
}

Note in particular that Map-calculation uses InputPath: $.items, that is, the items array in the input JSON, to asynchronously invoke a matching number of Calculation Lambdas.

Besides the flow itself, pay attention to the retry and error-catching configuration. As defined here, any error raised in Calculation causes a jump to the Aggregation step.
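The data flow through Map, Catch and Aggregation can be sketched locally without AWS. This is a simplified model under stated assumptions: items are processed sequentially, retries are not modeled, sleeps are skipped, and the error object only mirrors the general `Error`/`Cause` shape that a Catch passes on (the exact error name produced by the real service will differ). `run_state_machine` is a hypothetical helper name.

```python
import json


def calculate(item):
    """Same logic as the calculation handler, minus logging and sleeping."""
    if item["id"] == 104:
        raise Exception("calculation failed")
    return {f"result{item['id']}": f"processed task: {item['id']}"}


def aggregate(event):
    """Same logic as the aggregation handler, minus logging."""
    if "Error" in event:
        return "one of calculation failed"
    return "processed result " + json.dumps(event, default=str)


def run_state_machine(state_input):
    """Model of Map-calculation -> Aggregation with a catch-all on the Map."""
    try:
        # InputPath "$.items": each element becomes the input of one calculation,
        # and the outputs are collected into a list in item order.
        outputs = [calculate(item) for item in state_input["items"]]
    except Exception as exc:
        # Catch States.ALL -> Aggregation: the next state receives an error object
        # instead of the list of results.
        return aggregate({"Error": type(exc).__name__, "Cause": str(exc)})
    return aggregate(outputs)


if __name__ == "__main__":
    print(run_state_machine({"items": [{"id": 100, "sleep": 5}, {"id": 101, "sleep": 30}]}))
    print(run_state_machine({"items": [{"id": 100, "sleep": 5}, {"id": 104, "sleep": 1}]}))
```

The point of the sketch is the input that Aggregation sees: a list of per-item outputs on success, or a single error object when the Map state fails.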

Next, the Step Function is tested and the duration of each step is recorded. The test method is to send JSON-formatted messages to the SQS queue step_function_input.
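One way to send such a test message is with boto3, sketched below. The helper names `build_message`, `send_test_message` and `main` are made up for illustration; the SQS client calls used are `get_queue_url` and `send_message`. Running `main()` assumes AWS credentials and region are already configured and that the queue has been created.

```python
import json


def build_message(tasks):
    """Build the JSON body the state machine expects from (id, sleep) pairs."""
    return json.dumps({"items": [{"id": i, "sleep": s} for i, s in tasks]})


def send_test_message(sqs_client, queue_name, tasks):
    """Look up the queue URL by name and send one test message to it."""
    queue_url = sqs_client.get_queue_url(QueueName=queue_name)["QueueUrl"]
    return sqs_client.send_message(QueueUrl=queue_url, MessageBody=build_message(tasks))


def main():
    # boto3 is imported here so the helpers above can be exercised without AWS access.
    import boto3

    response = send_test_message(
        boto3.client("sqs"),
        "step_function_input",
        [(100, 5), (101, 30), (103, 10)],
    )
    print("sent message", response["MessageId"])
```

Passing the client in as a parameter keeps the message-building logic easy to check with a stub before anything is sent to a real queue.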

Test 1: every task succeeds. Send the message

{
    "items": [
        { "id": 100, "sleep": 5 },
        { "id": 101, "sleep": 30 },
        { "id": 103, "sleep": 10 }
    ]
}

Lambda Function | Duration | Input (event) | Output (Lambda return)
step_function_start | 319.15 ms | { "items": [ { "id": 100, "sleep": 5 }, { "id": 101, "sleep": 30 }, { "id": 103, "sleep": 10 } ] } | starts the state machine
step_function_calculation | 5007.12 ms | {'id': 100, 'sleep': 5} | {'result100': 'processed task: 100'}
step_function_calculation | 10013.75 ms | {'id': 103, 'sleep': 10} | {'result103': 'processed task: 103'}
step_function_calculation | 30018.05 ms | {'id': 101, 'sleep': 30} | {'result101': 'processed task: 101'}
step_function_aggregation | 1.46 ms | [{'result100': 'processed task: 100'}, {'result101': 'processed task: 101'}, {'result103': 'processed task: 103'}] | processes the array made up of all step_function_calculation return values

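  1. The SQS message triggers step_function_start, which exits as soon as it has started the Step Function state machine
  2. The state machine invokes one step_function_calculation Lambda per element of items in the input; the input of each invocation is the corresponding element of items
  3. Step Function collects the outputs of all step_function_calculation invocations into an array and passes it as the input of step_function_aggregation
  4. Every Lambda finishes its own task within its own time; no step involves any pointless waiting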

After all steps succeed, the graph view of this execution in Step Function looks like this:

step-function-2-1.png

Test 2: one Calculation fails. Send the message

{
    "items": [
        { "id": 100, "sleep": 5 },
        { "id": 101, "sleep": 50 },
        { "id": 104, "sleep": 1 }
    ]
}

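One Calculation Lambda throws an exception and fails when it receives id:104. The message also lengthens the run time of task 101 so that it is still running after 104 has exhausted all its retries, which lets us observe when aggregation is triggered.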

Lambda Function | Start time | End time
step_function_start | 16:39:11,310 | 16:39:11,507
step_function_calculation(id:100) | 16:39:12,737 | 16:39:17,742
step_function_calculation(id:101) | 16:39:12,701 | 16:40:02,725
step_function_calculation(id:104) | 16:39:12,733, 16:39:14,924, 16:39:19,103, 16:39:27,303 | retried several times; each attempt ended after 1 second
step_function_aggregation | 16:39:29,529 |

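Analysis of the results:

  1. The step_function_calculation Lambda handling the id:104 input made four attempts. This matches MaxAttempts: 3, which counts retries: one initial attempt plus three retries
  2. Because one task failed, the input that step_function_aggregation finally received was only {'Error': 'States.ExceedToleratedFailureThreshold', 'Cause': 'The specified tolerated failure threshold was exceeded'}; the results of the other two tasks, which could run normally, were not included
  3. step_function_aggregation did not wait for all the calculations to finish before running. It was triggered as soon as one of them failed, which signals that the state machine execution as a whole has failed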
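The four start times recorded for id:104 can be checked against the Retry settings (IntervalSeconds 2, MaxAttempts 3, BackoffRate 2). The sketch below computes the configured wait before each retry and the gaps between the observed attempt start times; the function names are made up for this illustration, and the timestamps are copied from the table above.

```python
from datetime import datetime


def retry_waits(interval_seconds, max_attempts, backoff_rate):
    """Configured wait before each retry: the interval grows by backoff_rate each time."""
    return [interval_seconds * backoff_rate ** n for n in range(max_attempts)]


def observed_gaps(start_times):
    """Seconds between consecutive attempt start times given as HH:MM:SS,mmm strings."""
    parsed = [datetime.strptime(t, "%H:%M:%S,%f") for t in start_times]
    return [round((later - earlier).total_seconds(), 3)
            for earlier, later in zip(parsed, parsed[1:])]


if __name__ == "__main__":
    waits = retry_waits(interval_seconds=2, max_attempts=3, backoff_rate=2)
    gaps = observed_gaps(["16:39:12,733", "16:39:14,924", "16:39:19,103", "16:39:27,303"])
    print("total invocations:", 1 + len(waits))
    for wait, gap in zip(waits, gaps):
        print(f"configured wait {wait}s, observed gap {gap}s")
```

Each observed gap is the configured wait plus a fraction of a second, and the last retry starts about two seconds before step_function_aggregation does.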

When a task fails, the state machine execution graph shown in Step Function is:

step-function-3.png

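This post is only a brief experiment with using Step Function to invoke Lambda functions dynamically and asynchronously; real-world use will certainly be more complex. For example, large inputs and outputs may need to be staged through S3, the retry and error-catching behavior needs more thought, and when a step fails you may want to send an SQS/SNS notification immediately instead of running the subsequent steps.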

Categories: AWS
