Managed Service for Apache Flink にストリーミングデータソースを追加する - Managed Service for Apache Flink

Amazon Managed Service for Apache Flink (Amazon MSF) は、以前は Amazon Kinesis Data Analytics for Apache Flink と呼ばれていました。

Managed Service for Apache Flink にストリーミングデータソースを追加する

Apache Flink には、ファイル、ソケット、コレクション、カスタムソースから読み取るためのコネクタが用意されています。アプリケーションコードでは、「Apache Flink ソース」を使用してストリームからデータを受信します。このセクションでは、Amazon サービスで利用できるソースについて説明します。

Kinesis Data Streams を使用する

KinesisStreamsSource は Amazon Kinesis データストリームからアプリケーションにストリーミングデータを提供します。

の を作成するKinesisStreamsSource

次のコード例は、KinesisStreamsSource の作成を示しています。

// Configure the KinesisStreamsSource Configuration sourceConfig = new Configuration(); sourceConfig.set(KinesisSourceConfigOptions.STREAM_INITIAL_POSITION, KinesisSourceConfigOptions.InitialPosition.TRIM_HORIZON); // This is optional, by default connector will read from LATEST // Create a new KinesisStreamsSource to read from specified Kinesis Stream. KinesisStreamsSource<String> kdsSource = KinesisStreamsSource.<String>builder() .setStreamArn("arn:aws:kinesis:us-east-1:123456789012:stream/test-stream") .setSourceConfig(sourceConfig) .setDeserializationSchema(new SimpleStringSchema()) .setKinesisShardAssigner(ShardAssignerFactory.uniformShardAssigner()) // This is optional, by default uniformShardAssigner will be used. .build();

KinesisStreamsSource の使用の詳細については、Apache Flink ドキュメントの「Amazon Kinesis Data Streams Connector」および Github で が公開している KinesisConnectors の例を参照してください。

EFO コンシューマーを使用する KinesisStreamsSource を作成する

KinesisStreamsSource拡張ファンアウト (EFO) がサポートされるようになりました。

Kinesis コンシューマーが EFO を使用する場合、Kinesis Data Streams サービスは、コンシューマーがストリームの固定帯域幅を、ストリームから読み取る他のコンシューマーと共有するのではなく、独自の専用帯域幅を提供します。

Kinesis コンシューマーで EFO を使用する方法の詳細については、「FLIP-128: AWS Kinesis コンシューマー向けの拡張ファンアウト」を参照してください。

EFO コンシューマーを有効にするには、Kinesis コンシューマーで次のパラメータを設定します。

  • READER_TYPE: アプリケーションが EFO コンシューマーを使用して Kinesis Data Streams データにアクセスできるようにするには、このパラメータを EFO に設定します。

  • EFO_CONSUMER_NAME: このパラメータを、このストリームのコンシューマー間で一意の文字列値に設定します。同じ Kinesis Data Stream でコンシューマー名を再利用すると、その名前を使用していた以前のコンシューマーは終了します。

EFO を使用するように KinesisStreamsSource を設定するには、コンシューマーに以下のパラメータを追加します。

sourceConfig.set(KinesisSourceConfigOptions.READER_TYPE, KinesisSourceConfigOptions.ReaderType.EFO); sourceConfig.set(KinesisSourceConfigOptions.EFO_CONSUMER_NAME, "my-flink-efo-consumer");

EFO コンシューマーを使用する Managed Service for Apache Flink アプリケーションの例については、Github で が公開している Kinesis コネクタの例を参照してください。

Amazon MSK を使用する

KafkaSource ソースは Amazon MSK トピックからアプリケーションにストリーミングデータを提供します。

の を作成するKafkaSource

次のコード例は、KafkaSource の作成を示しています。

KafkaSource<String> source = KafkaSource.<String>builder() .setBootstrapServers(brokers) .setTopics("input-topic") .setGroupId("my-group") .setStartingOffsets(OffsetsInitializer.earliest()) .setValueOnlyDeserializer(new SimpleStringSchema()) .build(); env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

KafkaSource の使用方法の詳細については、「MSK レプリケーション」を参照してください。