Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
Migrazione da KCL 1.x a KCL 3.x
Panoramica
Questa guida fornisce istruzioni per la migrazione dell'applicazione consumer da KCL 1.x a KCL 3.x. A causa delle differenze architettoniche tra KCL 1.x e KCL 3.x, la migrazione richiede l'aggiornamento di diversi componenti per garantire la compatibilità.
KCL 1.x utilizza classi e interfacce diverse rispetto a KCL 3.x. KCL 1.x utilizza classi e interfacce diverse rispetto a KCL 3.x. È necessario prima migrare il record processor, il record processor factory e le classi worker al formato compatibile con KCL 3.x e seguire i passaggi di migrazione per la migrazione da KCL 1.x a KCL 3.x.
Fasi della migrazione
Argomenti
Fase 1: Migrazione del processore di registrazione
L'esempio seguente mostra un processore di registrazione implementato per l'adattatore KCL 1.x DynamoDB Streams Kinesis:
package com.amazonaws.kcl; import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; public class StreamsRecordProcessor implements IRecordProcessor, IShutdownNotificationAware { @Override public void initialize(InitializationInput initializationInput) { // // Setup record processor // } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { for (Record record : processRecordsInput.getRecords()) { String data = new String(record.getData().array(), Charset.forName("UTF-8")); System.out.println(data); if (record instanceof RecordAdapter) { // record processing and checkpointing logic } } } @Override public void shutdown(ShutdownInput shutdownInput) { if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) { try { shutdownInput.getCheckpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { throw new RuntimeException(e); } } } @Override public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { try { checkpointer.checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow exception // e.printStackTrace(); } } }
Per migrare la classe RecordProcessor
-
Modificate le interfacce da
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor
ecom.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware
versocom.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor
come segue:// import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor
-
Aggiorna le istruzioni di importazione per i
processRecords
metodiinitialize
and:// import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import software.amazon.kinesis.lifecycle.events.InitializationInput; // import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
-
Sostituisci il metodo
shutdown
con i seguenti nuovi metodi:leaseLost
,shardEnded
eshutdownRequested
.// @Override // public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { // // // // This is moved to shardEnded(...) and shutdownRequested(ShutdownReauestedInput) // // // try { // checkpointer.checkpoint(); // } catch (ShutdownException | InvalidStateException e) { // // // // Swallow exception // // // e.printStackTrace(); // } // } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } }
Di seguito è riportata la versione aggiornata della classe dei processori di registrazione:
package com.amazonaws.codesamples; import software.amazon.kinesis.exceptions.InvalidStateException; import software.amazon.kinesis.exceptions.ShutdownException; import software.amazon.kinesis.lifecycle.events.InitializationInput; import software.amazon.kinesis.lifecycle.events.LeaseLostInput; import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.ShardEndedInput; import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput; import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor; import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsKinesisClientRecord; import software.amazon.awssdk.services.dynamodb.model.Record; public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor { @Override public void initialize(InitializationInput initializationInput) { } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { for (DynamoDBStreamsKinesisClientRecord record: processRecordsInput.records()) Record ddbRecord = record.getRecord(); // processing and checkpointing logic for the ddbRecord } } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } }
Nota
L'adattatore Kinesis di DynamoDB Streams ora utilizza il modello Record. SDKv2 In SDKv2, AttributeValue
gli oggetti complessi (,BS
,, NS
M
L
,SS
) non restituiscono mai null. Utilizzate i hasSs()
metodi hasBs()
hasNs()
hasM()
,hasL()
,,, per verificare se questi valori esistono.
Fase 2: Migrazione della fabbrica del processore di registrazione
La fabbrica dell'elaboratore di record è responsabile per la creazione di elaboratori di record quando un lease è acquisito. Di seguito è riportato un esempio di fabbrica KCL 1.x:
package com.amazonaws.codesamples; import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; public class StreamsRecordProcessorFactory implements IRecordProcessorFactory { @Override public IRecordProcessor createProcessor() { return new StreamsRecordProcessor(dynamoDBClient, tableName); } }
Per migrare il RecordProcessorFactory
-
Modificare l'interfaccia implementata da
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory
asoftware.amazon.kinesis.processor.ShardRecordProcessorFactory
, come segue:com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory to software.amazon.kinesis.processor.ShardRecordProcessorFactory, as follows. // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; // public class TestRecordProcessorFactory implements IRecordProcessorFactory { public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory { Change the return signature for createProcessor. // public IRecordProcessor createProcessor() { public ShardRecordProcessor shardRecordProcessor() {
Di seguito è riportato un esempio della fabbrica di processori di registrazione nella versione 3.0:
package com.amazonaws.codesamples; import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory { @Override public ShardRecordProcessor shardRecordProcessor() { return new StreamsRecordProcessor(); } }
Fase 3: Migrazione del lavoratore
Nella versione 3.0 di KCL, una nuova classe, chiamata Scheduler, sostituisce la classe Worker. Di seguito è riportato un esempio di worker KCL 1.x:
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...) final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory(); final Worker worker = StreamsWorkerFactory.createDynamoDbStreamsWorker( recordProcessorFactory, workerConfig, adapterClient, amazonDynamoDB, amazonCloudWatchClient);
Per migrare il lavoratore
-
Modifica la dichiarazione
import
per la classeWorker
nelle dichiarazioni di importazione delle classiScheduler
eConfigsBuilder
.// import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker; import software.amazon.kinesis.coordinator.Scheduler; import software.amazon.kinesis.common.ConfigsBuilder;
-
Importa
StreamTracker
e modifica l'importazione diStreamsWorkerFactory
to.StreamsSchedulerFactory
import software.amazon.kinesis.processor.StreamTracker; // import com.amazonaws.services.dynamodbv2.streamsadapter.StreamsWorkerFactory; import com.amazonaws.services.dynamodbv2.streamsadapter.StreamsSchedulerFactory;
-
Scegli la posizione da cui avviare l'applicazione. Può essere
TRIM_HORIZON
oLATEST
.import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended;
-
Crea un'
StreamTracker
istanza.StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker( streamArn, InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON) );
-
Create l'
AmazonDynamoDBStreamsAdapterClient
oggetto.import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; ... AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create(); AmazonDynamoDBStreamsAdapterClient adapterClient = new AmazonDynamoDBStreamsAdapterClient( credentialsProvider, awsRegion);
-
Create l'
ConfigsBuilder
oggetto.import software.amazon.kinesis.common.ConfigsBuilder; ... ConfigsBuilder configsBuilder = new ConfigsBuilder( streamTracker, applicationName, adapterClient, dynamoDbAsyncClient, cloudWatchAsyncClient, UUID.randomUUID().toString(), new StreamsRecordProcessorFactory());
-
Create il file
Scheduler
come mostrato nell'esempio seguente:import java.util.UUID; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.KinesisClientUtil; import software.amazon.kinesis.coordinator.Scheduler; ... DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build(); CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build(); DynamoDBStreamsPollingConfig pollingConfig = new DynamoDBStreamsPollingConfig(adapterClient); pollingConfig.idleTimeBetweenReadsInMillis(idleTimeBetweenReadsInMillis); RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig(); retrievalConfig.retrievalSpecificConfig(pollingConfig); CoordinatorConfig coordinatorConfig = configsBuilder.coordinatorConfig(); coordinatorConfig.clientVersionConfig(CoordinatorConfig.ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X) Scheduler scheduler = StreamsSchedulerFactory.createScheduler( configsBuilder.checkpointConfig(), coordinatorConfig, configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), retrievalConfig, adapterClient );
Importante
L'CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X
impostazione mantiene la compatibilità tra DynamoDB Streams Kinesis Adapter for KCL v3 e KCL v1, non tra KCL v2 e v3.
Fase 4: Panoramica e consigli sulla configurazione di KCL 3.x
Per una descrizione dettagliata delle configurazioni introdotte dopo KCL 1.x e rilevanti in KCL 3.x, consulta Configurazioni KCL e configurazione del client di migrazione KCL.
Configurazioni con valore predefinito di aggiornamento in KCL 3.x
billingMode
-
Nella versione KCL 1.x, il valore predefinito per è impostato su.
billingMode
PROVISIONED
Tuttavia, con la versione 3.x di KCL, l'impostazione predefinitabillingMode
èPAY_PER_REQUEST
(modalità su richiesta). Ti consigliamo di utilizzare la modalità di capacità su richiesta per la tua tabella di leasing per regolare automaticamente la capacità in base all'utilizzo. Per indicazioni sull'utilizzo della capacità assegnata per le tabelle di leasing, consulta Best practice per la tabella di leasing con modalità di capacità assegnata. idleTimeBetweenReadsInMillis
-
Nella versione KCL 1.x, il valore predefinito per
idleTimeBetweenReadsInMillis
è impostato su 1.000 (o 1 secondo). La versione 3.x di KCL imposta il valore predefinitodleTimeBetweenReadsInMillis
per i su 1.500 (o 1,5 secondi), ma Amazon DynamoDB Streams Kinesis Adapter sostituisce il valore predefinito su 1.000 (o 1 secondo).
Nuove configurazioni in KCL 3.x
leaseAssignmentIntervalMillis
-
Questa configurazione definisce l'intervallo di tempo prima che i frammenti appena scoperti inizino l'elaborazione e viene calcolata come 1,5 ×.
leaseAssignmentIntervalMillis
Se questa impostazione non è configurata in modo esplicito, l'intervallo di tempo predefinito è 1,5 ×.failoverTimeMillis
L'elaborazione di nuovi shard prevede la scansione della tabella di leasing e l'interrogazione di un indice secondario globale (GSI) sulla tabella di leasing. La riduzione dileaseAssignmentIntervalMillis
aumenta la frequenza di queste operazioni di scansione e interrogazione, con conseguente aumento dei costi di DynamoDB. Si consiglia di impostare questo valore su 2000 per ridurre al minimo il ritardo nell'elaborazione di nuovi shard. shardConsumerDispatchPollIntervalMillis
-
Questa configurazione definisce l'intervallo tra i sondaggi successivi da parte del consumatore dello shard per attivare le transizioni di stato. Nella versione KCL 1.x, questo comportamento era controllato dal
idleTimeInMillis
parametro, che non era esposto come impostazione configurabile. Con la versione 3.x di KCL, consigliamo di impostare questa configurazione in modo che corrisponda al valore utilizzatoidleTimeInMillis
nella configurazione della versione 1.x di KCL.
Configurazioni non sovrascritte da Amazon DynamoDB Streams Kinesis Adapter
shardSyncIntervalMillis
-
L'adattatore Kinesis DynamoDB Streams compatibile con la versione 1.x di KCL viene impostato esplicitamente su 0.
shardSyncIntervalMillis
In confronto, il DynamoDB Streams Kinesis Adapter compatibile con la versione 3.x di KCL non imposta più un valore per questa configurazione. Per mantenere lo stesso comportamento dell'adattatore della versione 1.x, imposta il valore di questa configurazione su 0. leasesRecoveryAuditorExecutionFrequencyMillis
-
L'adattatore Kinesis DynamoDB Streams compatibile con la versione 1.x di KCL è impostato esplicitamente su 1000.
leasesRecoveryAuditorExecutionFrequencyMillis
In confronto, il DynamoDB Streams Kinesis Adapter compatibile con la versione 3.x di KCL non imposta più un valore predefinito per questa configurazione. Per mantenere lo stesso comportamento dell'adattatore della versione 1.x, imposta il valore di questa configurazione su 1000.
Fase 5: Migrazione da KCL 2.x a KCL 3.x
Per garantire una transizione e una compatibilità fluide con l'ultima versione di Kinesis Client Library (KCL), segui i passaggi 5-8 nelle istruzioni della guida alla migrazione per l'aggiornamento da KCL 2.x a KCL 3.x.