Cambios en el procesamiento automático de solicitudes por lotes de Amazon SQS de la versión 1 a la 2 - AWS SDK for Java 2.x

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Cambios en el procesamiento automático de solicitudes por lotes de Amazon SQS de la versión 1 a la 2

En este tema se detallan los cambios en el procesamiento automático de solicitudes por lotes para Amazon SQS entre la versión 1 y la 2 de AWS SDK for Java.

Cambios de alto nivel

La versión AWS SDK for Java 1.x realiza almacenamiento en búfer del cliente mediante una clase AmazonSQSBufferedAsyncClient independiente que requiere inicialización explícita para el procesamiento de solicitudes por lotes.

AWS SDK for Java 2.x simplifica y mejora la funcionalidad de almacenamiento en búfer con SqsAsyncBatchManager. La implementación de esta interfaz proporciona capacidades automáticas de procesamiento de solicitudes por lotes integradas directamente con el SqsAsyncClient estándar. Para obtener más información sobre el SqsAsyncBatchManager de la v2, consulte el tema Uso de procesamiento automático de solicitudes por lotes para Amazon SQS con AWS SDK for Java 2.x de esta guía.

Cambio v1 v2

dependencias Maven

<dependencyManagement> <dependencies> <dependency> <groupId>com.amazonaws</groupId> <artifactId>aws-java-sdk-bom</artifactId> <version>1.12.7821</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>com.amazonaws</groupId> <artifactId>aws-java-sdk-sqs</artifactId> </dependency> </dependencies>
<dependencyManagement> <dependencies> <dependency> <groupId>software.amazon.awssdk</groupId> <artifactId>bom</artifactId> <version>2.31.152</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>software.amazon.awssdk</groupId> <artifactId>sqs</artifactId> </dependency> </dependencies>
Nombres de paquetes com.amazonaws.services.sqs.buffered software.amazon.awssdk.services.sqs.batchmanager
Nombres de clase

AmazonSQSBufferedAsyncClient

SqsAsyncBatchManager

1 Última versión. 2 Última versión.

Uso del procesamiento automático de solicitudes por lotes de SQS

Cambio v1 v2
Creación de un administrador de lotes
AmazonSQSAsync sqsAsync = new AmazonSQSAsyncClient(); AmazonSQSAsync bufferedSqs = new AmazonSQSBufferedAsyncClient(sqsAsync);
SqsAsyncClient asyncClient = SqsAsyncClient.create(); SqsAsyncBatchManager sqsAsyncBatchManager = asyncClient.batchManager();
Creación de un administrador de lotes con configuración personalizada
AmazonSQSAsync sqsAsync = new AmazonSQSAsyncClient(); QueueBufferConfig queueBufferConfig = new QueueBufferConfig() .withMaxBatchOpenMs(200) .withMaxBatchSize(10) .withMinReceiveWaitTimeMs(1000) .withVisibilityTimeoutSeconds(20) .withReceiveMessageAttributeNames(messageAttributeValues); AmazonSQSAsync bufferedSqs = new AmazonSQSBufferedAsyncClient(sqsAsync, queueBufferConfig);
BatchOverrideConfiguration batchOverrideConfiguration = BatchOverrideConfiguration.builder() .sendRequestFrequency(Duration.ofMillis(200)) .maxBatchSize(10) .receiveMessageMinWaitDuration(Duration.ofMillis(1000)) .receiveMessageVisibilityTimeout(Duration.ofSeconds(20)) .receiveMessageSystemAttributeNames(messageSystemAttributeNames) .receiveMessageAttributeNames(messageAttributeValues) .build(); SqsAsyncBatchManager sqsAsyncBatchManager = SqsAsyncBatchManager.builder() .overrideConfiguration(batchOverrideConfiguration) .client(SqsAsyncClient.create()) .scheduledExecutor(Executors.newScheduledThreadPool(8)) .build();
Envío de mensajes
Future<SendMessageResult> sendResultFuture = bufferedSqs.sendMessageAsync(new SendMessageRequest() .withQueueUrl(queueUrl) .withMessageBody(body));
CompletableFuture<SendMessageResponse> sendCompletableFuture = sqsAsyncBatchManager.sendMessage( SendMessageRequest.builder() .queueUrl(queueUrl) .messageBody(body) .build());
Cómo eliminar mensajes
Future<DeleteMessageResult> deletResultFuture = bufferedSqs.deleteMessageAsync(new DeleteMessageRequest() .withQueueUrl(queueUrl));
CompletableFuture<DeleteMessageResponse> deleteResultCompletableFuture = sqsAsyncBatchManager.deleteMessage( DeleteMessageRequest.builder() .queueUrl(queueUrl) .build());
Cambio de visibilidad de los mensajes
Future<ChangeMessageVisibilityResult> changeVisibilityResultFuture = bufferedSqs.changeMessageVisibilityAsync (new ChangeMessageVisibilityRequest() .withQueueUrl(queueUrl) .withVisibilityTimeout(20));
CompletableFuture<ChangeMessageVisibilityResponse> changeResponseCompletableFuture = sqsAsyncBatchManager.changeMessageVisibility( ChangeMessageVisibilityRequest.builder() .queueUrl(queueUrl) .visibilityTimeout(20) .build());
Recepción de mensajes
ReceiveMessageResult receiveResult = bufferedSqs.receiveMessage( new ReceiveMessageRequest() .withQueueUrl(queueUrl));
CompletableFuture<ReceiveMessageResponse> responseCompletableFuture = sqsAsyncBatchManager.receiveMessage( ReceiveMessageRequest.builder() .queueUrl(queueUrl) .build());

Diferencias entre tipos de devolución asincrónica

Cambio v1 v2
Tipo de retorno Future<ResultType> CompletableFuture<ResponseType>
Mecanismo de devolución de llamada Requiere un AsyncHandler con métodos onSuccess y onError separados Utiliza las API CompletableFuture proporcionadas por el JDK, como whenComplete(), thenCompose(), thenApply()
Tratamiento de excepciones Utiliza el método AsyncHandler#onError() Utiliza las API CompletableFuture proporcionadas por el JDK, como exceptionally(), handle(), whenComplete()
Cancelación Compatibilidad básica a través de Future.cancel() Al cancelar un CompletableFuture principal se cancelan automáticamente todos los futuros dependientes de la cadena

Diferencias en el tratamiento de finalización asincrónica

Cambio v1 v2
Implementación del controlador de respuestas
Future<ReceiveMessageResult> future = bufferedSqs.receiveMessageAsync( receiveRequest, new AsyncHandler<ReceiveMessageRequest, ReceiveMessageResult>() { @Override public void onSuccess(ReceiveMessageRequest request, ReceiveMessageResult result) { List<Message> messages = result.getMessages(); System.out.println("Received " + messages.size() + " messages"); for (Message message : messages) { System.out.println("Message ID: " + message.getMessageId()); System.out.println("Body: " + message.getBody()); } } @Override public void onError(Exception e) { System.err.println("Error receiving messages: " + e.getMessage()); e.printStackTrace(); } } );
CompletableFuture<ReceiveMessageResponse> completableFuture = sqsAsyncBatchManager .receiveMessage(ReceiveMessageRequest.builder() .queueUrl(queueUrl).build()) .whenComplete((receiveMessageResponse, throwable) -> { if (throwable != null) { System.err.println("Error receiving messages: " + throwable.getMessage()); throwable.printStackTrace(); } else { List<Message> messages = receiveMessageResponse.messages(); System.out.println("Received " + messages.size() + " messages"); for (Message message : messages) { System.out.println("Message ID: " + message.messageId()); System.out.println("Body: " + message.body()); } } });

Parámetros principales de configuración

Parámetro v1 v2
Tamaño máximo de lote maxBatchSize (de forma predeterminada, 10 solicitudes por lote) maxBatchSize (de forma predeterminada, 10 solicitudes por lote)
Tiempo de espera por lotes maxBatchOpenMs (de forma predeterminada, 200 ms) sendRequestFrequency (de forma predeterminada, 200 ms)
Tiempo de espera de visibilidad visibilityTimeoutSeconds (-1 para cola de forma predeterminada) receiveMessageVisibilityTimeout (cola de forma predeterminada)
Tiempo de espera mínimo longPollWaitTimeoutSeconds (20 s cuando longPoll es True) receiveMessageMinWaitDuration (de forma predeterminada, 50 ms)
Atributos de mensajes Configure usando ReceiveMessageRequest receiveMessageAttributeNames(ninguno de forma predeterminada)
Atributos del sistema Configure usando ReceiveMessageRequest receiveMessageSystemAttributeNames(ninguno de forma predeterminada)
Sondeo largo longPoll (el valor predeterminado es True) No se admite para evitar que las conexiones abiertas esperen hasta que el servidor envíe los mensajes
Tiempo de espera máximo para sondeos prolongados longPollWaitTimeoutSeconds (de forma predeterminada, 20 s) No se admite para evitar que las conexiones abiertas esperen hasta que el servidor envíe los mensajes
Número máximo de lotes de recepción de captura previa almacenados en el cliente maxDoneReceiveBatches (10 lotes) No se admite porque se trata internamente
Número máximo de lotes de salida activos procesados de forma simultánea maxInflightOutboundBatches (de forma predeterminada, 5 lotes) No se admite porque se trata internamente
Número máximo de lotes de recepción activos procesados de forma simultánea maxInflightReceiveBatches (de forma predeterminada, 10 lotes) No se admite porque se trata internamente