기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.
KCL을 사용한 멀티스트림 처리
이 섹션에서는 둘 이상의 데이터 스트림을 동시에 처리할 수 있는 KCL 소비자 애플리케이션을 생성할 수 있는 KCL의 필수 변경 사항에 대해 설명합니다.
중요
-
멀티스트림 처리는 KCL 2.3 이상에서만 지원됩니다.
-
에서 실행되는 Java 이외의 언어로 작성된 KCL 소비자는 멀티스트림 처리가 지원되지 않습니다
multilangdaemon
. -
멀티스트림 처리는 KCL 1.x 버전에서는 지원되지 않습니다.
-
MultistreamTracker 인터페이스
-
여러 스트림을 동시에 처리할 수 있는 소비자 애플리케이션을 구축하려면 MultistreamTracker
라는 새 인터페이스를 구현해야 합니다. 이 인터페이스에는 KCL 소비자 애플리케이션에서 처리할 데이터 스트림 및 해당 구성 목록을 반환하는 streamConfigList
메서드가 포함되어 있습니다. 처리 중인 데이터 스트림은 소비자 애플리케이션 런타임 중에 변경될 수 있습니다.streamConfigList
는 처리할 데이터 스트림의 변경 사항에 대해 알아보기 위해 KCL에 의해 주기적으로 호출됩니다. -
는 StreamConfig
목록을 streamConfigList
채웁니다.
package software.amazon.kinesis.common; import lombok.Data; import lombok.experimental.Accessors; @Data @Accessors(fluent = true) public class StreamConfig { private final StreamIdentifier streamIdentifier; private final InitialPositionInStreamExtended initialPositionInStreamExtended; private String consumerArn; }
-
StreamIdentifier
및InitialPositionInStreamExtended
는 필수 필드이고는 선택 사항consumerArn
입니다. KCL을 사용하여 향상된 팬아웃 소비자 애플리케이션을 구현하는consumerArn
경우에만를 제공해야 합니다. -
에 대한 자세한 내용은 https://github.com/awslabs/amazon-kinesis-client/blob/v2.5.8/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java#L129
StreamIdentifier
참조하십시오. 를 생성하려면streamArn
및 KCL 2.5.0 이상에서 사용할 수streamCreationEpoch
있는에서 멀티스트림 인스턴스를 생성하는StreamIdentifier
것이 좋습니다.streamArm
를 지원하지 않는 KCL v2.3 및 v2.4에서는account-id:StreamName:streamCreationTimestamp
형식을 사용하여 멀티스트림 인스턴스를 생성하세요. 이 형식은 사용되지 않으며 다음 메이저 릴리스부터 더 이상 지원되지 않습니다. -
MultistreamTracker에는 리스 테이블(formerStreamsLeasesDeletionStrategy)에서 이전 스트림의 리스를 삭제하기 위한 전략도 포함되어 있습니다. 소비자 애플리케이션 런타임 중에는 전략을 변경할 수 없다는 점에 유의하세요. 자세한 내용은 https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy.java
참조하십시오.
-
또는 동시에 여러 스트림을 처리하는 KCL 소비자 애플리케이션을 구현하려는 경우 MultiStreamTracker
로 ConfigsBuilder를 초기화할 수 있습니다.
* Constructor to initialize ConfigsBuilder with MultiStreamTracker * @param multiStreamTracker * @param applicationName * @param kinesisClient * @param dynamoDBClient * @param cloudWatchClient * @param workerIdentifier * @param shardRecordProcessorFactory */ public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { this.appStreamTracker = Either.left(multiStreamTracker); this.applicationName = applicationName; this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.cloudWatchClient = cloudWatchClient; this.workerIdentifier = workerIdentifier; this.shardRecordProcessorFactory = shardRecordProcessorFactory; }
-
KCL 소비자 애플리케이션에 대해 멀티스트림 지원이 구현되면 이제 애플리케이션 리스 테이블의 각 행에이 애플리케이션이 처리하는 여러 데이터 스트림의 샤드 ID와 스트림 이름이 포함됩니다.
-
KCL 소비자 애플리케이션에 대한 멀티스트림 지원이 구현되면 leaseKey는 구조를 취합니다
account-id:StreamName:streamCreationTimestamp:ShardId
. 예를 들어111111111:multiStreamTest-1:12345:shardId-000000000336
입니다.
중요
기존 KCL 소비자 애플리케이션이 하나의 데이터 스트림만 처리하도록 구성된 경우 leaseKey
(리스 테이블의 파티션 키)는 샤드 ID입니다. 여러 데이터 스트림을 처리하도록 기존 KCL 소비자 애플리케이션을 재구성하면 멀티스트림account-id:StreamName:StreamCreationTimestamp:ShardId
을 지원하는 leaseKey
구조가 다음과 같아야 하므로 리스 테이블이 끊어집니다.