Java에서 KCL을 사용하여 소비자 개발 - Amazon Kinesis Data Streams

Java에서 KCL을 사용하여 소비자 개발

사전 조건

KCL 3.x를 사용하여 시작하기 전에 다음이 있는지 확인합니다.

  • Java Development Kit(JDK) 8 이상

  • AWS SDK for Java 2.x

  • 종속성 관리를 위한 Maven 또는 Gradle

KCL은 워커가 작동 중인 컴퓨팅 호스트에서 CPU 사용률과 같은 CPU 사용률 지표를 수집하여 로드의 균형을 조정함으로써 워커 간에 리소스 사용률 수준을 균등하게 유지합니다. KCL이 워커로부터 CPU 사용률 지표를 수집할 수 있게 하려면 다음 사전 조건을 충족해야 합니다.

Amazon Elastic Compute Cloud(Amazon EC2)

  • 운영 체제는 Linux OS여야 합니다.

  • EC2 인스턴스에서 IMDSv2를 활성화해야 합니다.

Amazon EC2의 Amazon Elastic Container Service(Amazon ECS)

의 Amazon ECSAWS Fargate

Amazon EC2의 Amazon Elastic Kubernetes Service(Amazon EKS)

  • 운영 체제는 Linux OS여야 합니다.

에 대한 Amazon EKSAWS Fargate

  • Fargate 플랫폼 버전 1.3.0 이상.

중요

KCL이 워커로부터 CPU 사용률 지표를 수집할 수 없는 경우 KCL은 다시 돌아가 워커당 처리량을 사용하여 리스를 할당하고 플릿의 워커 간에 로드의 균형을 조정합니다. 자세한 내용은 KCL이 워커에게 리스를 할당하고 로드의 균형을 조정하는 방법 섹션을 참조하세요.

종속성 설치 및 추가

Maven을 사용하는 경우 pom.xml 파일에 다음 종속성을 추가합니다. 3.x.x를 최신 KCL 버전으로 교체했는지 확인합니다.

<dependency> <groupId>software.amazon.kinesis</groupId> <artifactId>amazon-kinesis-client</artifactId> <version>3.x.x</version> <!-- Use the latest version --> </dependency>

Gradle을 사용하는 경우 build.gradle 파일에 다음을 추가합니다. 3.x.x를 최신 KCL 버전으로 교체했는지 확인합니다.

implementation 'software.amazon.kinesis:amazon-kinesis-client:3.x.x'

Maven Central Repository에서 최신 버전의 KCL을 확인할 수 있습니다.

소비자 구현

KCL 소비자 애플리케이션은 다음과 같은 주요 구성 요소로 구성됩니다.

RecordProcessor

RecordProcessor는 Kinesis 데이터 스트림 레코드를 처리하는 비즈니스 로직이 상주하는 핵심 구성 요소입니다. 애플리케이션이 Kinesis 스트림에서 수신하는 데이터를 처리하는 방법을 정의합니다.

주요 책임:

  • 샤드 처리 초기화

  • Kinesis 스트림의 레코드 배치 처리

  • 샤드 처리 종료(예: 샤드가 분할 또는 병합되거나 리스가 다른 호스트로 인계되는 경우)

  • 진행 상황 추적을 위한 체크포인트 처리

다음은 구현 예제를 보여줍니다.

import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; import software.amazon.kinesis.exceptions.InvalidStateException; import software.amazon.kinesis.exceptions.ShutdownException; import software.amazon.kinesis.lifecycle.events.*; import software.amazon.kinesis.processor.ShardRecordProcessor; public class SampleRecordProcessor implements ShardRecordProcessor { private static final String SHARD_ID_MDC_KEY = "ShardId"; private static final Logger log = LoggerFactory.getLogger(SampleRecordProcessor.class); private String shardId; @Override public void initialize(InitializationInput initializationInput) { shardId = initializationInput.shardId(); MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber()); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Processing {} record(s)", processRecordsInput.records().size()); processRecordsInput.records().forEach(r -> log.info("Processing record pk: {} -- Seq: {}", r.partitionKey(), r.sequenceNumber()) ); // Checkpoint periodically processRecordsInput.checkpointer().checkpoint(); } catch (Throwable t) { log.error("Caught throwable while processing records. Aborting.", t); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } @Override public void leaseLost(LeaseLostInput leaseLostInput) { MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Lost lease, so terminating."); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } @Override public void shardEnded(ShardEndedInput shardEndedInput) { MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Reached shard end checkpointing."); shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { log.error("Exception while checkpointing at shard end. Giving up.", e); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { MDC.put(SHARD_ID_MDC_KEY, shardId); try { log.info("Scheduler is shutting down, checkpointing."); shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { log.error("Exception while checkpointing at requested shutdown. Giving up.", e); } finally { MDC.remove(SHARD_ID_MDC_KEY); } } }

다음은 예제에서 사용된 각 메서드에 대한 상세 설명입니다.

initialize(InitializationInput initializationInput)

  • 용도: 레코드 처리에 필요한 리소스 또는 상태를 설정합니다.

  • 호출 시점: KCL이 이 레코드 프로세서에 샤드를 할당할 때 한 번 호출됩니다.

  • 중요 사항:

    • initializationInput.shardId(): 이 프로세서가 처리할 샤드의 ID입니다.

    • initializationInput.extendedSequenceNumber(): 처리를 시작할 시퀀스 번호입니다.

processRecords(ProcessRecordsInput processRecordsInput)

  • 용도: 수신 레코드를 처리하고 선택적으로 진행 상황을 체크포인트합니다.

  • 호출 시점: 레코드 프로세서가 샤드에 대한 리스를 유지하는 동안 반복됩니다.

  • 중요 사항:

    • processRecordsInput.records(): 처리할 레코드 목록입니다.

    • processRecordsInput.checkpointer(): 진행 상황을 체크포인트하는 데 사용됩니다.

    • KCL이 실패하지 않도록 처리 중에 예외를 처리했는지 확인합니다.

    • 예상치 못한 워커 충돌 또는 재시작 전에 체크포인트되지 않은 데이터와 같은 일부 시나리오에서 동일한 레코드가 두 번 이상 처리될 수 있으므로 이 방법은 멱등성이 있어야 합니다.

    • 데이터 일관성을 보장하기 위해 체크포인트를 지정하기 전에 항상 버퍼링된 데이터를 플러시합니다.

leaseLost(LeaseLostInput leaseLostInput)

  • 용도: 이 샤드 처리와 관련된 모든 리소스를 정리합니다.

  • 호출 시점: 다른 스케줄러가 이 샤드에 대한 리스를 인수하는 경우에 호출됩니다.

  • 중요 사항:

    • 이 메서드에서는 체크포인트이 허용되지 않습니다.

shardEnded(ShardEndedInput shardEndedInput)

  • 용도: 이 샤드 및 체크포인트에 대한 처리를 완료합니다.

  • 호출 시점: 샤드가 분할되거나 병합될 때 호출되어 이 샤드에 대한 모든 데이터가 처리되었음을 나타냅니다.

  • 중요 사항:

    • shardEndedInput.checkpointer(): 최종 체크포인트을 수행하는 데 사용됩니다.

    • 처리를 완료하려면 이 방법의 체크포인트이 필수입니다.

    • 여기에서 데이터와 체크포인트를 플러시하지 않으면 샤드를 다시 열 때 데이터 손실 또는 중복 처리가 발생할 수 있습니다.

shutdownRequested(ShutdownRequestedInput shutdownRequestedInput)

  • 용도: KCL이 종료될 때 체크포인트를 수행하고 리소스를 정리합니다.

  • 호출 시점: 애플리케이션이 종료되는 경우와 같이 KCL이 종료될 때 호출됩니다.

  • 중요 사항:

    • shutdownRequestedInput.checkpointer(): 종료 전에 체크포인트을 수행하는 데 사용됩니다.

    • 애플리케이션이 중지되기 전에 진행 상황이 저장되도록 메서드에 체크포인트을 구현했는지 확인합니다.

    • 여기에서 데이터와 체크포인트를 플러시하지 않으면 애플리케이션이 다시 시작될 때 데이터가 손실되거나 레코드가 재처리될 수 있습니다.

중요

KCL 3.x는 이전 워커를 종료하기 전에 체크포인트를 지정하여 한 워커에서 다른 워커로 리스를 인계할 때 데이터 재처리를 줄입니다. shutdownRequested() 메서드에서 체크포인트 로직을 구현하지 않으면 이 이점을 이용할 수 없습니다. shutdownRequested() 메서드 내에 체크포인트 로직을 구현했는지 확인합니다.

RecordProcessorFactory

RecordProcessorFactory는 새 RecordProcessor 인스턴스를 생성하는 역할을 합니다. KCL은 이 팩토리를 사용하여 애플리케이션이 처리해야 하는 각 샤드에 대해 새 RecordProcessor를 생성합니다.

주요 책임:

  • 온디맨드 방식으로 새 RecordProcessor 인스턴스 생성

  • 각 RecordProcessor가 올바르게 초기화되었는지 확인

다음은 구현 예제입니다.

import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; public class SampleRecordProcessorFactory implements ShardRecordProcessorFactory { @Override public ShardRecordProcessor shardRecordProcessor() { return new SampleRecordProcessor(); } }

이 예제에서는 shardRecordProcessor()가 호출될 때마다 팩토리에서 새 SampleRecordProcessor를 생성합니다. 필요한 초기화 로직을 포함하도록 이를 확장할 수 있습니다.

스케줄러

스케줄러는 KCL 애플리케이션의 모든 활동을 조정하는 상위 수준 구성 요소입니다. 데이터 처리의 전반적인 오케스트레이션을 담당합니다.

주요 책임:

  • RecordProcessors의 수명 주기 관리

  • 샤드에 대한 리스 관리 처리

  • 체크포인트 조정

  • 애플리케이션의 여러 워커 간에 샤드 처리 로드의 균형 조정

  • 정상적인 종료 및 애플리케이션 종료 신호 처리

스케줄러는 일반적으로 기본 애플리케이션에서 생성되고 시작됩니다. 스케줄러의 구현 예제는 기본 소비자 애플리케이션 섹션에서 확인할 수 있습니다.

기본 소비자 애플리케이션

기본 소비자 애플리케이션은 모든 구성 요소를 하나로 묶어줍니다. KCL 소비자 설정, 필요한 클라이언트 생성, 스케줄러 구성, 애플리케이션의 수명 주기 관리를 담당합니다.

주요 책임:

  • AWS 서비스 클라이언트(Kinesis, DynamoDB, CloudWatch) 설정

  • KCL 애플리케이션 구성

  • 스케줄러 생성 및 시작

  • 애플리케이션 종료 처리

다음은 구현 예제입니다.

import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.ConfigsBuilder; import software.amazon.kinesis.common.KinesisClientUtil; import software.amazon.kinesis.coordinator.Scheduler; import java.util.UUID; public class SampleConsumer { private final String streamName; private final Region region; private final KinesisAsyncClient kinesisClient; public SampleConsumer(String streamName, Region region) { this.streamName = streamName; this.region = region; this.kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(this.region)); } public void run() { DynamoDbAsyncClient dynamoDbAsyncClient = DynamoDbAsyncClient.builder().region(region).build(); CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build(); ConfigsBuilder configsBuilder = new ConfigsBuilder( streamName, streamName, kinesisClient, dynamoDbAsyncClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory() ); Scheduler scheduler = new Scheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), configsBuilder.retrievalConfig() ); Thread schedulerThread = new Thread(scheduler); schedulerThread.setDaemon(true); schedulerThread.start(); } public static void main(String[] args) { String streamName = "your-stream-name"; // replace with your stream name Region region = Region.US_EAST_1; // replace with your region new SampleConsumer(streamName, region).run(); } }

KCL은 기본적으로 전용 처리량으로 향상된 팬아웃(EFO) 소비자를 생성합니다. 향상된 팬아웃에 대한 자세한 내용은 전용 처리량으로 향상된 팬아웃 소비자 개발 섹션을 참조하세요. 소비자가 2명 미만이거나 200ms 미만의 읽기 전파 지연이 필요하지 않은 경우, 공유 처리량 소비자를 사용하도록 스케줄러 객체에서 다음 구성을 설정해야 합니다.

configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))

다음 코드는 공유 처리량 소비자를 사용하는 스케줄러 객체를 생성하는 예제입니다.

가져옵니다.

import software.amazon.kinesis.retrieval.polling.PollingConfig;

코드

Scheduler scheduler = new Scheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient)) );/