Migrazione di KLC da 1.x a 3.x - Amazon DynamoDB

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Migrazione di KLC da 1.x a 3.x

Panoramica

Questa guida fornisce istruzioni per la migrazione dell’applicazione consumer da KCL 1.x a KCL 3.x. A causa delle differenze di architettura tra KCL 1.x e KCL 3.x, la migrazione richiede l’aggiornamento di diversi componenti per garantire la compatibilità.

KCL 1.x utilizza classi e interfacce diverse rispetto a KCL 3.x. È necessario prima migrare l’elaboratore di record, il generatore dell’elaboratore di record e le classi di lavoratori al formato compatibile con KCL 3.x e seguire la procedura di migrazione per la migrazione da KCL 1.x a KCL 3.x.

Fasi della migrazione

Fase 1: migrare l’elaboratore di record

L’esempio seguente mostra un elaboratore di record implementato per l’Adattatore Kinesis per i flussi DynamoDB con KCL 1.x:

package com.amazonaws.kcl; import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; public class StreamsRecordProcessor implements IRecordProcessor, IShutdownNotificationAware { @Override public void initialize(InitializationInput initializationInput) { // // Setup record processor // } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { for (Record record : processRecordsInput.getRecords()) { String data = new String(record.getData().array(), Charset.forName("UTF-8")); System.out.println(data); if (record instanceof RecordAdapter) { // record processing and checkpointing logic } } } @Override public void shutdown(ShutdownInput shutdownInput) { if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) { try { shutdownInput.getCheckpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { throw new RuntimeException(e); } } } @Override public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { try { checkpointer.checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow exception // e.printStackTrace(); } } }
Per migrare la classe RecordProcessor
  1. Modifica le interfacce da com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor e com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware verso software.amazon.dynamodb.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor come segue:

    // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import software.amazon.dynamodb.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor
  2. Aggiorna le istruzioni di importazione per i metodi initialize e processRecords:

    // import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import software.amazon.kinesis.lifecycle.events.InitializationInput; // import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
  3. Sostituisci il metodo shutdownRequested con i seguenti nuovi metodi: leaseLost, shardEnded e shutdownRequested.

    // @Override // public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { // // // // This is moved to shardEnded(...) and shutdownRequested(ShutdownReauestedInput) // // // try { // checkpointer.checkpoint(); // } catch (ShutdownException | InvalidStateException e) { // // // // Swallow exception // // // e.printStackTrace(); // } // } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } }

Segue la versione aggiornata della classe dell’elaboratore di record:

package com.amazonaws.codesamples; import software.amazon.kinesis.exceptions.InvalidStateException; import software.amazon.kinesis.exceptions.ShutdownException; import software.amazon.kinesis.lifecycle.events.InitializationInput; import software.amazon.kinesis.lifecycle.events.LeaseLostInput; import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.ShardEndedInput; import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput; import software.amazon.dynamodb.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor; import software.amazon.dynamodb.streamsadapter.adapter.DynamoDBStreamsKinesisClientRecord; import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor; import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsClientRecord; import software.amazon.awssdk.services.dynamodb.model.Record; public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor { @Override public void initialize(InitializationInput initializationInput) { } @Override public void processRecords(DynamoDBStreamsProcessRecordsInput processRecordsInput) { for (DynamoDBStreamsKinesisClientRecord record: processRecordsInput.records()) Record ddbRecord = record.getRecord(); // processing and checkpointing logic for the ddbRecord } } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } }
Nota

L'adattatore Kinesis di DynamoDB Streams ora utilizza il modello Record. SDKv2 In SDKv2, AttributeValue gli oggetti complessi (,BS,, NS ML,SS) non restituiscono mai null. Utilizza i metodi hasBs(), hasNs(), hasM(), hasL(), hasSs() per verificare se questi valori esistono.

Fase 2: migrare il generatore dell’elaboratore di record

La fabbrica dell'elaboratore di record è responsabile per la creazione di elaboratori di record quando un lease è acquisito. Di seguito è illustrato un esempio di un generatore di KCL 1.x:

package com.amazonaws.codesamples; import software.amazon.dynamodb.AmazonDynamoDB; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; public class StreamsRecordProcessorFactory implements IRecordProcessorFactory { @Override public IRecordProcessor createProcessor() { return new StreamsRecordProcessor(dynamoDBClient, tableName); } }
Per migrare RecordProcessorFactory
  • Modifica l’interfaccia implementata da com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory a software.amazon.kinesis.processor.ShardRecordProcessorFactory, come segue:

    // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; // public class TestRecordProcessorFactory implements IRecordProcessorFactory { public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory { Change the return signature for createProcessor. // public IRecordProcessor createProcessor() { public ShardRecordProcessor shardRecordProcessor() {

Di seguito è riportato un esempio di generatore di elaboratore di record in 3.0:

package com.amazonaws.codesamples; import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory { @Override public ShardRecordProcessor shardRecordProcessor() { return new StreamsRecordProcessor(); } }

Fase 3: eseguire la migrazione del lavoratore

Nella versione 3.0 della KCL, una nuova classe, denominata Pianificatore, sostituisce la classe Lavoratore. Di seguito è illustrato un esempio di un lavoratore di KCL 1.x:

final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...) final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory(); final Worker worker = StreamsWorkerFactory.createDynamoDbStreamsWorker( recordProcessorFactory, workerConfig, adapterClient, amazonDynamoDB, amazonCloudWatchClient);
Per migrare il lavoratore
  1. Modifica la dichiarazione import per la classe Worker nelle dichiarazioni di importazione delle classi Scheduler e ConfigsBuilder.

    // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker; import software.amazon.kinesis.coordinator.Scheduler; import software.amazon.kinesis.common.ConfigsBuilder;
  2. Importa StreamTracker e modifica l’importazione di StreamsWorkerFactory in StreamsSchedulerFactory.

    import software.amazon.kinesis.processor.StreamTracker; // import software.amazon.dynamodb.streamsadapter.StreamsWorkerFactory; import software.amazon.dynamodb.streamsadapter.StreamsSchedulerFactory;
  3. Seleziona la posizione da cui avviare l’applicazione. Può essere TRIM_HORIZON o LATEST.

    import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended;
  4. Creazione di un’istanza StreamTracker.

    StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker( streamArn, InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON) );
  5. Crea l’oggetto AmazonDynamoDBStreamsAdapterClient.

    import software.amazon.dynamodb.streamsadapter.AmazonDynamoDBStreamsAdapterClient; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; ... AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create(); AmazonDynamoDBStreamsAdapterClient adapterClient = new AmazonDynamoDBStreamsAdapterClient( credentialsProvider, awsRegion);
  6. Crea l’oggetto ConfigsBuilder.

    import software.amazon.kinesis.common.ConfigsBuilder; ... ConfigsBuilder configsBuilder = new ConfigsBuilder( streamTracker, applicationName, adapterClient, dynamoDbAsyncClient, cloudWatchAsyncClient, UUID.randomUUID().toString(), new StreamsRecordProcessorFactory());
  7. Crea Scheduler con ConfigsBuilder come mostrato nell’esempio seguente:

    import java.util.UUID; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.KinesisClientUtil; import software.amazon.kinesis.coordinator.Scheduler; ... DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build(); CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build(); DynamoDBStreamsPollingConfig pollingConfig = new DynamoDBStreamsPollingConfig(adapterClient); pollingConfig.idleTimeBetweenReadsInMillis(idleTimeBetweenReadsInMillis); // Use ConfigsBuilder to configure settings RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig(); retrievalConfig.retrievalSpecificConfig(pollingConfig); CoordinatorConfig coordinatorConfig = configsBuilder.coordinatorConfig(); coordinatorConfig.clientVersionConfig(CoordinatorConfig.ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X); Scheduler scheduler = StreamsSchedulerFactory.createScheduler( configsBuilder.checkpointConfig(), coordinatorConfig, configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), retrievalConfig, adapterClient );
Importante

L’impostazione CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X mantiene la compatibilità tra l’Adattatore Kinesis per i flussi DynamoDB per KCL v3 e KCL v1, non tra KCL v2 e v3.

Fase 4: panoramica e consigli sulla configurazione di KCL 3.x

Per una descrizione dettagliata delle configurazioni introdotte dopo KCL 1.x e rilevanti in KCL 3.x, consulta KCL configurations and KCL migration client configuration.

Importante

Invece di creare direttamente oggetti di checkpointConfig, coordinatorConfig, leaseManagementConfig, metricsConfig, processorConfig e retrievalConfig, si consiglia di utilizzare ConfigsBuilder per impostare le configurazioni in KCL 3.x e versioni successive per evitare problemi di inizializzazione del Pianificatore. ConfigsBuilder offre un modo più flessibile e gestibile per configurare l’applicazione KCL.

Configurazioni con aggiornamento del valore predefinito in KCL 3.x

billingMode

Nella versione 1.x di KCL, il valore predefinito per billingMode è impostato su PROVISIONED. Tuttavia, con la versione 3.x di KCL, l’impostazione predefinita billingMode è PAY_PER_REQUEST (modalità on demand). Si consiglia di utilizzare la modalità con capacità on demand per la tabella di lease per regolare automaticamente la capacità in base all’utilizzo. Per indicazioni sull’utilizzo della capacità allocata per le tabelle di lease, consulta Best practices for the lease table with provisioned capacity mode.

idleTimeBetweenReadsInMillis

Nella versione 1.x di KCL, il valore predefinito per idleTimeBetweenReadsInMillis è impostato su 1.000 (o 1 secondo). La versione 3.x di KCL imposta il valore predefinito per idleTimeBetweenReadsInMillis su 1.500 (o 1,5 secondi), ma l’Adattatore Kinesis per i flussi Amazon DynamoDB sostituisce il valore predefinito su 1.000 (o 1 secondo).

Nuove configurazioni in KCL 3.x

leaseAssignmentIntervalMillis

Questa configurazione definisce l’intervallo di tempo prima che gli shard di nuovo riscontro inizino l’elaborazione e viene calcolata come 1,5 × leaseAssignmentIntervalMillis. Se questa impostazione non è configurata in modo esplicito, l’intervallo di tempo predefinito è 1,5 × failoverTimeMillis. L’elaborazione di nuovi shard prevede la scansione della tabella di lease e l’interrogazione di un indice secondario globale (GSI) sulla tabella di lease. La riduzione di leaseAssignmentIntervalMillis aumenta la frequenza delle operazioni Scan e Query, con conseguente aumento dei costi di DynamoDB. Si consiglia di impostare questo valore su 2000 (o 2 secondi) per ridurre al minimo il ritardo nell’elaborazione di nuovi shard.

shardConsumerDispatchPollIntervalMillis

Questa configurazione definisce l’intervallo tra polling successivi da parte del consumer dello shard per attivare le transizioni di stato. Nella versione 1.x di KCL, questo comportamento era controllato dal parametro idleTimeInMillis, che non era esposto come impostazione configurabile. Con la versione 3.x di KCL, si consiglia di impostare questa configurazione in modo che corrisponda al valore utilizzato per idleTimeInMillis nella configurazione della versione 1.x di KCL.

Fase 5: migrare da KCL 2.x a KCL 3.x

Per garantire una transizione e una compatibilità fluide con l’ultima versione di Kinesis Client Library (KCL), segui le fasi 5 – 8 nelle istruzioni della guida alla migrazione per l’aggiornamento da KCL 2.x a KCL 3.x.

Per la risoluzione dei problemi più comuni di KCL 3.x, consulta Troubleshooting KCL consumer applications.