Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.
Migrieren von KCL 1.x zu KCL 3.x
Übersicht
In dieser Anleitung wird erläutert, wie Sie Ihre Verbraucheranwendung von KCL 1.x zu KCL 3.x migrieren. Aufgrund der unterschiedlichen Architektur von KCL 1.x und KCL 3.x müssen für die Migration mehrere Komponenten aktualisiert werden, um die Kompatibilität sicherzustellen.
KCL 1.x verwendet im Vergleich zu KCL 3.x andere Klassen und Schnittstellen. Sie müssen zuerst die Klassen „Datensatzprozessor“, „Datensatzprozessor-Factory“ und „Worker“ in das KCL 3.x-kompatible Format migrieren und dann die Schritte für die Migration von KCL 1.x zu KCL 3.x ausführen.
Schritte zur Migration
Themen
Schritt 1: Migrieren des Datensatzprozessors
Das folgende Beispiel zeigt einen Datensatzprozessor, der für die Version KCL 1.x des DynamoDB-Streams-Kinesis-Adapters implementiert wurde:
package com.amazonaws.kcl; import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; public class StreamsRecordProcessor implements IRecordProcessor, IShutdownNotificationAware { @Override public void initialize(InitializationInput initializationInput) { // // Setup record processor // } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { for (Record record : processRecordsInput.getRecords()) { String data = new String(record.getData().array(), Charset.forName("UTF-8")); System.out.println(data); if (record instanceof RecordAdapter) { // record processing and checkpointing logic } } } @Override public void shutdown(ShutdownInput shutdownInput) { if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) { try { shutdownInput.getCheckpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { throw new RuntimeException(e); } } } @Override public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { try { checkpointer.checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow exception // e.printStackTrace(); } } }
Um die RecordProcessor Klasse zu migrieren
-
Ändern Sie die Schnittstellen von
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorundcom.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAwarefolgendermaßen zusoftware.amazon.dynamodb.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor:// import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import software.amazon.dynamodb.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor -
Aktualisieren Sie die Importanweisungen für die Methoden
initializeundprocessRecords:// import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import software.amazon.kinesis.lifecycle.events.InitializationInput; // import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput; -
Ersetzen Sie die Methode
shutdownRequesteddurch die folgenden neuen Methoden:leaseLost,shardEndedundshutdownRequested.// @Override // public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { // // // // This is moved to shardEnded(...) and shutdownRequested(ShutdownReauestedInput) // // // try { // checkpointer.checkpoint(); // } catch (ShutdownException | InvalidStateException e) { // // // // Swallow exception // // // e.printStackTrace(); // } // } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } }
Nachstehend finden Sie die aktualisierte Version der Datensatzprozessorklasse:
package com.amazonaws.codesamples; import software.amazon.kinesis.exceptions.InvalidStateException; import software.amazon.kinesis.exceptions.ShutdownException; import software.amazon.kinesis.lifecycle.events.InitializationInput; import software.amazon.kinesis.lifecycle.events.LeaseLostInput; import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.ShardEndedInput; import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput; import software.amazon.dynamodb.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor; import software.amazon.dynamodb.streamsadapter.adapter.DynamoDBStreamsKinesisClientRecord; import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor; import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsClientRecord; import software.amazon.awssdk.services.dynamodb.model.Record; public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor { @Override public void initialize(InitializationInput initializationInput) { } @Override public void processRecords(DynamoDBStreamsProcessRecordsInput processRecordsInput) { for (DynamoDBStreamsKinesisClientRecord record: processRecordsInput.records()) Record ddbRecord = record.getRecord(); // processing and checkpointing logic for the ddbRecord } } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } }
Anmerkung
DynamoDB Streams Kinesis Adapter verwendet SDKv2 jetzt das Record-Modell. In SDKv2 geben komplexe AttributeValue Objekte (BS,,, NS ML,SS) niemals Null zurück. Verwenden Sie die Methoden hasBs(), hasNs(), hasM(), hasL(), hasSs(), um zu überprüfen, ob diese Werte existieren.
Schritt 2: Migrieren der Datensatzprozessor-Factory
Die Datensatzprozessor-Factory ist für das Erstellen von Prozessoren verantwortlich, wenn eine Lease erworben wird. Nachfolgend sehen Sie ein Beispiel für eine KCL-1.x-Factory:
package com.amazonaws.codesamples; import software.amazon.dynamodb.AmazonDynamoDB; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; public class StreamsRecordProcessorFactory implements IRecordProcessorFactory { @Override public IRecordProcessor createProcessor() { return new StreamsRecordProcessor(dynamoDBClient, tableName); } }
So migrieren Sie die RecordProcessorFactory
-
Ändern Sie die implementierte Schnittstelle von
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactoryfolgendermaßen zusoftware.amazon.kinesis.processor.ShardRecordProcessorFactory:// import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; // public class TestRecordProcessorFactory implements IRecordProcessorFactory { public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory { Change the return signature for createProcessor. // public IRecordProcessor createProcessor() { public ShardRecordProcessor shardRecordProcessor() {
Nachfolgend sehen Sie ein Beispiel für die Datensatzprozessor-Factory in 3.0:
package com.amazonaws.codesamples; import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory { @Override public ShardRecordProcessor shardRecordProcessor() { return new StreamsRecordProcessor(); } }
Schritt 3: Migrieren des Workers
In Version 3.0 der KCL wird die Worker-Klasse durch eine neue Klasse namens Scheduler ersetzt. Nachfolgend sehen Sie ein Beispiel für einen KCL-1.x-Worker.
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...) final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory(); final Worker worker = StreamsWorkerFactory.createDynamoDbStreamsWorker( recordProcessorFactory, workerConfig, adapterClient, amazonDynamoDB, amazonCloudWatchClient);
So migrieren Sie den Worker
-
Ändern Sie die
import-Anweisung für dieWorker-Klasse in die Import-Anweisungen für die KlassenSchedulerundConfigsBuilder.// import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker; import software.amazon.kinesis.coordinator.Scheduler; import software.amazon.kinesis.common.ConfigsBuilder; -
Importieren Sie
StreamTrackerund ändern Sie den Import vonStreamsWorkerFactoryzuStreamsSchedulerFactory.import software.amazon.kinesis.processor.StreamTracker; // import software.amazon.dynamodb.streamsadapter.StreamsWorkerFactory; import software.amazon.dynamodb.streamsadapter.StreamsSchedulerFactory; -
Wählen Sie die Position, von der aus die Anwendung gestartet werden soll. Möglich sind
TRIM_HORIZONoderLATEST.import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; -
Erstellen Sie eine
StreamTracker-Instance.StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker( streamArn, InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON) ); -
Erstellen Sie das Objekt
AmazonDynamoDBStreamsAdapterClient.import software.amazon.dynamodb.streamsadapter.AmazonDynamoDBStreamsAdapterClient; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; ... AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create(); AmazonDynamoDBStreamsAdapterClient adapterClient = new AmazonDynamoDBStreamsAdapterClient( credentialsProvider, awsRegion); -
Erstellen Sie das Objekt
ConfigsBuilder.import software.amazon.kinesis.common.ConfigsBuilder; ... ConfigsBuilder configsBuilder = new ConfigsBuilder( streamTracker, applicationName, adapterClient, dynamoDbAsyncClient, cloudWatchAsyncClient, UUID.randomUUID().toString(), new StreamsRecordProcessorFactory()); -
Erstellen Sie den
Schedulermithilfe vonConfigsBuilder, wie im folgenden Beispiel gezeigt:import java.util.UUID; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.KinesisClientUtil; import software.amazon.kinesis.coordinator.Scheduler; ... DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build(); CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build(); DynamoDBStreamsPollingConfig pollingConfig = new DynamoDBStreamsPollingConfig(adapterClient); pollingConfig.idleTimeBetweenReadsInMillis(idleTimeBetweenReadsInMillis); // Use ConfigsBuilder to configure settings RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig(); retrievalConfig.retrievalSpecificConfig(pollingConfig); CoordinatorConfig coordinatorConfig = configsBuilder.coordinatorConfig(); coordinatorConfig.clientVersionConfig(CoordinatorConfig.ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X); Scheduler scheduler = StreamsSchedulerFactory.createScheduler( configsBuilder.checkpointConfig(), coordinatorConfig, configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), retrievalConfig, adapterClient );
Wichtig
Die Einstellung CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X gewährleistet die Kompatibilität zwischen dem DynamoDB-Streams-Kinesis-Adapter für KCL v3 und KCL v1, nicht zwischen KCL v2 und v3.
Schritt 4: Überblick über die Konfiguration und Empfehlungen für KCL 3.x
Eine ausführliche Beschreibung der nach KCL 1.x eingeführten Konfigurationen, die für KCL 3.x relevant sind, finden Sie unter KCL Configurations und KCL Migration Client Configuration.
Wichtig
Anstatt Objekte von checkpointConfig, coordinatorConfig, leaseManagementConfig, metricsConfig, processorConfig und retrievalConfig direkt zu erstellen, empfehlen wir, ConfigsBuilder zu verwenden, um Konfigurationen in KCL 3.x und aktuelleren Versionen einzustellen. Das verhindert Probleme mit der Scheduler-Initialisierung. ConfigsBuilder bietet eine flexiblere und wartungsfreundlichere Methode zur Konfiguration Ihrer KCL-Anwendung.
Konfigurationen mit dem Standardwert für Aktualisierungen in KCL 3.x
billingMode-
In der KCL-Version 1.x ist der Standardwert für
billingModeaufPROVISIONEDeingestellt. Bei der KCL-Version 3.x ist die Standardeinstellung fürbillingModejedochPAY_PER_REQUEST(On-Demand-Modus). Wir empfehlen Ihnen, den On-Demand-Kapazitätsmodus für Ihre Leasetabelle zu verwenden, um die Kapazität automatisch an die Nutzung anzupassen. Anleitungen zur Verwendung der bereitgestellten Kapazität für Ihre Leasetabellen finden Sie unter Best practices for the lease table with provisioned capacity mode. idleTimeBetweenReadsInMillis-
In der KCL-Version 1.x ist der Standardwert für
idleTimeBetweenReadsInMillisauf 1 000 (oder 1 Sekunde) eingestellt. Die KCL-Version 3.x legt den Standardwert fürdleTimeBetweenReadsInMillisauf 1 500 (oder 1,5 Sekunden) fest. Der Amazon-DynamoDB-Streams-Kinesis-Adapter überschreibt den Standardwert jedoch mit 1 000 (oder 1 Sekunde).
Neue Konfigurationen in KCL 3.x
leaseAssignmentIntervalMillis-
Diese Konfiguration definiert das Zeitintervall, bis die Verarbeitung neu erkannter Shards beginnt. Es wird nach der Formel 1,5 ×
leaseAssignmentIntervalMillisberechnet. Wenn diese Einstellung nicht explizit konfiguriert ist, beträgt das Zeitintervall standardmäßig 1,5 ×failoverTimeMillis. Die Verarbeitung neuer Shards beinhaltet das Scannen der Leasetabelle und das Abfragen eines globalen sekundären Index (GSI) in der Leasetabelle. Eine Absenkung desleaseAssignmentIntervalMillis-Werts erhöht die Häufigkeit dieser Scan- und Abfragevorgänge, was zu höheren DynamoDB-Kosten führt. Wir empfehlen, diesen Wert auf 2 000 (oder 2 Sekunden) einzustellen, um die Verzögerung bei der Verarbeitung neuer Shards zu minimieren. shardConsumerDispatchPollIntervalMillis-
Diese Konfiguration definiert das Intervall zwischen aufeinanderfolgenden Abfragen durch den Shard-Verbraucher, um Zustandsübergänge auszulösen. In KCL-Version 1.x wurde dieses Verhalten durch den Parameter
idleTimeInMillisgesteuert, der nicht als konfigurierbare Einstellung verfügbar war. Bei KCL-Version 3.x empfehlen wir, diese Konfiguration auf den Wert einzustellen, der in Ihrer KCL-Version 1.x füridleTimeInMillisverwendet wurde.
Schritt 5: Migrieren von KCL 2.x zu KCL 3.x
Um für einen reibungslosen Übergang und Kompatibilität mit der neuesten Version der Kinesis Client Library (KCL) zu sorgen, folgen Sie den Schritten 5–8 der Anleitung für das Upgrade von KCL 2.x auf KCL 3.x im Migrationsleitfaden.
Informationen zur Fehlerbehebung in KCL 3.x finden Sie unter Troubleshooting KCL consumer applications.