View a markdown version of this page

将流数据源添加到 Managed Service for Apache Flink - Managed Service for Apache Flink

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

将流数据源添加到 Managed Service for Apache Flink

Apache Flink 提供连接器以从文件、套接字、集合和自定义源中读取。在应用程序代码中,您可以使用 Apache Flink 源以从流中接收数据。本节介绍了可用于 Amazon 服务的源。

使用 Kinesis 数据流

KinesisStreamsSource 从 Amazon Kinesis 数据流向应用程序提供流数据。

创建 KinesisStreamsSource

以下代码示例说明了如何创建 KinesisStreamsSource

// Configure the KinesisStreamsSource Configuration sourceConfig = new Configuration(); sourceConfig.set(KinesisSourceConfigOptions.STREAM_INITIAL_POSITION, KinesisSourceConfigOptions.InitialPosition.TRIM_HORIZON); // This is optional, by default connector will read from LATEST // Create a new KinesisStreamsSource to read from specified Kinesis Stream. KinesisStreamsSource<String> kdsSource = KinesisStreamsSource.<String>builder() .setStreamArn("arn:aws:kinesis:us-east-1:123456789012:stream/test-stream") .setSourceConfig(sourceConfig) .setDeserializationSchema(new SimpleStringSchema()) .setKinesisShardAssigner(ShardAssignerFactory.uniformShardAssigner()) // This is optional, by default uniformShardAssigner will be used. .build();

有关使用的更多信息KinesisStreamsSource,请参阅 Apache Flink 文档中的 Amazon Kinesis Data Stream s Connector 和我们在 Github 上的 KinesisConnectors 公开示例

创建使用 EFO 使用者的 KinesisStreamsSource

KinesisStreamsSource 现在支持增强型扇出功能(EFO)

如果 Kinesis 使用者使用 EFO,则 Kinesis Data Streams 服务会为其提供自己的专用带宽,而不是让其与从流中读取数据的其他使用者共享流的固定带宽。

有关在 Kinesis 消费端上使用 EFO 的更多信息,请参阅 FLIP-128:Kinesis 消费者的增强型扇出 AWS

您可以通过在 Kinesis 使用者上设置以下参数来启用 EFO 使用者:

  • READER_TYPE:将此参数设置为 EFO,让您的应用程序使用 EFO 使用者访问 Kinesis Data Stream 数据。

  • EFO_CONSUMER_NAME:将此参数设置为该流使用者中的唯一字符串值。在同一 Kinesis 数据流中重复使用使用者名称,会导致之前使用该名称的使用者被终止。

要将 a 配置KinesisStreamsSource为使用 EFO,请向使用者添加以下参数:

sourceConfig.set(KinesisSourceConfigOptions.READER_TYPE, KinesisSourceConfigOptions.ReaderType.EFO); sourceConfig.set(KinesisSourceConfigOptions.EFO_CONSUMER_NAME, "my-flink-efo-consumer");

有关使用 EFO 使用者的 Managed Service for Apache Flink 应用程序的示例,请参阅 Github 上 KinesisConnectors 公开示例

使用 Amazon MSK

KafkaSource源从 Amazon MSK 主题向您的应用程序提供流数据。

创建 KafkaSource

以下代码示例说明了如何创建 KafkaSource

KafkaSource<String> source = KafkaSource.<String>builder() .setBootstrapServers(brokers) .setTopics("input-topic") .setGroupId("my-group") .setStartingOffsets(OffsetsInitializer.earliest()) .setValueOnlyDeserializer(new SimpleStringSchema()) .build(); env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

有关使用 KafkaSource 的更多信息,请参阅MSK 复制