Amazon Managed Service for Apache Flink (Amazon MSF) は、以前は Amazon Kinesis Data Analytics for Apache Flink と呼ばれていました。
Managed Service for Apache Flink にストリーミングデータソースを追加する
Apache Flink には、ファイル、ソケット、コレクション、カスタムソースから読み取るためのコネクタが用意されています。アプリケーションコードでは、「Apache Flink ソース
Kinesis Data Streams を使用する
KinesisStreamsSource は Amazon Kinesis データストリームからアプリケーションにストリーミングデータを提供します。
の を作成するKinesisStreamsSource
次のコード例は、KinesisStreamsSource の作成を示しています。
// Configure the KinesisStreamsSource Configuration sourceConfig = new Configuration(); sourceConfig.set(KinesisSourceConfigOptions.STREAM_INITIAL_POSITION, KinesisSourceConfigOptions.InitialPosition.TRIM_HORIZON); // This is optional, by default connector will read from LATEST // Create a new KinesisStreamsSource to read from specified Kinesis Stream. KinesisStreamsSource<String> kdsSource = KinesisStreamsSource.<String>builder() .setStreamArn("arn:aws:kinesis:us-east-1:123456789012:stream/test-stream") .setSourceConfig(sourceConfig) .setDeserializationSchema(new SimpleStringSchema()) .setKinesisShardAssigner(ShardAssignerFactory.uniformShardAssigner()) // This is optional, by default uniformShardAssigner will be used. .build();
KinesisStreamsSource の使用の詳細については、Apache Flink ドキュメントの「Amazon Kinesis Data Streams Connector
EFO コンシューマーを使用する KinesisStreamsSource を作成する
KinesisStreamsSource で拡張ファンアウト (EFO)
Kinesis コンシューマーが EFO を使用する場合、Kinesis Data Streams サービスは、コンシューマーがストリームの固定帯域幅を、ストリームから読み取る他のコンシューマーと共有するのではなく、独自の専用帯域幅を提供します。
Kinesis コンシューマーで EFO を使用する方法の詳細については、「FLIP-128: AWS Kinesis コンシューマー向けの拡張ファンアウト
EFO コンシューマーを有効にするには、Kinesis コンシューマーで次のパラメータを設定します。
READER_TYPE: アプリケーションが EFO コンシューマーを使用して Kinesis Data Streams データにアクセスできるようにするには、このパラメータを EFO に設定します。
EFO_CONSUMER_NAME: このパラメータを、このストリームのコンシューマー間で一意の文字列値に設定します。同じ Kinesis Data Stream でコンシューマー名を再利用すると、その名前を使用していた以前のコンシューマーは終了します。
EFO を使用するように KinesisStreamsSource を設定するには、コンシューマーに以下のパラメータを追加します。
sourceConfig.set(KinesisSourceConfigOptions.READER_TYPE, KinesisSourceConfigOptions.ReaderType.EFO); sourceConfig.set(KinesisSourceConfigOptions.EFO_CONSUMER_NAME, "my-flink-efo-consumer");
EFO コンシューマーを使用する Managed Service for Apache Flink アプリケーションの例については、Github で が公開している Kinesis コネクタの例
Amazon MSK を使用する
KafkaSource ソースは Amazon MSK トピックからアプリケーションにストリーミングデータを提供します。
の を作成するKafkaSource
次のコード例は、KafkaSource の作成を示しています。
KafkaSource<String> source = KafkaSource.<String>builder() .setBootstrapServers(brokers) .setTopics("input-topic") .setGroupId("my-group") .setStartingOffsets(OffsetsInitializer.earliest()) .setValueOnlyDeserializer(new SimpleStringSchema()) .build(); env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
KafkaSource の使用方法の詳細については、「MSK レプリケーション」を参照してください。