There are more AWS SDK examples available in the AWS Doc SDK Examples
Send and receive batches of messages with Amazon SQS using an AWS SDK
The following code examples show how to:
Create an Amazon SQS queue.
Send batches of messages to the queue.
Receive batches of messages from the queue.
Delete batches of messages from the queue.
- Java
-
- SDK for Java 2.x
-
Note
There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository
. As shown in the following examples, you can handle batch message operations with Amazon SQS using two different approaches with the AWS SDK for Java 2.x:
SendRecvBatch.java uses explicit batch operations. You manually create message batches and call
sendMessageBatch()
anddeleteMessageBatch()
directly. You also handle batch responses, including any failed messages. This approach gives you full control over batch sizing and error handling. However, it requires more code to manage the batching logic.SimpleProducerConsumer.java uses the high-level
SqsAsyncBatchManager
library for automatic request batching. You make individualsendMessage()
anddeleteMessage()
calls with the same method signatures as the standard client. The SDK automatically buffers these calls and sends them as batch operations. This approach requires minimal code changes while providing batching performance benefits.Use explicit batching when you need fine-grained control over batch composition and error handling. Use automatic batching when you want to optimize performance with minimal code changes.
SendRecvBatch.java - Uses explicit batch operations with messages.
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.sqs.SqsClient; import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry; import software.amazon.awssdk.services.sqs.model.CreateQueueRequest; import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest; import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry; import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResponse; import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResultEntry; import software.amazon.awssdk.services.sqs.model.DeleteQueueRequest; import software.amazon.awssdk.services.sqs.model.Message; import software.amazon.awssdk.services.sqs.model.MessageAttributeValue; import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest; import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry; import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse; import software.amazon.awssdk.services.sqs.model.SendMessageBatchResultEntry; import software.amazon.awssdk.services.sqs.model.SqsException; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; /** * Before running this Java V2 code example, set up your development * environment, including your credentials. * * For more information, see the following documentation topic: * * https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html */ /** * This code demonstrates basic message operations in Amazon Simple Queue Service (Amazon SQS). */ public class SendRecvBatch { private static final Logger LOGGER = LoggerFactory.getLogger(SendRecvBatch.class); private static final SqsClient sqsClient = SqsClient.create(); public static void main(String[] args) { usageDemo(); } /** * Send a batch of messages in a single request to an SQS queue. * This request may return overall success even when some messages were not sent. * The caller must inspect the Successful and Failed lists in the response and * resend any failed messages. * * @param queueUrl The URL of the queue to receive the messages. * @param messages The messages to send to the queue. Each message contains a body and attributes. * @return The response from SQS that contains the list of successful and failed messages. */ public static SendMessageBatchResponse sendMessages( String queueUrl, List<MessageEntry> messages) { try { List<SendMessageBatchRequestEntry> entries = new ArrayList<>(); for (int i = 0; i < messages.size(); i++) { MessageEntry message = messages.get(i); entries.add(SendMessageBatchRequestEntry.builder() .id(String.valueOf(i)) .messageBody(message.getBody()) .messageAttributes(message.getAttributes()) .build()); } SendMessageBatchRequest sendBatchRequest = SendMessageBatchRequest.builder() .queueUrl(queueUrl) .entries(entries) .build(); SendMessageBatchResponse response = sqsClient.sendMessageBatch(sendBatchRequest); if (!response.successful().isEmpty()) { for (SendMessageBatchResultEntry resultEntry : response.successful()) { LOGGER.info("Message sent: {}: {}", resultEntry.messageId(), messages.get(Integer.parseInt(resultEntry.id())).getBody()); } } if (!response.failed().isEmpty()) { for (BatchResultErrorEntry errorEntry : response.failed()) { LOGGER.warn("Failed to send: {}: {}", errorEntry.id(), messages.get(Integer.parseInt(errorEntry.id())).getBody()); } } return response; } catch (SqsException e) { LOGGER.error("Send messages failed to queue: {}", queueUrl, e); throw e; } } /** * Receive a batch of messages in a single request from an SQS queue. * * @param queueUrl The URL of the queue from which to receive messages. * @param maxNumber The maximum number of messages to receive (capped at 10 by SQS). * The actual number of messages received might be less. * @param waitTime The maximum time to wait (in seconds) before returning. When * this number is greater than zero, long polling is used. This * can result in reduced costs and fewer false empty responses. * @return The list of Message objects received. These each contain the body * of the message and metadata and custom attributes. */ public static List<Message> receiveMessages(String queueUrl, int maxNumber, int waitTime) { try { ReceiveMessageRequest receiveRequest = ReceiveMessageRequest.builder() .queueUrl(queueUrl) .maxNumberOfMessages(maxNumber) .waitTimeSeconds(waitTime) .messageAttributeNames("All") .build(); List<Message> messages = sqsClient.receiveMessage(receiveRequest).messages(); for (Message message : messages) { LOGGER.info("Received message: {}: {}", message.messageId(), message.body()); } return messages; } catch (SqsException e) { LOGGER.error("Couldn't receive messages from queue: {}", queueUrl, e); throw e; } } /** * Delete a batch of messages from a queue in a single request. * * @param queueUrl The URL of the queue from which to delete the messages. * @param messages The list of messages to delete. * @return The response from SQS that contains the list of successful and failed * message deletions. */ public static DeleteMessageBatchResponse deleteMessages(String queueUrl, List<Message> messages) { try { List<DeleteMessageBatchRequestEntry> entries = new ArrayList<>(); for (int i = 0; i < messages.size(); i++) { entries.add(DeleteMessageBatchRequestEntry.builder() .id(String.valueOf(i)) .receiptHandle(messages.get(i).receiptHandle()) .build()); } DeleteMessageBatchRequest deleteRequest = DeleteMessageBatchRequest.builder() .queueUrl(queueUrl) .entries(entries) .build(); DeleteMessageBatchResponse response = sqsClient.deleteMessageBatch(deleteRequest); if (!response.successful().isEmpty()) { for (DeleteMessageBatchResultEntry resultEntry : response.successful()) { LOGGER.info("Deleted {}", messages.get(Integer.parseInt(resultEntry.id())).receiptHandle()); } } if (!response.failed().isEmpty()) { for (BatchResultErrorEntry errorEntry : response.failed()) { LOGGER.warn("Could not delete {}", messages.get(Integer.parseInt(errorEntry.id())).receiptHandle()); } } return response; } catch (SqsException e) { LOGGER.error("Couldn't delete messages from queue {}", queueUrl, e); throw e; } } /** * Helper class to represent a message with body and attributes. */ public static class MessageEntry { private final String body; private final Map<String, MessageAttributeValue> attributes; public MessageEntry(String body, Map<String, MessageAttributeValue> attributes) { this.body = body; this.attributes = attributes != null ? attributes : new HashMap<>(); } public String getBody() { return body; } public Map<String, MessageAttributeValue> getAttributes() { return attributes; } } /** * Shows how to: * * Read the lines from a file and send the lines in * batches of 10 as messages to a queue. * * Receive the messages in batches until the queue is empty. * * Reassemble the lines of the file and verify they match the original file. */ public static void usageDemo() { LOGGER.info("-".repeat(88)); LOGGER.info("Welcome to the Amazon Simple Queue Service (Amazon SQS) demo!"); LOGGER.info("-".repeat(88)); String queueUrl = null; try { // Create a queue for the demo. String queueName = "sqs-usage-demo-message-wrapper-" + System.currentTimeMillis(); CreateQueueRequest createRequest = CreateQueueRequest.builder() .queueName(queueName) .build(); queueUrl = sqsClient.createQueue(createRequest).queueUrl(); LOGGER.info("Created queue: {}", queueUrl); try (InputStream inputStream = SendRecvBatch.class.getResourceAsStream("/log4j2.xml"); BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) { List<String> lines = reader.lines().toList(); // Send file lines in batches. int batchSize = 10; LOGGER.info("Sending file lines in batches of {} as messages.", batchSize); for (int i = 0; i < lines.size(); i += batchSize) { List<MessageEntry> messageBatch = new ArrayList<>(); for (int j = i; j < Math.min(i + batchSize, lines.size()); j++) { String line = lines.get(j); if (line == null || line.trim().isEmpty()) { continue; // Skip empty lines. } Map<String, MessageAttributeValue> attributes = new HashMap<>(); attributes.put("line", MessageAttributeValue.builder() .dataType("String") .stringValue(String.valueOf(j)) .build()); messageBatch.add(new MessageEntry(lines.get(j), attributes)); } sendMessages(queueUrl, messageBatch); System.out.print("."); System.out.flush(); } LOGGER.info("\nDone. Sent {} messages.", lines.size()); // Receive and process messages. LOGGER.info("Receiving, handling, and deleting messages in batches of {}.", batchSize); String[] receivedLines = new String[lines.size()]; boolean moreMessages = true; while (moreMessages) { List<Message> receivedMessages = receiveMessages(queueUrl, batchSize, 5); for (Message message : receivedMessages) { int lineNumber = Integer.parseInt(message.messageAttributes().get("line").stringValue()); receivedLines[lineNumber] = message.body(); } if (!receivedMessages.isEmpty()) { deleteMessages(queueUrl, receivedMessages); } else { moreMessages = false; } } LOGGER.info("\nDone."); // Verify that all lines were received correctly. boolean allLinesMatch = true; for (int i = 0; i < lines.size(); i++) { String originalLine = lines.get(i); String receivedLine = receivedLines[i] == null ? "" : receivedLines[i]; if (!originalLine.equals(receivedLine)) { allLinesMatch = false; break; } } if (allLinesMatch) { LOGGER.info("Successfully reassembled all file lines!"); } else { LOGGER.info("Uh oh, some lines were missed!"); } } } catch (SqsException e) { LOGGER.error("SQS operation failed", e); } catch (RuntimeException | IOException e) { LOGGER.error("Unexpected runtime error during demo", e); } finally { // Clean up by deleting the queue if it was created. if (queueUrl != null) { try { DeleteQueueRequest deleteQueueRequest = DeleteQueueRequest.builder() .queueUrl(queueUrl) .build(); sqsClient.deleteQueue(deleteQueueRequest); LOGGER.info("Deleted queue: {}", queueUrl); } catch (SqsException e) { LOGGER.error("Failed to delete queue: {}", queueUrl, e); } } } LOGGER.info("Thanks for watching!"); LOGGER.info("-".repeat(88)); } }
SimpleProducerConsumer.java - Uses automatic batching of messages.
package com.example.sqs; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.sqs.SqsAsyncClient; import software.amazon.awssdk.services.sqs.batchmanager.SqsAsyncBatchManager; import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest; import software.amazon.awssdk.services.sqs.model.DeleteMessageResponse; import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest; import software.amazon.awssdk.services.sqs.model.Message; import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse; import software.amazon.awssdk.services.sqs.model.SendMessageRequest; import software.amazon.awssdk.services.sqs.model.SendMessageResponse; import software.amazon.awssdk.core.exception.SdkException; import java.math.BigInteger; import java.util.List; import java.util.Random; import java.util.Scanner; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; /** * Demonstrates the AWS SDK for Java 2.x Automatic Request Batching API for Amazon SQS. * * This example showcases the high-level SqsAsyncBatchManager library that provides * efficient batching and buffering for SQS operations. The batch manager offers * methods that directly mirror SqsAsyncClient methods—sendMessage, changeMessageVisibility, * deleteMessage, and receiveMessage—making it a drop-in replacement with minimal code changes. * * Key features of the SqsAsyncBatchManager: * - Automatic batching: The SDK automatically buffers individual requests and sends them * as batches when maxBatchSize (default: 10) or sendRequestFrequency (default: 200ms) * thresholds are reached * - Familiar API: Method signatures match SqsAsyncClient exactly, requiring no learning curve * - Background optimization: The batch manager maintains internal buffers and handles * batching logic transparently * - Asynchronous operations: All methods return CompletableFuture for non-blocking execution * * Performance benefits demonstrated: * - Reduced API calls: Multiple individual requests are consolidated into single batch operations * - Lower costs: Fewer API calls result in reduced SQS charges * - Higher throughput: Batch operations process more messages per second * - Efficient resource utilization: Fewer network round trips and better connection reuse * * This example compares: * 1. Single-message operations using SqsAsyncClient directly * 2. Batch operations using SqsAsyncBatchManager with identical method calls * * Usage patterns: * - Set batch size to 1 to use SqsAsyncClient for baseline performance measurement * - Set batch size > 1 to use SqsAsyncBatchManager for optimized batch processing * - Monitor real-time throughput metrics to observe performance improvements * * Prerequisites: * - AWS SDK for Java 2.x version 2.28.0 or later * - An existing SQS queue * - Valid AWS credentials configured * * The program displays real-time metrics showing the dramatic performance difference * between individual operations and automatic batching. */ public class SimpleProducerConsumer { // The maximum runtime of the program. private final static int MAX_RUNTIME_MINUTES = 60; private final static Logger log = LoggerFactory.getLogger(SimpleProducerConsumer.class); /** * Runs the SQS batching demonstration with user-configured parameters. * * Prompts for queue name, thread counts, batch size, message size, and runtime. * Creates producer and consumer threads to demonstrate batching performance. * * @param args command line arguments (not used) * @throws InterruptedException if thread operations are interrupted */ public static void main(String[] args) throws InterruptedException { final Scanner input = new Scanner(System.in); System.out.print("Enter the queue name: "); final String queueName = input.nextLine(); System.out.print("Enter the number of producers: "); final int producerCount = input.nextInt(); System.out.print("Enter the number of consumers: "); final int consumerCount = input.nextInt(); System.out.print("Enter the number of messages per batch: "); final int batchSize = input.nextInt(); System.out.print("Enter the message size in bytes: "); final int messageSizeByte = input.nextInt(); System.out.print("Enter the run time in minutes: "); final int runTimeMinutes = input.nextInt(); // Create SQS async client and batch manager for all operations. // The SqsAsyncBatchManager is created from the SqsAsyncClient using the // batchManager() factory method, which provides default batching configuration. // This high-level library automatically handles request buffering and batching // while maintaining the same method signatures as SqsAsyncClient. final SqsAsyncClient sqsAsyncClient = SqsAsyncClient.create(); final SqsAsyncBatchManager batchManager = sqsAsyncClient.batchManager(); final String queueUrl = sqsAsyncClient.getQueueUrl(GetQueueUrlRequest.builder() .queueName(queueName) .build()).join().queueUrl(); // The flag used to stop producer, consumer, and monitor threads. final AtomicBoolean stop = new AtomicBoolean(false); // Start the producers. final AtomicInteger producedCount = new AtomicInteger(); final Thread[] producers = new Thread[producerCount]; for (int i = 0; i < producerCount; i++) { if (batchSize == 1) { producers[i] = new Producer(sqsAsyncClient, queueUrl, messageSizeByte, producedCount, stop); } else { producers[i] = new BatchProducer(batchManager, queueUrl, batchSize, messageSizeByte, producedCount, stop); } producers[i].start(); } // Start the consumers. final AtomicInteger consumedCount = new AtomicInteger(); final Thread[] consumers = new Thread[consumerCount]; for (int i = 0; i < consumerCount; i++) { if (batchSize == 1) { consumers[i] = new Consumer(sqsAsyncClient, queueUrl, consumedCount, stop); } else { consumers[i] = new BatchConsumer(batchManager, queueUrl, batchSize, consumedCount, stop); } consumers[i].start(); } // Start the monitor thread. final Thread monitor = new Monitor(producedCount, consumedCount, stop); monitor.start(); // Wait for the specified amount of time then stop. Thread.sleep(TimeUnit.MINUTES.toMillis(Math.min(runTimeMinutes, MAX_RUNTIME_MINUTES))); stop.set(true); // Join all threads. for (int i = 0; i < producerCount; i++) { producers[i].join(); } for (int i = 0; i < consumerCount; i++) { consumers[i].join(); } monitor.interrupt(); monitor.join(); // Close resources batchManager.close(); sqsAsyncClient.close(); } /** * Creates a random string of approximately the specified size in bytes. * * @param sizeByte the target size in bytes for the generated string * @return a random string encoded in base-32 */ private static String makeRandomString(int sizeByte) { final byte[] bs = new byte[(int) Math.ceil(sizeByte * 5 / 8)]; new Random().nextBytes(bs); bs[0] = (byte) ((bs[0] | 64) & 127); return new BigInteger(bs).toString(32); } /** * Sends messages individually using SqsAsyncClient for baseline performance measurement. * * This producer demonstrates traditional single-message operations without batching. * Each sendMessage() call results in a separate API request to SQS, providing * a performance baseline for comparison with the batch operations. * * The sendMessage() method signature is identical to SqsAsyncBatchManager.sendMessage(), * showing how the high-level batching library maintains API compatibility while * adding automatic optimization behind the scenes. */ private static class Producer extends Thread { final SqsAsyncClient sqsAsyncClient; final String queueUrl; final AtomicInteger producedCount; final AtomicBoolean stop; final String theMessage; /** * Creates a producer thread for single-message operations. * * @param sqsAsyncClient the SQS client for sending messages * @param queueUrl the URL of the target queue * @param messageSizeByte the size of messages to generate * @param producedCount shared counter for tracking sent messages * @param stop shared flag to signal thread termination */ Producer(SqsAsyncClient sqsAsyncClient, String queueUrl, int messageSizeByte, AtomicInteger producedCount, AtomicBoolean stop) { this.sqsAsyncClient = sqsAsyncClient; this.queueUrl = queueUrl; this.producedCount = producedCount; this.stop = stop; this.theMessage = makeRandomString(messageSizeByte); } /** * Continuously sends messages until the stop flag is set. * * Uses SqsAsyncClient.sendMessage() directly, resulting in one API call per message. * This approach provides baseline performance metrics for comparison with batching. * Each call blocks until the individual message is sent, demonstrating traditional * one-request-per-operation behavior. */ public void run() { try { while (!stop.get()) { sqsAsyncClient.sendMessage(SendMessageRequest.builder() .queueUrl(queueUrl) .messageBody(theMessage) .build()).join(); producedCount.incrementAndGet(); } } catch (SdkException | java.util.concurrent.CompletionException e) { // Handle both SdkException and CompletionException from async operations. // If this unlikely condition occurs, stop. log.error("Producer: " + e.getMessage()); System.exit(1); } } } /** * Sends messages using SqsAsyncBatchManager for automatic request batching and optimization. * * This producer demonstrates the AWS SDK for Java 2.x high-level batching library. * The SqsAsyncBatchManager automatically buffers individual sendMessage() calls and * sends them as batches when thresholds are reached: * - maxBatchSize: Maximum 10 messages per batch (default) * - sendRequestFrequency: 200ms timeout before sending partial batches (default) * * Key advantages of the batching approach: * - Identical API: batchManager.sendMessage() has the same signature as sqsAsyncClient.sendMessage() * - Automatic optimization: No code changes needed to benefit from batching * - Transparent buffering: The SDK handles batching logic internally * - Reduced API calls: Multiple messages sent in single batch requests * - Lower costs: Fewer API calls result in reduced SQS charges * - Higher throughput: Batch operations process significantly more messages per second */ private static class BatchProducer extends Thread { final SqsAsyncBatchManager batchManager; final String queueUrl; final int batchSize; final AtomicInteger producedCount; final AtomicBoolean stop; final String theMessage; /** * Creates a producer thread for batch operations. * * @param batchManager the batch manager for efficient message sending * @param queueUrl the URL of the target queue * @param batchSize the number of messages to send per batch * @param messageSizeByte the size of messages to generate * @param producedCount shared counter for tracking sent messages * @param stop shared flag to signal thread termination */ BatchProducer(SqsAsyncBatchManager batchManager, String queueUrl, int batchSize, int messageSizeByte, AtomicInteger producedCount, AtomicBoolean stop) { this.batchManager = batchManager; this.queueUrl = queueUrl; this.batchSize = batchSize; this.producedCount = producedCount; this.stop = stop; this.theMessage = makeRandomString(messageSizeByte); } /** * Continuously sends batches of messages using the high-level batching library. * * Notice how batchManager.sendMessage() uses the exact same method signature * and request builder pattern as SqsAsyncClient.sendMessage(). This demonstrates * the drop-in replacement capability of the SqsAsyncBatchManager. * * The SDK automatically: * - Buffers individual sendMessage() calls internally * - Groups them into batch requests when thresholds are met * - Sends SendMessageBatchRequest operations to SQS * - Returns individual CompletableFuture responses for each message * * This transparent batching provides significant performance improvements * without requiring changes to application logic or error handling patterns. */ public void run() { try { while (!stop.get()) { // Send multiple messages using the high-level batch manager. // Each batchManager.sendMessage() call uses identical syntax to // sqsAsyncClient.sendMessage(), demonstrating API compatibility. // The SDK automatically buffers these calls and sends them as // batch operations when maxBatchSize (10) or sendRequestFrequency (200ms) // thresholds are reached, significantly improving throughput. for (int i = 0; i < batchSize; i++) { CompletableFuture<SendMessageResponse> future = batchManager.sendMessage( SendMessageRequest.builder() .queueUrl(queueUrl) .messageBody(theMessage) .build()); // Handle the response asynchronously future.whenComplete((response, throwable) -> { if (throwable == null) { producedCount.incrementAndGet(); } else if (!(throwable instanceof java.util.concurrent.CancellationException) && !(throwable.getMessage() != null && throwable.getMessage().contains("executor not accepting a task"))) { log.error("BatchProducer: Failed to send message", throwable); } // Ignore CancellationException and executor shutdown errors - expected during shutdown }); } // Small delay to allow batching to occur Thread.sleep(10); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.error("BatchProducer interrupted: " + e.getMessage()); } catch (SdkException | java.util.concurrent.CompletionException e) { log.error("BatchProducer: " + e.getMessage()); System.exit(1); } } } /** * Receives and deletes messages individually using SqsAsyncClient for baseline measurement. * * This consumer demonstrates traditional single-message operations without batching. * Each receiveMessage() and deleteMessage() call results in separate API requests, * providing a performance baseline for comparison with batch operations. * * The method signatures are identical to SqsAsyncBatchManager methods: * - receiveMessage() matches batchManager.receiveMessage() * - deleteMessage() matches batchManager.deleteMessage() * * This API consistency allows easy migration to the high-level batching library. */ private static class Consumer extends Thread { final SqsAsyncClient sqsAsyncClient; final String queueUrl; final AtomicInteger consumedCount; final AtomicBoolean stop; /** * Creates a consumer thread for single-message operations. * * @param sqsAsyncClient the SQS client for receiving messages * @param queueUrl the URL of the source queue * @param consumedCount shared counter for tracking processed messages * @param stop shared flag to signal thread termination */ Consumer(SqsAsyncClient sqsAsyncClient, String queueUrl, AtomicInteger consumedCount, AtomicBoolean stop) { this.sqsAsyncClient = sqsAsyncClient; this.queueUrl = queueUrl; this.consumedCount = consumedCount; this.stop = stop; } /** * Continuously receives and deletes messages using traditional single-request operations. * * Uses SqsAsyncClient methods directly: * - receiveMessage(): One API call per receive operation * - deleteMessage(): One API call per delete operation * * This approach demonstrates the baseline performance without batching optimization. * Compare these method calls with the identical signatures used in BatchConsumer * to see how the high-level batching library maintains API compatibility. */ public void run() { try { while (!stop.get()) { try { final ReceiveMessageResponse result = sqsAsyncClient.receiveMessage( ReceiveMessageRequest.builder() .queueUrl(queueUrl) .build()).join(); if (!result.messages().isEmpty()) { final Message m = result.messages().get(0); // Note: deleteMessage() signature identical to batchManager.deleteMessage() sqsAsyncClient.deleteMessage(DeleteMessageRequest.builder() .queueUrl(queueUrl) .receiptHandle(m.receiptHandle()) .build()).join(); consumedCount.incrementAndGet(); } } catch (SdkException | java.util.concurrent.CompletionException e) { log.error(e.getMessage()); } } } catch (SdkException | java.util.concurrent.CompletionException e) { // Handle both SdkException and CompletionException from async operations. // If this unlikely condition occurs, stop. log.error("Consumer: " + e.getMessage()); System.exit(1); } } } /** * Receives and deletes messages using SqsAsyncBatchManager for automatic optimization. * * This consumer demonstrates the AWS SDK for Java 2.x high-level batching library * for message consumption. The SqsAsyncBatchManager provides two key optimizations: * * 1. Receive optimization: Maintains an internal buffer of messages fetched in the * background, so receiveMessage() calls return immediately from the buffer * 2. Delete batching: Automatically buffers deleteMessage() calls and sends them * as DeleteMessageBatchRequest operations when thresholds are reached * * Key features: * - Identical API: receiveMessage() and deleteMessage() have the same signatures * as SqsAsyncClient methods, making this a true drop-in replacement * - Background fetching: The batch manager continuously fetches messages to keep * the internal buffer populated, reducing receive latency * - Automatic delete batching: Individual deleteMessage() calls are buffered and * sent as batch operations (up to 10 per batch, 200ms frequency) * - Transparent optimization: No application logic changes needed to benefit * * Performance benefits: * - Reduced API calls through automatic batching of delete operations * - Lower latency for receives due to background message buffering * - Higher overall throughput with fewer network round trips */ private static class BatchConsumer extends Thread { final SqsAsyncBatchManager batchManager; final String queueUrl; final int batchSize; final AtomicInteger consumedCount; final AtomicBoolean stop; /** * Creates a consumer thread for batch operations. * * @param batchManager the batch manager for efficient message processing * @param queueUrl the URL of the source queue * @param batchSize the maximum number of messages to receive per batch * @param consumedCount shared counter for tracking processed messages * @param stop shared flag to signal thread termination */ BatchConsumer(SqsAsyncBatchManager batchManager, String queueUrl, int batchSize, AtomicInteger consumedCount, AtomicBoolean stop) { this.batchManager = batchManager; this.queueUrl = queueUrl; this.batchSize = batchSize; this.consumedCount = consumedCount; this.stop = stop; } /** * Continuously receives and deletes messages using the high-level batching library. * * Demonstrates the key advantage of SqsAsyncBatchManager: identical method signatures * with automatic optimization. Notice how: * * - batchManager.receiveMessage() uses the same syntax as sqsAsyncClient.receiveMessage() * - batchManager.deleteMessage() uses the same syntax as sqsAsyncClient.deleteMessage() * * Behind the scenes, the batch manager: * 1. Maintains an internal message buffer populated by background fetching * 2. Returns messages immediately from the buffer (reduced latency) * 3. Automatically batches deleteMessage() calls into DeleteMessageBatchRequest operations * 4. Sends batch deletes when maxBatchSize (10) or sendRequestFrequency (200ms) is reached * * This provides significant performance improvements with zero code changes * compared to traditional SqsAsyncClient usage patterns. */ public void run() { try { while (!stop.get()) { // Receive messages using the high-level batch manager. // This call uses identical syntax to sqsAsyncClient.receiveMessage() // but benefits from internal message buffering for improved performance. final ReceiveMessageResponse result = batchManager.receiveMessage( ReceiveMessageRequest.builder() .queueUrl(queueUrl) .maxNumberOfMessages(Math.min(batchSize, 10)) .build()).join(); if (!result.messages().isEmpty()) { final List<Message> messages = result.messages(); // Delete messages using the batch manager. // Each deleteMessage() call uses identical syntax to SqsAsyncClient // but the SDK automatically buffers these calls and sends them // as DeleteMessageBatchRequest operations for optimal performance. for (Message message : messages) { CompletableFuture<DeleteMessageResponse> future = batchManager.deleteMessage( DeleteMessageRequest.builder() .queueUrl(queueUrl) .receiptHandle(message.receiptHandle()) .build()); future.whenComplete((response, throwable) -> { if (throwable == null) { consumedCount.incrementAndGet(); } else if (!(throwable instanceof java.util.concurrent.CancellationException) && !(throwable.getMessage() != null && throwable.getMessage().contains("executor not accepting a task"))) { log.error("BatchConsumer: Failed to delete message", throwable); } // Ignore CancellationException and executor shutdown errors - expected during shutdown }); } } // Small delay to prevent tight polling Thread.sleep(10); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.error("BatchConsumer interrupted: " + e.getMessage()); } catch (SdkException | java.util.concurrent.CompletionException e) { // Handle both SdkException and CompletionException from async operations. // If this unlikely condition occurs, stop. log.error("BatchConsumer: " + e.getMessage()); System.exit(1); } } } /** * Displays real-time throughput statistics every second. * * This thread logs the current count of produced and consumed messages * to help you monitor the performance comparison. */ private static class Monitor extends Thread { private final AtomicInteger producedCount; private final AtomicInteger consumedCount; private final AtomicBoolean stop; /** * Creates a monitoring thread that displays throughput statistics. * * @param producedCount shared counter for messages sent * @param consumedCount shared counter for messages processed * @param stop shared flag to signal thread termination */ Monitor(AtomicInteger producedCount, AtomicInteger consumedCount, AtomicBoolean stop) { this.producedCount = producedCount; this.consumedCount = consumedCount; this.stop = stop; } /** * Logs throughput statistics every second until stopped. * * Displays the current count of produced and consumed messages * to help monitor the performance comparison between batching strategies. */ public void run() { try { while (!stop.get()) { Thread.sleep(1000); log.info("produced messages = " + producedCount.get() + ", consumed messages = " + consumedCount.get()); } } catch (InterruptedException e) { // Allow the thread to exit. } } } }
-
For API details, see the following topics in AWS SDK for Java 2.x API Reference.
-
- Python
-
- SDK for Python (Boto3)
-
Note
There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository
. Create functions to wrap Amazon SQS message functions.
import logging import sys import boto3 from botocore.exceptions import ClientError import queue_wrapper logger = logging.getLogger(__name__) sqs = boto3.resource("sqs") def send_messages(queue, messages): """ Send a batch of messages in a single request to an SQS queue. This request may return overall success even when some messages were not sent. The caller must inspect the Successful and Failed lists in the response and resend any failed messages. :param queue: The queue to receive the messages. :param messages: The messages to send to the queue. These are simplified to contain only the message body and attributes. :return: The response from SQS that contains the list of successful and failed messages. """ try: entries = [ { "Id": str(ind), "MessageBody": msg["body"], "MessageAttributes": msg["attributes"], } for ind, msg in enumerate(messages) ] response = queue.send_messages(Entries=entries) if "Successful" in response: for msg_meta in response["Successful"]: logger.info( "Message sent: %s: %s", msg_meta["MessageId"], messages[int(msg_meta["Id"])]["body"], ) if "Failed" in response: for msg_meta in response["Failed"]: logger.warning( "Failed to send: %s: %s", msg_meta["MessageId"], messages[int(msg_meta["Id"])]["body"], ) except ClientError as error: logger.exception("Send messages failed to queue: %s", queue) raise error else: return response def receive_messages(queue, max_number, wait_time): """ Receive a batch of messages in a single request from an SQS queue. :param queue: The queue from which to receive messages. :param max_number: The maximum number of messages to receive. The actual number of messages received might be less. :param wait_time: The maximum time to wait (in seconds) before returning. When this number is greater than zero, long polling is used. This can result in reduced costs and fewer false empty responses. :return: The list of Message objects received. These each contain the body of the message and metadata and custom attributes. """ try: messages = queue.receive_messages( MessageAttributeNames=["All"], MaxNumberOfMessages=max_number, WaitTimeSeconds=wait_time, ) for msg in messages: logger.info("Received message: %s: %s", msg.message_id, msg.body) except ClientError as error: logger.exception("Couldn't receive messages from queue: %s", queue) raise error else: return messages def delete_messages(queue, messages): """ Delete a batch of messages from a queue in a single request. :param queue: The queue from which to delete the messages. :param messages: The list of messages to delete. :return: The response from SQS that contains the list of successful and failed message deletions. """ try: entries = [ {"Id": str(ind), "ReceiptHandle": msg.receipt_handle} for ind, msg in enumerate(messages) ] response = queue.delete_messages(Entries=entries) if "Successful" in response: for msg_meta in response["Successful"]: logger.info("Deleted %s", messages[int(msg_meta["Id"])].receipt_handle) if "Failed" in response: for msg_meta in response["Failed"]: logger.warning( "Could not delete %s", messages[int(msg_meta["Id"])].receipt_handle ) except ClientError: logger.exception("Couldn't delete messages from queue %s", queue) else: return response
Use the wrapper functions to send and receive messages in batches.
def usage_demo(): """ Shows how to: * Read the lines from this Python file and send the lines in batches of 10 as messages to a queue. * Receive the messages in batches until the queue is empty. * Reassemble the lines of the file and verify they match the original file. """ def pack_message(msg_path, msg_body, msg_line): return { "body": msg_body, "attributes": { "path": {"StringValue": msg_path, "DataType": "String"}, "line": {"StringValue": str(msg_line), "DataType": "String"}, }, } def unpack_message(msg): return ( msg.message_attributes["path"]["StringValue"], msg.body, int(msg.message_attributes["line"]["StringValue"]), ) print("-" * 88) print("Welcome to the Amazon Simple Queue Service (Amazon SQS) demo!") print("-" * 88) queue = queue_wrapper.create_queue("sqs-usage-demo-message-wrapper") with open(__file__) as file: lines = file.readlines() line = 0 batch_size = 10 received_lines = [None] * len(lines) print(f"Sending file lines in batches of {batch_size} as messages.") while line < len(lines): messages = [ pack_message(__file__, lines[index], index) for index in range(line, min(line + batch_size, len(lines))) ] line = line + batch_size send_messages(queue, messages) print(".", end="") sys.stdout.flush() print(f"Done. Sent {len(lines) - 1} messages.") print(f"Receiving, handling, and deleting messages in batches of {batch_size}.") more_messages = True while more_messages: received_messages = receive_messages(queue, batch_size, 2) print(".", end="") sys.stdout.flush() for message in received_messages: path, body, line = unpack_message(message) received_lines[line] = body if received_messages: delete_messages(queue, received_messages) else: more_messages = False print("Done.") if all([lines[index] == received_lines[index] for index in range(len(lines))]): print(f"Successfully reassembled all file lines!") else: print(f"Uh oh, some lines were missed!") queue.delete() print("Thanks for watching!") print("-" * 88)
-
For API details, see the following topics in AWS SDK for Python (Boto3) API Reference.
-