Migration de la KCL 1.x vers la KCL 3.x - Amazon DynamoDB

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Migration de la KCL 1.x vers la KCL 3.x

Aperçu

Ce guide fournit des instructions pour migrer votre application consommateur de la KCL 1.x vers la KCL 3.x. En raison des différences architecturales entre la KCL 1.x et la KCL 3.x, la migration nécessite la mise à jour de plusieurs composants pour garantir la compatibilité.

La KCL 1.x utilise des classes et interfaces différentes rapport à la KCL 3.x. Vous devez d’abord migrer le processeur d’enregistrements, la fabrique de processeurs d’enregistrements et les classes de workers vers le format compatible avec la KCL 3.x, puis suivre les étapes de migration de la KCL 1.x vers la KCL 3.x.

Étapes de la migration

Étape 1 : migrer le processeur d’enregistrements

L’exemple suivant illustre un processeur d’enregistrements implémenté pour l’adaptateur DynamoDB Streams Kinesis version 1.x :

package com.amazonaws.kcl; import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; public class StreamsRecordProcessor implements IRecordProcessor, IShutdownNotificationAware { @Override public void initialize(InitializationInput initializationInput) { // // Setup record processor // } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { for (Record record : processRecordsInput.getRecords()) { String data = new String(record.getData().array(), Charset.forName("UTF-8")); System.out.println(data); if (record instanceof RecordAdapter) { // record processing and checkpointing logic } } } @Override public void shutdown(ShutdownInput shutdownInput) { if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) { try { shutdownInput.getCheckpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { throw new RuntimeException(e); } } } @Override public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { try { checkpointer.checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow exception // e.printStackTrace(); } } }
Pour migrer la RecordProcessor classe
  1. Remplacez les interfaces com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor et com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware par software.amazon.dynamodb.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor comme suit :

    // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import software.amazon.dynamodb.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor
  2. Mettez à jour les instructions d’importation des méthodes initialize et processRecords :

    // import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import software.amazon.kinesis.lifecycle.events.InitializationInput; // import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
  3. Remplacez la méthode shutdownRequested par les nouvelles méthodes suivantes : leaseLost, shardEnded et shutdownRequested.

    // @Override // public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { // // // // This is moved to shardEnded(...) and shutdownRequested(ShutdownReauestedInput) // // // try { // checkpointer.checkpoint(); // } catch (ShutdownException | InvalidStateException e) { // // // // Swallow exception // // // e.printStackTrace(); // } // } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } }

Voici la version mise à jour de la classe du processeur d’enregistrements :

package com.amazonaws.codesamples; import software.amazon.kinesis.exceptions.InvalidStateException; import software.amazon.kinesis.exceptions.ShutdownException; import software.amazon.kinesis.lifecycle.events.InitializationInput; import software.amazon.kinesis.lifecycle.events.LeaseLostInput; import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.ShardEndedInput; import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput; import software.amazon.dynamodb.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor; import software.amazon.dynamodb.streamsadapter.adapter.DynamoDBStreamsKinesisClientRecord; import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor; import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsClientRecord; import software.amazon.awssdk.services.dynamodb.model.Record; public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor { @Override public void initialize(InitializationInput initializationInput) { } @Override public void processRecords(DynamoDBStreamsProcessRecordsInput processRecordsInput) { for (DynamoDBStreamsKinesisClientRecord record: processRecordsInput.records()) Record ddbRecord = record.getRecord(); // processing and checkpointing logic for the ddbRecord } } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } }
Note

L'adaptateur DynamoDB Streams Kinesis utilise désormais le modèle Record. SDKv2 Dans SDKv2, AttributeValue les objets complexes (BS,NS,, ML,SS) ne renvoient jamais nul. Vérifiez si ces valeurs existent à l’aide des méthodes hasBs(), hasNs(), hasM(), hasL() et hasSs().

Étape 2 : migrer la fabrique de processeurs d’enregistrements

La fabrique de processeurs d’enregistrements est responsable de la création des processeurs d’enregistrements lorsqu’un bail est acquis. Voici un exemple de fabrique KCL 1.x :

package com.amazonaws.codesamples; import software.amazon.dynamodb.AmazonDynamoDB; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; public class StreamsRecordProcessorFactory implements IRecordProcessorFactory { @Override public IRecordProcessor createProcessor() { return new StreamsRecordProcessor(dynamoDBClient, tableName); } }
Pour migrer la RecordProcessorFactory
  • Remplacez l’interface implémentée com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory par software.amazon.kinesis.processor.ShardRecordProcessorFactory comme suit :

    // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; // public class TestRecordProcessorFactory implements IRecordProcessorFactory { public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory { Change the return signature for createProcessor. // public IRecordProcessor createProcessor() { public ShardRecordProcessor shardRecordProcessor() {

Voici un exemple de fabrique de processeurs d’enregistrements dans 3.0 :

package com.amazonaws.codesamples; import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory { @Override public ShardRecordProcessor shardRecordProcessor() { return new StreamsRecordProcessor(); } }

Étape 3 : migrer le worker

Dans la version 3.0 de la KCL, une nouvelle classe, appelée Scheduler, remplace la classe Worker. Voici un exemple de worker KCL 1.x :

final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...) final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory(); final Worker worker = StreamsWorkerFactory.createDynamoDbStreamsWorker( recordProcessorFactory, workerConfig, adapterClient, amazonDynamoDB, amazonCloudWatchClient);
Pour migrer le worker
  1. Modifiez l’instruction import de la classe Worker pour les instructions d’importation pour les classes Scheduler et ConfigsBuilder.

    // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker; import software.amazon.kinesis.coordinator.Scheduler; import software.amazon.kinesis.common.ConfigsBuilder;
  2. Importez StreamTracker et remplacez l’importation StreamsWorkerFactory par StreamsSchedulerFactory.

    import software.amazon.kinesis.processor.StreamTracker; // import software.amazon.dynamodb.streamsadapter.StreamsWorkerFactory; import software.amazon.dynamodb.streamsadapter.StreamsSchedulerFactory;
  3. Choisissez la position à partir de laquelle vous souhaitez démarrer l’application. Vous avez le choix entre TRIM_HORIZON et LATEST.

    import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended;
  4. Créez une instance StreamTracker.

    StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker( streamArn, InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON) );
  5. Créez l’objet AmazonDynamoDBStreamsAdapterClient.

    import software.amazon.dynamodb.streamsadapter.AmazonDynamoDBStreamsAdapterClient; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; ... AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create(); AmazonDynamoDBStreamsAdapterClient adapterClient = new AmazonDynamoDBStreamsAdapterClient( credentialsProvider, awsRegion);
  6. Créez l’objet ConfigsBuilder.

    import software.amazon.kinesis.common.ConfigsBuilder; ... ConfigsBuilder configsBuilder = new ConfigsBuilder( streamTracker, applicationName, adapterClient, dynamoDbAsyncClient, cloudWatchAsyncClient, UUID.randomUUID().toString(), new StreamsRecordProcessorFactory());
  7. Créez le Scheduler à l’aide de ConfigsBuilder, comme illustré dans l’exemple suivant :

    import java.util.UUID; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.KinesisClientUtil; import software.amazon.kinesis.coordinator.Scheduler; ... DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build(); CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build(); DynamoDBStreamsPollingConfig pollingConfig = new DynamoDBStreamsPollingConfig(adapterClient); pollingConfig.idleTimeBetweenReadsInMillis(idleTimeBetweenReadsInMillis); // Use ConfigsBuilder to configure settings RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig(); retrievalConfig.retrievalSpecificConfig(pollingConfig); CoordinatorConfig coordinatorConfig = configsBuilder.coordinatorConfig(); coordinatorConfig.clientVersionConfig(CoordinatorConfig.ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X); Scheduler scheduler = StreamsSchedulerFactory.createScheduler( configsBuilder.checkpointConfig(), coordinatorConfig, configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), retrievalConfig, adapterClient );
Important

Le paramètre CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X assure la compatibilité entre l’adaptateur DynamoDB Streams Kinesis pour la KCL v3 et la KCL v1, et non entre la KCL v2 et la KCL v3.

Étape 4 : présentation de la configuration de la KCL 3.x et recommandations

Pour obtenir une description détaillée des configurations introduites après la KCL 1.x qui sont pertinentes dans la KCL 3.x, consultez KCL configurations et KCL migration client configuration.

Important

Au lieu de créer directement des objets de checkpointConfig, coordinatorConfig, leaseManagementConfig, metricsConfig, processorConfig et retrievalConfig, nous vous recommandons de définir des configurations dans la KCL 3.x et versions ultérieures à l’aide de ConfigsBuilder, afin d’éviter les problèmes d’initialisation du Scheduler. ConfigsBuilder fournit une méthode plus flexible et plus facile à gérer pour configurer votre application KCL.

Configurations avec mise à jour de la valeur par défaut dans la KCL 3.x

billingMode

Dans la KCL version 1.x, la valeur par défaut de billingMode est définie sur PROVISIONED. En revanche, avec la KCL version 3.x, le billingMode par défaut est PAY_PER_REQUEST (mode à la demande). Nous vous recommandons d’utiliser le mode de capacité à la demande pour votre table de baux, afin d’ajuster automatiquement la capacité en fonction de votre utilisation. Pour obtenir des conseils sur l’utilisation de la capacité allouée pour vos tables de baux, consultez Best practices for the lease table with provisioned capacity mode.

idleTimeBetweenReadsInMillis

Dans la KCL version 1.x, la valeur par défaut de idleTimeBetweenReadsInMillis est définie sur 1 000 (soit 1 seconde). La KCL version 3.x définit la valeur par défaut de idleTimeBetweenReadsInMillis sur 1 500 (soit 1,5 seconde), mais l’adaptateur Amazon DynamoDB Streams Kinesis remplace la valeur par défaut par 1 000 (soit 1 seconde).

Nouvelles configurations de la KCL 3.x

leaseAssignmentIntervalMillis

Cette configuration définit l’intervalle de temps avant que les partitions récemment découvertes ne commencent à être traitées. Elle est calculée comme suit : 1,5 × leaseAssignmentIntervalMillis. Si ce paramètre n’est pas explicitement configuré, l’intervalle de temps est défini par défaut sur 1,5 × failoverTimeMillis. Le traitement des nouvelles partitions consiste à analyser la table de baux et à interroger un index secondaire global (GSI) de la table de baux. La réduction de leaseAssignmentIntervalMillis augmente la fréquence de ces opérations d’analyse et d’interrogation, ce qui entraîne une augmentation des coûts de DynamoDB. Nous vous recommandons de définir cette valeur sur 2 000 (soit 2 secondes) afin de réduire le délai de traitement des nouvelles partitions.

shardConsumerDispatchPollIntervalMillis

Cette configuration définit l’intervalle entre les interrogations successives effectuées par le consommateur de partitions pour déclencher des transitions d’état. Dans la KCL version 1.x, ce comportement était contrôlé par le paramètre idleTimeInMillis, qui n’était pas exposé en tant que paramètre configurable. Avec la KCL version 3.x, nous vous recommandons de définir cette configuration de sorte qu’elle corresponde à la valeur utilisée pour idleTimeInMillis dans votre configuration de la KCL version 1.x.

Étape 5 : migrer de la KCL 2.x vers la KCL 3.x

Pour garantir une transition fluide et une compatibilité avec la version la plus récente de la bibliothèque client Kinesis (KCL), suivez les étapes 5 à 8 des instructions du guide de migration pour la mise à niveau de la KCL 2.x vers la KCL 3.x.

Pour la résolution des problèmes courants liés à la KCL 3.x, consultez Troubleshooting KCL consumer applications.