

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 维持 Managed Service for Apache Flink 应用程序的最佳实践
<a name="best-practices"></a>

本节包含有关开发稳定的高性能 Managed Service for Apache Flink应用程序的信息和建议。

**Topics**
+ [尽可能减小 uber JAR 的大小](#minimize-uber-JAR)
+ [容错：检查点和保存点](#how-dev-bp-checkpoint)
+ [连接器版本不受支持](#how-dev-bp-connectors)
+ [性能和并行度](#how-dev-bp-performance)
+ [设置每个运算符的并行度](#how-dev-bp-parallelism)
+ [日志记录](#how-dev-bp-logging)
+ [编码](#how-dev-bp-code)
+ [管理凭证。](#how-dev-bp-managing-credentials)
+ [从分片/分区很少的源中读取](#troubleshooting-few-shards-partitions)
+ [Studio 笔记本刷新间隔](#notebook-refresh-rate)
+ [Studio 笔记本的最佳性能](#notebook-refresh-rate)
+ [水印策略和空闲分片如何影响时间窗口](#notebook-watermarking)
+ [为所有运算符设置 UUID](#best-practices-setting-operator-ids)
+ [添加 ServiceResourceTransformer 到 Maven 阴影插件](#best-practices-service-resource-transformer)

## 尽可能减小 uber JAR 的大小
<a name="minimize-uber-JAR"></a>

Java/Scala application must be packaged in an uber (super/fat) JAR，并包括运行时尚未提供的所有其他必需依赖项。但是，uber JAR 的大小会影响应用程序的启动和重新启动时间，并可能导致 JAR 超过 512 MB 的限制。

为优化部署时间，uber JAR **不**应包含以下内容：
+ **运行时提供的任何依赖项**，如以下示例所示。它们应该在 POM 文件中具有 `compileOnly` 范围或在 Gradle 配置中具有 `provided` 范围。
+ **任何仅用于测试的依赖项**，例如 JUnit 或 Mockito。它们应该在 POM 文件中具有 `testImplementation` 范围或在 Gradle 配置中具有 `test` 范围。
+ **您的应用程序未实际使用的任何依赖项**。
+ **您的应用程序所需的任何静态数据或元数据。**静态数据应由应用程序在运行时加载，例如从数据存储或 Amazon S3 加载。
+ 有关上述配置设置的详细信息，请参阅此 [POM 示例文件](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/blob/main/java/GettingStarted/pom.xml)。

**提供的依赖项**

Managed Service for Apache Flink 运行时提供许多依赖项。这些依赖项不应包含在 fat JAR 中，并且必须在 POM 文件中具有 `provided` 范围，或者在 `maven-shade-plugin` 配置中明确排除。fat JAR 中包含的任何依赖项在运行时都会被忽略，但会增加 JAR 的大小，从而在部署期间带来额外开销。

运行时版本 1.18、1.19 和 1.20 中由运行时提供的依赖项：
+ `org.apache.flink:flink-core`
+ `org.apache.flink:flink-java`
+ `org.apache.flink:flink-streaming-java`
+ `org.apache.flink:flink-scala_2.12`
+ `org.apache.flink:flink-table-runtime`
+ `org.apache.flink:flink-table-planner-loader`
+ `org.apache.flink:flink-json`
+ `org.apache.flink:flink-connector-base`
+ `org.apache.flink:flink-connector-files`
+ `org.apache.flink:flink-clients`
+ `org.apache.flink:flink-runtime-web`
+ `org.apache.flink:flink-metrics-code`
+ `org.apache.flink:flink-table-api-java`
+ `org.apache.flink:flink-table-api-bridge-base`
+ `org.apache.flink:flink-table-api-java-bridge`
+ `org.apache.logging.log4j:log4j-slf4j-impl`
+ `org.apache.logging.log4j:log4j-api`
+ `org.apache.logging.log4j:log4j-core`
+ `org.apache.logging.log4j:log4j-1.2-api`

此外，运行时还提供用于在 Managed Service for Apache Flink 中获取应用程序运行时属性的库，`com.amazonaws:aws-kinesisanalytics-runtime:1.2.0`。

运行时提供的所有依赖项都必须使用以下建议，以**免**将其包含在 uber JAR 中：
+ 在 Maven（`pom.xml`）和 SBT（`build.sbt`）中，使用 `provided` 范围。
+ 在 Gradle（`build.gradle`）中，使用 `compileOnly` 配置。

由于 Apache Flink 的父类优先加载，任何意外包含在 uber JAR 中的依赖项都将在运行时被忽略。有关更多信息，请参阅 Apache Flink 文档[parent-first-patterns](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#classloader-parent-first-patterns-default)中的。

**连接器**

运行时中未包含的大多数 FileSystem 连接器（连接器除外）都必须包含在默认作用域 (`compile`) 的 POM 文件中。

**其他建议**

通常，提供给 Managed Service for Apache Flink 的 Apache Flink uber JAR 应包含运行应用程序所需的最少量代码。包含源类、测试数据集或引导状态的依赖项不应包含在此 JAR 文件中。如果需要在运行时提取静态资源，请将此问题分成诸如 Amazon S3 之类的资源。此方面的示例包括状态引导或推理模型。

花点时间考虑您的深度依赖项树并移除非运行时依赖项。

尽管 Managed Service for Apache Flink 支持 512MB 的 jar 大小，但这应视为规则的例外情况。Apache Flink 目前通过其默认配置支持大约 104MB 的 jar 大小，这应该是所需 jar 的最大目标大小。

## 容错：检查点和保存点
<a name="how-dev-bp-checkpoint"></a>

使用检查点和保存点在 Managed Service for Apache Flink 应用程序中实施容错功能。在开发和维护应用程序时，请牢记以下几点：
+ 我们建议您为应用程序保持启用检查点。在计划维护期间以及由于服务问题、应用程序依赖项故障以及其他问题而导致意外故障，检查点可以为应用程序提供容错功能。有关定期维护的更多信息，请参阅[管理 Managed Service for Apache Flink 的维护任务](maintenance.md)。
+ `false`在应用程序开发或故障排除期间SnapshotsEnabled将[ApplicationSnapshotConfiguration::](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ApplicationSnapshotConfiguration.html) 设置为。在每次应用程序停止期间，将会创建一个快照；如果应用程序处于不正常状态或性能不佳，则可能会出现问题。在应用程序处于生产状态并保持稳定后，将 `SnapshotsEnabled` 设置为 `true`。
**注意**  
我们建议您设置应用程序以每天创建几次快照，以便使用正确的状态数据正确重启。正确的快照频率取决于应用程序的业务逻辑。频繁拍摄快照可让您恢复更新的数据，但会增加成本并需要更多的系统资源。

  有关监控应用程序停机时间的信息，请参阅[Managed Service for Apache Flink 中的指标和维度](metrics-dimensions.md)。

有关实施容错功能的更多信息，请参阅[实施容错能力](how-fault.md)。

## 连接器版本不受支持
<a name="how-dev-bp-connectors"></a>

从 Apache Flink 1.15 或更高版本开始，如果应用程序使用捆绑到应用程序中的不受支持的 Kinesis 连接器版本，则适用于 Apache Flink 的托管服务会自动阻止应用程序启动或更新。 JARs升级到 Managed Service for Apache Flink 1.15 或更高版本时，请确保使用的是最新的 Kinesis 连接器。可以是 1.15.2 或更高版本的任何版本。Managed Service for Apache Flink 不支持所有其他版本，因为它们可能会导致一致性问题或故障，而**使用保存点停止**功能会防止清理停止/更新操作。要了解有关 Amazon Managed Service for Apache Flink 版本中连接器兼容性的更多信息，请参阅[Apache Flink 连接器](https://docs.aws.amazon.com/managed-flink/latest/java/how-flink-connectors.html)。

## 性能和并行度
<a name="how-dev-bp-performance"></a>

应用程序可以调整应用程序并行度并避免性能陷阱，从而进行扩展以满足任何吞吐量级别要求。在开发和维护应用程序时，请牢记以下几点：
+ 验证是否充分预置了所有应用程序源和接收器，而不会受到限制。如果源和接收器是其他 AWS 服务，请使用监控这些服务[CloudWatch](https://docs.aws.amazon.com/cloudwatch/?id=docs_gateway)。
+ 对于并行度较高的应用程序，请检查是否将较高的并行度应用于应用程序中的所有操作符。默认情况下，Apache Flink 为应用程序图中的所有操作符应用相同的应用程序并行度。这可能会导致在源或接收器上出现预置问题，或者出现操作符数据处理瓶颈。您可以使用 [setParallelism](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/parallel.html) 更改代码中的每个操作符的并行度。
+ 了解应用程序中的操作符的并行度设置的含义。如果更改操作符的并行度，您可能无法从操作符并行度与当前设置不兼容时创建的快照中还原应用程序。有关设置运算符并行度的更多信息，请参阅[为运算符显式设置最大并行度](https://nightlies.apache.org/flink/flink-docs-release-1.15/ops/production_ready.html#set-maximum-parallelism-for-operators-explicitly)。

有关实施扩展的更多信息，请参阅[实施应用程序扩展](how-scaling.md)。

## 设置每个运算符的并行度
<a name="how-dev-bp-parallelism"></a>

默认情况下，所有运算符均在应用程序级别设置并行度。您可以使用 DataStream API 覆盖单个运算符的并行度。`.setParallelism(x)`您可以将运算符并行度设置为等于或低于应用程序并行度的任一并行度。

如果可能，将运算符并行度定义为应用程序并行度的函数。这样，运算符的并行度就会随应用程序的并行度而变化。例如，如果您使用自动缩放，则所有运算符都将以相同的比例改变其并行度：

```
int appParallelism = env.getParallelism();
...
...ops.setParalleism(appParallelism/2);
```

在某些情况下，您可能希望将运算符并行度设置为常数。例如，将 Kinesis Stream 源的并行度设置为分片数。在这些情况下，可考虑将运算符并行度作为应用程序配置参数传递，以在不更改代码的情况下对其进行更改，例如对源流进行重新分片。

## 日志记录
<a name="how-dev-bp-logging"></a>

您可以使用 CloudWatch 日志监控应用程序的性能和错误情况。为应用程序配置日志记录时，请牢记以下几点：
+ 启用应用程序的 CloudWatch 日志记录，以便可以调试任何运行时问题。
+ 不要为应用程序中处理的每条记录创建一个日志条目。这会在处理期间出现严重瓶颈，并且可能会导致数据处理反向压力。
+ 创建 CloudWatch 警报，以便在应用程序无法正常运行时通知您。有关更多信息，请参阅 [在适用于 Apache Flink 的亚马逊托管服务中使用 CloudWatch 警报](monitoring-metrics-alarms.md)。

有关实施日志记录的更多信息，请参阅[](monitoring-overview.md)。

## 编码
<a name="how-dev-bp-code"></a>

您可以使用建议的编程做法以提高应用程序性能和稳定性。在编写应用程序代码时，请牢记以下几点：
+ 不要在应用程序代码（应用程序的 `main` 方法或用户定义的函数）中使用 `system.exit()`。如果要从代码中关闭应用程序，请引发一个从 `Exception` 或 `RuntimeException` 派生的异常，其中包含有关应用程序出现的错误的消息。

  请注意下面有关该服务如何处理此类异常的信息：
  + 如果异常是从应用程序的 `main` 方法中引发的，在应用程序转变为 `RUNNING` 状态时，该服务将其封装在 `ProgramInvocationException` 中，并且任务管理器无法提交任务。
  + 如果异常是从用户定义的函数中引发的，任务管理器使任务失败并重新启动，并将异常详细信息写入到异常日志中。
+ 请考虑为应用程序 JAR 文件及其包含的依赖项填充阴影。如果应用程序和 Apache Flink 运行时系统的程序包名称存在潜在的冲突，则建议填充阴影。如果发生冲突，则应用程序日志可能包含 `java.util.concurrent.ExecutionException` 类型的异常。有关为应用程序 JAR 文件填充阴影的更多信息，请参阅 [Apache Maven Shade 插件](https://maven.apache.org/plugins/maven-shade-plugin/)。

## 管理凭证。
<a name="how-dev-bp-managing-credentials"></a>

您不应在生产（或任何其他）应用程序中加入任何长期凭证。长期凭证很可能会被签入版本控制系统，很容易丢失。相反，您可以将角色与 Managed Service for Apache Flink 应用程序关联并为该角色授予权限。然后，正在运行的 Flink 应用程序可以从环境中选择具有相应权限的临时证书。如果未与 IAM 原生集成的服务（例如，需要用户名和密码进行身份验证的数据库）需要身份验证，则应考虑将密钥存储在 [AWS Secrets Manager](https://aws.amazon.com/secrets-manager/) 中。

许多 AWS 本机服务都支持身份验证：
+ [Kinesis Data Stream ProcessTaxiStream s — .java](hhttps://github.com/aws-samples/amazon-kinesis-data-analytics-taxi-consumer/blob/master/src/main/java/com/amazonaws/samples/kaja/taxi/consumer/ProcessTaxiStream.java#L90)
+ 亚马逊 MSK — [https://github.com/aws/aws-msk-iam-auth/using-the-amazon-msk](https://github.com/aws/aws-msk-iam-auth/#using-the-amazon-msk-library-for-iam-authentication)\$1-library-for-iam-authentication
+ [亚马逊 Elasticsearch Service — .java AmazonElasticsearchSink](https://github.com/aws-samples/amazon-kinesis-data-analytics-taxi-consumer/blob/master/src/main/java/com/amazonaws/samples/kaja/taxi/consumer/operators/AmazonElasticsearchSink.java)
+ Amazon S3 — 在 Managed Service for Apache Flink 上开箱即用

## 从分片/分区很少的源中读取
<a name="troubleshooting-few-shards-partitions"></a>

从 Apache Kafka 或 Kinesis 数据流读取数据时，流的并行度（Kafka 的分区数和 Kinesis 的分片数）与应用程序的并行度之间可能存在不匹配。在简单的设计中，应用程序的并行度不能超出流的并行度：源运算符的每个子任务只能从 1 个或多个分片/分区读取。这意味着，对于一个只有 2 个分片的流和一个并行度为 8 的应用程序，实际上只有两个子任务从流中消耗，6 个子任务处于空闲状态。这会大大限制应用程序的吞吐量，尤其是在反序列化成本高昂且由源端执行的情况下（这是默认设置）。

为了减轻这种影响，可以扩展流。但这不一定可取或可行。或者，您可以重构源，使其不进行任何序列化，而只是在 `byte[]` 上传递。然后，您可以[重新平衡](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/overview/)数据，使其在所有任务中均匀分布，然后在那里反序列化数据。通过这种方式，您可以利用所有子任务进行反序列化，并且这种可能昂贵的操作不再受流数量 shards/partitions 的约束。

## Studio 笔记本刷新间隔
<a name="notebook-refresh-rate"></a>

如果更改段落结果刷新间隔，请将其值设置为不低于 1000 毫秒。

## Studio 笔记本的最佳性能
<a name="notebook-refresh-rate"></a>

我们使用以下语句进行了测试，`events-per-second` 乘以 `number-of-keys` 低于 25,000,000 时获得了最佳性能。`events-per-second` 低于 150,000。

```
SELECT key, sum(value) FROM key-values GROUP BY key
```

## 水印策略和空闲分片如何影响时间窗口
<a name="notebook-watermarking"></a>

从 Apache Kafka 和 Kinesis 数据流读取事件时，源可以根据流的属性设置事件时间。对于 Kinesis，事件时间等于事件的大致到达时间。但是，在源为事件设置事件时间不足以让 Flink 应用程序使用事件时间。源还必须生成水印，将有关事件时间的信息从源传播到所有其他运算符。[Flink 文档](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/concepts/time/)很好地概述了该过程的工作原理。

默认情况下，从 Kinesis 读取的事件的时间戳设置为 Kinesis 确定的近似到达时间。要使事件时间在应用程序中发挥作用，另一个先决条件是水印策略。

```
WatermarkStrategy<String> s = WatermarkStrategy
    .<String>forMonotonousTimestamps()
    .withIdleness(Duration.ofSeconds(...));
```

然后，使用`assignTimestampsAndWatermarks`方法将水印策略应用于`DataStream`。有一些有用的内置策略：
+ `forMonotonousTimestamps()`只会使用事件时间（大概到达时间），并定期将最大值作为水印发出（针对每个特定的子任务）
+ `forBoundedOutOfOrderness(Duration.ofSeconds(...))`与之前的策略类似，但会使用事件时间 — 生成水印的持续时间。

来自 [Flink 文档](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/concepts/time/)：

*源函数的每个并行子任务通常会独立生成其水印。这些水印定义了该特定并行源上的事件时间。*

*当水印流经流式处理程序时，它们会推进到达运算符的事件时间。每当运算符推进其事件时间时，它都会为其后继运算符在下游生成一个新的水印。*

*有些运算符消耗多个输入流；例如，一个并集或 keyBy(…) 或分区(…) 函数之后的运算符。此类运算符的当前事件时间是其输入流事件时间的最小值。当其输入流更新其事件时间时，运算符也会更新。*

这意味着，如果源子任务从空闲分片中消耗，则下游运算符不会从该子任务中收到新的水印，因此所有使用时间窗口的下游运算符的处理都会停止。为避免这种情况，客户可以在水印策略中添加`withIdleness`选项。使用该选项，在计算运算符的事件时间时，运算符会将水印从空闲的上游子任务中排除。因此，空闲子任务不再阻碍下游运算符的事件时间的推进。

根据您使用的分片分配器，可能不会为某些工作线程分配任何 Kinesis 分片。在这种情况下，即使所有 Kinesis 分片都持续提供事件数据，这些工作线程也会表现出空闲源行为。您可以通过将 `uniformShardAssigner` 与源运算符结合使用来降低这种风险。这样可以确保所有源子任务都有要处理的分片，前提为工作线程的数量小于或等于活动分片的数量。

但是，如果没有子任务在读取任何事件（即流中没有事件），则带有内置水印策略的空闲选项不会推进事件时间。对于从流中读取有限事件集的测试用例来说，这一点尤其明显。由于读取最后一个事件后，事件时间不会推进，因此最后一个窗口（包含最后一个事件）将不会关闭。

### Summary
<a name="notebook-watermarking-summary"></a>
+ 如果分片处于空闲状态，则 `withIdleness` 设置将不会生成新的水印。它会将空闲子任务发送的最后一个水印排除在下游运算符的最小水印计算之外。
+ 使用内置水印策略，最后一个打开的窗口将不会关闭（除非将发送推进水印的新事件，但这会创建一个随后保持打开状态的新窗口）。
+ 即使时间由 Kinesis 流设定，但如果一个分片的消耗速度比其他分片快（例如，在应用程序初始化期间，或者在并行消耗所有现有分片时忽略它们的关系），仍可能发生延迟到来的事件。`TRIM_HORIZON` parent/child 
+ 水印策略的 `withIdleness` 设置似乎中断空闲分片 `(ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS` 的 Kinesis 源特定设置。

### 示例
<a name="notebook-watermarking-example"></a>

以下应用程序正在从流中读取数据，并根据事件时间创建会话窗口。

```
Properties consumerConfig = new Properties();
consumerConfig.put(AWSConfigConstants.AWS_REGION, "eu-west-1");
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON");

FlinkKinesisConsumer<String> consumer = new FlinkKinesisConsumer<>("...", new SimpleStringSchema(), consumerConfig);

WatermarkStrategy<String> s = WatermarkStrategy
    .<String>forMonotonousTimestamps()
    .withIdleness(Duration.ofSeconds(15));
    
env.addSource(consumer)
    .assignTimestampsAndWatermarks(s)
    .map(new MapFunction<String, Long>() {
        @Override
        public Long map(String s) throws Exception {
            return Long.parseLong(s);
        }
    })
    .keyBy(l -> 0l)
    .window(EventTimeSessionWindows.withGap(Time.seconds(10)))
    .process(new ProcessWindowFunction<Long, Object, Long, TimeWindow>() {
        @Override
        public void process(Long aLong, ProcessWindowFunction<Long, Object, Long, TimeWindow>.Context context, Iterable<Long>iterable, Collector<Object> collector) throws Exception {
            long count = StreamSupport.stream(iterable.spliterator(), false).count();
            long timestamp = context.currentWatermark();

            System.out.print("XXXXXXXXXXXXXX Window with " + count + " events");
            System.out.println("; Watermark: " + timestamp + ", " + Instant.ofEpochMilli(timestamp));


            for (Long l : iterable) {
                System.out.println(l);
            }
        }
    });
```

在以下示例中，8 个事件被写入一个 16 个分片流（前 2 个和最后一个事件恰好落在同一个分片中）。

```
$ aws kinesis put-record --stream-name hp-16 --partition-key 1 --data MQ==
$ aws kinesis put-record --stream-name hp-16 --partition-key 2 --data Mg==
$ aws kinesis put-record --stream-name hp-16 --partition-key 3 --data Mw==
$ date

{
    "ShardId": "shardId-000000000012",
    "SequenceNumber": "49627894338614655560500811028721934184977530127978070210"
}
{
    "ShardId": "shardId-000000000012",
    "SequenceNumber": "49627894338614655560500811028795678659974022576354623682"
}
{
    "ShardId": "shardId-000000000014",
    "SequenceNumber": "49627894338659257050897872275134360684221592378842022114"
}
Wed Mar 23 11:19:57 CET 2022

$ sleep 10
$ aws kinesis put-record --stream-name hp-16 --partition-key 4 --data NA==
$ aws kinesis put-record --stream-name hp-16 --partition-key 5 --data NQ==
$ date

{
    "ShardId": "shardId-000000000010",
    "SequenceNumber": "49627894338570054070103749783042116732419934393936642210"
}
{
    "ShardId": "shardId-000000000014",
    "SequenceNumber": "49627894338659257050897872275659034489934342334017700066"
}
Wed Mar 23 11:20:10 CET 2022

$ sleep 10
$ aws kinesis put-record --stream-name hp-16 --partition-key 6 --data Ng==
$ date

{
    "ShardId": "shardId-000000000001",
    "SequenceNumber": "49627894338369347363316974173886988345467035365375213586"
}
Wed Mar 23 11:20:22 CET 2022

$ sleep 10
$ aws kinesis put-record --stream-name hp-16 --partition-key 7 --data Nw==
$ date

{
    "ShardId": "shardId-000000000008",
    "SequenceNumber": "49627894338525452579706688535878947299195189349725503618"
}
Wed Mar 23 11:20:34 CET 2022

$ sleep 60
$ aws kinesis put-record --stream-name hp-16 --partition-key 8 --data OA==
$ date

{
    "ShardId": "shardId-000000000012",
    "SequenceNumber": "49627894338614655560500811029600823255837371928900796610"
}
Wed Mar 23 11:21:27 CET 2022
```

此输入应生成 5 个会话窗口：事件 1、2、3；事件 4、5；事件 6；事件 7；事件 8。但是，该程序仅生成前 4 个窗口。

```
11:59:21,529 INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 5 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000006,HashKeyRange: {StartingHashKey: 127605887595351923798765477786913079296,EndingHashKey: 148873535527910577765226390751398592511},SequenceNumberRange: {StartingSequenceNumber: 49627894338480851089309627289524549239292625588395704418,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM
11:59:21,530 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 5 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000006,HashKeyRange: {StartingHashKey: 127605887595351923798765477786913079296,EndingHashKey: 148873535527910577765226390751398592511},SequenceNumberRange: {StartingSequenceNumber: 49627894338480851089309627289524549239292625588395704418,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0
11:59:21,530 INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 6 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000007,HashKeyRange: {StartingHashKey: 148873535527910577765226390751398592512,EndingHashKey: 170141183460469231731687303715884105727},SequenceNumberRange: {StartingSequenceNumber: 49627894338503151834508157912666084957565273949901684850,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM
11:59:21,530 INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 6 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000010,HashKeyRange: {StartingHashKey: 212676479325586539664609129644855132160,EndingHashKey: 233944127258145193631070042609340645375},SequenceNumberRange: {StartingSequenceNumber: 49627894338570054070103749782090692112383219034419626146,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM
11:59:21,530 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 6 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000007,HashKeyRange: {StartingHashKey: 148873535527910577765226390751398592512,EndingHashKey: 170141183460469231731687303715884105727},SequenceNumberRange: {StartingSequenceNumber: 49627894338503151834508157912666084957565273949901684850,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0
11:59:21,531 INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 4 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000005,HashKeyRange: {StartingHashKey: 106338239662793269832304564822427566080,EndingHashKey: 127605887595351923798765477786913079295},SequenceNumberRange: {StartingSequenceNumber: 49627894338458550344111096666383013521019977226889723986,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM
11:59:21,532 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 4 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000005,HashKeyRange: {StartingHashKey: 106338239662793269832304564822427566080,EndingHashKey: 127605887595351923798765477786913079295},SequenceNumberRange: {StartingSequenceNumber: 49627894338458550344111096666383013521019977226889723986,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0
11:59:21,532 INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 3 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000004,HashKeyRange: {StartingHashKey: 85070591730234615865843651857942052864,EndingHashKey: 106338239662793269832304564822427566079},SequenceNumberRange: {StartingSequenceNumber: 49627894338436249598912566043241477802747328865383743554,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM
11:59:21,532 INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 2 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000003,HashKeyRange: {StartingHashKey: 63802943797675961899382738893456539648,EndingHashKey: 85070591730234615865843651857942052863},SequenceNumberRange: {StartingSequenceNumber: 49627894338413948853714035420099942084474680503877763122,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM
11:59:21,532 INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 3 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000015,HashKeyRange: {StartingHashKey: 319014718988379809496913694467282698240,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49627894338681557796096402897798370703746460841949528306,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM
11:59:21,532 INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 2 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000014,HashKeyRange: {StartingHashKey: 297747071055821155530452781502797185024,EndingHashKey: 319014718988379809496913694467282698239},SequenceNumberRange: {StartingSequenceNumber: 49627894338659257050897872274656834985473812480443547874,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM
11:59:21,532 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 3 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000004,HashKeyRange: {StartingHashKey: 85070591730234615865843651857942052864,EndingHashKey: 106338239662793269832304564822427566079},SequenceNumberRange: {StartingSequenceNumber: 49627894338436249598912566043241477802747328865383743554,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0
11:59:21,532 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 2 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000003,HashKeyRange: {StartingHashKey: 63802943797675961899382738893456539648,EndingHashKey: 85070591730234615865843651857942052863},SequenceNumberRange: {StartingSequenceNumber: 49627894338413948853714035420099942084474680503877763122,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0
11:59:21,532 INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 0 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000001,HashKeyRange: {StartingHashKey: 21267647932558653966460912964485513216,EndingHashKey: 42535295865117307932921825928971026431},SequenceNumberRange: {StartingSequenceNumber: 49627894338369347363316974173816870647929383780865802258,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM
11:59:21,532 INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 0 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000009,HashKeyRange: {StartingHashKey: 191408831393027885698148216680369618944,EndingHashKey: 212676479325586539664609129644855132159},SequenceNumberRange: {StartingSequenceNumber: 49627894338547753324905219158949156394110570672913645714,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM
11:59:21,532 INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 7 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 21267647932558653966460912964485513215},SequenceNumberRange: {StartingSequenceNumber: 49627894338347046618118443550675334929656735419359821826,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM
11:59:21,533 INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 0 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000012,HashKeyRange: {StartingHashKey: 255211775190703847597530955573826158592,EndingHashKey: 276479423123262501563991868538311671807},SequenceNumberRange: {StartingSequenceNumber: 49627894338614655560500811028373763548928515757431587010,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM
11:59:21,533 INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 7 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000008,HashKeyRange: {StartingHashKey: 170141183460469231731687303715884105728,EndingHashKey: 191408831393027885698148216680369618943},SequenceNumberRange: {StartingSequenceNumber: 49627894338525452579706688535807620675837922311407665282,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM
11:59:21,533 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 0 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000001,HashKeyRange: {StartingHashKey: 21267647932558653966460912964485513216,EndingHashKey: 42535295865117307932921825928971026431},SequenceNumberRange: {StartingSequenceNumber: 49627894338369347363316974173816870647929383780865802258,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0
11:59:21,533 INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 7 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000011,HashKeyRange: {StartingHashKey: 233944127258145193631070042609340645376,EndingHashKey: 255211775190703847597530955573826158591},SequenceNumberRange: {StartingSequenceNumber: 49627894338592354815302280405232227830655867395925606578,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM
11:59:21,533 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 7 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 21267647932558653966460912964485513215},SequenceNumberRange: {StartingSequenceNumber: 49627894338347046618118443550675334929656735419359821826,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0
11:59:21,568 INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 1 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000002,HashKeyRange: {StartingHashKey: 42535295865117307932921825928971026432,EndingHashKey: 63802943797675961899382738893456539647},SequenceNumberRange: {StartingSequenceNumber: 49627894338391648108515504796958406366202032142371782690,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM
11:59:21,568 INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 1 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000013,HashKeyRange: {StartingHashKey: 276479423123262501563991868538311671808,EndingHashKey: 297747071055821155530452781502797185023},SequenceNumberRange: {StartingSequenceNumber: 49627894338636956305699341651515299267201164118937567442,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM
11:59:21,568 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 1 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000002,HashKeyRange: {StartingHashKey: 42535295865117307932921825928971026432,EndingHashKey: 63802943797675961899382738893456539647},SequenceNumberRange: {StartingSequenceNumber: 49627894338391648108515504796958406366202032142371782690,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0
11:59:23,209 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 0 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000009,HashKeyRange: {StartingHashKey: 191408831393027885698148216680369618944,EndingHashKey: 212676479325586539664609129644855132159},SequenceNumberRange: {StartingSequenceNumber: 49627894338547753324905219158949156394110570672913645714,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1
11:59:23,244 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 6 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000010,HashKeyRange: {StartingHashKey: 212676479325586539664609129644855132160,EndingHashKey: 233944127258145193631070042609340645375},SequenceNumberRange: {StartingSequenceNumber: 49627894338570054070103749782090692112383219034419626146,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1
event: 6; timestamp: 1648030822428, 2022-03-23T10:20:22.428Z
11:59:23,377 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 3 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000015,HashKeyRange: {StartingHashKey: 319014718988379809496913694467282698240,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49627894338681557796096402897798370703746460841949528306,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1
11:59:23,405 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 2 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000014,HashKeyRange: {StartingHashKey: 297747071055821155530452781502797185024,EndingHashKey: 319014718988379809496913694467282698239},SequenceNumberRange: {StartingSequenceNumber: 49627894338659257050897872274656834985473812480443547874,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1
11:59:23,581 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 7 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000008,HashKeyRange: {StartingHashKey: 170141183460469231731687303715884105728,EndingHashKey: 191408831393027885698148216680369618943},SequenceNumberRange: {StartingSequenceNumber: 49627894338525452579706688535807620675837922311407665282,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1
11:59:23,586 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 1 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000013,HashKeyRange: {StartingHashKey: 276479423123262501563991868538311671808,EndingHashKey: 297747071055821155530452781502797185023},SequenceNumberRange: {StartingSequenceNumber: 49627894338636956305699341651515299267201164118937567442,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1
11:59:24,790 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 0 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000012,HashKeyRange: {StartingHashKey: 255211775190703847597530955573826158592,EndingHashKey: 276479423123262501563991868538311671807},SequenceNumberRange: {StartingSequenceNumber: 49627894338614655560500811028373763548928515757431587010,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 2
event: 4; timestamp: 1648030809282, 2022-03-23T10:20:09.282Z
event: 3; timestamp: 1648030797697, 2022-03-23T10:19:57.697Z
event: 5; timestamp: 1648030810871, 2022-03-23T10:20:10.871Z
11:59:24,907 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 7 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000011,HashKeyRange: {StartingHashKey: 233944127258145193631070042609340645376,EndingHashKey: 255211775190703847597530955573826158591},SequenceNumberRange: {StartingSequenceNumber: 49627894338592354815302280405232227830655867395925606578,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 2
event: 7; timestamp: 1648030834105, 2022-03-23T10:20:34.105Z
event: 1; timestamp: 1648030794441, 2022-03-23T10:19:54.441Z
event: 2; timestamp: 1648030796122, 2022-03-23T10:19:56.122Z
event: 8; timestamp: 1648030887171, 2022-03-23T10:21:27.171Z
XXXXXXXXXXXXXX Window with 3 events; Watermark: 1648030809281, 2022-03-23T10:20:09.281Z
3
1
2
XXXXXXXXXXXXXX Window with 2 events; Watermark: 1648030834104, 2022-03-23T10:20:34.104Z
4
5
XXXXXXXXXXXXXX Window with 1 events; Watermark: 1648030834104, 2022-03-23T10:20:34.104Z
6
XXXXXXXXXXXXXX Window with 1 events; Watermark: 1648030887170, 2022-03-23T10:21:27.170Z
7
```

输出仅显示 4 个窗口（缺少包含事件 8 的最后一个窗口）。这是由于事件时间和水印策略造成的。最后一个窗口无法关闭，因为使用预先构建的水印策略，时间永远不会超过从流中读取的最后一个事件的时间。但是要关闭窗口，时间需要早于最后一个事件发生后 10 秒以上。在本例中，最后一个水印是 2022-03-23T10:21:27.170Z，但是为了关闭会话窗口，需要在 10.001 秒后添加水印。

如果从水印策略中移除 `withIdleness` 选项，则任何会话窗口都不会关闭，因为窗口运算符的“全局水印”无法向前推进。

当 Flink 应用程序启动时（或者如果存在数据偏差），某些分片的消耗速度可能比其他分片快。这可能会导致子任务过早发出一些水印（子任务可能会根据一个分片的内容发出水印，而不会从其订阅的其他分片消耗）。缓解的方法是使用不同的水印策略，可以添加安全缓冲区 `(forBoundedOutOfOrderness(Duration.ofSeconds(30))` 或明确允许延迟到达的事件 `(allowedLateness(Time.minutes(5))`。

## 为所有运算符设置 UUID
<a name="best-practices-setting-operator-ids"></a>

当 Managed Service for Apache Flink 为带有快照的应用程序启动 Flink 任务时，Flink 任务可能由于某些问题而无法启动。其中一个原因是*运算符 ID 不匹配*。Flink 期望为 Flink 作业图运算符 IDs 提供明确、一致的运算符。如果未明确设置，Flink 会为运算符生成 ID。这是因为 Flink 使用这些运算符 IDs 来唯一标识作业图中的运算符，并使用它们将每个运算符的状态存储在保存点中。

*当 Flink 找不到作业图的运算符和保存点中 IDs 定义的运算符 IDs 之间的 1:1 映射时，就会出现运算符 ID 不匹配*问题。当未设置显式一致运算符， IDs 而 Flink 生成的运算符可能与创建的每个任务图都不一致时，就会发生 IDs 这种情况。在维护运行期间，应用程序遇到此问题的可能性很高。为避免这种情况，我们建议客户在 Flink 代码中为所有运算符设置 UUID。有关更多信息，请参阅[生产就绪](https://docs.aws.amazon.com/managed-flink/latest/java/production-readiness.html)下的*为所有运算符设置 UUID* 主题。

## 添加 ServiceResourceTransformer 到 Maven 阴影插件
<a name="best-practices-service-resource-transformer"></a>

Flink 使用 Java 的[服务提供者接口 (SPI)](https://docs.oracle.com/javase/tutorial/sound/SPI-intro.html) 来加载连接器和格式等组件。使用 SPI 的多个 Flink 依赖项[可能会导致 uber-jar 中的冲突](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/overview/#transform-table-connectorformat-resources)和意外的应用程序行为。我们建议你添加 pom.xml 中[ServiceResourceTransformer](https://maven.apache.org/plugins/maven-shade-plugin/examples/resource-transformers.html)定义的 Maven shade 插件。

```
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
                <executions>
                    <execution>
                        <id>shade</id>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers combine.children="append">
                                <!-- The service transformer is needed to merge META-INF/services files -->
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                <!-- ... -->
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
        </plugin>
```