

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 維護 Managed Service for Apache Flink 應用程式的最佳實務
<a name="best-practices"></a>

本節包含開發穩定、高效能 Managed Service for Apache Flink 應用程式的資訊和建議。

**Topics**
+ [最小化 uber JAR 的大小](#minimize-uber-JAR)
+ [容錯能力：檢查點和儲存點](#how-dev-bp-checkpoint)
+ [不受支援的連接器版本](#how-dev-bp-connectors)
+ [效能與平行處理層級](#how-dev-bp-performance)
+ [設定每個運算子的平行處理層級](#how-dev-bp-parallelism)
+ [記錄](#how-dev-bp-logging)
+ [編碼](#how-dev-bp-code)
+ [管理憑證](#how-dev-bp-managing-credentials)
+ [從具有較少碎片/分割區的來源讀取](#troubleshooting-few-shards-partitions)
+ [Studio 筆記本重新整理間隔](#notebook-refresh-rate)
+ [Studio 筆記本最佳效能](#notebook-refresh-rate)
+ [浮水印策略和閒置碎片如何影響時間範圍](#notebook-watermarking)
+ [為所有運算子設定 UUID](#best-practices-setting-operator-ids)
+ [將 ServiceResourceTransformer 新增至 Maven 著色外掛程式](#best-practices-service-resource-transformer)

## 最小化 uber JAR 的大小
<a name="minimize-uber-JAR"></a>

Java/Scala 應用程式必須封裝在 uber （超級/重度） JAR 中，並包含執行時間尚未提供的所有其他必要相依性。不過，uber JAR 的大小會影響應用程式的啟動和重新啟動時間，並可能導致 JAR 超過 512 MB 的限制。

若要最佳化部署時間，您的 uber JAR **不應**包含下列項目：
+ **執行時間提供的任何相依性**，如下列範例所示。它們應該在 POM 檔案或 Gradle 組態`compileOnly`中具有`provided`範圍。
+ **任何僅用於測試的相依性**，例如 JUnit 或 Mockito。它們應該在 POM 檔案或 Gradle 組態`testImplementation`中具有`test`範圍。
+ **您的應用程式未實際使用的任何相依性**。
+ **您的應用程式所需的任何靜態資料或中繼資料。**應用程式應在執行時間載入靜態資料，例如從資料存放區或從 Amazon S3 載入。
+ 如需上述組態設定的詳細資訊，請參閱此 [POM 範例檔案](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/blob/main/java/GettingStarted/pom.xml)。

**提供的相依性**

Managed Service for Apache Flink 執行時間提供許多相依性。這些相依性不應包含在重型 JAR 中，且必須在 POM 檔案中具有`provided`範圍，或在`maven-shade-plugin`組態中明確排除。在執行時間會忽略包含在重型 JAR 中的任何相依性，但在部署期間增加 JAR 增加額外負荷的大小。

執行時間在執行時間版本 1.18、1.19 和 1.20 中提供的相依性：
+ `org.apache.flink:flink-core`
+ `org.apache.flink:flink-java`
+ `org.apache.flink:flink-streaming-java`
+ `org.apache.flink:flink-scala_2.12`
+ `org.apache.flink:flink-table-runtime`
+ `org.apache.flink:flink-table-planner-loader`
+ `org.apache.flink:flink-json`
+ `org.apache.flink:flink-connector-base`
+ `org.apache.flink:flink-connector-files`
+ `org.apache.flink:flink-clients`
+ `org.apache.flink:flink-runtime-web`
+ `org.apache.flink:flink-metrics-code`
+ `org.apache.flink:flink-table-api-java`
+ `org.apache.flink:flink-table-api-bridge-base`
+ `org.apache.flink:flink-table-api-java-bridge`
+ `org.apache.logging.log4j:log4j-slf4j-impl`
+ `org.apache.logging.log4j:log4j-api`
+ `org.apache.logging.log4j:log4j-core`
+ `org.apache.logging.log4j:log4j-1.2-api`

此外，執行期提供程式庫，用於擷取 Managed Service for Apache Flink 中的應用程式執行期屬性`com.amazonaws:aws-kinesisanalytics-runtime:1.2.0`。

執行時間提供的所有相依性都必須使用下列建議，才**不會**將其包含在 uber JAR 中：
+ 在 Maven (`pom.xml`) 和 SBT (`build.sbt`) 中，使用`provided`範圍。
+ 在 Gradle (`build.gradle`) 中，使用 `compileOnly` 組態。

由於 Apache Flink 的父系優先類別載入，在執行時間會忽略意外包含在 uber JAR 中的任何提供的相依性。如需詳細資訊，請參閱 Apache Flink 文件中的[parent-first-patterns](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#classloader-parent-first-patterns-default)。

**連接器**

除了 FileSystem 連接器之外，執行時間中未包含的大多數連接器都必須包含在具有預設範圍 () 的 POM 檔案中`compile`。

**其他建議**

一般而言，提供給 Managed Service for Apache Flink 的 Apache Flink uber JAR 應包含執行應用程式所需的最低程式碼。包含包含來源類別、測試資料集或引導狀態的相依性不應包含在此 jar 中。如果需要在執行時間提取靜態資源，請將此問題分成 Amazon S3 等資源。此範例包括狀態引導或推論模型。

花一些時間考慮您的深層相依性樹狀結構，並移除非執行時間相依性。

雖然 Managed Service for Apache Flink 支援 512MB jar 大小，但這應該視為規則的例外狀況。Apache Flink 目前透過其預設組態支援約 104MB 的 jar 大小，這應該是所需 jar 的目標大小上限。

## 容錯能力：檢查點和儲存點
<a name="how-dev-bp-checkpoint"></a>

使用檢查點和儲存點在 Managed Service for Apache Flink 應用程式中實作容錯能力。開發和維護應用程式時，請謹記下列各項：
+ 建議您為應用程式保持啟用檢查點。檢查點可在排程維護期間為您的應用程式提供容錯能力，以及因服務問題、應用程式相依性故障和其他問題而導致的非預期故障。如需排程維護的相關資訊，請參閱[管理 Managed Service for Apache Flink 的維護任務](maintenance.md)。
+ 在應用程式開發或疑難排解期間，將 [ApplicationSnapshotConfiguration::SnapshotsEnabled](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ApplicationSnapshotConfiguration.html) 設定為 `false`。每次應用程式停止時都會建立快照，如果應用程式處於運作狀態不佳或效能不佳，這可能會造成問題。當應用程式進入生產環境且狀態穩定之後，將 `SnapshotsEnabled` 設定為 `true`。
**注意**  
我們建議您將應用程式設定為每天建立快照數次，以使用正確的狀態資料正確重新啟動。快照的正確頻率取決於應用程式的業務邏輯。經常拍攝快照可讓您復原較新的資料，但會增加成本並需要更多系統資源。

  如需監控應用程式停機時間的相關資訊，請參閱 [Managed Service for Apache Flink 中的指標和維度](metrics-dimensions.md)。

如需實作容錯能力的詳細資訊，請參閱 [實作容錯能力](how-fault.md)。

## 不受支援的連接器版本
<a name="how-dev-bp-connectors"></a>

從 Apache Flink 1.15 版或更新版本，Managed Service for Apache Flink 會在應用程式 JARs 中使用不支援的 Kinesis 連接器版本時，自動防止應用程式啟動或更新。升級至 Managed Service for Apache Flink 1.15 版或更新版本時，請確定您使用的是最新的 Kinesis 連接器。這是指 1.15.2 版本或更新版本。Managed Service for Apache Flink 不支援所有其他版本，因為它們可能會導致**與 Stop with Savepoint** 功能的一致性問題或失敗，以防止清除停止/更新操作。若要進一步了解 Amazon Managed Service for Apache Flink 版本中的連接器相容性，請參閱 [Apache Flink 連接器](https://docs.aws.amazon.com/managed-flink/latest/java/how-flink-connectors.html)。

## 效能與平行處理層級
<a name="how-dev-bp-performance"></a>

應用程式可透過調整其平行處理層級並避免效能缺陷來進行擴展，以滿足任何輸送量水平。開發和維護應用程式時，請謹記下列各項：
+ 確認您的所有應用程式來源和接收器都已充分佈建且未受到限流。如果來源和目的地是其他服務 AWS ，請使用 [CloudWatch](https://docs.aws.amazon.com/cloudwatch/?id=docs_gateway) 監控這些服務。
+ 對於具有非常高的平行處理層級的應用程式，請檢查該高平行處理層級是否已套用到應用程式中的所有運算子。根據預設，Apache Flink 會對應用程式圖形中的所有運算子套用相同的應用程式平行處理層級。這可能會導致來源或接收器的佈建問題，或導致運算子資料處理出現瓶頸。您可以使用 [setParallelism](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/parallel.html) 來變更程式碼中每個運算子的平行處理層級設定。
+ 了解應用程式中運算子平行處理層級設定的意義。如果您變更運算子的平行處理層級，則當運算子的平行處理層級與目前設定不相容時，可能無法從建立的快照還原應用程式。如需設定運算子平行處理的詳細資訊，請參閱[明確設定運算子的最大平行處理層級](https://nightlies.apache.org/flink/flink-docs-release-1.15/ops/production_ready.html#set-maximum-parallelism-for-operators-explicitly)。

如需實作擴展的詳細資訊，請參閱 [實作應用程式擴展](how-scaling.md)。

## 設定每個運算子的平行處理層級
<a name="how-dev-bp-parallelism"></a>

根據預設，所有運算子都會在應用程式層級設定平行處理層級。您可以使用 DataStream API 和 `.setParallelism(x)` 覆寫單一運算子的平行處理層級。您可以將運算子平行處理層級設定為等於或低於應用程式平行處理層級的任何平行處理層級。

如果有可能，請將運算子平行處理層級定義為應用程式平行處理層級的函數。如此一來，運算子平行處理層級會隨應用程式平行處理層級而改變。例如，如果您使用自動擴展，則所有運算子都會以相同比例變更其平行處理層級：

```
int appParallelism = env.getParallelism();
...
...ops.setParalleism(appParallelism/2);
```

在某些情況下，您可能需要將運算子並行處理原則設定為常數。例如，將 Kinesis 串流來源的平行處理層級設定為碎片數目。在這些情況下，請考慮將運算子平行處理傳遞為應用程式組態參數來變更它，而不變更程式碼，例如重新碎片來源串流。

## 記錄
<a name="how-dev-bp-logging"></a>

您可以使用 CloudWatch Logs 來監控應用程式的效能和錯誤狀況。為應用程式設定記錄時，請謹記下列各項：
+ 為應用程式啟用 CloudWatch 記錄，以便對任何執行期問題進行偵錯。
+ 請勿為應用程式中正在處理的每筆記錄建立日誌項目。這會在處理期間造成嚴重的瓶頸，並可能導致資料處理的背壓。
+ 建立 CloudWatch 警示，以在應用程式未正常執行時通知您。如需詳細資訊，請參閱[搭配 Amazon Managed Service for Apache Flink 使用 CloudWatch 警示](monitoring-metrics-alarms.md)

如需實作記錄的詳細資訊，請參閱 [](monitoring-overview.md)。

## 編碼
<a name="how-dev-bp-code"></a>

您可以使用建議的程式設計做法，讓應用程式具備高效能和穩定性。撰寫應用程式的程式碼時，請謹記以下事項：
+ 請勿在應用程式的程式碼、應用程式的 `main` 方法或使用者定義的函數中使用 `system.exit()`。如果想要從程式碼中關閉應用程式，請擲回衍生自 `Exception` 或 `RuntimeException` 的例外狀況，在其中包含關於應用程式所發生問題的訊息。

  請注意下列有關服務如何處理此例外狀況的事項：
  + 如果從應用程式的 `main` 方法擲回例外狀況，服務會在應用程式轉換至 `RUNNING` 狀態時將其包裝在一個 `ProgramInvocationException` 中，並且作業管理員將無法提交作業。
  + 如果從使用者定義的函數擲回例外狀況，作業管理員會讓作業失敗然後重新啟動它，並將例外狀況的詳細資訊寫入例外狀況日誌中。
+ 考慮遮蔽您的應用程式 JAR 檔案及其包含的相依性。如果應用程式與 Apache Flink 執行期之間的套件名稱有可能發生衝突，建議使用遮蔽。如果發生衝突，您的應用程式日誌可能包含 `java.util.concurrent.ExecutionException` 類型的例外狀況。如需遮蔽應用程式 JAR 檔案的詳細資訊，請參閱 [Apache Maven Shade 外掛程式](https://maven.apache.org/plugins/maven-shade-plugin/)。

## 管理憑證
<a name="how-dev-bp-managing-credentials"></a>

您不應將任何長期憑證封裝到生產 (或任何其他) 應用程式中。長期憑證可能簽入版本控制系統，很容易丟失。反之，您可以將角色與 Managed Service for Apache Flink 應用程式建立關聯，並將許可授予該角色。然後，執行中的 Flink 應用程式可以從環境中選取具有相應許可的臨時登入資料。如果未與 IAM 原生整合的服務需要身分驗證，例如需要使用者名稱和密碼進行身分驗證的資料庫，您應該考慮將秘密儲存在 [AWS Secrets Manager](https://aws.amazon.com/secrets-manager/) 中。

許多 AWS 原生服務支援身分驗證：
+ Kinesis Data Streams：[ProcessTaxiStream.java](hhttps://github.com/aws-samples/amazon-kinesis-data-analytics-taxi-consumer/blob/master/src/main/java/com/amazonaws/samples/kaja/taxi/consumer/ProcessTaxiStream.java#L90)
+ Amazon MSK：[https://github.com/aws/aws-msk-iam-auth/\$1using-the-amazon-msk-library-for-iam-authentication](https://github.com/aws/aws-msk-iam-auth/#using-the-amazon-msk-library-for-iam-authentication)
+ Amazon Elasticsearch Service：[AmazonElasticsearchSink.java](https://github.com/aws-samples/amazon-kinesis-data-analytics-taxi-consumer/blob/master/src/main/java/com/amazonaws/samples/kaja/taxi/consumer/operators/AmazonElasticsearchSink.java)
+ Amazon S3：在 Managed Service for Apache Flink 上立即可用

## 從具有較少碎片/分割區的來源讀取
<a name="troubleshooting-few-shards-partitions"></a>

從 Apache Kafka 或 Kinesis Data Stream 讀取時，串流的平行處理 (Kafka 的分割區數量和 Kinesis 的碎片數量） 與應用程式的平行處理之間可能不相符。使用單純的設計，應用程式的平行處理層級無法擴展到串流的平行處理層級：來源運算子的每個子任務只能從 1 個或多個碎片/分割區中讀取。這意味著對於只有 2 個碎片的串流和一個平行處理層級為 8 的應用程式，只有兩個子任務實際上從串流中取用資料，有 6 個子任務保持閒置狀態。這可能會大幅限制應用程式的輸送量，特別是如果還原序列化昂貴且由來源執行 (這是預設情況) 時。

為了減輕這種影響，您可以擴展串流。但是，這可能並不總是期望的或可行的。或者，您可以重新構建來源，以便它不執行任何序列化，只是傳遞 `byte[]`。然後，您可以[重新平衡](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/overview/)資料，將資料平均分配到所有任務，然後還原序列化該處的資料。透過這種方式，您可以利用所有子任務進行還原序列化，並且這種潛在代價高昂的操作不再受串流的碎片/分割區數量的限制。

## Studio 筆記本重新整理間隔
<a name="notebook-refresh-rate"></a>

如果要變更段落結的果重新整理間隔，請將其設定為至少 1000 毫秒的值。

## Studio 筆記本最佳效能
<a name="notebook-refresh-rate"></a>

我們使用以下陳述式進行測試，並在 `events-per-second` 乘以 時獲得最佳效能`number-of-keys`，低於 25，000，000。這是針對 `events-per-second` 低於 150,000 以下的情況。

```
SELECT key, sum(value) FROM key-values GROUP BY key
```

## 浮水印策略和閒置碎片如何影響時間範圍
<a name="notebook-watermarking"></a>

從 Apache Kafka 和 Kinesis Data Streams 讀取事件時，來源可以根據串流的屬性設定事件時間。在 Kinesis 的情況下，事件時間等於事件的大約到達時間。但是，在來源處設定事件的事件時間無法讓 Flink 應用程式使用事件時間。來源還必須產生浮水印，將事件時間的資訊從來源傳播到所有其他運算子。[Flink 文件](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/concepts/time/)中對該過程的工作原理進行了清楚的概述。

根據預設，從 Kinesis 讀取的事件時間戳記會設定為 Kinesis 決定的大約到達時間。在應用程式中使用事件時間的其他先決條件是浮水印策略。

```
WatermarkStrategy<String> s = WatermarkStrategy
    .<String>forMonotonousTimestamps()
    .withIdleness(Duration.ofSeconds(...));
```

浮水印策略然後會套用至具有 `assignTimestampsAndWatermarks` 方法的 `DataStream`。有一些實用的內建策略：
+ `forMonotonousTimestamps()` 將只使用事件時間 (大約到達時間)，並定期發出最大值作為浮水印 (針對每個特定的子任務)
+ `forBoundedOutOfOrderness(Duration.ofSeconds(...))` 與之前的策略類似，但是將使用事件時間，即產生浮水印的持續時間。

從 [Flink 文件](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/concepts/time/)中：

*來源函數的每個平行子任務通常會獨立生成其浮水印。這些浮水印會定義該特定平行來源的事件時間。*

*當浮水印流經串流傳輸程序時，它們會將所到達之運算子處的事件時間提前。每當運算子提前其事件時間時，都會為其後續運算子產生新的浮水印。*

*某些運算子會取用多個輸入串流；例如聯集，或跟隨 keyBy (...) 或 partition (...) 函數的運算子。這類運算子的目前事件時間是其輸入串流事件時間的最小值。隨著其輸入串流更新其事件時間，運算子的事件時間也會更新。*

這意味著，如果來源子任務從閒置碎片中取用，則下游運算子不會從該子任務中收到新的浮水印，因此對使用時間範圍的所有下游運算子進行的處理將停止。為了避免這種情況，客戶可以將 `withIdleness` 選項新增到浮水印策略中。使用此選項時，運算子會在運算運算子的事件時間時，從閒置上游子任務中排除浮水印。因此，閒置子任務不會再封鎖下游運算子中事件時間的進展。

視您使用的碎片指派器而定，某些工作者可能不會獲指派任何 Kinesis 碎片。在這種情況下，即使所有 Kinesis 碎片持續交付事件資料，這些工作者也會呈現閒置來源行為。您可以`uniformShardAssigner`搭配來源運算子使用 來降低此風險。這可確保只要工作者數目小於或等於作用中碎片數目，所有來源子任務都有要處理的碎片。

不過，如果沒有子任務正在讀取任何事件，而串流中沒有事件，則內建浮水印策略的閒置選項不會延長事件時間。對於從串流中讀取一組有限事件的測試用例，這一點變得特別明顯。由於事件時間在讀取最後一個事件之後不會提前，因此最後一個時段 （包含最後一個事件） 不會關閉。

### 摘要
<a name="notebook-watermarking-summary"></a>
+ 如果碎片閒置，此`withIdleness`設定不會產生新的浮水印。它會從下游運算子的最低浮水印計算中排除閒置子任務傳送的最後一個浮水印。
+ 使用內建浮水印策略時，最後一個開啟的時段不會關閉 （除非會傳送推進浮水印的新事件，但會建立一個保持開啟的新時段）。
+ 即使 Kinesis 串流設定了時間，如果一個碎片的耗用速度比其他碎片快 （例如，在應用程式初始化期間，或使用所有現有碎片的並行耗用`TRIM_HORIZON`時，忽略其父/子關係），仍可能發生延遲到達事件。
+ 浮水印策略`withIdleness`的設定似乎會中斷閒置碎片的 Kinesis 來源特定設定`(ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS`。

### 範例
<a name="notebook-watermarking-example"></a>

下列應用程式正在從串流讀取，並根據事件時間建立工作階段視窗。

```
Properties consumerConfig = new Properties();
consumerConfig.put(AWSConfigConstants.AWS_REGION, "eu-west-1");
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON");

FlinkKinesisConsumer<String> consumer = new FlinkKinesisConsumer<>("...", new SimpleStringSchema(), consumerConfig);

WatermarkStrategy<String> s = WatermarkStrategy
    .<String>forMonotonousTimestamps()
    .withIdleness(Duration.ofSeconds(15));
    
env.addSource(consumer)
    .assignTimestampsAndWatermarks(s)
    .map(new MapFunction<String, Long>() {
        @Override
        public Long map(String s) throws Exception {
            return Long.parseLong(s);
        }
    })
    .keyBy(l -> 0l)
    .window(EventTimeSessionWindows.withGap(Time.seconds(10)))
    .process(new ProcessWindowFunction<Long, Object, Long, TimeWindow>() {
        @Override
        public void process(Long aLong, ProcessWindowFunction<Long, Object, Long, TimeWindow>.Context context, Iterable<Long>iterable, Collector<Object> collector) throws Exception {
            long count = StreamSupport.stream(iterable.spliterator(), false).count();
            long timestamp = context.currentWatermark();

            System.out.print("XXXXXXXXXXXXXX Window with " + count + " events");
            System.out.println("; Watermark: " + timestamp + ", " + Instant.ofEpochMilli(timestamp));


            for (Long l : iterable) {
                System.out.println(l);
            }
        }
    });
```

在下列範例中，8 個事件會寫入一個有 16 個碎片的串流 (開頭 2 個和最後一個事件發生在相同的碎片中)。

```
$ aws kinesis put-record --stream-name hp-16 --partition-key 1 --data MQ==
$ aws kinesis put-record --stream-name hp-16 --partition-key 2 --data Mg==
$ aws kinesis put-record --stream-name hp-16 --partition-key 3 --data Mw==
$ date

{
    "ShardId": "shardId-000000000012",
    "SequenceNumber": "49627894338614655560500811028721934184977530127978070210"
}
{
    "ShardId": "shardId-000000000012",
    "SequenceNumber": "49627894338614655560500811028795678659974022576354623682"
}
{
    "ShardId": "shardId-000000000014",
    "SequenceNumber": "49627894338659257050897872275134360684221592378842022114"
}
Wed Mar 23 11:19:57 CET 2022

$ sleep 10
$ aws kinesis put-record --stream-name hp-16 --partition-key 4 --data NA==
$ aws kinesis put-record --stream-name hp-16 --partition-key 5 --data NQ==
$ date

{
    "ShardId": "shardId-000000000010",
    "SequenceNumber": "49627894338570054070103749783042116732419934393936642210"
}
{
    "ShardId": "shardId-000000000014",
    "SequenceNumber": "49627894338659257050897872275659034489934342334017700066"
}
Wed Mar 23 11:20:10 CET 2022

$ sleep 10
$ aws kinesis put-record --stream-name hp-16 --partition-key 6 --data Ng==
$ date

{
    "ShardId": "shardId-000000000001",
    "SequenceNumber": "49627894338369347363316974173886988345467035365375213586"
}
Wed Mar 23 11:20:22 CET 2022

$ sleep 10
$ aws kinesis put-record --stream-name hp-16 --partition-key 7 --data Nw==
$ date

{
    "ShardId": "shardId-000000000008",
    "SequenceNumber": "49627894338525452579706688535878947299195189349725503618"
}
Wed Mar 23 11:20:34 CET 2022

$ sleep 60
$ aws kinesis put-record --stream-name hp-16 --partition-key 8 --data OA==
$ date

{
    "ShardId": "shardId-000000000012",
    "SequenceNumber": "49627894338614655560500811029600823255837371928900796610"
}
Wed Mar 23 11:21:27 CET 2022
```

此輸入應該會產生 5 個工作階段視窗：事件 1、2、3；事件 4、5；事件 6；事件 7；事件 8。但是，該程式只產生了前 4 個視窗。

```
11:59:21,529 INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 5 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000006,HashKeyRange: {StartingHashKey: 127605887595351923798765477786913079296,EndingHashKey: 148873535527910577765226390751398592511},SequenceNumberRange: {StartingSequenceNumber: 49627894338480851089309627289524549239292625588395704418,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM
11:59:21,530 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 5 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000006,HashKeyRange: {StartingHashKey: 127605887595351923798765477786913079296,EndingHashKey: 148873535527910577765226390751398592511},SequenceNumberRange: {StartingSequenceNumber: 49627894338480851089309627289524549239292625588395704418,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0
11:59:21,530 INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 6 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000007,HashKeyRange: {StartingHashKey: 148873535527910577765226390751398592512,EndingHashKey: 170141183460469231731687303715884105727},SequenceNumberRange: {StartingSequenceNumber: 49627894338503151834508157912666084957565273949901684850,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM
11:59:21,530 INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 6 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000010,HashKeyRange: {StartingHashKey: 212676479325586539664609129644855132160,EndingHashKey: 233944127258145193631070042609340645375},SequenceNumberRange: {StartingSequenceNumber: 49627894338570054070103749782090692112383219034419626146,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM
11:59:21,530 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 6 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000007,HashKeyRange: {StartingHashKey: 148873535527910577765226390751398592512,EndingHashKey: 170141183460469231731687303715884105727},SequenceNumberRange: {StartingSequenceNumber: 49627894338503151834508157912666084957565273949901684850,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0
11:59:21,531 INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 4 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000005,HashKeyRange: {StartingHashKey: 106338239662793269832304564822427566080,EndingHashKey: 127605887595351923798765477786913079295},SequenceNumberRange: {StartingSequenceNumber: 49627894338458550344111096666383013521019977226889723986,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM
11:59:21,532 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 4 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000005,HashKeyRange: {StartingHashKey: 106338239662793269832304564822427566080,EndingHashKey: 127605887595351923798765477786913079295},SequenceNumberRange: {StartingSequenceNumber: 49627894338458550344111096666383013521019977226889723986,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0
11:59:21,532 INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 3 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000004,HashKeyRange: {StartingHashKey: 85070591730234615865843651857942052864,EndingHashKey: 106338239662793269832304564822427566079},SequenceNumberRange: {StartingSequenceNumber: 49627894338436249598912566043241477802747328865383743554,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM
11:59:21,532 INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 2 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000003,HashKeyRange: {StartingHashKey: 63802943797675961899382738893456539648,EndingHashKey: 85070591730234615865843651857942052863},SequenceNumberRange: {StartingSequenceNumber: 49627894338413948853714035420099942084474680503877763122,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM
11:59:21,532 INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 3 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000015,HashKeyRange: {StartingHashKey: 319014718988379809496913694467282698240,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49627894338681557796096402897798370703746460841949528306,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM
11:59:21,532 INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 2 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000014,HashKeyRange: {StartingHashKey: 297747071055821155530452781502797185024,EndingHashKey: 319014718988379809496913694467282698239},SequenceNumberRange: {StartingSequenceNumber: 49627894338659257050897872274656834985473812480443547874,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM
11:59:21,532 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 3 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000004,HashKeyRange: {StartingHashKey: 85070591730234615865843651857942052864,EndingHashKey: 106338239662793269832304564822427566079},SequenceNumberRange: {StartingSequenceNumber: 49627894338436249598912566043241477802747328865383743554,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0
11:59:21,532 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 2 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000003,HashKeyRange: {StartingHashKey: 63802943797675961899382738893456539648,EndingHashKey: 85070591730234615865843651857942052863},SequenceNumberRange: {StartingSequenceNumber: 49627894338413948853714035420099942084474680503877763122,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0
11:59:21,532 INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 0 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000001,HashKeyRange: {StartingHashKey: 21267647932558653966460912964485513216,EndingHashKey: 42535295865117307932921825928971026431},SequenceNumberRange: {StartingSequenceNumber: 49627894338369347363316974173816870647929383780865802258,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM
11:59:21,532 INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 0 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000009,HashKeyRange: {StartingHashKey: 191408831393027885698148216680369618944,EndingHashKey: 212676479325586539664609129644855132159},SequenceNumberRange: {StartingSequenceNumber: 49627894338547753324905219158949156394110570672913645714,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM
11:59:21,532 INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 7 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 21267647932558653966460912964485513215},SequenceNumberRange: {StartingSequenceNumber: 49627894338347046618118443550675334929656735419359821826,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM
11:59:21,533 INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 0 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000012,HashKeyRange: {StartingHashKey: 255211775190703847597530955573826158592,EndingHashKey: 276479423123262501563991868538311671807},SequenceNumberRange: {StartingSequenceNumber: 49627894338614655560500811028373763548928515757431587010,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM
11:59:21,533 INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 7 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000008,HashKeyRange: {StartingHashKey: 170141183460469231731687303715884105728,EndingHashKey: 191408831393027885698148216680369618943},SequenceNumberRange: {StartingSequenceNumber: 49627894338525452579706688535807620675837922311407665282,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM
11:59:21,533 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 0 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000001,HashKeyRange: {StartingHashKey: 21267647932558653966460912964485513216,EndingHashKey: 42535295865117307932921825928971026431},SequenceNumberRange: {StartingSequenceNumber: 49627894338369347363316974173816870647929383780865802258,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0
11:59:21,533 INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 7 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000011,HashKeyRange: {StartingHashKey: 233944127258145193631070042609340645376,EndingHashKey: 255211775190703847597530955573826158591},SequenceNumberRange: {StartingSequenceNumber: 49627894338592354815302280405232227830655867395925606578,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM
11:59:21,533 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 7 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 21267647932558653966460912964485513215},SequenceNumberRange: {StartingSequenceNumber: 49627894338347046618118443550675334929656735419359821826,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0
11:59:21,568 INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 1 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000002,HashKeyRange: {StartingHashKey: 42535295865117307932921825928971026432,EndingHashKey: 63802943797675961899382738893456539647},SequenceNumberRange: {StartingSequenceNumber: 49627894338391648108515504796958406366202032142371782690,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM
11:59:21,568 INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 1 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000013,HashKeyRange: {StartingHashKey: 276479423123262501563991868538311671808,EndingHashKey: 297747071055821155530452781502797185023},SequenceNumberRange: {StartingSequenceNumber: 49627894338636956305699341651515299267201164118937567442,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM
11:59:21,568 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 1 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000002,HashKeyRange: {StartingHashKey: 42535295865117307932921825928971026432,EndingHashKey: 63802943797675961899382738893456539647},SequenceNumberRange: {StartingSequenceNumber: 49627894338391648108515504796958406366202032142371782690,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0
11:59:23,209 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 0 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000009,HashKeyRange: {StartingHashKey: 191408831393027885698148216680369618944,EndingHashKey: 212676479325586539664609129644855132159},SequenceNumberRange: {StartingSequenceNumber: 49627894338547753324905219158949156394110570672913645714,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1
11:59:23,244 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 6 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000010,HashKeyRange: {StartingHashKey: 212676479325586539664609129644855132160,EndingHashKey: 233944127258145193631070042609340645375},SequenceNumberRange: {StartingSequenceNumber: 49627894338570054070103749782090692112383219034419626146,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1
event: 6; timestamp: 1648030822428, 2022-03-23T10:20:22.428Z
11:59:23,377 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 3 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000015,HashKeyRange: {StartingHashKey: 319014718988379809496913694467282698240,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49627894338681557796096402897798370703746460841949528306,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1
11:59:23,405 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 2 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000014,HashKeyRange: {StartingHashKey: 297747071055821155530452781502797185024,EndingHashKey: 319014718988379809496913694467282698239},SequenceNumberRange: {StartingSequenceNumber: 49627894338659257050897872274656834985473812480443547874,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1
11:59:23,581 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 7 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000008,HashKeyRange: {StartingHashKey: 170141183460469231731687303715884105728,EndingHashKey: 191408831393027885698148216680369618943},SequenceNumberRange: {StartingSequenceNumber: 49627894338525452579706688535807620675837922311407665282,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1
11:59:23,586 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 1 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000013,HashKeyRange: {StartingHashKey: 276479423123262501563991868538311671808,EndingHashKey: 297747071055821155530452781502797185023},SequenceNumberRange: {StartingSequenceNumber: 49627894338636956305699341651515299267201164118937567442,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1
11:59:24,790 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 0 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000012,HashKeyRange: {StartingHashKey: 255211775190703847597530955573826158592,EndingHashKey: 276479423123262501563991868538311671807},SequenceNumberRange: {StartingSequenceNumber: 49627894338614655560500811028373763548928515757431587010,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 2
event: 4; timestamp: 1648030809282, 2022-03-23T10:20:09.282Z
event: 3; timestamp: 1648030797697, 2022-03-23T10:19:57.697Z
event: 5; timestamp: 1648030810871, 2022-03-23T10:20:10.871Z
11:59:24,907 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 7 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000011,HashKeyRange: {StartingHashKey: 233944127258145193631070042609340645376,EndingHashKey: 255211775190703847597530955573826158591},SequenceNumberRange: {StartingSequenceNumber: 49627894338592354815302280405232227830655867395925606578,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 2
event: 7; timestamp: 1648030834105, 2022-03-23T10:20:34.105Z
event: 1; timestamp: 1648030794441, 2022-03-23T10:19:54.441Z
event: 2; timestamp: 1648030796122, 2022-03-23T10:19:56.122Z
event: 8; timestamp: 1648030887171, 2022-03-23T10:21:27.171Z
XXXXXXXXXXXXXX Window with 3 events; Watermark: 1648030809281, 2022-03-23T10:20:09.281Z
3
1
2
XXXXXXXXXXXXXX Window with 2 events; Watermark: 1648030834104, 2022-03-23T10:20:34.104Z
4
5
XXXXXXXXXXXXXX Window with 1 events; Watermark: 1648030834104, 2022-03-23T10:20:34.104Z
6
XXXXXXXXXXXXXX Window with 1 events; Watermark: 1648030887170, 2022-03-23T10:21:27.170Z
7
```

輸出僅顯示 4 個視窗 (缺少包含事件 8 的最後一個視窗)。這是由於事件時間和浮水印策略所導致。最後一個視窗無法關閉，因為預先建置的浮水印策略時間絕不會超過從串流讀取的最後一個事件時間。但是對於要關閉的視窗，時間需要在最後一個事件發生後提前超過 10 秒。在此情況下，最後一個浮水印是 2022-03-23T10：21：27.170Z，但工作階段視窗關閉時，需要浮水印 10 秒和 1 毫秒之後。

如果從浮水印策略中移除 `withIdleness`選項，則工作階段視窗將不會關閉，因為視窗運算子的「全域浮水印」無法繼續。

當 Flink 應用程式啟動時 （或如果有資料扭曲），某些碎片的使用速度可能會比其他碎片快。這可能會導致部分浮水印過早從子任務發出 （子任務可能會根據一個碎片的內容發出浮水印，而不會從訂閱的其他碎片耗用）。緩解的方式是不同的浮水印策略，可新增安全緩衝區`(forBoundedOutOfOrderness(Duration.ofSeconds(30))`或明確允許延遲到達事件`(allowedLateness(Time.minutes(5))`。

## 為所有運算子設定 UUID
<a name="best-practices-setting-operator-ids"></a>

當 Managed Service for Apache Flink 使用快照為應用程式啟動 Flink 作業時，Flink 作業可能會因為某些問題而無法啟動。其中一個問題是*運算子 ID 不符*。Flink 預期 Flink 作業圖表運算子具有明確且一致的運算子 ID。如果未明確設定，Flink 會為運算子產生 ID。這是因為，Flink 使用這些運算子 ID 來唯一識別作業圖表中的運算子，並使用它們將每個運算子的狀態儲存在儲存點中。

當 Flink 在作業圖表的運算子 ID 與儲存點中定義的運算子 ID 之間找不到 1:1 對應時，就會發生*運算子 ID 不符*問題。當未設定明確一致運算子 IDs且 Flink 產生可能與每個任務圖表建立不一致的運算子 IDs 時，就會發生這種情況。在維護執行期間，應用程式遇到此問題的可能性很高。為了避免這種情況，我們建議客戶為 Flink 程式碼中的所有運算子設定 UUID。如需詳細資訊，請參閱*生產就緒性*下的[為所有運算子設定 UUID](https://docs.aws.amazon.com/managed-flink/latest/java/production-readiness.html) 主題。

## 將 ServiceResourceTransformer 新增至 Maven 著色外掛程式
<a name="best-practices-service-resource-transformer"></a>

Flink 使用 Java 的[服務提供者介面 (SPI)](https://docs.oracle.com/javase/tutorial/sound/SPI-intro.html) 來載入連接器和格式等元件。使用 SPI 的多個 Flink 相依性[可能會在 uber-jar 和非預期的應用程式行為中造成衝突](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/overview/#transform-table-connectorformat-resources)。我們建議您新增 Maven 著色外掛程式的 [ServiceResourceTransformer](https://maven.apache.org/plugins/maven-shade-plugin/examples/resource-transformers.html)，如 pom.xml 中所定義。

```
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
                <executions>
                    <execution>
                        <id>shade</id>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers combine.children="append">
                                <!-- The service transformer is needed to merge META-INF/services files -->
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                <!-- ... -->
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
        </plugin>
```