Managed Service for Apache Flink でシンクを使用してデータを書き込む - Managed Service for Apache Flink

Amazon Managed Service for Apache Flink (Amazon MSF) は、以前は Amazon Kinesis Data Analytics for Apache Flink と呼ばれていました。

Managed Service for Apache Flink でシンクを使用してデータを書き込む

アプリケーションコードでは、任意の Apache Flink シンク コネクタを使用して、Kinesis Data Streams や DynamoDB などの AWS サービスを含む外部システムに書き込むことができます。

Apache Flink には、ファイル用やソケット用のシンクも用意されており、カスタムシンクを実行することもできます。さまざまなシンクがサポートされていますが、その中でも、次のものがよく使用されます。

Kinesis Data Streams を使用する

Apache Flink では、「Kinesis Data Streams Connector」に関する情報が Apache Flink ドキュメントに記載されています。

Kinesis データストリームを入力と出力に使用するアプリケーションの例については、 チュートリアル: Managed Service for Apache Flink で DataStream API 使用の概要 を参照してください。

Apache Kafka と Amazon Managed Streaming for Apache Kafka (MSK) を使用する

Apache Flink Kafka コネクタは、Apache Kafka と Amazon MSK にデータを公開するための広範なサポートを提供します。これには、1 回限りの保証が含まれます。Kafka に書き込む方法については、Apache Flink ドキュメントの「Kafka Connectors examples」を参照してください。

Amazon S3 を使用する

Amazon S3 バケットにオブジェクトを書き込むには、Apache Flink StreamingFileSink を使用できます。

S3 にオブジェクトを書き込む方法の例については、 「例: Amazon S3 バケットに書き込む」 を参照してください。

Firehose を使用する

FlinkKinesisFirehoseProducer は、Firehose サービスを使用してアプリケーション出力を保存するための、信頼性が高くスケーラブルな Apache Flink シンクです。このセクションでは、Maven プロジェクトをセットアップして FlinkKinesisFirehoseProducer を作成・使用するための設定方法について説明します。

の を作成するFlinkKinesisFirehoseProducer

次のコード例は、FlinkKinesisFirehoseProducer の作成を示しています。

Properties outputProperties = new Properties(); outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName, new SimpleStringSchema(), outputProperties);

FlinkKinesisFirehoseProducer コード例

次のコード例は、FlinkKinesisFirehoseProducer を作成して設定し、Apache Flink データストリームから Firehose サービスにデータを送信する方法を示しています。

package com.amazonaws.services.kinesisanalytics; import com.amazonaws.services.kinesisanalytics.flink.connectors.config.ProducerConfigConstants; import com.amazonaws.services.kinesisanalytics.flink.connectors.producer.FlinkKinesisFirehoseProducer; import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer; import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; import java.io.IOException; import java.util.Map; import java.util.Properties; public class StreamingJob { private static final String region = "us-east-1"; private static final String inputStreamName = "ExampleInputStream"; private static final String outputStreamName = "ExampleOutputStream"; private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) { Properties inputProperties = new Properties(); inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST"); return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties)); } private static DataStream<String> createSourceFromApplicationProperties(StreamExecutionEnvironment env) throws IOException { Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties(); return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), applicationProperties.get("ConsumerConfigProperties"))); } private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromStaticConfig() { /* * com.amazonaws.services.kinesisanalytics.flink.connectors.config. * ProducerConfigConstants * lists of all of the properties that firehose sink can be configured with. */ Properties outputProperties = new Properties(); outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName, new SimpleStringSchema(), outputProperties); ProducerConfigConstants config = new ProducerConfigConstants(); return sink; } private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromApplicationProperties() throws IOException { /* * com.amazonaws.services.kinesisanalytics.flink.connectors.config. * ProducerConfigConstants * lists of all of the properties that firehose sink can be configured with. */ Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties(); FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName, new SimpleStringSchema(), applicationProperties.get("ProducerConfigProperties")); return sink; } public static void main(String[] args) throws Exception { // set up the streaming execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); /* * if you would like to use runtime configuration properties, uncomment the * lines below * DataStream<String> input = createSourceFromApplicationProperties(env); */ DataStream<String> input = createSourceFromStaticConfig(env); // Kinesis Firehose sink input.addSink(createFirehoseSinkFromStaticConfig()); // If you would like to use runtime configuration properties, uncomment the // lines below // input.addSink(createFirehoseSinkFromApplicationProperties()); env.execute("Flink Streaming Java API Skeleton"); } }

Firehose シンクの使用方法に関する詳細なチュートリアルについては、「「例: Firehose に書き込む」」を参照してください。