Usar o agrupamento em lotes automático de solicitações para Amazon SQS com o AWS SDK for Java 2.x - AWS SDK for Java 2.x

Usar o agrupamento em lotes automático de solicitações para Amazon SQS com o AWS SDK for Java 2.x

A API de agrupamento em lotes automático de solicitações para Amazon SQS é uma biblioteca de alto nível que fornece uma maneira eficiente de agrupar e armazenar solicitações em buffer para operações do SQS. Ao usar a API de agrupamento em lotes, você reduz o número de solicitações para o SQS, o que melhora o throughput e minimiza os custos.

Como os métodos da API em lote correspondem aos métodos SqsAsyncClient: sendMessage, changeMessageVisibility, deleteMessage, receiveMessage, você pode usar a API em lote como uma substituição imediata com o mínimo de alterações.

Este tópico fornece uma visão geral de como configurar e trabalhar com a API de agrupamento em lotes automático de solicitações para o Amazon SQS.

Confira os pré-requisitos

Você precisa usar a versão 2.28.0 ou posterior do SDK para Java 2.x para ter acesso à API de agrupamento em lotes. O pom.xml do Maven deve conter pelo menos os elementos a seguir.

<dependencyManagement> <dependencies> <dependency> <groupId>software.amazon.awssdk</groupId> <artifactId>bom</artifactId> <version>2.28.231</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>software.amazon.awssdk</groupId> <artifactId>sqs</artifactId> </dependency> </dependencies>

1 Versão mais recente

Criar um gerenciador de lotes

A API de agrupamento em lotes automático de solicitações é implementada pela interface SqsAsyncBatchManager. É possível criar uma instância do gerente de duas maneiras.

Configuração padrão usando SqsAsyncClient

A maneira mais simples de criar um gerenciador de lotes é chamar o método de fábrica batchManager em uma instância SqsAsyncClient existente. A abordagem simples é mostrada no trecho a seguir.

SqsAsyncClient asyncClient = SqsAsyncClient.create(); SqsAsyncBatchManager sqsAsyncBatchManager = asyncClient.batchManager();

Quando você usa essa abordagem, a instância SqsAsyncBatchManager usa os valores padrão que são mostrados na tabela na seção Substituir configurações para SqsAsyncBatchManager. Além disso, a instância SqsAsyncBatchManager usa o ExecutorService da instância SqsAsyncClient da qual foi criada.

Configuração personalizada usando SqsAsyncBatchManager.Builder

Para casos de uso mais avançados, você pode personalizar o gerenciador de lotes usando o SqsAsyncBatchManager.Builder. Ao usar essa abordagem para criar uma instância SqsAsyncBatchManager, você pode ajustar o comportamento do agrupamento em lotes. O trecho a seguir mostra um exemplo de como usar o compilador para personalizar o comportamento em lote.

SqsAsyncBatchManager batchManager = SqsAsyncBatchManager.builder() .client(SqsAsyncClient.create()) .scheduledExecutor(Executors.newScheduledThreadPool(5)) .overrideConfiguration(b -> b .receiveMessageMinWaitDuration(Duration.ofSeconds(10)) .receiveMessageVisibilityTimeout(Duration.ofSeconds(1)) .receiveMessageAttributeNames(Collections.singletonList("*")) .receiveMessageSystemAttributeNames(Collections.singletonList(MessageSystemAttributeName.ALL))) .build();

Ao usar essa abordagem, é possível ajustar as configurações do objeto BatchOverrideConfiguration que são mostradas na tabela na seção Substituir configurações para SqsAsyncBatchManager. Você também pode fornecer uma personalização ScheduledExecutorService para o gerenciador de lotes usando essa abordagem.

Enviar mensagens

Para enviar mensagens com o gerenciador de lotes, use o método SqsAsyncBatchManager#sendMessage. O SDK armazena as solicitações em buffer e as envia em lote quando os valores maxBatchSize ou sendRequestFrequency são atingidos.

O exemplo a seguir mostra uma solicitação sendMessage imediatamente seguida por outra solicitação. Nesse caso, o SDK envia as duas mensagens em um único lote.

// Sending the first message CompletableFuture<SendMessageResponse> futureOne = sqsAsyncBatchManager.sendMessage(r -> r.messageBody("One").queueUrl("queue")); // Sending the second message CompletableFuture<SendMessageResponse> futureTwo = sqsAsyncBatchManager.sendMessage(r -> r.messageBody("Two").queueUrl("queue")); // Waiting for both futures to complete and retrieving the responses SendMessageResponse messageOne = futureOne.join(); SendMessageResponse messageTwo = futureTwo.join();

Alterar o tempo limite de visibilidade de mensagens

É possível alterar o tempo limite de visibilidade das mensagens em um lote usando o método SqsAsyncBatchManager#changeMessageVisibility. O SDK armazena as solicitações em buffer e as envia em lote quando os valores maxBatchSize ou sendRequestFrequency são atingidos.

O exemplo a seguir mostra como chamar o método changeMessageVisibility.

CompletableFuture<ChangeMessageVisibilityResponse> futureOne = sqsAsyncBatchManager.changeMessageVisibility(r -> r.receiptHandle("receiptHandle") .queueUrl("queue")); ChangeMessageVisibilityResponse response = futureOne.join();

Exclua mensagens

É possível excluir mensagens em um lote usando o método SqsAsyncBatchManager#deleteMessage. O SDK armazena as solicitações em buffer e as envia em lote quando os valores maxBatchSize ou sendRequestFrequency são atingidos.

O exemplo a seguir mostra como é possível chamar o método deleteMessage.

CompletableFuture<DeleteMessageResponse> futureOne = sqsAsyncBatchManager.deleteMessage(r -> r.receiptHandle("receiptHandle") .queueUrl("queue")); DeleteMessageResponse response = futureOne.join();

Receber mensagens

Usar configurações padrão

Quando você pesquisa o método SqsAsyncBatchManager#receiveMessage na aplicação, o gerenciador de lotes busca mensagens do buffer interno, que o SDK atualiza automaticamente em segundo plano.

O exemplo a seguir mostra como chamar o método receiveMessage.

CompletableFuture<ReceiveMessageResponse> responseFuture = sqsAsyncBatchManager.receiveMessage(r -> r.queueUrl("queueUrl"));

Usar configurações personalizadas

Se quiser personalizar ainda mais a solicitação, por exemplo, definindo tempos de espera personalizados e especificando o número de mensagens a serem recuperadas, você poderá personalizar a solicitação conforme mostrado no exemplo a seguir.

CompletableFuture<ReceiveMessageResponse> response = sqsAsyncBatchManager.receiveMessage(r -> r.queueUrl("queueUrl") .waitTimeSeconds(5) .visibilityTimeout(20));
nota

Se você chamar receiveMessage com um ReceiveMessageRequest que inclua qualquer um dos parâmetros a seguir, o SDK ignorará o gerenciador de lotes e enviará uma solicitação receiveMessage assíncrona regular:

  • messageAttributeNames

  • messageSystemAttributeNames

  • messageSystemAttributeNamesWithStrings

  • overrideConfiguration

Substituir configurações para SqsAsyncBatchManager

É possível ajustar as seguintes configurações ao criar uma instância SqsAsyncBatchManager. A lista de configurações a seguir está disponível no BatchOverrideConfiguration.Builder.

Configuração Descrição Valor padrão
maxBatchSize Número máximo de solicitações por lote para cada SendMessageBatchRequest, ChangeMessageVisibilityBatchRequest ou DeleteMessageBatchRequest. O valor máximo é 10. 10
sendRequestFrequency

Tempo antes do envio de um lote, a menos que maxBatchSize seja atingido antes. Valores mais altos podem reduzir as solicitações, mas aumentar a latência.

200 ms
receiveMessageVisibilityTimeout Tempo limite de visibilidade para mensagens. Se não especificado, o padrão da fila será usado. Padrão da fila
receiveMessageMinWaitDuration Tempo mínimo de espera para solicitações receiveMessage. Evite definir como 0 para evitar o desperdício da CPU. 50 ms
receiveMessageSystemAttributeNames Lista de nomes de atributos do sistema para solicitar chamadas receiveMessage. Nenhum
receiveMessageAttributeNames Lista de nomes de atributos para solicitar chamadas receiveMessage. Nenhum