Migration von KCL 1.x zu KCL 3.x - Amazon-DynamoDB

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Migration von KCL 1.x zu KCL 3.x

Übersicht

Dieses Handbuch enthält Anweisungen zur Migration Ihrer Verbraucheranwendung von KCL 1.x auf KCL 3.x. Aufgrund der architektonischen Unterschiede zwischen KCL 1.x und KCL 3.x erfordert die Migration die Aktualisierung mehrerer Komponenten, um die Kompatibilität sicherzustellen.

KCL 1.x verwendet im Vergleich zu KCL 3.x andere Klassen und Schnittstellen. KCL 1.x verwendet im Vergleich zu KCL 3.x andere Klassen und Schnittstellen. Sie müssen zuerst die Klassen Record Processor, Record Processor Factory und Worker auf das KCL 3.x-kompatible Format migrieren und dann die Migrationsschritte für die Migration von KCL 1.x zu KCL 3.x befolgen.

Schritte zur Migration

Schritt 1: Migrieren Sie den Record Processor

Das folgende Beispiel zeigt einen Recordprozessor, der für den KCL 1.x DynamoDB Streams Kinesis-Adapter implementiert ist:

package com.amazonaws.kcl; import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; public class StreamsRecordProcessor implements IRecordProcessor, IShutdownNotificationAware { @Override public void initialize(InitializationInput initializationInput) { // // Setup record processor // } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { for (Record record : processRecordsInput.getRecords()) { String data = new String(record.getData().array(), Charset.forName("UTF-8")); System.out.println(data); if (record instanceof RecordAdapter) { // record processing and checkpointing logic } } } @Override public void shutdown(ShutdownInput shutdownInput) { if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) { try { shutdownInput.getCheckpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { throw new RuntimeException(e); } } } @Override public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { try { checkpointer.checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow exception // e.printStackTrace(); } } }
RecordProcessor Um die Klasse zu migrieren
  1. Ändern Sie die Schnittstellen com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor wie folgt von com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor und com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware zu:

    // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor
  2. Aktualisieren Sie die Importanweisungen für die processRecords Methoden initialize und:

    // import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import software.amazon.kinesis.lifecycle.events.InitializationInput; // import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
  3. Ersetzen Sie die Methode shutdown durch die folgenden neuen Methoden: leaseLost, shardEnded und shutdownRequested.

    // @Override // public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { // // // // This is moved to shardEnded(...) and shutdownRequested(ShutdownReauestedInput) // // // try { // checkpointer.checkpoint(); // } catch (ShutdownException | InvalidStateException e) { // // // // Swallow exception // // // e.printStackTrace(); // } // } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } }

Im Folgenden finden Sie die aktualisierte Version der Record Processor-Klasse:

package com.amazonaws.codesamples; import software.amazon.kinesis.exceptions.InvalidStateException; import software.amazon.kinesis.exceptions.ShutdownException; import software.amazon.kinesis.lifecycle.events.InitializationInput; import software.amazon.kinesis.lifecycle.events.LeaseLostInput; import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.ShardEndedInput; import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput; import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor; import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsKinesisClientRecord; import software.amazon.awssdk.services.dynamodb.model.Record; public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor { @Override public void initialize(InitializationInput initializationInput) { } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { for (DynamoDBStreamsKinesisClientRecord record: processRecordsInput.records()) Record ddbRecord = record.getRecord(); // processing and checkpointing logic for the ddbRecord } } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } }
Anmerkung

DynamoDB Streams Kinesis Adapter verwendet SDKv2 jetzt das Record-Modell. In SDKv2 geben komplexe AttributeValue Objekte (BS,,, NS ML,SS) niemals Null zurück. Verwenden SiehasBs(),hasNs(),hasM(), hasSs() MethodenhasL(), um zu überprüfen, ob diese Werte existieren.

Schritt 2: Migrieren Sie den Record Processor ab Werk

Die Datensatzprozessor-Factory ist für das Erstellen von Prozessoren verantwortlich, wenn eine Lease erworben wird. Im Folgenden finden Sie ein Beispiel für eine KCL 1.x-Factory:

package com.amazonaws.codesamples; import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; public class StreamsRecordProcessorFactory implements IRecordProcessorFactory { @Override public IRecordProcessor createProcessor() { return new StreamsRecordProcessor(dynamoDBClient, tableName); } }
Um das zu migrieren RecordProcessorFactory
  • Ändern Sie die implementierte Schnittstelle wie folgt von com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory zusoftware.amazon.kinesis.processor.ShardRecordProcessorFactory:

    com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory to software.amazon.kinesis.processor.ShardRecordProcessorFactory, as follows. // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; // public class TestRecordProcessorFactory implements IRecordProcessorFactory { public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory { Change the return signature for createProcessor. // public IRecordProcessor createProcessor() { public ShardRecordProcessor shardRecordProcessor() {

Im Folgenden finden Sie ein Beispiel für die Record Processor Factory in 3.0:

package com.amazonaws.codesamples; import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory { @Override public ShardRecordProcessor shardRecordProcessor() { return new StreamsRecordProcessor(); } }

Schritt 3: Migrieren Sie den Worker

In Version 3.0 der KCL ersetzt eine neue Klasse namens Scheduler die Worker-Klasse. Im Folgenden finden Sie ein Beispiel für einen KCL 1.x-Worker:

final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...) final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory(); final Worker worker = StreamsWorkerFactory.createDynamoDbStreamsWorker( recordProcessorFactory, workerConfig, adapterClient, amazonDynamoDB, amazonCloudWatchClient);
So migrieren Sie den Auftragnehmer
  1. Ändern Sie die import-Anweisung für die Worker-Klasse, um Anweisungen für die Klassen Scheduler und ConfigsBuilder zu importieren.

    // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker; import software.amazon.kinesis.coordinator.Scheduler; import software.amazon.kinesis.common.ConfigsBuilder;
  2. Import StreamTracker und Änderung des Imports von zu. StreamsWorkerFactory StreamsSchedulerFactory

    import software.amazon.kinesis.processor.StreamTracker; // import com.amazonaws.services.dynamodbv2.streamsadapter.StreamsWorkerFactory; import com.amazonaws.services.dynamodbv2.streamsadapter.StreamsSchedulerFactory;
  3. Wählen Sie die Position, von der aus die Anwendung gestartet werden soll. Es kann TRIM_HORIZON oder seinLATEST.

    import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended;
  4. Erstellen Sie eine StreamTracker-Instance.

    StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker( streamArn, InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON) );
  5. Erstellen Sie das AmazonDynamoDBStreamsAdapterClient Objekt.

    import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; ... AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create(); AmazonDynamoDBStreamsAdapterClient adapterClient = new AmazonDynamoDBStreamsAdapterClient( credentialsProvider, awsRegion);
  6. Erstellen Sie das ConfigsBuilder Objekt.

    import software.amazon.kinesis.common.ConfigsBuilder; ... ConfigsBuilder configsBuilder = new ConfigsBuilder( streamTracker, applicationName, adapterClient, dynamoDbAsyncClient, cloudWatchAsyncClient, UUID.randomUUID().toString(), new StreamsRecordProcessorFactory());
  7. Erstellen Sie das Scheduler wie im folgenden Beispiel gezeigt:

    import java.util.UUID; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.KinesisClientUtil; import software.amazon.kinesis.coordinator.Scheduler; ... DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build(); CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build(); DynamoDBStreamsPollingConfig pollingConfig = new DynamoDBStreamsPollingConfig(adapterClient); pollingConfig.idleTimeBetweenReadsInMillis(idleTimeBetweenReadsInMillis); RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig(); retrievalConfig.retrievalSpecificConfig(pollingConfig); CoordinatorConfig coordinatorConfig = configsBuilder.coordinatorConfig(); coordinatorConfig.clientVersionConfig(CoordinatorConfig.ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X) Scheduler scheduler = StreamsSchedulerFactory.createScheduler( configsBuilder.checkpointConfig(), coordinatorConfig, configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), retrievalConfig, adapterClient );
Wichtig

Die CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X Einstellung gewährleistet die Kompatibilität zwischen DynamoDB Streams Kinesis Adapter für KCL v3 und KCL v1, nicht zwischen KCL v2 und v3.

Schritt 4: Überblick über die Konfiguration und Empfehlungen für KCL 3.x

Eine ausführliche Beschreibung der nach KCL 1.x eingeführten Konfigurationen, die für KCL 3.x relevant sind, finden Sie unter KCL-Konfigurationen und Konfiguration des KCL-Migrationsclients.

Konfigurationen mit dem Standardwert für Updates in KCL 3.x

billingMode

In KCL-Version 1.x ist der Standardwert für billingMode auf gesetzt. PROVISIONED Bei KCL-Version 3.x billingMode ist die Standardeinstellung jedoch PAY_PER_REQUEST (On-Demand-Modus). Wir empfehlen Ihnen, den On-Demand-Kapazitätsmodus für Ihre Leasing-Tabelle zu verwenden, um die Kapazität automatisch an Ihre Nutzung anzupassen. Anleitungen zur Verwendung der bereitgestellten Kapazität für Ihre Leasingtabellen finden Sie unter Bewährte Methoden für die Leasetabelle mit dem Modus „Bereitgestellte Kapazität“.

idleTimeBetweenReadsInMillis

In KCL Version 1.x idleTimeBetweenReadsInMillis ist der Standardwert für auf 1.000 (oder 1 Sekunde) festgelegt. KCL-Version 3.x legt den Standardwert für i dleTimeBetweenReadsInMillis auf 1.500 (oder 1,5 Sekunden) fest, aber Amazon DynamoDB Streams Kinesis Adapter überschreibt den Standardwert auf 1.000 (oder 1 Sekunde).

Neue Konfigurationen in KCL 3.x

leaseAssignmentIntervalMillis

Diese Konfiguration definiert das Zeitintervall, bis die Verarbeitung neu entdeckter Shards beginnt, und wird als 1,5 × berechnet. leaseAssignmentIntervalMillis Wenn diese Einstellung nicht explizit konfiguriert ist, ist das Zeitintervall standardmäßig auf 1,5 × eingestellt. failoverTimeMillis Die Verarbeitung neuer Shards beinhaltet das Scannen der Leasetabelle und das Abfragen eines globalen sekundären Index (GSI) in der Leasetabelle. Eine Verringerung der leaseAssignmentIntervalMillis erhöht die Häufigkeit dieser Scan- und Abfragevorgänge, was zu höheren DynamoDB-Kosten führt. Wir empfehlen, diesen Wert auf 2000 zu setzen, um die Verzögerung bei der Verarbeitung neuer Shards zu minimieren.

shardConsumerDispatchPollIntervalMillis

Diese Konfiguration definiert das Intervall zwischen aufeinanderfolgenden Abfragen durch den Shard-Verbraucher, um Zustandsübergänge auszulösen. In KCL-Version 1.x wurde dieses Verhalten durch den idleTimeInMillis Parameter gesteuert, der nicht als konfigurierbare Einstellung verfügbar war. Bei KCL-Version 3.x empfehlen wir, diese Konfiguration so einzustellen, dass sie dem Wert entspricht, der idleTimeInMillis in Ihrem KCL-Version 1.x-Setup verwendet wurde.

Konfigurationen, die nicht vom Amazon DynamoDB Streams Kinesis Adapter überschrieben werden

shardSyncIntervalMillis

Der DynamoDB Streams Kinesis Adapter, der mit KCL Version 1.x kompatibel ist, wird explizit auf 0 gesetzt. shardSyncIntervalMillis Im Vergleich dazu legt der DynamoDB Streams Kinesis Adapter, der mit KCL Version 3.x kompatibel ist, keinen Wert mehr für diese Konfiguration fest. Um das gleiche Adapterverhalten wie in Version 1.x beizubehalten, setzen Sie den Wert dieser Konfiguration auf 0.

leasesRecoveryAuditorExecutionFrequencyMillis

Der DynamoDB Streams Kinesis Adapter, der mit KCL Version 1.x kompatibel ist, ist explizit auf 1000 festgelegt. leasesRecoveryAuditorExecutionFrequencyMillis Im Vergleich dazu legt der mit KCL Version 3.x kompatible DynamoDB Streams Kinesis Adapter keinen Standardwert mehr für diese Konfiguration fest. Um das gleiche Adapterverhalten wie in Version 1.x beizubehalten, setzen Sie den Wert dieser Konfiguration auf 1000.

Schritt 5: Migrieren Sie von KCL 2.x zu KCL 3.x

Um einen reibungslosen Übergang und die Kompatibilität mit der neuesten Version der Kinesis Client Library (KCL) zu gewährleisten, folgen Sie den Schritten 5—8 in den Anweisungen für das Upgrade von KCL 2.x auf KCL 3.x im Migrationshandbuch.