Usar o agrupamento em lotes automático de solicitações para Amazon SQS com o AWS SDK for Java 2.x
A API de agrupamento em lotes automático de solicitações para Amazon SQS é uma biblioteca de alto nível que fornece uma maneira eficiente de agrupar e armazenar solicitações em buffer para operações do SQS. Ao usar a API de agrupamento em lotes, você reduz o número de solicitações para o SQS, o que melhora o throughput e minimiza os custos.
Como os métodos da API em lote correspondem aos métodos SqsAsyncClient: sendMessage, changeMessageVisibility, deleteMessage, receiveMessage, você pode usar a API em lote como uma substituição imediata com o mínimo de alterações.
Este tópico fornece uma visão geral de como configurar e trabalhar com a API de agrupamento em lotes automático de solicitações para o Amazon SQS.
Confira os pré-requisitos
Você precisa usar a versão 2.28.0 ou posterior do SDK para Java 2.x para ter acesso à API de agrupamento em lotes. O pom.xml do Maven deve conter pelo menos os elementos a seguir.
<dependencyManagement>
<dependencies>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>bom</artifactId>
<version>2.28.231</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sqs</artifactId>
</dependency>
</dependencies>
Criar um gerenciador de lotes
A API de agrupamento em lotes automático de solicitações é implementada pela interface SqsAsyncBatchManager
Configuração padrão usando SqsAsyncClient
A maneira mais simples de criar um gerenciador de lotes é chamar o método de fábrica batchManager em uma instância SqsAsyncClient
SqsAsyncClient asyncClient = SqsAsyncClient.create(); SqsAsyncBatchManager sqsAsyncBatchManager = asyncClient.batchManager();
Quando você usa essa abordagem, a instância SqsAsyncBatchManager usa os valores padrão que são mostrados na tabela na seção Substituir configurações para SqsAsyncBatchManager. Além disso, a instância SqsAsyncBatchManager usa o ExecutorService da instância SqsAsyncClient da qual foi criada.
Configuração personalizada usando SqsAsyncBatchManager.Builder
Para casos de uso mais avançados, você pode personalizar o gerenciador de lotes usando o SqsAsyncBatchManager.BuilderSqsAsyncBatchManager, você pode ajustar o comportamento do agrupamento em lotes. O trecho a seguir mostra um exemplo de como usar o compilador para personalizar o comportamento em lote.
SqsAsyncBatchManager batchManager = SqsAsyncBatchManager.builder() .client(SqsAsyncClient.create()) .scheduledExecutor(Executors.newScheduledThreadPool(5)) .overrideConfiguration(b -> b .receiveMessageMinWaitDuration(Duration.ofSeconds(10)) .receiveMessageVisibilityTimeout(Duration.ofSeconds(1)) .receiveMessageAttributeNames(Collections.singletonList("*")) .receiveMessageSystemAttributeNames(Collections.singletonList(MessageSystemAttributeName.ALL))) .build();
Ao usar essa abordagem, é possível ajustar as configurações do objeto BatchOverrideConfiguration que são mostradas na tabela na seção Substituir configurações para SqsAsyncBatchManager. Você também pode fornecer uma personalização ScheduledExecutorService
Enviar mensagens
Para enviar mensagens com o gerenciador de lotes, use o método SqsAsyncBatchManager#sendMessage. O SDK armazena as solicitações em buffer e as envia em lote quando os valores maxBatchSize ou sendRequestFrequency são atingidos.
O exemplo a seguir mostra uma solicitação sendMessage imediatamente seguida por outra solicitação. Nesse caso, o SDK envia as duas mensagens em um único lote.
// Sending the first message CompletableFuture<SendMessageResponse> futureOne = sqsAsyncBatchManager.sendMessage(r -> r.messageBody("One").queueUrl("queue")); // Sending the second message CompletableFuture<SendMessageResponse> futureTwo = sqsAsyncBatchManager.sendMessage(r -> r.messageBody("Two").queueUrl("queue")); // Waiting for both futures to complete and retrieving the responses SendMessageResponse messageOne = futureOne.join(); SendMessageResponse messageTwo = futureTwo.join();
Alterar o tempo limite de visibilidade de mensagens
É possível alterar o tempo limite de visibilidade das mensagens em um lote usando o método SqsAsyncBatchManager#changeMessageVisibilitymaxBatchSize ou sendRequestFrequency são atingidos.
O exemplo a seguir mostra como chamar o método changeMessageVisibility.
CompletableFuture<ChangeMessageVisibilityResponse> futureOne = sqsAsyncBatchManager.changeMessageVisibility(r -> r.receiptHandle("receiptHandle") .queueUrl("queue")); ChangeMessageVisibilityResponse response = futureOne.join();
Exclua mensagens
É possível excluir mensagens em um lote usando o método SqsAsyncBatchManager#deleteMessagemaxBatchSize ou sendRequestFrequency são atingidos.
O exemplo a seguir mostra como é possível chamar o método deleteMessage.
CompletableFuture<DeleteMessageResponse> futureOne = sqsAsyncBatchManager.deleteMessage(r -> r.receiptHandle("receiptHandle") .queueUrl("queue")); DeleteMessageResponse response = futureOne.join();
Receber mensagens
Usar configurações padrão
Quando você pesquisa o método SqsAsyncBatchManager#receiveMessage na aplicação, o gerenciador de lotes busca mensagens do buffer interno, que o SDK atualiza automaticamente em segundo plano.
O exemplo a seguir mostra como chamar o método receiveMessage.
CompletableFuture<ReceiveMessageResponse> responseFuture = sqsAsyncBatchManager.receiveMessage(r -> r.queueUrl("queueUrl"));
Usar configurações personalizadas
Se quiser personalizar ainda mais a solicitação, por exemplo, definindo tempos de espera personalizados e especificando o número de mensagens a serem recuperadas, você poderá personalizar a solicitação conforme mostrado no exemplo a seguir.
CompletableFuture<ReceiveMessageResponse> response = sqsAsyncBatchManager.receiveMessage(r -> r.queueUrl("queueUrl") .waitTimeSeconds(5) .visibilityTimeout(20));
nota
Se você chamar receiveMessage com um ReceiveMessageRequest que inclua qualquer um dos parâmetros a seguir, o SDK ignorará o gerenciador de lotes e enviará uma solicitação receiveMessage assíncrona regular:
-
messageAttributeNames -
messageSystemAttributeNames -
messageSystemAttributeNamesWithStrings -
overrideConfiguration
Substituir configurações para SqsAsyncBatchManager
É possível ajustar as seguintes configurações ao criar uma instância SqsAsyncBatchManager. A lista de configurações a seguir está disponível no BatchOverrideConfiguration.Builder
| Configuração | Descrição | Valor padrão |
|---|---|---|
maxBatchSize |
Número máximo de solicitações por lote para cada SendMessageBatchRequest, ChangeMessageVisibilityBatchRequest ou DeleteMessageBatchRequest. O valor máximo é 10. |
10 |
sendRequestFrequency |
Tempo antes do envio de um lote, a menos que |
200 ms |
receiveMessageVisibilityTimeout |
Tempo limite de visibilidade para mensagens. Se não especificado, o padrão da fila será usado. | Padrão da fila |
receiveMessageMinWaitDuration |
Tempo mínimo de espera para solicitações receiveMessage. Evite definir como 0 para evitar o desperdício da CPU. |
50 ms |
receiveMessageSystemAttributeNames |
Lista de nomes de atributos do sistemareceiveMessage. |
Nenhum |
receiveMessageAttributeNames |
Lista de nomes de atributos para solicitar chamadas receiveMessage. |
Nenhum |