Migración de KCL 1.x a KCL 3.x - Amazon DynamoDB

Migración de KCL 1.x a KCL 3.x

Descripción general

En esta guía se proporcionan instrucciones para migrar la aplicación de consumidor de KCL 1.x a KCL 3.x. Debido a las diferencias de arquitectura entre KCL 1.x y KCL 3.x, la migración requiere actualizar varios componentes para garantizar la compatibilidad.

KCL 1.x utiliza clases e interfaces diferentes en comparación con KCL 3.x. KCL 1.x utiliza clases e interfaces diferentes en comparación con KCL 3.x. Debe migrar primero el procesador de registros, el generador de procesadores de registros y las clases de procesos de trabajo al formato compatible con KCL 3.x y, a continuación, seguir los pasos de migración de KCL 1.x a KCL 3.x.

Pasos para realizar la migración

Paso 1: migración del procesador de registros

En el siguiente ejemplo se muestra un procesador de registros implementado para el adaptador de Kinesis de DynamoDB Streams de KCL 1.x:

package com.amazonaws.kcl; import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; public class StreamsRecordProcessor implements IRecordProcessor, IShutdownNotificationAware { @Override public void initialize(InitializationInput initializationInput) { // // Setup record processor // } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { for (Record record : processRecordsInput.getRecords()) { String data = new String(record.getData().array(), Charset.forName("UTF-8")); System.out.println(data); if (record instanceof RecordAdapter) { // record processing and checkpointing logic } } } @Override public void shutdown(ShutdownInput shutdownInput) { if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) { try { shutdownInput.getCheckpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { throw new RuntimeException(e); } } } @Override public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { try { checkpointer.checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow exception // e.printStackTrace(); } } }
Migración de la clase RecordProcessor
  1. Cambie las interfaces de com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor y com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware a com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor tal y como se indica a continuación:

    // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor
  2. Actualice las instrucciones de importación para los métodos initialize y processRecords:

    // import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import software.amazon.kinesis.lifecycle.events.InitializationInput; // import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
  3. Sustituya el método shutdown por los métodos nuevos siguientes: leaseLost, shardEnded y shutdownRequested.

    // @Override // public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { // // // // This is moved to shardEnded(...) and shutdownRequested(ShutdownReauestedInput) // // // try { // checkpointer.checkpoint(); // } catch (ShutdownException | InvalidStateException e) { // // // // Swallow exception // // // e.printStackTrace(); // } // } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } }

A continuación, se muestra la versión actualizada de la clase del procesador de registros:

package com.amazonaws.codesamples; import software.amazon.kinesis.exceptions.InvalidStateException; import software.amazon.kinesis.exceptions.ShutdownException; import software.amazon.kinesis.lifecycle.events.InitializationInput; import software.amazon.kinesis.lifecycle.events.LeaseLostInput; import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.ShardEndedInput; import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput; import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor; import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsKinesisClientRecord; import software.amazon.awssdk.services.dynamodb.model.Record; public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor { @Override public void initialize(InitializationInput initializationInput) { } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { for (DynamoDBStreamsKinesisClientRecord record: processRecordsInput.records()) Record ddbRecord = record.getRecord(); // processing and checkpointing logic for the ddbRecord } } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } }
nota

El adaptador de Kinesis de DynamoDB Streams ahora usa el modelo de registro de SDKv2. En SDKv2, los objetos AttributeValue complejos (BS, NS, M, L y SS) nunca devuelven un valor nulo. Use los métodos hasBs(), hasNs(), hasM(), hasL() y hasSs() para verificar si estos valores existen.

Paso 2: migración del generador de procesadores de registros

El generador de procesadores de registros es responsable de la creación de procesadores de registros cuando se adquiere una asignación. A continuación, se muestra un ejemplo de un generador de la versión 1.x de KCL:

package com.amazonaws.codesamples; import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; public class StreamsRecordProcessorFactory implements IRecordProcessorFactory { @Override public IRecordProcessor createProcessor() { return new StreamsRecordProcessor(dynamoDBClient, tableName); } }
Migración de RecordProcessorFactory
  • Cambie la interfaz implementada de com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory a software.amazon.kinesis.processor.ShardRecordProcessorFactory, tal y como se indica a continuación:

    com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory to software.amazon.kinesis.processor.ShardRecordProcessorFactory, as follows. // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; // public class TestRecordProcessorFactory implements IRecordProcessorFactory { public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory { Change the return signature for createProcessor. // public IRecordProcessor createProcessor() { public ShardRecordProcessor shardRecordProcessor() {

A continuación, se muestra un ejemplo de generador de procesadores de registros de la versión 3.0:

package com.amazonaws.codesamples; import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory { @Override public ShardRecordProcessor shardRecordProcessor() { return new StreamsRecordProcessor(); } }

Paso 3: migración del proceso de trabajo

En la versión 3.0 de KCL, una nueva clase, llamada Scheduler, reemplaza la clase Worker. A continuación, se muestra un ejemplo de proceso de trabajo de la versión 1.x de KCL:

final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...) final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory(); final Worker worker = StreamsWorkerFactory.createDynamoDbStreamsWorker( recordProcessorFactory, workerConfig, adapterClient, amazonDynamoDB, amazonCloudWatchClient);
Para migrar el proceso de trabajo
  1. Cambie la instrucción import para la clase Worker por las instrucciones de importación para las clases Scheduler y ConfigsBuilder.

    // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker; import software.amazon.kinesis.coordinator.Scheduler; import software.amazon.kinesis.common.ConfigsBuilder;
  2. Importe StreamTracker y cambie la importación de StreamsWorkerFactory a StreamsSchedulerFactory.

    import software.amazon.kinesis.processor.StreamTracker; // import com.amazonaws.services.dynamodbv2.streamsadapter.StreamsWorkerFactory; import com.amazonaws.services.dynamodbv2.streamsadapter.StreamsSchedulerFactory;
  3. Elija la posición desde la que desea iniciar la aplicación. Puede ser TRIM_HORIZON o LATEST.

    import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended;
  4. Crear una instancia de StreamTracker.

    StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker( streamArn, InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON) );
  5. Cree el objeto AmazonDynamoDBStreamsAdapterClient.

    import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; ... AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create(); AmazonDynamoDBStreamsAdapterClient adapterClient = new AmazonDynamoDBStreamsAdapterClient( credentialsProvider, awsRegion);
  6. Cree el objeto ConfigsBuilder.

    import software.amazon.kinesis.common.ConfigsBuilder; ... ConfigsBuilder configsBuilder = new ConfigsBuilder( streamTracker, applicationName, adapterClient, dynamoDbAsyncClient, cloudWatchAsyncClient, UUID.randomUUID().toString(), new StreamsRecordProcessorFactory());
  7. Cree Scheduler tal como se muestra en el ejemplo siguiente:

    import java.util.UUID; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.KinesisClientUtil; import software.amazon.kinesis.coordinator.Scheduler; ... DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build(); CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build(); DynamoDBStreamsPollingConfig pollingConfig = new DynamoDBStreamsPollingConfig(adapterClient); pollingConfig.idleTimeBetweenReadsInMillis(idleTimeBetweenReadsInMillis); RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig(); retrievalConfig.retrievalSpecificConfig(pollingConfig); CoordinatorConfig coordinatorConfig = configsBuilder.coordinatorConfig(); coordinatorConfig.clientVersionConfig(CoordinatorConfig.ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X) Scheduler scheduler = StreamsSchedulerFactory.createScheduler( configsBuilder.checkpointConfig(), coordinatorConfig, configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), retrievalConfig, adapterClient );
importante

La configuración CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X mantiene la compatibilidad entre el adaptador de Kinesis de DynamoDB Streams para KCL v3 y KCL v1, pero no entre KCL v2 y v3.

Paso 4: información general y recomendaciones sobre la configuración de KCL 3.x

Para obtener una descripción detallada de las configuraciones introducidas después de KCL 1.x que son relevantes en KCL 3.x, consulte las configuraciones de KCL y la configuración del cliente de migración de KCL.

Configuraciones con valor predeterminado actualizado en KCL 3.x

billingMode

En la versión 1.x de KCL, el valor predeterminado de billingMode se establece en PROVISIONED. No obstante, con la versión 3.x de KCL, el valor predeterminado de billingMode es PAY_PER_REQUEST (modo bajo demanda). Le recomendamos que utilice el modo de capacidad bajo demanda para la tabla de arrendamiento a fin de ajustar automáticamente la capacidad en función del uso. Para obtener orientación sobre cómo utilizar la capacidad aprovisionada para las tablas de arrendamiento, consulte Best practices for the lease table with provisioned capacity mode.

idleTimeBetweenReadsInMillis

En la versión 1.x de KCL, el valor predeterminado de idleTimeBetweenReadsInMillis se establece en 1000 (o 1 segundo). La versión 3.x de KCL establece el valor predeterminado de idleTimeBetweenReadsInMillis en 1500 (o 1,5 segundos), pero el adaptador de Kinesis de Amazon DynamoDB Streams reemplaza el valor predeterminado por 1000 (o 1 segundo).

Nuevas configuraciones en KCL 3.x

leaseAssignmentIntervalMillis

Esta configuración define el intervalo de tiempo antes de que las particiones recién descubiertas comiencen a procesarse y se calcula como 1,5 × leaseAssignmentIntervalMillis. Si este ajuste no se configura explícitamente, el intervalo de tiempo se establece de forma predeterminada en 1,5 × failoverTimeMillis. El procesamiento de nuevas particiones implica examinar la tabla de arrendamiento y consultar un índice secundario global (GSI) en la tabla de arrendamiento. Al reducir leaseAssignmentIntervalMillis, aumenta la frecuencia de estas operaciones de análisis y consulta, lo que se traduce en mayores costos de DynamoDB. Recomendamos establecer este valor en 2000 para minimizar el retraso en el procesamiento de nuevas particiones.

shardConsumerDispatchPollIntervalMillis

Esta configuración define el intervalo entre sondeos sucesivos por parte del consumidor de particiones para activar las transiciones de estado. En la versión 1.x de KCL, este comportamiento se controlaba mediante el parámetro idleTimeInMillis, que no se exponía como un ajuste configurable. Con la versión 3.x de KCL, recomendamos establecer esta configuración para que coincida con el valor utilizado para idleTimeInMillis en la configuración de la versión 1.x de KCL.

Configuraciones que no anula el adaptador de Kinesis de Amazon DynamoDB Streams

shardSyncIntervalMillis

El adaptador de Kinesis de DynamoDB Streams compatible con la versión 1.x de KCL establece explícitamente shardSyncIntervalMillis en 0. Por el contrario, el adaptador de Kinesis de DynamoDB Streams compatible con la versión 3.x de KCL ya no establece un valor para esta configuración. Para mantener el mismo comportamiento del adaptador que en la versión 1.x, establezca el valor de esta configuración en 0.

leasesRecoveryAuditorExecutionFrequencyMillis

El adaptador de Kinesis de DynamoDB Streams compatible con la versión 1.x de KCL establece explícitamente leasesRecoveryAuditorExecutionFrequencyMillis en 1000. Por el contrario, el adaptador de Kinesis de DynamoDB Streams compatible con la versión 3.x de KCL ya no establece un valor predeterminado para esta configuración. Para mantener el mismo comportamiento del adaptador que en la versión 1.x, establezca el valor de esta configuración en 1000.

Paso 5: migración de KCL 2.x a KCL 3.x

Para garantizar una transición fluida y la compatibilidad con la última versión de la biblioteca de clientes de Kinesis (KCL), siga los pasos del 5 al 8 de las instrucciones de la guía de migración para actualizar de KCL 2.x a KCL 3.x.