

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 将流数据源添加到 Managed Service for Apache Flink
<a name="how-sources"></a>

Apache Flink 提供连接器以从文件、套接字、集合和自定义源中读取。在应用程序代码中，您可以使用 [Apache Flink 源](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/datastream_api.html#data-sources)以从流中接收数据。本节介绍了可用于 Amazon 服务的源。

## 使用 Kinesis 数据流
<a name="input-streams"></a>

`KinesisStreamsSource` 从 Amazon Kinesis 数据流向应用程序提供流数据。

### 创建 `KinesisStreamsSource`
<a name="input-streams-create"></a>

以下代码示例说明了如何创建 `KinesisStreamsSource`：

```
// Configure the KinesisStreamsSource
Configuration sourceConfig = new Configuration();
sourceConfig.set(KinesisSourceConfigOptions.STREAM_INITIAL_POSITION, KinesisSourceConfigOptions.InitialPosition.TRIM_HORIZON); // This is optional, by default connector will read from LATEST

// Create a new KinesisStreamsSource to read from specified Kinesis Stream.
KinesisStreamsSource<String> kdsSource =
        KinesisStreamsSource.<String>builder()
                .setStreamArn("arn:aws:kinesis:us-east-1:123456789012:stream/test-stream")
                .setSourceConfig(sourceConfig)
                .setDeserializationSchema(new SimpleStringSchema())
                .setKinesisShardAssigner(ShardAssignerFactory.uniformShardAssigner()) // This is optional, by default uniformShardAssigner will be used.
                .build();
```

有关使用的更多信息`KinesisStreamsSource`，请参阅 [Apache Flink 文档中的 Amazon Kinesis Data Stream](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kinesis/) s Connector [和我们在 Github 上的 KinesisConnectors 公开示例](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KinesisConnectors)。

### 创建使用 EFO 使用者的 `KinesisStreamsSource`
<a name="input-streams-efo"></a>

`KinesisStreamsSource` 现在支持[增强型扇出功能（EFO）](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kinesis/)。

如果 Kinesis 使用者使用 EFO，则 Kinesis Data Streams 服务会为其提供自己的专用带宽，而不是让其与从流中读取数据的其他使用者共享流的固定带宽。

有关在 Kinesis 消费端上使用 EFO 的更多信息，[请参阅 FLIP-128：Kinesis 消费者的增强型扇出 AWS](https://cwiki.apache.org/confluence/display/FLINK/FLIP-128%3A+Enhanced+Fan+Out+for+AWS+Kinesis+Consumers)。

您可以通过在 Kinesis 使用者上设置以下参数来启用 EFO 使用者：
+ **READER\$1TYPE：**将此参数设置为 **EFO**，让您的应用程序使用 EFO 使用者访问 Kinesis Data Stream 数据。
+ **EFO\$1CONSUMER\$1NAME：**将此参数设置为该流使用者中的唯一字符串值。在同一 Kinesis 数据流中重复使用使用者名称，会导致之前使用该名称的使用者被终止。

要将 a 配置`KinesisStreamsSource`为使用 EFO，请向使用者添加以下参数：

```
sourceConfig.set(KinesisSourceConfigOptions.READER_TYPE, KinesisSourceConfigOptions.ReaderType.EFO);
sourceConfig.set(KinesisSourceConfigOptions.EFO_CONSUMER_NAME, "my-flink-efo-consumer");
```

有关使用 EFO 使用者的 Managed Service for Apache Flink 应用程序的示例，请参阅 [Github 上 KinesisConnectors 公开示例](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KinesisConnectors)。

## 使用 Amazon MSK
<a name="input-msk"></a>

`KafkaSource`源从 Amazon MSK 主题向您的应用程序提供流数据。

### 创建 `KafkaSource`
<a name="input-msk-create"></a>

以下代码示例说明了如何创建 `KafkaSource`：

```
KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers(brokers)
    .setTopics("input-topic")
    .setGroupId("my-group")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
```

有关使用 `KafkaSource` 的更多信息，请参阅[MSK 复制](earlier.md#example-msk)。