Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
Migration de KCL 1.x vers KCL 3.x
Présentation
Ce guide fournit des instructions pour migrer votre application client de KCL 1.x vers KCL 3.x. En raison des différences architecturales entre KCL 1.x et KCL 3.x, la migration nécessite la mise à jour de plusieurs composants pour garantir la compatibilité.
KCL 1.x utilise différentes classes et interfaces par rapport à KCL 3.x. KCL 1.x utilise différentes classes et interfaces par rapport à KCL 3.x. Vous devez d'abord migrer le processeur d'enregistrements, l'usine du processeur d'enregistrements et les classes de travail vers le format compatible KCL 3.x, puis suivre les étapes de migration de KCL 1.x vers KCL 3.x.
Étapes de la migration
Rubriques
Étape 1 : migrer le processeur d'enregistrement
L'exemple suivant montre un processeur d'enregistrement implémenté pour l'adaptateur KCL 1.x DynamoDB Streams Kinesis :
package com.amazonaws.kcl; import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; public class StreamsRecordProcessor implements IRecordProcessor, IShutdownNotificationAware { @Override public void initialize(InitializationInput initializationInput) { // // Setup record processor // } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { for (Record record : processRecordsInput.getRecords()) { String data = new String(record.getData().array(), Charset.forName("UTF-8")); System.out.println(data); if (record instanceof RecordAdapter) { // record processing and checkpointing logic } } } @Override public void shutdown(ShutdownInput shutdownInput) { if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) { try { shutdownInput.getCheckpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { throw new RuntimeException(e); } } } @Override public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { try { checkpointer.checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow exception // e.printStackTrace(); } } }
Pour migrer la RecordProcessor classe
-
Modifiez les interfaces de
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor
etcom.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware
verscom.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor
comme suit :// import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor
-
Mettez à jour les instructions d'importation pour les
processRecords
méthodesinitialize
et :// import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import software.amazon.kinesis.lifecycle.events.InitializationInput; // import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
-
Remplacez la méthode
shutdown
par les nouvelles méthodes suivantes :leaseLost
,shardEnded
etshutdownRequested
.// @Override // public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { // // // // This is moved to shardEnded(...) and shutdownRequested(ShutdownReauestedInput) // // // try { // checkpointer.checkpoint(); // } catch (ShutdownException | InvalidStateException e) { // // // // Swallow exception // // // e.printStackTrace(); // } // } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } }
Voici la version mise à jour de la classe de processeur d'enregistrement :
package com.amazonaws.codesamples; import software.amazon.kinesis.exceptions.InvalidStateException; import software.amazon.kinesis.exceptions.ShutdownException; import software.amazon.kinesis.lifecycle.events.InitializationInput; import software.amazon.kinesis.lifecycle.events.LeaseLostInput; import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.ShardEndedInput; import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput; import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor; import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsKinesisClientRecord; import software.amazon.awssdk.services.dynamodb.model.Record; public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor { @Override public void initialize(InitializationInput initializationInput) { } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { for (DynamoDBStreamsKinesisClientRecord record: processRecordsInput.records()) Record ddbRecord = record.getRecord(); // processing and checkpointing logic for the ddbRecord } } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } }
Note
L'adaptateur DynamoDB Streams Kinesis utilise désormais le modèle Record. SDKv2 Dans SDKv2, AttributeValue
les objets complexes (BS
,NS
,, M
L
,SS
) ne renvoient jamais nul. Utilisez hasBs()
les hasSs()
méthodes hasNs()
hasM()
,hasL()
,, pour vérifier si ces valeurs existent.
Étape 2 : migrer l'usine de traitement des enregistrements
La fabrique de processeurs d'enregistrements est responsable de la création des processeurs d'enregistrements lorsqu'un bail est acquis. Voici un exemple d'usine KCL 1.x :
package com.amazonaws.codesamples; import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; public class StreamsRecordProcessorFactory implements IRecordProcessorFactory { @Override public IRecordProcessor createProcessor() { return new StreamsRecordProcessor(dynamoDBClient, tableName); } }
Pour migrer le RecordProcessorFactory
-
Modifiez l'interface implémentée de
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory
àsoftware.amazon.kinesis.processor.ShardRecordProcessorFactory
, comme suit :com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory to software.amazon.kinesis.processor.ShardRecordProcessorFactory, as follows. // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; // public class TestRecordProcessorFactory implements IRecordProcessorFactory { public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory { Change the return signature for createProcessor. // public IRecordProcessor createProcessor() { public ShardRecordProcessor shardRecordProcessor() {
Voici un exemple de la fabrique de processeurs d'enregistrement dans la version 3.0 :
package com.amazonaws.codesamples; import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory { @Override public ShardRecordProcessor shardRecordProcessor() { return new StreamsRecordProcessor(); } }
Étape 3 : migrer le travailleur
Dans la version 3.0 de la KCL, une nouvelle classe, appelée Scheduler, remplace la classe Worker. Voici un exemple de programme de travail KCL 1.x :
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...) final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory(); final Worker worker = StreamsWorkerFactory.createDynamoDbStreamsWorker( recordProcessorFactory, workerConfig, adapterClient, amazonDynamoDB, amazonCloudWatchClient);
Pour migrer l'application de travail
-
Modifiez la déclaration
import
de la classeWorker
pour les instructions d'importation pour les classesScheduler
etConfigsBuilder
.// import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker; import software.amazon.kinesis.coordinator.Scheduler; import software.amazon.kinesis.common.ConfigsBuilder;
-
Importer
StreamTracker
et modifier l'importation deStreamsWorkerFactory
versStreamsSchedulerFactory
.import software.amazon.kinesis.processor.StreamTracker; // import com.amazonaws.services.dynamodbv2.streamsadapter.StreamsWorkerFactory; import com.amazonaws.services.dynamodbv2.streamsadapter.StreamsSchedulerFactory;
-
Choisissez la position à partir de laquelle vous souhaitez démarrer l'application. Cela peut être
TRIM_HORIZON
ouLATEST
.import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended;
-
Créer une instance
StreamTracker
.StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker( streamArn, InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON) );
-
Créez l'
AmazonDynamoDBStreamsAdapterClient
objet.import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; ... AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create(); AmazonDynamoDBStreamsAdapterClient adapterClient = new AmazonDynamoDBStreamsAdapterClient( credentialsProvider, awsRegion);
-
Créez l'
ConfigsBuilder
objet.import software.amazon.kinesis.common.ConfigsBuilder; ... ConfigsBuilder configsBuilder = new ConfigsBuilder( streamTracker, applicationName, adapterClient, dynamoDbAsyncClient, cloudWatchAsyncClient, UUID.randomUUID().toString(), new StreamsRecordProcessorFactory());
-
Créez le
Scheduler
comme indiqué dans l'exemple suivant :import java.util.UUID; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.KinesisClientUtil; import software.amazon.kinesis.coordinator.Scheduler; ... DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build(); CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build(); DynamoDBStreamsPollingConfig pollingConfig = new DynamoDBStreamsPollingConfig(adapterClient); pollingConfig.idleTimeBetweenReadsInMillis(idleTimeBetweenReadsInMillis); RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig(); retrievalConfig.retrievalSpecificConfig(pollingConfig); CoordinatorConfig coordinatorConfig = configsBuilder.coordinatorConfig(); coordinatorConfig.clientVersionConfig(CoordinatorConfig.ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X) Scheduler scheduler = StreamsSchedulerFactory.createScheduler( configsBuilder.checkpointConfig(), coordinatorConfig, configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), retrievalConfig, adapterClient );
Important
Le CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X
paramètre maintient la compatibilité entre l'adaptateur DynamoDB Streams Kinesis pour KCL v3 et KCL v1, et non entre KCL v2 et v3.
Étape 4 : Présentation de la configuration de KCL 3.x et recommandations
Pour une description détaillée des configurations introduites après KCL 1.x qui sont pertinentes dans KCL 3.x, voir Configurations KCL et Configuration du client de migration KCL.
Configurations avec mise à jour de la valeur par défaut dans KCL 3.x
billingMode
-
Dans la version 1.x de KCL, la valeur par défaut pour
billingMode
est définie sur.PROVISIONED
Cependant, avec la version 3.x de KCL, le mode par défautbillingMode
estPAY_PER_REQUEST
(mode à la demande). Nous vous recommandons d'utiliser le mode de capacité à la demande pour votre tableau de location afin d'ajuster automatiquement la capacité en fonction de votre utilisation. Pour obtenir des conseils sur l'utilisation de la capacité allouée pour vos tables de location, consultez la section Meilleures pratiques relatives à la table de location avec mode de capacité provisionnée. idleTimeBetweenReadsInMillis
-
Dans la version 1.x de KCL, la valeur par défaut pour
idleTimeBetweenReadsInMillis
est définie sur 1 000 (ou 1 seconde). La version 3.x de KCL définit la valeur par défaut pour idleTimeBetweenReadsInMillis
à 1 500 (soit 1,5 seconde), mais Amazon DynamoDB Streams Kinesis Adapter remplace la valeur par défaut à 1 000 (soit 1 seconde).
Nouvelles configurations dans KCL 3.x
leaseAssignmentIntervalMillis
-
Cette configuration définit l'intervalle de temps avant que les fragments récemment découverts ne commencent à être traités, et est calculé comme 1,5 ×
leaseAssignmentIntervalMillis
. Si ce paramètre n'est pas explicitement configuré, l'intervalle de temps est défini par défaut sur 1,5 ×failoverTimeMillis
. Le traitement des nouvelles partitions implique de scanner la table des baux et d'interroger un indice secondaire global (GSI) sur la table des baux. La réduction de la fréquenceleaseAssignmentIntervalMillis
augmente la fréquence de ces opérations d'analyse et de requête, ce qui entraîne une augmentation des coûts DynamoDB. Nous recommandons de définir cette valeur sur 2000 afin de minimiser le délai de traitement des nouvelles partitions. shardConsumerDispatchPollIntervalMillis
-
Cette configuration définit l'intervalle entre les interrogations successives effectuées par le consommateur de partitions pour déclencher des transitions d'état. Dans la version 1.x de KCL, ce comportement était contrôlé par le
idleTimeInMillis
paramètre, qui n'était pas exposé en tant que paramètre configurable. Avec la version 3.x de KCL, nous vous recommandons de définir cette configuration pour qu'elle corresponde à la valeur utiliséeidleTimeInMillis
dans votre configuration de KCL version 1.x.
Configurations non remplacées par l'adaptateur Kinesis d'Amazon DynamoDB Streams
shardSyncIntervalMillis
-
L'adaptateur DynamoDB Streams Kinesis compatible avec la version 1.x de KCL est explicitement défini sur 0.
shardSyncIntervalMillis
En comparaison, l'adaptateur DynamoDB Streams Kinesis compatible avec la version 3.x de KCL ne définit plus de valeur pour cette configuration. Pour conserver le même comportement d'adaptateur que celui de la version 1.x, définissez la valeur de cette configuration sur 0. leasesRecoveryAuditorExecutionFrequencyMillis
-
L'adaptateur DynamoDB Streams Kinesis compatible avec la version 1.x de KCL est défini explicitement sur 1000.
leasesRecoveryAuditorExecutionFrequencyMillis
En comparaison, l'adaptateur DynamoDB Streams Kinesis compatible avec la version 3.x de KCL ne définit plus de valeur par défaut pour cette configuration. Pour conserver le même comportement d'adaptateur que celui de la version 1.x, définissez la valeur de cette configuration sur 1000.
Étape 5 : Migrer de KCL 2.x vers KCL 3.x
Pour garantir une transition fluide et une compatibilité avec la dernière version de Kinesis Client Library (KCL), suivez les étapes 5 à 8 des instructions du guide de migration pour la mise à niveau de KCL 2.x vers KCL 3.x.