Uso de procesamiento automático de solicitudes por lotes para Amazon SQS con AWS SDK for Java 2.x - AWS SDK for Java 2.x

Uso de procesamiento automático de solicitudes por lotes para Amazon SQS con AWS SDK for Java 2.x

La API Procesamiento automático de solicitudes por lotes para Amazon SQS es una biblioteca de alto nivel que proporciona una forma eficaz de procesar por lotes y almacenar en búfer las solicitudes de operaciones de SQS. Al utilizar la API de procesamiento por lotes, reduce el número de solicitudes a SQS, lo que mejora el rendimiento y minimiza los costos.

Dado que los métodos de la API por lotes coinciden con los métodos de SqsAsyncClient (sendMessage, changeMessageVisibility, deleteMessage, receiveMessage), puede utilizar la API por lotes como sustitución directa con cambios mínimos.

En este tema se ofrece información general sobre cómo configurar y trabajar con la API Procesamiento automático de solicitudes por lotes para Amazon SQS.

Requisitos previos

Debe usar la versión 2.28.0 o posterior del SDK para Java 2.x para tener acceso a la API de procesamiento por lotes. Su pom.xml de Maven debe contener al menos los siguientes elementos.

<dependencyManagement> <dependencies> <dependency> <groupId>software.amazon.awssdk</groupId> <artifactId>bom</artifactId> <version>2.28.231</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>software.amazon.awssdk</groupId> <artifactId>sqs</artifactId> </dependency> </dependencies>

1 Última versión

Creación de un administrador de lotes

La API de procesamiento automático de solicitudes por lotes se implementa mediante la interfaz SqsAsyncBatchManager. Puede crear una instancia del administrador de dos formas.

Configuración predeterminada mediante SqsAsyncClient

La forma más sencilla de crear un administrador de lotes es llamar al método de fábrica batchManager en una instancia de SqsAsyncClient existente. Este método se muestra en el fragmento de código siguiente.

SqsAsyncClient asyncClient = SqsAsyncClient.create(); SqsAsyncBatchManager sqsAsyncBatchManager = asyncClient.batchManager();

Al usar este método, la instancia SqsAsyncBatchManager usa los valores predeterminados que se muestran en la tabla de la sección Anulación de los ajustes de configuración de SqsAsyncBatchManager. Además, la instancia SqsAsyncBatchManager usa el ExecutorService de la instancia de SqsAsyncClient desde la que se creó.

Configuración personalizada mediante SqsAsyncBatchManager.Builder

Para casos de uso más avanzados, puede personalizar el administrador de lotes mediante SqsAsyncBatchManager.Builder. Si usa este método para crear una instancia de SqsAsyncBatchManager, puede afinar el comportamiento del procesamiento por lotes. El siguiente fragmento de código ofrece un ejemplo de cómo utilizar el compilador para personalizar el comportamiento de procesamiento por lotes.

SqsAsyncBatchManager batchManager = SqsAsyncBatchManager.builder() .client(SqsAsyncClient.create()) .scheduledExecutor(Executors.newScheduledThreadPool(5)) .overrideConfiguration(b -> b .receiveMessageMinWaitDuration(Duration.ofSeconds(10)) .receiveMessageVisibilityTimeout(Duration.ofSeconds(1)) .receiveMessageAttributeNames(Collections.singletonList("*")) .receiveMessageSystemAttributeNames(Collections.singletonList(MessageSystemAttributeName.ALL))) .build();

Si utiliza este método, puede ajustar la configuración del objeto BatchOverrideConfiguration que se muestra en la tabla de la sección Anulación de los ajustes de configuración de SqsAsyncBatchManager. También puede proporcionar un ScheduledExecutorService personalizado para el administrador de lotes mediante este método.

Envío de mensajes

Para enviar mensajes con el administrador de lotes, utilice el método SqsAsyncBatchManager#sendMessage. El SDK almacena en búfer las solicitudes y las envía como un lote cuando se alcanzan los valores maxBatchSize o sendRequestFrequency.

En el siguiente ejemplo, se muestra una solicitud sendMessage seguida inmediatamente de otra solicitud. En este caso, el SDK envía ambos mensajes en un único lote.

// Sending the first message CompletableFuture<SendMessageResponse> futureOne = sqsAsyncBatchManager.sendMessage(r -> r.messageBody("One").queueUrl("queue")); // Sending the second message CompletableFuture<SendMessageResponse> futureTwo = sqsAsyncBatchManager.sendMessage(r -> r.messageBody("Two").queueUrl("queue")); // Waiting for both futures to complete and retrieving the responses SendMessageResponse messageOne = futureOne.join(); SendMessageResponse messageTwo = futureTwo.join();

Cambio del tiempo de espera de visibilidad de mensajes

Puede cambiar el tiempo de espera de visibilidad de los mensajes de un lote utilizando el método SqsAsyncBatchManager#changeMessageVisibility. El SDK almacena en búfer las solicitudes y las envía como un lote cuando se alcanzan los valores maxBatchSize o sendRequestFrequency.

En el siguiente ejemplo, se muestra cómo llamar al método changeMessageVisibility.

CompletableFuture<ChangeMessageVisibilityResponse> futureOne = sqsAsyncBatchManager.changeMessageVisibility(r -> r.receiptHandle("receiptHandle") .queueUrl("queue")); ChangeMessageVisibilityResponse response = futureOne.join();

Cómo eliminar mensajes

Puede eliminar los mensajes de un lote con el método SqsAsyncBatchManager#deleteMessage. El SDK almacena en búfer las solicitudes y las envía como un lote cuando se alcanzan los valores maxBatchSize o sendRequestFrequency.

En el siguiente ejemplo se muestra cómo llamar al método deleteMessage.

CompletableFuture<DeleteMessageResponse> futureOne = sqsAsyncBatchManager.deleteMessage(r -> r.receiptHandle("receiptHandle") .queueUrl("queue")); DeleteMessageResponse response = futureOne.join();

Recepción de mensajes

Uso de la configuración predeterminada

Al sondear el método SqsAsyncBatchManager#receiveMessage en la aplicación, el administrador de lotes extrae los mensajes de su búfer interno, que el SDK actualiza automáticamente en segundo plano.

En el siguiente ejemplo, se muestra cómo llamar al método receiveMessage.

CompletableFuture<ReceiveMessageResponse> responseFuture = sqsAsyncBatchManager.receiveMessage(r -> r.queueUrl("queueUrl"));

Uso de la configuración personalizada

Si quiere personalizar aún más la solicitud, por ejemplo, configurando tiempos de espera personalizados y especificando el número de mensajes que se van a recuperar, puede personalizar la solicitud como se muestra en el siguiente ejemplo.

CompletableFuture<ReceiveMessageResponse> response = sqsAsyncBatchManager.receiveMessage(r -> r.queueUrl("queueUrl") .waitTimeSeconds(5) .visibilityTimeout(20));
nota

Si llama a receiveMessage con un parámetro ReceiveMessageRequest que incluya alguno de los siguientes parámetros, el SDK omite el administrador de lotes y envía una solicitud asincrónica receiveMessage normal:

  • messageAttributeNames

  • messageSystemAttributeNames

  • messageSystemAttributeNamesWithStrings

  • overrideConfiguration

Anulación de los ajustes de configuración de SqsAsyncBatchManager

Puede ajustar la configuración siguiente al crear una instancia SqsAsyncBatchManager. La siguiente lista de ajustes está disponible en BatchOverrideConfiguration.Builder.

Ajuste Descripción Valor predeterminado
maxBatchSize Número máximo de solicitudes por lote para cada SendMessageBatchRequest, ChangeMessageVisibilityBatchRequest o DeleteMessageBatchRequest. El valor máximo es 10. 10
sendRequestFrequency

Tiempo antes de enviar un lote, a menos que se alcance antes maxBatchSize. Los valores más altos pueden reducir las solicitudes pero aumentar la latencia.

200 ms
receiveMessageVisibilityTimeout Tiempo de espera de visibilidad para los mensajes. Si no se especifica, se utiliza la configuración predeterminada de la cola. Configuración predeterminada de la cola
receiveMessageMinWaitDuration Tiempo mínimo de espera para solicitudes receiveMessage. Evite establecer el valor en 0 para evitar un uso excesivo de la CPU. 50 ms
receiveMessageSystemAttributeNames Lista de nombres de atributos del sistema para solicitar para llamadas receiveMessage. Ninguno
receiveMessageAttributeNames Lista de nombres de atributos para solicitar para llamadas receiveMessage. Ninguno