Migrar da KCL 1.x para a KCL 3.x
Visão geral
Este guia oferece instruções para migrar uma aplicação de consumidor da KCL 1.x para a KCL 3.x. Devido a diferenças de arquitetura entre a KCL 1.x e a KCL 3.x, a migração requer a atualização de vários componentes para garantir compatibilidade.
A KCL 1.x usa classes e interfaces diferentes em comparação com o KCL 3.x. A KCL 1.x usa classes e interfaces diferentes em comparação com o KCL 3.x. Você deve primeiro migrar o processador de registros, a fábrica do processador de registros e as classes de operador para o formato compatível com a KCL 3.x e seguir as etapas de migração da KCL 1.x para a KCL 3.x.
Etapas da migração
Tópicos
Etapa 1: migrar o processador de registros
Este exemplo mostra um processador de registros implementado para o DynamoDB Streams Kinesis Adapter da KCL 1.x:
package com.amazonaws.kcl; import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; public class StreamsRecordProcessor implements IRecordProcessor, IShutdownNotificationAware { @Override public void initialize(InitializationInput initializationInput) { // // Setup record processor // } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { for (Record record : processRecordsInput.getRecords()) { String data = new String(record.getData().array(), Charset.forName("UTF-8")); System.out.println(data); if (record instanceof RecordAdapter) { // record processing and checkpointing logic } } } @Override public void shutdown(ShutdownInput shutdownInput) { if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) { try { shutdownInput.getCheckpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { throw new RuntimeException(e); } } } @Override public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { try { checkpointer.checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow exception // e.printStackTrace(); } } }
Como migrar a classe RecordProcessor
-
Altere as interfaces de
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor
ecom.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware
paracom.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor
da seguinte forma:// import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor
-
Atualize as instruções de importação para os métodos
initialize
eprocessRecords
.// import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import software.amazon.kinesis.lifecycle.events.InitializationInput; // import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
-
Substitua o método
shutdown
pelos novos métodos a seguir:leaseLost
,shardEnded
, eshutdownRequested
.// @Override // public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { // // // // This is moved to shardEnded(...) and shutdownRequested(ShutdownReauestedInput) // // // try { // checkpointer.checkpoint(); // } catch (ShutdownException | InvalidStateException e) { // // // // Swallow exception // // // e.printStackTrace(); // } // } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } }
Esta é a versão atualizada da classe de processador de registros:
package com.amazonaws.codesamples; import software.amazon.kinesis.exceptions.InvalidStateException; import software.amazon.kinesis.exceptions.ShutdownException; import software.amazon.kinesis.lifecycle.events.InitializationInput; import software.amazon.kinesis.lifecycle.events.LeaseLostInput; import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.ShardEndedInput; import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput; import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor; import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsKinesisClientRecord; import software.amazon.awssdk.services.dynamodb.model.Record; public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor { @Override public void initialize(InitializationInput initializationInput) { } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { for (DynamoDBStreamsKinesisClientRecord record: processRecordsInput.records()) Record ddbRecord = record.getRecord(); // processing and checkpointing logic for the ddbRecord } } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } }
nota
O DynamoDB Streams Kinesis Adapter agora usa o modelo de registro SDKv2. No SDKv2, objetos AttributeValue
complexos (BS
, NS
, M
, L
e SS
) nunca retornam null. Use os métodos hasBs()
, hasNs()
, hasM()
, hasL()
e hasSs()
para verificar se esses valores existem.
Etapa 2: migrar a fábrica do processador de registros
A fábrica do processador de registros é responsável por criar processadores de registro quando uma concessão é realizada. Veja o seguinte exemplo de fábrica da KCL 1.x:
package com.amazonaws.codesamples; import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; public class StreamsRecordProcessorFactory implements IRecordProcessorFactory { @Override public IRecordProcessor createProcessor() { return new StreamsRecordProcessor(dynamoDBClient, tableName); } }
Migrar para RecordProcessorFactory
-
Altere a interface implementada de
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory
parasoftware.amazon.kinesis.processor.ShardRecordProcessorFactory
da seguinte forma:com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory to software.amazon.kinesis.processor.ShardRecordProcessorFactory, as follows. // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; // public class TestRecordProcessorFactory implements IRecordProcessorFactory { public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory { Change the return signature for createProcessor. // public IRecordProcessor createProcessor() { public ShardRecordProcessor shardRecordProcessor() {
Veja o seguinte exemplo de fábrica de processador de registros em 3.0:
package com.amazonaws.codesamples; import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory { @Override public ShardRecordProcessor shardRecordProcessor() { return new StreamsRecordProcessor(); } }
Etapa 3: migrar o operador
Na versão 3.0 da KCL, uma nova classe, chamada Scheduler, substitui a classe Worker. Veja o seguinte exemplo de operador da KCL 1.x:
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...) final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory(); final Worker worker = StreamsWorkerFactory.createDynamoDbStreamsWorker( recordProcessorFactory, workerConfig, adapterClient, amazonDynamoDB, amazonCloudWatchClient);
Para migrar o operador
-
Altere a instrução
import
para a classeWorker
para as instruções de importação para as classesScheduler
eConfigsBuilder
.// import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker; import software.amazon.kinesis.coordinator.Scheduler; import software.amazon.kinesis.common.ConfigsBuilder;
-
Importe
StreamTracker
e altere a importação deStreamsWorkerFactory
paraStreamsSchedulerFactory
.import software.amazon.kinesis.processor.StreamTracker; // import com.amazonaws.services.dynamodbv2.streamsadapter.StreamsWorkerFactory; import com.amazonaws.services.dynamodbv2.streamsadapter.StreamsSchedulerFactory;
-
Escolha a posição por meio da qual iniciar a aplicação. Ela pode ser
TRIM_HORIZON
ouLATEST
.import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended;
-
Crie uma instância de
StreamTracker
.StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker( streamArn, InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON) );
-
Crie o objeto
AmazonDynamoDBStreamsAdapterClient
.import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; ... AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create(); AmazonDynamoDBStreamsAdapterClient adapterClient = new AmazonDynamoDBStreamsAdapterClient( credentialsProvider, awsRegion);
-
Crie o objeto
ConfigsBuilder
.import software.amazon.kinesis.common.ConfigsBuilder; ... ConfigsBuilder configsBuilder = new ConfigsBuilder( streamTracker, applicationName, adapterClient, dynamoDbAsyncClient, cloudWatchAsyncClient, UUID.randomUUID().toString(), new StreamsRecordProcessorFactory());
-
Crie
Scheduler
conforme mostrado no seguinte exemplo:import java.util.UUID; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.KinesisClientUtil; import software.amazon.kinesis.coordinator.Scheduler; ... DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build(); CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build(); DynamoDBStreamsPollingConfig pollingConfig = new DynamoDBStreamsPollingConfig(adapterClient); pollingConfig.idleTimeBetweenReadsInMillis(idleTimeBetweenReadsInMillis); RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig(); retrievalConfig.retrievalSpecificConfig(pollingConfig); CoordinatorConfig coordinatorConfig = configsBuilder.coordinatorConfig(); coordinatorConfig.clientVersionConfig(CoordinatorConfig.ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X) Scheduler scheduler = StreamsSchedulerFactory.createScheduler( configsBuilder.checkpointConfig(), coordinatorConfig, configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), retrievalConfig, adapterClient );
Importante
A configuração CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X
mantém a compatibilidade entre o DynamoDB Streams Kinesis Adapter para a KCL v3 e a KCL v1, não entre a KCL v2 e a v3.
Etapa 4: visão geral e recomendações de configuração da KCL 3.x
Para obter uma descrição detalhada das configurações introduzidas após a KCL 1.x que são relevantes na KCL 3.x, consulte KCL configurations e KCL migration client configuration.
Configurações com valor padrão de atualização na KCL 3.x
billingMode
-
Na KCL versão 1.x, o valor padrão para
billingMode
é definido comoPROVISIONED
. No entanto, na KCL versão 3.x, o padrãobillingMode
éPAY_PER_REQUEST
(modo sob demanda). Recomendamos que você use o modo de capacidade sob demanda em sua tabela de concessões para ajustar automaticamente a capacidade com base no uso. Para obter orientações sobre como usar a capacidade provisionada para suas tabelas de concessões, consulte Best practices for the lease table with provisioned capacity mode. idleTimeBetweenReadsInMillis
-
Na KCL versão 1.x, o valor padrão para
idleTimeBetweenReadsInMillis
é definido como 1.000 (ou 1 segundo). A KCL versão 3.x define o valor padrão para idleTimeBetweenReadsInMillis
como 1.500 (ou 1,5 segundo), mas o Amazon DynamoDB Streams Kinesis Adapter substitui esse valor padrão, definindo-o como 1.000 (ou 1 segundo).
Novas configurações na KCL 3.x
leaseAssignmentIntervalMillis
-
Essa configuração define o intervalo de tempo antes que os fragmentos recém-descobertos comecem a ser processados, e é calculada como 1,5 ×
leaseAssignmentIntervalMillis
. Se essa configuração não for definida explicitamente, o intervalo de tempo será padronizado como 1,5 ×failoverTimeMillis
. O processamento de novos fragmentos exige a verificação da tabela de concessões e a consulta a um índice secundário global (GSI) na tabela de concessões. A redução deleaseAssignmentIntervalMillis
aumenta a frequência dessas operações de verificação e consulta, aumentando os custos do DynamoDB. Recomendamos definir esse valor como 2.000 para minimizar o atraso no processamento de novos fragmentos. shardConsumerDispatchPollIntervalMillis
-
Essa configuração define o intervalo entre pesquisas sucessivas feitas pelo consumidor do fragmento para acionar transições de estado. Na KCL versão 1.x, esse comportamento era controlado pelo parâmetro
idleTimeInMillis
, que não era exposto como uma definição configurável. Na KCL versão 3.x, recomendamos definir essa configuração para corresponder ao valor usado emidleTimeInMillis
na configuração da KCL versão 1.x.
Configurações não substituídas pelo Amazon DynamoDB Streams Kinesis Adapter
shardSyncIntervalMillis
-
O DynamoDB Streams Kinesis Adapter compatível com a KCL versão 1.x define explicitamente
shardSyncIntervalMillis
como 0. Ao passo que o Kinesis do DynamoDB Streams Adapter compatível com a KCL versão 3.x não define mais um valor para essa configuração. Para manter o mesmo comportamento do adaptador da versão 1.x, defina o valor dessa configuração como 0. leasesRecoveryAuditorExecutionFrequencyMillis
-
O DynamoDB Streams Kinesis Adapter compatível com a KCL versão 1.x define explicitamente
leasesRecoveryAuditorExecutionFrequencyMillis
como 1.000. Ao passo que o Kinesis do DynamoDB Streams Adapter compatível com a KCL versão 3.x não define mais um valor padrão para essa configuração. Para manter o mesmo comportamento do adaptador da versão 1.x, defina o valor dessa configuração como 1.000.
Etapa 5: migrar da KCL 2.x para a KCL 3.x
Para garantir uma transição tranquila e a compatibilidade com a versão mais recente da Kinesis Client Library (KCL), siga as etapas de 5 a 8 nas instruções do guia de migração para atualizar da KCL 2.x para a KCL 3.x.