AWS Lambda with SQS

Introduction

Before discussing how to use AWS Lambda with SQS, let us cover some basics.

What is AWS Lambda?

AWS Lambda lets you run your code without provisioning servers. It follows an event-driven architecture: when an event occurs, your Lambda code is invoked automatically.

What is SQS?

SQS is a managed queuing service from AWS. You can send, store and retrieve messages from SQS. Internally, AWS stores your messages on multiple servers for redundancy.

Why would you use SQS with Lambda? What are its use cases?

Let's consider a scenario where you have multiple services in a microservices environment. One service, say Service A, produces messages and another service, say Service B, consumes them.

Let's assume that you're not using any kind of queuing system here. Now, consider what would happen in the scenario below.

inter service communication without any queuing system
  • The consuming service Service B goes down. What would happen to the messages sent by Service A?

In this scenario, the messages would be lost, which you probably do not want.

In the diagram below, the same services (Service A and Service B) are backed by AWS Lambda. Instead of sending messages directly between the services, we use an SQS queue as an intermediary.

Microservices with queuing system (Lambda-SQS)

When Service B goes down, the messages sent by Service A remain in the SQS queue. When Service B comes up again, it can pick up where it left off and process all the remaining messages in the queue.

Code

Enough theory. Let's write some code to see it in action. We'll create the above architecture using AWS CDK.

Queue Creation

You can use the code snippet below to create a standard SQS queue.

const queue = new sqs.Queue(this, 'AwsLambdaSqsQueue');
SQS queue creation

Producer and Consumer service creation using Lambda

We're going to create two services, Service A and Service B, using AWS Lambda. Service A writes to the SQS queue and Service B reads from the same queue.

Below are the common properties for both Lambda functions. We use Node.js 16 as the Lambda runtime.

const nodeJsFunctionProps: NodejsFunctionProps = {
  bundling: {
    externalModules: [
      'aws-sdk', // Use the 'aws-sdk' available in the Lambda runtime
    ],
  },
  runtime: Runtime.NODEJS_16_X,
  memorySize: 256,
};
Lambda function properties

Producer Lambda:

This Lambda is responsible for writing messages to the queue. Don't forget to grant it permission to send messages to the queue (as shown in the last line of the code snippet below).

const writeSqsFn = new NodejsFunction(this, 'writeSqs', {
  entry: path.join(__dirname, '../src/lambdas', 'write-sqs-msg.ts'),
  ...nodeJsFunctionProps,
  functionName: 'writeSqsMsg',
  environment: {
    queueUrl: queue.queueUrl,
  },
});

// Allow the producer function to send messages to the queue
queue.grantSendMessages(writeSqsFn);

How to send (write) message to SQS:

In the producer Lambda function below, we write just 3 messages to the queue. SendMessageRequest requires two properties: the message body and the queue URL. The purpose of this Lambda is only to simulate a producer.

import * as AWS from 'aws-sdk';

export const handler = async (event: any, context: any = {}): Promise<any> => {
  // Queue URL is injected through the function's environment variables
  const queueUrl = process.env.queueUrl || '';
  const sqs = new AWS.SQS();

  // Send 3 messages, one request per message
  for (let i = 0; i < 3; i++) {
    const msgId = i;
    const msgBody = { msgId, msg: `This is msg ${i + 1}` };

    const params: AWS.SQS.SendMessageRequest = {
      MessageBody: JSON.stringify(msgBody),
      QueueUrl: queueUrl,
    };

    await sqs.sendMessage(params).promise();
    console.log('Successfully sent msg to queue- msgId:', msgId);
  }
};
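
As a minimal sketch, the request construction can be pulled into a small pure function so it is easy to check without calling SQS. The SendParams interface and the buildSendParams helper are hypothetical names introduced only for this illustration, and the queue URL is a placeholder. Unlike the handler above, this version fails fast when the queue URL is missing instead of falling back to an empty string.

```typescript
// Local type mirroring the two SendMessage fields used by the producer.
interface SendParams {
  MessageBody: string;
  QueueUrl: string;
}

// Build the request for the i-th message (0-based), using the same body format as the handler.
function buildSendParams(queueUrl: string, i: number): SendParams {
  if (!queueUrl) {
    // Fail fast rather than sending to an empty queue URL.
    throw new Error('queueUrl is not set');
  }
  const msgBody = { msgId: i, msg: `This is msg ${i + 1}` };
  return { MessageBody: JSON.stringify(msgBody), QueueUrl: queueUrl };
}

// Placeholder URL, for illustration only.
const exampleParams = buildSendParams('https://sqs.example.invalid/123456789012/my-queue', 0);
console.log(exampleParams.MessageBody);
```

In the handler, the returned object could be passed to sqs.sendMessage in place of the inline params.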

How to trigger the producer lambda:

As Lambda is event driven, it does not run on its own. Some external event must invoke it.

Just for the sake of this example, we create an EventBridge rule that triggers this Lambda every minute. In the real world, the producer could be another Lambda triggered by some other event, or some other service entirely.

const everyMinRateER = new events.Rule(this, 'everyMinRateER', {
  schedule: events.Schedule.expression('rate(1 minute)'),
});

everyMinRateER.addTarget(new targets.LambdaFunction(writeSqsFn));

Consumer Lambda:

On the consuming end, we create a Lambda function to read messages from the queue and add the queue as an event source for it.

const readSqsFn = new NodejsFunction(this, 'readsqs', {
  entry: path.join(__dirname, '../src/lambdas', 'read-sqs-msg.ts'),
  ...nodeJsFunctionProps,
  functionName: 'readSQSMsg',
});

readSqsFn.addEventSource(new SqsEventSource(queue));

When we add an event source, the Lambda service (not the Lambda function that you wrote) invokes your Lambda function with messages from the queue. The messages arrive in the event parameter of your Lambda function.

Sample SQS Event

Below is a sample SQS event.

{
    "Records": [
        {
            "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
            "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...",
            "body": "Test message.",
            "attributes": {
                "ApproximateReceiveCount": "1",
                "SentTimestamp": "1545082649183",
                "SenderId": "AIDAIENQZJOLO23YVJ4VO",
                "ApproximateFirstReceiveTimestamp": "1545082649185"
            },
            "messageAttributes": {},
            "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
            "eventSource": "aws:sqs",
            "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
            "awsRegion": "us-east-2"
        },
        {
            "messageId": "2e1424d4-f796-459a-8184-9c92662be6da",
            "receiptHandle": "AQEBzWwaftRI0KuVm4tP+/7q1rGgNqicHq...",
            "body": "Test message.",
            "attributes": {
                "ApproximateReceiveCount": "1",
                "SentTimestamp": "1545082650636",
                "SenderId": "AIDAIENQZJOLO23YVJ4VO",
                "ApproximateFirstReceiveTimestamp": "1545082650649"
            },
            "messageAttributes": {},
            "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
            "eventSource": "aws:sqs",
            "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
            "awsRegion": "us-east-2"
        }
    ]
}

How to read messages from SQS using Lambda:

The event object may contain many records, and each record has a body property that contains the message (as shown in the sample event above). In the function below, we just print each SQS message, which you can then see in CloudWatch.

import { SQSEvent } from 'aws-lambda';

export const handler = async (
  event: SQSEvent,
  context: any = {}
): Promise<any> => {
  // Each record carries one SQS message; its payload is in record.body
  for (const record of event.Records) {
    console.log('sqs record:', record.body);
  }
};
Reading message from lambda
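
Since the producer in this article writes JSON bodies, a consumer will usually want to parse them. The following is a minimal sketch of that step, not the article's consumer. The SqsRecordLike, SqsEventLike and ProducerMsg types and the parseBodies helper are hypothetical names defined locally so the snippet runs on its own. They mirror only the fields of the sample event that are used here.

```typescript
// Minimal local types mirroring the parts of the SQS event used below.
interface SqsRecordLike {
  messageId: string;
  body: string;
}
interface SqsEventLike {
  Records: SqsRecordLike[];
}

// Shape of the JSON body written by the producer in this article.
interface ProducerMsg {
  msgId: number;
  msg: string;
}

// Parse every record body; collect the ids of records whose body is not valid JSON.
function parseBodies(event: SqsEventLike): { parsed: ProducerMsg[]; invalid: string[] } {
  const parsed: ProducerMsg[] = [];
  const invalid: string[] = [];
  for (const record of event.Records) {
    try {
      parsed.push(JSON.parse(record.body) as ProducerMsg);
    } catch {
      invalid.push(record.messageId);
    }
  }
  return { parsed, invalid };
}

const sampleEvent: SqsEventLike = {
  Records: [
    { messageId: 'id-1', body: JSON.stringify({ msgId: 0, msg: 'This is msg 1' }) },
    { messageId: 'id-2', body: 'Test message.' }, // plain text, as in the sample event
  ],
};
const parseResult = parseBodies(sampleEvent);
console.log(parseResult.parsed.length, parseResult.invalid);
```

Here the plain-text body from the sample event is reported as invalid rather than crashing the whole invocation.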

Deployment:

When you deploy the above code using cdk deploy, all the AWS resources are created.

From the CloudWatch logs, you can see that the messages are sent to the queue.

Cloudwatch logs for sending messages to SQS

You can also see the log entries for the messages read from the queue.

How does the consumer Lambda receive messages from the SQS queue?

Earlier, I said that whenever there is a message in the queue, your consumer Lambda gets that message. But how does it get the message? Who delivers it?

With SQS and Lambda, the Lambda service (not your Lambda function) polls the SQS queue, and when the queue contains messages, they are delivered as an event to the consuming Lambda.

How frequently does the Lambda service poll the queue?

It depends on your configuration. SQS supports two types of polling: short polling and long polling.

With short polling, SQS queries only a subset of its servers and responds immediately. This can produce false empty responses: messages are in the queue, but you get an empty response because the sampled subset of servers did not hold them.

With long polling, you specify a wait time in seconds, with a maximum value of 20 seconds. This is not a polling interval. It is how long a receive request waits for a message to arrive before returning an empty response, and the request queries all the servers for the queue. For example, with a wait time of 20 seconds, a request returns as soon as a message is available, or after 20 seconds if none arrives.

The receiveMessageWaitTime property sets this wait time on the queue. Any value greater than 0 seconds means long polling. Here, we've passed 20 seconds, so receive requests that do not specify their own wait time wait up to 20 seconds for new messages.

const queue = new sqs.Queue(this, 'AwsLambdaSqsQueue', {
  receiveMessageWaitTime: cdk.Duration.seconds(20),
});

Long polling is generally recommended, because it reduces the number of empty responses and therefore the number of requests.
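
For a consumer that calls SQS directly instead of going through a Lambda event source, the wait time can also be set per request. The sketch below only builds and validates such a request. The field names QueueUrl, MaxNumberOfMessages and WaitTimeSeconds follow the SQS ReceiveMessage request, while the ReceiveParams interface and the two helper functions are hypothetical and local to this example.

```typescript
// Field names follow the SQS ReceiveMessage request; the interface itself is local to this sketch.
interface ReceiveParams {
  QueueUrl: string;
  MaxNumberOfMessages: number; // SQS allows 1 to 10 per request
  WaitTimeSeconds: number; // 0 = short polling, 1 to 20 = long polling
}

// Build a receive request, rejecting wait times outside the 0-20 second range.
function buildReceiveParams(queueUrl: string, waitTimeSeconds: number): ReceiveParams {
  if (!Number.isInteger(waitTimeSeconds) || waitTimeSeconds < 0 || waitTimeSeconds > 20) {
    throw new RangeError('waitTimeSeconds must be an integer from 0 to 20');
  }
  return { QueueUrl: queueUrl, MaxNumberOfMessages: 10, WaitTimeSeconds: waitTimeSeconds };
}

// Any wait time above zero means the request uses long polling.
function pollingMode(params: ReceiveParams): 'short' | 'long' {
  return params.WaitTimeSeconds > 0 ? 'long' : 'short';
}

// Placeholder URL, for illustration only.
const placeholderUrl = 'https://sqs.example.invalid/123456789012/my-queue';
console.log(pollingMode(buildReceiveParams(placeholderUrl, 0)));
console.log(pollingMode(buildReceiveParams(placeholderUrl, 20)));
```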

Who pays for Lambda polling the SQS queue?

There is no separate Lambda charge for polling. However, it is not free: polling involves calling SQS APIs, and you are charged for those SQS requests, although they are pretty cheap.

How many messages does Lambda poll from your queue?

The Lambda service polls your queue in batches, and each batch contains at most the number of messages given by the batch size. By default, the batch size is 10. While creating the event source, you can pass a value to the batchSize property. This value is the maximum number of records that the Lambda service retrieves and sends to your Lambda function in each batch.

readSqsFn.addEventSource(
  new SqsEventSource(queue, {
    batchSize: 10,
  })
);

Please note that batchSize is an upper limit, not a target: the Lambda service does not wait until that many messages are available. If the queue holds fewer messages, the batch is smaller. If it holds a large number of messages, each batch carries at most batchSize of them.
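
The upper-limit behaviour of batchSize can be pictured with a simplified model that splits a backlog into batches. This is only a sketch under the assumption that every batch is filled as far as possible. The real Lambda service may deliver smaller batches even when more messages are queued. The toBatches helper is a hypothetical name.

```typescript
// Split a backlog into batches of at most batchSize messages.
function toBatches<T>(messages: T[], batchSize: number): T[][] {
  if (!Number.isInteger(batchSize) || batchSize < 1) {
    throw new RangeError('batchSize must be a positive integer');
  }
  const batches: T[][] = [];
  for (let i = 0; i < messages.length; i += batchSize) {
    batches.push(messages.slice(i, i + batchSize));
  }
  return batches;
}

// 23 queued messages with batchSize 10: the last batch is smaller than the limit.
const backlog = Array.from({ length: 23 }, (_, i) => i + 1);
const batchSizes = toBatches(backlog, 10).map((b) => b.length);
console.log(batchSizes); // [ 10, 10, 3 ]
```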

Order of messages

As you can see in the CloudWatch screenshot below, we didn't receive the messages in the order they were sent. This is one of the characteristics of a standard SQS queue.

In the diagram below, we send messages in the order 1, 2 and finally 3. But on the consuming side, the messages are not guaranteed to arrive in the order they were sent.

When you write a Lambda that consumes messages from a standard SQS queue, you should not depend on the order of the messages.

Synchronous invocation

When you use SQS with Lambda, the Lambda service invokes your function synchronously: it invokes the function, waits for the execution to complete, and receives the response.

Deletion of messages

Another advantage of using Lambda with SQS is that as soon as a batch is successfully processed, its messages are automatically deleted from the queue. You don't need to delete them manually.

visibilityTimeout

When Service A produces a lot of messages, the Lambda service scales out Service B by creating many replicas of it. In our case, many Lambda instances would be processing the messages.

Scaling out

This raises another question: if there are many replicas of Service B, would each replica pick up the same message to process?

The visibilityTimeout property comes to the rescue. It determines how long a message stays invisible after it is delivered to a consumer. When a consumer picks up a message, the message is invisible for the number of seconds given by the visibilityTimeout property. During this period, no other consumer can see that message, even though it still exists in the queue.

Let us assume that we have 3 messages in the queue and 2 of them have been sent to consuming Lambdas. These 2 messages (denoted by white color) would be invisible for the specified amount of time.

You can set the visibility timeout while creating the queue itself.

const queue = new sqs.Queue(this, 'AwsLambdaSqsQueue', {
  receiveMessageWaitTime: cdk.Duration.seconds(20),
  visibilityTimeout: cdk.Duration.seconds(300),
});
SQS queue creation with receiveMessageWaitTime and visibilityTimeout
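
To make the effect of the visibility timeout concrete, here is a small in-memory model of a queue. It is a sketch for illustration only, not how SQS is implemented. The InMemoryQueue class and its methods are hypothetical, and time is passed in explicitly as a number of seconds so the behaviour is deterministic.

```typescript
interface QueuedMessage {
  id: string;
  body: string;
  visibleAt: number; // time (in seconds) from which the message can be received again
}

class InMemoryQueue {
  private messages: QueuedMessage[] = [];
  private readonly visibilityTimeoutSeconds: number;

  constructor(visibilityTimeoutSeconds: number) {
    this.visibilityTimeoutSeconds = visibilityTimeoutSeconds;
  }

  send(id: string, body: string): void {
    this.messages.push({ id, body, visibleAt: 0 });
  }

  // Return the first visible message and hide it for the visibility timeout.
  receive(nowSeconds: number): QueuedMessage | undefined {
    const msg = this.messages.find((m) => m.visibleAt <= nowSeconds);
    if (msg) {
      msg.visibleAt = nowSeconds + this.visibilityTimeoutSeconds;
    }
    return msg;
  }

  // Called after successful processing; the message is gone for good.
  delete(id: string): void {
    this.messages = this.messages.filter((m) => m.id !== id);
  }
}

const simQueue = new InMemoryQueue(300);
simQueue.send('m1', 'hello');
const firstReceive = simQueue.receive(0); // consumer 1 gets m1
const secondReceive = simQueue.receive(10); // consumer 2 gets nothing, m1 is hidden
const thirdReceive = simQueue.receive(300); // m1 was never deleted, so it is visible again
console.log(firstReceive?.id, secondReceive?.id, thirdReceive?.id);
```

The third receive shows why a message whose processing fails, or takes longer than the timeout, ends up being delivered again.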

Dead letter queue

A dead letter queue (DLQ) is another queue that stores the messages that could not be processed successfully. You can investigate these messages later.

If there is an error while processing messages from the queue, the messages are not deleted after the function executes. After some time (as specified in visibilityTimeout), they become visible in the queue again and are available for consumption.

Lambda receives the message again and again until its receive count reaches the threshold (as specified in the maxReceiveCount property), and then SQS moves the message to the dead letter queue. If the error was due to a temporary glitch or was fixed between retries, the message is processed by Lambda and deleted from the queue.

Dead letter queue
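
The retry-then-redrive flow can be sketched with a simplified model in which a message is moved to the DLQ once it has already been received maxReceiveCount times. The TrackedMessage type and the receiveOrRedrive helper are hypothetical names, and the model deliberately ignores the visibility timeout between attempts.

```typescript
interface TrackedMessage {
  id: string;
  receiveCount: number;
}

// On each receive attempt, either deliver the first message or, if its receive
// limit is exhausted, move it to the dead letter queue instead of delivering it.
function receiveOrRedrive(
  source: TrackedMessage[],
  deadLetters: TrackedMessage[],
  maxReceiveCount: number
): TrackedMessage | undefined {
  const msg = source[0];
  if (!msg) {
    return undefined;
  }
  if (msg.receiveCount >= maxReceiveCount) {
    source.shift();
    deadLetters.push(msg);
    return undefined;
  }
  msg.receiveCount += 1;
  return msg;
}

const sourceQueue: TrackedMessage[] = [{ id: 'm1', receiveCount: 0 }];
const deadLetters: TrackedMessage[] = [];
let deliveries = 0;

// The consumer is assumed to fail every time, so the message is never deleted.
for (let attempt = 0; attempt < 10; attempt++) {
  if (receiveOrRedrive(sourceQueue, deadLetters, 5)) {
    deliveries++;
  }
}
console.log(deliveries, deadLetters.length); // 5 1
```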

Lambda scaling & concurrency

The diagram below shows a high-level mental model of how scaling works when you use SQS with Lambda.

For this discussion of Lambda scaling with SQS, let us assume that messages are sent to the queue by several producers and each producer sends lots of messages.

The Lambda service polls your SQS queue in batches. It starts with 5 concurrent connections (let's call each one a poller) and increases the count based on the number of messages in the queue.

We have no control over this autoscaling of pollers, as AWS manages it internally. However, you do control how many messages you receive in each batch, through the batchSize property.

Once the pollers receive messages, the Lambda service scales your Lambda function based on the number of messages. You can limit this scaling with the reserved concurrency property, which throttles Lambda invocations.

One important thing to note: even when you use reserved concurrency to limit parallel invocations of your Lambda function, the SQS-Lambda integration does not stop reading messages for it. Invocations beyond the reserved concurrency are throttled, so your function is not invoked for those batches and their messages return to the queue.

Weird behavior:

Let's consider a scenario where you have 1000 messages in the queue and you've set reserved concurrency to 1 and batch size to 2.

Would all the messages be processed successfully? Let's see.

As we have no control over the polling behavior, the SQS-Lambda integration reads more messages from the queue than a single concurrent execution can handle. Because of the reserved concurrency configured for the function, the extra invocations are throttled and their messages return to the queue. Each such receive still increases the message's receive count.

This happens again and again, and the messages are moved to the DLQ once the receive count reaches the limit (as specified in the maxReceiveCount property).

To replicate this scenario, I've created an SQS queue with a DLQ and maxReceiveCount set to 5. This means that after 5 receives, SQS moves the message to the DLQ.

    const queue = new sqs.Queue(this, 'AwsLambdaSqsQueue', {
      visibilityTimeout: cdk.Duration.seconds(300),
      receiveMessageWaitTime: cdk.Duration.seconds(20),
      deadLetterQueue: {
        queue: new sqs.Queue(this, 'AwsLambdaDlq'),
        maxReceiveCount: 5,
      },
    });

I've put 1000 messages in the queue and created the consumer Lambda as below. Note that batchSize is set to 2, so at most 2 messages are sent to the consumer Lambda function in each batch.

const readSqsFn = new NodejsFunction(this, 'readsqs', {
  entry: path.join(__dirname, '../src/lambdas', 'read-sqs-msg.ts'),
  ...nodeJsFunctionProps,
  functionName: 'readSQSMsg',
  environment: {
    queueUrl: queue.queueUrl,
  },
  reservedConcurrentExecutions: 1,
  timeout: Duration.seconds(300),
});

readSqsFn.addEventSource(
  new SqsEventSource(queue, {
    batchSize: 2,
  })
);

Consumer lambda function code

Just to simulate some processing, the function waits for 10 seconds before printing the messages.

import { SQSEvent } from 'aws-lambda';

export const handler = async (
  event: SQSEvent,
  context: any = {}
): Promise<any> => {
  console.log(`Number of records:${event.Records.length}`);
  // Simulate 10 seconds of processing
  await new Promise((r) => setTimeout(r, 10000));
  for (const record of event.Records) {
    console.log('sqs record:', record.body);
  }
};

As you can see in the screenshot below, the Lambda service polls for more messages (570 messages in this case) than we can handle (2 messages at a time, because of our reserved concurrency and batch size configuration). But we don't have control over this behavior.

After some time, some messages are processed. The messages that are not processed because of the reserved concurrency (currently set to 1) are sent back to the queue. You can see this in the screenshot below.

As mentioned earlier, the Lambda service keeps reading many more messages even though our concurrency is set to 1. Finally, after 5 failed attempts (the maxReceiveCount I've configured for the DLQ), the messages land in the DLQ.

Out of 1000 messages, 742 landed in the DLQ, which is definitely NOT what we wanted.

The general expectation is that processing would take more time because of our reserved concurrency configuration, but that all the messages would eventually be processed. That's not how SQS with Lambda works.

You need to understand this behavior when designing event-driven systems with a standard SQS queue.

Solution

The solution to the above problem is to use a FIFO (First In First Out) queue. The consumer code does not change, but you need to declare the queue as FIFO at creation. Note that a producer sending to a FIFO queue must also include a MessageGroupId with each message.

const queue = new sqs.Queue(this, 'AwsLambdaSqsQueue', {
  fifo: true,
  visibilityTimeout: cdk.Duration.seconds(300),
  receiveMessageWaitTime: cdk.Duration.seconds(20),
  contentBasedDeduplication: true,
  deadLetterQueue: {
    queue: new sqs.Queue(this, 'AwsLambdaDlq', {
      fifo: true,
      contentBasedDeduplication: true,
    }),
    maxReceiveCount: 5,
  },
});

With a FIFO queue, your consumer receives 2 messages at a time and processes them without any issues.

FIFO sqs queue output
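
On the producer side, a send request to a FIFO queue carries a MessageGroupId in addition to the body and queue URL. The sketch below shows one possible shape of that request. The FifoSendParams interface, the buildFifoSendParams helper, the 'service-a' group id and the runId field are all assumptions made for this illustration. The runId is added because, with contentBasedDeduplication enabled, identical bodies sent within the deduplication interval are treated as duplicates, and the producer in this article sends the same three bodies on every run.

```typescript
// Field names follow the SQS SendMessage request; the interface is local to this sketch.
interface FifoSendParams {
  MessageBody: string;
  QueueUrl: string;
  MessageGroupId: string; // messages within one group are delivered in order
}

// Build the i-th message (0-based) of a given run for a FIFO queue.
function buildFifoSendParams(
  queueUrl: string,
  i: number,
  runId: string, // hypothetical per-run identifier that keeps bodies unique
  groupId: string = 'service-a' // hypothetical group id
): FifoSendParams {
  const msgBody = { msgId: i, runId, msg: `This is msg ${i + 1}` };
  return {
    MessageBody: JSON.stringify(msgBody),
    QueueUrl: queueUrl,
    MessageGroupId: groupId,
  };
}

// Placeholder URL, for illustration only.
const fifoParams = buildFifoSendParams(
  'https://sqs.example.invalid/123456789012/my-queue.fifo',
  0,
  'run-001'
);
console.log(fifoParams.MessageGroupId, fifoParams.MessageBody);
```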

Error handling

By default, whenever processing any message in a batch fails, all the messages in that batch are sent back to the queue.

Let's say you have 10 messages (with msgId from 1 to 10) in the queue and the first batch delivers 5 of them. If processing any one of those 5 messages fails, all 5 are sent back to the queue, including the ones that were processed successfully.
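
One way to avoid reprocessing the whole batch is Lambda's partial batch response: the handler returns the ids of only the failed messages in a batchItemFailures list. The following is a minimal sketch of such a handler, assuming that reporting of batch item failures has been enabled on the event source (in CDK, the reportBatchItemFailures option of SqsEventSource). The BatchRecord, BatchEvent and BatchResponse types and the processRecord and collectFailures helpers are hypothetical names defined locally, and the processing step is a stand-in that simply fails on bodies that are not valid JSON.

```typescript
// Minimal local types; only the fields used here are modelled.
interface BatchRecord {
  messageId: string;
  body: string;
}
interface BatchEvent {
  Records: BatchRecord[];
}
interface BatchResponse {
  batchItemFailures: { itemIdentifier: string }[];
}

// Stand-in processing step: throws when the body is not valid JSON.
function processRecord(record: BatchRecord): void {
  JSON.parse(record.body);
}

// Process every record and remember only the ones that failed.
function collectFailures(event: BatchEvent): BatchResponse {
  const batchItemFailures: { itemIdentifier: string }[] = [];
  for (const record of event.Records) {
    try {
      processRecord(record);
    } catch {
      batchItemFailures.push({ itemIdentifier: record.messageId });
    }
  }
  return { batchItemFailures };
}

// Handler shape: only the listed messages are retried, the rest are treated as processed.
export const handler = async (event: BatchEvent): Promise<BatchResponse> =>
  collectFailures(event);

const failureReport = collectFailures({
  Records: [
    { messageId: 'id-1', body: '{"msgId":1}' },
    { messageId: 'id-2', body: 'not json' },
    { messageId: 'id-3', body: '{"msgId":3}' },
  ],
});
console.log(failureReport.batchItemFailures);
```

In this example only id-2 is reported as failed, so it alone would return to the queue.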