Migration de KCL 1.x vers KCL 3.x - Amazon DynamoDB

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Migration de KCL 1.x vers KCL 3.x

Présentation

Ce guide fournit des instructions pour migrer votre application client de KCL 1.x vers KCL 3.x. En raison des différences architecturales entre KCL 1.x et KCL 3.x, la migration nécessite la mise à jour de plusieurs composants pour garantir la compatibilité.

KCL 1.x utilise différentes classes et interfaces par rapport à KCL 3.x. KCL 1.x utilise différentes classes et interfaces par rapport à KCL 3.x. Vous devez d'abord migrer le processeur d'enregistrements, l'usine du processeur d'enregistrements et les classes de travail vers le format compatible KCL 3.x, puis suivre les étapes de migration de KCL 1.x vers KCL 3.x.

Étapes de la migration

Étape 1 : migrer le processeur d'enregistrement

L'exemple suivant montre un processeur d'enregistrement implémenté pour l'adaptateur KCL 1.x DynamoDB Streams Kinesis :

package com.amazonaws.kcl; import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; public class StreamsRecordProcessor implements IRecordProcessor, IShutdownNotificationAware { @Override public void initialize(InitializationInput initializationInput) { // // Setup record processor // } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { for (Record record : processRecordsInput.getRecords()) { String data = new String(record.getData().array(), Charset.forName("UTF-8")); System.out.println(data); if (record instanceof RecordAdapter) { // record processing and checkpointing logic } } } @Override public void shutdown(ShutdownInput shutdownInput) { if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) { try { shutdownInput.getCheckpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { throw new RuntimeException(e); } } } @Override public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { try { checkpointer.checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow exception // e.printStackTrace(); } } }
Pour migrer la RecordProcessor classe
  1. Modifiez les interfaces de com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor et com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware vers com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor comme suit :

    // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor
  2. Mettez à jour les instructions d'importation pour les processRecords méthodes initialize et :

    // import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import software.amazon.kinesis.lifecycle.events.InitializationInput; // import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
  3. Remplacez la méthode shutdown par les nouvelles méthodes suivantes : leaseLost, shardEnded et shutdownRequested.

    // @Override // public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { // // // // This is moved to shardEnded(...) and shutdownRequested(ShutdownReauestedInput) // // // try { // checkpointer.checkpoint(); // } catch (ShutdownException | InvalidStateException e) { // // // // Swallow exception // // // e.printStackTrace(); // } // } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } }

Voici la version mise à jour de la classe de processeur d'enregistrement :

package com.amazonaws.codesamples; import software.amazon.kinesis.exceptions.InvalidStateException; import software.amazon.kinesis.exceptions.ShutdownException; import software.amazon.kinesis.lifecycle.events.InitializationInput; import software.amazon.kinesis.lifecycle.events.LeaseLostInput; import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.ShardEndedInput; import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput; import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor; import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsKinesisClientRecord; import software.amazon.awssdk.services.dynamodb.model.Record; public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor { @Override public void initialize(InitializationInput initializationInput) { } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { for (DynamoDBStreamsKinesisClientRecord record: processRecordsInput.records()) Record ddbRecord = record.getRecord(); // processing and checkpointing logic for the ddbRecord } } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } }
Note

L'adaptateur DynamoDB Streams Kinesis utilise désormais le modèle Record. SDKv2 Dans SDKv2, AttributeValue les objets complexes (BS,NS,, ML,SS) ne renvoient jamais nul. Utilisez hasBs() les hasSs() méthodes hasNs()hasM(),hasL(),, pour vérifier si ces valeurs existent.

Étape 2 : migrer l'usine de traitement des enregistrements

La fabrique de processeurs d'enregistrements est responsable de la création des processeurs d'enregistrements lorsqu'un bail est acquis. Voici un exemple d'usine KCL 1.x :

package com.amazonaws.codesamples; import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; public class StreamsRecordProcessorFactory implements IRecordProcessorFactory { @Override public IRecordProcessor createProcessor() { return new StreamsRecordProcessor(dynamoDBClient, tableName); } }
Pour migrer le RecordProcessorFactory
  • Modifiez l'interface implémentée de com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory àsoftware.amazon.kinesis.processor.ShardRecordProcessorFactory, comme suit :

    com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory to software.amazon.kinesis.processor.ShardRecordProcessorFactory, as follows. // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; // public class TestRecordProcessorFactory implements IRecordProcessorFactory { public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory { Change the return signature for createProcessor. // public IRecordProcessor createProcessor() { public ShardRecordProcessor shardRecordProcessor() {

Voici un exemple de la fabrique de processeurs d'enregistrement dans la version 3.0 :

package com.amazonaws.codesamples; import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory { @Override public ShardRecordProcessor shardRecordProcessor() { return new StreamsRecordProcessor(); } }

Étape 3 : migrer le travailleur

Dans la version 3.0 de la KCL, une nouvelle classe, appelée Scheduler, remplace la classe Worker. Voici un exemple de programme de travail KCL 1.x :

final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...) final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory(); final Worker worker = StreamsWorkerFactory.createDynamoDbStreamsWorker( recordProcessorFactory, workerConfig, adapterClient, amazonDynamoDB, amazonCloudWatchClient);
Pour migrer l'application de travail
  1. Modifiez la déclaration import de la classe Worker pour les instructions d'importation pour les classes Scheduler et ConfigsBuilder.

    // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker; import software.amazon.kinesis.coordinator.Scheduler; import software.amazon.kinesis.common.ConfigsBuilder;
  2. Importer StreamTracker et modifier l'importation de StreamsWorkerFactory versStreamsSchedulerFactory.

    import software.amazon.kinesis.processor.StreamTracker; // import com.amazonaws.services.dynamodbv2.streamsadapter.StreamsWorkerFactory; import com.amazonaws.services.dynamodbv2.streamsadapter.StreamsSchedulerFactory;
  3. Choisissez la position à partir de laquelle vous souhaitez démarrer l'application. Cela peut être TRIM_HORIZON ouLATEST.

    import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended;
  4. Créer une instance StreamTracker.

    StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker( streamArn, InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON) );
  5. Créez l'AmazonDynamoDBStreamsAdapterClientobjet.

    import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; ... AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create(); AmazonDynamoDBStreamsAdapterClient adapterClient = new AmazonDynamoDBStreamsAdapterClient( credentialsProvider, awsRegion);
  6. Créez l'ConfigsBuilderobjet.

    import software.amazon.kinesis.common.ConfigsBuilder; ... ConfigsBuilder configsBuilder = new ConfigsBuilder( streamTracker, applicationName, adapterClient, dynamoDbAsyncClient, cloudWatchAsyncClient, UUID.randomUUID().toString(), new StreamsRecordProcessorFactory());
  7. Créez le Scheduler comme indiqué dans l'exemple suivant :

    import java.util.UUID; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.KinesisClientUtil; import software.amazon.kinesis.coordinator.Scheduler; ... DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build(); CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build(); DynamoDBStreamsPollingConfig pollingConfig = new DynamoDBStreamsPollingConfig(adapterClient); pollingConfig.idleTimeBetweenReadsInMillis(idleTimeBetweenReadsInMillis); RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig(); retrievalConfig.retrievalSpecificConfig(pollingConfig); CoordinatorConfig coordinatorConfig = configsBuilder.coordinatorConfig(); coordinatorConfig.clientVersionConfig(CoordinatorConfig.ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X) Scheduler scheduler = StreamsSchedulerFactory.createScheduler( configsBuilder.checkpointConfig(), coordinatorConfig, configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), retrievalConfig, adapterClient );
Important

Le CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X paramètre maintient la compatibilité entre l'adaptateur DynamoDB Streams Kinesis pour KCL v3 et KCL v1, et non entre KCL v2 et v3.

Étape 4 : Présentation de la configuration de KCL 3.x et recommandations

Pour une description détaillée des configurations introduites après KCL 1.x qui sont pertinentes dans KCL 3.x, voir Configurations KCL et Configuration du client de migration KCL.

Configurations avec mise à jour de la valeur par défaut dans KCL 3.x

billingMode

Dans la version 1.x de KCL, la valeur par défaut pour billingMode est définie sur. PROVISIONED Cependant, avec la version 3.x de KCL, le mode par défaut billingMode est PAY_PER_REQUEST (mode à la demande). Nous vous recommandons d'utiliser le mode de capacité à la demande pour votre tableau de location afin d'ajuster automatiquement la capacité en fonction de votre utilisation. Pour obtenir des conseils sur l'utilisation de la capacité allouée pour vos tables de location, consultez la section Meilleures pratiques relatives à la table de location avec mode de capacité provisionnée.

idleTimeBetweenReadsInMillis

Dans la version 1.x de KCL, la valeur par défaut pour idleTimeBetweenReadsInMillis est définie sur 1 000 (ou 1 seconde). La version 3.x de KCL définit la valeur par défaut pour i dleTimeBetweenReadsInMillis à 1 500 (soit 1,5 seconde), mais Amazon DynamoDB Streams Kinesis Adapter remplace la valeur par défaut à 1 000 (soit 1 seconde).

Nouvelles configurations dans KCL 3.x

leaseAssignmentIntervalMillis

Cette configuration définit l'intervalle de temps avant que les fragments récemment découverts ne commencent à être traités, et est calculé comme 1,5 ×leaseAssignmentIntervalMillis. Si ce paramètre n'est pas explicitement configuré, l'intervalle de temps est défini par défaut sur 1,5 ×failoverTimeMillis. Le traitement des nouvelles partitions implique de scanner la table des baux et d'interroger un indice secondaire global (GSI) sur la table des baux. La réduction de la fréquence leaseAssignmentIntervalMillis augmente la fréquence de ces opérations d'analyse et de requête, ce qui entraîne une augmentation des coûts DynamoDB. Nous recommandons de définir cette valeur sur 2000 afin de minimiser le délai de traitement des nouvelles partitions.

shardConsumerDispatchPollIntervalMillis

Cette configuration définit l'intervalle entre les interrogations successives effectuées par le consommateur de partitions pour déclencher des transitions d'état. Dans la version 1.x de KCL, ce comportement était contrôlé par le idleTimeInMillis paramètre, qui n'était pas exposé en tant que paramètre configurable. Avec la version 3.x de KCL, nous vous recommandons de définir cette configuration pour qu'elle corresponde à la valeur utilisée idleTimeInMillis dans votre configuration de KCL version 1.x.

Configurations non remplacées par l'adaptateur Kinesis d'Amazon DynamoDB Streams

shardSyncIntervalMillis

L'adaptateur DynamoDB Streams Kinesis compatible avec la version 1.x de KCL est explicitement défini sur 0. shardSyncIntervalMillis En comparaison, l'adaptateur DynamoDB Streams Kinesis compatible avec la version 3.x de KCL ne définit plus de valeur pour cette configuration. Pour conserver le même comportement d'adaptateur que celui de la version 1.x, définissez la valeur de cette configuration sur 0.

leasesRecoveryAuditorExecutionFrequencyMillis

L'adaptateur DynamoDB Streams Kinesis compatible avec la version 1.x de KCL est défini explicitement sur 1000. leasesRecoveryAuditorExecutionFrequencyMillis En comparaison, l'adaptateur DynamoDB Streams Kinesis compatible avec la version 3.x de KCL ne définit plus de valeur par défaut pour cette configuration. Pour conserver le même comportement d'adaptateur que celui de la version 1.x, définissez la valeur de cette configuration sur 1000.

Étape 5 : Migrer de KCL 2.x vers KCL 3.x

Pour garantir une transition fluide et une compatibilité avec la dernière version de Kinesis Client Library (KCL), suivez les étapes 5 à 8 des instructions du guide de migration pour la mise à niveau de KCL 2.x vers KCL 3.x.