There are more AWS SDK examples available in the AWS Doc SDK Examples GitHub repo.
Amazon SQS examples using SDK for Java 2.x
The following code examples show you how to perform actions and implement common scenarios by using the AWS SDK for Java 2.x with Amazon SQS.
Actions are code excerpts from larger programs and must be run in context. While actions show you how to call individual service functions, you can see actions in context in their related scenarios.
Scenarios are code examples that show you how to accomplish a specific task by calling multiple functions within a service or in combination with other AWS services.
Each example includes a link to the complete source code, where you can find instructions on how to set up and run the code in context.
Get started
The following code examples show how to get started using Amazon SQS.
- SDK for Java 2.x
Note
There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

    import software.amazon.awssdk.regions.Region;
    import software.amazon.awssdk.services.sqs.SqsClient;
    import software.amazon.awssdk.services.sqs.model.SqsException;
    import software.amazon.awssdk.services.sqs.paginators.ListQueuesIterable;

    /**
     * Before running this Java V2 code example, set up your development
     * environment, including your credentials.
     *
     * For more information, see the following documentation topic:
     *
     * https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html
     */
    public class HelloSQS {
        public static void main(String[] args) {
            SqsClient sqsClient = SqsClient.builder()
                    .region(Region.US_WEST_2)
                    .build();

            listQueues(sqsClient);
            sqsClient.close();
        }

        public static void listQueues(SqsClient sqsClient) {
            try {
                // The paginator requests further pages on demand while the stream is consumed.
                ListQueuesIterable listQueues = sqsClient.listQueuesPaginator();
                listQueues.stream()
                        .flatMap(r -> r.queueUrls().stream())
                        // Queue names are case-sensitive, so print each URL unchanged.
                        .forEach(content -> System.out.println(" Queue URL: " + content));

            } catch (SqsException e) {
                System.err.println(e.awsErrorDetails().errorMessage());
                System.exit(1);
            }
        }
    }

- For API details, see ListQueues in the AWS SDK for Java 2.x API Reference.
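The paginator call above flattens every page of results into a single stream of queue URLs. The following is a minimal sketch of that flattening step only, using plain Java lists as stand-ins for the pages. The class name `PagedQueueUrls` and the sample URLs are hypothetical, and no AWS calls are made.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Stand-in sketch: each inner list plays the role of one page of queue URLs. */
final class PagedQueueUrls {

    /** Flattens pages into one list, in page order, as flatMap does in the example above. */
    static List<String> flatten(List<List<String>> pages) {
        return pages.stream()
                .flatMap(List::stream)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Hypothetical URLs, for illustration only.
        List<List<String>> pages = List.of(
                List.of("https://sqs.us-west-2.amazonaws.com/123456789012/queue-a",
                        "https://sqs.us-west-2.amazonaws.com/123456789012/queue-b"),
                List.of("https://sqs.us-west-2.amazonaws.com/123456789012/queue-c"));

        flatten(pages).forEach(url -> System.out.println(" Queue URL: " + url));
    }
}
```

With the real client, the pages are fetched lazily, so the caller never handles continuation tokens directly.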
Actions
The following code example shows how to use CreateQueue.
- SDK for Java 2.x
Note
There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

    import software.amazon.awssdk.regions.Region;
    import software.amazon.awssdk.services.sqs.SqsClient;
    import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest;
    import software.amazon.awssdk.services.sqs.model.CreateQueueRequest;
    import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest;
    import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest;
    import software.amazon.awssdk.services.sqs.model.GetQueueUrlResponse;
    import software.amazon.awssdk.services.sqs.model.ListQueuesRequest;
    import software.amazon.awssdk.services.sqs.model.ListQueuesResponse;
    import software.amazon.awssdk.services.sqs.model.Message;
    import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
    import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
    import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
    import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
    import software.amazon.awssdk.services.sqs.model.SqsException;
    import java.util.List;

    /**
     * Before running this Java V2 code example, set up your development
     * environment, including your credentials.
     *
     * For more information, see the following documentation topic:
     *
     * https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html
     */
    public class SQSExample {
        public static void main(String[] args) {
            String queueName = "queue" + System.currentTimeMillis();
            SqsClient sqsClient = SqsClient.builder()
                    .region(Region.US_WEST_2)
                    .build();

            // Perform various tasks on the Amazon SQS queue.
            String queueUrl = createQueue(sqsClient, queueName);
            listQueues(sqsClient);
            listQueuesFilter(sqsClient, queueUrl);
            List<Message> messages = receiveMessages(sqsClient, queueUrl);
            sendBatchMessages(sqsClient, queueUrl);
            changeMessages(sqsClient, queueUrl, messages);
            deleteMessages(sqsClient, queueUrl, messages);
            sqsClient.close();
        }

        public static String createQueue(SqsClient sqsClient, String queueName) {
            try {
                System.out.println("\nCreate Queue");

                CreateQueueRequest createQueueRequest = CreateQueueRequest.builder()
                        .queueName(queueName)
                        .build();

                sqsClient.createQueue(createQueueRequest);

                System.out.println("\nGet queue url");

                GetQueueUrlResponse getQueueUrlResponse = sqsClient
                        .getQueueUrl(GetQueueUrlRequest.builder().queueName(queueName).build());
                return getQueueUrlResponse.queueUrl();

            } catch (SqsException e) {
                System.err.println(e.awsErrorDetails().errorMessage());
                System.exit(1);
            }
            return "";
        }

        public static void listQueues(SqsClient sqsClient) {
            System.out.println("\nList Queues");
            String prefix = "que";

            try {
                ListQueuesRequest listQueuesRequest = ListQueuesRequest.builder().queueNamePrefix(prefix).build();
                ListQueuesResponse listQueuesResponse = sqsClient.listQueues(listQueuesRequest);
                for (String url : listQueuesResponse.queueUrls()) {
                    System.out.println(url);
                }

            } catch (SqsException e) {
                System.err.println(e.awsErrorDetails().errorMessage());
                System.exit(1);
            }
        }

        public static void listQueuesFilter(SqsClient sqsClient, String queueUrl) {
            // List queues with filters
            String namePrefix = "queue";
            ListQueuesRequest filterListRequest = ListQueuesRequest.builder()
                    .queueNamePrefix(namePrefix)
                    .build();

            ListQueuesResponse listQueuesFilteredResponse = sqsClient.listQueues(filterListRequest);
            System.out.println("Queue URLs with prefix: " + namePrefix);
            for (String url : listQueuesFilteredResponse.queueUrls()) {
                System.out.println(url);
            }

            System.out.println("\nSend message");
            try {
                sqsClient.sendMessage(SendMessageRequest.builder()
                        .queueUrl(queueUrl)
                        .messageBody("Hello world!")
                        .delaySeconds(10)
                        .build());

            } catch (SqsException e) {
                System.err.println(e.awsErrorDetails().errorMessage());
                System.exit(1);
            }
        }

        public static void sendBatchMessages(SqsClient sqsClient, String queueUrl) {
            System.out.println("\nSend multiple messages");
            try {
                SendMessageBatchRequest sendMessageBatchRequest = SendMessageBatchRequest.builder()
                        .queueUrl(queueUrl)
                        .entries(SendMessageBatchRequestEntry.builder().id("id1").messageBody("Hello from msg 1").build(),
                                SendMessageBatchRequestEntry.builder().id("id2").messageBody("msg 2").delaySeconds(10)
                                        .build())
                        .build();
                sqsClient.sendMessageBatch(sendMessageBatchRequest);

            } catch (SqsException e) {
                System.err.println(e.awsErrorDetails().errorMessage());
                System.exit(1);
            }
        }

        public static List<Message> receiveMessages(SqsClient sqsClient, String queueUrl) {
            System.out.println("\nReceive messages");
            try {
                ReceiveMessageRequest receiveMessageRequest = ReceiveMessageRequest.builder()
                        .queueUrl(queueUrl)
                        .maxNumberOfMessages(5)
                        .build();
                return sqsClient.receiveMessage(receiveMessageRequest).messages();

            } catch (SqsException e) {
                System.err.println(e.awsErrorDetails().errorMessage());
                System.exit(1);
            }
            return null;
        }

        public static void changeMessages(SqsClient sqsClient, String queueUrl, List<Message> messages) {
            System.out.println("\nChange Message Visibility");
            try {
                for (Message message : messages) {
                    ChangeMessageVisibilityRequest req = ChangeMessageVisibilityRequest.builder()
                            .queueUrl(queueUrl)
                            .receiptHandle(message.receiptHandle())
                            .visibilityTimeout(100)
                            .build();
                    sqsClient.changeMessageVisibility(req);
                }

            } catch (SqsException e) {
                System.err.println(e.awsErrorDetails().errorMessage());
                System.exit(1);
            }
        }

        public static void deleteMessages(SqsClient sqsClient, String queueUrl, List<Message> messages) {
            System.out.println("\nDelete Messages");
            try {
                for (Message message : messages) {
                    DeleteMessageRequest deleteMessageRequest = DeleteMessageRequest.builder()
                            .queueUrl(queueUrl)
                            .receiptHandle(message.receiptHandle())
                            .build();
                    sqsClient.deleteMessage(deleteMessageRequest);
                }
            } catch (SqsException e) {
                System.err.println(e.awsErrorDetails().errorMessage());
                System.exit(1);
            }
        }
    }

- For API details, see CreateQueue in the AWS SDK for Java 2.x API Reference.
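The example above builds its queue name from a fixed prefix and a timestamp. As a hedged sketch, the helper below checks a candidate name locally before any request is sent, assuming the documented naming rules: up to 80 characters drawn from letters, digits, hyphens, and underscores, with FIFO queue names ending in `.fifo`. The class `QueueNameCheck` is hypothetical and not part of the SDK; the service remains the authority on what it accepts.

```java
import java.util.regex.Pattern;

/** Hypothetical local pre-check for queue names; the service performs the real validation. */
final class QueueNameCheck {
    // Assumed rule: 1 to 80 characters from [A-Za-z0-9_-].
    private static final Pattern STANDARD = Pattern.compile("[A-Za-z0-9_-]{1,80}");
    // Assumed rule: same alphabet, ".fifo" suffix, 80 characters in total.
    private static final Pattern FIFO = Pattern.compile("[A-Za-z0-9_-]{1,75}\\.fifo");

    static boolean isValidStandardName(String name) {
        return name != null && STANDARD.matcher(name).matches();
    }

    static boolean isValidFifoName(String name) {
        return name != null && FIFO.matcher(name).matches();
    }

    public static void main(String[] args) {
        String generated = "queue" + System.currentTimeMillis();
        System.out.println(generated + " valid standard name: " + isValidStandardName(generated));
        System.out.println("prices.fifo valid FIFO name: " + isValidFifoName("prices.fifo"));
        System.out.println("bad name valid standard name: " + isValidStandardName("bad name"));
    }
}
```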
The following code example shows how to use DeleteMessage.
- SDK for Java 2.x
Note
There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

    try {
        // Each message is deleted by its receipt handle, not by its message ID.
        for (Message message : messages) {
            DeleteMessageRequest deleteMessageRequest = DeleteMessageRequest.builder()
                    .queueUrl(queueUrl)
                    .receiptHandle(message.receiptHandle())
                    .build();
            sqsClient.deleteMessage(deleteMessageRequest);
        }
    } catch (SqsException e) {
        System.err.println(e.awsErrorDetails().errorMessage());
        System.exit(1);
    }

- For API details, see DeleteMessage in the AWS SDK for Java 2.x API Reference.
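The excerpt above deletes each message by the receipt handle returned when it was received. The sketch below is a deliberately simplified in-memory model of that relationship, not the service's behavior: `ReceiptHandleSketch` and its methods are hypothetical, and visibility timeouts are ignored.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

/** Toy model: receiving a message issues a handle, and deletion requires that handle. */
final class ReceiptHandleSketch {
    private final Deque<String> visible = new ArrayDeque<>();
    private final Map<String, String> inFlight = new HashMap<>();

    void send(String body) {
        visible.addLast(body);
    }

    /** Returns (receiptHandle, body) for the oldest visible message, if any. */
    Optional<Map.Entry<String, String>> receive() {
        String body = visible.pollFirst();
        if (body == null) {
            return Optional.empty();
        }
        String handle = UUID.randomUUID().toString();
        inFlight.put(handle, body);
        return Optional.of(Map.entry(handle, body));
    }

    /** Deletes an in-flight message; returns false when the handle is unknown. */
    boolean delete(String receiptHandle) {
        return inFlight.remove(receiptHandle) != null;
    }

    int inFlightCount() {
        return inFlight.size();
    }

    public static void main(String[] args) {
        ReceiptHandleSketch queue = new ReceiptHandleSketch();
        queue.send("Hello world!");
        queue.receive().ifPresent(received -> {
            System.out.println("Received: " + received.getValue());
            System.out.println("Deleted: " + queue.delete(received.getKey()));
        });
    }
}
```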
The following code example shows how to use DeleteQueue.
- SDK for Java 2.x
Note
There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

    import software.amazon.awssdk.regions.Region;
    import software.amazon.awssdk.services.sqs.SqsClient;
    import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest;
    import software.amazon.awssdk.services.sqs.model.DeleteQueueRequest;
    import software.amazon.awssdk.services.sqs.model.SqsException;

    /**
     * Before running this Java V2 code example, set up your development
     * environment, including your credentials.
     *
     * For more information, see the following documentation topic:
     *
     * https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html
     */
    public class DeleteQueue {
        public static void main(String[] args) {
            final String usage = """

                    Usage:    <queueName>

                    Where:
                       queueName - The name of the Amazon SQS queue to delete.

                    """;

            if (args.length != 1) {
                System.out.println(usage);
                System.exit(1);
            }

            String queueName = args[0];
            SqsClient sqs = SqsClient.builder()
                    .region(Region.US_WEST_2)
                    .build();

            deleteSQSQueue(sqs, queueName);
            sqs.close();
        }

        public static void deleteSQSQueue(SqsClient sqsClient, String queueName) {
            try {
                // Resolve the queue URL from its name, then delete the queue by URL.
                GetQueueUrlRequest getQueueRequest = GetQueueUrlRequest.builder()
                        .queueName(queueName)
                        .build();

                String queueUrl = sqsClient.getQueueUrl(getQueueRequest).queueUrl();
                DeleteQueueRequest deleteQueueRequest = DeleteQueueRequest.builder()
                        .queueUrl(queueUrl)
                        .build();

                sqsClient.deleteQueue(deleteQueueRequest);

            } catch (SqsException e) {
                System.err.println(e.awsErrorDetails().errorMessage());
                System.exit(1);
            }
        }
    }

- For API details, see DeleteQueue in the AWS SDK for Java 2.x API Reference.
The following code example shows how to use GetQueueUrl.
- SDK for Java 2.x
Note
There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

    // Look up the URL of an existing queue by its name.
    GetQueueUrlResponse getQueueUrlResponse = sqsClient
            .getQueueUrl(GetQueueUrlRequest.builder().queueName(queueName).build());
    return getQueueUrlResponse.queueUrl();

- For API details, see GetQueueUrl in the AWS SDK for Java 2.x API Reference.
The following code example shows how to use ListQueues.
- SDK for Java 2.x
Note
There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

    // Only queues whose names begin with this prefix are returned.
    String prefix = "que";

    try {
        ListQueuesRequest listQueuesRequest = ListQueuesRequest.builder().queueNamePrefix(prefix).build();
        ListQueuesResponse listQueuesResponse = sqsClient.listQueues(listQueuesRequest);
        for (String url : listQueuesResponse.queueUrls()) {
            System.out.println(url);
        }

    } catch (SqsException e) {
        System.err.println(e.awsErrorDetails().errorMessage());
        System.exit(1);
    }

- For API details, see ListQueues in the AWS SDK for Java 2.x API Reference.
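The prefix in the request above applies to the queue name, while the response contains queue URLs. The sketch below applies the same kind of filter on the client side, assuming the queue name is the last path segment of the URL. That URL layout, the class `QueuePrefixFilter`, and the sample URLs are assumptions for illustration only.

```java
import java.net.URI;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical client-side counterpart of a queue-name prefix filter. */
final class QueuePrefixFilter {

    /** Returns the last path segment of the URL, assumed here to be the queue name. */
    static String queueName(String queueUrl) {
        String path = URI.create(queueUrl).getPath();
        return path.substring(path.lastIndexOf('/') + 1);
    }

    /** Keeps only the URLs whose queue name starts with the given prefix. */
    static List<String> withNamePrefix(List<String> queueUrls, String prefix) {
        return queueUrls.stream()
                .filter(url -> queueName(url).startsWith(prefix))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Hypothetical URLs, for illustration only.
        List<String> urls = List.of(
                "https://sqs.us-west-2.amazonaws.com/123456789012/queue1700000000000",
                "https://sqs.us-west-2.amazonaws.com/123456789012/orders");
        withNamePrefix(urls, "que").forEach(System.out::println);
    }
}
```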
The following code example shows how to use ReceiveMessage.
- SDK for Java 2.x
Note
There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

    try {
        // Request up to five messages in a single call.
        ReceiveMessageRequest receiveMessageRequest = ReceiveMessageRequest.builder()
                .queueUrl(queueUrl)
                .maxNumberOfMessages(5)
                .build();
        return sqsClient.receiveMessage(receiveMessageRequest).messages();

    } catch (SqsException e) {
        System.err.println(e.awsErrorDetails().errorMessage());
        System.exit(1);
    }
    return null;

- For API details, see ReceiveMessage in the AWS SDK for Java 2.x API Reference.
The following code example shows how to use SendMessage.
- SDK for Java 2.x
Note
There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.
The following are two examples of the SendMessage operation:
- Send a message with a body and a delay
- Send a message with a body and message attributes
Send a message with a body and a delay.

    import software.amazon.awssdk.regions.Region;
    import software.amazon.awssdk.services.sqs.SqsClient;
    import software.amazon.awssdk.services.sqs.model.CreateQueueRequest;
    import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest;
    import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
    import software.amazon.awssdk.services.sqs.model.SqsException;

    /**
     * Before running this Java V2 code example, set up your development
     * environment, including your credentials.
     *
     * For more information, see the following documentation topic:
     *
     * https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html
     */
    public class SendMessages {
        public static void main(String[] args) {
            final String usage = """

                    Usage:    <queueName> <message>

                    Where:
                       queueName - The name of the queue.
                       message - The message to send.
                    """;

            if (args.length != 2) {
                System.out.println(usage);
                System.exit(1);
            }

            String queueName = args[0];
            String message = args[1];
            SqsClient sqsClient = SqsClient.builder()
                    .region(Region.US_WEST_2)
                    .build();
            sendMessage(sqsClient, queueName, message);
            sqsClient.close();
        }

        public static void sendMessage(SqsClient sqsClient, String queueName, String message) {
            try {
                CreateQueueRequest request = CreateQueueRequest.builder()
                        .queueName(queueName)
                        .build();
                sqsClient.createQueue(request);

                GetQueueUrlRequest getQueueRequest = GetQueueUrlRequest.builder()
                        .queueName(queueName)
                        .build();

                String queueUrl = sqsClient.getQueueUrl(getQueueRequest).queueUrl();
                // The message becomes available to consumers after a 5-second delay.
                SendMessageRequest sendMsgRequest = SendMessageRequest.builder()
                        .queueUrl(queueUrl)
                        .messageBody(message)
                        .delaySeconds(5)
                        .build();

                sqsClient.sendMessage(sendMsgRequest);

            } catch (SqsException e) {
                System.err.println(e.awsErrorDetails().errorMessage());
                System.exit(1);
            }
        }
    }

Send a message with a body and message attributes.

    // Excerpt: LOGGER and SQS_CLIENT are class-level fields defined in the complete example.
    /**
     * <p>This method demonstrates how to add message attributes to a message.
     * Each attribute must specify a name, value, and data type. You use a Java Map to supply the attributes. The map's
     * key is the attribute name, and you specify the map's entry value using a builder that includes the attribute
     * value and data type.</p>
     *
     * <p>The data type must start with one of "String", "Number" or "Binary". You can optionally
     * define a custom extension by using a "." and your extension.</p>
     *
     * <p>The SQS Developer Guide provides more information on @see <a
     * href="https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-message-metadata.html#sqs-message-attributes">message
     * attributes</a>.</p>
     *
     * @param thumbailPath Filesystem path of the image.
     * @param queueUrl     URL of the SQS queue.
     */
    static void sendMessageWithAttributes(Path thumbailPath, String queueUrl) {
        Map<String, MessageAttributeValue> messageAttributeMap;
        try {
            messageAttributeMap = Map.of(
                    "Name", MessageAttributeValue.builder()
                            .stringValue("Jane Doe")
                            .dataType("String").build(),
                    "Age", MessageAttributeValue.builder()
                            .stringValue("42")
                            .dataType("Number.int").build(),
                    "Image", MessageAttributeValue.builder()
                            .binaryValue(SdkBytes.fromByteArray(Files.readAllBytes(thumbailPath)))
                            .dataType("Binary.jpg").build()
            );
        } catch (IOException e) {
            LOGGER.error("An I/O exception occurred reading thumbnail image: {}", e.getMessage(), e);
            throw new RuntimeException(e);
        }

        SendMessageRequest request = SendMessageRequest.builder()
                .queueUrl(queueUrl)
                .messageBody("Hello SQS")
                .messageAttributes(messageAttributeMap)
                .build();
        try {
            SendMessageResponse sendMessageResponse = SQS_CLIENT.sendMessage(request);
            LOGGER.info("Message ID: {}", sendMessageResponse.messageId());
        } catch (SqsException e) {
            LOGGER.error("Exception occurred sending message: {}", e.getMessage(), e);
            throw new RuntimeException(e);
        }
    }

- For API details, see SendMessage in the AWS SDK for Java 2.x API Reference.
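The Javadoc in the second example states that an attribute's data type must start with "String", "Number", or "Binary", optionally followed by "." and a custom extension. A minimal sketch of that rule as a local check follows. The class `AttributeDataType` is hypothetical, it enforces only the rule quoted above, and the service may apply further constraints.

```java
/** Hypothetical pre-check for message attribute data types such as "Number.int". */
final class AttributeDataType {

    static boolean isValid(String dataType) {
        if (dataType == null || dataType.isEmpty()) {
            return false;
        }
        int dot = dataType.indexOf('.');
        String base = dot < 0 ? dataType : dataType.substring(0, dot);
        boolean baseOk = base.equals("String") || base.equals("Number") || base.equals("Binary");
        // If a "." is present, a non-empty custom extension must follow it.
        boolean extensionOk = dot < 0 || dot < dataType.length() - 1;
        return baseOk && extensionOk;
    }

    public static void main(String[] args) {
        for (String type : new String[] {"String", "Number.int", "Binary.jpg", "Integer", "Number."}) {
            System.out.println(type + " -> " + isValid(type));
        }
    }
}
```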
The following code example shows how to use SendMessageBatch.
- SDK for Java 2.x
Note
There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

    // Each entry needs an ID that is unique within the batch.
    SendMessageBatchRequest sendMessageBatchRequest = SendMessageBatchRequest.builder()
            .queueUrl(queueUrl)
            .entries(SendMessageBatchRequestEntry.builder().id("id1").messageBody("Hello from msg 1").build(),
                    SendMessageBatchRequestEntry.builder().id("id2").messageBody("msg 2").delaySeconds(10)
                            .build())
            .build();
    sqsClient.sendMessageBatch(sendMessageBatchRequest);

- For API details, see SendMessageBatch in the AWS SDK for Java 2.x API Reference.
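The excerpt above sends two entries in one call. When there are more messages than a single batch allows (a batch request holds at most 10 entries), they have to be split into several requests. The sketch below shows only that splitting step on plain lists. The class `BatchChunks` is hypothetical, and building and sending the actual requests is left out.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that splits items into batch-sized groups. */
final class BatchChunks {
    // Assumed per-request entry limit for SendMessageBatch.
    static final int MAX_BATCH_ENTRIES = 10;

    static <T> List<List<T>> chunk(List<T> items, int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        List<List<T>> chunks = new ArrayList<>();
        for (int start = 0; start < items.size(); start += size) {
            int end = Math.min(start + size, items.size());
            chunks.add(new ArrayList<>(items.subList(start, end)));
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<String> bodies = new ArrayList<>();
        for (int i = 1; i <= 23; i++) {
            bodies.add("msg " + i);
        }
        List<List<String>> batches = chunk(bodies, MAX_BATCH_ENTRIES);
        for (int i = 0; i < batches.size(); i++) {
            System.out.println("Batch " + (i + 1) + " has " + batches.get(i).size() + " entries");
        }
    }
}
```

Each resulting group would map to one SendMessageBatchRequest, with entry IDs that are unique within that request.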
The following code example shows how to use SetQueueAttributes.
- SDK for Java 2.x
Note
There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.
Configure an Amazon SQS queue to use server-side encryption (SSE) with a custom KMS key.

    // Excerpt: LOGGER is a class-level field defined in the complete example.
    public static void addEncryption(String queueName, String kmsMasterKeyAlias) {
        SqsClient sqsClient = SqsClient.create();

        GetQueueUrlRequest urlRequest = GetQueueUrlRequest.builder()
                .queueName(queueName)
                .build();

        GetQueueUrlResponse getQueueUrlResponse;
        try {
            getQueueUrlResponse = sqsClient.getQueueUrl(urlRequest);
        } catch (QueueDoesNotExistException e) {
            LOGGER.error(e.getMessage(), e);
            throw new RuntimeException(e);
        }
        String queueUrl = getQueueUrlResponse.queueUrl();

        // Set the data key reuse period to 140 seconds.
        // This is how long SQS can reuse the data key before requesting a new one from KMS.
        Map<QueueAttributeName, String> attributes = Map.of(
                QueueAttributeName.KMS_MASTER_KEY_ID, kmsMasterKeyAlias,
                QueueAttributeName.KMS_DATA_KEY_REUSE_PERIOD_SECONDS, "140"
        );

        SetQueueAttributesRequest attRequest = SetQueueAttributesRequest.builder()
                .queueUrl(queueUrl)
                .attributes(attributes)
                .build();
        try {
            sqsClient.setQueueAttributes(attRequest);
            LOGGER.info("The attributes have been applied to {}", queueName);
        } catch (InvalidAttributeNameException | InvalidAttributeValueException e) {
            LOGGER.error(e.getMessage(), e);
            throw new RuntimeException(e);
        } finally {
            sqsClient.close();
        }
    }

- For API details, see SetQueueAttributes in the AWS SDK for Java 2.x API Reference.
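The example above passes the data key reuse period as the string "140". A small sketch follows that validates the value before it is placed in the attribute map, assuming the documented range of 60 to 86,400 seconds. The class `DataKeyReusePeriod` is hypothetical, and the range should be confirmed against the current API reference.

```java
/** Hypothetical range check for the KMS data key reuse period, in seconds. */
final class DataKeyReusePeriod {
    // Assumed bounds: 1 minute to 24 hours.
    static final int MIN_SECONDS = 60;
    static final int MAX_SECONDS = 86_400;

    static boolean isInRange(int seconds) {
        return seconds >= MIN_SECONDS && seconds <= MAX_SECONDS;
    }

    /** Converts a validated period to the string form that queue attributes use. */
    static String toAttributeValue(int seconds) {
        if (!isInRange(seconds)) {
            throw new IllegalArgumentException(
                    "Reuse period must be between " + MIN_SECONDS + " and " + MAX_SECONDS + " seconds");
        }
        return Integer.toString(seconds);
    }

    public static void main(String[] args) {
        System.out.println("140 -> " + toAttributeValue(140));
        System.out.println("30 in range: " + isInRange(30));
    }
}
```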
Scenarios
The following code example shows how to create a messaging application by using Amazon SQS.
- SDK for Java 2.x
Shows how to use the Amazon SQS API to develop a Spring REST API that sends and retrieves messages.
For complete source code and instructions on how to set up and run, see the full example on GitHub.
Services used in this example
- Amazon Comprehend
- Amazon SQS
The following code example shows how to create and publish to a FIFO Amazon SNS topic.
- SDK for Java 2.x
Note
There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.
This example
- creates an Amazon SNS FIFO topic, two Amazon SQS FIFO queues, and one Standard queue.
- subscribes the queues to the topic and publishes a message to the topic.
The test verifies the receipt of the message in each queue. The complete example also shows the addition of access policies and deletes the resources at the end.

    public class PriceUpdateExample {
        public final static SnsClient snsClient = SnsClient.create();
        public final static SqsClient sqsClient = SqsClient.create();

        public static void main(String[] args) {
            final String usage = "\n" +
                    "Usage: " +
                    "    <fifoTopicName> <wholesaleQueueFifoName> <retailQueueFifoName> <analyticsQueueName>\n\n" +
                    "Where:\n" +
                    "  fifoTopicName - The name of the FIFO topic that you want to create. \n\n" +
                    "  wholesaleQueueFifoName - The name of a SQS FIFO queue that will be created for the wholesale consumer. \n\n" +
                    "  retailQueueFifoName - The name of a SQS FIFO queue that will be created for the retail consumer. \n\n" +
                    "  analyticsQueueName - The name of a SQS standard queue that will be created for the analytics consumer. \n\n";
            if (args.length != 4) {
                System.out.println(usage);
                System.exit(1);
            }

            final String fifoTopicName = args[0];
            final String wholeSaleQueueName = args[1];
            final String retailQueueName = args[2];
            final String analyticsQueueName = args[3];

            // For convenience, the QueueData class holds metadata about a queue: ARN, URL,
            // name and type.
            List<QueueData> queues = List.of(
                    new QueueData(wholeSaleQueueName, QueueType.FIFO),
                    new QueueData(retailQueueName, QueueType.FIFO),
                    new QueueData(analyticsQueueName, QueueType.Standard));

            // Create queues.
            createQueues(queues);

            // Create a topic.
            String topicARN = createFIFOTopic(fifoTopicName);

            // Subscribe each queue to the topic.
            subscribeQueues(queues, topicARN);

            // Allow the newly created topic to send messages to the queues.
            addAccessPolicyToQueuesFINAL(queues, topicARN);

            // Publish a sample price update message with payload.
            publishPriceUpdate(topicARN, "{\"product\": 214, \"price\": 79.99}", "Consumables");

            // Clean up resources.
            deleteSubscriptions(queues);
            deleteQueues(queues);
            deleteTopic(topicARN);
        }

        public static String createFIFOTopic(String topicName) {
            try {
                // Create a FIFO topic by using the SNS service client.
                Map<String, String> topicAttributes = Map.of(
                        "FifoTopic", "true",
                        "ContentBasedDeduplication", "false",
                        "FifoThroughputScope", "MessageGroup");

                CreateTopicRequest topicRequest = CreateTopicRequest.builder()
                        .name(topicName)
                        .attributes(topicAttributes)
                        .build();

                CreateTopicResponse response = snsClient.createTopic(topicRequest);
                String topicArn = response.topicArn();
                System.out.println("The topic ARN is " + topicArn);

                return topicArn;

            } catch (SnsException e) {
                System.err.println(e.awsErrorDetails().errorMessage());
                System.exit(1);
            }
            return "";
        }

        public static void subscribeQueues(List<QueueData> queues, String topicARN) {
            queues.forEach(queue -> {
                SubscribeRequest subscribeRequest = SubscribeRequest.builder()
                        .topicArn(topicARN)
                        .endpoint(queue.queueARN)
                        .protocol("sqs")
                        .build();

                // Subscribe to the endpoint by using the SNS service client.
                // Only Amazon SQS queues can receive notifications from an Amazon SNS FIFO
                // topic.
                SubscribeResponse subscribeResponse = snsClient.subscribe(subscribeRequest);
                System.out.println("The queue [" + queue.queueARN + "] subscribed to the topic [" + topicARN + "]");
                queue.subscriptionARN = subscribeResponse.subscriptionArn();
            });
        }

        public static void publishPriceUpdate(String topicArn, String payload, String groupId) {
            try {
                // Create and publish a message that updates the wholesale price.
                String subject = "Price Update";
                String dedupId = UUID.randomUUID().toString();
                String attributeName = "business";
                String attributeValue = "wholesale";

                MessageAttributeValue msgAttValue = MessageAttributeValue.builder()
                        .dataType("String")
                        .stringValue(attributeValue)
                        .build();

                Map<String, MessageAttributeValue> attributes = new HashMap<>();
                attributes.put(attributeName, msgAttValue);
                PublishRequest pubRequest = PublishRequest.builder()
                        .topicArn(topicArn)
                        .subject(subject)
                        .message(payload)
                        .messageGroupId(groupId)
                        .messageDeduplicationId(dedupId)
                        .messageAttributes(attributes)
                        .build();

                final PublishResponse response = snsClient.publish(pubRequest);
                System.out.println(response.messageId());
                System.out.println(response.sequenceNumber());
                System.out.println("Message was published to " + topicArn);

            } catch (SnsException e) {
                System.err.println(e.awsErrorDetails().errorMessage());
                System.exit(1);
            }
        }

        // The remaining helpers called from main (createQueues, addAccessPolicyToQueuesFINAL,
        // deleteSubscriptions, deleteQueues, deleteTopic) and the QueueData and QueueType
        // types are part of the complete example and are omitted from this excerpt.
    }

- For API details, see the following topics in the AWS SDK for Java 2.x API Reference.
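The topic in this example is created with ContentBasedDeduplication set to "false", so publishPriceUpdate supplies its own deduplication ID from a random UUID. The sketch below contrasts that approach with an ID derived from a SHA-256 hash of the payload, which is the idea behind content-based deduplication. The class `DedupIdSketch` is hypothetical, and its hash helper is an illustration rather than a reproduction of what the service computes.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.UUID;

/** Hypothetical helpers contrasting two ways to choose a deduplication ID. */
final class DedupIdSketch {

    /** A fresh ID per call: identical payloads are treated as distinct messages. */
    static String randomId() {
        return UUID.randomUUID().toString();
    }

    /** An ID derived from the payload: identical payloads yield the same ID. */
    static String contentHashId(String payload) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            byte[] hash = digest.digest(payload.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(hash.length * 2);
            for (byte b : hash) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 is not available", e);
        }
    }

    public static void main(String[] args) {
        String payload = "{\"product\": 214, \"price\": 79.99}";
        System.out.println("Random ID:       " + randomId());
        System.out.println("Content hash ID: " + contentHashId(payload));
    }
}
```

A random ID suits publishers that may legitimately send the same payload twice. A content-derived ID suits publishers that retry and want repeats suppressed within the deduplication interval.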
The following code example shows how to detect people and objects in a video with Amazon Rekognition.
- SDK for Java 2.x
Shows how to use the Amazon Rekognition Java API to create an app that detects faces and objects in videos located in an Amazon Simple Storage Service (Amazon S3) bucket. The app sends the admin an email notification with the results by using Amazon Simple Email Service (Amazon SES).
For complete source code and instructions on how to set up and run, see the full example on GitHub.
Services used in this example
- Amazon Rekognition
- Amazon S3
- Amazon SES
- Amazon SNS
- Amazon SQS
The following code example shows how to use the Amazon SQS Extended Client Library to work with large Amazon SQS messages.
- SDK for Java 2.x
Note
There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

    import com.amazon.sqs.javamessaging.AmazonSQSExtendedClient;
    import com.amazon.sqs.javamessaging.ExtendedClientConfiguration;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.joda.time.DateTime;
    import org.joda.time.format.DateTimeFormat;
    import software.amazon.awssdk.services.s3.S3Client;
    import software.amazon.awssdk.services.s3.model.BucketLifecycleConfiguration;
    import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
    import software.amazon.awssdk.services.s3.model.DeleteBucketRequest;
    import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
    import software.amazon.awssdk.services.s3.model.ExpirationStatus;
    import software.amazon.awssdk.services.s3.model.LifecycleExpiration;
    import software.amazon.awssdk.services.s3.model.LifecycleRule;
    import software.amazon.awssdk.services.s3.model.LifecycleRuleFilter;
    import software.amazon.awssdk.services.s3.model.ListObjectVersionsRequest;
    import software.amazon.awssdk.services.s3.model.ListObjectVersionsResponse;
    import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
    import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
    import software.amazon.awssdk.services.s3.model.PutBucketLifecycleConfigurationRequest;
    import software.amazon.awssdk.services.sqs.SqsClient;
    import software.amazon.awssdk.services.sqs.model.CreateQueueRequest;
    import software.amazon.awssdk.services.sqs.model.CreateQueueResponse;
    import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest;
    import software.amazon.awssdk.services.sqs.model.DeleteQueueRequest;
    import software.amazon.awssdk.services.sqs.model.Message;
    import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
    import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;
    import software.amazon.awssdk.services.sqs.model.SendMessageRequest;

    import java.util.Arrays;
    import java.util.List;
    import java.util.UUID;

    /**
     * Example of using Amazon SQS Extended Client Library for Java 2.x.
     */
    public class SqsExtendedClientExample {
        private static final Logger logger = LoggerFactory.getLogger(SqsExtendedClientExample.class);

        private String s3BucketName;
        private String queueUrl;
        private final String queueName;
        private final S3Client s3Client;
        private final SqsClient sqsExtendedClient;
        private final int messageSize;

        /**
         * Constructor with default clients and message size.
         */
        public SqsExtendedClientExample() {
            this(S3Client.create(), 300000);
        }

        /**
         * Constructor with custom S3 client and message size.
         *
         * @param s3Client    The S3 client to use
         * @param messageSize The size of the test message to create
         */
        public SqsExtendedClientExample(S3Client s3Client, int messageSize) {
            this.s3Client = s3Client;
            this.messageSize = messageSize;

            // Generate a unique bucket name.
            this.s3BucketName = UUID.randomUUID() + "-" +
                    DateTimeFormat.forPattern("yyMMdd-hhmmss").print(new DateTime());

            // Generate a unique queue name.
            this.queueName = "MyQueue-" + UUID.randomUUID();

            // Configure the SQS extended client.
            final ExtendedClientConfiguration extendedClientConfig = new ExtendedClientConfiguration()
                    .withPayloadSupportEnabled(s3Client, s3BucketName);

            this.sqsExtendedClient = new AmazonSQSExtendedClient(SqsClient.builder().build(), extendedClientConfig);
        }

        public static void main(String[] args) {
            SqsExtendedClientExample example = new SqsExtendedClientExample();
            try {
                example.setup();
                example.sendAndReceiveMessage();
            } finally {
                example.cleanup();
            }
        }

        /**
         * Send a large message and receive it back.
         *
         * @return The received message
         */
        public Message sendAndReceiveMessage() {
            try {
                // Create a large message.
                char[] chars = new char[messageSize];
                Arrays.fill(chars, 'x');
                String largeMessage = new String(chars);

                // Send the message.
                final SendMessageRequest sendMessageRequest = SendMessageRequest.builder()
                        .queueUrl(queueUrl)
                        .messageBody(largeMessage)
                        .build();

                sqsExtendedClient.sendMessage(sendMessageRequest);
                logger.info("Sent message of size: {}", largeMessage.length());

                // Receive and return the message.
                final ReceiveMessageResponse receiveMessageResponse = sqsExtendedClient.receiveMessage(
                        ReceiveMessageRequest.builder().queueUrl(queueUrl).build());

                List<Message> messages = receiveMessageResponse.messages();
                if (messages.isEmpty()) {
                    throw new RuntimeException("No messages received");
                }

                Message message = messages.getFirst();
                logger.info("\nMessage received.");
                logger.info("  ID: {}", message.messageId());
                logger.info("  Receipt handle: {}", message.receiptHandle());
                logger.info("  Message body size: {}", message.body().length());
                logger.info("  Message body (first 5 characters): {}", message.body().substring(0, 5));

                return message;
            } catch (RuntimeException e) {
                logger.error("Error during message processing: {}", e.getMessage(), e);
                throw e;
            }
        }

        // setup() and cleanup(), which main calls, are part of the complete example
        // and are omitted from this excerpt.
    }

- For more information, see the AWS SDK for Java 2.x Developer Guide.
- For API details, see the following topics in the AWS SDK for Java 2.x API Reference.
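The example above builds a 300,000-character test message so that the payload is large enough to be handled by the extended client. The sketch below shows how one might measure a message body in UTF-8 bytes and compare it with a size threshold. The class `PayloadSizeCheck` is hypothetical, and the 256 KiB threshold is an assumption made for illustration; check the library configuration and current service quotas for the value that applies to you.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical size check for deciding whether a body counts as a large payload. */
final class PayloadSizeCheck {
    // Assumed threshold, for illustration only.
    static final int ASSUMED_THRESHOLD_BYTES = 256 * 1024;

    /** Size of the body as UTF-8 bytes, which can exceed its character count. */
    static int utf8Size(String body) {
        return body.getBytes(StandardCharsets.UTF_8).length;
    }

    static boolean exceeds(String body, int thresholdBytes) {
        return utf8Size(body) > thresholdBytes;
    }

    public static void main(String[] args) {
        String largeMessage = "x".repeat(300000);
        System.out.println("Size in bytes: " + utf8Size(largeMessage));
        System.out.println("Exceeds assumed threshold: " + exceeds(largeMessage, ASSUMED_THRESHOLD_BYTES));
    }
}
```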
El siguiente ejemplo de código muestra cómo trabajar con notificaciones de eventos de S3 de una forma orientada a objetos.
- SDK para Java 2.x
-
nota
Hay más información GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el Repositorio de ejemplos de código de AWS
. Este ejemplo muestra cómo procesar un evento de notificación de S3 utilizando Amazon SQS.
/** * This method receives S3 event notifications by using an SqsAsyncClient. * After the client receives the messages it deserializes the JSON payload and logs them. It uses * the S3EventNotification class (part of the S3 event notification API for Java) to deserialize * the JSON payload and access the messages in an object-oriented way. * * @param queueUrl The URL of the AWS SQS queue that receives the S3 event notifications. * @see <a href="https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/eventnotifications/s3/model/package-summary.html">S3EventNotification API</a>. * <p> * To use S3 event notification serialization/deserialization to objects, add the following * dependency to your Maven pom.xml file. * <dependency> * <groupId>software.amazon.awssdk</groupId> * <artifactId>s3-event-notifications</artifactId> * <version><LATEST></version> * </dependency> * <p> * The S3 event notification API became available with version 2.25.11 of the Java SDK. * <p> * This example shows the use of the API with AWS SQS, but it can be used to process S3 event notifications * in AWS SNS or AWS Lambda as well. * <p> * Note: The S3EventNotification class does not work with messages routed through AWS EventBridge. */ static void processS3Events(String bucketName, String queueUrl, String queueArn) { try { // Configure the bucket to send Object Created and Object Tagging notifications to an existing SQS queue. s3Client.putBucketNotificationConfiguration(b -> b .notificationConfiguration(ncb -> ncb .queueConfigurations(qcb -> qcb .events(Event.S3_OBJECT_CREATED, Event.S3_OBJECT_TAGGING) .queueArn(queueArn))) .bucket(bucketName) ).join(); triggerS3EventNotifications(bucketName); // Wait for event notifications to propagate. Thread.sleep(Duration.ofSeconds(5).toMillis()); boolean didReceiveMessages = true; while (didReceiveMessages) { // Display the number of messages that are available in the queue. 
sqsClient.getQueueAttributes(b -> b .queueUrl(queueUrl) .attributeNames(QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES) ).thenAccept(attributeResponse -> logger.info("Approximate number of messages in the queue: {}", attributeResponse.attributes().get(QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES))) .join(); // Receive the messages. ReceiveMessageResponse response = sqsClient.receiveMessage(b -> b .queueUrl(queueUrl) ).get(); logger.info("Count of received messages: {}", response.messages().size()); didReceiveMessages = !response.messages().isEmpty(); // Create a collection to hold the received message for deletion // after we log the messages. HashSet<DeleteMessageBatchRequestEntry> messagesToDelete = new HashSet<>(); // Process each message. response.messages().forEach(message -> { logger.info("Message id: {}", message.messageId()); // Deserialize JSON message body to a S3EventNotification object // to access messages in an object-oriented way. S3EventNotification event = S3EventNotification.fromJson(message.body()); // Log the S3 event notification record details. if (event.getRecords() != null) { event.getRecords().forEach(record -> { String eventName = record.getEventName(); String key = record.getS3().getObject().getKey(); logger.info(record.toString()); logger.info("Event name is {} and key is {}", eventName, key); }); } // Add logged messages to collection for batch deletion. messagesToDelete.add(DeleteMessageBatchRequestEntry.builder() .id(message.messageId()) .receiptHandle(message.receiptHandle()) .build()); }); // Delete messages. if (!messagesToDelete.isEmpty()) { sqsClient.deleteMessageBatch(DeleteMessageBatchRequest.builder() .queueUrl(queueUrl) .entries(messagesToDelete) .build() ).join(); } } // End of while block. } catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e); } }
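The method above logs each object key exactly as it appears in the notification. Amazon S3 URL-encodes object key names in event notifications, so a key that contains a space arrives with a `+` in its place. The following is a minimal sketch, assuming Java 10 or later, of decoding such a key before it is used to look up the object. The names `S3KeyDecoder` and `decodeKey` are hypothetical and are not part of the SDK.

```java
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration; not part of the AWS SDK.
class S3KeyDecoder {

    // Decodes a URL-encoded object key taken from an S3 event notification record.
    static String decodeKey(String encodedKey) {
        return URLDecoder.decode(encodedKey, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // A key that was uploaded as "photos/red flower.jpg".
        System.out.println(decodeKey("photos/red+flower.jpg"));
        // Percent-encoded UTF-8 bytes are decoded as well.
        System.out.println(decodeKey("reports/caf%C3%A9.txt"));
    }
}
```

The first call returns `photos/red flower.jpg`. In the example above, a value such as the one returned by `record.getS3().getObject().getKey()` could be passed through a helper like this one.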
-
For API details, see the following topics in the AWS SDK for Java 2.x API Reference.
-
The following code example shows how to:
Create a topic (FIFO or non-FIFO).
Subscribe several queues to the topic with an option to apply a filter.
Publish messages to the topic.
Poll the queues for the messages received.
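For the filter option in the steps above, a subscription filter policy is a JSON document that maps a message attribute name to the values that the subscription accepts. The following is a minimal sketch in plain Java that builds such a policy for a `tone` attribute and approximates how exact string matching decides whether a message is delivered. The names `FilterPolicySketch`, `buildPolicy`, and `accepts` are hypothetical, the sketch assumes that names and values need no JSON escaping, and it does not cover the full Amazon SNS filter policy grammar.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper for illustration; not part of the AWS SDK.
class FilterPolicySketch {

    // Builds a policy such as {"tone": ["cheerful", "funny"]}.
    // Assumes the attribute name and values need no JSON escaping.
    static String buildPolicy(String attributeName, List<String> acceptedValues) {
        String values = acceptedValues.stream()
                .map(value -> "\"" + value + "\"")
                .collect(Collectors.joining(", "));
        return "{\"" + attributeName + "\": [" + values + "]}";
    }

    // Approximates exact-match filtering: a message is accepted only when it
    // carries the named attribute and the attribute value is in the accepted list.
    static boolean accepts(String attributeName, List<String> acceptedValues,
                           Map<String, String> messageAttributes) {
        String actual = messageAttributes.get(attributeName);
        return actual != null && acceptedValues.contains(actual);
    }

    public static void main(String[] args) {
        List<String> tones = List.of("cheerful", "funny");
        System.out.println(buildPolicy("tone", tones));
        System.out.println(accepts("tone", tones, Map.of("tone", "funny")));
        System.out.println(accepts("tone", tones, Map.of("tone", "serious")));
        System.out.println(accepts("tone", tones, Map.of()));
    }
}
```

Running `main` prints `{"tone": ["cheerful", "funny"]}` followed by `true`, `false`, and `false`. The last case matters in practice: a message that does not carry the `tone` attribute at all is not delivered to a subscription that filters on it.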
- SDK for Java 2.x
-
Note
There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository
. package com.example.sns; import software.amazon.awssdk.auth.credentials.EnvironmentVariableCredentialsProvider; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.sns.SnsClient; import software.amazon.awssdk.services.sns.model.CreateTopicRequest; import software.amazon.awssdk.services.sns.model.CreateTopicResponse; import software.amazon.awssdk.services.sns.model.DeleteTopicRequest; import software.amazon.awssdk.services.sns.model.DeleteTopicResponse; import software.amazon.awssdk.services.sns.model.MessageAttributeValue; import software.amazon.awssdk.services.sns.model.PublishRequest; import software.amazon.awssdk.services.sns.model.PublishResponse; import software.amazon.awssdk.services.sns.model.SetSubscriptionAttributesRequest; import software.amazon.awssdk.services.sns.model.SnsException; import software.amazon.awssdk.services.sns.model.SubscribeRequest; import software.amazon.awssdk.services.sns.model.SubscribeResponse; import software.amazon.awssdk.services.sns.model.UnsubscribeRequest; import software.amazon.awssdk.services.sns.model.UnsubscribeResponse; import software.amazon.awssdk.services.sqs.SqsClient; import software.amazon.awssdk.services.sqs.model.CreateQueueRequest; import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest; import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry; import software.amazon.awssdk.services.sqs.model.DeleteQueueRequest; import software.amazon.awssdk.services.sqs.model.GetQueueAttributesRequest; import software.amazon.awssdk.services.sqs.model.GetQueueAttributesResponse; import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest; import software.amazon.awssdk.services.sqs.model.GetQueueUrlResponse; import software.amazon.awssdk.services.sqs.model.Message; import software.amazon.awssdk.services.sqs.model.QueueAttributeName; import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; import 
software.amazon.awssdk.services.sqs.model.SetQueueAttributesRequest; import software.amazon.awssdk.services.sqs.model.SqsException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Scanner; import com.google.gson.Gson; import com.google.gson.JsonArray; import com.google.gson.JsonObject; import com.google.gson.JsonPrimitive; /** * Before running this Java V2 code example, set up your development * environment, including your credentials. * <p> * For more information, see the following documentation topic: * <p> * https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html * <p> * This Java example performs these tasks: * <p> * 1. Gives the user three options to choose from. * 2. Creates an Amazon Simple Notification Service (Amazon SNS) topic. * 3. Creates an Amazon Simple Queue Service (Amazon SQS) queue. * 4. Gets the SQS queue Amazon Resource Name (ARN) attribute. * 5. Attaches an AWS Identity and Access Management (IAM) policy to the queue. * 6. Subscribes to the SQS queue. * 7. Publishes a message to the topic. * 8. Displays the messages. * 9. Deletes the received message. * 10. Unsubscribes from the topic. * 11. Deletes the SNS topic. 
*/ public class SNSWorkflow { public static final String DASHES = new String(new char[80]).replace("\0", "-"); public static void main(String[] args) { final String usage = "\n" + "Usage:\n" + " <accountId>\n\n" + "Where:\n" + " accountId - Your AWS account ID value."; if (args.length != 1) { System.out.println(usage); System.exit(1); } SnsClient snsClient = SnsClient.builder() .region(Region.US_EAST_1) .credentialsProvider(EnvironmentVariableCredentialsProvider.create()) .build(); SqsClient sqsClient = SqsClient.builder() .region(Region.US_EAST_1) .credentialsProvider(EnvironmentVariableCredentialsProvider.create()) .build(); Scanner in = new Scanner(System.in); String accountId = args[0]; String useFIFO; String duplication = "n"; String topicName; String deduplicationID = null; String groupId = null; String topicArn; String sqsQueueName; String sqsQueueUrl; String sqsQueueArn; String subscriptionArn; boolean selectFIFO = false; String message; List<Message> messageList; List<String> filterList = new ArrayList<>(); String msgAttValue = ""; System.out.println(DASHES); System.out.println("Welcome to messaging with topics and queues."); System.out.println("In this scenario, you will create an SNS topic and subscribe an SQS queue to the topic.\n" + "You can select from several options for configuring the topic and the subscriptions for the queue.\n" + "You can then post to the topic and see the results in the queue."); System.out.println(DASHES); System.out.println(DASHES); System.out.println("SNS topics can be configured as FIFO (First-In-First-Out).\n" + "FIFO topics deliver messages in order and support deduplication and message filtering.\n" + "Would you like to work with FIFO topics? 
(y/n)"); useFIFO = in.nextLine(); if (useFIFO.compareTo("y") == 0) { selectFIFO = true; System.out.println("You have selected FIFO"); System.out.println(" Because you have chosen a FIFO topic, deduplication is supported.\n" + " Deduplication IDs are either set in the message or automatically generated from content using a hash function.\n" + " If a message is successfully published to an SNS FIFO topic, any message published and determined to have the same deduplication ID,\n" + " within the five-minute deduplication interval, is accepted but not delivered.\n" + " For more information about deduplication, see https://docs.aws.amazon.com/sns/latest/dg/fifo-message-dedup.html."); System.out.println( "Would you like to use content-based deduplication instead of entering a deduplication ID? (y/n)"); duplication = in.nextLine(); if (duplication.compareTo("y") == 0) { System.out.println("Please enter a group id value"); groupId = in.nextLine(); } else { System.out.println("Please enter deduplication Id value"); deduplicationID = in.nextLine(); System.out.println("Please enter a group id value"); groupId = in.nextLine(); } } System.out.println(DASHES); System.out.println(DASHES); System.out.println("2. Create a topic."); System.out.println("Enter a name for your SNS topic."); topicName = in.nextLine(); if (selectFIFO) { System.out.println("Because you have selected a FIFO topic, '.fifo' must be appended to the topic name."); topicName = topicName + ".fifo"; System.out.println("The name of the topic is " + topicName); topicArn = createFIFO(snsClient, topicName, duplication); System.out.println("The ARN of the FIFO topic is " + topicArn); } else { System.out.println("The name of the topic is " + topicName); topicArn = createSNSTopic(snsClient, topicName); System.out.println("The ARN of the non-FIFO topic is " + topicArn); } System.out.println(DASHES); System.out.println(DASHES); System.out.println("3. 
Create an SQS queue."); System.out.println("Enter a name for your SQS queue."); sqsQueueName = in.nextLine(); if (selectFIFO) { sqsQueueName = sqsQueueName + ".fifo"; } sqsQueueUrl = createQueue(sqsClient, sqsQueueName, selectFIFO); System.out.println("The queue URL is " + sqsQueueUrl); System.out.println(DASHES); System.out.println(DASHES); System.out.println("4. Get the SQS queue ARN attribute."); sqsQueueArn = getSQSQueueAttrs(sqsClient, sqsQueueUrl); System.out.println("The ARN of the new queue is " + sqsQueueArn); System.out.println(DASHES); System.out.println(DASHES); System.out.println("5. Attach an IAM policy to the queue."); // Define the policy to use. Make sure that you change the REGION if you are // running this code // in a different region. String policy = """ { "Statement": [ { "Effect": "Allow", "Principal": { "Service": "sns.amazonaws.com" }, "Action": "sqs:SendMessage", "Resource": "arn:aws:sqs:us-east-1:%s:%s", "Condition": { "ArnEquals": { "aws:SourceArn": "arn:aws:sns:us-east-1:%s:%s" } } } ] } """.formatted(accountId, sqsQueueName, accountId, topicName); setQueueAttr(sqsClient, sqsQueueUrl, policy); System.out.println(DASHES); System.out.println(DASHES); System.out.println("6. Subscribe to the SQS queue."); if (selectFIFO) { System.out.println( "If you add a filter to this subscription, then only the filtered messages will be received in the queue.\n" + "For information about message filtering, see https://docs.aws.amazon.com/sns/latest/dg/sns-message-filtering.html\n" + "For this example, you can filter messages by a \"tone\" attribute."); System.out.println("Would you like to filter messages for " + sqsQueueName + "'s subscription to the topic " + topicName + "? (y/n)"); String filterAns = in.nextLine(); if (filterAns.compareTo("y") == 0) { boolean moreAns = false; System.out.println("You can filter messages by one or more of the following \"tone\" attributes."); System.out.println("1. 
cheerful"); System.out.println("2. funny"); System.out.println("3. serious"); System.out.println("4. sincere"); while (!moreAns) { System.out.println("Select a number or choose 0 to end."); String ans = in.nextLine(); switch (ans) { case "1": filterList.add("cheerful"); break; case "2": filterList.add("funny"); break; case "3": filterList.add("serious"); break; case "4": filterList.add("sincere"); break; default: moreAns = true; break; } } } } subscriptionArn = subQueue(snsClient, topicArn, sqsQueueArn, filterList); System.out.println(DASHES); System.out.println(DASHES); System.out.println("7. Publish a message to the topic."); if (selectFIFO) { System.out.println("Would you like to add an attribute to this message? (y/n)"); String msgAns = in.nextLine(); if (msgAns.compareTo("y") == 0) { System.out.println("You can filter messages by one or more of the following \"tone\" attributes."); System.out.println("1. cheerful"); System.out.println("2. funny"); System.out.println("3. serious"); System.out.println("4. sincere"); System.out.println("Select a number or choose 0 to end."); String ans = in.nextLine(); switch (ans) { case "1": msgAttValue = "cheerful"; break; case "2": msgAttValue = "funny"; break; case "3": msgAttValue = "serious"; break; default: msgAttValue = "sincere"; break; } System.out.println("Selected value is " + msgAttValue); } System.out.println("Enter a message."); message = in.nextLine(); pubMessageFIFO(snsClient, message, topicArn, msgAttValue, duplication, groupId, deduplicationID); } else { System.out.println("Enter a message."); message = in.nextLine(); pubMessage(snsClient, message, topicArn); } System.out.println(DASHES); System.out.println(DASHES); System.out.println("8. Display the message. 
Press any key to continue."); in.nextLine(); messageList = receiveMessages(sqsClient, sqsQueueUrl, msgAttValue); for (Message mes : messageList) { System.out.println("Message Id: " + mes.messageId()); System.out.println("Full Message: " + mes.body()); } System.out.println(DASHES); System.out.println(DASHES); System.out.println("9. Delete the received message. Press any key to continue."); in.nextLine(); deleteMessages(sqsClient, sqsQueueUrl, messageList); System.out.println(DASHES); System.out.println(DASHES); System.out.println("10. Unsubscribe from the topic and delete the queue. Press any key to continue."); in.nextLine(); unSub(snsClient, subscriptionArn); deleteSQSQueue(sqsClient, sqsQueueName); System.out.println(DASHES); System.out.println(DASHES); System.out.println("11. Delete the topic. Press any key to continue."); in.nextLine(); deleteSNSTopic(snsClient, topicArn); System.out.println(DASHES); System.out.println("The SNS/SQS workflow has completed successfully."); System.out.println(DASHES); } public static void deleteSNSTopic(SnsClient snsClient, String topicArn) { try { DeleteTopicRequest request = DeleteTopicRequest.builder() .topicArn(topicArn) .build(); DeleteTopicResponse result = snsClient.deleteTopic(request); System.out.println("Status was " + result.sdkHttpResponse().statusCode()); } catch (SnsException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } } public static void deleteSQSQueue(SqsClient sqsClient, String queueName) { try { GetQueueUrlRequest getQueueRequest = GetQueueUrlRequest.builder() .queueName(queueName) .build(); String queueUrl = sqsClient.getQueueUrl(getQueueRequest).queueUrl(); DeleteQueueRequest deleteQueueRequest = DeleteQueueRequest.builder() .queueUrl(queueUrl) .build(); sqsClient.deleteQueue(deleteQueueRequest); System.out.println(queueName + " was successfully deleted."); } catch (SqsException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } } public static 
void unSub(SnsClient snsClient, String subscriptionArn) { try { UnsubscribeRequest request = UnsubscribeRequest.builder() .subscriptionArn(subscriptionArn) .build(); UnsubscribeResponse result = snsClient.unsubscribe(request); System.out.println("Status was " + result.sdkHttpResponse().statusCode() + "\nSubscription was removed for " + request.subscriptionArn()); } catch (SnsException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } } public static void deleteMessages(SqsClient sqsClient, String queueUrl, List<Message> messages) { try { List<DeleteMessageBatchRequestEntry> entries = new ArrayList<>(); for (Message msg : messages) { /* SQS deletes a message by its receipt handle; the ID only labels the entry within the batch. */ DeleteMessageBatchRequestEntry entry = DeleteMessageBatchRequestEntry.builder() .id(msg.messageId()) .receiptHandle(msg.receiptHandle()) .build(); entries.add(entry); } DeleteMessageBatchRequest deleteMessageBatchRequest = DeleteMessageBatchRequest.builder() .queueUrl(queueUrl) .entries(entries) .build(); sqsClient.deleteMessageBatch(deleteMessageBatchRequest); System.out.println("The batch delete of messages was successful"); } catch (SqsException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } } public static List<Message> receiveMessages(SqsClient sqsClient, String queueUrl, String msgAttValue) { try { if (msgAttValue.isEmpty()) { ReceiveMessageRequest receiveMessageRequest = ReceiveMessageRequest.builder() .queueUrl(queueUrl) .maxNumberOfMessages(5) .build(); return sqsClient.receiveMessage(receiveMessageRequest).messages(); } else { // We know there are filters on the message. ReceiveMessageRequest receiveRequest = ReceiveMessageRequest.builder() .queueUrl(queueUrl) .messageAttributeNames(msgAttValue) // Include other message attributes if needed. 
.maxNumberOfMessages(5) .build(); return sqsClient.receiveMessage(receiveRequest).messages(); } } catch (SqsException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } return null; } public static void pubMessage(SnsClient snsClient, String message, String topicArn) { try { PublishRequest request = PublishRequest.builder() .message(message) .topicArn(topicArn) .build(); PublishResponse result = snsClient.publish(request); System.out .println(result.messageId() + " Message sent. Status is " + result.sdkHttpResponse().statusCode()); } catch (SnsException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } } public static void pubMessageFIFO(SnsClient snsClient, String message, String topicArn, String msgAttValue, String duplication, String groupId, String deduplicationID) { try { PublishRequest request; // Means the user did not choose to use a message attribute. if (msgAttValue.isEmpty()) { if (duplication.compareTo("y") == 0) { request = PublishRequest.builder() .message(message) .messageGroupId(groupId) .topicArn(topicArn) .build(); } else { request = PublishRequest.builder() .message(message) .messageDeduplicationId(deduplicationID) .messageGroupId(groupId) .topicArn(topicArn) .build(); } } else { /* The subscription filter policy matches on the "tone" attribute, so publish the selected tone as its value. */ Map<String, MessageAttributeValue> messageAttributes = new HashMap<>(); messageAttributes.put("tone", MessageAttributeValue.builder() .dataType("String") .stringValue(msgAttValue) .build()); if (duplication.compareTo("y") == 0) { request = PublishRequest.builder() .message(message) .messageGroupId(groupId) .topicArn(topicArn) .messageAttributes(messageAttributes) .build(); } else { // Create a publish request with the message and attributes. request = PublishRequest.builder() .topicArn(topicArn) .message(message) .messageDeduplicationId(deduplicationID) .messageGroupId(groupId) .messageAttributes(messageAttributes) .build(); } } // Publish the message to the topic. 
PublishResponse result = snsClient.publish(request); System.out .println(result.messageId() + " Message sent. Status was " + result.sdkHttpResponse().statusCode()); } catch (SnsException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } } // Subscribe to the SQS queue. public static String subQueue(SnsClient snsClient, String topicArn, String queueArn, List<String> filterList) { try { SubscribeRequest request; if (filterList.isEmpty()) { // No filter subscription is added. request = SubscribeRequest.builder() .protocol("sqs") .endpoint(queueArn) .returnSubscriptionArn(true) .topicArn(topicArn) .build(); SubscribeResponse result = snsClient.subscribe(request); System.out.println("The queue " + queueArn + " has been subscribed to the topic " + topicArn + "\n" + "with the subscription ARN " + result.subscriptionArn()); return result.subscriptionArn(); } else { request = SubscribeRequest.builder() .protocol("sqs") .endpoint(queueArn) .returnSubscriptionArn(true) .topicArn(topicArn) .build(); SubscribeResponse result = snsClient.subscribe(request); System.out.println("The queue " + queueArn + " has been subscribed to the topic " + topicArn + "\n" + "with the subscription ARN " + result.subscriptionArn()); String attributeName = "FilterPolicy"; Gson gson = new Gson(); String jsonString = "{\"tone\": []}"; JsonObject jsonObject = gson.fromJson(jsonString, JsonObject.class); JsonArray toneArray = jsonObject.getAsJsonArray("tone"); for (String value : filterList) { toneArray.add(new JsonPrimitive(value)); } String updatedJsonString = gson.toJson(jsonObject); System.out.println(updatedJsonString); SetSubscriptionAttributesRequest attRequest = SetSubscriptionAttributesRequest.builder() .subscriptionArn(result.subscriptionArn()) .attributeName(attributeName) .attributeValue(updatedJsonString) .build(); snsClient.setSubscriptionAttributes(attRequest); return result.subscriptionArn(); } } catch (SnsException e) { 
System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } return ""; } // Attach a policy to the queue. public static void setQueueAttr(SqsClient sqsClient, String queueUrl, String policy) { try { Map<software.amazon.awssdk.services.sqs.model.QueueAttributeName, String> attrMap = new HashMap<>(); attrMap.put(QueueAttributeName.POLICY, policy); SetQueueAttributesRequest attributesRequest = SetQueueAttributesRequest.builder() .queueUrl(queueUrl) .attributes(attrMap) .build(); sqsClient.setQueueAttributes(attributesRequest); System.out.println("The policy has been successfully attached."); } catch (SqsException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } } public static String getSQSQueueAttrs(SqsClient sqsClient, String queueUrl) { // Specify the attributes to retrieve. List<QueueAttributeName> atts = new ArrayList<>(); atts.add(QueueAttributeName.QUEUE_ARN); GetQueueAttributesRequest attributesRequest = GetQueueAttributesRequest.builder() .queueUrl(queueUrl) .attributeNames(atts) .build(); GetQueueAttributesResponse response = sqsClient.getQueueAttributes(attributesRequest); Map<String, String> queueAtts = response.attributesAsStrings(); for (Map.Entry<String, String> queueAtt : queueAtts.entrySet()) return queueAtt.getValue(); return ""; } public static String createQueue(SqsClient sqsClient, String queueName, Boolean selectFIFO) { try { System.out.println("\nCreate Queue"); if (selectFIFO) { Map<QueueAttributeName, String> attrs = new HashMap<>(); attrs.put(QueueAttributeName.FIFO_QUEUE, "true"); CreateQueueRequest createQueueRequest = CreateQueueRequest.builder() .queueName(queueName) .attributes(attrs) .build(); sqsClient.createQueue(createQueueRequest); System.out.println("\nGet queue url"); GetQueueUrlResponse getQueueUrlResponse = sqsClient .getQueueUrl(GetQueueUrlRequest.builder().queueName(queueName).build()); return getQueueUrlResponse.queueUrl(); } else { CreateQueueRequest createQueueRequest = 
CreateQueueRequest.builder() .queueName(queueName) .build(); sqsClient.createQueue(createQueueRequest); System.out.println("\nGet queue url"); GetQueueUrlResponse getQueueUrlResponse = sqsClient .getQueueUrl(GetQueueUrlRequest.builder().queueName(queueName).build()); return getQueueUrlResponse.queueUrl(); } } catch (SqsException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } return ""; } public static String createSNSTopic(SnsClient snsClient, String topicName) { CreateTopicResponse result; try { CreateTopicRequest request = CreateTopicRequest.builder() .name(topicName) .build(); result = snsClient.createTopic(request); return result.topicArn(); } catch (SnsException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } return ""; } public static String createFIFO(SnsClient snsClient, String topicName, String duplication) { try { // Create a FIFO topic by using the SNS service client. Map<String, String> topicAttributes = new HashMap<>(); if (duplication.compareTo("n") == 0) { topicAttributes.put("FifoTopic", "true"); topicAttributes.put("ContentBasedDeduplication", "false"); } else { topicAttributes.put("FifoTopic", "true"); topicAttributes.put("ContentBasedDeduplication", "true"); } CreateTopicRequest topicRequest = CreateTopicRequest.builder() .name(topicName) .attributes(topicAttributes) .build(); CreateTopicResponse response = snsClient.createTopic(topicRequest); return response.topicArn(); } catch (SnsException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } return ""; } }
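The scenario's prompts mention that a deduplication ID can be generated automatically from the message content by using a hash function. For content-based deduplication, Amazon SNS derives the ID from a SHA-256 hash of the message body. The following sketch shows the idea locally with the JDK's `MessageDigest`: identical bodies hash to the same value, so the second copy would be treated as a duplicate within the deduplication interval. The class `ContentDedupSketch` is hypothetical, and the hex encoding is chosen here only for display; the sketch does not claim to reproduce the exact ID string that the service stores.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper for illustration; not part of the AWS SDK.
class ContentDedupSketch {

    // Returns the SHA-256 hash of the message body as a lowercase hex string.
    static String contentHash(String body) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            byte[] hash = digest.digest(body.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : hash) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // Every Java platform is required to provide SHA-256.
            throw new IllegalStateException("SHA-256 is not available", e);
        }
    }

    public static void main(String[] args) {
        String first = contentHash("order 42 shipped");
        String repeat = contentHash("order 42 shipped");
        String other = contentHash("order 43 shipped");
        System.out.println("Same body, same hash: " + first.equals(repeat));
        System.out.println("Different body, same hash: " + first.equals(other));
    }
}
```

Because the hash covers only the body, two messages with the same body but different attributes still count as duplicates under this scheme. Supplying an explicit deduplication ID, as the scenario allows, avoids that.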
-
For API details, see the following topics in the AWS SDK for Java 2.x API Reference.
-
The following code example shows how to:
Create an Amazon SQS queue.
Send batches of messages to the queue.
Receive batches of messages from the queue.
Delete batches of messages from the queue.
- SDK for Java 2.x
-
Note
There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.
As the following examples show, you can handle batch message operations with Amazon SQS by using two different approaches with the AWS SDK for Java 2.x:
SendRecvBatch.java uses explicit batch operations. You create the message batches manually and call sendMessageBatch() and deleteMessageBatch() directly. You also handle the batch responses, including any failed messages. This approach gives you full control over batch size and error handling. However, it requires more code to manage the batching logic.
SimpleProducerConsumer.java uses the high-level SqsAsyncBatchManager library for automatic request batching. You make individual sendMessage() and deleteMessage() calls with the same method signatures as the standard client. The SDK automatically buffers these calls and sends them as batch operations. This approach requires minimal code changes while still providing the performance benefits of batching.
Use explicit batching when you need fine-grained control over batch composition and error handling. Use automatic batching when you want to optimize performance with minimal code changes.
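With explicit batching, your code decides which messages go into each request. A single Amazon SQS batch request accepts at most 10 entries, so a longer list has to be split first. The following is a minimal sketch of that split using plain Java collections. The names `BatchPartitioner` and `partition` are hypothetical helpers and are not part of the SDK.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; not part of the AWS SDK.
class BatchPartitioner {

    // Amazon SQS accepts at most 10 entries in one batch request.
    static final int MAX_BATCH_SIZE = 10;

    // Splits the items into consecutive batches of at most batchSize elements.
    static <T> List<List<T>> partition(List<T> items, int batchSize) {
        if (batchSize < 1 || batchSize > MAX_BATCH_SIZE) {
            throw new IllegalArgumentException("batchSize must be between 1 and " + MAX_BATCH_SIZE);
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < items.size(); start += batchSize) {
            int end = Math.min(start + batchSize, items.size());
            batches.add(new ArrayList<>(items.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> lines = new ArrayList<>();
        for (int i = 0; i < 23; i++) {
            lines.add("line " + i);
        }
        for (List<String> batch : partition(lines, MAX_BATCH_SIZE)) {
            System.out.println("Batch of " + batch.size());
        }
    }
}
```

For 23 items, `main` prints batch sizes of 10, 10, and 3. An empty input yields no batches at all, which is useful because a batch request with no entries is rejected by the service.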
SendRecvBatch.java: uses explicit batch operations with messages.
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.sqs.SqsClient; import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry; import software.amazon.awssdk.services.sqs.model.CreateQueueRequest; import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest; import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry; import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResponse; import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResultEntry; import software.amazon.awssdk.services.sqs.model.DeleteQueueRequest; import software.amazon.awssdk.services.sqs.model.Message; import software.amazon.awssdk.services.sqs.model.MessageAttributeValue; import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest; import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry; import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse; import software.amazon.awssdk.services.sqs.model.SendMessageBatchResultEntry; import software.amazon.awssdk.services.sqs.model.SqsException; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; /** * Before running this Java V2 code example, set up your development * environment, including your credentials. * * For more information, see the following documentation topic: * * https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html */ /** * This code demonstrates basic message operations in Amazon Simple Queue Service (Amazon SQS). 
*/ public class SendRecvBatch { private static final Logger LOGGER = LoggerFactory.getLogger(SendRecvBatch.class); private static final SqsClient sqsClient = SqsClient.create(); public static void main(String[] args) { usageDemo(); } /** * Send a batch of messages in a single request to an SQS queue. * This request may return overall success even when some messages were not sent. * The caller must inspect the Successful and Failed lists in the response and * resend any failed messages. * * @param queueUrl The URL of the queue to receive the messages. * @param messages The messages to send to the queue. Each message contains a body and attributes. * @return The response from SQS that contains the list of successful and failed messages. */ public static SendMessageBatchResponse sendMessages( String queueUrl, List<MessageEntry> messages) { try { List<SendMessageBatchRequestEntry> entries = new ArrayList<>(); for (int i = 0; i < messages.size(); i++) { MessageEntry message = messages.get(i); entries.add(SendMessageBatchRequestEntry.builder() .id(String.valueOf(i)) .messageBody(message.getBody()) .messageAttributes(message.getAttributes()) .build()); } SendMessageBatchRequest sendBatchRequest = SendMessageBatchRequest.builder() .queueUrl(queueUrl) .entries(entries) .build(); SendMessageBatchResponse response = sqsClient.sendMessageBatch(sendBatchRequest); if (!response.successful().isEmpty()) { for (SendMessageBatchResultEntry resultEntry : response.successful()) { LOGGER.info("Message sent: {}: {}", resultEntry.messageId(), messages.get(Integer.parseInt(resultEntry.id())).getBody()); } } if (!response.failed().isEmpty()) { for (BatchResultErrorEntry errorEntry : response.failed()) { LOGGER.warn("Failed to send: {}: {}", errorEntry.id(), messages.get(Integer.parseInt(errorEntry.id())).getBody()); } } return response; } catch (SqsException e) { LOGGER.error("Send messages failed to queue: {}", queueUrl, e); throw e; } } /** * Receive a batch of messages in a single 
request from an SQS queue. * * @param queueUrl The URL of the queue from which to receive messages. * @param maxNumber The maximum number of messages to receive (capped at 10 by SQS). * The actual number of messages received might be less. * @param waitTime The maximum time to wait (in seconds) before returning. When * this number is greater than zero, long polling is used. This * can result in reduced costs and fewer false empty responses. * @return The list of Message objects received. These each contain the body * of the message and metadata and custom attributes. */ public static List<Message> receiveMessages(String queueUrl, int maxNumber, int waitTime) { try { ReceiveMessageRequest receiveRequest = ReceiveMessageRequest.builder() .queueUrl(queueUrl) .maxNumberOfMessages(maxNumber) .waitTimeSeconds(waitTime) .messageAttributeNames("All") .build(); List<Message> messages = sqsClient.receiveMessage(receiveRequest).messages(); for (Message message : messages) { LOGGER.info("Received message: {}: {}", message.messageId(), message.body()); } return messages; } catch (SqsException e) { LOGGER.error("Couldn't receive messages from queue: {}", queueUrl, e); throw e; } } /** * Delete a batch of messages from a queue in a single request. * * @param queueUrl The URL of the queue from which to delete the messages. * @param messages The list of messages to delete. * @return The response from SQS that contains the list of successful and failed * message deletions. 
*/ public static DeleteMessageBatchResponse deleteMessages(String queueUrl, List<Message> messages) { try { List<DeleteMessageBatchRequestEntry> entries = new ArrayList<>(); for (int i = 0; i < messages.size(); i++) { entries.add(DeleteMessageBatchRequestEntry.builder() .id(String.valueOf(i)) .receiptHandle(messages.get(i).receiptHandle()) .build()); } DeleteMessageBatchRequest deleteRequest = DeleteMessageBatchRequest.builder() .queueUrl(queueUrl) .entries(entries) .build(); DeleteMessageBatchResponse response = sqsClient.deleteMessageBatch(deleteRequest); if (!response.successful().isEmpty()) { for (DeleteMessageBatchResultEntry resultEntry : response.successful()) { LOGGER.info("Deleted {}", messages.get(Integer.parseInt(resultEntry.id())).receiptHandle()); } } if (!response.failed().isEmpty()) { for (BatchResultErrorEntry errorEntry : response.failed()) { LOGGER.warn("Could not delete {}", messages.get(Integer.parseInt(errorEntry.id())).receiptHandle()); } } return response; } catch (SqsException e) { LOGGER.error("Couldn't delete messages from queue {}", queueUrl, e); throw e; } } /** * Helper class to represent a message with body and attributes. */ public static class MessageEntry { private final String body; private final Map<String, MessageAttributeValue> attributes; public MessageEntry(String body, Map<String, MessageAttributeValue> attributes) { this.body = body; this.attributes = attributes != null ? attributes : new HashMap<>(); } public String getBody() { return body; } public Map<String, MessageAttributeValue> getAttributes() { return attributes; } } /** * Shows how to: * * Read the lines from a file and send the lines in * batches of 10 as messages to a queue. * * Receive the messages in batches until the queue is empty. * * Reassemble the lines of the file and verify they match the original file. 
 */ public static void usageDemo() { LOGGER.info("-".repeat(88)); LOGGER.info("Welcome to the Amazon Simple Queue Service (Amazon SQS) demo!"); LOGGER.info("-".repeat(88)); String queueUrl = null; try { // Create a queue for the demo. String queueName = "sqs-usage-demo-message-wrapper-" + System.currentTimeMillis(); CreateQueueRequest createRequest = CreateQueueRequest.builder() .queueName(queueName) .build(); queueUrl = sqsClient.createQueue(createRequest).queueUrl(); LOGGER.info("Created queue: {}", queueUrl); try (InputStream inputStream = SendRecvBatch.class.getResourceAsStream("/log4j2.xml"); BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) { List<String> lines = reader.lines().toList(); // Send file lines in batches. int batchSize = 10; int sentCount = 0; LOGGER.info("Sending file lines in batches of {} as messages.", batchSize); for (int i = 0; i < lines.size(); i += batchSize) { List<MessageEntry> messageBatch = new ArrayList<>(); for (int j = i; j < Math.min(i + batchSize, lines.size()); j++) { String line = lines.get(j); if (line == null || line.trim().isEmpty()) { continue; // Skip empty lines. } Map<String, MessageAttributeValue> attributes = new HashMap<>(); attributes.put("line", MessageAttributeValue.builder() .dataType("String") .stringValue(String.valueOf(j)) .build()); messageBatch.add(new MessageEntry(line, attributes)); } if (messageBatch.isEmpty()) { continue; // Every line in this range was blank, so there is nothing to send. } sendMessages(queueUrl, messageBatch); sentCount += messageBatch.size(); System.out.print("."); System.out.flush(); } LOGGER.info("\nDone. Sent {} messages.", sentCount); // Receive and process messages. 
 LOGGER.info("Receiving, handling, and deleting messages in batches of {}.", batchSize); String[] receivedLines = new String[lines.size()]; boolean moreMessages = true; while (moreMessages) { List<Message> receivedMessages = receiveMessages(queueUrl, batchSize, 5); for (Message message : receivedMessages) { int lineNumber = Integer.parseInt(message.messageAttributes().get("line").stringValue()); receivedLines[lineNumber] = message.body(); } if (!receivedMessages.isEmpty()) { deleteMessages(queueUrl, receivedMessages); } else { moreMessages = false; } } LOGGER.info("\nDone."); // Verify that all lines were received correctly. boolean allLinesMatch = true; for (int i = 0; i < lines.size(); i++) { String originalLine = lines.get(i); String receivedLine = receivedLines[i]; // Blank lines are never sent, so a missing entry is expected for them. boolean matches = receivedLine == null ? originalLine.trim().isEmpty() : originalLine.equals(receivedLine); if (!matches) { allLinesMatch = false; break; } } if (allLinesMatch) { LOGGER.info("Successfully reassembled all file lines!"); } else { LOGGER.info("Uh oh, some lines were missed!"); } } } catch (SqsException e) { LOGGER.error("SQS operation failed", e); } catch (RuntimeException | IOException e) { LOGGER.error("Unexpected runtime error during demo", e); } finally { // Clean up by deleting the queue if it was created. if (queueUrl != null) { try { DeleteQueueRequest deleteQueueRequest = DeleteQueueRequest.builder() .queueUrl(queueUrl) .build(); sqsClient.deleteQueue(deleteQueueRequest); LOGGER.info("Deleted queue: {}", queueUrl); } catch (SqsException e) { LOGGER.error("Failed to delete queue: {}", queueUrl, e); } } } LOGGER.info("Thanks for watching!"); LOGGER.info("-".repeat(88)); } }
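The batching and reassembly logic in the demo above can be looked at separately from the SQS calls. The following is a minimal sketch that uses only the JDK: the class name `LineBatchSketch` and its methods are hypothetical and are not part of the AWS example. It splits a list into groups of at most 10 entries (the limit for an SQS batch request) and rebuilds the original order from line numbers, which is the role the "line" message attribute plays in the demo.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration only; it makes no SQS calls.
class LineBatchSketch {
    // SQS batch requests accept at most 10 entries.
    static final int MAX_BATCH_SIZE = 10;

    /** Splits items into consecutive sublists of at most batchSize elements. */
    static <T> List<List<T>> partition(List<T> items, int batchSize) {
        if (batchSize < 1 || batchSize > MAX_BATCH_SIZE) {
            throw new IllegalArgumentException("batchSize must be between 1 and " + MAX_BATCH_SIZE);
        }
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < items.size(); i += batchSize) {
            int end = Math.min(i + batchSize, items.size());
            batches.add(new ArrayList<>(items.subList(i, end)));
        }
        return batches;
    }

    /**
     * Rebuilds the original order from (line number -> body) pairs.
     * Writing by index is idempotent, so a duplicate delivery is harmless.
     */
    static String[] reassemble(Map<Integer, String> received, int lineCount) {
        String[] lines = new String[lineCount];
        received.forEach((lineNumber, body) -> lines[lineNumber] = body);
        return lines;
    }

    public static void main(String[] args) {
        List<String> lines = new ArrayList<>();
        for (int i = 0; i < 23; i++) {
            lines.add("line " + i);
        }
        List<List<String>> batches = partition(lines, MAX_BATCH_SIZE);
        System.out.println(batches.size() + " batches, last one holds "
                + batches.get(batches.size() - 1).size() + " lines");

        // Simulate out-of-order arrival by inserting in reverse.
        Map<Integer, String> received = new HashMap<>();
        for (int i = lines.size() - 1; i >= 0; i--) {
            received.put(i, lines.get(i));
        }
        System.out.println(List.of(reassemble(received, lines.size())).equals(lines));
    }
}
```

Keying the reassembly on an explicit line number rather than on arrival order matters because a standard queue does not guarantee ordering.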
SimpleProducerConsumer.java: uses automatic message batching.
package com.example.sqs; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.sqs.SqsAsyncClient; import software.amazon.awssdk.services.sqs.batchmanager.SqsAsyncBatchManager; import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest; import software.amazon.awssdk.services.sqs.model.DeleteMessageResponse; import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest; import software.amazon.awssdk.services.sqs.model.Message; import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse; import software.amazon.awssdk.services.sqs.model.SendMessageRequest; import software.amazon.awssdk.services.sqs.model.SendMessageResponse; import software.amazon.awssdk.core.exception.SdkException; import java.math.BigInteger; import java.util.List; import java.util.Random; import java.util.Scanner; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; /** * Demonstrates the AWS SDK for Java 2.x Automatic Request Batching API for Amazon SQS. * * This example showcases the high-level SqsAsyncBatchManager library that provides * efficient batching and buffering for SQS operations. The batch manager offers * methods that directly mirror SqsAsyncClient methods—sendMessage, changeMessageVisibility, * deleteMessage, and receiveMessage—making it a drop-in replacement with minimal code changes. 
* * Key features of the SqsAsyncBatchManager: * - Automatic batching: The SDK automatically buffers individual requests and sends them * as batches when maxBatchSize (default: 10) or sendRequestFrequency (default: 200ms) * thresholds are reached * - Familiar API: Method signatures match SqsAsyncClient exactly, requiring no learning curve * - Background optimization: The batch manager maintains internal buffers and handles * batching logic transparently * - Asynchronous operations: All methods return CompletableFuture for non-blocking execution * * Performance benefits demonstrated: * - Reduced API calls: Multiple individual requests are consolidated into single batch operations * - Lower costs: Fewer API calls result in reduced SQS charges * - Higher throughput: Batch operations process more messages per second * - Efficient resource utilization: Fewer network round trips and better connection reuse * * This example compares: * 1. Single-message operations using SqsAsyncClient directly * 2. Batch operations using SqsAsyncBatchManager with identical method calls * * Usage patterns: * - Set batch size to 1 to use SqsAsyncClient for baseline performance measurement * - Set batch size > 1 to use SqsAsyncBatchManager for optimized batch processing * - Monitor real-time throughput metrics to observe performance improvements * * Prerequisites: * - AWS SDK for Java 2.x version 2.28.0 or later * - An existing SQS queue * - Valid AWS credentials configured * * The program displays real-time metrics showing the dramatic performance difference * between individual operations and automatic batching. */ public class SimpleProducerConsumer { // The maximum runtime of the program. private final static int MAX_RUNTIME_MINUTES = 60; private final static Logger log = LoggerFactory.getLogger(SimpleProducerConsumer.class); /** * Runs the SQS batching demonstration with user-configured parameters. * * Prompts for queue name, thread counts, batch size, message size, and runtime. 
* Creates producer and consumer threads to demonstrate batching performance. * * @param args command line arguments (not used) * @throws InterruptedException if thread operations are interrupted */ public static void main(String[] args) throws InterruptedException { final Scanner input = new Scanner(System.in); System.out.print("Enter the queue name: "); final String queueName = input.nextLine(); System.out.print("Enter the number of producers: "); final int producerCount = input.nextInt(); System.out.print("Enter the number of consumers: "); final int consumerCount = input.nextInt(); System.out.print("Enter the number of messages per batch: "); final int batchSize = input.nextInt(); System.out.print("Enter the message size in bytes: "); final int messageSizeByte = input.nextInt(); System.out.print("Enter the run time in minutes: "); final int runTimeMinutes = input.nextInt(); // Create SQS async client and batch manager for all operations. // The SqsAsyncBatchManager is created from the SqsAsyncClient using the // batchManager() factory method, which provides default batching configuration. // This high-level library automatically handles request buffering and batching // while maintaining the same method signatures as SqsAsyncClient. final SqsAsyncClient sqsAsyncClient = SqsAsyncClient.create(); final SqsAsyncBatchManager batchManager = sqsAsyncClient.batchManager(); final String queueUrl = sqsAsyncClient.getQueueUrl(GetQueueUrlRequest.builder() .queueName(queueName) .build()).join().queueUrl(); // The flag used to stop producer, consumer, and monitor threads. final AtomicBoolean stop = new AtomicBoolean(false); // Start the producers. 
 final AtomicInteger producedCount = new AtomicInteger(); final Thread[] producers = new Thread[producerCount]; for (int i = 0; i < producerCount; i++) { if (batchSize == 1) { producers[i] = new Producer(sqsAsyncClient, queueUrl, messageSizeByte, producedCount, stop); } else { producers[i] = new BatchProducer(batchManager, queueUrl, batchSize, messageSizeByte, producedCount, stop); } producers[i].start(); } // Start the consumers. final AtomicInteger consumedCount = new AtomicInteger(); final Thread[] consumers = new Thread[consumerCount]; for (int i = 0; i < consumerCount; i++) { if (batchSize == 1) { consumers[i] = new Consumer(sqsAsyncClient, queueUrl, consumedCount, stop); } else { consumers[i] = new BatchConsumer(batchManager, queueUrl, batchSize, consumedCount, stop); } consumers[i].start(); } // Start the monitor thread. final Thread monitor = new Monitor(producedCount, consumedCount, stop); monitor.start(); // Wait for the specified amount of time then stop. Thread.sleep(TimeUnit.MINUTES.toMillis(Math.min(runTimeMinutes, MAX_RUNTIME_MINUTES))); stop.set(true); // Join all threads. for (int i = 0; i < producerCount; i++) { producers[i].join(); } for (int i = 0; i < consumerCount; i++) { consumers[i].join(); } monitor.interrupt(); monitor.join(); // Close resources batchManager.close(); sqsAsyncClient.close(); } /** * Creates a random string of approximately the specified size in bytes. * * Base-32 encodes 5 bits per character, so sizeByte characters need about * sizeByte * 5 / 8 random bytes. The division uses 8.0 so that Math.ceil * receives a fractional value, and at least one byte is always allocated. * * @param sizeByte the target size in bytes for the generated string * @return a random string encoded in base-32 */ private static String makeRandomString(int sizeByte) { final byte[] bs = new byte[Math.max(1, (int) Math.ceil(sizeByte * 5 / 8.0))]; new Random().nextBytes(bs); bs[0] = (byte) ((bs[0] | 64) & 127); return new BigInteger(bs).toString(32); } /** * Sends messages individually using SqsAsyncClient for baseline performance measurement. * * This producer demonstrates traditional single-message operations without batching. 
* Each sendMessage() call results in a separate API request to SQS, providing * a performance baseline for comparison with the batch operations. * * The sendMessage() method signature is identical to SqsAsyncBatchManager.sendMessage(), * showing how the high-level batching library maintains API compatibility while * adding automatic optimization behind the scenes. */ private static class Producer extends Thread { final SqsAsyncClient sqsAsyncClient; final String queueUrl; final AtomicInteger producedCount; final AtomicBoolean stop; final String theMessage; /** * Creates a producer thread for single-message operations. * * @param sqsAsyncClient the SQS client for sending messages * @param queueUrl the URL of the target queue * @param messageSizeByte the size of messages to generate * @param producedCount shared counter for tracking sent messages * @param stop shared flag to signal thread termination */ Producer(SqsAsyncClient sqsAsyncClient, String queueUrl, int messageSizeByte, AtomicInteger producedCount, AtomicBoolean stop) { this.sqsAsyncClient = sqsAsyncClient; this.queueUrl = queueUrl; this.producedCount = producedCount; this.stop = stop; this.theMessage = makeRandomString(messageSizeByte); } /** * Continuously sends messages until the stop flag is set. * * Uses SqsAsyncClient.sendMessage() directly, resulting in one API call per message. * This approach provides baseline performance metrics for comparison with batching. * Each call blocks until the individual message is sent, demonstrating traditional * one-request-per-operation behavior. */ public void run() { try { while (!stop.get()) { sqsAsyncClient.sendMessage(SendMessageRequest.builder() .queueUrl(queueUrl) .messageBody(theMessage) .build()).join(); producedCount.incrementAndGet(); } } catch (SdkException | java.util.concurrent.CompletionException e) { // Handle both SdkException and CompletionException from async operations. // If this unlikely condition occurs, stop. 
log.error("Producer: " + e.getMessage()); System.exit(1); } } } /** * Sends messages using SqsAsyncBatchManager for automatic request batching and optimization. * * This producer demonstrates the AWS SDK for Java 2.x high-level batching library. * The SqsAsyncBatchManager automatically buffers individual sendMessage() calls and * sends them as batches when thresholds are reached: * - maxBatchSize: Maximum 10 messages per batch (default) * - sendRequestFrequency: 200ms timeout before sending partial batches (default) * * Key advantages of the batching approach: * - Identical API: batchManager.sendMessage() has the same signature as sqsAsyncClient.sendMessage() * - Automatic optimization: No code changes needed to benefit from batching * - Transparent buffering: The SDK handles batching logic internally * - Reduced API calls: Multiple messages sent in single batch requests * - Lower costs: Fewer API calls result in reduced SQS charges * - Higher throughput: Batch operations process significantly more messages per second */ private static class BatchProducer extends Thread { final SqsAsyncBatchManager batchManager; final String queueUrl; final int batchSize; final AtomicInteger producedCount; final AtomicBoolean stop; final String theMessage; /** * Creates a producer thread for batch operations. 
* * @param batchManager the batch manager for efficient message sending * @param queueUrl the URL of the target queue * @param batchSize the number of messages to send per batch * @param messageSizeByte the size of messages to generate * @param producedCount shared counter for tracking sent messages * @param stop shared flag to signal thread termination */ BatchProducer(SqsAsyncBatchManager batchManager, String queueUrl, int batchSize, int messageSizeByte, AtomicInteger producedCount, AtomicBoolean stop) { this.batchManager = batchManager; this.queueUrl = queueUrl; this.batchSize = batchSize; this.producedCount = producedCount; this.stop = stop; this.theMessage = makeRandomString(messageSizeByte); } /** * Continuously sends batches of messages using the high-level batching library. * * Notice how batchManager.sendMessage() uses the exact same method signature * and request builder pattern as SqsAsyncClient.sendMessage(). This demonstrates * the drop-in replacement capability of the SqsAsyncBatchManager. * * The SDK automatically: * - Buffers individual sendMessage() calls internally * - Groups them into batch requests when thresholds are met * - Sends SendMessageBatchRequest operations to SQS * - Returns individual CompletableFuture responses for each message * * This transparent batching provides significant performance improvements * without requiring changes to application logic or error handling patterns. */ public void run() { try { while (!stop.get()) { // Send multiple messages using the high-level batch manager. // Each batchManager.sendMessage() call uses identical syntax to // sqsAsyncClient.sendMessage(), demonstrating API compatibility. // The SDK automatically buffers these calls and sends them as // batch operations when maxBatchSize (10) or sendRequestFrequency (200ms) // thresholds are reached, significantly improving throughput. 
for (int i = 0; i < batchSize; i++) { CompletableFuture<SendMessageResponse> future = batchManager.sendMessage( SendMessageRequest.builder() .queueUrl(queueUrl) .messageBody(theMessage) .build()); // Handle the response asynchronously future.whenComplete((response, throwable) -> { if (throwable == null) { producedCount.incrementAndGet(); } else if (!(throwable instanceof java.util.concurrent.CancellationException) && !(throwable.getMessage() != null && throwable.getMessage().contains("executor not accepting a task"))) { log.error("BatchProducer: Failed to send message", throwable); } // Ignore CancellationException and executor shutdown errors - expected during shutdown }); } // Small delay to allow batching to occur Thread.sleep(10); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.error("BatchProducer interrupted: " + e.getMessage()); } catch (SdkException | java.util.concurrent.CompletionException e) { log.error("BatchProducer: " + e.getMessage()); System.exit(1); } } } /** * Receives and deletes messages individually using SqsAsyncClient for baseline measurement. * * This consumer demonstrates traditional single-message operations without batching. * Each receiveMessage() and deleteMessage() call results in separate API requests, * providing a performance baseline for comparison with batch operations. * * The method signatures are identical to SqsAsyncBatchManager methods: * - receiveMessage() matches batchManager.receiveMessage() * - deleteMessage() matches batchManager.deleteMessage() * * This API consistency allows easy migration to the high-level batching library. */ private static class Consumer extends Thread { final SqsAsyncClient sqsAsyncClient; final String queueUrl; final AtomicInteger consumedCount; final AtomicBoolean stop; /** * Creates a consumer thread for single-message operations. 
* * @param sqsAsyncClient the SQS client for receiving messages * @param queueUrl the URL of the source queue * @param consumedCount shared counter for tracking processed messages * @param stop shared flag to signal thread termination */ Consumer(SqsAsyncClient sqsAsyncClient, String queueUrl, AtomicInteger consumedCount, AtomicBoolean stop) { this.sqsAsyncClient = sqsAsyncClient; this.queueUrl = queueUrl; this.consumedCount = consumedCount; this.stop = stop; } /** * Continuously receives and deletes messages using traditional single-request operations. * * Uses SqsAsyncClient methods directly: * - receiveMessage(): One API call per receive operation * - deleteMessage(): One API call per delete operation * * This approach demonstrates the baseline performance without batching optimization. * Compare these method calls with the identical signatures used in BatchConsumer * to see how the high-level batching library maintains API compatibility. */ public void run() { try { while (!stop.get()) { try { final ReceiveMessageResponse result = sqsAsyncClient.receiveMessage( ReceiveMessageRequest.builder() .queueUrl(queueUrl) .build()).join(); if (!result.messages().isEmpty()) { final Message m = result.messages().get(0); // Note: deleteMessage() signature identical to batchManager.deleteMessage() sqsAsyncClient.deleteMessage(DeleteMessageRequest.builder() .queueUrl(queueUrl) .receiptHandle(m.receiptHandle()) .build()).join(); consumedCount.incrementAndGet(); } } catch (SdkException | java.util.concurrent.CompletionException e) { log.error(e.getMessage()); } } } catch (SdkException | java.util.concurrent.CompletionException e) { // Handle both SdkException and CompletionException from async operations. // If this unlikely condition occurs, stop. log.error("Consumer: " + e.getMessage()); System.exit(1); } } } /** * Receives and deletes messages using SqsAsyncBatchManager for automatic optimization. 
* * This consumer demonstrates the AWS SDK for Java 2.x high-level batching library * for message consumption. The SqsAsyncBatchManager provides two key optimizations: * * 1. Receive optimization: Maintains an internal buffer of messages fetched in the * background, so receiveMessage() calls return immediately from the buffer * 2. Delete batching: Automatically buffers deleteMessage() calls and sends them * as DeleteMessageBatchRequest operations when thresholds are reached * * Key features: * - Identical API: receiveMessage() and deleteMessage() have the same signatures * as SqsAsyncClient methods, making this a true drop-in replacement * - Background fetching: The batch manager continuously fetches messages to keep * the internal buffer populated, reducing receive latency * - Automatic delete batching: Individual deleteMessage() calls are buffered and * sent as batch operations (up to 10 per batch, 200ms frequency) * - Transparent optimization: No application logic changes needed to benefit * * Performance benefits: * - Reduced API calls through automatic batching of delete operations * - Lower latency for receives due to background message buffering * - Higher overall throughput with fewer network round trips */ private static class BatchConsumer extends Thread { final SqsAsyncBatchManager batchManager; final String queueUrl; final int batchSize; final AtomicInteger consumedCount; final AtomicBoolean stop; /** * Creates a consumer thread for batch operations. 
* * @param batchManager the batch manager for efficient message processing * @param queueUrl the URL of the source queue * @param batchSize the maximum number of messages to receive per batch * @param consumedCount shared counter for tracking processed messages * @param stop shared flag to signal thread termination */ BatchConsumer(SqsAsyncBatchManager batchManager, String queueUrl, int batchSize, AtomicInteger consumedCount, AtomicBoolean stop) { this.batchManager = batchManager; this.queueUrl = queueUrl; this.batchSize = batchSize; this.consumedCount = consumedCount; this.stop = stop; } /** * Continuously receives and deletes messages using the high-level batching library. * * Demonstrates the key advantage of SqsAsyncBatchManager: identical method signatures * with automatic optimization. Notice how: * * - batchManager.receiveMessage() uses the same syntax as sqsAsyncClient.receiveMessage() * - batchManager.deleteMessage() uses the same syntax as sqsAsyncClient.deleteMessage() * * Behind the scenes, the batch manager: * 1. Maintains an internal message buffer populated by background fetching * 2. Returns messages immediately from the buffer (reduced latency) * 3. Automatically batches deleteMessage() calls into DeleteMessageBatchRequest operations * 4. Sends batch deletes when maxBatchSize (10) or sendRequestFrequency (200ms) is reached * * This provides significant performance improvements with zero code changes * compared to traditional SqsAsyncClient usage patterns. */ public void run() { try { while (!stop.get()) { // Receive messages using the high-level batch manager. // This call uses identical syntax to sqsAsyncClient.receiveMessage() // but benefits from internal message buffering for improved performance. 
final ReceiveMessageResponse result = batchManager.receiveMessage( ReceiveMessageRequest.builder() .queueUrl(queueUrl) .maxNumberOfMessages(Math.min(batchSize, 10)) .build()).join(); if (!result.messages().isEmpty()) { final List<Message> messages = result.messages(); // Delete messages using the batch manager. // Each deleteMessage() call uses identical syntax to SqsAsyncClient // but the SDK automatically buffers these calls and sends them // as DeleteMessageBatchRequest operations for optimal performance. for (Message message : messages) { CompletableFuture<DeleteMessageResponse> future = batchManager.deleteMessage( DeleteMessageRequest.builder() .queueUrl(queueUrl) .receiptHandle(message.receiptHandle()) .build()); future.whenComplete((response, throwable) -> { if (throwable == null) { consumedCount.incrementAndGet(); } else if (!(throwable instanceof java.util.concurrent.CancellationException) && !(throwable.getMessage() != null && throwable.getMessage().contains("executor not accepting a task"))) { log.error("BatchConsumer: Failed to delete message", throwable); } // Ignore CancellationException and executor shutdown errors - expected during shutdown }); } } // Small delay to prevent tight polling Thread.sleep(10); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.error("BatchConsumer interrupted: " + e.getMessage()); } catch (SdkException | java.util.concurrent.CompletionException e) { // Handle both SdkException and CompletionException from async operations. // If this unlikely condition occurs, stop. log.error("BatchConsumer: " + e.getMessage()); System.exit(1); } } } /** * Displays real-time throughput statistics every second. * * This thread logs the current count of produced and consumed messages * to help you monitor the performance comparison. 
*/ private static class Monitor extends Thread { private final AtomicInteger producedCount; private final AtomicInteger consumedCount; private final AtomicBoolean stop; /** * Creates a monitoring thread that displays throughput statistics. * * @param producedCount shared counter for messages sent * @param consumedCount shared counter for messages processed * @param stop shared flag to signal thread termination */ Monitor(AtomicInteger producedCount, AtomicInteger consumedCount, AtomicBoolean stop) { this.producedCount = producedCount; this.consumedCount = consumedCount; this.stop = stop; } /** * Logs throughput statistics every second until stopped. * * Displays the current count of produced and consumed messages * to help monitor the performance comparison between batching strategies. */ public void run() { try { while (!stop.get()) { Thread.sleep(1000); log.info("produced messages = " + producedCount.get() + ", consumed messages = " + consumedCount.get()); } } catch (InterruptedException e) { // Allow the thread to exit. } } } }
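The comments in the example above describe two flush triggers: a size threshold (maxBatchSize, default 10) and a time threshold (sendRequestFrequency, default 200 ms). The following is a simplified, single-threaded model of that "size or time" rule, written with the JDK only. It is not the SDK's implementation: the class `ThresholdBufferSketch` and its methods are hypothetical, and timestamps are passed in explicitly so the behavior stays deterministic.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical model for illustration; SqsAsyncBatchManager's internals may differ.
class ThresholdBufferSketch<T> {
    private final int maxBatchSize;
    private final long maxDelayMillis;
    private final List<T> buffer = new ArrayList<>();
    private long firstItemAtMillis;

    ThresholdBufferSketch(int maxBatchSize, long maxDelayMillis) {
        this.maxBatchSize = maxBatchSize;
        this.maxDelayMillis = maxDelayMillis;
    }

    /** Buffers an item; returns a full batch once the size threshold is reached, else an empty list. */
    List<T> add(T item, long nowMillis) {
        if (buffer.isEmpty()) {
            firstItemAtMillis = nowMillis; // Start the clock for this batch.
        }
        buffer.add(item);
        return buffer.size() >= maxBatchSize ? drain() : Collections.emptyList();
    }

    /** Called periodically; returns a partial batch once the oldest buffered item has waited long enough. */
    List<T> tick(long nowMillis) {
        boolean expired = !buffer.isEmpty() && nowMillis - firstItemAtMillis >= maxDelayMillis;
        return expired ? drain() : Collections.emptyList();
    }

    private List<T> drain() {
        List<T> batch = new ArrayList<>(buffer);
        buffer.clear();
        return batch;
    }

    public static void main(String[] args) {
        ThresholdBufferSketch<String> sketch = new ThresholdBufferSketch<>(10, 200);
        int sizeTriggered = 0;
        for (int i = 0; i < 25; i++) {
            if (!sketch.add("message-" + i, 0L).isEmpty()) {
                sizeTriggered++;
            }
        }
        // Two full batches leave on the size rule; the remaining five wait for the timer.
        System.out.println("size-triggered batches: " + sizeTriggered);
        System.out.println("time-triggered batch size: " + sketch.tick(200L).size());
    }
}
```

In this model, 25 buffered requests become three batch calls instead of 25 individual calls, which is the source of the reduced API call count that the example's comments describe.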
-
For API details, see the following topics in the AWS SDK for Java 2.x API Reference.
-
The following code example shows how to use the Amazon SQS Java Messaging Library to work with the JMS interface.
- SDK for Java 2.x
-
Note
There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.
The following examples work with standard Amazon SQS queues and include:
- Sending a text message.
- Receiving messages synchronously.
- Receiving messages asynchronously.
- Receiving messages using CLIENT_ACKNOWLEDGE mode.
- Receiving messages using UNORDERED_ACKNOWLEDGE mode.
- Using Spring to inject dependencies.
- A utility class that provides common methods used by the other examples.
For more information about using JMS with Amazon SQS, see the Amazon SQS Developer Guide.
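The two acknowledge modes named in the list differ in what a single acknowledge() call covers. In CLIENT_ACKNOWLEDGE mode, acknowledging a message also acknowledges every message received before it. In UNORDERED_ACKNOWLEDGE mode, each message must be acknowledged individually. The following sketch models only that bookkeeping with JDK collections. The class `AckModeSketch` is hypothetical and does not use the JMS or Amazon SQS Java Messaging Library types.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical bookkeeping model for illustration; no JMS or SQS classes are involved.
class AckModeSketch {
    enum Mode { CLIENT_ACKNOWLEDGE, UNORDERED_ACKNOWLEDGE }

    private final Mode mode;
    // Messages received but not yet acknowledged, in the order they were received.
    private final List<String> unacknowledged = new ArrayList<>();

    AckModeSketch(Mode mode) {
        this.mode = mode;
    }

    void receive(String messageId) {
        unacknowledged.add(messageId);
    }

    /** Acknowledges a message and returns every ID that the call covered. */
    List<String> acknowledge(String messageId) {
        List<String> covered = new ArrayList<>();
        int index = unacknowledged.indexOf(messageId);
        if (index < 0) {
            return covered; // Unknown or already acknowledged.
        }
        if (mode == Mode.CLIENT_ACKNOWLEDGE) {
            // Covers this message and everything received before it.
            List<String> upToAndIncluding = unacknowledged.subList(0, index + 1);
            covered.addAll(upToAndIncluding);
            upToAndIncluding.clear();
        } else {
            // Covers only this message.
            covered.add(unacknowledged.remove(index));
        }
        return covered;
    }

    List<String> pending() {
        return new ArrayList<>(unacknowledged);
    }

    public static void main(String[] args) {
        for (Mode mode : Mode.values()) {
            AckModeSketch session = new AckModeSketch(mode);
            session.receive("Message 1");
            session.receive("Message 2");
            List<String> covered = session.acknowledge("Message 2");
            System.out.println(mode + ": acknowledged " + covered + ", still pending " + session.pending());
        }
    }
}
```

In the model, acknowledging only "Message 2" leaves nothing pending under CLIENT_ACKNOWLEDGE, while "Message 1" stays pending under UNORDERED_ACKNOWLEDGE and would become visible in the queue again after its visibility timeout.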
Sending a text message.
/** * This method establishes a connection to a standard Amazon SQS queue using the Amazon SQS * Java Messaging Library and sends text messages to it. It uses JMS (Java Message Service) API * with automatic acknowledgment mode to ensure reliable message delivery, and automatically * manages all messaging resources. * * @throws JMSException If there is a problem connecting to or sending messages to the queue */ public static void doSendTextMessage() throws JMSException { // Create a connection factory. SQSConnectionFactory connectionFactory = new SQSConnectionFactory( new ProviderConfiguration(), SqsClient.create() ); // Create the connection in a try-with-resources statement so that it's closed automatically. try (SQSConnection connection = connectionFactory.createConnection()) { // Create the queue if needed. SqsJmsExampleUtils.ensureQueueExists(connection, QUEUE_NAME, SqsJmsExampleUtils.QUEUE_VISIBILITY_TIMEOUT); // Create a session that uses the JMS auto-acknowledge mode. Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME)); createAndSendMessages(session, producer); } // The connection closes automatically. This also closes the session. LOGGER.info("Connection closed"); } /** * This method reads text input from the keyboard and sends each line as a separate message * to a standard Amazon SQS queue using the Amazon SQS Java Messaging Library. It continues * to accept input until the user enters an empty line, using JMS (Java Message Service) API to * handle the message delivery. 
 * * @param session The JMS session used to create messages * @param producer The JMS message producer used to send messages to the queue */ private static void createAndSendMessages(Session session, MessageProducer producer) { BufferedReader inputReader = new BufferedReader( new InputStreamReader(System.in, Charset.defaultCharset())); try { String input; while (true) { LOGGER.info("Enter message to send (leave empty to exit): "); input = inputReader.readLine(); if (input == null || input.isEmpty()) break; TextMessage message = session.createTextMessage(input); producer.send(message); LOGGER.info("Sent message {}", message.getJMSMessageID()); } } catch (EOFException e) { // Just return on EOF } catch (IOException e) { LOGGER.error("Failed reading input: {}", e.getMessage(), e); } catch (JMSException e) { LOGGER.error("Failed sending message: {}", e.getMessage(), e); } }
Receiving messages synchronously.
/** * This method receives messages from a standard Amazon SQS queue using the Amazon SQS Java * Messaging Library. It creates a connection to the queue using JMS (Java Message Service), * waits for messages to arrive, and processes them one at a time. The method handles all * necessary setup and cleanup of messaging resources. * * @throws JMSException If there is a problem connecting to or receiving messages from the queue */ public static void doReceiveMessageSync() throws JMSException { // Create a connection factory. SQSConnectionFactory connectionFactory = new SQSConnectionFactory( new ProviderConfiguration(), SqsClient.create() ); // Create a connection. try (SQSConnection connection = connectionFactory.createConnection() ) { // Create the queue if needed. SqsJmsExampleUtils.ensureQueueExists(connection, QUEUE_NAME, SqsJmsExampleUtils.QUEUE_VISIBILITY_TIMEOUT); // Create a session. Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE_NAME)); connection.start(); receiveMessages(consumer); } // The connection closes automatically. This also closes the session. LOGGER.info("Connection closed"); } /** * This method continuously checks for new messages from a standard Amazon SQS queue using * the Amazon SQS Java Messaging Library. It waits up to 20 seconds for each message, processes * it using JMS (Java Message Service), and confirms receipt. The method stops checking for * messages after 20 seconds of no activity. 
 * * @param consumer The JMS message consumer that receives messages from the queue */ private static void receiveMessages(MessageConsumer consumer) { try { while (true) { LOGGER.info("Waiting for messages..."); // Wait up to 20 seconds for a message. Message message = consumer.receive(Duration.ofSeconds(20).toMillis()); if (message == null) { LOGGER.info("Shutting down after 20 seconds of silence."); break; } SqsJmsExampleUtils.handleMessage(message); message.acknowledge(); LOGGER.info("Acknowledged message {}", message.getJMSMessageID()); } } catch (JMSException e) { LOGGER.error("Error receiving from SQS: {}", e.getMessage(), e); } }
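The synchronous receive loop above stops when a receive call with a timeout returns null. The same control flow can be tried without an SQS queue. The following sketch uses `java.util.concurrent.BlockingQueue.poll(timeout, unit)` as a stand-in for the JMS `MessageConsumer.receive(timeout)` call, since both return null when the timeout expires. The class `TimedReceiveSketch` and its method are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the JMS receive loop; no SQS or JMS classes are used.
class TimedReceiveSketch {

    /**
     * Takes messages until one poll waits idleTimeoutMillis without a result,
     * which is the same stop condition as a null return from receive(timeout).
     */
    static List<String> drainUntilIdle(BlockingQueue<String> queue, long idleTimeoutMillis) {
        List<String> handled = new ArrayList<>();
        try {
            while (true) {
                String message = queue.poll(idleTimeoutMillis, TimeUnit.MILLISECONDS);
                if (message == null) {
                    break; // Timed out: treat the queue as idle and stop.
                }
                handled.add(message); // "Handle" the message, then wait for the next one.
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // Preserve the interrupt for the caller.
        }
        return handled;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(List.of("Message 1", "Message 2"));
        System.out.println(drainUntilIdle(queue, 100));
    }
}
```

The timeout restarts after every message, so the loop ends only after a full idle period and not a fixed time after it starts.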
Receiving messages asynchronously.
/** * This method sets up automatic message handling for a standard Amazon SQS queue using the * Amazon SQS Java Messaging Library. It creates a listener that processes messages as soon * as they arrive using JMS (Java Message Service), runs for 5 seconds, then cleans up all * messaging resources. * * @throws JMSException If there is a problem connecting to or receiving messages from the queue */ public static void doReceiveMessageAsync() throws JMSException { // Create a connection factory. SQSConnectionFactory connectionFactory = new SQSConnectionFactory( new ProviderConfiguration(), SqsClient.create() ); // Create a connection. try (SQSConnection connection = connectionFactory.createConnection() ) { // Create the queue if needed. SqsJmsExampleUtils.ensureQueueExists(connection, QUEUE_NAME, SqsJmsExampleUtils.QUEUE_VISIBILITY_TIMEOUT); // Create a session. Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); try { // Create a consumer for the queue. MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE_NAME)); // Provide an implementation of the MessageListener interface, which has a single 'onMessage' method. // We use a lambda expression for the implementation. consumer.setMessageListener(message -> { try { SqsJmsExampleUtils.handleMessage(message); message.acknowledge(); } catch (JMSException e) { LOGGER.error("Error processing message: {}", e.getMessage()); } }); // Start receiving incoming messages. connection.start(); LOGGER.info("Waiting for messages..."); } catch (JMSException e) { throw new RuntimeException(e); } try { Thread.sleep(5000); } catch (InterruptedException e) { throw new RuntimeException(e); } } // The connection closes automatically. This also closes the session. LOGGER.info( "Connection closed" ); }
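The asynchronous example above registers a listener and then keeps the connection open for a fixed 5 seconds. When the number of expected messages is known, for example in a test, a bounded wait on a latch is one alternative to a fixed sleep. The following JDK-only sketch illustrates that idea. The class `ListenerWaitSketch` is hypothetical, a single-thread executor plays the role of the delivery thread, and a `Consumer<String>` plays the role of the JMS MessageListener.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical illustration of callback delivery with a bounded wait; no JMS classes are used.
class ListenerWaitSketch {

    /**
     * Delivers each message to the listener on a background thread and waits
     * until all were handled or the timeout elapsed. Returns true if all were handled.
     */
    static boolean deliverAndWait(List<String> messages, Consumer<String> listener, long timeoutMillis) {
        CountDownLatch remaining = new CountDownLatch(messages.size());
        ExecutorService deliveryThread = Executors.newSingleThreadExecutor();
        try {
            for (String message : messages) {
                deliveryThread.submit(() -> {
                    try {
                        listener.accept(message);
                    } finally {
                        remaining.countDown(); // Count the message even if the listener throws.
                    }
                });
            }
            return remaining.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // Preserve the interrupt for the caller.
            return false;
        } finally {
            deliveryThread.shutdownNow(); // Comparable to closing the connection.
        }
    }

    public static void main(String[] args) {
        List<String> handled = new CopyOnWriteArrayList<>();
        boolean allHandled = deliverAndWait(List.of("Message 1", "Message 2"), handled::add, 5000);
        System.out.println("all handled: " + allHandled + ", messages: " + handled);
    }
}
```

The wait returns as soon as the last callback finishes, and the timeout still caps the total time if a message never arrives.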
Receive messages using CLIENT_ACKNOWLEDGE mode.
/** * This method demonstrates how message acknowledgment affects message processing in a standard * Amazon SQS queue using the Amazon SQS Java Messaging Library. It sends messages to the queue, * then shows how JMS (Java Message Service) client acknowledgment mode handles both explicit * and implicit message confirmations, including how acknowledging one message can automatically * acknowledge previous messages. * * @throws JMSException If there is a problem with the messaging operations */ public static void doReceiveMessagesSyncClientAcknowledge() throws JMSException { // Create a connection factory. SQSConnectionFactory connectionFactory = new SQSConnectionFactory( new ProviderConfiguration(), SqsClient.create() ); // Create the connection in a try-with-resources statement so that it's closed automatically. try (SQSConnection connection = connectionFactory.createConnection() ) { // Create the queue if needed. SqsJmsExampleUtils.ensureQueueExists(connection, QUEUE_NAME, TIME_OUT_SECONDS); // Create a session with client acknowledge mode. Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); // Create a producer and consumer. MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME)); MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE_NAME)); // Open the connection. connection.start(); // Send two text messages. sendMessage(producer, session, "Message 1"); sendMessage(producer, session, "Message 2"); // Receive a message and don't acknowledge it. receiveMessage(consumer, false); // Receive another message and acknowledge it. 
receiveMessage(consumer, true); // Wait for the visibility time out, so that unacknowledged messages reappear in the queue, LOGGER.info("Waiting for visibility timeout..."); try { Thread.sleep(TIME_OUT_MILLIS); } catch (InterruptedException e) { LOGGER.error("Interrupted while waiting for visibility timeout", e); Thread.currentThread().interrupt(); throw new RuntimeException("Processing interrupted", e); } /* We will attempt to receive another message, but none will be available. This is because in CLIENT_ACKNOWLEDGE mode, when we acknowledged the second message, all previous messages were automatically acknowledged as well. Therefore, although we never directly acknowledged the first message, it was implicitly acknowledged when we confirmed the second one. */ receiveMessage(consumer, true); } // The connection closes automatically. This also closes the session. LOGGER.info("Connection closed."); } /** * Sends a text message using the specified JMS MessageProducer and Session. * * @param producer The JMS MessageProducer used to send the message * @param session The JMS Session used to create the text message * @param messageText The text content to be sent in the message * @throws JMSException If there is an error creating or sending the message */ private static void sendMessage(MessageProducer producer, Session session, String messageText) throws JMSException { // Create a text message and send it. producer.send(session.createTextMessage(messageText)); } /** * Receives and processes a message from a JMS queue using the specified consumer. * The method waits for a message until the configured timeout period is reached. * If a message is received, it is logged and optionally acknowledged based on the * acknowledge parameter. * * @param consumer The JMS MessageConsumer used to receive messages from the queue * @param acknowledge Boolean flag indicating whether to acknowledge the message. 
* If true, the message will be acknowledged after processing * @throws JMSException If there is an error receiving, processing, or acknowledging the message */ private static void receiveMessage(MessageConsumer consumer, boolean acknowledge) throws JMSException { // Receive a message. Message message = consumer.receive(TIME_OUT_MILLIS); if (message == null) { LOGGER.info("Queue is empty!"); } else { // Since this queue has only text messages, cast the message object and print the text. LOGGER.info("Received: {} Acknowledged: {}", ((TextMessage) message).getText(), acknowledge); // Acknowledge the message if asked. if (acknowledge) message.acknowledge(); } }
Receive messages using UNORDERED_ACKNOWLEDGE mode.
/** * Demonstrates message acknowledgment behavior in UNORDERED_ACKNOWLEDGE mode with Amazon SQS JMS. * In this mode, each message must be explicitly acknowledged regardless of receive order. * Unacknowledged messages return to the queue after the visibility timeout expires, * unlike CLIENT_ACKNOWLEDGE mode where acknowledging one message acknowledges all previous messages. * * @throws JMSException If a JMS-related error occurs during message operations */ public static void doReceiveMessagesUnorderedAcknowledge() throws JMSException { // Create a connection factory. SQSConnectionFactory connectionFactory = new SQSConnectionFactory( new ProviderConfiguration(), SqsClient.create() ); // Create the connection in a try-with-resources statement so that it's closed automatically. try( SQSConnection connection = connectionFactory.createConnection() ) { // Create the queue if needed. SqsJmsExampleUtils.ensureQueueExists(connection, QUEUE_NAME, TIME_OUT_SECONDS); // Create a session with unordered acknowledge mode. Session session = connection.createSession(false, SQSSession.UNORDERED_ACKNOWLEDGE); // Create the producer and consumer. MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME)); MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE_NAME)); // Open a connection. connection.start(); // Send two text messages. sendMessage(producer, session, "Message 1"); sendMessage(producer, session, "Message 2"); // Receive a message and don't acknowledge it. receiveMessage(consumer, false); // Receive another message and acknowledge it. receiveMessage(consumer, true); // Wait for the visibility time out, so that unacknowledged messages reappear in the queue. 
LOGGER.info("Waiting for visibility timeout..."); try { Thread.sleep(TIME_OUT_MILLIS); } catch (InterruptedException e) { LOGGER.error("Interrupted while waiting for visibility timeout", e); Thread.currentThread().interrupt(); throw new RuntimeException("Processing interrupted", e); } /* We will attempt to receive another message, and we'll get the first message again. This occurs because in UNORDERED_ACKNOWLEDGE mode, each message requires its own separate acknowledgment. Since we only acknowledged the second message, the first message remains in the queue for redelivery. */ receiveMessage(consumer, true); LOGGER.info("Connection closed."); } // The connection closes automatically. This also closes the session. } /** * Sends a text message to an Amazon SQS queue using JMS. * * @param producer The JMS MessageProducer for the queue * @param session The JMS Session for message creation * @param messageText The message content * @throws JMSException If message creation or sending fails */ private static void sendMessage(MessageProducer producer, Session session, String messageText) throws JMSException { // Create a text message and send it. producer.send(session.createTextMessage(messageText)); } /** * Synchronously receives a message from an Amazon SQS queue using the JMS API * with an acknowledgment parameter. * * @param consumer The JMS MessageConsumer for the queue * @param acknowledge If true, acknowledges the message after receipt * @throws JMSException If message reception or acknowledgment fails */ private static void receiveMessage(MessageConsumer consumer, boolean acknowledge) throws JMSException { // Receive a message. Message message = consumer.receive(TIME_OUT_MILLIS); if (message == null) { LOGGER.info("Queue is empty!"); } else { // Since this queue has only text messages, cast the message object and print the text. LOGGER.info("Received: {} Acknowledged: {}", ((TextMessage) message).getText(), acknowledge); // Acknowledge the message if asked. 
if (acknowledge) message.acknowledge(); } }
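The difference between the two acknowledgment modes shown above can be sketched without any AWS or JMS dependency. The following is a minimal in-memory model, not the library's implementation: `AckModeSketch`, `SimulatedSession`, and `redeliveredAfterVisibilityTimeout` are hypothetical names introduced for illustration. It assumes, as the examples above describe, that acknowledging a message in CLIENT_ACKNOWLEDGE mode also acknowledges the messages delivered before it, while UNORDERED_ACKNOWLEDGE acknowledges only the named message. It also assumes "Message 1" is delivered first, which a standard queue does not guarantee.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** In-memory model of the two acknowledgment modes. No AWS or JMS calls are made. */
public class AckModeSketch {

    enum Mode { CLIENT_ACKNOWLEDGE, UNORDERED_ACKNOWLEDGE }

    /** Hypothetical stand-in for a session that tracks delivered but unacknowledged messages. */
    static final class SimulatedSession {
        private final Mode mode;
        // Delivered messages that have not been acknowledged yet, in delivery order.
        private final Set<String> unacknowledged = new LinkedHashSet<>();

        SimulatedSession(Mode mode) {
            this.mode = mode;
        }

        void deliver(String messageId) {
            unacknowledged.add(messageId);
        }

        void acknowledge(String messageId) {
            if (!unacknowledged.contains(messageId)) {
                return; // Already acknowledged or never delivered.
            }
            if (mode == Mode.CLIENT_ACKNOWLEDGE) {
                // Acknowledge this message and every message delivered before it.
                for (String id : new ArrayList<>(unacknowledged)) {
                    unacknowledged.remove(id);
                    if (id.equals(messageId)) {
                        break;
                    }
                }
            } else {
                // Acknowledge only the named message.
                unacknowledged.remove(messageId);
            }
        }

        /** Messages that would become visible again once the visibility timeout expires. */
        List<String> redeliveredAfterVisibilityTimeout() {
            return new ArrayList<>(unacknowledged);
        }
    }

    /** Replays the scenario from the examples: receive two messages, acknowledge only the second. */
    static List<String> run(Mode mode) {
        SimulatedSession session = new SimulatedSession(mode);
        session.deliver("Message 1");
        session.deliver("Message 2");
        session.acknowledge("Message 2");
        return session.redeliveredAfterVisibilityTimeout();
    }

    public static void main(String[] args) {
        System.out.println("CLIENT_ACKNOWLEDGE redelivers: " + run(Mode.CLIENT_ACKNOWLEDGE));
        System.out.println("UNORDERED_ACKNOWLEDGE redelivers: " + run(Mode.UNORDERED_ACKNOWLEDGE));
    }
}
```

In this model the CLIENT_ACKNOWLEDGE run leaves nothing to redeliver, while the UNORDERED_ACKNOWLEDGE run leaves "Message 1" pending, which matches the behavior described in the comments of the two examples.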
Use Spring to inject dependencies.
package com.example.sqs.jms.spring; import com.amazon.sqs.javamessaging.SQSConnection; import com.example.sqs.jms.SqsJmsExampleUtils; import jakarta.jms.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.NoSuchBeanDefinitionException; import org.springframework.context.support.FileSystemXmlApplicationContext; import java.io.File; import java.net.URL; import java.util.concurrent.TimeUnit; /** * Demonstrates how to send and receive messages using the Amazon SQS Java Messaging Library * with Spring Framework integration. This example connects to a standard Amazon SQS message * queue using Spring's dependency injection to configure the connection and messaging components. * The application uses the JMS (Java Message Service) API to handle message operations. */ public class SpringExample { private static final Integer POLLING_SECONDS = 15; private static final String SPRING_XML_CONFIG_FILE = "SpringExampleConfiguration.xml.txt"; private static final Logger LOGGER = LoggerFactory.getLogger(SpringExample.class); /** * Demonstrates sending and receiving messages through a standard Amazon SQS message queue * using Spring Framework configuration. This method loads connection settings from an XML file, * establishes a messaging session using the Amazon SQS Java Messaging Library, and processes * messages using JMS (Java Message Service) operations. If the queue doesn't exist, it will * be created automatically. 
* * @param args Command line arguments (not used) */ public static void main(String[] args) { URL resource = SpringExample.class.getClassLoader().getResource(SPRING_XML_CONFIG_FILE); File springFile = new File(resource.getFile()); if (!springFile.exists() || !springFile.canRead()) { LOGGER.error("File " + SPRING_XML_CONFIG_FILE + " doesn't exist or isn't readable."); System.exit(1); } try (FileSystemXmlApplicationContext context = new FileSystemXmlApplicationContext("file://" + springFile.getAbsolutePath())) { Connection connection; try { connection = context.getBean(Connection.class); } catch (NoSuchBeanDefinitionException e) { LOGGER.error("Can't find the JMS connection to use: " + e.getMessage(), e); System.exit(2); return; } String queueName; try { queueName = context.getBean("queueName", String.class); } catch (NoSuchBeanDefinitionException e) { LOGGER.error("Can't find the name of the queue to use: " + e.getMessage(), e); System.exit(3); return; } try { if (connection instanceof SQSConnection) { SqsJmsExampleUtils.ensureQueueExists((SQSConnection) connection, queueName, SqsJmsExampleUtils.QUEUE_VISIBILITY_TIMEOUT); } // Create the JMS session. Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); SqsJmsExampleUtils.sendTextMessage(session, queueName); MessageConsumer consumer = session.createConsumer(session.createQueue(queueName)); receiveMessages(consumer); } catch (JMSException e) { LOGGER.error(e.getMessage(), e); throw new RuntimeException(e); } } // Spring context autocloses. Managed Spring beans that implement AutoClosable, such as the // 'connection' bean, are also closed. LOGGER.info("Context closed"); } /** * Continuously checks for and processes messages from a standard Amazon SQS message queue * using the Amazon SQS Java Messaging Library underlying the JMS API. This method waits for incoming messages, * processes them when they arrive, and acknowledges their receipt using JMS (Java Message * Service) operations. 
The method will stop checking for messages after 15 seconds of * inactivity. * * @param consumer The JMS message consumer used to receive messages from the queue */ private static void receiveMessages(MessageConsumer consumer) { try { while (true) { LOGGER.info("Waiting for messages..."); // Wait 15 seconds for a message. Message message = consumer.receive(TimeUnit.SECONDS.toMillis(POLLING_SECONDS)); if (message == null) { LOGGER.info("Shutting down after {} seconds of silence.", POLLING_SECONDS); break; } SqsJmsExampleUtils.handleMessage(message); message.acknowledge(); LOGGER.info("Message acknowledged."); } } catch (JMSException e) { LOGGER.error("Error receiving from SQS.", e); } } }
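The receive loop above keeps polling until a full timeout passes with no message. The same control flow can be sketched with only the JDK, using a `java.util.concurrent.BlockingQueue` as a stand-in for the JMS consumer. This is an analogy under that assumption, not the messaging library's behavior; `IdleTimeoutLoopSketch` and `drainUntilIdle` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Polls a queue until no message arrives within the idle timeout. */
public class IdleTimeoutLoopSketch {

    /**
     * Handles messages until one poll times out, then returns what was handled.
     * The BlockingQueue stands in for a message consumer; poll() plays the role of receive(timeout).
     */
    static List<String> drainUntilIdle(BlockingQueue<String> queue, long idleTimeoutMillis) {
        List<String> handled = new ArrayList<>();
        try {
            while (true) {
                // Returns null if nothing arrives before the timeout, like a timed receive.
                String message = queue.poll(idleTimeoutMillis, TimeUnit.MILLISECONDS);
                if (message == null) {
                    break; // Idle for the whole timeout: stop polling.
                }
                handled.add(message);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag and return what was handled so far.
            Thread.currentThread().interrupt();
        }
        return handled;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("Hello world!");
        queue.add("Hello again!");
        System.out.println("Handled: " + drainUntilIdle(queue, 100));
    }
}
```

The timeout doubles as the shutdown condition, so choosing it is a trade-off: a short value ends the program quickly but may stop before a slow producer sends its next message.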
Spring bean definitions.
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd "> <!-- Define the AWS Region --> <bean id="region" class="software.amazon.awssdk.regions.Region" factory-method="of"> <constructor-arg value="us-east-1"/> </bean> <bean id="credentialsProviderBean" class="software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider" factory-method="create"/> <bean id="clientBuilder" class="software.amazon.awssdk.services.sqs.SqsClient" factory-method="builder"/> <bean id="regionSetClientBuilder" factory-bean="clientBuilder" factory-method="region"> <constructor-arg ref="region"/> </bean> <!-- Configure the Builder with Credentials Provider --> <bean id="sqsClient" factory-bean="regionSetClientBuilder" factory-method="credentialsProvider"> <constructor-arg ref="credentialsProviderBean"/> </bean> <bean id="providerConfiguration" class="com.amazon.sqs.javamessaging.ProviderConfiguration"> <property name="numberOfMessagesToPrefetch" value="5"/> </bean> <bean id="connectionFactory" class="com.amazon.sqs.javamessaging.SQSConnectionFactory"> <constructor-arg ref="providerConfiguration"/> <constructor-arg ref="clientBuilder"/> </bean> <bean id="connection" factory-bean="connectionFactory" factory-method="createConnection" init-method="start" destroy-method="close"/> <bean id="queueName" class="java.lang.String"> <constructor-arg value="SQSJMSClientExampleQueue"/> </bean> </beans>
Utility class that provides common methods used by the other examples.
package com.example.sqs.jms; import com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper; import com.amazon.sqs.javamessaging.ProviderConfiguration; import com.amazon.sqs.javamessaging.SQSConnection; import com.amazon.sqs.javamessaging.SQSConnectionFactory; import jakarta.jms.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.services.sqs.SqsClient; import software.amazon.awssdk.services.sqs.model.CreateQueueRequest; import software.amazon.awssdk.services.sqs.model.QueueAttributeName; import java.time.Duration; import java.util.Base64; import java.util.Map; /** * This utility class provides helper methods for working with Amazon Simple Queue Service (Amazon SQS) * through the Java Message Service (JMS) interface. It contains common operations for managing message * queues and handling message delivery. */ public class SqsJmsExampleUtils { private static final Logger LOGGER = LoggerFactory.getLogger(SqsJmsExampleUtils.class); public static final Long QUEUE_VISIBILITY_TIMEOUT = 5L; /** * This method verifies that a message queue exists and creates it if necessary. The method checks for * an existing queue first to optimize performance. * * @param connection The active connection to the messaging service * @param queueName The name of the queue to verify or create * @param visibilityTimeout The duration in seconds that messages will be hidden after being received * @throws JMSException If there is an error accessing or creating the queue */ public static void ensureQueueExists(SQSConnection connection, String queueName, Long visibilityTimeout) throws JMSException { AmazonSQSMessagingClientWrapper client = connection.getWrappedAmazonSQSClient(); /* In most cases, you can do this with just a 'createQueue' call, but 'getQueueUrl' (called by 'queueExists') is a faster operation for the common case where the queue already exists. 
Also, many users and roles have permission to call 'getQueueUrl' but don't have permission to call 'createQueue'. */ if( !client.queueExists(queueName) ) { CreateQueueRequest createQueueRequest = CreateQueueRequest.builder() .queueName(queueName) .attributes(Map.of(QueueAttributeName.VISIBILITY_TIMEOUT, String.valueOf(visibilityTimeout))) .build(); client.createQueue( createQueueRequest ); } } /** * This method sends a simple text message to a specified message queue. It handles all necessary * setup for the message delivery process. * * @param session The active messaging session used to create and send the message * @param queueName The name of the queue where the message will be sent */ public static void sendTextMessage(Session session, String queueName) { // Rest of implementation... try { MessageProducer producer = session.createProducer( session.createQueue( queueName) ); Message message = session.createTextMessage("Hello world!"); producer.send(message); } catch (JMSException e) { LOGGER.error( "Error sending to SQS", e ); } } /** * This method processes incoming messages and logs their content based on the message type. * It supports text messages, binary data, and Java objects. * * @param message The message to be processed and logged * @throws JMSException If there is an error reading the message content */ public static void handleMessage(Message message) throws JMSException { // Rest of implementation... 
LOGGER.info( "Got message {}", message.getJMSMessageID() ); LOGGER.info( "Content: "); if(message instanceof TextMessage txtMessage) { LOGGER.info( "\t{}", txtMessage.getText() ); } else if(message instanceof BytesMessage byteMessage){ // Assume the length fits in an int - SQS only supports sizes up to 256k so that // should be true byte[] bytes = new byte[(int)byteMessage.getBodyLength()]; byteMessage.readBytes(bytes); LOGGER.info( "\t{}", Base64.getEncoder().encodeToString( bytes ) ); } else if( message instanceof ObjectMessage) { ObjectMessage objMessage = (ObjectMessage) message; LOGGER.info( "\t{}", objMessage.getObject() ); } } /** * This method sets up automatic message processing for a specified queue. It creates a listener * that will receive and handle incoming messages without blocking the main program. * * @param session The active messaging session * @param queueName The name of the queue to monitor * @param connection The active connection to the messaging service */ public static void receiveMessagesAsync(Session session, String queueName, Connection connection) { // Rest of implementation... try { // Create a consumer for the queue. MessageConsumer consumer = session.createConsumer(session.createQueue(queueName)); // Provide an implementation of the MessageListener interface, which has a single 'onMessage' method. // We use a lambda expression for the implementation. consumer.setMessageListener(message -> { try { SqsJmsExampleUtils.handleMessage(message); message.acknowledge(); } catch (JMSException e) { LOGGER.error("Error processing message: {}", e.getMessage()); } }); // Start receiving incoming messages. connection.start(); } catch (JMSException e) { throw new RuntimeException(e); } try { Thread.sleep(2000); } catch (InterruptedException e) { throw new RuntimeException(e); } } /** * This method performs cleanup operations after message processing is complete. 
It receives * any messages in the specified queue, removes the message queue and closes all * active connections to prevent resource leaks. * * @param queueName The name of the queue to be removed * @param visibilityTimeout The duration in seconds that messages are hidden after being received * @throws JMSException If there is an error during the cleanup process */ public static void cleanUpExample(String queueName, Long visibilityTimeout) throws JMSException { LOGGER.info("Performing cleanup."); SQSConnectionFactory connectionFactory = new SQSConnectionFactory( new ProviderConfiguration(), SqsClient.create() ); try (SQSConnection connection = connectionFactory.createConnection() ) { ensureQueueExists(connection, queueName, visibilityTimeout); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); receiveMessagesAsync(session, queueName, connection); SqsClient sqsClient = connection.getWrappedAmazonSQSClient().getAmazonSQSClient(); try { String queueUrl = sqsClient.getQueueUrl(b -> b.queueName(queueName)).queueUrl(); sqsClient.deleteQueue(b -> b.queueUrl(queueUrl)); LOGGER.info("Queue deleted: {}", queueUrl); } catch (SdkException e) { LOGGER.error("Error during SQS operations: ", e); } } LOGGER.info("Clean up: Connection closed"); } /** * This method creates a background task that sends multiple messages to a specified queue * after waiting for a set time period. The task operates independently to ensure efficient * message processing without interrupting other operations. 
* * @param queueName The name of the queue where messages will be sent * @param secondsToWait The number of seconds to wait before sending messages * @param numMessages The number of messages to send * @param visibilityTimeout The duration in seconds that messages remain hidden after being received * @return A task that can be executed to send the messages */ public static Runnable sendAMessageAsync(String queueName, Long secondsToWait, Integer numMessages, Long visibilityTimeout) { return () -> { try { Thread.sleep(Duration.ofSeconds(secondsToWait).toMillis()); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException(e); } try { SQSConnectionFactory connectionFactory = new SQSConnectionFactory( new ProviderConfiguration(), SqsClient.create() ); try (SQSConnection connection = connectionFactory.createConnection()) { ensureQueueExists(connection, queueName, visibilityTimeout); Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); for (int i = 1; i <= numMessages; i++) { MessageProducer producer = session.createProducer(session.createQueue(queueName)); producer.send(session.createTextMessage("Hello World " + i + "!")); } } } catch (JMSException e) { LOGGER.error(e.getMessage(), e); throw new RuntimeException(e); } }; } }
-
For API details, see the following topics in the AWS SDK for Java 2.x API Reference.
-
The following code example shows how to perform tagging operations with Amazon SQS.
- SDK for Java 2.x
-
Note
There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.
The following example creates tags for a queue, lists the tags, and removes a tag.
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.sqs.SqsClient; import software.amazon.awssdk.services.sqs.model.ListQueueTagsResponse; import software.amazon.awssdk.services.sqs.model.QueueDoesNotExistException; import software.amazon.awssdk.services.sqs.model.SqsException; import java.util.Map; import java.util.UUID; /** * Before running this Java V2 code example, set up your development environment, including your credentials. For more * information, see the <a href="https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html">AWS * SDK for Java Developer Guide</a>. */ public class TagExamples { static final SqsClient sqsClient = SqsClient.create(); static final String queueName = "TagExamples-queue-" + UUID.randomUUID().toString().replace("-", "").substring(0, 20); private static final Logger LOGGER = LoggerFactory.getLogger(TagExamples.class); public static void main(String[] args) { final String queueUrl; try { queueUrl = sqsClient.createQueue(b -> b.queueName(queueName)).queueUrl(); LOGGER.info("Queue created. The URL is: {}", queueUrl); } catch (RuntimeException e) { LOGGER.error("Program ending because queue was not created."); throw new RuntimeException(e); } try { addTags(queueUrl); listTags(queueUrl); removeTags(queueUrl); } catch (RuntimeException e) { LOGGER.error("Program ending because of an error in a method."); } finally { try { sqsClient.deleteQueue(b -> b.queueUrl(queueUrl)); LOGGER.info("Queue successfully deleted. Program ending."); } catch (RuntimeException e) { LOGGER.error("Program ending."); } finally { sqsClient.close(); } } } /** This method demonstrates how to use a Java Map to tag a queue. * @param queueUrl The URL of the queue to tag. */ public static void addTags(String queueUrl) { // Build a map of the tags. 
final Map<String, String> tagsToAdd = Map.of( "Team", "Development", "Priority", "Beta", "Accounting ID", "456def"); try { // Add tags to the queue using a Consumer<TagQueueRequest.Builder> parameter. sqsClient.tagQueue(b -> b .queueUrl(queueUrl) .tags(tagsToAdd) ); } catch (QueueDoesNotExistException e) { LOGGER.error("Queue does not exist: {}", e.getMessage(), e); throw new RuntimeException(e); } } /** This method demonstrates how to view the tags for a queue. * @param queueUrl The URL of the queue whose tags you want to list. */ public static void listTags(String queueUrl) { ListQueueTagsResponse response; try { // Call the listQueueTags method with a Consumer<ListQueueTagsRequest.Builder> parameter that creates a ListQueueTagsRequest. response = sqsClient.listQueueTags(b -> b .queueUrl(queueUrl)); } catch (SqsException e) { LOGGER.error("Exception thrown: {}", e.getMessage(), e); throw new RuntimeException(e); } // Log the tags. response.tags() .forEach((k, v) -> LOGGER.info("Key: {} -> Value: {}", k, v)); } /** * This method demonstrates how to remove tags from a queue. * @param queueUrl The URL of the queue whose tags you want to remove. */ public static void removeTags(String queueUrl) { try { // Call the untagQueue method with a Consumer<UntagQueueRequest.Builder> parameter. sqsClient.untagQueue(b -> b .queueUrl(queueUrl) .tagKeys("Accounting ID") // Remove a single tag. ); } catch (SqsException e) { LOGGER.error("Exception thrown: {}", e.getMessage(), e); throw new RuntimeException(e); } } }
-
For API details, see the following topics in the AWS SDK for Java 2.x API Reference.
-
Serverless examples
The following code example shows how to implement a Lambda function that receives an event triggered by receiving messages from an SQS queue. The function retrieves the messages from the event parameter and logs the content of each message.
- SDK for Java 2.x
-
Note
There's more on GitHub. Find the complete example and learn how to set up and run it in the Serverless examples repository.
Consuming an SQS event with Lambda using Java.
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.SQSEvent; import com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage; public class Function implements RequestHandler<SQSEvent, Void> { @Override public Void handleRequest(SQSEvent sqsEvent, Context context) { for (SQSMessage msg : sqsEvent.getRecords()) { processMessage(msg, context); } context.getLogger().log("done"); return null; } private void processMessage(SQSMessage msg, Context context) { try { context.getLogger().log("Processed message " + msg.getBody()); // TODO: Do interesting work based on the new message } catch (Exception e) { context.getLogger().log("An error occurred"); throw e; } } }
The following code example shows how to implement a partial batch response for Lambda functions that receive events from an SQS queue. The function reports the batch item failures in the response, signaling to Lambda to retry those messages later.
- SDK for Java 2.x
-
Note
There's more on GitHub. Find the complete example and learn how to set up and run it in the Serverless examples repository.
Reporting SQS batch item failures with Lambda using Java.
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.SQSEvent; import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse; import java.util.ArrayList; import java.util.List; public class ProcessSQSMessageBatch implements RequestHandler<SQSEvent, SQSBatchResponse> { @Override public SQSBatchResponse handleRequest(SQSEvent sqsEvent, Context context) { List<SQSBatchResponse.BatchItemFailure> batchItemFailures = new ArrayList<>(); for (SQSEvent.SQSMessage message : sqsEvent.getRecords()) { try { // Process your message here. } catch (Exception e) { // Add the failed message identifier to the batchItemFailures list. batchItemFailures.add(new SQSBatchResponse.BatchItemFailure(message.getMessageId())); } } return new SQSBatchResponse(batchItemFailures); } }
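To make the shape of a partial batch response concrete, here is a dependency-free sketch of the same idea: collect the IDs of messages whose processing throws, then serialize them in the `batchItemFailures` / `itemIdentifier` JSON structure that Lambda expects for SQS partial batch responses. `PartialBatchSketch`, `collectFailures`, and `toResponseJson` are hypothetical names, the batch is modeled as a plain map of message ID to body, and the hand-built JSON assumes the IDs contain no characters that need escaping. The handler above relies on the `SQSBatchResponse` class for serialization instead.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Models partial batch failure reporting with plain JDK types. */
public class PartialBatchSketch {

    /**
     * Runs the processor on every message body and returns the IDs of the messages that failed.
     * The batch maps message ID to message body.
     */
    static List<String> collectFailures(Map<String, String> batch, Consumer<String> processor) {
        List<String> failedIds = new ArrayList<>();
        for (Map.Entry<String, String> entry : batch.entrySet()) {
            try {
                processor.accept(entry.getValue());
            } catch (RuntimeException e) {
                // Only the failing message is reported; the rest of the batch still counts as processed.
                failedIds.add(entry.getKey());
            }
        }
        return failedIds;
    }

    /** Builds the response JSON. Assumes IDs need no JSON escaping. */
    static String toResponseJson(List<String> failedIds) {
        StringBuilder json = new StringBuilder("{\"batchItemFailures\":[");
        for (int i = 0; i < failedIds.size(); i++) {
            if (i > 0) {
                json.append(',');
            }
            json.append("{\"itemIdentifier\":\"").append(failedIds.get(i)).append("\"}");
        }
        return json.append("]}").toString();
    }

    public static void main(String[] args) {
        Map<String, String> batch = new LinkedHashMap<>();
        batch.put("id-1", "ok");
        batch.put("id-2", "bad");
        batch.put("id-3", "ok");

        List<String> failed = collectFailures(batch, body -> {
            if (body.equals("bad")) {
                throw new IllegalStateException("Simulated processing failure");
            }
        });
        System.out.println(toResponseJson(failed));
    }
}
```

An empty `batchItemFailures` list tells Lambda the whole batch succeeded. Lambda only honors this response when `ReportBatchItemFailures` is enabled on the event source mapping; otherwise any thrown error causes the entire batch to be retried.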