Migrar da KCL 1.x para a KCL 3.x - Amazon DynamoDB

Migrar da KCL 1.x para a KCL 3.x

Visão geral

Este guia oferece instruções para migrar uma aplicação de consumidor da KCL 1.x para a KCL 3.x. Devido a diferenças de arquitetura entre a KCL 1.x e a KCL 3.x, a migração requer a atualização de vários componentes para garantir compatibilidade.

A KCL 1.x usa classes e interfaces diferentes em comparação com o KCL 3.x. A KCL 1.x usa classes e interfaces diferentes em comparação com o KCL 3.x. Você deve primeiro migrar o processador de registros, a fábrica do processador de registros e as classes de operador para o formato compatível com a KCL 3.x e seguir as etapas de migração da KCL 1.x para a KCL 3.x.

Etapas da migração

Etapa 1: migrar o processador de registros

Este exemplo mostra um processador de registros implementado para o DynamoDB Streams Kinesis Adapter da KCL 1.x:

package com.amazonaws.kcl; import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; public class StreamsRecordProcessor implements IRecordProcessor, IShutdownNotificationAware { @Override public void initialize(InitializationInput initializationInput) { // // Setup record processor // } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { for (Record record : processRecordsInput.getRecords()) { String data = new String(record.getData().array(), Charset.forName("UTF-8")); System.out.println(data); if (record instanceof RecordAdapter) { // record processing and checkpointing logic } } } @Override public void shutdown(ShutdownInput shutdownInput) { if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) { try { shutdownInput.getCheckpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { throw new RuntimeException(e); } } } @Override public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { try { checkpointer.checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow exception // e.printStackTrace(); } } }
Como migrar a classe RecordProcessor
  1. Altere as interfaces de com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor e com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware para com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor da seguinte forma:

    // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor
  2. Atualize as instruções de importação para os métodos initialize e processRecords.

    // import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import software.amazon.kinesis.lifecycle.events.InitializationInput; // import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
  3. Substitua o método shutdown pelos novos métodos a seguir: leaseLost, shardEnded, e shutdownRequested.

    // @Override // public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { // // // // This is moved to shardEnded(...) and shutdownRequested(ShutdownReauestedInput) // // // try { // checkpointer.checkpoint(); // } catch (ShutdownException | InvalidStateException e) { // // // // Swallow exception // // // e.printStackTrace(); // } // } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } }

Esta é a versão atualizada da classe de processador de registros:

package com.amazonaws.codesamples; import software.amazon.kinesis.exceptions.InvalidStateException; import software.amazon.kinesis.exceptions.ShutdownException; import software.amazon.kinesis.lifecycle.events.InitializationInput; import software.amazon.kinesis.lifecycle.events.LeaseLostInput; import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.ShardEndedInput; import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput; import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor; import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsKinesisClientRecord; import software.amazon.awssdk.services.dynamodb.model.Record; public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor { @Override public void initialize(InitializationInput initializationInput) { } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { for (DynamoDBStreamsKinesisClientRecord record: processRecordsInput.records()) Record ddbRecord = record.getRecord(); // processing and checkpointing logic for the ddbRecord } } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } }
nota

O DynamoDB Streams Kinesis Adapter agora usa o modelo de registro SDKv2. No SDKv2, objetos AttributeValue complexos (BS, NS, M, L e SS) nunca retornam null. Use os métodos hasBs(), hasNs(), hasM(), hasL() e hasSs() para verificar se esses valores existem.

Etapa 2: migrar a fábrica do processador de registros

A fábrica do processador de registros é responsável por criar processadores de registro quando uma concessão é realizada. Veja o seguinte exemplo de fábrica da KCL 1.x:

package com.amazonaws.codesamples; import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; public class StreamsRecordProcessorFactory implements IRecordProcessorFactory { @Override public IRecordProcessor createProcessor() { return new StreamsRecordProcessor(dynamoDBClient, tableName); } }
Migrar para RecordProcessorFactory
  • Altere a interface implementada de com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory para software.amazon.kinesis.processor.ShardRecordProcessorFactory da seguinte forma:

    com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory to software.amazon.kinesis.processor.ShardRecordProcessorFactory, as follows. // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; // public class TestRecordProcessorFactory implements IRecordProcessorFactory { public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory { Change the return signature for createProcessor. // public IRecordProcessor createProcessor() { public ShardRecordProcessor shardRecordProcessor() {

Veja o seguinte exemplo de fábrica de processador de registros em 3.0:

package com.amazonaws.codesamples; import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory { @Override public ShardRecordProcessor shardRecordProcessor() { return new StreamsRecordProcessor(); } }

Etapa 3: migrar o operador

Na versão 3.0 da KCL, uma nova classe, chamada Scheduler, substitui a classe Worker. Veja o seguinte exemplo de operador da KCL 1.x:

final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...) final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory(); final Worker worker = StreamsWorkerFactory.createDynamoDbStreamsWorker( recordProcessorFactory, workerConfig, adapterClient, amazonDynamoDB, amazonCloudWatchClient);
Para migrar o operador
  1. Altere a instrução import para a classe Worker para as instruções de importação para as classes Scheduler e ConfigsBuilder.

    // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker; import software.amazon.kinesis.coordinator.Scheduler; import software.amazon.kinesis.common.ConfigsBuilder;
  2. Importe StreamTracker e altere a importação de StreamsWorkerFactory para StreamsSchedulerFactory.

    import software.amazon.kinesis.processor.StreamTracker; // import com.amazonaws.services.dynamodbv2.streamsadapter.StreamsWorkerFactory; import com.amazonaws.services.dynamodbv2.streamsadapter.StreamsSchedulerFactory;
  3. Escolha a posição por meio da qual iniciar a aplicação. Ela pode ser TRIM_HORIZON ou LATEST.

    import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended;
  4. Crie uma instância de StreamTracker.

    StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker( streamArn, InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON) );
  5. Crie o objeto AmazonDynamoDBStreamsAdapterClient.

    import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; ... AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create(); AmazonDynamoDBStreamsAdapterClient adapterClient = new AmazonDynamoDBStreamsAdapterClient( credentialsProvider, awsRegion);
  6. Crie o objeto ConfigsBuilder.

    import software.amazon.kinesis.common.ConfigsBuilder; ... ConfigsBuilder configsBuilder = new ConfigsBuilder( streamTracker, applicationName, adapterClient, dynamoDbAsyncClient, cloudWatchAsyncClient, UUID.randomUUID().toString(), new StreamsRecordProcessorFactory());
  7. Crie Scheduler conforme mostrado no seguinte exemplo:

    import java.util.UUID; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.KinesisClientUtil; import software.amazon.kinesis.coordinator.Scheduler; ... DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build(); CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build(); DynamoDBStreamsPollingConfig pollingConfig = new DynamoDBStreamsPollingConfig(adapterClient); pollingConfig.idleTimeBetweenReadsInMillis(idleTimeBetweenReadsInMillis); RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig(); retrievalConfig.retrievalSpecificConfig(pollingConfig); CoordinatorConfig coordinatorConfig = configsBuilder.coordinatorConfig(); coordinatorConfig.clientVersionConfig(CoordinatorConfig.ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X) Scheduler scheduler = StreamsSchedulerFactory.createScheduler( configsBuilder.checkpointConfig(), coordinatorConfig, configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), retrievalConfig, adapterClient );
Importante

A configuração CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X mantém a compatibilidade entre o DynamoDB Streams Kinesis Adapter para a KCL v3 e a KCL v1, não entre a KCL v2 e a v3.

Etapa 4: visão geral e recomendações de configuração da KCL 3.x

Para obter uma descrição detalhada das configurações introduzidas após a KCL 1.x que são relevantes na KCL 3.x, consulte KCL configurations e KCL migration client configuration.

Configurações com valor padrão de atualização na KCL 3.x

billingMode

Na KCL versão 1.x, o valor padrão para billingMode é definido como PROVISIONED. No entanto, na KCL versão 3.x, o padrão billingMode é PAY_PER_REQUEST (modo sob demanda). Recomendamos que você use o modo de capacidade sob demanda em sua tabela de concessões para ajustar automaticamente a capacidade com base no uso. Para obter orientações sobre como usar a capacidade provisionada para suas tabelas de concessões, consulte Best practices for the lease table with provisioned capacity mode.

idleTimeBetweenReadsInMillis

Na KCL versão 1.x, o valor padrão para idleTimeBetweenReadsInMillis é definido como 1.000 (ou 1 segundo). A KCL versão 3.x define o valor padrão para idleTimeBetweenReadsInMillis como 1.500 (ou 1,5 segundo), mas o Amazon DynamoDB Streams Kinesis Adapter substitui esse valor padrão, definindo-o como 1.000 (ou 1 segundo).

Novas configurações na KCL 3.x

leaseAssignmentIntervalMillis

Essa configuração define o intervalo de tempo antes que os fragmentos recém-descobertos comecem a ser processados, e é calculada como 1,5 × leaseAssignmentIntervalMillis. Se essa configuração não for definida explicitamente, o intervalo de tempo será padronizado como 1,5 × failoverTimeMillis. O processamento de novos fragmentos exige a verificação da tabela de concessões e a consulta a um índice secundário global (GSI) na tabela de concessões. A redução de leaseAssignmentIntervalMillis aumenta a frequência dessas operações de verificação e consulta, aumentando os custos do DynamoDB. Recomendamos definir esse valor como 2.000 para minimizar o atraso no processamento de novos fragmentos.

shardConsumerDispatchPollIntervalMillis

Essa configuração define o intervalo entre pesquisas sucessivas feitas pelo consumidor do fragmento para acionar transições de estado. Na KCL versão 1.x, esse comportamento era controlado pelo parâmetro idleTimeInMillis, que não era exposto como uma definição configurável. Na KCL versão 3.x, recomendamos definir essa configuração para corresponder ao valor usado em idleTimeInMillis na configuração da KCL versão 1.x.

Configurações não substituídas pelo Amazon DynamoDB Streams Kinesis Adapter

shardSyncIntervalMillis

O DynamoDB Streams Kinesis Adapter compatível com a KCL versão 1.x define explicitamente shardSyncIntervalMillis como 0. Ao passo que o Kinesis do DynamoDB Streams Adapter compatível com a KCL versão 3.x não define mais um valor para essa configuração. Para manter o mesmo comportamento do adaptador da versão 1.x, defina o valor dessa configuração como 0.

leasesRecoveryAuditorExecutionFrequencyMillis

O DynamoDB Streams Kinesis Adapter compatível com a KCL versão 1.x define explicitamente leasesRecoveryAuditorExecutionFrequencyMillis como 1.000. Ao passo que o Kinesis do DynamoDB Streams Adapter compatível com a KCL versão 3.x não define mais um valor padrão para essa configuração. Para manter o mesmo comportamento do adaptador da versão 1.x, defina o valor dessa configuração como 1.000.

Etapa 5: migrar da KCL 2.x para a KCL 3.x

Para garantir uma transição tranquila e a compatibilidade com a versão mais recente da Kinesis Client Library (KCL), siga as etapas de 5 a 8 nas instruções do guia de migração para atualizar da KCL 2.x para a KCL 3.x.