Migrazione da KCL 1.x a KCL 3.x - Amazon DynamoDB

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Migrazione da KCL 1.x a KCL 3.x

Panoramica

Questa guida fornisce istruzioni per la migrazione dell'applicazione consumer da KCL 1.x a KCL 3.x. A causa delle differenze architettoniche tra KCL 1.x e KCL 3.x, la migrazione richiede l'aggiornamento di diversi componenti per garantire la compatibilità.

KCL 1.x utilizza classi e interfacce diverse rispetto a KCL 3.x. KCL 1.x utilizza classi e interfacce diverse rispetto a KCL 3.x. È necessario prima migrare il record processor, il record processor factory e le classi worker al formato compatibile con KCL 3.x e seguire i passaggi di migrazione per la migrazione da KCL 1.x a KCL 3.x.

Fasi della migrazione

Fase 1: Migrazione del processore di registrazione

L'esempio seguente mostra un processore di registrazione implementato per l'adattatore KCL 1.x DynamoDB Streams Kinesis:

package com.amazonaws.kcl; import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; public class StreamsRecordProcessor implements IRecordProcessor, IShutdownNotificationAware { @Override public void initialize(InitializationInput initializationInput) { // // Setup record processor // } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { for (Record record : processRecordsInput.getRecords()) { String data = new String(record.getData().array(), Charset.forName("UTF-8")); System.out.println(data); if (record instanceof RecordAdapter) { // record processing and checkpointing logic } } } @Override public void shutdown(ShutdownInput shutdownInput) { if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) { try { shutdownInput.getCheckpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { throw new RuntimeException(e); } } } @Override public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { try { checkpointer.checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow exception // e.printStackTrace(); } } }
Per migrare la classe RecordProcessor
  1. Modificate le interfacce da com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor e com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware verso com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor come segue:

    // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor
  2. Aggiorna le istruzioni di importazione per i processRecords metodi initialize and:

    // import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import software.amazon.kinesis.lifecycle.events.InitializationInput; // import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
  3. Sostituisci il metodo shutdown con i seguenti nuovi metodi: leaseLost, shardEnded e shutdownRequested.

    // @Override // public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { // // // // This is moved to shardEnded(...) and shutdownRequested(ShutdownReauestedInput) // // // try { // checkpointer.checkpoint(); // } catch (ShutdownException | InvalidStateException e) { // // // // Swallow exception // // // e.printStackTrace(); // } // } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } }

Di seguito è riportata la versione aggiornata della classe dei processori di registrazione:

package com.amazonaws.codesamples; import software.amazon.kinesis.exceptions.InvalidStateException; import software.amazon.kinesis.exceptions.ShutdownException; import software.amazon.kinesis.lifecycle.events.InitializationInput; import software.amazon.kinesis.lifecycle.events.LeaseLostInput; import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.ShardEndedInput; import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput; import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor; import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsKinesisClientRecord; import software.amazon.awssdk.services.dynamodb.model.Record; public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor { @Override public void initialize(InitializationInput initializationInput) { } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { for (DynamoDBStreamsKinesisClientRecord record: processRecordsInput.records()) Record ddbRecord = record.getRecord(); // processing and checkpointing logic for the ddbRecord } } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } }
Nota

L'adattatore Kinesis di DynamoDB Streams ora utilizza il modello Record. SDKv2 In SDKv2, AttributeValue gli oggetti complessi (,BS,, NS ML,SS) non restituiscono mai null. Utilizzate i hasSs() metodi hasBs() hasNs()hasM(),hasL(),,, per verificare se questi valori esistono.

Fase 2: Migrazione della fabbrica del processore di registrazione

La fabbrica dell'elaboratore di record è responsabile per la creazione di elaboratori di record quando un lease è acquisito. Di seguito è riportato un esempio di fabbrica KCL 1.x:

package com.amazonaws.codesamples; import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; public class StreamsRecordProcessorFactory implements IRecordProcessorFactory { @Override public IRecordProcessor createProcessor() { return new StreamsRecordProcessor(dynamoDBClient, tableName); } }
Per migrare il RecordProcessorFactory
  • Modificare l'interfaccia implementata da com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory asoftware.amazon.kinesis.processor.ShardRecordProcessorFactory, come segue:

    com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory to software.amazon.kinesis.processor.ShardRecordProcessorFactory, as follows. // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; // public class TestRecordProcessorFactory implements IRecordProcessorFactory { public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory { Change the return signature for createProcessor. // public IRecordProcessor createProcessor() { public ShardRecordProcessor shardRecordProcessor() {

Di seguito è riportato un esempio della fabbrica di processori di registrazione nella versione 3.0:

package com.amazonaws.codesamples; import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory { @Override public ShardRecordProcessor shardRecordProcessor() { return new StreamsRecordProcessor(); } }

Fase 3: Migrazione del lavoratore

Nella versione 3.0 di KCL, una nuova classe, chiamata Scheduler, sostituisce la classe Worker. Di seguito è riportato un esempio di worker KCL 1.x:

final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...) final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory(); final Worker worker = StreamsWorkerFactory.createDynamoDbStreamsWorker( recordProcessorFactory, workerConfig, adapterClient, amazonDynamoDB, amazonCloudWatchClient);
Per migrare il lavoratore
  1. Modifica la dichiarazione import per la classe Worker nelle dichiarazioni di importazione delle classi Scheduler e ConfigsBuilder.

    // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker; import software.amazon.kinesis.coordinator.Scheduler; import software.amazon.kinesis.common.ConfigsBuilder;
  2. Importa StreamTracker e modifica l'importazione di StreamsWorkerFactory to. StreamsSchedulerFactory

    import software.amazon.kinesis.processor.StreamTracker; // import com.amazonaws.services.dynamodbv2.streamsadapter.StreamsWorkerFactory; import com.amazonaws.services.dynamodbv2.streamsadapter.StreamsSchedulerFactory;
  3. Scegli la posizione da cui avviare l'applicazione. Può essere TRIM_HORIZON oLATEST.

    import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended;
  4. Crea un'StreamTrackeristanza.

    StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker( streamArn, InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON) );
  5. Create l'AmazonDynamoDBStreamsAdapterClientoggetto.

    import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; ... AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create(); AmazonDynamoDBStreamsAdapterClient adapterClient = new AmazonDynamoDBStreamsAdapterClient( credentialsProvider, awsRegion);
  6. Create l'ConfigsBuilderoggetto.

    import software.amazon.kinesis.common.ConfigsBuilder; ... ConfigsBuilder configsBuilder = new ConfigsBuilder( streamTracker, applicationName, adapterClient, dynamoDbAsyncClient, cloudWatchAsyncClient, UUID.randomUUID().toString(), new StreamsRecordProcessorFactory());
  7. Create il file Scheduler come mostrato nell'esempio seguente:

    import java.util.UUID; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.KinesisClientUtil; import software.amazon.kinesis.coordinator.Scheduler; ... DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build(); CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build(); DynamoDBStreamsPollingConfig pollingConfig = new DynamoDBStreamsPollingConfig(adapterClient); pollingConfig.idleTimeBetweenReadsInMillis(idleTimeBetweenReadsInMillis); RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig(); retrievalConfig.retrievalSpecificConfig(pollingConfig); CoordinatorConfig coordinatorConfig = configsBuilder.coordinatorConfig(); coordinatorConfig.clientVersionConfig(CoordinatorConfig.ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X) Scheduler scheduler = StreamsSchedulerFactory.createScheduler( configsBuilder.checkpointConfig(), coordinatorConfig, configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), retrievalConfig, adapterClient );
Importante

L'CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2Ximpostazione mantiene la compatibilità tra DynamoDB Streams Kinesis Adapter for KCL v3 e KCL v1, non tra KCL v2 e v3.

Fase 4: Panoramica e consigli sulla configurazione di KCL 3.x

Per una descrizione dettagliata delle configurazioni introdotte dopo KCL 1.x e rilevanti in KCL 3.x, consulta Configurazioni KCL e configurazione del client di migrazione KCL.

Configurazioni con valore predefinito di aggiornamento in KCL 3.x

billingMode

Nella versione KCL 1.x, il valore predefinito per è impostato su. billingMode PROVISIONED Tuttavia, con la versione 3.x di KCL, l'impostazione predefinita billingMode è PAY_PER_REQUEST (modalità su richiesta). Ti consigliamo di utilizzare la modalità di capacità su richiesta per la tua tabella di leasing per regolare automaticamente la capacità in base all'utilizzo. Per indicazioni sull'utilizzo della capacità assegnata per le tabelle di leasing, consulta Best practice per la tabella di leasing con modalità di capacità assegnata.

idleTimeBetweenReadsInMillis

Nella versione KCL 1.x, il valore predefinito per idleTimeBetweenReadsInMillis è impostato su 1.000 (o 1 secondo). La versione 3.x di KCL imposta il valore predefinito dleTimeBetweenReadsInMillis per i su 1.500 (o 1,5 secondi), ma Amazon DynamoDB Streams Kinesis Adapter sostituisce il valore predefinito su 1.000 (o 1 secondo).

Nuove configurazioni in KCL 3.x

leaseAssignmentIntervalMillis

Questa configurazione definisce l'intervallo di tempo prima che i frammenti appena scoperti inizino l'elaborazione e viene calcolata come 1,5 ×. leaseAssignmentIntervalMillis Se questa impostazione non è configurata in modo esplicito, l'intervallo di tempo predefinito è 1,5 ×. failoverTimeMillis L'elaborazione di nuovi shard prevede la scansione della tabella di leasing e l'interrogazione di un indice secondario globale (GSI) sulla tabella di leasing. La riduzione di leaseAssignmentIntervalMillis aumenta la frequenza di queste operazioni di scansione e interrogazione, con conseguente aumento dei costi di DynamoDB. Si consiglia di impostare questo valore su 2000 per ridurre al minimo il ritardo nell'elaborazione di nuovi shard.

shardConsumerDispatchPollIntervalMillis

Questa configurazione definisce l'intervallo tra i sondaggi successivi da parte del consumatore dello shard per attivare le transizioni di stato. Nella versione KCL 1.x, questo comportamento era controllato dal idleTimeInMillis parametro, che non era esposto come impostazione configurabile. Con la versione 3.x di KCL, consigliamo di impostare questa configurazione in modo che corrisponda al valore utilizzato idleTimeInMillis nella configurazione della versione 1.x di KCL.

Configurazioni non sovrascritte da Amazon DynamoDB Streams Kinesis Adapter

shardSyncIntervalMillis

L'adattatore Kinesis DynamoDB Streams compatibile con la versione 1.x di KCL viene impostato esplicitamente su 0. shardSyncIntervalMillis In confronto, il DynamoDB Streams Kinesis Adapter compatibile con la versione 3.x di KCL non imposta più un valore per questa configurazione. Per mantenere lo stesso comportamento dell'adattatore della versione 1.x, imposta il valore di questa configurazione su 0.

leasesRecoveryAuditorExecutionFrequencyMillis

L'adattatore Kinesis DynamoDB Streams compatibile con la versione 1.x di KCL è impostato esplicitamente su 1000. leasesRecoveryAuditorExecutionFrequencyMillis In confronto, il DynamoDB Streams Kinesis Adapter compatibile con la versione 3.x di KCL non imposta più un valore predefinito per questa configurazione. Per mantenere lo stesso comportamento dell'adattatore della versione 1.x, imposta il valore di questa configurazione su 1000.

Fase 5: Migrazione da KCL 2.x a KCL 3.x

Per garantire una transizione e una compatibilità fluide con l'ultima versione di Kinesis Client Library (KCL), segui i passaggi 5-8 nelle istruzioni della guida alla migrazione per l'aggiornamento da KCL 2.x a KCL 3.x.