Migración de KCL 1.x a KCL 3.x
Descripción general
En esta guía se proporcionan instrucciones para migrar la aplicación de consumidor de KCL 1.x a KCL 3.x. Debido a las diferencias de arquitectura entre KCL 1.x y KCL 3.x, la migración requiere actualizar varios componentes para garantizar la compatibilidad.
KCL 1.x utiliza clases e interfaces diferentes en comparación con KCL 3.x. KCL 1.x utiliza clases e interfaces diferentes en comparación con KCL 3.x. Debe migrar primero el procesador de registros, el generador de procesadores de registros y las clases de procesos de trabajo al formato compatible con KCL 3.x y, a continuación, seguir los pasos de migración de KCL 1.x a KCL 3.x.
Pasos para realizar la migración
Temas
Paso 1: migración del procesador de registros
En el siguiente ejemplo se muestra un procesador de registros implementado para el adaptador de Kinesis de DynamoDB Streams de KCL 1.x:
package com.amazonaws.kcl; import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; public class StreamsRecordProcessor implements IRecordProcessor, IShutdownNotificationAware { @Override public void initialize(InitializationInput initializationInput) { // // Setup record processor // } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { for (Record record : processRecordsInput.getRecords()) { String data = new String(record.getData().array(), Charset.forName("UTF-8")); System.out.println(data); if (record instanceof RecordAdapter) { // record processing and checkpointing logic } } } @Override public void shutdown(ShutdownInput shutdownInput) { if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) { try { shutdownInput.getCheckpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { throw new RuntimeException(e); } } } @Override public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { try { checkpointer.checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow exception // e.printStackTrace(); } } }
Migración de la clase RecordProcessor
-
Cambie las interfaces de
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor
ycom.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware
acom.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor
tal y como se indica a continuación:// import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor
-
Actualice las instrucciones de importación para los métodos
initialize
yprocessRecords
:// import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import software.amazon.kinesis.lifecycle.events.InitializationInput; // import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
-
Sustituya el método
shutdown
por los métodos nuevos siguientes:leaseLost
,shardEnded
yshutdownRequested
.// @Override // public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { // // // // This is moved to shardEnded(...) and shutdownRequested(ShutdownReauestedInput) // // // try { // checkpointer.checkpoint(); // } catch (ShutdownException | InvalidStateException e) { // // // // Swallow exception // // // e.printStackTrace(); // } // } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } }
A continuación, se muestra la versión actualizada de la clase del procesador de registros:
package com.amazonaws.codesamples; import software.amazon.kinesis.exceptions.InvalidStateException; import software.amazon.kinesis.exceptions.ShutdownException; import software.amazon.kinesis.lifecycle.events.InitializationInput; import software.amazon.kinesis.lifecycle.events.LeaseLostInput; import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.ShardEndedInput; import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput; import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor; import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsKinesisClientRecord; import software.amazon.awssdk.services.dynamodb.model.Record; public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor { @Override public void initialize(InitializationInput initializationInput) { } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { for (DynamoDBStreamsKinesisClientRecord record: processRecordsInput.records()) Record ddbRecord = record.getRecord(); // processing and checkpointing logic for the ddbRecord } } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } }
nota
El adaptador de Kinesis de DynamoDB Streams ahora usa el modelo de registro de SDKv2. En SDKv2, los objetos AttributeValue
complejos (BS
, NS
, M
, L
y SS
) nunca devuelven un valor nulo. Use los métodos hasBs()
, hasNs()
, hasM()
, hasL()
y hasSs()
para verificar si estos valores existen.
Paso 2: migración del generador de procesadores de registros
El generador de procesadores de registros es responsable de la creación de procesadores de registros cuando se adquiere una asignación. A continuación, se muestra un ejemplo de un generador de la versión 1.x de KCL:
package com.amazonaws.codesamples; import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; public class StreamsRecordProcessorFactory implements IRecordProcessorFactory { @Override public IRecordProcessor createProcessor() { return new StreamsRecordProcessor(dynamoDBClient, tableName); } }
Migración de RecordProcessorFactory
-
Cambie la interfaz implementada de
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory
asoftware.amazon.kinesis.processor.ShardRecordProcessorFactory
, tal y como se indica a continuación:com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory to software.amazon.kinesis.processor.ShardRecordProcessorFactory, as follows. // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; // public class TestRecordProcessorFactory implements IRecordProcessorFactory { public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory { Change the return signature for createProcessor. // public IRecordProcessor createProcessor() { public ShardRecordProcessor shardRecordProcessor() {
A continuación, se muestra un ejemplo de generador de procesadores de registros de la versión 3.0:
package com.amazonaws.codesamples; import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory { @Override public ShardRecordProcessor shardRecordProcessor() { return new StreamsRecordProcessor(); } }
Paso 3: migración del proceso de trabajo
En la versión 3.0 de KCL, una nueva clase, llamada Scheduler, reemplaza la clase Worker. A continuación, se muestra un ejemplo de proceso de trabajo de la versión 1.x de KCL:
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...) final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory(); final Worker worker = StreamsWorkerFactory.createDynamoDbStreamsWorker( recordProcessorFactory, workerConfig, adapterClient, amazonDynamoDB, amazonCloudWatchClient);
Para migrar el proceso de trabajo
-
Cambie la instrucción
import
para la claseWorker
por las instrucciones de importación para las clasesScheduler
yConfigsBuilder
.// import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker; import software.amazon.kinesis.coordinator.Scheduler; import software.amazon.kinesis.common.ConfigsBuilder;
-
Importe
StreamTracker
y cambie la importación deStreamsWorkerFactory
aStreamsSchedulerFactory
.import software.amazon.kinesis.processor.StreamTracker; // import com.amazonaws.services.dynamodbv2.streamsadapter.StreamsWorkerFactory; import com.amazonaws.services.dynamodbv2.streamsadapter.StreamsSchedulerFactory;
-
Elija la posición desde la que desea iniciar la aplicación. Puede ser
TRIM_HORIZON
oLATEST
.import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended;
-
Crear una instancia de
StreamTracker
.StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker( streamArn, InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON) );
-
Cree el objeto
AmazonDynamoDBStreamsAdapterClient
.import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; ... AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create(); AmazonDynamoDBStreamsAdapterClient adapterClient = new AmazonDynamoDBStreamsAdapterClient( credentialsProvider, awsRegion);
-
Cree el objeto
ConfigsBuilder
.import software.amazon.kinesis.common.ConfigsBuilder; ... ConfigsBuilder configsBuilder = new ConfigsBuilder( streamTracker, applicationName, adapterClient, dynamoDbAsyncClient, cloudWatchAsyncClient, UUID.randomUUID().toString(), new StreamsRecordProcessorFactory());
-
Cree
Scheduler
tal como se muestra en el ejemplo siguiente:import java.util.UUID; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.KinesisClientUtil; import software.amazon.kinesis.coordinator.Scheduler; ... DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build(); CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build(); DynamoDBStreamsPollingConfig pollingConfig = new DynamoDBStreamsPollingConfig(adapterClient); pollingConfig.idleTimeBetweenReadsInMillis(idleTimeBetweenReadsInMillis); RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig(); retrievalConfig.retrievalSpecificConfig(pollingConfig); CoordinatorConfig coordinatorConfig = configsBuilder.coordinatorConfig(); coordinatorConfig.clientVersionConfig(CoordinatorConfig.ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X) Scheduler scheduler = StreamsSchedulerFactory.createScheduler( configsBuilder.checkpointConfig(), coordinatorConfig, configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), retrievalConfig, adapterClient );
importante
La configuración CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X
mantiene la compatibilidad entre el adaptador de Kinesis de DynamoDB Streams para KCL v3 y KCL v1, pero no entre KCL v2 y v3.
Paso 4: información general y recomendaciones sobre la configuración de KCL 3.x
Para obtener una descripción detallada de las configuraciones introducidas después de KCL 1.x que son relevantes en KCL 3.x, consulte las configuraciones de KCL y la configuración del cliente de migración de KCL.
Configuraciones con valor predeterminado actualizado en KCL 3.x
billingMode
-
En la versión 1.x de KCL, el valor predeterminado de
billingMode
se establece enPROVISIONED
. No obstante, con la versión 3.x de KCL, el valor predeterminado debillingMode
esPAY_PER_REQUEST
(modo bajo demanda). Le recomendamos que utilice el modo de capacidad bajo demanda para la tabla de arrendamiento a fin de ajustar automáticamente la capacidad en función del uso. Para obtener orientación sobre cómo utilizar la capacidad aprovisionada para las tablas de arrendamiento, consulte Best practices for the lease table with provisioned capacity mode. idleTimeBetweenReadsInMillis
-
En la versión 1.x de KCL, el valor predeterminado de
idleTimeBetweenReadsInMillis
se establece en 1000 (o 1 segundo). La versión 3.x de KCL establece el valor predeterminado de idleTimeBetweenReadsInMillis
en 1500 (o 1,5 segundos), pero el adaptador de Kinesis de Amazon DynamoDB Streams reemplaza el valor predeterminado por 1000 (o 1 segundo).
Nuevas configuraciones en KCL 3.x
leaseAssignmentIntervalMillis
-
Esta configuración define el intervalo de tiempo antes de que las particiones recién descubiertas comiencen a procesarse y se calcula como 1,5 ×
leaseAssignmentIntervalMillis
. Si este ajuste no se configura explícitamente, el intervalo de tiempo se establece de forma predeterminada en 1,5 ×failoverTimeMillis
. El procesamiento de nuevas particiones implica examinar la tabla de arrendamiento y consultar un índice secundario global (GSI) en la tabla de arrendamiento. Al reducirleaseAssignmentIntervalMillis
, aumenta la frecuencia de estas operaciones de análisis y consulta, lo que se traduce en mayores costos de DynamoDB. Recomendamos establecer este valor en 2000 para minimizar el retraso en el procesamiento de nuevas particiones. shardConsumerDispatchPollIntervalMillis
-
Esta configuración define el intervalo entre sondeos sucesivos por parte del consumidor de particiones para activar las transiciones de estado. En la versión 1.x de KCL, este comportamiento se controlaba mediante el parámetro
idleTimeInMillis
, que no se exponía como un ajuste configurable. Con la versión 3.x de KCL, recomendamos establecer esta configuración para que coincida con el valor utilizado paraidleTimeInMillis
en la configuración de la versión 1.x de KCL.
Configuraciones que no anula el adaptador de Kinesis de Amazon DynamoDB Streams
shardSyncIntervalMillis
-
El adaptador de Kinesis de DynamoDB Streams compatible con la versión 1.x de KCL establece explícitamente
shardSyncIntervalMillis
en 0. Por el contrario, el adaptador de Kinesis de DynamoDB Streams compatible con la versión 3.x de KCL ya no establece un valor para esta configuración. Para mantener el mismo comportamiento del adaptador que en la versión 1.x, establezca el valor de esta configuración en 0. leasesRecoveryAuditorExecutionFrequencyMillis
-
El adaptador de Kinesis de DynamoDB Streams compatible con la versión 1.x de KCL establece explícitamente
leasesRecoveryAuditorExecutionFrequencyMillis
en 1000. Por el contrario, el adaptador de Kinesis de DynamoDB Streams compatible con la versión 3.x de KCL ya no establece un valor predeterminado para esta configuración. Para mantener el mismo comportamiento del adaptador que en la versión 1.x, establezca el valor de esta configuración en 1000.
Paso 5: migración de KCL 2.x a KCL 3.x
Para garantizar una transición fluida y la compatibilidad con la última versión de la biblioteca de clientes de Kinesis (KCL), siga los pasos del 5 al 8 de las instrucciones de la guía de migración para actualizar de KCL 2.x a KCL 3.x.