Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
Migration de la KCL 1.x vers la KCL 3.x
Aperçu
Ce guide fournit des instructions pour migrer votre application consommateur de la KCL 1.x vers la KCL 3.x. En raison des différences architecturales entre la KCL 1.x et la KCL 3.x, la migration nécessite la mise à jour de plusieurs composants pour garantir la compatibilité.
La KCL 1.x utilise des classes et interfaces différentes rapport à la KCL 3.x. Vous devez d’abord migrer le processeur d’enregistrements, la fabrique de processeurs d’enregistrements et les classes de workers vers le format compatible avec la KCL 3.x, puis suivre les étapes de migration de la KCL 1.x vers la KCL 3.x.
Étapes de la migration
Rubriques
Étape 1 : migrer le processeur d’enregistrements
L’exemple suivant illustre un processeur d’enregistrements implémenté pour l’adaptateur DynamoDB Streams Kinesis version 1.x :
package com.amazonaws.kcl; import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; public class StreamsRecordProcessor implements IRecordProcessor, IShutdownNotificationAware { @Override public void initialize(InitializationInput initializationInput) { // // Setup record processor // } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { for (Record record : processRecordsInput.getRecords()) { String data = new String(record.getData().array(), Charset.forName("UTF-8")); System.out.println(data); if (record instanceof RecordAdapter) { // record processing and checkpointing logic } } } @Override public void shutdown(ShutdownInput shutdownInput) { if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) { try { shutdownInput.getCheckpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { throw new RuntimeException(e); } } } @Override public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { try { checkpointer.checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow exception // e.printStackTrace(); } } }
Pour migrer la RecordProcessor classe
-
Remplacez les interfaces
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessoretcom.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAwareparsoftware.amazon.dynamodb.streamsadapter.processor.DynamoDBStreamsShardRecordProcessorcomme suit :// import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import software.amazon.dynamodb.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor -
Mettez à jour les instructions d’importation des méthodes
initializeetprocessRecords:// import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import software.amazon.kinesis.lifecycle.events.InitializationInput; // import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput; -
Remplacez la méthode
shutdownRequestedpar les nouvelles méthodes suivantes :leaseLost,shardEndedetshutdownRequested.// @Override // public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { // // // // This is moved to shardEnded(...) and shutdownRequested(ShutdownReauestedInput) // // // try { // checkpointer.checkpoint(); // } catch (ShutdownException | InvalidStateException e) { // // // // Swallow exception // // // e.printStackTrace(); // } // } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } }
Voici la version mise à jour de la classe du processeur d’enregistrements :
package com.amazonaws.codesamples; import software.amazon.kinesis.exceptions.InvalidStateException; import software.amazon.kinesis.exceptions.ShutdownException; import software.amazon.kinesis.lifecycle.events.InitializationInput; import software.amazon.kinesis.lifecycle.events.LeaseLostInput; import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.ShardEndedInput; import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput; import software.amazon.dynamodb.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor; import software.amazon.dynamodb.streamsadapter.adapter.DynamoDBStreamsKinesisClientRecord; import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor; import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsClientRecord; import software.amazon.awssdk.services.dynamodb.model.Record; public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor { @Override public void initialize(InitializationInput initializationInput) { } @Override public void processRecords(DynamoDBStreamsProcessRecordsInput processRecordsInput) { for (DynamoDBStreamsKinesisClientRecord record: processRecordsInput.records()) Record ddbRecord = record.getRecord(); // processing and checkpointing logic for the ddbRecord } } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } }
Note
L'adaptateur DynamoDB Streams Kinesis utilise désormais le modèle Record. SDKv2 Dans SDKv2, AttributeValue les objets complexes (BS,NS,, ML,SS) ne renvoient jamais nul. Vérifiez si ces valeurs existent à l’aide des méthodes hasBs(), hasNs(), hasM(), hasL() et hasSs().
Étape 2 : migrer la fabrique de processeurs d’enregistrements
La fabrique de processeurs d’enregistrements est responsable de la création des processeurs d’enregistrements lorsqu’un bail est acquis. Voici un exemple de fabrique KCL 1.x :
package com.amazonaws.codesamples; import software.amazon.dynamodb.AmazonDynamoDB; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; public class StreamsRecordProcessorFactory implements IRecordProcessorFactory { @Override public IRecordProcessor createProcessor() { return new StreamsRecordProcessor(dynamoDBClient, tableName); } }
Pour migrer la RecordProcessorFactory
-
Remplacez l’interface implémentée
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactoryparsoftware.amazon.kinesis.processor.ShardRecordProcessorFactorycomme suit :// import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; // public class TestRecordProcessorFactory implements IRecordProcessorFactory { public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory { Change the return signature for createProcessor. // public IRecordProcessor createProcessor() { public ShardRecordProcessor shardRecordProcessor() {
Voici un exemple de fabrique de processeurs d’enregistrements dans 3.0 :
package com.amazonaws.codesamples; import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory { @Override public ShardRecordProcessor shardRecordProcessor() { return new StreamsRecordProcessor(); } }
Étape 3 : migrer le worker
Dans la version 3.0 de la KCL, une nouvelle classe, appelée Scheduler, remplace la classe Worker. Voici un exemple de worker KCL 1.x :
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...) final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory(); final Worker worker = StreamsWorkerFactory.createDynamoDbStreamsWorker( recordProcessorFactory, workerConfig, adapterClient, amazonDynamoDB, amazonCloudWatchClient);
Pour migrer le worker
-
Modifiez l’instruction
importde la classeWorkerpour les instructions d’importation pour les classesScheduleretConfigsBuilder.// import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker; import software.amazon.kinesis.coordinator.Scheduler; import software.amazon.kinesis.common.ConfigsBuilder; -
Importez
StreamTrackeret remplacez l’importationStreamsWorkerFactoryparStreamsSchedulerFactory.import software.amazon.kinesis.processor.StreamTracker; // import software.amazon.dynamodb.streamsadapter.StreamsWorkerFactory; import software.amazon.dynamodb.streamsadapter.StreamsSchedulerFactory; -
Choisissez la position à partir de laquelle vous souhaitez démarrer l’application. Vous avez le choix entre
TRIM_HORIZONetLATEST.import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; -
Créez une instance
StreamTracker.StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker( streamArn, InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON) ); -
Créez l’objet
AmazonDynamoDBStreamsAdapterClient.import software.amazon.dynamodb.streamsadapter.AmazonDynamoDBStreamsAdapterClient; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; ... AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create(); AmazonDynamoDBStreamsAdapterClient adapterClient = new AmazonDynamoDBStreamsAdapterClient( credentialsProvider, awsRegion); -
Créez l’objet
ConfigsBuilder.import software.amazon.kinesis.common.ConfigsBuilder; ... ConfigsBuilder configsBuilder = new ConfigsBuilder( streamTracker, applicationName, adapterClient, dynamoDbAsyncClient, cloudWatchAsyncClient, UUID.randomUUID().toString(), new StreamsRecordProcessorFactory()); -
Créez le
Schedulerà l’aide deConfigsBuilder, comme illustré dans l’exemple suivant :import java.util.UUID; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.KinesisClientUtil; import software.amazon.kinesis.coordinator.Scheduler; ... DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build(); CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build(); DynamoDBStreamsPollingConfig pollingConfig = new DynamoDBStreamsPollingConfig(adapterClient); pollingConfig.idleTimeBetweenReadsInMillis(idleTimeBetweenReadsInMillis); // Use ConfigsBuilder to configure settings RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig(); retrievalConfig.retrievalSpecificConfig(pollingConfig); CoordinatorConfig coordinatorConfig = configsBuilder.coordinatorConfig(); coordinatorConfig.clientVersionConfig(CoordinatorConfig.ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X); Scheduler scheduler = StreamsSchedulerFactory.createScheduler( configsBuilder.checkpointConfig(), coordinatorConfig, configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), retrievalConfig, adapterClient );
Important
Le paramètre CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X assure la compatibilité entre l’adaptateur DynamoDB Streams Kinesis pour la KCL v3 et la KCL v1, et non entre la KCL v2 et la KCL v3.
Étape 4 : présentation de la configuration de la KCL 3.x et recommandations
Pour obtenir une description détaillée des configurations introduites après la KCL 1.x qui sont pertinentes dans la KCL 3.x, consultez KCL configurations et KCL migration client configuration.
Important
Au lieu de créer directement des objets de checkpointConfig, coordinatorConfig, leaseManagementConfig, metricsConfig, processorConfig et retrievalConfig, nous vous recommandons de définir des configurations dans la KCL 3.x et versions ultérieures à l’aide de ConfigsBuilder, afin d’éviter les problèmes d’initialisation du Scheduler. ConfigsBuilder fournit une méthode plus flexible et plus facile à gérer pour configurer votre application KCL.
Configurations avec mise à jour de la valeur par défaut dans la KCL 3.x
billingMode-
Dans la KCL version 1.x, la valeur par défaut de
billingModeest définie surPROVISIONED. En revanche, avec la KCL version 3.x, lebillingModepar défaut estPAY_PER_REQUEST(mode à la demande). Nous vous recommandons d’utiliser le mode de capacité à la demande pour votre table de baux, afin d’ajuster automatiquement la capacité en fonction de votre utilisation. Pour obtenir des conseils sur l’utilisation de la capacité allouée pour vos tables de baux, consultez Best practices for the lease table with provisioned capacity mode. idleTimeBetweenReadsInMillis-
Dans la KCL version 1.x, la valeur par défaut de
idleTimeBetweenReadsInMillisest définie sur 1 000 (soit 1 seconde). La KCL version 3.x définit la valeur par défaut de idleTimeBetweenReadsInMillissur 1 500 (soit 1,5 seconde), mais l’adaptateur Amazon DynamoDB Streams Kinesis remplace la valeur par défaut par 1 000 (soit 1 seconde).
Nouvelles configurations de la KCL 3.x
leaseAssignmentIntervalMillis-
Cette configuration définit l’intervalle de temps avant que les partitions récemment découvertes ne commencent à être traitées. Elle est calculée comme suit : 1,5 ×
leaseAssignmentIntervalMillis. Si ce paramètre n’est pas explicitement configuré, l’intervalle de temps est défini par défaut sur 1,5 ×failoverTimeMillis. Le traitement des nouvelles partitions consiste à analyser la table de baux et à interroger un index secondaire global (GSI) de la table de baux. La réduction deleaseAssignmentIntervalMillisaugmente la fréquence de ces opérations d’analyse et d’interrogation, ce qui entraîne une augmentation des coûts de DynamoDB. Nous vous recommandons de définir cette valeur sur 2 000 (soit 2 secondes) afin de réduire le délai de traitement des nouvelles partitions. shardConsumerDispatchPollIntervalMillis-
Cette configuration définit l’intervalle entre les interrogations successives effectuées par le consommateur de partitions pour déclencher des transitions d’état. Dans la KCL version 1.x, ce comportement était contrôlé par le paramètre
idleTimeInMillis, qui n’était pas exposé en tant que paramètre configurable. Avec la KCL version 3.x, nous vous recommandons de définir cette configuration de sorte qu’elle corresponde à la valeur utilisée pouridleTimeInMillisdans votre configuration de la KCL version 1.x.
Étape 5 : migrer de la KCL 2.x vers la KCL 3.x
Pour garantir une transition fluide et une compatibilité avec la version la plus récente de la bibliothèque client Kinesis (KCL), suivez les étapes 5 à 8 des instructions du guide de migration pour la mise à niveau de la KCL 2.x vers la KCL 3.x.
Pour la résolution des problèmes courants liés à la KCL 3.x, consultez Troubleshooting KCL consumer applications.