기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.
Amazon Keyspaces CDC 스트림에 대한 KCL 소비자 애플리케이션 구현
이 주제에서는 KCL 소비자 애플리케이션을 구현하여 Amazon Keyspaces CDC 스트림을 처리하는 방법에 대한 step-by-step 가이드를 제공합니다.
-
사전 조건: 시작하기 전에 다음을 갖추었는지 확인합니다.
-
CDC 스트림이 있는 Amazon Keyspaces 테이블
-
IAM 보안 주체가 Amazon Keyspaces CDC 스트림에 액세스하고, KCL 스트림 처리를 위해 DynamoDB 테이블을 생성 및 액세스하고, CloudWatch에 지표를 게시하는 데 필요한 IAM 권한입니다. 자세한 내용과 정책 예제는 섹션을 참조하세요Kinesis Client Library(KCL)를 사용하여 Amazon Keyspaces CDC 스트림을 처리할 수 있는 권한.
로컬 구성에 유효한 AWS자격 증명이 설정되어 있는지 확인합니다. 자세한 내용은 프로그래밍 방식 액세스를 위한 액세스 키 저장 단원을 참조하십시오.
-
Java Development Kit(JDK) 8 이상
-
Github의 Readme
에 나열된 요구 사항입니다.
-
-
이 단계에서는 KCL 종속성을 프로젝트에 추가합니다. Maven의 경우 pom.xml에 다음을 추가합니다.
<dependencies> <dependency> <groupId>software.amazon.kinesis</groupId> <artifactId>amazon-kinesis-client</artifactId> <version>3.1.0</version> </dependency> <dependency> <groupId>software.amazon.keyspaces</groupId> <artifactId>keyspaces-streams-kinesis-adapter</artifactId> <version>1.0.0</version> </dependency> </dependencies>참고
항상 KCL GitHub 리포지토리에서 최신 버전의 KCL
을 확인하세요. -
레코드 프로세서 인스턴스를 생성하는 팩토리 클래스를 생성합니다.
import software.amazon.awssdk.services.keyspacesstreams.model.Record; import software.amazon.keyspaces.streamsadapter.adapter.KeyspacesStreamsClientRecord; import software.amazon.keyspaces.streamsadapter.model.KeyspacesStreamsProcessRecordsInput; import software.amazon.keyspaces.streamsadapter.processor.KeyspacesStreamsShardRecordProcessor; import software.amazon.kinesis.lifecycle.events.InitializationInput; import software.amazon.kinesis.lifecycle.events.LeaseLostInput; import software.amazon.kinesis.lifecycle.events.ShardEndedInput; import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput; import software.amazon.kinesis.processor.RecordProcessorCheckpointer; public class RecordProcessor implements KeyspacesStreamsShardRecordProcessor { private String shardId; @Override public void initialize(InitializationInput initializationInput) { this.shardId = initializationInput.shardId(); System.out.println("Initializing record processor for shard: " + shardId); } @Override public void processRecords(KeyspacesStreamsProcessRecordsInput processRecordsInput) { try { for (KeyspacesStreamsClientRecord record : processRecordsInput.records()) { Record keyspacesRecord = record.getRecord(); System.out.println("Received record: " + keyspacesRecord); } if (!processRecordsInput.records().isEmpty()) { RecordProcessorCheckpointer checkpointer = processRecordsInput.checkpointer(); try { checkpointer.checkpoint(); System.out.println("Checkpoint successful for shard: " + shardId); } catch (Exception e) { System.out.println("Error while checkpointing for shard: " + shardId + " " + e); } } } catch (Exception e) { System.out.println("Error processing records for shard: " + shardId + " " + e); } } @Override public void leaseLost(LeaseLostInput leaseLostInput) { System.out.println("Lease lost for shard: " + shardId); } @Override public void shardEnded(ShardEndedInput shardEndedInput) { System.out.println("Shard ended: " + shardId); try { // This is required. Checkpoint at the end of the shard shardEndedInput.checkpointer().checkpoint(); System.out.println("Final checkpoint successful for shard: " + shardId); } catch (Exception e) { System.out.println("Error while final checkpointing for shard: " + shardId + " " + e); throw new RuntimeException("Error while final checkpointing", e); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { System.out.println("Shutdown requested for shard " + shardId); try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (Exception e) { System.out.println("Error while checkpointing on shutdown for shard: " + shardId + " " + e); } } } -
다음 예제와 같이 레코드 팩토리를 생성합니다.
import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; public class RecordProcessorFactory implements ShardRecordProcessorFactory { private final Queue<RecordProcessor> processors = new ConcurrentLinkedQueue<>(); @Override public ShardRecordProcessor shardRecordProcessor() { System.out.println("Creating new RecordProcessor"); RecordProcessor processor = new RecordProcessor(); processors.add(processor); return processor; } } -
이 단계에서는 KCLv3 및 Amazon Keyspaces 어댑터를 구성하는 기본 클래스를 생성합니다.
import com.example.KCLExample.utils.RecordProcessorFactory; import software.amazon.keyspaces.streamsadapter.AmazonKeyspacesStreamsAdapterClient; import software.amazon.keyspaces.streamsadapter.StreamsSchedulerFactory; import java.util.Arrays; import java.util.List; import java.util.concurrent.ExecutionException; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest; import software.amazon.awssdk.services.dynamodb.model.DeleteTableResponse; import software.amazon.awssdk.services.keyspacesstreams.KeyspacesStreamsClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.ConfigsBuilder; import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.coordinator.CoordinatorConfig; import software.amazon.kinesis.coordinator.Scheduler; import software.amazon.kinesis.leases.LeaseManagementConfig; import software.amazon.kinesis.processor.ProcessorConfig; import software.amazon.kinesis.processor.StreamTracker; import software.amazon.kinesis.retrieval.polling.PollingConfig; public class KCLTestBase { protected KeyspacesStreamsClient streamsClient; protected KinesisAsyncClient adapterClient; protected DynamoDbAsyncClient dynamoDbAsyncClient; protected CloudWatchAsyncClient cloudWatchClient; protected Region region; protected RecordProcessorFactory recordProcessorFactory; protected Scheduler scheduler; protected Thread schedulerThread; public void baseSetUp() { recordProcessorFactory = new RecordProcessorFactory(); setupKCLBase(); } protected void setupKCLBase() { region = Region.US_EAST_1; streamsClient = KeyspacesStreamsClient.builder() .region(region) .build(); adapterClient = new AmazonKeyspacesStreamsAdapterClient( streamsClient, region); dynamoDbAsyncClient = DynamoDbAsyncClient.builder() .region(region) .build(); cloudWatchClient = CloudWatchAsyncClient.builder() .region(region) .build(); } protected void startScheduler(Scheduler scheduler) { this.scheduler = scheduler; schedulerThread = new Thread(() -> scheduler.run()); schedulerThread.start(); } protected void shutdownScheduler() { if (scheduler != null) { scheduler.shutdown(); try { schedulerThread.join(30000); } catch (InterruptedException e) { System.out.println("Error while shutting down scheduler " + e); } } } protected Scheduler createScheduler(String streamArn, String leaseTableName) { String workerId = "worker-" + System.currentTimeMillis(); // Create ConfigsBuilder ConfigsBuilder configsBuilder = createConfigsBuilder(streamArn, workerId, leaseTableName); // Configure retrieval config for polling PollingConfig pollingConfig = new PollingConfig(streamArn, adapterClient); // Create the Scheduler return StreamsSchedulerFactory.createScheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), configsBuilder.retrievalConfig().retrievalSpecificConfig(pollingConfig), streamsClient, region ); } private ConfigsBuilder createConfigsBuilder(String streamArn, String workerId, String leaseTableName) { ConfigsBuilder configsBuilder = new ConfigsBuilder( streamArn, leaseTableName, adapterClient, dynamoDbAsyncClient, cloudWatchClient, workerId, recordProcessorFactory); configureCoordinator(configsBuilder.coordinatorConfig()); configureLeaseManagement(configsBuilder.leaseManagementConfig()); configureProcessor(configsBuilder.processorConfig()); configureStreamTracker(configsBuilder, streamArn); return configsBuilder; } private void configureCoordinator(CoordinatorConfig config) { config.skipShardSyncAtWorkerInitializationIfLeasesExist(true) .parentShardPollIntervalMillis(1000) .shardConsumerDispatchPollIntervalMillis(500); } private void configureLeaseManagement(LeaseManagementConfig config) { config.shardSyncIntervalMillis(0) .leasesRecoveryAuditorInconsistencyConfidenceThreshold(0) .leasesRecoveryAuditorExecutionFrequencyMillis(5000) .leaseAssignmentIntervalMillis(1000L); } private void configureProcessor(ProcessorConfig config) { config.callProcessRecordsEvenForEmptyRecordList(true); } private void configureStreamTracker(ConfigsBuilder configsBuilder, String streamArn) { StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker( streamArn, InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON) ); configsBuilder.streamTracker(streamTracker); } public void deleteAllDdbTables(String baseTableName) { List<String> tablesToDelete = Arrays.asList( baseTableName, baseTableName + "-CoordinatorState", baseTableName + "-WorkerMetricStats" ); for (String tableName : tablesToDelete) { deleteTable(tableName); } } private void deleteTable(String tableName) { DeleteTableRequest deleteTableRequest = DeleteTableRequest.builder() .tableName(tableName) .build(); try { DeleteTableResponse response = dynamoDbAsyncClient.deleteTable(deleteTableRequest).get(); System.out.println("Table deletion response " + response); } catch (InterruptedException | ExecutionException e) { System.out.println("Error deleting table: " + tableName + " " + e); } } } -
이 단계에서는 애플리케이션에 대한 레코드 프로세서 클래스를 구현하여 변경 이벤트 처리를 시작합니다.
import software.amazon.kinesis.coordinator.Scheduler; public class KCLTest { private static final int APP_RUNTIME_SECONDS = 1800; private static final int SLEEP_INTERNAL_MS = 60*1000; public static void main(String[] args) { KCLTestBase kclTestBase; kclTestBase = new KCLTestBase(); kclTestBase.baseSetUp(); // Create and start scheduler String leaseTableName = generateUniqueApplicationName(); // Update below to your Stream ARN String streamArn = "arn:aws:cassandra:us-east-1:759151643516:/keyspace/cdc_sample_test/table/test_kcl_bool/stream/2025-07-01T15:52:57.529"; Scheduler scheduler = kclTestBase.createScheduler(streamArn, leaseTableName); kclTestBase.startScheduler(scheduler); // Wait for specified time before shutting down - KCL applications are designed to run forever, however in this // example we will shut it down after APP_RUNTIME_SECONDS long startTime = System.currentTimeMillis(); long endTime = startTime + (APP_RUNTIME_SECONDS * 1000); while (System.currentTimeMillis() < endTime) { try { // Print and sleep every minute Thread.sleep(SLEEP_INTERNAL_MS); System.out.println("Application is running"); } catch (InterruptedException e) { System.out.println("Interrupted while waiting for records"); Thread.currentThread().interrupt(); break; } } // Stop the scheduler kclTestBase.shutdownScheduler(); kclTestBase.deleteAllDdbTables(leaseTableName); } public static String generateUniqueApplicationName() { String timestamp = String.valueOf(System.currentTimeMillis()); String randomString = java.util.UUID.randomUUID().toString().substring(0, 8); return String.format("KCL-App-%s-%s", timestamp, randomString); } }
모범 사례
Amazon Keyspaces CDC 스트림과 함께 KCL을 사용할 때는 다음 모범 사례를 따르세요.
- 오류 처리
-
레코드 프로세서에서 강력한 오류 처리를 구현하여 예외를 정상적으로 처리합니다. 일시적인 장애에 대한 재시도 로직을 구현하는 것이 좋습니다.
- 체크포인트 지정 빈도
-
체크포인트 빈도의 균형을 조정하여 중복 처리를 최소화하는 동시에 적절한 진행 상황 추적을 보장합니다. 체크포인트를 너무 자주 사용하면 성능에 영향을 미칠 수 있지만 체크포인트를 너무 자주 사용하지 않으면 작업자가 실패할 경우 재처리가 증가할 수 있습니다.
- 작업자 규모 조정
-
CDC 스트림의 샤드 수에 따라 작업자 수를 조정합니다. 좋은 출발점은 샤드당 하나의 작업자를 두는 것이지만 처리 요구 사항에 따라 조정해야 할 수 있습니다.
- 모니터링
-
KCL에서 제공하는 CloudWatch 지표를 사용하여 소비자 애플리케이션의 상태와 성능을 모니터링합니다. 주요 지표에는 처리 지연 시간, 체크포인트 기간 및 임대 수가 포함됩니다.
- 테스트
-
작업자 장애, 스트림 리샤딩, 다양한 로드 조건과 같은 시나리오를 포함하여 소비자 애플리케이션을 철저히 테스트합니다.
비 Java 언어에서 KCL 사용
KCL은 주로 Java 라이브러리이지만 MultiLangDaemon을 통해 다른 프로그래밍 언어와 함께 사용할 수 있습니다. MultiLangDaemon은 비 Java 레코드 프로세서와 KCL 간의 상호 작용을 관리하는 Java 기반 데몬입니다.
KCL은 다음 언어를 지원합니다.
-
Python
-
Ruby
-
Node.js
-
.NET
Java 이외의 언어로 KCL을 사용하는 방법에 대한 자세한 내용은 KCL MultiLangDaemon 설명서를
문제 해결
이 섹션에서는 Amazon Keyspaces CDC 스트림에서 KCL을 사용할 때 발생할 수 있는 일반적인 문제에 대한 솔루션을 제공합니다.
- 느린 처리
-
소비자 애플리케이션이 레코드를 느리게 처리하는 경우 다음을 고려하세요.
-
작업자 인스턴스 수 증가
-
레코드 처리 로직 최적화
-
다운스트림 시스템의 병목 현상 확인
-
- 중복 처리
-
레코드가 중복 처리되는 경우 체크포인트 로직을 확인합니다. 레코드를 성공적으로 처리한 후 체크포인트를 지정하고 있는지 확인합니다.
- 작업자 실패
-
작업자가 자주 실패하는 경우 다음을 확인합니다.
-
리소스 제약 조건(CPU, 메모리)
-
네트워크 연결 이벤트
-
권한 문제
-
- 임대 테이블 문제
-
KCL 임대 테이블에 문제가 있는 경우:
-
애플리케이션에 Amazon Keyspaces 테이블에 액세스할 수 있는 적절한 권한이 있는지 확인합니다.
-
테이블에 충분한 프로비저닝 처리량이 있는지 확인
-