Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.
Migration von KCL 1.x zu KCL 3.x
Übersicht
Dieses Handbuch enthält Anweisungen zur Migration Ihrer Verbraucheranwendung von KCL 1.x auf KCL 3.x. Aufgrund der architektonischen Unterschiede zwischen KCL 1.x und KCL 3.x erfordert die Migration die Aktualisierung mehrerer Komponenten, um die Kompatibilität sicherzustellen.
KCL 1.x verwendet im Vergleich zu KCL 3.x andere Klassen und Schnittstellen. KCL 1.x verwendet im Vergleich zu KCL 3.x andere Klassen und Schnittstellen. Sie müssen zuerst die Klassen Record Processor, Record Processor Factory und Worker auf das KCL 3.x-kompatible Format migrieren und dann die Migrationsschritte für die Migration von KCL 1.x zu KCL 3.x befolgen.
Schritte zur Migration
Themen
Schritt 1: Migrieren Sie den Record Processor
Das folgende Beispiel zeigt einen Recordprozessor, der für den KCL 1.x DynamoDB Streams Kinesis-Adapter implementiert ist:
package com.amazonaws.kcl; import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; public class StreamsRecordProcessor implements IRecordProcessor, IShutdownNotificationAware { @Override public void initialize(InitializationInput initializationInput) { // // Setup record processor // } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { for (Record record : processRecordsInput.getRecords()) { String data = new String(record.getData().array(), Charset.forName("UTF-8")); System.out.println(data); if (record instanceof RecordAdapter) { // record processing and checkpointing logic } } } @Override public void shutdown(ShutdownInput shutdownInput) { if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) { try { shutdownInput.getCheckpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { throw new RuntimeException(e); } } } @Override public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { try { checkpointer.checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow exception // e.printStackTrace(); } } }
RecordProcessor Um die Klasse zu migrieren
-
Ändern Sie die Schnittstellen
com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor
wie folgt voncom.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor
undcom.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware
zu:// import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor
-
Aktualisieren Sie die Importanweisungen für die
processRecords
Methodeninitialize
und:// import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import software.amazon.kinesis.lifecycle.events.InitializationInput; // import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
-
Ersetzen Sie die Methode
shutdown
durch die folgenden neuen Methoden:leaseLost
,shardEnded
undshutdownRequested
.// @Override // public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { // // // // This is moved to shardEnded(...) and shutdownRequested(ShutdownReauestedInput) // // // try { // checkpointer.checkpoint(); // } catch (ShutdownException | InvalidStateException e) { // // // // Swallow exception // // // e.printStackTrace(); // } // } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } }
Im Folgenden finden Sie die aktualisierte Version der Record Processor-Klasse:
package com.amazonaws.codesamples; import software.amazon.kinesis.exceptions.InvalidStateException; import software.amazon.kinesis.exceptions.ShutdownException; import software.amazon.kinesis.lifecycle.events.InitializationInput; import software.amazon.kinesis.lifecycle.events.LeaseLostInput; import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.ShardEndedInput; import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput; import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor; import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsKinesisClientRecord; import software.amazon.awssdk.services.dynamodb.model.Record; public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor { @Override public void initialize(InitializationInput initializationInput) { } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { for (DynamoDBStreamsKinesisClientRecord record: processRecordsInput.records()) Record ddbRecord = record.getRecord(); // processing and checkpointing logic for the ddbRecord } } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } }
Anmerkung
DynamoDB Streams Kinesis Adapter verwendet SDKv2 jetzt das Record-Modell. In SDKv2 geben komplexe AttributeValue
Objekte (BS
,,, NS
M
L
,SS
) niemals Null zurück. Verwenden SiehasBs()
,hasNs()
,hasM()
, hasSs()
MethodenhasL()
, um zu überprüfen, ob diese Werte existieren.
Schritt 2: Migrieren Sie den Record Processor ab Werk
Die Datensatzprozessor-Factory ist für das Erstellen von Prozessoren verantwortlich, wenn eine Lease erworben wird. Im Folgenden finden Sie ein Beispiel für eine KCL 1.x-Factory:
package com.amazonaws.codesamples; import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; public class StreamsRecordProcessorFactory implements IRecordProcessorFactory { @Override public IRecordProcessor createProcessor() { return new StreamsRecordProcessor(dynamoDBClient, tableName); } }
Um das zu migrieren RecordProcessorFactory
-
Ändern Sie die implementierte Schnittstelle wie folgt von
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory
zusoftware.amazon.kinesis.processor.ShardRecordProcessorFactory
:com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory to software.amazon.kinesis.processor.ShardRecordProcessorFactory, as follows. // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; // public class TestRecordProcessorFactory implements IRecordProcessorFactory { public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory { Change the return signature for createProcessor. // public IRecordProcessor createProcessor() { public ShardRecordProcessor shardRecordProcessor() {
Im Folgenden finden Sie ein Beispiel für die Record Processor Factory in 3.0:
package com.amazonaws.codesamples; import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory { @Override public ShardRecordProcessor shardRecordProcessor() { return new StreamsRecordProcessor(); } }
Schritt 3: Migrieren Sie den Worker
In Version 3.0 der KCL ersetzt eine neue Klasse namens Scheduler die Worker-Klasse. Im Folgenden finden Sie ein Beispiel für einen KCL 1.x-Worker:
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...) final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory(); final Worker worker = StreamsWorkerFactory.createDynamoDbStreamsWorker( recordProcessorFactory, workerConfig, adapterClient, amazonDynamoDB, amazonCloudWatchClient);
So migrieren Sie den Auftragnehmer
-
Ändern Sie die
import
-Anweisung für dieWorker
-Klasse, um Anweisungen für die KlassenScheduler
undConfigsBuilder
zu importieren.// import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker; import software.amazon.kinesis.coordinator.Scheduler; import software.amazon.kinesis.common.ConfigsBuilder;
-
Import
StreamTracker
und Änderung des Imports von zu.StreamsWorkerFactory
StreamsSchedulerFactory
import software.amazon.kinesis.processor.StreamTracker; // import com.amazonaws.services.dynamodbv2.streamsadapter.StreamsWorkerFactory; import com.amazonaws.services.dynamodbv2.streamsadapter.StreamsSchedulerFactory;
-
Wählen Sie die Position, von der aus die Anwendung gestartet werden soll. Es kann
TRIM_HORIZON
oder seinLATEST
.import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended;
-
Erstellen Sie eine
StreamTracker
-Instance.StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker( streamArn, InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON) );
-
Erstellen Sie das
AmazonDynamoDBStreamsAdapterClient
Objekt.import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; ... AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create(); AmazonDynamoDBStreamsAdapterClient adapterClient = new AmazonDynamoDBStreamsAdapterClient( credentialsProvider, awsRegion);
-
Erstellen Sie das
ConfigsBuilder
Objekt.import software.amazon.kinesis.common.ConfigsBuilder; ... ConfigsBuilder configsBuilder = new ConfigsBuilder( streamTracker, applicationName, adapterClient, dynamoDbAsyncClient, cloudWatchAsyncClient, UUID.randomUUID().toString(), new StreamsRecordProcessorFactory());
-
Erstellen Sie das
Scheduler
wie im folgenden Beispiel gezeigt:import java.util.UUID; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.KinesisClientUtil; import software.amazon.kinesis.coordinator.Scheduler; ... DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build(); CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build(); DynamoDBStreamsPollingConfig pollingConfig = new DynamoDBStreamsPollingConfig(adapterClient); pollingConfig.idleTimeBetweenReadsInMillis(idleTimeBetweenReadsInMillis); RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig(); retrievalConfig.retrievalSpecificConfig(pollingConfig); CoordinatorConfig coordinatorConfig = configsBuilder.coordinatorConfig(); coordinatorConfig.clientVersionConfig(CoordinatorConfig.ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X) Scheduler scheduler = StreamsSchedulerFactory.createScheduler( configsBuilder.checkpointConfig(), coordinatorConfig, configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), retrievalConfig, adapterClient );
Wichtig
Die CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X
Einstellung gewährleistet die Kompatibilität zwischen DynamoDB Streams Kinesis Adapter für KCL v3 und KCL v1, nicht zwischen KCL v2 und v3.
Schritt 4: Überblick über die Konfiguration und Empfehlungen für KCL 3.x
Eine ausführliche Beschreibung der nach KCL 1.x eingeführten Konfigurationen, die für KCL 3.x relevant sind, finden Sie unter KCL-Konfigurationen und Konfiguration des KCL-Migrationsclients.
Konfigurationen mit dem Standardwert für Updates in KCL 3.x
billingMode
-
In KCL-Version 1.x ist der Standardwert für
billingMode
auf gesetzt.PROVISIONED
Bei KCL-Version 3.xbillingMode
ist die Standardeinstellung jedochPAY_PER_REQUEST
(On-Demand-Modus). Wir empfehlen Ihnen, den On-Demand-Kapazitätsmodus für Ihre Leasing-Tabelle zu verwenden, um die Kapazität automatisch an Ihre Nutzung anzupassen. Anleitungen zur Verwendung der bereitgestellten Kapazität für Ihre Leasingtabellen finden Sie unter Bewährte Methoden für die Leasetabelle mit dem Modus „Bereitgestellte Kapazität“. idleTimeBetweenReadsInMillis
-
In KCL Version 1.x
idleTimeBetweenReadsInMillis
ist der Standardwert für auf 1.000 (oder 1 Sekunde) festgelegt. KCL-Version 3.x legt den Standardwert für idleTimeBetweenReadsInMillis
auf 1.500 (oder 1,5 Sekunden) fest, aber Amazon DynamoDB Streams Kinesis Adapter überschreibt den Standardwert auf 1.000 (oder 1 Sekunde).
Neue Konfigurationen in KCL 3.x
leaseAssignmentIntervalMillis
-
Diese Konfiguration definiert das Zeitintervall, bis die Verarbeitung neu entdeckter Shards beginnt, und wird als 1,5 × berechnet.
leaseAssignmentIntervalMillis
Wenn diese Einstellung nicht explizit konfiguriert ist, ist das Zeitintervall standardmäßig auf 1,5 × eingestellt.failoverTimeMillis
Die Verarbeitung neuer Shards beinhaltet das Scannen der Leasetabelle und das Abfragen eines globalen sekundären Index (GSI) in der Leasetabelle. Eine Verringerung derleaseAssignmentIntervalMillis
erhöht die Häufigkeit dieser Scan- und Abfragevorgänge, was zu höheren DynamoDB-Kosten führt. Wir empfehlen, diesen Wert auf 2000 zu setzen, um die Verzögerung bei der Verarbeitung neuer Shards zu minimieren. shardConsumerDispatchPollIntervalMillis
-
Diese Konfiguration definiert das Intervall zwischen aufeinanderfolgenden Abfragen durch den Shard-Verbraucher, um Zustandsübergänge auszulösen. In KCL-Version 1.x wurde dieses Verhalten durch den
idleTimeInMillis
Parameter gesteuert, der nicht als konfigurierbare Einstellung verfügbar war. Bei KCL-Version 3.x empfehlen wir, diese Konfiguration so einzustellen, dass sie dem Wert entspricht, deridleTimeInMillis
in Ihrem KCL-Version 1.x-Setup verwendet wurde.
Konfigurationen, die nicht vom Amazon DynamoDB Streams Kinesis Adapter überschrieben werden
shardSyncIntervalMillis
-
Der DynamoDB Streams Kinesis Adapter, der mit KCL Version 1.x kompatibel ist, wird explizit auf 0 gesetzt.
shardSyncIntervalMillis
Im Vergleich dazu legt der DynamoDB Streams Kinesis Adapter, der mit KCL Version 3.x kompatibel ist, keinen Wert mehr für diese Konfiguration fest. Um das gleiche Adapterverhalten wie in Version 1.x beizubehalten, setzen Sie den Wert dieser Konfiguration auf 0. leasesRecoveryAuditorExecutionFrequencyMillis
-
Der DynamoDB Streams Kinesis Adapter, der mit KCL Version 1.x kompatibel ist, ist explizit auf 1000 festgelegt.
leasesRecoveryAuditorExecutionFrequencyMillis
Im Vergleich dazu legt der mit KCL Version 3.x kompatible DynamoDB Streams Kinesis Adapter keinen Standardwert mehr für diese Konfiguration fest. Um das gleiche Adapterverhalten wie in Version 1.x beizubehalten, setzen Sie den Wert dieser Konfiguration auf 1000.
Schritt 5: Migrieren Sie von KCL 2.x zu KCL 3.x
Um einen reibungslosen Übergang und die Kompatibilität mit der neuesten Version der Kinesis Client Library (KCL) zu gewährleisten, folgen Sie den Schritten 5—8 in den Anweisungen für das Upgrade von KCL 2.x auf KCL 3.x im Migrationshandbuch.