Migrieren von KCL 1.x zu KCL 3.x - Amazon DynamoDB

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Migrieren von KCL 1.x zu KCL 3.x

Übersicht

In dieser Anleitung wird erläutert, wie Sie Ihre Verbraucheranwendung von KCL 1.x zu KCL 3.x migrieren. Aufgrund der unterschiedlichen Architektur von KCL 1.x und KCL 3.x müssen für die Migration mehrere Komponenten aktualisiert werden, um die Kompatibilität sicherzustellen.

KCL 1.x verwendet im Vergleich zu KCL 3.x andere Klassen und Schnittstellen. Sie müssen zuerst die Klassen „Datensatzprozessor“, „Datensatzprozessor-Factory“ und „Worker“ in das KCL 3.x-kompatible Format migrieren und dann die Schritte für die Migration von KCL 1.x zu KCL 3.x ausführen.

Schritte zur Migration

Schritt 1: Migrieren des Datensatzprozessors

Das folgende Beispiel zeigt einen Datensatzprozessor, der für die Version KCL 1.x des DynamoDB-Streams-Kinesis-Adapters implementiert wurde:

package com.amazonaws.kcl; import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; public class StreamsRecordProcessor implements IRecordProcessor, IShutdownNotificationAware { @Override public void initialize(InitializationInput initializationInput) { // // Setup record processor // } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { for (Record record : processRecordsInput.getRecords()) { String data = new String(record.getData().array(), Charset.forName("UTF-8")); System.out.println(data); if (record instanceof RecordAdapter) { // record processing and checkpointing logic } } } @Override public void shutdown(ShutdownInput shutdownInput) { if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) { try { shutdownInput.getCheckpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { throw new RuntimeException(e); } } } @Override public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { try { checkpointer.checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow exception // e.printStackTrace(); } } }
Um die RecordProcessor Klasse zu migrieren
  1. Ändern Sie die Schnittstellen von com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor und com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware folgendermaßen zu software.amazon.dynamodb.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor:

    // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import software.amazon.dynamodb.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor
  2. Aktualisieren Sie die Importanweisungen für die Methoden initialize und processRecords:

    // import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import software.amazon.kinesis.lifecycle.events.InitializationInput; // import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
  3. Ersetzen Sie die Methode shutdownRequested durch die folgenden neuen Methoden: leaseLost, shardEnded und shutdownRequested.

    // @Override // public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { // // // // This is moved to shardEnded(...) and shutdownRequested(ShutdownReauestedInput) // // // try { // checkpointer.checkpoint(); // } catch (ShutdownException | InvalidStateException e) { // // // // Swallow exception // // // e.printStackTrace(); // } // } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } }

Nachstehend finden Sie die aktualisierte Version der Datensatzprozessorklasse:

package com.amazonaws.codesamples; import software.amazon.kinesis.exceptions.InvalidStateException; import software.amazon.kinesis.exceptions.ShutdownException; import software.amazon.kinesis.lifecycle.events.InitializationInput; import software.amazon.kinesis.lifecycle.events.LeaseLostInput; import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.ShardEndedInput; import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput; import software.amazon.dynamodb.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor; import software.amazon.dynamodb.streamsadapter.adapter.DynamoDBStreamsKinesisClientRecord; import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor; import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsClientRecord; import software.amazon.awssdk.services.dynamodb.model.Record; public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor { @Override public void initialize(InitializationInput initializationInput) { } @Override public void processRecords(DynamoDBStreamsProcessRecordsInput processRecordsInput) { for (DynamoDBStreamsKinesisClientRecord record: processRecordsInput.records()) Record ddbRecord = record.getRecord(); // processing and checkpointing logic for the ddbRecord } } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } }
Anmerkung

DynamoDB Streams Kinesis Adapter verwendet SDKv2 jetzt das Record-Modell. In SDKv2 geben komplexe AttributeValue Objekte (BS,,, NS ML,SS) niemals Null zurück. Verwenden Sie die Methoden hasBs(), hasNs(), hasM(), hasL(), hasSs(), um zu überprüfen, ob diese Werte existieren.

Schritt 2: Migrieren der Datensatzprozessor-Factory

Die Datensatzprozessor-Factory ist für das Erstellen von Prozessoren verantwortlich, wenn eine Lease erworben wird. Nachfolgend sehen Sie ein Beispiel für eine KCL-1.x-Factory:

package com.amazonaws.codesamples; import software.amazon.dynamodb.AmazonDynamoDB; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; public class StreamsRecordProcessorFactory implements IRecordProcessorFactory { @Override public IRecordProcessor createProcessor() { return new StreamsRecordProcessor(dynamoDBClient, tableName); } }
So migrieren Sie die RecordProcessorFactory
  • Ändern Sie die implementierte Schnittstelle von com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory folgendermaßen zu software.amazon.kinesis.processor.ShardRecordProcessorFactory:

    // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; // public class TestRecordProcessorFactory implements IRecordProcessorFactory { public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory { Change the return signature for createProcessor. // public IRecordProcessor createProcessor() { public ShardRecordProcessor shardRecordProcessor() {

Nachfolgend sehen Sie ein Beispiel für die Datensatzprozessor-Factory in 3.0:

package com.amazonaws.codesamples; import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory { @Override public ShardRecordProcessor shardRecordProcessor() { return new StreamsRecordProcessor(); } }

Schritt 3: Migrieren des Workers

In Version 3.0 der KCL wird die Worker-Klasse durch eine neue Klasse namens Scheduler ersetzt. Nachfolgend sehen Sie ein Beispiel für einen KCL-1.x-Worker.

final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...) final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory(); final Worker worker = StreamsWorkerFactory.createDynamoDbStreamsWorker( recordProcessorFactory, workerConfig, adapterClient, amazonDynamoDB, amazonCloudWatchClient);
So migrieren Sie den Worker
  1. Ändern Sie die import-Anweisung für die Worker-Klasse in die Import-Anweisungen für die Klassen Scheduler und ConfigsBuilder.

    // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker; import software.amazon.kinesis.coordinator.Scheduler; import software.amazon.kinesis.common.ConfigsBuilder;
  2. Importieren Sie StreamTracker und ändern Sie den Import von StreamsWorkerFactory zu StreamsSchedulerFactory.

    import software.amazon.kinesis.processor.StreamTracker; // import software.amazon.dynamodb.streamsadapter.StreamsWorkerFactory; import software.amazon.dynamodb.streamsadapter.StreamsSchedulerFactory;
  3. Wählen Sie die Position, von der aus die Anwendung gestartet werden soll. Möglich sind TRIM_HORIZON oder LATEST.

    import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended;
  4. Erstellen Sie eine StreamTracker-Instance.

    StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker( streamArn, InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON) );
  5. Erstellen Sie das Objekt AmazonDynamoDBStreamsAdapterClient.

    import software.amazon.dynamodb.streamsadapter.AmazonDynamoDBStreamsAdapterClient; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; ... AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create(); AmazonDynamoDBStreamsAdapterClient adapterClient = new AmazonDynamoDBStreamsAdapterClient( credentialsProvider, awsRegion);
  6. Erstellen Sie das Objekt ConfigsBuilder.

    import software.amazon.kinesis.common.ConfigsBuilder; ... ConfigsBuilder configsBuilder = new ConfigsBuilder( streamTracker, applicationName, adapterClient, dynamoDbAsyncClient, cloudWatchAsyncClient, UUID.randomUUID().toString(), new StreamsRecordProcessorFactory());
  7. Erstellen Sie den Scheduler mithilfe von ConfigsBuilder, wie im folgenden Beispiel gezeigt:

    import java.util.UUID; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.KinesisClientUtil; import software.amazon.kinesis.coordinator.Scheduler; ... DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build(); CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build(); DynamoDBStreamsPollingConfig pollingConfig = new DynamoDBStreamsPollingConfig(adapterClient); pollingConfig.idleTimeBetweenReadsInMillis(idleTimeBetweenReadsInMillis); // Use ConfigsBuilder to configure settings RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig(); retrievalConfig.retrievalSpecificConfig(pollingConfig); CoordinatorConfig coordinatorConfig = configsBuilder.coordinatorConfig(); coordinatorConfig.clientVersionConfig(CoordinatorConfig.ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X); Scheduler scheduler = StreamsSchedulerFactory.createScheduler( configsBuilder.checkpointConfig(), coordinatorConfig, configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), retrievalConfig, adapterClient );
Wichtig

Die Einstellung CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X gewährleistet die Kompatibilität zwischen dem DynamoDB-Streams-Kinesis-Adapter für KCL v3 und KCL v1, nicht zwischen KCL v2 und v3.

Schritt 4: Überblick über die Konfiguration und Empfehlungen für KCL 3.x

Eine ausführliche Beschreibung der nach KCL 1.x eingeführten Konfigurationen, die für KCL 3.x relevant sind, finden Sie unter KCL Configurations und KCL Migration Client Configuration.

Wichtig

Anstatt Objekte von checkpointConfig, coordinatorConfig, leaseManagementConfig, metricsConfig, processorConfig und retrievalConfig direkt zu erstellen, empfehlen wir, ConfigsBuilder zu verwenden, um Konfigurationen in KCL 3.x und aktuelleren Versionen einzustellen. Das verhindert Probleme mit der Scheduler-Initialisierung. ConfigsBuilder bietet eine flexiblere und wartungsfreundlichere Methode zur Konfiguration Ihrer KCL-Anwendung.

Konfigurationen mit dem Standardwert für Aktualisierungen in KCL 3.x

billingMode

In der KCL-Version 1.x ist der Standardwert für billingMode auf PROVISIONED eingestellt. Bei der KCL-Version 3.x ist die Standardeinstellung für billingMode jedoch PAY_PER_REQUEST (On-Demand-Modus). Wir empfehlen Ihnen, den On-Demand-Kapazitätsmodus für Ihre Leasetabelle zu verwenden, um die Kapazität automatisch an die Nutzung anzupassen. Anleitungen zur Verwendung der bereitgestellten Kapazität für Ihre Leasetabellen finden Sie unter Best practices for the lease table with provisioned capacity mode.

idleTimeBetweenReadsInMillis

In der KCL-Version 1.x ist der Standardwert für idleTimeBetweenReadsInMillis auf 1 000 (oder 1 Sekunde) eingestellt. Die KCL-Version 3.x legt den Standardwert für dleTimeBetweenReadsInMillis auf 1 500 (oder 1,5 Sekunden) fest. Der Amazon-DynamoDB-Streams-Kinesis-Adapter überschreibt den Standardwert jedoch mit 1 000 (oder 1 Sekunde).

Neue Konfigurationen in KCL 3.x

leaseAssignmentIntervalMillis

Diese Konfiguration definiert das Zeitintervall, bis die Verarbeitung neu erkannter Shards beginnt. Es wird nach der Formel 1,5 × leaseAssignmentIntervalMillis berechnet. Wenn diese Einstellung nicht explizit konfiguriert ist, beträgt das Zeitintervall standardmäßig 1,5 × failoverTimeMillis. Die Verarbeitung neuer Shards beinhaltet das Scannen der Leasetabelle und das Abfragen eines globalen sekundären Index (GSI) in der Leasetabelle. Eine Absenkung des leaseAssignmentIntervalMillis-Werts erhöht die Häufigkeit dieser Scan- und Abfragevorgänge, was zu höheren DynamoDB-Kosten führt. Wir empfehlen, diesen Wert auf 2 000 (oder 2 Sekunden) einzustellen, um die Verzögerung bei der Verarbeitung neuer Shards zu minimieren.

shardConsumerDispatchPollIntervalMillis

Diese Konfiguration definiert das Intervall zwischen aufeinanderfolgenden Abfragen durch den Shard-Verbraucher, um Zustandsübergänge auszulösen. In KCL-Version 1.x wurde dieses Verhalten durch den Parameter idleTimeInMillis gesteuert, der nicht als konfigurierbare Einstellung verfügbar war. Bei KCL-Version 3.x empfehlen wir, diese Konfiguration auf den Wert einzustellen, der in Ihrer KCL-Version 1.x für idleTimeInMillis verwendet wurde.

Schritt 5: Migrieren von KCL 2.x zu KCL 3.x

Um für einen reibungslosen Übergang und Kompatibilität mit der neuesten Version der Kinesis Client Library (KCL) zu sorgen, folgen Sie den Schritten 5–8 der Anleitung für das Upgrade von KCL 2.x auf KCL 3.x im Migrationsleitfaden.

Informationen zur Fehlerbehebung in KCL 3.x finden Sie unter Troubleshooting KCL consumer applications.