Amazon Managed Service for Apache Flink(Amazon MSF)之前称为 Amazon Kinesis Data Analytics for Apache Flink。
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
将流数据源添加到 Managed Service for Apache Flink
Apache Flink 提供连接器以从文件、套接字、集合和自定义源中读取。在应用程序代码中,您可以使用 Apache Flink 源
使用 Kinesis 数据流
KinesisStreamsSource 从 Amazon Kinesis 数据流向应用程序提供流数据。
创建 KinesisStreamsSource
以下代码示例说明了如何创建 KinesisStreamsSource:
// Configure the KinesisStreamsSource Configuration sourceConfig = new Configuration(); sourceConfig.set(KinesisSourceConfigOptions.STREAM_INITIAL_POSITION, KinesisSourceConfigOptions.InitialPosition.TRIM_HORIZON); // This is optional, by default connector will read from LATEST // Create a new KinesisStreamsSource to read from specified Kinesis Stream. KinesisStreamsSource<String> kdsSource = KinesisStreamsSource.<String>builder() .setStreamArn("arn:aws:kinesis:us-east-1:123456789012:stream/test-stream") .setSourceConfig(sourceConfig) .setDeserializationSchema(new SimpleStringSchema()) .setKinesisShardAssigner(ShardAssignerFactory.uniformShardAssigner()) // This is optional, by default uniformShardAssigner will be used. .build();
有关使用的更多信息KinesisStreamsSource,请参阅 Apache Flink 文档中的 Amazon Kinesis Data Stream
创建使用 EFO 使用者的 KinesisStreamsSource
KinesisStreamsSource 现在支持增强型扇出功能(EFO)
如果 Kinesis 使用者使用 EFO,则 Kinesis Data Streams 服务会为其提供自己的专用带宽,而不是让其与从流中读取数据的其他使用者共享流的固定带宽。
有关在 Kinesis 消费端上使用 EFO 的更多信息,请参阅 FLIP-128:Kinesis 消费者的增强型扇出 AWS
您可以通过在 Kinesis 使用者上设置以下参数来启用 EFO 使用者:
READER_TYPE:将此参数设置为 EFO,让您的应用程序使用 EFO 使用者访问 Kinesis Data Stream 数据。
EFO_CONSUMER_NAME:将此参数设置为该流使用者中的唯一字符串值。在同一 Kinesis 数据流中重复使用使用者名称,会导致之前使用该名称的使用者被终止。
要将 a 配置KinesisStreamsSource为使用 EFO,请向使用者添加以下参数:
sourceConfig.set(KinesisSourceConfigOptions.READER_TYPE, KinesisSourceConfigOptions.ReaderType.EFO); sourceConfig.set(KinesisSourceConfigOptions.EFO_CONSUMER_NAME, "my-flink-efo-consumer");
有关使用 EFO 使用者的 Managed Service for Apache Flink 应用程序的示例,请参阅 Github 上 KinesisConnectors 公开示例
使用 Amazon MSK
KafkaSource源从 Amazon MSK 主题向您的应用程序提供流数据。
创建 KafkaSource
以下代码示例说明了如何创建 KafkaSource:
KafkaSource<String> source = KafkaSource.<String>builder() .setBootstrapServers(brokers) .setTopics("input-topic") .setGroupId("my-group") .setStartingOffsets(OffsetsInitializer.earliest()) .setValueOnlyDeserializer(new SimpleStringSchema()) .build(); env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
有关使用 KafkaSource 的更多信息,请参阅MSK 复制。