KCL 1.x에서 KCL 3.x로 마이그레이션
개요
이 가이드에서는 소비자 애플리케이션을 KCL 1.x에서 KCL 3.x로 마이그레이션하기 위한 지침을 제공합니다. KCL 1.x와 KCL 3.x의 아키텍처 차이로 인해 마이그레이션하려면 호환성을 보장하기 위해 여러 구성 요소를 업데이트해야 합니다.
KCL 1.x는 KCL 3.x와 다른 클래스 및 인터페이스를 사용합니다. KCL 1.x는 KCL 3.x와 다른 클래스 및 인터페이스를 사용합니다. 먼저 레코드 프로세서, 레코드 프로세서 팩토리 및 워커 클래스를 KCL 3.x 호환 형식으로 마이그레이션하고 KCL 1.x에서 KCL 3.x로 마이그레이션하는 단계를 따라야 합니다.
마이그레이션 단계
주제
1단계: 레코드 프로세서 마이그레이션
다음은 KCL 1.x DynamoDB Streams Kinesis 어댑터에 구현된 레코드 프로세서를 보여주는 예제입니다.
package com.amazonaws.kcl; import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; public class StreamsRecordProcessor implements IRecordProcessor, IShutdownNotificationAware { @Override public void initialize(InitializationInput initializationInput) { // // Setup record processor // } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { for (Record record : processRecordsInput.getRecords()) { String data = new String(record.getData().array(), Charset.forName("UTF-8")); System.out.println(data); if (record instanceof RecordAdapter) { // record processing and checkpointing logic } } } @Override public void shutdown(ShutdownInput shutdownInput) { if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) { try { shutdownInput.getCheckpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { throw new RuntimeException(e); } } } @Override public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { try { checkpointer.checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow exception // e.printStackTrace(); } } }
RecordProcessor 클래스를 마이그레이션하려면
-
다음과 같이 인터페이스를
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor
및com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware
에서com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor
로 변경합니다.// import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor
-
initialize
및processRecords
메서드에 대한 import 문을 업데이트합니다.// import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import software.amazon.kinesis.lifecycle.events.InitializationInput; // import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
-
shutdown
메서드를 새 메서드인leaseLost
,shardEnded
,shutdownRequested
으로 바꿉니다.// @Override // public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { // // // // This is moved to shardEnded(...) and shutdownRequested(ShutdownReauestedInput) // // // try { // checkpointer.checkpoint(); // } catch (ShutdownException | InvalidStateException e) { // // // // Swallow exception // // // e.printStackTrace(); // } // } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } }
다음은 업데이트된 버전의 레코드 프로세서 클래스입니다.
package com.amazonaws.codesamples; import software.amazon.kinesis.exceptions.InvalidStateException; import software.amazon.kinesis.exceptions.ShutdownException; import software.amazon.kinesis.lifecycle.events.InitializationInput; import software.amazon.kinesis.lifecycle.events.LeaseLostInput; import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.ShardEndedInput; import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput; import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor; import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsKinesisClientRecord; import software.amazon.awssdk.services.dynamodb.model.Record; public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor { @Override public void initialize(InitializationInput initializationInput) { } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { for (DynamoDBStreamsKinesisClientRecord record: processRecordsInput.records()) Record ddbRecord = record.getRecord(); // processing and checkpointing logic for the ddbRecord } } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } }
참고
DynamoDB Streams Kinesis Adapter는 이제 SDKv2 Record 모델을 사용합니다. SDKv2에서 복합 AttributeValue
객체(BS
, NS
, M
, L
, SS
)는 null을 반환하지 않습니다. hasBs()
, hasNs()
, hasM()
, hasL()
, hasSs()
메서드를 사용하여 이러한 값이 존재하는지 확인합니다.
2단계: 레코드 프로세서 팩토리 마이그레이션
레코드 프로세스 팩토리는 리스가 필요할 경우 레코드 프로세서 생성을 담당합니다. 다음은 KCL 1.x 팩토리의 예입니다.
package com.amazonaws.codesamples; import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; public class StreamsRecordProcessorFactory implements IRecordProcessorFactory { @Override public IRecordProcessor createProcessor() { return new StreamsRecordProcessor(dynamoDBClient, tableName); } }
RecordProcessorFactory
를 마이그레이션하려면
-
구현된 인터페이스를 다음과 같이
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory
에서software.amazon.kinesis.processor.ShardRecordProcessorFactory
로 변경합니다.com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory to software.amazon.kinesis.processor.ShardRecordProcessorFactory, as follows. // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; // public class TestRecordProcessorFactory implements IRecordProcessorFactory { public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory { Change the return signature for createProcessor. // public IRecordProcessor createProcessor() { public ShardRecordProcessor shardRecordProcessor() {
다음은 3.0의 레코드 프로세서 팩토리의 예입니다.
package com.amazonaws.codesamples; import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory { @Override public ShardRecordProcessor shardRecordProcessor() { return new StreamsRecordProcessor(); } }
3단계: 워커 마이그레이션
KCL 버전 3.0에서는 Scheduler라는 새로운 클래스가 Worker 클래스를 대체합니다. 다음은 KCL 1.x 워커의 예입니다.
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...) final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory(); final Worker worker = StreamsWorkerFactory.createDynamoDbStreamsWorker( recordProcessorFactory, workerConfig, adapterClient, amazonDynamoDB, amazonCloudWatchClient);
작업자를 마이그레이션하려면
-
Worker
클래스에 대한import
문을Scheduler
및ConfigsBuilder
클래스에 대한 가져오기 문으로 변경합니다.// import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker; import software.amazon.kinesis.coordinator.Scheduler; import software.amazon.kinesis.common.ConfigsBuilder;
-
StreamTracker
를 가져오고StreamsWorkerFactory
의 가져오기를StreamsSchedulerFactory
로 변경합니다.import software.amazon.kinesis.processor.StreamTracker; // import com.amazonaws.services.dynamodbv2.streamsadapter.StreamsWorkerFactory; import com.amazonaws.services.dynamodbv2.streamsadapter.StreamsSchedulerFactory;
-
애플리케이션을 시작할 위치를 선택합니다.
TRIM_HORIZON
또는LATEST
일 수 있습니다.import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended;
-
StreamTracker
인스턴스를 만듭니다.StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker( streamArn, InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON) );
-
AmazonDynamoDBStreamsAdapterClient
객체를 생성합니다.import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; ... AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create(); AmazonDynamoDBStreamsAdapterClient adapterClient = new AmazonDynamoDBStreamsAdapterClient( credentialsProvider, awsRegion);
-
ConfigsBuilder
객체를 생성합니다.import software.amazon.kinesis.common.ConfigsBuilder; ... ConfigsBuilder configsBuilder = new ConfigsBuilder( streamTracker, applicationName, adapterClient, dynamoDbAsyncClient, cloudWatchAsyncClient, UUID.randomUUID().toString(), new StreamsRecordProcessorFactory());
-
다음 예와 같이
Scheduler
를 생성합니다.import java.util.UUID; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.KinesisClientUtil; import software.amazon.kinesis.coordinator.Scheduler; ... DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build(); CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build(); DynamoDBStreamsPollingConfig pollingConfig = new DynamoDBStreamsPollingConfig(adapterClient); pollingConfig.idleTimeBetweenReadsInMillis(idleTimeBetweenReadsInMillis); RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig(); retrievalConfig.retrievalSpecificConfig(pollingConfig); CoordinatorConfig coordinatorConfig = configsBuilder.coordinatorConfig(); coordinatorConfig.clientVersionConfig(CoordinatorConfig.ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X) Scheduler scheduler = StreamsSchedulerFactory.createScheduler( configsBuilder.checkpointConfig(), coordinatorConfig, configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), retrievalConfig, adapterClient );
중요
CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X
설정은 DynamoDB Streams Kinesis Adapter KCL v2와 v3가 아닌 KCL v3와 KCL v1 간의 호환성을 유지합니다.
4단계: KCL 3.x 구성 개요 및 권장 사항
KCL 1.x 이후에 도입된 KCL 3.x 관련 구성에 대한 자세한 설명은 KCL 구성 및 KCL 마이그레이션 클라이언트 구성을 참조하세요.
KCL 3.x에서 업데이트 기본값을 사용한 구성
billingMode
-
KCL 버전 1.x에서는
billingMode
의 기본값이PROVISIONED
로 설정됩니다. 그러나 KCL 버전 3.x에서는 기본billingMode
가PAY_PER_REQUEST
(온디맨드 모드)입니다. 사용량에 따라 용량을 자동으로 조정하려면 리스 테이블에 온디맨드 용량 모드를 사용하는 것이 좋습니다. 리스 테이블에 프로비저닝된 용량을 사용하는 방법에 대한 지침은 프로비저닝된 용량 모드를 사용하는 리스 테이블의 모범 사례를 참조하세요. idleTimeBetweenReadsInMillis
-
KCL 버전 1.x에서는
idleTimeBetweenReadsInMillis
의 기본값이 1,000(또는 1초)으로 설정됩니다. KCL 버전 3.x에서는 idleTimeBetweenReadsInMillis
의 기본값이 1,500(또는 1.5초)으로 설정되지만 Amazon DynamoDB Streams Kinesis Adapter는 이 기본값을 1,000(또는 1초)으로 재정의합니다.
KCL 3.x의 새로운 구성
leaseAssignmentIntervalMillis
-
이 구성은 새로 검색된 샤드가 처리를 시작하기 전의 시간 간격을 정의하며, 1.5 ×
leaseAssignmentIntervalMillis
로 계산됩니다. 이 설정을 명시적으로 구성하지 않으면 시간 간격은 기본적으로 1.5 ×failoverTimeMillis
로 설정됩니다. 새 샤드를 처리하려면 리스 테이블을 스캔하고 리스 테이블에서 글로벌 보조 인덱스(GSI)를 쿼리해야 합니다.leaseAssignmentIntervalMillis
를 낮추면 이러한 스캔 및 쿼리 작업의 빈도가 증가하여 DynamoDB 비용이 증가합니다. 새 샤드 처리 지연을 최소화하려면 이 값을 2,000으로 설정하는 것이 좋습니다. shardConsumerDispatchPollIntervalMillis
-
이 구성은 상태 전환을 트리거하기 위해 샤드 소비자가 연속 폴링하는 간격을 정의합니다. KCL 버전 1.x에서 이 동작은 구성 가능한 설정으로 노출되지 않은
idleTimeInMillis
파라미터에 의해 제어되었습니다. KCL 버전 3.x에서는 KCL 버전 1.x 설정에서idleTimeInMillis
에 사용된 값과 일치하도록 이 구성을 설정하는 것이 좋습니다.
Amazon DynamoDB Streams Kinesis Adapter에서 재정의되지 않는 구성
shardSyncIntervalMillis
-
KCL 버전 1.x와 호환되는 DynamoDB Streams Kinesis Adapter는 명시적으로
shardSyncIntervalMillis
를 0으로 설정합니다. 반면, KCL 버전 3.x와 호환되는 DynamoDB Streams Kinesis Adapter는 더 이상 이 구성에 대한 값을 설정하지 않습니다. 버전 1.x와 동일한 어댑터 동작을 유지하려면 이 구성의 값을 0으로 설정합니다. leasesRecoveryAuditorExecutionFrequencyMillis
-
KCL 버전 1.x와 호환되는 DynamoDB Streams Kinesis Adapter는 명시적으로
leasesRecoveryAuditorExecutionFrequencyMillis
를 1,000으로 설정합니다. 반면, KCL 버전 3.x와 호환되는 DynamoDB Streams Kinesis Adapter는 더 이상 이 구성에 대한 기본값을 설정하지 않습니다. 버전 1.x와 동일한 어댑터 동작을 유지하려면 이 구성의 값을 1,000으로 설정합니다.
5단계: KCL 2.x에서 KCL 3.x로 마이그레이션
원활한 전환과 최신 Kinesis Client Library(KCL) 버전과의 호환성을 보장하려면 KCL 2.x에서 KCL 3.x로 업그레이드하기 위한 마이그레이션 가이드의 지침 5~8단계를 따르세요.