Rust for JavaScript developers: SQS batch error handling with AWS Lambda
source link: https://dfrasca.hashnode.dev/rust-for-javascript-developers-sqs-batch-error-handling-with-aws-lambda
In this post, I will enhance the AWS SQS comparison by using the partial batch response. The partial batch response was announced in November 2021, and it helps us developers handle partial failures.
This post is part of the mini-series Rust for JavaScript developers. You can check out the other parts:
Part 1 shows a basic comparison for AWS SQS.
Part 2 shows a basic comparison for AWS AppConfig.
Part 3 shows a basic comparison for reading and processing a CSV file.
In the last few years, I have switched languages multiple times between .NET, JavaScript and Rust, and the knowledge acquired with one language is transferable to a new one. Therefore, we need to mentally map the similarities to pick up the new language quickly.
Because I was doing this with a friend who was curious to see Serverless Rust in action, I wrote small posts about it.
The Basics
Rust comes with many built-in features, for example:
JS                      | Rust
npm                     | cargo
npm init                | cargo init
npm install             | cargo install
npm run build           | cargo build
package.json            | Cargo.toml
package-lock.json       | Cargo.lock
webpack                 | cargo build
lint                    | cargo clippy
prettier                | cargo fmt
doc generation          | cargo doc
test library like jest  | cargo test
Generate a new SAM based Serverless App
sam init --location gh:aws-samples/cookiecutter-aws-sam-rust
Amazon SQS
Before the partial batch response was announced, the challenge was dealing with errors. If one of the SQS records failed during processing, the entire batch of messages was considered failed. To work around this limitation, many of us kept track of successful and failed messages and deleted the processed ones programmatically.
Because the code is very similar to the earlier SQS example in this series, I will show just the changes needed, starting with the AWS SAM template:
AWSTemplateFormatVersion: 2010-09-09
Transform: 'AWS::Serverless-2016-10-31'
Description: Writer Lambda.

##########################################################################
#  Global values that are applied to all resources                       #
##########################################################################
Globals:
  Function:
    MemorySize: 1024
    Architectures: ["arm64"]
    Handler: bootstrap
    Runtime: provided.al2
    Timeout: 29
    Layers:
      - !Sub arn:aws:lambda:${AWS::Region}:580247275435:layer:LambdaInsightsExtension-Arm64:1
    Environment:
      Variables:
        RUST_BACKTRACE: 1
        RUST_LOG: info

Resources:
##########################################################################
#   Lambda Function                                                      #
##########################################################################
  LambdaFunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: ../build/handler
      Policies:
        - AWSLambdaBasicExecutionRole
      Events:
        MySQSEvent:
          Type: SQS
          Properties:
            Queue: <ARN of the SQS queue>
            BatchSize: 10
            FunctionResponseTypes:
              - ReportBatchItemFailures
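
With ReportBatchItemFailures enabled, Lambda reads the function's return value for a batchItemFailures list, where each entry carries the ID of a failed message in itemIdentifier. As a minimal sketch of that shape, the helper below builds the JSON by hand using only the standard library. The helper name is hypothetical, and it assumes message IDs are UUID-like strings that need no JSON escaping; the handler in this post produces the equivalent structure by serializing structs with serde_json.

```rust
/// Builds the partial batch response body that Lambda expects.
/// Hypothetical helper: assumes the IDs contain no characters
/// that would need JSON escaping.
fn batch_item_failures_json(failed_ids: &[&str]) -> String {
    let items: Vec<String> = failed_ids
        .iter()
        .map(|id| format!(r#"{{"itemIdentifier":"{}"}}"#, id))
        .collect();
    format!(r#"{{"batchItemFailures":[{}]}}"#, items.join(","))
}
```

An empty batchItemFailures list tells Lambda that every message in the batch was processed successfully.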
The code will be:
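// Assumed imports; BatchItemFailures, ItemIdentifier, MyStruct and DoSomething
// are defined elsewhere in the project.
use std::sync::{Arc, Mutex};
use aws_lambda_events::event::sqs::SqsEvent;
use futures::future::join_all;
use lambda_runtime::{Error, LambdaEvent};
use serde_json::Value;

pub async fn execute(client: &aws_sdk_dynamodb::Client, event: LambdaEvent<SqsEvent>) -> Result<Value, Error> {
    // IDs of the messages that failed, shared across all spawned tasks
    let failed_message: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
    let mut tasks = Vec::with_capacity(event.payload.records.len());
    let shared_client = Arc::from(client.clone());

    for record in event.payload.records.into_iter() {
        let shared_client = shared_client.clone();
        let message_id = record.message_id.unwrap();
        let failed_message = failed_message.clone();

        tasks.push(tokio::spawn(async move {
            if let Some(body) = &record.body {
                let request = serde_json::from_str::<MyStruct>(body);
                if let Ok(request) = request {
                    DoSomething::new()
                        .execute(&shared_client, &request)
                        .await // assumes execute is async, like the DynamoDB SDK calls it wraps
                        .map_or_else(|_e| {
                            // record the failure so that only this message is retried
                            failed_message.lock().unwrap().push(message_id.clone());
                        }, |_| ());
                }
            }
        }));
    }

    // wait for every task to finish before building the response
    join_all(tasks).await;

    let response = BatchItemFailures {
        batch_item_failures: failed_message.lock().unwrap().clone()
            .into_iter()
            .map(|message_id| ItemIdentifier { item_identifier: message_id })
            .collect(),
    };

    Ok(serde_json::to_value(response).unwrap())
}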
It is not much different from the TypeScript version:
export const handler = async (event: SQSEvent): Promise<BatchItemFailures> => {
  const failedMessageIds: string[] = [];

  await Promise.all(
    event.Records.map(async (record: SQSRecord) => {
      try {
        // DoSomething(record)
      } catch (error) {
        failedMessageIds.push(record.messageId);
      }
    }),
  );

  const batchItemFailures: BatchItemFailures = {
    batchItemFailures: failedMessageIds.map(id => {
      return {
        itemIdentifier: id
      }
    })
  };

  return batchItemFailures;
};
Rust is stricter. Because I use tokio::spawn inside a loop, each spawned task must own the data it captures, so a plain failed_message vector would be moved into the first task and would no longer be available to the others. To share it across tasks and mutate it safely, I need to use:
Arc<Mutex<Vec<String>>>
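
To illustrate why the wrapper is needed, here is a minimal sketch that uses std::thread::spawn instead of tokio::spawn so that it runs without extra crates; both require the closure to own everything it captures. The function name is hypothetical, and an empty body stands in for a processing error.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Processes each (message_id, body) pair on its own thread and returns
/// the IDs that failed. An empty body stands in for a processing error.
fn collect_failures(records: Vec<(String, String)>) -> Vec<String> {
    let failed: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
    let mut handles = Vec::with_capacity(records.len());

    for (message_id, body) in records {
        // Each thread gets its own Arc handle; cloning bumps a
        // reference count and does not copy the Vec.
        let failed = Arc::clone(&failed);
        handles.push(thread::spawn(move || {
            if body.is_empty() {
                // The Mutex guarantees only one thread pushes at a time.
                failed.lock().unwrap().push(message_id);
            }
        }));
    }

    for handle in handles {
        handle.join().unwrap();
    }

    // All threads are done, so take the Vec out of the mutex.
    let mut ids = std::mem::take(&mut *failed.lock().unwrap());
    ids.sort(); // threads finish in any order; sort for a stable result
    ids
}
```

Arc provides shared ownership across the spawned closures, and Mutex provides the exclusive access needed to push into the vector.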
I could not find a way to return the SQS identifier from each task so that I could filter the join_all results down to the failures.
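
One possible way around this, sketched here with standard-library threads in place of Tokio tasks and an empty body standing in for a failure, is to have each task return its own message ID when it fails and then filter the joined results. With Tokio, join_all over the task handles yields a Vec of Result values that could be filtered in a similar way. The function name is hypothetical, and this is not the approach used in the handler above.

```rust
use std::thread;

/// Each worker returns Some(message_id) on failure and None on success,
/// so no shared Arc<Mutex<...>> is needed.
fn failures_from_results(records: Vec<(String, String)>) -> Vec<String> {
    let handles: Vec<thread::JoinHandle<Option<String>>> = records
        .into_iter()
        .map(|(message_id, body)| {
            thread::spawn(move || if body.is_empty() { Some(message_id) } else { None })
        })
        .collect();

    handles
        .into_iter()
        // join() returns Err if the worker panicked; this sketch drops that
        // case, while a real handler would need to report the message as failed.
        .filter_map(|handle| handle.join().ok().flatten())
        .collect()
}
```

Because the handles are joined in the order they were created, the failed IDs come back in the same order as the input records.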
Conclusion
Rust syntax in a serverless application is possibly clumsier than in other languages. Still, even though I am not using Rust every day, I no longer notice the extra main(), the unwrap(), or the Mutex/Arc. It is just part of the language, and I have gotten used to it by now.
My goal is to demonstrate that you can use a different programming language, reuse your skill set, and maybe achieve better results based on the requirements (choose the best tool for the job). Having the same code written in other programming languages helps me move quickly to Rust, and I hope it will help you with your migration to Rust.