

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# Managed Service for Apache Flink：運作方式
<a name="how-it-works"></a>

Managed Service for Apache Flink 是一種全受管的 Amazon 服務，可讓您使用 Apache Flink 應用程式來處理串流資料。首先，您將 Apache Flink 應用程式編寫程式，然後建立 Managed Service for Apache Flink 應用程式。

## 設定您的 Apache Flink 應用程式
<a name="how-it-works-programming"></a>

Apache Flink 應用程式是使用 Apache Flink 框架建立的 Java 或 Scala 應用程式。您可以在本機編寫並建置 Apache Flink 應用程式。

應用程式主要使用 [DataStream API](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/datastream_api.html) 或[資料表 API](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/)。您也可以使用其他 Apache Flink API，但是這些 API 在建置串流應用程式時較不常用。

兩種 API 的功能如下：

### DataStream API
<a name="how-it-works-prog-datastream"></a>

Apache Flink DataStream API 程式設計模型以兩個元件為基礎：
+ **資料串流**：資料記錄之連續資料流的結構化表示。
+ **轉換運算子**：接受一或多個資料串流作為輸入，並產生一或多個資料串流作為輸出。

使用 DataStream API 建立的應用程式會執行下列動作：
+ 從資料來源 (例如 Kinesis 串流或 Amazon MSK 主題) 讀取資料。
+ 將轉換套用至資料，例如篩選、彙總或富集。
+ 將轉換後的資料寫入資料接收器。

使用 DataStream API 的應用程式可以使用 Java 或 Scala 編寫，而且可以從 Kinesis 資料串流、Amazon MSK 主題或自訂來源讀取。

應用程式使用*連接器*來處理資料。Apache Flink 使用以下類型的連接器：
+ **來源**：用於讀取外部資料的連接器。
+ **接收器**：用於寫入外部位置的連接器。
+ **運算子**：用於處理應用程式內資料的連接器。

典型的應用程式包含至少一個具有來源的資料串流、具有一或多個運算子的資料串流，以及至少一個資料接收器。

如需如何使用 DataStream API 的詳細資訊，請參閱 [檢閱 DataStream API 元件](how-datastream.md)。

### 資料表 API
<a name="how-it-works-prog-table"></a>

Apache Flink 資料表 API 程式設計模型以下列元件為基礎：
+ **資料表環境：**用於建立和託管一個或多個資料表的基礎資料的介面。
+ **資料表：**提供 SQL 資料表或檢視存取權的物件。
+ **資料表來源：**用於從外部來源 (例如 Amazon MSK 主題) 讀取資料。
+ **資料表函數：**用於轉換資料的 SQL 查詢或 API 呼叫。
+ **資料表接收器：**用於將資料寫入外部位置，例如 Amazon S3 儲存貯體。

使用資料表 API 建立的應用程式會執行下列動作：
+ 透過連線至 `Table Source` 建立 `TableEnvironment`。
+ 使用 SQL 查詢或資料表 API 函數在 `TableEnvironment` 中建立資料表。
+ 使用資料表 API 或 SQL 在資料表上執行查詢。
+ 使用資料表函數或 SQL 查詢對查詢結果套用轉換。
+ 將查詢或函數結果寫入 `Table Sink`。

使用資料表 API 的應用程式可以用 Java 或 Scala 編寫，並且可以使用 API 呼叫或 SQL 查詢來查詢資料。

如需如何使用資料表 API 的詳細資訊，請參閱 [檢閱資料表 API 元件](how-table.md)。

## 建立 Managed Service for Apache Flink 應用程式
<a name="how-it-works-app"></a>

Managed Service for Apache Flink 是一項 AWS 服務，可建立託管 Apache Flink 應用程式的環境，並提供下列設定：
+ **[使用執行期屬性](how-properties.md)：**可以提供給應用程式的參數。您可以變更這些參數，無需重新編譯應用程式的程式碼。
+ **[實作容錯能力](how-fault.md)**：應用程式如何從中斷和重新啟動中復原。
+ **[在 Amazon Managed Service for Apache Flink 中記錄和監控](monitoring-overview.md)**：應用程式如何將事件記錄到 CloudWatch Logs。
+ **[實作應用程式擴展](how-scaling.md)**：應用程式如何佈建運算資源。

您可以使用主控台或 AWS CLI建立 Managed Service for Apache Flink 應用程式。若要開始建立 Managed Service for Apache Flink 應用程式，請參閱[教學課程：開始使用 Managed Service for Apache Flink 中的 DataStream API](getting-started.md)。

# 建立 Managed Service for Apache Flink 應用程式
<a name="how-creating-apps"></a>

本主題包含建立 Managed Service for Apache Flink 應用程式的相關資訊。

**Topics**
+ [建置 Managed Service for Apache Flink 應用程式程式碼](#how-creating-apps-building)
+ [建立 Managed Service for Apache Flink 應用程式](#how-creating-apps-creating)
+ [使用客戶受管金鑰](#how-creating-apps-use-cmk)
+ [啟動 Managed Service for Apache Flink 應用程式](#how-creating-apps-starting)
+ [驗證 Managed Service for Apache Flink 應用程式](#how-creating-apps-verifying)
+ [為您的 Managed Service for Apache Flink 應用程式啟用系統復原](how-system-rollbacks.md)

## 建置 Managed Service for Apache Flink 應用程式程式碼
<a name="how-creating-apps-building"></a>

本節說明您用來為 Managed Service for Apache Flink 應用程式建置應用程式程式碼的元件。

建議將 Apache Flink 應用程式的最新支援版本用於您的應用程式程式碼。如需升級 Managed Service for Apache Flink 應用程式的相關資訊，請參閱[針對 Apache Flink 使用就地版本升級](how-in-place-version-upgrades.md)。

您可以使用 [Apache Maven](https://maven.apache.org/) 建置應用程式的程式碼。Apache Maven 專案使用 `pom.xml` 檔案來指定它使用的元件的版本。

**注意**  
Managed Service for Apache Flink 支援最大 512 MB 的 JAR 檔案。如果使用的 JAR 檔案大於此大小，應用程式將無法啟動。

應用程式現在可以使用任何 Scala 版本的 Java API。您必須將您選擇的 Scala 標準程式庫綁定到 Scala 應用程式。

如需建立使用 **Apache Beam** 之 Managed Service for Apache Flink 應用程式的相關資訊，請參閱[將 Apache Beam 與 Managed Service for Apache Flink 應用程式搭配使用](how-creating-apps-beam.md)。

### 指定應用程式的 Apache Flink 版本
<a name="how-creating-apps-building-flink"></a>

使用 Managed Service for Apache Flink 執行期 1.1.0 版及更新版本時，您可以指定您編譯應用程式時應用程式使用的 Apache Flink 版本。您可以使用 `-Dflink.version` 參數提供 Apache Flink 的版本。例如，如果您使用的是 Apache Flink 2.2.0，請提供下列項目：

```
mvn package -Dflink.version=2.2.0
```

如需使用舊版 Apache Flink 建置應用程式，請參閱 [舊版](earlier.md)。

## 建立 Managed Service for Apache Flink 應用程式
<a name="how-creating-apps-creating"></a>

建置應用程式程式碼之後，請執行下列動作來建立 Managed Service for Apache Flink (Amazon MSF) 應用程式：
+ **上傳應用程式的程式碼**：將應用程式的程式碼上傳至 Amazon S3 儲存貯體。建立應用程式時，請指定應用程式程式碼的 S3 儲存貯體名稱和物件名稱。如需說明如何上傳應用程式程式碼的教學課程，請參閱教學[教學課程：開始使用 Managed Service for Apache Flink 中的 DataStream API](getting-started.md)課程。
+ **建立 Managed Service for Apache Flink 應用程式**：使用下列其中一種方法來建立 Amazon MSF 應用程式：
**注意**  
Amazon MSF 預設會使用 加密您的應用程式 AWS 擁有的金鑰。您也可以使用 AWS KMS 客戶受管金鑰 (CMKs) 來自行建立、擁有和管理金鑰，以建立新的應用程式。如需 CMKs 的詳細資訊，請參閱 [Amazon Managed Service for Apache Flink 中的金鑰管理](key-management-flink.md)。
  + **使用 AWS 主控台建立 Amazon MSF 應用程式：**您可以使用 AWS 主控台建立和設定應用程式。

    當您使用主控台建立應用程式時，也會為您建立應用程式的相依資源 (例如 CloudWatch Logs 串流、IAM 角色和 IAM 政策)。

    使用主控台建立應用程式時，您可以從 **Managed Service for Apache Flink - 建立應用程式**頁面的下拉式清單中選取版本，來指定應用程式使用的 Apache Flink 版本。

    如需如何使用 主控台建立應用程式的教學課程，請參閱教學[教學課程：開始使用 Managed Service for Apache Flink 中的 DataStream API](getting-started.md)課程。
  + **使用 CLI AWS 建立 Amazon MSF 應用程式：**您可以使用 CLI AWS 建立和設定應用程式。

    當您使用 CLI 建立應用程式時，還必須手動建立應用程式的相依資源 (例如 CloudWatch Logs 串流、IAM 角色和 IAM 政策)。

    使用 CLI 建立應用程式時，您可以使用 `CreateApplication` 動作的 `RuntimeEnvironment` 參數指定應用程式使用的 Apache Flink 版本。
**注意**  
您可以變更現有應用程式的 `RuntimeEnvironment` 。若要了解作法，請參閱[針對 Apache Flink 使用就地版本升級](how-in-place-version-upgrades.md)。

## 使用客戶受管金鑰
<a name="how-creating-apps-use-cmk"></a>

在 Amazon MSF 中，客戶受管金鑰 (CMKs) 是一種功能，您可以使用您在 AWS Key Management Service () 上建立、擁有和管理的金鑰來加密應用程式的資料AWS KMS。對於 Amazon MSF 應用程式，這表示受 Flink [檢查點](how-fault.md)或[快照](how-snapshots.md)約束的所有資料都會使用您為該應用程式定義的 CMK 加密。

若要搭配應用程式使用 CMK，您必須先[建立新的應用程式](#how-creating-apps-creating)，然後套用 CMK。如需使用 CMKs 的詳細資訊，請參閱 [Amazon Managed Service for Apache Flink 中的金鑰管理](key-management-flink.md)。

## 啟動 Managed Service for Apache Flink 應用程式
<a name="how-creating-apps-starting"></a>

建置應用程式的程式碼、將程式碼上傳至 S3，並建立 Managed Service for Apache Flink 應用程式之後，即可啟動應用程式。啟動 Managed Service Apache Flink 應用程式通常需要幾分鐘時間。

使用下列其中一種方法來啟動應用程式：
+ **使用 AWS 主控台啟動 Managed Service for Apache Flink 應用程式：**您可以在 AWS 主控台的應用程式頁面上選擇**執行**，以執行應用程式。
+ **使用 AWS API 啟動 Managed Service for Apache Flink 應用程式：**您可以使用 [StartApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StartApplication.html) 動作執行應用程式。

## 驗證 Managed Service for Apache Flink 應用程式
<a name="how-creating-apps-verifying"></a>

您可以驗證應用程式是否正常運作，方式如下：
+ **使用 CloudWatch Logs**：您可以使用 CloudWatch Logs 和 CloudWatch Logs Insights 來驗證您的應用程式是否在正常執行。如需將 CloudWatch Logs 與 Managed Service for Apache Flink 搭配使用的相關資訊，請參閱[在 Amazon Managed Service for Apache Flink 中記錄和監控](monitoring-overview.md)。
+ **使用 CloudWatch 指標：**您可以使用 CloudWatch 指標來監控應用程式的活動，或應用程式用於輸入或輸出的資源中的活動 （例如 Kinesis 串流、Firehose 串流或 Amazon S3 儲存貯體）。如需 CloudWatch 指標的詳細資訊，請參閱《Amazon CloudWatch 使用者指南》中的[使用指標](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/working_with_metrics.html)。
+ **監控輸出位置：**如果應用程式將輸出寫入某個位置 (例如 Amazon S3 儲存貯體或資料庫)，您可以為寫入的資料監控該位置。

# 為您的 Managed Service for Apache Flink 應用程式啟用系統復原
<a name="how-system-rollbacks"></a>

透過系統復原功能，您可以在 Amazon Managed Service for Apache Flink 上實現更高可用性的執行中 Apache Flink 應用程式。選擇此組態可讓服務在 或 等動作`autoscaling`執行到程式碼`UpdateApplication`或組態錯誤時，自動將應用程式還原至先前執行的版本。

**注意**  
若要使用系統復原功能，您必須更新應用程式以選擇加入。根據預設，現有的應用程式不會自動使用系統復原。

## 運作方式
<a name="how-rollback-works"></a>

當您啟動應用程式操作時，例如更新或擴展動作，Amazon Managed Service for Apache Flink 會先嘗試執行該操作。如果偵測到阻止操作成功的問題，例如程式碼錯誤或許可不足，服務會自動啟動`RollbackApplication`操作。

轉返會嘗試將應用程式還原至先前成功執行的版本，以及相關聯的應用程式狀態。如果轉返成功，您的應用程式會使用先前的版本，以最短的停機時間繼續處理資料。如果自動轉返也失敗，Amazon Managed Service for Apache Flink 會將應用程式轉換為 `READY` 狀態，以便您可以採取進一步的動作，包括修正錯誤和重試操作。

您必須選擇加入，才能使用自動系統復原。從現在開始，您可以使用 主控台或 API 來啟用應用程式的所有操作。

`UpdateApplication` 動作的下列範例請求會啟用應用程式的系統復原：

```
{
   "ApplicationName": "MyApplication",
   "CurrentApplicationVersionId": 1,
   "ApplicationConfigurationUpdate": { 
      "ApplicationSystemRollbackConfigurationUpdate": { 
         "RollbackEnabledUpdate": "true"
       }
    }
}
```

## 檢閱自動系統復原的常見案例
<a name="common-scenarios"></a>

下列案例說明自動系統復原的有利之處：
+ **應用程式更新：**如果您使用透過主要方法初始化 Flink 任務時出現錯誤的新程式碼來更新應用程式，則自動復原允許還原先前的工作版本。系統復原有幫助的其他更新案例包括：
  + 如果您的應用程式已更新為以高於 [maxParallelism](https://docs.aws.amazon.com/managed-flink/latest/java/how-scaling.html#how-scaling-auto) 的平行處理執行。
  + 如果您的應用程式已更新為在啟動 Flink 任務期間導致失敗的 VPC 應用程式使用不正確的子網路執行。
+ **Flink 版本升級：**當您升級至新的 Apache Flink 版本，且升級的應用程式遇到快照相容性問題時，系統復原可讓您自動還原至先前的 Flink 版本。
+ **AutoScaling：**當應用程式擴展，但由於快照和 Flink 任務圖表之間的運算子不相符，從儲存點遇到還原問題時。

## 使用操作 APIs進行系統復原
<a name="operation-apis"></a>

為了提供更好的可見性，Amazon Managed Service for Apache Flink 有兩個與應用程式操作相關的 APIs，可協助您追蹤故障和相關的系統復原。

`ListApplicationOperations`

此 API 會依反向時間順序列出應用程式上執行的所有操作，包括 `RollbackApplication`、、 `UpdateApplication` `Maintenance`和其他操作。`ListApplicationOperations` 動作的下列範例請求會列出應用程式的前 10 個應用程式操作：

```
{
   "ApplicationName": "MyApplication",
   "Limit": 10
}
```

的下列範例請求`ListApplicationOperations`有助於篩選應用程式上先前更新的清單：

```
{
   "ApplicationName": "MyApplication",
   "operation": "UpdateApplication"
}
```

`DescribeApplicationOperation`

此 API 提供 列出之特定操作的詳細資訊`ListApplicationOperations`，包括故障原因，如適用。`DescribeApplicationOperation` 動作的下列範例請求會列出特定應用程式操作的詳細資訊：

```
{
   "ApplicationName": "MyApplication",
   "OperationId": "xyzoperation"
}
```

如需故障診斷資訊，請參閱[系統復原最佳實務](troubleshooting-system-rollback.md)。

# 執行 Managed Service for Apache Flink 應用程式
<a name="how-running-apps"></a>

本主題包含執行 Managed Service for Apache Flink 的相關資訊。

執行 Managed Service for Apache Flink 應用程式時，該服務會建立 Apache Flink 作業。Apache Flink 作業是指 Managed Service for Apache Flink 應用程式的執行生命週期。作業的執行及其使用的資源由作業管理員管理。作業管理員會將應用程式的執行分隔為任務。每個任務由任務管理員管理。監視應用程式的效能時，您可以檢查每個任務管理員的效能或作業管理員的整體效能。

如需 Apache Flink 任務的相關資訊，請參閱 Apache Flink 文件中的[任務和排程](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/internals/job_scheduling/)。

## 識別應用程式和任務狀態
<a name="how-running-job-status"></a>

應用程式及其作業都具有目前的執行狀態：
+ **應用程式狀態**：您的應用程式目前的狀態，描述其執行期。應用程式狀態包括下列幾種：
  + **穩定的應用程式狀態**：您的應用程式通常會保持下列狀態，直到您變更狀態為止：
    + **READY**：新的或已停止的應用程式處於 READY 狀態，直到您執行它為止。
    + **RUNNING**：已成功啟動的應用程式處於 RUNNING 狀態。
  + **暫時性應用程式狀態**：處於這些狀態的應用程式通常處於轉換至其他狀態的過程中。如果應用程式已保持暫時狀態一段時間，您可以使用 [StopApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StopApplication.html) 動作停止應用程式，並將 `Force` 參數設定為 `true`。這些狀態包括下列項目：
    + `STARTING:` 在 [StartApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StartApplication.html) 操作之後發生。應用程式正在從狀態 `READY` 轉換為 `RUNNING` 狀態。
    + `STOPPING:` 在 [StopApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StopApplication.html) 動作之後發生。應用程式正在從狀態 `RUNNING` 轉換為 `READY` 狀態。
    + `DELETING:` 在 [DeleteApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_DeleteApplication.html) 動作之後發生。正在刪除應用程式。
    + `UPDATING:` 在 [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html) 動作之後發生。應用程式正在更新，並會轉換回 `RUNNING` 或 `READY` 狀態。
    + `AUTOSCALING:` 應用程式的 [ParallelismConfiguration `AutoScalingEnabled`](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ParallelismConfiguration.html) 屬性設定為 `true`，而且服務正在增加應用程式的平行處理層級。當應用程式處於此狀態時，您唯一可以使用的有效 API 動作是 [StopApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StopApplication.html) 動作，且 `Force` 參數設定為 `true`。如需自動擴展的相關資訊，請參閱[在 Managed Service for Apache Flink 中使用自動擴展](how-scaling-auto.md)。
    + `FORCE_STOPPING:` 在呼叫 [StopApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StopApplication.html) 動作且 `Force` 參數設定為 `true` 之後發生。正在停止應用程式。應用程式正在從 `STARTING`、`UPDATING`、`STOPPING` 或 `AUTOSCALING` 狀態轉換為 `READY` 狀態。
    + `ROLLING_BACK:` 在 [RollbackApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_RollbackApplication.html) 動作之後發生。應用程式正在復原至先前的版本。應用程式正在從 `UPDATING` 或 `AUTOSCALING` 狀態轉換為 `RUNNING` 狀態。
    + `MAINTENANCE:` 在 Managed Service for Apache Flink 將修補程式套用至您的應用程式時發生。如需詳細資訊，請參閱[管理 Managed Service for Apache Flink 的維護任務](maintenance.md)。

  您可以使用主控台或 [DescribeApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_DescribeApplication.html) 動作來檢查應用程式的狀態。
+ **作業狀態：**當應用程式處於 `RUNNING` 狀態時，作業的狀態會描述其目前的執行期。作業會以 `CREATED` 狀態開始，然後在啟動時繼續進行到 `RUNNING` 狀態。如果發生錯誤情況，應用程式會進入下列狀態：
  + 對於使用 Apache Flink 1.11 及更新版本的應用程式，會進入 `RESTARTING` 狀態。
  + 對於使用 Apache Flink 1.8 及更新版本的應用程式，會進入 `FAILING` 狀態。

  然後，應用程式會繼續進入 `RESTARTING` 或 `FAILED` 狀態，取決於作業是否可以重新啟動。

  您可以檢查應用程式的 CloudWatch 日誌看是否有狀態變更，以檢查作業的狀態。

## 執行批次工作負載
<a name="batch-workloads"></a>

Managed Service for Apache Flink 支援執行 Apache Flink 批次工作負載。在批次作業中，當 Apache Flink 作業進入 **FINISHED** 狀態時，Managed Service for Apache Flink 應用程式的狀態會設定為 **READY**。如需 Flink 作業狀態的詳細資訊，請參閱[作業與排程](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/internals/job_scheduling/)。

# 檢閱 Managed Service for Apache Flink 應用程式資源
<a name="how-resources"></a>

本節說明應用程式使用的系統資源。了解 Managed Service for Apache Flink 如何佈建和使用資源，將協助您設計、建立和維護高效能且穩定的 Managed Service for Apache Flink 應用程式。

## Managed Service for Apache Flink 應用程式資源
<a name="how-resources-kda"></a>

Managed Service for Apache Flink 是一項 AWS 服務，可建立託管 Apache Flink 應用程式的環境。Managed Service for Apache Flink 服務使用 **Kinesis 處理單元 (KPU)** 提供資源。

一個 KPU 代表下列系統資源：
+ 一個 CPU 核心
+ 4 GB 的記憶體，其中 1 GB 是本機記憶體，3 GB 是堆積記憶體
+ 50 GB 的可用磁碟空間

KPU 在稱為**任務**和**子任務**的不同執行單元中執行應用程式。你可以將一個子任務視為等效於一個執行緖。

應用程式可用的 KPU 數量等於應用程式的 `Parallelism` 設定除以應用程式的 `ParallelismPerKPU` 設定。

如需應用程式平行處理的詳細資訊，請參閱 [實作應用程式擴展](how-scaling.md)。

## Apache Flink 應用程式資源
<a name="how-resources-flink"></a>

Apache Flink 環境會使用稱為**任務空位**的單元為應用程式配置資源。Managed Service for Apache Flink 為應用程式配置資源時，會將一或多個 Apache Flink 任務空位指派給單一 KPU。指派給單一 KPU 的空位數量等於應用程式的 `ParallelismPerKPU` 設定。如需任務槽的詳細資訊，請參閱 Apache Flink 文件中的[任務排程](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/internals/job_scheduling/)。

### 運算子平行處理
<a name="how-resources-flink-operatorparallelism"></a>

您可以設定運算子可以使用的最大子任務數目。此值稱為**運算子平行處理層級**。依預設，應用程式中每個運算子的平行處理層級與應用程式的平行處理層級相同。這表示依預設，應用程式中的每個運算子都可以視需要使用應用程式中所有可用的子任務。

您可以使用 `setParallelism` 方法設定應用程式中運算子的平行處理層級。使用此方法，您可以控制每個運算子一次可以使用的子任務數目。

如需運算子的詳細資訊，請參閱 Apache Flink 文件中的[運算子](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/operators/overview/)。

### 運算子鏈結
<a name="how-resources-flink-operatorchaining"></a>

通常，每個運算子都使用單獨的子任務來執行，但是如果多個運算子始終按順序執行，則執行期可以將它們全部分配給相同的任務。這個過程稱為**運算子鏈結**。

如果幾個循序運算子都對相同的資料進行操作，則可以鏈接到單個任務中。以下是實現這一點所需要的一些條件：
+ 運算子執行 1 對 1 簡單轉發。
+ 所有運算子具有相同的平行處理層級。

應用程式將運算子鏈結到單一子任務中時，可以節省系統資源，因為服務不需要執行網路作業，也不需要為每個運算子配置子任務。若要判斷您的應用程式是否使用運算子鏈結，請查看 Managed Service for Apache Flink 主控台中的作業圖表。應用程式中的每個頂點代表一或多個運算子。此圖表顯示已鏈結為單一頂點的運算子。

# Managed Service for Apache Flink 中的每秒帳單
<a name="how-pricing"></a>

Managed Service for Apache Flink 現在以一秒遞增計費。每個應用程式最低收費為 10 分鐘。每秒計費適用於新啟動或已執行的應用程式。本節說明 Managed Service for Apache Flink 如何計量您的用量並向您收費。若要進一步了解 Managed Service for Apache Flink 定價，請參閱 [Amazon Managed Service for Apache Flink 定價](https://aws.amazon.com/managed-service-apache-flink/pricing/)。

## 運作方式
<a name="how-resources-kda"></a>

Managed Service for Apache Flink 會在支援的 中以一秒增量計費的 **Kinesis 處理單元 (KPUs)** 期間和數量向您收費 AWS 區域。單一 KPU 包含 1vCPU 運算和 4 GB 記憶體。系統會根據用於執行應用程式的 KPUs 數目，向您收取每小時費率的費用。

例如，執行 20 分鐘和 10 秒的應用程式會收取 20 分鐘和 10 秒的費用，再乘以其使用的資源。執行 5 分鐘的應用程式將至少收取 10 分鐘的費用，再乘以其使用的資源。

Managed Service for Apache Flink 以小時為單位說明用量。例如，15 分鐘對應至 0.25 小時。

對於 Apache Flink 應用程式，您需要為每個應用程式支付單一額外 KPU，用於協調。應用程式也會針對執行中的儲存體和耐用的備份付費。執行中的應用程式儲存用於 Managed Service for Apache Flink 中的有狀態處理功能，並按 GB/月收費。持久性備份是選用的，可為應用程式提供point-in-time復原，每月每 GB 收費。

在串流模式下，Managed Service for Apache Flink 會隨著記憶體和運算的需求波動，自動擴展串流處理應用程式所需的 KPUs 數量。您可以選擇使用所需的 KPUs 數量來佈建應用程式。

## AWS 區域 可用性
<a name="how-pricing-regions"></a>

**注意**  
目前，每秒計費不適用於下列區域： AWS GovCloud （美國東部）、 AWS GovCloud （美國西部）、中國 （北京） 和中國 （寧夏）。

每秒計費提供如下 AWS 區域：
+ 美國東部 (維吉尼亞北部) – us-east-1
+ 美國東部 (俄亥俄) - us-east-2
+ 美國西部 (加利佛尼亞北部) – us-west-1
+ 美國西部 (奧勒岡) - us-west-2
+ 非洲 (開普敦) – af-south-1
+ 亞太區域 (香港) – ap-east-1
+ 亞太區域 （海德拉巴） - ap-south-1
+ 亞太區域 (雅加達) – ap-southeast-3
+ 亞太區域 （墨爾本） - ap-southeast-4
+ 亞太區域 (孟買) – ap-south-1
+ 亞太區域 (大阪) - ap-northeast-3
+ 亞太區域 (首爾) – ap-northeast-2
+ 亞太區域 (新加坡) – ap-southeast-1
+ 亞太區域 (雪梨) – ap-southeast-2
+ 亞太區域 (東京) – ap-northeast-1
+ 加拿大 (中部) – ca-central-1
+ 加拿大西部 (卡加利) - ca-west-1
+ 歐洲 (法蘭克福) – eu-central-1
+ 歐洲 (愛爾蘭) – eu-west-1
+ 歐洲 (倫敦) – eu-west-2
+ 歐洲 (米蘭) – eu-south-1
+ 歐洲 (巴黎) – eu-west-3
+ 歐洲 (西班牙) – eu-south-2
+ 歐洲 (斯德哥爾摩) – eu-north-1
+ 歐洲 (蘇黎世) – eu-central-2
+ 以色列 （特拉維夫） - il-central-1
+ 中東 (巴林) – me-south-1
+ 中東 (阿拉伯聯合大公國) – me-central-1
+ 南美洲 (聖保羅) – sa-east-1

## 定價範例
<a name="how-pricing-examples"></a>

您可以在 Managed Service for Apache Flink 定價頁面上找到定價範例。如需詳細資訊，請參閱 [Amazon Managed Service for Apache Flink 定價](https://aws.amazon.com/managed-service-apache-flink/pricing/)。以下是每個 的成本用量報告圖例的進一步範例。

### 長時間執行、繁重的工作負載
<a name="pricing-example-1"></a>

您是大型影片串流服務，您想要根據使用者的互動建立即時影片建議。您可以在 Managed Service for Apache Flink 中使用 Apache Flink 應用程式，持續從多個 Kinesis 資料串流擷取使用者互動事件，並在輸出至下游系統之前即時處理事件。使用者互動事件會使用多個運算子進行轉換。這包括依事件類型分割資料、使用其他中繼資料擴充資料、依時間戳記排序資料，以及在交付前緩衝資料 5 分鐘。應用程式有許多需要大量運算且可平行化的轉換步驟。您的 Flink 應用程式已設定為使用 20 KPUs執行，以容納工作負載。您的應用程式每天使用 1 GB 的耐用應用程式備份。每月 Managed Service for Apache Flink 費用的計算方式如下：

**每月費用**

美國東部 （維吉尼亞北部） 區域的價格為每 KPU 小時 0.11 美元。Managed Service for Apache Flink 為每個 KPU 配置 50 GB 的執行中應用程式儲存體，每月收取 0.10 USD。
+ 每月 KPU 費用：24 小時 \$1 30 天 \$1 (20 個 KPUs \$1 串流應用程式的額外 1 個 KPU) \$1 \$10.11/小時 = \$11，584.00
+ 每月執行的應用程式儲存費用：30 天 \$1 20 KPUs \$1 50 GB/KPUs \$1 \$10.10/GB-月 = \$1100.00
+ 每月持久性應用程式儲存費用：30 天 \$1 1 GB \$1 0.023/GB-月 = 0.03 美元
+ 總費用：\$11，584.00 \$1 \$1100 \$1 \$10.03 = **\$11，684.03**

**帳單與成本管理主控台上當月 Managed Service for Apache Flink 的成本用量報告**

Kinesis Analytics
+ USD 1，684.03 - 美國東部 （維吉尼亞北部）
+ Amazon Kinesis Analytics CreateSnapshot
  + 每 GB 每月 \$10.023 的耐用應用程式備份
    + 每月 1 GB - 0.03 USD
+ Amazon Kinesis Analytics StartApplication
  + 每 GB 每月執行應用程式儲存 0.10 美元
    + 每月 1，000 GB - 100 USD
  + Apache Flink 應用程式每 Kinesis 處理單位小時 0.11 美元
    + 15，120 KPU 小時 - 1，584 USD

### 每天執行約 15 分鐘的批次工作負載
<a name="pricing-example-2"></a>

您可以在 Managed Service for Apache Flink 中使用 Apache Flink 應用程式，以批次模式轉換 Amazon Simple Storage Service (Amazon S3) 中的日誌資料。日誌資料會使用多個運算子進行轉換。這包括將結構描述套用至不同的日誌事件、依事件類型分割資料，以及依時間戳記排序資料。應用程式有許多轉換步驟，但沒有任何步驟具有運算密集性。此應用程式會在 30 天內，每天以每秒 2，000 筆記錄擷取資料 15 分鐘。您不會建立任何耐用的應用程式備份。每月 Managed Service for Apache Flink 費用的計算方式如下：

**每月費用**

美國東部 （維吉尼亞北部） 區域的價格為每 KPU 小時 0.11 美元。Managed Service for Apache Flink 為每個 KPU 配置 50 GB 的執行中應用程式儲存體，每月收取 0.10 USD。
+ 批次工作負載：在每天 15 分鐘內，Managed Service for Apache Flink 應用程式正在處理 2，000 筆記錄/秒，這需要 2KPUs。30 天/月 \$1 15 分鐘/天 = 450 分鐘/月
+ 每月 KPU 費用：每月 450 分鐘 \$1 (2KPUs 串流應用程式的額外 1 個 KPU) \$1 \$10.11/小時 = \$12.48
+ 每月執行的應用程式儲存費用：每月 450 分鐘 \$1 2 KPUs \$1 50 GB/KPUs \$1 每月 0.10 USD = 0.11 USD
+ 費用總計：\$12.48 \$1 0.11 = **\$12.59**

**帳單與成本管理主控台上當月 Managed Service for Apache Flink 的成本用量報告**

Kinesis Analytics
+ USD 2.59 - 美國東部 （維吉尼亞北部）
+ Amazon Kinesis Analytics StartApplication
  + 每 GB 每月執行應用程式備份 0.10 美元
    + 每月 1.042 GB - 0.11 USD
  + Apache Flink 應用程式每 Kinesis 處理單位小時 0.11 美元
    + 22.5 KPU-Hour - USD 2.48

### 測試應用程式，可在同一小時內停止並持續啟動，並吸引多個最低費用
<a name="pricing-example-3"></a>

您是一個大型電子商務平台，每天處理數百萬筆交易。您想要開發即時詐騙偵測。您可以在 Managed Service for Apache Flink 中使用 Apache Flink 應用程式，從 Kinesis Data Streams 擷取交易事件，並使用不同的轉換步驟即時處理事件。這包括使用滑動視窗彙總事件、依事件類型分割事件，以及針對不同的事件類型套用特定偵測規則。在開發期間，您會多次啟動和停止應用程式，以測試和偵錯行為。有時候，您的應用程式只會執行幾分鐘。當您使用 4 個 KPUs 測試應用程式時，有一小時的時間，您的應用程式不會使用任何耐用的應用程式備份：
+ 在上午 10：05，您會啟動應用程式，該應用程式會在上午 10：35 停止之前執行 30 分鐘。
+ 在上午 10：40，您會再次啟動應用程式，該應用程式會在上午 10：45 停止之前執行 5 分鐘。
+ 上午 10：50，您會再次啟動應用程式，並在上午 10：52 停止之前執行 2 分鐘。

每次應用程式啟動時，Managed Service for Apache Flink 都會收取至少 10 分鐘的使用費。您應用程式的每月 Managed Service for Apache Flink 用量計算方式如下：
+ 第一次您的應用程式啟動和停止時：使用 30 分鐘
+ 第二次您的應用程式啟動和停止時：使用 10 分鐘 （您的應用程式執行 5 分鐘，四捨五入至最低 10 分鐘費用）
+ 您的應用程式第三次啟動和停止：使用 10 分鐘 （您的應用程式執行 2 分鐘，四捨五入至最低 10 分鐘費用）

您的應用程式總共需支付 50 分鐘的使用費。如果應用程式執行當月沒有其他時間，則會計算每月 Managed Service for Apache Flink 費用，如下所示：

**每月費用**

美國東部 （維吉尼亞北部） 區域的價格為每 KPU 小時 0.11 美元。Managed Service for Apache Flink 為每個 KPU 配置 50 GB 的執行中應用程式儲存體，每月收取 0.10 USD。
+ 每月 KPU 費用：50 分鐘 \$1 (4KPUs \$1 串流應用程式的 1 個額外 KPU) \$1 0.11 美元/小時 = 0.46 美元 （四捨五入至最接近的一元）
+ 每月執行的應用程式儲存費用：50 分鐘 \$1 4 KPUs \$1 50 GB/KPUs \$1 每月 0.10 USD = 0.03 USD （四捨五入至最接近的一元）
+ 費用總計：\$10.46 \$1 0.03 = **\$10.49**

**帳單與成本管理主控台上當月 Managed Service for Apache Flink 的成本用量報告**

Kinesis Analytics
+ USD 0.49 - 美國東部 （維吉尼亞北部）
+ Amazon Kinesis Analytics StartApplication
  + 每 GB 每月執行應用程式儲存 0.10 美元
    + 每月 0.232 GB - 0.03 USD
  + Apache Flink 應用程式每 Kinesis 處理單位小時 0.11 美元
    + 4.167 KPU-Hour - USD 0.46

# 檢閱 DataStream API 元件
<a name="how-datastream"></a>

Apache Flink 應用程式使用 [Apache Flink DataStream API](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/overview/) 來轉換資料串流中的資料。

本節說明移動、轉換和追蹤資料的不同元件：
+ [使用連接器透過 DataStream API 在 Managed Service for Apache Flink 中移動資料](how-connectors.md)：這些元件可以在應用程式與外部資料來源和目的地之間移動資料。
+ [使用 Managed Service for Apache Flink 中的運算子搭配 DataStream API 轉換資料](how-operators.md)：這些元件可以轉換或分組應用程式中的資料元素。
+ [使用 DataStream API 追蹤 Managed Service for Apache Flink 中的事件](how-time.md)：本主題說明 Managed Service for Apache Flink 如何在使用 DataStream API 時追蹤事件。

# 使用連接器透過 DataStream API 在 Managed Service for Apache Flink 中移動資料
<a name="how-connectors"></a>

在 Amazon Managed Service for Apache Flink DataStream API 中，*連接器*是可將資料移入和移出 Managed Service for Apache Flink 應用程式的軟體元件。連接器是靈活的整合，可讓您從檔案和目錄讀取。連接器包含用於與 Amazon 服務和第三方系統互動的完整模組。

連接器包含下列類型：
+ [新增串流資料來源](how-sources.md)：從 Kinesis 資料串流、檔案或其他資料來源向應用程式提供資料。
+ [使用接收器寫入資料](how-sinks.md)：從您的應用程式將資料傳送至 Kinesis 資料串流、Firehose 串流或其他資料目的地。
+ [使用非同步 I/O](how-async.md)：提供對資料來源 (例如資料庫) 的非同步存取，以富集串流事件。

## 可用的連接器
<a name="how-connectors-list"></a>

Apache Flink 架構包含用於存取各種來源之資料的連接器。如需 Apache Flink 架構中可用連接器的相關資訊，請參閱 [Apache Flink 文件](https://nightlies.apache.org/flink/flink-docs-release-1.15/)中的[連接器](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/connectors/)。

**警告**  
如果您的應用程式在 Flink 1.6、1.8、1.11 或 1.13 上執行，並且想要在中東 （阿拉伯聯合大公國）、亞太區域 （海德拉巴）、以色列 （特拉維夫）、歐洲 （蘇黎世）、中東 （阿拉伯聯合大公國）、亞太區域 （墨爾本） 或亞太區域 （雅加達） 區域執行，您可能需要使用更新的連接器重建應用程式封存，或升級至 Flink 1.18。  
Apache Flink 連接器存放在自己的開放原始碼儲存庫中。如果您要升級至 1.18 版或更新版本，則必須更新您的相依性。若要存取 Apache Flink AWS 連接器的儲存庫，請參閱 [flink-connector-aws](https://github.com/apache/flink-connector-aws)。  
之前的 Kinesis 來源`org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer`已停止，未來可能會隨著 Flink 版本而移除。請改用 [Kinesis 來源](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kinesis/#kinesis-streams-source)。  
`FlinkKinesisConsumer` 和 之間沒有狀態相容性`KinesisStreamsSource`。如需詳細資訊，請參閱 Apache Flink 文件中的[將現有任務遷移至新的 Kinesis 串流來源](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kinesis/#migrating-existing-jobs-to-new-kinesis-streams-source-from-kinesis-consumer)。  
 以下是建議的準則：  


**連接器升級**  

| Flink 版本 | 使用的連接器 | Resolution | 
| --- | --- | --- | 
| 1.19、1.20 | Kinesis 來源 |  升級至 Managed Service for Apache Flink 1.19 和 1.20 版時，請確定您使用的是最新的 Kinesis Data Streams 來源連接器。這必須是任何 5.0.0 版或更新版本。如需詳細資訊，請參閱 [Amazon Kinesis Data Streams Connector](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kinesis/)。  | 
| 1.19、1.20 | Kinesis 接收器 |  升級至 Managed Service for Apache Flink 1.19 和 1.20 版時，請確定您使用的是最新的 Kinesis Data Streams 接收器連接器。這必須是任何 5.0.0 版或更新版本。如需詳細資訊，請參閱 [Kinesis Streams Sink](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kinesis/#kinesis-streams-sink)。  | 
| 1.19、1.20 | DynamoDB 串流來源 |  升級至 Managed Service for Apache Flink 1.19 和 1.20 版時，請確定您使用的是最新的 DynamoDB Streams 來源連接器。這必須是任何 5.0.0 版或更新版本。如需詳細資訊，請參閱 [Amazon DynamoDB 連接器](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/dynamodb/)。  | 
| 1.19、1.20 | DynamoDB 接收器 | 升級至 Managed Service for Apache Flink 1.19 和 1.20 版時，請確定您使用的是最新的 DynamoDB 接收器連接器。這必須是任何 5.0.0 版或更新版本。如需詳細資訊，請參閱 [Amazon DynamoDB 連接器](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/dynamodb/)。 | 
| 1.19、1.20 | Amazon SQS 接收器 |  升級至 Managed Service for Apache Flink 1.19 和 1.20 版時，請確定您使用的是最新的 Amazon SQS 接收器連接器。這必須是任何 5.0.0 版或更新版本。如需詳細資訊，請參閱 [Amazon SQS Sink](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/sqs/)。  | 
| 1.19、1.20 | Amazon Managed Service for Prometheus Sink |  升級至 Managed Service for Apache Flink 1.19 和 1.20 版時，請確定您使用的是最新的 Amazon Managed Service for Prometheus 接收器連接器。這必須是任何 1.0.0 版或更新版本。如需詳細資訊，請參閱 [Prometheus Sink](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/prometheus/)。  | 

# 將串流資料來源新增至 Managed Service for Apache Flink
<a name="how-sources"></a>

Apache Flink 提供了連接器，用於從檔案、通訊端、集合和自訂來源讀取資料。在應用程式的程式碼中，您可以使用 [Apache Flink 來源](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/datastream_api.html#data-sources)接收來自串流的資料。本節說明可用於 Amazon 服務的來源。

## 使用 Kinesis 資料串流
<a name="input-streams"></a>

`KinesisStreamsSource` 會從 Amazon Kinesis 資料串流提供串流資料到您的應用程式。

### 建立 `KinesisStreamsSource`
<a name="input-streams-create"></a>

以下程式碼範例示範如何建立 `KinesisStreamsSource`：

```
// Configure the KinesisStreamsSource
Configuration sourceConfig = new Configuration();
sourceConfig.set(KinesisSourceConfigOptions.STREAM_INITIAL_POSITION, KinesisSourceConfigOptions.InitialPosition.TRIM_HORIZON); // This is optional, by default connector will read from LATEST

// Create a new KinesisStreamsSource to read from specified Kinesis Stream.
KinesisStreamsSource<String> kdsSource =
        KinesisStreamsSource.<String>builder()
                .setStreamArn("arn:aws:kinesis:us-east-1:123456789012:stream/test-stream")
                .setSourceConfig(sourceConfig)
                .setDeserializationSchema(new SimpleStringSchema())
                .setKinesisShardAssigner(ShardAssignerFactory.uniformShardAssigner()) // This is optional, by default uniformShardAssigner will be used.
                .build();
```

如需使用 的詳細資訊`KinesisStreamsSource`，請參閱 Apache Flink 文件中的 [Amazon Kinesis Data Streams Connector](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kinesis/)，以及 [Github 上的公有 KinesisConnectors 範例](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KinesisConnectors)。

### 建立`KinesisStreamsSource`使用 EFO 取用者的
<a name="input-streams-efo"></a>

現在`KinesisStreamsSource`支援[增強型廣發 (EFO)](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kinesis/)。

如果 Kinesis 取用者使用 EFO，Kinesis Data Streams 服務會提供專屬頻寬，而不是讓取用者與其他從串流讀取的取用者共用串流的固定頻寬。

如需搭配 Kinesis 取用者使用 EFO 的詳細資訊，請參閱 [ FLIP-128：增強的 AWS Kinesis 取用者扇出](https://cwiki.apache.org/confluence/display/FLINK/FLIP-128%3A+Enhanced+Fan+Out+for+AWS+Kinesis+Consumers)。

您可以在 Kinesis 取用者上設定下列參數來啟用 EFO 取用者：
+ **READER\$1TYPE：**將此參數設定為 **EFO**，讓您的應用程式使用 EFO 取用者存取 Kinesis Data Stream 資料。
+ **EFO\$1CONSUMER\$1NAME**：將此參數設定為字串值，確保在此串流的取用者中保持唯一。在相同的 Kinesis 資料串流中重複使用取用者名稱，將導致先前使用該名稱的使用者遭到終止。

若要設定 `KinesisStreamsSource` 使用 EFO，請將下列參數新增至取用者：

```
sourceConfig.set(KinesisSourceConfigOptions.READER_TYPE, KinesisSourceConfigOptions.ReaderType.EFO);
sourceConfig.set(KinesisSourceConfigOptions.EFO_CONSUMER_NAME, "my-flink-efo-consumer");
```

如需使用 EFO 消費者的 Managed Service for Apache Flink 應用程式範例，請參閱 [Github 上的公有 Kinesis Connectors 範例](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KinesisConnectors)。

## 使用 Amazon MSK
<a name="input-msk"></a>

此 `KafkaSource` 來源將來自 Amazon MSK 主題的串流資料提供給應用程式。

### 建立 `KafkaSource`
<a name="input-msk-create"></a>

以下程式碼範例示範如何建立 `KafkaSource`：

```
KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers(brokers)
    .setTopics("input-topic")
    .setGroupId("my-group")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
```

如需如何使用 `KafkaSource` 的詳細資訊，請參閱 [MSK 複寫](earlier.md#example-msk)。

# 在 Managed Service for Apache Flink 中使用接收器寫入資料
<a name="how-sinks"></a>

在應用程式程式碼中，您可以使用任何 [Apache Flink 接收器](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/overview/)連接器來寫入外部系統，包括 Kinesis Data Streams 和 DynamoDB 等 AWS 服務。

Apache Flink 也為檔案和通訊端提供接收器，而且您可以實作自訂接收器。在數個支援的接收器中，經常使用下列項目：

## 使用 Kinesis 資料串流
<a name="sinks-streams"></a>

Apache Flink 在《Apache Flink 文件》中提供了 [Kinesis Data Streams 連接器](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/kinesis/)的相關資訊。

如需使用 Kinesis 資料串流進行輸入和輸出的應用程式範例，請參閱 [教學課程：開始使用 Managed Service for Apache Flink 中的 DataStream API](getting-started.md)。

## 使用 Apache Kafka 和 Amazon Managed Streaming for Apache Kafka (MSK)
<a name="sinks-MSK"></a>

[Apache Flink Kafka 連接器](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/kafka/#kafka-sink)提供廣泛的支援，可將資料發佈至 Apache Kafka 和 Amazon MSK，包括恰好一次的保證。若要了解如何寫入 Kafka，請參閱 Apache Flink 文件中的 [Kafka Connectors 範例](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KafkaConnectors)。

## 使用 Amazon S3
<a name="sinks-s3"></a>

您可以使用 Apache Flink `StreamingFileSink` 將物件寫入 Amazon S3 儲存貯體。

如需如何將物件寫入 S3 的範例，請參閱[範例：寫入 Amazon S3 儲存貯體](earlier.md#examples-s3)。

## 使用 Firehose
<a name="sinks-firehose"></a>

`FlinkKinesisFirehoseProducer` 是可靠、可擴展的 Apache Flink 接收器，用於使用 [Firehose](https://docs.aws.amazon.com/firehose/latest/dev/) 服務存放應用程式輸出。本節說明如何建立 Maven 專案以建立和使用 `FlinkKinesisFirehoseProducer`。

**Topics**
+ [建立 `FlinkKinesisFirehoseProducer`](#sinks-firehose-create)
+ [`FlinkKinesisFirehoseProducer` 程式碼範例](#sinks-firehose-sample)

### 建立 `FlinkKinesisFirehoseProducer`
<a name="sinks-firehose-create"></a>

以下程式碼範例示範如何建立 `FlinkKinesisFirehoseProducer`：

```
Properties outputProperties = new Properties();
outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);

FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName, new SimpleStringSchema(), outputProperties);
```

### `FlinkKinesisFirehoseProducer` 程式碼範例
<a name="sinks-firehose-sample"></a>

下列程式碼範例示範如何建立和設定 ，`FlinkKinesisFirehoseProducer`以及將資料從 Apache Flink 資料串流傳送至 Firehose 服務。

```
 
package com.amazonaws.services.kinesisanalytics;

import com.amazonaws.services.kinesisanalytics.flink.connectors.config.ProducerConfigConstants;
import com.amazonaws.services.kinesisanalytics.flink.connectors.producer.FlinkKinesisFirehoseProducer;
import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;

import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

import java.io.IOException;
import java.util.Map;
import java.util.Properties;

public class StreamingJob {

	private static final String region = "us-east-1";
	private static final String inputStreamName = "ExampleInputStream";
	private static final String outputStreamName = "ExampleOutputStream";

	private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) {
		Properties inputProperties = new Properties();
		inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
		inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

		return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
	}

	private static DataStream<String> createSourceFromApplicationProperties(StreamExecutionEnvironment env)
			throws IOException {
		Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
		return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(),
				applicationProperties.get("ConsumerConfigProperties")));
	}

	private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromStaticConfig() {
		/*
		 * com.amazonaws.services.kinesisanalytics.flink.connectors.config.
		 * ProducerConfigConstants
		 * lists of all of the properties that firehose sink can be configured with.
		 */

		Properties outputProperties = new Properties();
		outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);

		FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName,
				new SimpleStringSchema(), outputProperties);
		ProducerConfigConstants config = new ProducerConfigConstants();
		return sink;
	}

	private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromApplicationProperties() throws IOException {
		/*
		 * com.amazonaws.services.kinesisanalytics.flink.connectors.config.
		 * ProducerConfigConstants
		 * lists of all of the properties that firehose sink can be configured with.
		 */

		Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
		FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName,
				new SimpleStringSchema(),
				applicationProperties.get("ProducerConfigProperties"));
		return sink;
	}

	public static void main(String[] args) throws Exception {
		// set up the streaming execution environment
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		/*
		 * if you would like to use runtime configuration properties, uncomment the
		 * lines below
		 * DataStream<String> input = createSourceFromApplicationProperties(env);
		 */

		DataStream<String> input = createSourceFromStaticConfig(env);

		// Kinesis Firehose sink
		input.addSink(createFirehoseSinkFromStaticConfig());

		// If you would like to use runtime configuration properties, uncomment the
		// lines below
		// input.addSink(createFirehoseSinkFromApplicationProperties());

		env.execute("Flink Streaming Java API Skeleton");
	}
}
```

如需如何使用 Firehose 接收器的完整教學課程，請參閱 [範例：寫入 Firehose](earlier.md#get-started-exercise-fh)。

# 在 Managed Service for Apache Flink 中使用非同步 I/O
<a name="how-async"></a>

非同步 I/O 運算子使用外部資料來源 (例如資料庫) 來富集串流資料。Managed Service for Apache Flink 以非同步方式富集串流事件，以便批次處理請求來提高效率。

如需詳細資訊，請參閱 Apache Flink 文件中的[非同步 I/O](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/operators/asyncio/)。

# 使用 Managed Service for Apache Flink 中的運算子搭配 DataStream API 轉換資料
<a name="how-operators"></a>

若要在 Managed Service for Apache Flink 中轉換傳入的資料，請使用 Apache Flink *運算子*。Apache Flink 運算子可將一或多個資料串流轉換為新的資料串流。新的資料串流包含來自原始資料串流的修改資料。Apache Flink 提供了超過 25 個預先建置的串流處理運算子。如需詳細資訊，請參閱 Apache Flink 文件中的[運算子](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/operators/overview/)。

**Topics**
+ [使用轉換運算子](#how-operators-transform)
+ [使用彙總運算子](#how-operators-agg)

## 使用轉換運算子
<a name="how-operators-transform"></a>

以下是在 JSON 資料串流的其中一個欄位上進行簡單文字轉換的範例。

此程式碼會建立轉換後的資料串流。新資料串流具有與原始串流相同的資料，並在 `TICKER` 欄位內容後面附加 ` Company` 字串。

```
DataStream<ObjectNode> output = input.map(
    new MapFunction<ObjectNode, ObjectNode>() {
        @Override
        public ObjectNode map(ObjectNode value) throws Exception {
            return value.put("TICKER", value.get("TICKER").asText() + " Company");
        }
    }
);
```

## 使用彙總運算子
<a name="how-operators-agg"></a>

以下是彙總運算子的範例。程式碼會建立彙總的資料串流。運算子會建立 5 秒的翻轉視窗，並傳回視窗中具有相同 `TICKER` 值之記錄的 `PRICE` 值總和。

```
DataStream<ObjectNode> output = input.keyBy(node -> node.get("TICKER").asText())
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .reduce((node1, node2) -> {
        double priceTotal = node1.get("PRICE").asDouble() + node2.get("PRICE").asDouble();
        node1.replace("PRICE", JsonNodeFactory.instance.numberNode(priceTotal));
    return node1;
});
```

如需更多程式碼範例，請參閱 [建立和使用 Managed Service for Apache Flink 應用程式的範例](examples-collapsibles.md)。

# 使用 DataStream API 追蹤 Managed Service for Apache Flink 中的事件
<a name="how-time"></a>

Managed Service for Apache Flink 使用下列時間戳記來追蹤事件：
+ **處理時間：**指正在執行相應操作的機器的系統時間。
+ **事件時間：**指每個個別事件在其生產設備上發生的時間。
+ **擷取時間：**指事件進入 Managed Service for Apache Flink 服務的時間。

您可以使用 設定串流環境使用的時間`setStreamTimeCharacteristic`。

```
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
```

如需時間戳記的詳細資訊，請參閱 Apache Flink 文件中的[產生浮水印](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/event-time/generating_watermarks/)。

# 檢閱資料表 API 元件
<a name="how-table"></a>

Apache Flink 應用程式使用 [Apache Flink 資料表 API](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/tableapi/)，透過關聯式模型與串流中的資料互動。您可以使用資料表 API 來存取使用資料表來源的資料，然後使用資料表函數來轉換和篩選資料表資料。您可以使用 API 函數或 SQL 命令來轉換和篩選表格式資料。

本節包含下列主題：
+ [資料表 API 連接器](how-table-connectors.md)：這些元件可以在應用程式與外部資料來源和目的地之間移動資料。
+ [資料表 API 時間屬性](how-table-timeattributes.md)：本主題說明 Managed Service for Apache Flink 如何在使用資料表 API 時追蹤事件。

# 資料表 API 連接器
<a name="how-table-connectors"></a>

在 Apache Flink 程式設計模型中，連接器是您的應用程式用來從外部來源讀取或寫入資料的元件，例如其他服務 AWS 。

透過 Apache Flink 資料表 API，您可以使用下列類型的連接器：
+ [資料表 API 來源](#how-table-connectors-source)：您可以使用資料表 API 來源連接器以及 API 呼叫或 SQL 查詢，在 `TableEnvironment` 中建立資料表。
+ [資料表 API 接收器](#how-table-connectors-sink)：您可以使用 SQL 命令將資料表資料寫入外部來源，例如 Amazon MSK 主題或 Amazon S3 儲存貯體。

## 資料表 API 來源
<a name="how-table-connectors-source"></a>

您可以從資料串流建立資料表來源。下列程式碼會從 Amazon MSK 主題建立資料表：

```
//create the table
    final FlinkKafkaConsumer<StockRecord> consumer = new FlinkKafkaConsumer<StockRecord>(kafkaTopic, new KafkaEventDeserializationSchema(), kafkaProperties);
    consumer.setStartFromEarliest();
    //Obtain stream
    DataStream<StockRecord> events = env.addSource(consumer);

    Table table = streamTableEnvironment.fromDataStream(events);
```

如需資料表來源的詳細資訊，請參閱 Apache Flink 文件中的[資料表和 SQL 連接器](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/overview/)。

## 資料表 API 接收器
<a name="how-table-connectors-sink"></a>

若要將資料表資料寫入接收器，請在 SQL 中建立接收器，然後在 `StreamTableEnvironment` 物件上執行 SQL 型接收器。

下列程式碼範例示範如何將資料表資料寫入 Amazon S3 接收器：

```
final String s3Sink = "CREATE TABLE sink_table (" +
    "event_time TIMESTAMP," +
    "ticker STRING," +
    "price DOUBLE," +
    "dt STRING," +
    "hr STRING" +
    ")" +
    " PARTITIONED BY (ticker,dt,hr)" +
    " WITH" +
    "(" +
    " 'connector' = 'filesystem'," +
    " 'path' = '" + s3Path + "'," +
    " 'format' = 'json'" +
    ") ";

    //send to s3
    streamTableEnvironment.executeSql(s3Sink);
    filteredTable.executeInsert("sink_table");
```

 您可以使用 `format` 參數來控制 Managed Service for Apache Flink 用來將輸出寫入接收器的格式。如需格式的相關資訊，請參閱 Apache Flink 文件中的[支援的連接器](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/overview/)。

## 使用者定義的來源和接收器
<a name="how-table-connectors-userdef"></a>

您可以使用現有的 Apache Kafka 連接器與其他 AWS 服務 (例如 Amazon MSK 和 Amazon S3) 之間相互傳送資料。若要與其他資料來源和目的地互動，您可以定義自己的來源和接收器。如需詳細資訊，請參閱 Apache Flink 文件中的[使用者定義來源和接收器](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/sourcessinks/)。

# 資料表 API 時間屬性
<a name="how-table-timeattributes"></a>

資料串流中的每個記錄都有數個時間戳記，用來定義與記錄相關的事件發生的時間：
+ **事件時間**：使用者定義的時間戳記，定義建立記錄的事件發生的時間。
+ **擷取時間**：應用程式從資料串流擷取記錄的時間。
+ **處理時間**：您的應用程式處理記錄的時間。

當 Apache Flink Table API 根據記錄時間建立視窗時，您可以使用 `setStreamTimeCharacteristic`方法定義其使用的時間戳記。

如需搭配資料表 API 使用時間戳記的詳細資訊，請參閱 Apache Flink 文件中的[時間屬性](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/concepts/time_attributes/)和[及時串流處理](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/concepts/time/)。

# 使用 Python 搭配 Managed Service for Apache Flink
<a name="how-python"></a>

**注意**  
如果您在使用 Apple 晶片的新 Mac 上開發 Python Flink 應用程式，可能會遇到與 PyFlink 1.15 的 Python 相依性的一些[已知問題](https://issues.apache.org/jira/browse/FLINK-26981)。在這種情況下，我們建議在 Docker 中執行 Python 解譯器。如需逐步指示，請參閱[在 Apple Silicon Mac 上進行 PyFlink 1.15 開發](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/python/LocalDevelopmentOnAppleSilicon)。

Apache Flink 2.2 版包含使用 Python 3.12 版建立應用程式的支援；Python 3.8 版的支援已移除。如需詳細資訊，請參閱 [Flink Python 文件](https://nightlies.apache.org/flink/flink-docs-release-2.2/api/python/)。若要使用 Python 建立 Managed Service for Apache Flink 應用程式，請執行下列動作：
+ 使用 `main` 方法將 Python 應用程式的程式碼建立為文字檔案。
+ 將應用程式的程式碼檔案以及任何 Python 或 Java 相依性綁定到一個 zip 檔案中，然後將其上傳到 Amazon S3 儲存貯體。
+ 建立 Managed Service for Apache Flink 應用程式，並指定 Amazon S3 程式碼位置、應用程式屬性和應用程式設定。

在高層級上，Python 資料表 API 是 Java 資料表 API 周圍的一項包裝函式。如需有關 Python 資料表 API 的資訊，請參閱 Apache Flink 文件中的[資料表 API 教學課程](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/python/table_api_tutorial/)。

# 程式設計 Managed Service for Apache Flink Python 應用程式
<a name="how-python-programming"></a>

您可以使用 Apache Flink Python 資料表 API 對 Managed Service for Apache Flink Python 應用程式進行編碼。Apache Flink 引擎將 Python 資料表 API 陳述式 (在 Python 虛擬機中執行) 轉換為 Java 資料表 API 陳述式 (在 Java 虛擬機中執行)。

執行下列動作以使用 Python 資料表 API：
+ 建立對 `StreamTableEnvironment` 的參考。
+ 透過對 `table` 參考執行查詢，從來源串流資料建立 `StreamTableEnvironment` 物件。
+ 對 `table` 物件執行查詢以建立輸出資料表。
+ 使用 `StatementSet` 將輸出資料表寫入目的地。

若要開始在 Managed Service for Apache Flink 中使用 Python 資料表 API，請參閱 [Amazon Managed Service for Apache Flink for Python 入門](gs-python.md)。

## 讀取和寫入串流資料
<a name="how-python-programming-readwrite"></a>

若要讀取和寫入串流資料，請在資料表環境中執行 SQL 查詢。

### 建立資料表
<a name="how-python-programming-readwrite-createtable"></a>

下列程式碼範例示範可建立 SQL 查詢的使用者定義函數。SQL 查詢會建立與 Kinesis 串流互動的資料表：

```
def create_table(table_name, stream_name, region, stream_initpos):
   return """ CREATE TABLE {0} (
                `record_id` VARCHAR(64) NOT NULL,
                `event_time` BIGINT NOT NULL,
                `record_number` BIGINT NOT NULL,
                `num_retries` BIGINT NOT NULL,
                `verified` BOOLEAN NOT NULL
              )
              PARTITIONED BY (record_id)
              WITH (
                'connector' = 'kinesis',
                'stream' = '{1}',
                'aws.region' = '{2}',
                'scan.stream.initpos' = '{3}',
                'sink.partitioner-field-delimiter' = ';',
                'sink.producer.collection-max-count' = '100',
                'format' = 'json',
                'json.timestamp-format.standard' = 'ISO-8601'
              ) """.format(table_name, stream_name, region, stream_initpos)
```

### 讀取串流資料
<a name="how-python-programming-readwrite-read"></a>

下列程式碼範例示範如何在資料表環境參考上使用先前的 `CreateTable` SQL 查詢來讀取資料：

```
   table_env.execute_sql(create_table(input_table, input_stream, input_region, stream_initpos))
```

### 寫入串流資料
<a name="how-python-programming-readwrite-write"></a>

下列程式碼範例示範如何使用 `CreateTable` 範例中的 SQL 查詢來建立輸出資料表參考，以及如何使用 `StatementSet` 與資料表互動，以將資料寫入目的地 Kinesis 串流：

```
   table_result = table_env.execute_sql("INSERT INTO {0} SELECT * FROM {1}"
                       .format(output_table_name, input_table_name))
```

## 讀取執行期屬性
<a name="how-python-programming-properties"></a>

您可以使用執行期屬性來設定應用程式，而無需變更應用程式的程式碼。

您可以指定應用程式的應用程式屬性，指定方式與 Managed Service for Apache Flink Java 應用程式相同。您可採用以下方式來指定執行期屬性：
+ 使用 [CreateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplication.html) 動作。
+ 使用 [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html) 動作。
+ 使用主控台設定應用程式。

您可以讀取 Managed Service for Apache Flink 執行期所建立的名為 `application_properties.json` 的 json 檔案，藉此擷取程式碼中的應用程式屬性。

下列程式碼範例示範從 `application_properties.json` 檔案讀取應用程式屬性：

```
file_path = '/etc/flink/application_properties.json'
   if os.path.isfile(file_path):
       with open(file_path, 'r') as file:
           contents = file.read()
           properties = json.loads(contents)
```

下列使用者定義函數的程式碼範例示範如何從應用程式屬性物件讀取屬性群組：擷取：

```
def property_map(properties, property_group_id):
   for prop in props:
       if prop["PropertyGroupId"] == property_group_id:
           return prop["PropertyMap"]
```

下列程式碼範例示範從上一個範例傳回的屬性群組讀取名為 INPUT\$1STREAM\$1KEY 的屬性：

```
input_stream = input_property_map[INPUT_STREAM_KEY]
```

## 建立應用程式的程式碼套件
<a name="how-python-programming-package"></a>

建立 Python 應用程式之後，您就可以將程式碼檔案和相依性綁定到 zip 檔案中。

您的 zip 檔案必須包含帶有 `main` 方法的 Python 指令碼，並且可以選擇包含以下內容：
+ 其他 Python 程式碼檔案
+ JAR 檔案中使用者定義的 Java 程式碼
+ JAR 檔案中的 Java 程式庫

**注意**  
應用程式 zip 檔案必須包含應用程式的所有相依性。您無法為應用程式參考其他來源的程式庫。

# 建立 Managed Service for Apache Flink Python 應用程式
<a name="how-python-creating"></a>

## 指定您的程式碼檔案
<a name="how-python-creating-code"></a>

建立應用程式的程式碼套件之後，可將其上傳到 Amazon S3 儲存貯體。然後可以使用主控台或 [CreateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplication.html) 動作來建立應用程式。

使用 [CreateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplication.html) 動作建立應用程式時，可以使用名為 `kinesis.analytics.flink.run.options` 的特殊應用程式屬性群組在 zip 檔案中指定程式碼檔案和存檔。您可以定義下列類型的檔案：
+ **python**：包含 Python 主要方法的文字檔案。
+ **jarfile**：包含 Java 使用者定義函數的 Java JAR 檔案。
+ **pyFiles**：包含應用程式要使用之資源的 Python 資源檔案。
+ **pyArchives**：包含應用程式資源檔案的 zip 檔案。

如需 Apache Flink Python 程式碼檔案類型的詳細資訊，請參閱 Apache Flink 文件中的[命令列界面](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/deployment/cli/)。

**注意**  
Managed Service for Apache Flink 不支援 `pyModule`、`pyExecutable`、或 `pyRequirements` 檔案類型。所有程式碼、請求和相依性都必須在 zip 檔案中。您無法指定要使用 pip 安裝的相依性。

以下範例 json 程式碼片段示範如何指定檔案在應用程式 zip 檔案中的位置：

```
"ApplicationConfiguration": {
    "EnvironmentProperties": {
      "PropertyGroups": [
        {
          "PropertyGroupId": "kinesis.analytics.flink.run.options",
          "PropertyMap": {
            "python": "MyApplication/main.py",
            "jarfile": "MyApplication/lib/myJarFile.jar",
            "pyFiles": "MyApplication/lib/myDependentFile.py",
            "pyArchives": "MyApplication/lib/myArchive.zip"
          }
        },
```

# 監控 Managed Service for Apache Flink Python 應用程式
<a name="how-python-monitoring"></a>

您可以使用應用程式的 CloudWatch 日誌來監控 Managed Service for Apache Flink Python 應用程式。

Managed Service for Apache Flink 記錄適用於 Python 應用程式的下列訊息：
+ 在應用程式的 `main` 方法中使用 `print()` 寫入主控台的訊息。
+ 使用 `logging` 套件以使用者定義函數的形式傳送的訊息。下列程式碼範例示範從使用者定義的函數寫入應用程式日誌：

  ```
  import logging
  
  @udf(input_types=[DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
  def doNothingUdf(i):
      logging.info("Got {} in the doNothingUdf".format(str(i)))
      return i
  ```
+ 應用程式擲出的錯誤訊息。

  如果應用程式在 `main` 函數中擲出例外狀況，該例外狀況將出現在應用程式的日誌中。

  下列範例示範從 Python 程式碼擲回例外狀況的日誌項目：

  ```
  2021-03-15 16:21:20.000   --------------------------- Python Process Started --------------------------
  2021-03-15 16:21:21.000   Traceback (most recent call last):
  2021-03-15 16:21:21.000   "  File ""/tmp/flink-web-6118109b-1cd2-439c-9dcd-218874197fa9/flink-web-upload/4390b233-75cb-4205-a532-441a2de83db3_code/PythonKinesisSink/PythonUdfUndeclared.py"", line 101, in <module>"
  2021-03-15 16:21:21.000       main()
  2021-03-15 16:21:21.000   "  File ""/tmp/flink-web-6118109b-1cd2-439c-9dcd-218874197fa9/flink-web-upload/4390b233-75cb-4205-a532-441a2de83db3_code/PythonKinesisSink/PythonUdfUndeclared.py"", line 54, in main"
  2021-03-15 16:21:21.000   "    table_env.register_function(""doNothingUdf"", doNothingUdf)"
  2021-03-15 16:21:21.000   NameError: name 'doNothingUdf' is not defined
  2021-03-15 16:21:21.000   --------------------------- Python Process Exited ---------------------------
  2021-03-15 16:21:21.000   Run python process failed
  2021-03-15 16:21:21.000   Error occurred when trying to start the job
  ```

**注意**  
由於效能問題，建議只在應用程式開發期間使用自訂日誌訊息。

## 使用 CloudWatch Insights 查詢日誌
<a name="how-python-monitoring-insights"></a>

下列 CloudWatch Insights 查詢會在執行應用程式的主函數時，搜尋 Python 進入點所建立的日誌：

```
fields @timestamp, message
| sort @timestamp asc
| filter logger like /PythonDriver/
| limit 1000
```

# 在 Managed Service for Apache Flink 中使用執行期屬性
<a name="how-properties"></a>

您可以使用*執行期屬性*來設定應用程式，而無需重新編譯應用程式的程式碼。

**Topics**
+ [使用主控台管理執行期屬性](#how-properties-console)
+ [使用 CLI 管理執行期屬性](#how-properties-cli)
+ [在 Managed Service for Apache Flink 應用程式中存取執行期屬性](#how-properties-access)

## 使用主控台管理執行期屬性
<a name="how-properties-console"></a>

您可以使用 從 Managed Service for Apache Flink 應用程式新增、更新或移除執行期屬性 AWS 管理主控台。

**注意**  
如果您使用的是舊版支援的 Apache Flink，而且想要將現有應用程式升級至 Apache Flink 1.19.1，則可以使用就地 Apache Flink 版本升級來執行此操作。透過就地版本升級，您可以保留跨 Apache Flink 版本的單一 ARN 的應用程式可追蹤性，包括快照、日誌、指標、標籤、Flink 組態等。您可以在 `RUNNING`和 `READY` 狀態使用此功能。如需詳細資訊，請參閱[針對 Apache Flink 使用就地版本升級](how-in-place-version-upgrades.md)。

**更新 Managed Service for Apache Flink 應用程式的執行期屬性**

1. 登入 AWS 管理主控台，並在 https：//https://console.aws.amazon.com/flink 開啟 Amazon MSF 主控台。

1. 選擇 Managed Service for Apache Flink 應用程式。選擇**應用程式詳細資訊**。

1. 在應用程式頁面，選擇**設定**。

1. 展開**屬性**區段。

1. 使用**屬性**區段中的控制項，以鍵值對定義屬性群組。可以使用這些控制項來新增、更新或移除屬性群組和執行期屬性。

1. 選擇**更新**。

## 使用 CLI 管理執行期屬性
<a name="how-properties-cli"></a>

您可以使用 [AWS CLI](https://docs.aws.amazon.com/cli) 新增、更新或移除執行期屬性。

本節包含針對設定應用程式執行期屬性之 API 動作的範例請求。如需如何使用 JSON 檔案作為 API 動作輸入的相關資訊，請參閱 [Managed Service for Apache Flink API 範例程式碼](api-examples.md)。

**注意**  
使用您的帳戶 ID 取代以下範例中的範例帳戶 ID (*`012345678901`*)。

### 在建立應用程式時新增執行期屬性
<a name="how-properties-create"></a>

[https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html) 動作的下列範例請求會在您建立應用程式時新增兩個執行期屬性群組 (`ProducerConfigProperties` 和 `ConsumerConfigProperties`)：

```
{
    "ApplicationName": "MyApplication",
    "ApplicationDescription": "my java test app",
    "RuntimeEnvironment": "FLINK-1_19",
    "ServiceExecutionRole": "arn:aws:iam::012345678901:role/MF-stream-rw-role",
    "ApplicationConfiguration": {
        "ApplicationCodeConfiguration": {
            "CodeContent": {
                "S3ContentLocation": {
                    "BucketARN": "arn:aws:s3:::ka-app-code-username",
                    "FileKey": "java-getting-started-1.0.jar"
                }
            },
            "CodeContentType": "ZIPFILE"
        },
        "EnvironmentProperties":  { 
         "PropertyGroups": [ 
            { 
               "PropertyGroupId": "ProducerConfigProperties",
               "PropertyMap" : {
                    "flink.stream.initpos" : "LATEST",
                    "aws.region" : "us-west-2",
                    "AggregationEnabled" : "false"
               }
            },
            { 
               "PropertyGroupId": "ConsumerConfigProperties",
               "PropertyMap" : {
                    "aws.region" : "us-west-2"
               }
            }
         ]
      }
    }
}
```

### 在現有應用程式中新增和更新執行期屬性
<a name="how-properties-update"></a>

[https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html) 動作的下列範例請求會新增或更新現有應用程式的執行期屬性：

```
{
  "ApplicationName": "MyApplication",
  "CurrentApplicationVersionId": 2,
  "ApplicationConfigurationUpdate": {
    "EnvironmentPropertyUpdates": {
      "PropertyGroups": [ 
        { 
          "PropertyGroupId": "ProducerConfigProperties",
          "PropertyMap" : {
            "flink.stream.initpos" : "LATEST",
            "aws.region" : "us-west-2",
            "AggregationEnabled" : "false"
          }
        },
        { 
          "PropertyGroupId": "ConsumerConfigProperties",
          "PropertyMap" : {
            "aws.region" : "us-west-2"
          }
        }
      ]
    }
  }
}
```

**注意**  
如果您使用的索引鍵在屬性群組中沒有對應的執行期屬性，Managed Service for Apache Flink 會將鍵值對新增為新屬性。如果您將索引鍵用於屬性群組中的現有執行期屬性，Managed Service for Apache Flink 會更新屬性值。

### 移除執行期屬性
<a name="how-properties-remove"></a>

[https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html) 動作的下列範例請求會從現有應用程式中移除所有執行期屬性和屬性群組：

```
{
  "ApplicationName": "MyApplication",
  "CurrentApplicationVersionId": 3,
  "ApplicationConfigurationUpdate": {
    "EnvironmentPropertyUpdates": {
      "PropertyGroups": []
    }
  }
}
```

**重要**  
如果您省略現有屬性群組或屬性群組中的現有屬性索引鍵，則會移除該屬性群組或屬性。

## 在 Managed Service for Apache Flink 應用程式中存取執行期屬性
<a name="how-properties-access"></a>

您可以使用可傳回 `Map<String, Properties>` 物件的靜態 `KinesisAnalyticsRuntime.getApplicationProperties()` 方法，在 Java 應用程式的程式碼中擷取執行期屬性。

下列 Java 程式碼範例會擷取應用程式的執行期屬性：

```
 Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
```

您可以擷取屬性群組 (作為 `Java.Util.Properties` 物件)，如下所示：

```
Properties consumerProperties = applicationProperties.get("ConsumerConfigProperties");
```

您通常透過傳入 `Properties` 物件來設定 Apache Flink 來源或接收器，而無需擷取個別屬性。下列程式碼範例示範如何透過傳入從執行期屬性擷取的 `Properties` 物件來建立 Flink 來源：

```
private static FlinkKinesisProducer<String> createSinkFromApplicationProperties() throws IOException {
  Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
  FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<String>(new SimpleStringSchema(),
    applicationProperties.get("ProducerConfigProperties"));

  sink.setDefaultStream(outputStreamName);
  sink.setDefaultPartition("0");
  return sink;
}
```

如需程式碼範例，請參閱 [建立和使用 Managed Service for Apache Flink 應用程式的範例](examples-collapsibles.md)。

# 將 Apache Flink 連接器與 Managed Service for Apache Flink 搭配使用
<a name="how-flink-connectors"></a>

Apache Flink 連接器是將資料移入和移出 Amazon Managed Service for Apache Flink 應用程式的軟體元件。連接器是靈活的整合，可讓您從檔案和目錄讀取。連接器包含用於與 Amazon 服務和第三方系統互動的完整模組。

連接器包含下列類型：
+ **來源：**從 Kinesis 資料串流、檔案、Apache Kafka 主題、檔案或其他資料來源將資料提供給應用程式。
+ **接收器：**從您的應用程式將資料傳送至 Kinesis 資料串流、Firehose 串流、Apache Kafka 主題或其他資料目的地。
+ **非同步 I/O：**提供非同步存取資料來源，例如資料庫，以擴充串流。

Apache Flink 連接器存放在自己的來源儲存庫中。Apache Flink 連接器的版本和成品會根據您使用的 Apache Flink 版本，以及您使用的是 DataStream、Table 或 SQL API。

Amazon Managed Service for Apache Flink 支援超過 40 個預先建置的 Apache Flink 來源和接收器連接器。下表提供最常用的連接器及其相關版本的摘要。您也可以使用非同步接收器架構來建置自訂接收器。如需詳細資訊，請參閱 Apache Flink 文件中的[一般非同步基礎接收器](https://flink.apache.org/2022/03/16/the-generic-asynchronous-base-sink/)。

 若要存取 Apache Flink AWS 連接器的儲存庫，請參閱 [flink-connector-aws](https://github.com/apache/flink-connector-aws)。

## Flink 2.2 的連接器
<a name="connectors-flink-2-2"></a>

升級至 Flink 2.2 時，您需要將連接器相依性更新為與 Flink 2.x 執行時間相容的版本。Flink 連接器獨立於 Flink 執行期發行，而且並非所有連接器都有 Flink 2.x 相容版本。下表摘要說明截至撰寫時 Amazon Managed Service for Apache Flink 中常用連接器的可用性：


**Flink 2.2 的連接器**  

| 連接器 | Flink 2.0\$1 版本 | 備註 | 
| --- | --- | --- | 
| Apache Kafka | flink-connector-kafka 4.0.0-2.0 | 建議用於 Flink 2.2 | 
| Kinesis Data Streams （來源） | flink-connector-aws-kinesis-streams 6.0.0-2.0 | 建議用於 Flink 2.2 | 
| Kinesis Data Streams （接收器） | flink-connector-aws-kinesis-streams 6.0.0-2.0 | 建議用於 Flink 2.2 | 
| FileSystem (S3、HDFS) | 與 Flink 綁定 | 內建於 Flink 分佈中 — 一律可用 | 
| JDBC | 尚未針對 2.x 發行 | 沒有可用的 Flink 2.x 相容版本 | 
| OpenSearch | 尚未針對 2.x 發行 | 沒有可用的 Flink 2.x 相容版本 | 
| Elasticsearch | 尚未針對 2.x 發行 | 考慮遷移至 OpenSearch 連接器 | 
| Amazon Managed Service for Prometheus | 尚未針對 2.x 發行 | 寫入時沒有 Flink 2.x 相容版本 | 

如果您的應用程式依賴的連接器尚未有 Flink 2.2 版本，您有兩個選項：等待連接器發行相容的版本，或評估是否可以用替代版本取代它 （例如，使用 JDBC 目錄或自訂接收器）。

**已知問題**
+ 當 Kinesis 串流進行重新分片時，連接器 v5.0.0 和 v6.0.0 中引入`KinesisStreamsSource`搭配 EFO （增強廣發/SubscribeToShard) 路徑使用 的應用程式可能會失敗。這是社群中的已知問題。如需詳細資訊，請參閱 [FLINK-37648](https://issues.apache.org/jira/browse/FLINK-37648)。
+ 如果 Flink 應用程式處於背壓狀態，則連接器 v5.0.0 和 v6.0.0 中引入`KinesisStreamsSource`搭配 EFO （增強廣發/SubscribeToShard) 路徑使用 的應用程式`KinesisStreamsSink`可能會遇到死結，導致一個或多個 TaskManagers 中的資料處理完全停止。需要強制停止操作和啟動應用程式操作才能復原應用程式。這是社群中已知問題的子案例：[FLINK-34071](https://issues.apache.org/jira/browse/FLINK-34071)。

## 舊版 Flink 的連接器
<a name="connectors-older-versions"></a>


**舊版 Flink 的連接器**  

| 連接器 | Flink 1.15 版 | Flink 1.18 版 | Flink 1.19 版 | Flink 1.20 版 | 
| --- | --- | --- | --- | --- | 
| Kinesis Data Stream - 來源 - DataStream 和資料表 API | flink-connector-kinesis，1.15.4 | flink-connector-kinesis，4.3.0-1.18 | flink-connector-kinesis，5.0.0-1.19 | flink-connector-kinesis，5.0.0-1.20 | 
| Kinesis Data Stream - Sink - DataStream 和資料表 API | flink-connector-aws-kinesis-streams，1.15.4 | flink-connector-aws-kinesis-streams，4.3.0-1.18 | flink-connector-aws-kinesis-streams，5.0.0-1.19 | flink-connector-aws-kinesis-streams，5.0.0-1.20 | 
| Kinesis Data Streams - 來源/接收器 - SQL | flink-sql-connector-kinesis，1.15.4 | flink-sql-connector-kinesis，4.3.0-1.18 | flink-sql-connector-kinesis，5.0.0-1.19 | flink-sql-connector-kinesis-streams，5.0.0-1.20 | 
| Kafka - DataStream 和資料表 API | flink-connector-kafka，1.15.4 | flink-connector-kafka，3.2.0-1.18 | flink-connector-kafka，3.3.0-1.19 | flink-connector-kafka，3.3.0-1.20 | 
| Kafka - SQL | flink-sql-connector-kafka，1.15.4 | flink-sql-connector-kafka，3.2.0-1.18 | flink-sql-connector-kafka，3.3.0-1.19 | flink-sql-connector-kafka，3.3.0-1.20 | 
| Firehose - DataStream 和資料表 API | flink-connector-aws-kinesis-firehose，1.15.4 | flink-connector-aws-firehose，4.3.0-1.18 | flink-connector-aws-firehose，5.0.0-1.19 | flink-connector-aws-firehose，5.0.0-1.20 | 
| Firehose - SQL | flink-sql-connector-aws-kinesis-firehose，1.15.4 | flink-sql-connector-aws-firehose，4.3.0-1.18 | flink-sql-connector-aws-firehose，5.0.0-1.19 | flink-sql-connector-aws-firehose，5.0.0-1.20 | 
| DynamoDB - DataStream 和資料表 API | flink-connector-dynamodb，3.0.0-1.15 | flink-connector-dynamodb，4.3.0-1.18 | flink-connector-dynamodb，5.0.0-1.19 | flink-connector-dynamodb，5.0.0-1.20 | 
| DynamoDB - SQL | flink-sql-connector-dynamodb，3.0.0-1.15 | flink-sql-connector-dynamodb，4.3.0-1.18 | flink-sql-connector-dynamodb，5.0.0-1.19 | flink-sql-connector-dynamodb，5.0.0-1.20 | 
| OpenSearch - DataStream 和資料表 API | - | flink-connector-opensearch，1.2.0-1.18 | flink-connector-opensearch，1.2.0-1.19 | flink-connector-opensearch，1.2.0-1.19 | 
| OpenSearch - SQL | - | flink-sql-connector-opensearch，1.2.0-1.18 | flink-sql-connector-opensearch，1.2.0-1.19 | flink-sql-connector-opensearch，1.2.0-1.19 | 
| Amazon Managed Service for Prometheus DataStream | - | flink-sql-connector-opensearch，1.2.0-1.18 | flink-connector-prometheus，1.0.0-1.19 | flink-connector-prometheus，1.0.0-1.20 | 
| Amazon SQS DataStream 和資料表 API | - | flink-sql-connector-opensearch，1.2.0-1.18 | flink-connector-sqs、5.0.0-1.19 | flink-connector-sqs、5.0.0-1.20 | 

若要進一步了解 Amazon Managed Service for Apache Flink 中的連接器，請參閱：
+ [DataStream API 連接器](https://docs.aws.amazon.com/managed-flink/latest/java/how-connectors.html)
+ [資料表 API 連接器](https://docs.aws.amazon.com/managed-flink/latest/java/how-table-connectors.html)

### 已知問題
<a name="connectors-known-issues"></a>

Apache Flink 1.15 中的 Apache Kafka 連接器存在已知的開放原始碼 Apache Flink 問題。此問題已在更新版本的 Apache Flink 中解決。

如需詳細資訊，請參閱[已知問題](flink-1-15-2.md#flink-1-15-known-issues)。

# 在 Managed Service for Apache Flink 中實作容錯能力
<a name="how-fault"></a>

檢查點是用於在 Amazon Managed Service for Apache Flink 中實作容錯能力的方法。*檢查點*是執行中應用程式的最新備份，可用來從意外的應用程式中斷或容錯移轉中立即復原。

如需 Apache Flink 應用程式中檢查點的詳細資訊，請參閱 Apache Flink 文件中的[檢查點](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/ops/state/checkpoints/)。

*快照*是手動建立和管理的應用程式狀態備份。快照可讓您透過呼叫 [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html) 將應用程式還原到先前的狀態。如需詳細資訊，請參閱[使用快照管理應用程式備份](how-snapshots.md)。

如果應用程式已啟用檢查點，則服務會在意外的應用程式重新啟動時建立並載入應用程式資料的備份，藉此提供容錯能力。這些意外的應用程式重新啟動可能是由於意外的作業重新啟動、執行個體失敗等原因造成。這為應用程式提供了與在這些重新啟動期間無故障執行時相同的語義。

如果已為應用程式啟用快照，並使用應用程式的 [ApplicationRestoreConfiguration](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ApplicationRestoreConfiguration.html) 進行設定，則服務會在應用程式更新期間或在與服務相關的擴展或維護期間提供恰好一次的處理語義。

## 在 Managed Service for Apache Flink 中設定檢查點
<a name="how-fault-configure"></a>

您可以設定應用程式的檢查點行為。您可以定義應用程式是否保持檢查點狀態、應用程式將狀態儲存至檢查點的頻率，以及某個檢查點操作結束與另一個檢查點操作開始之間的最小間隔。

您可以使用 [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html) 或 [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html) API 作業來設定下列設定：
+ `CheckpointingEnabled`：指示是否已在應用程式中啟用檢查點。
+ `CheckpointInterval`：包含檢查點 (持續性) 作業之間的時間 (毫秒)。
+ `ConfigurationType`：將此值設定為 `DEFAULT` 以使用預設檢查點行為。將此值設定為 `CUSTOM` 以設定其他值。
**注意**  
預設的檢查點行為如下：  
**CheckpointingEnabled：**true
**CheckpointInterval：**60000
**MinPauseBetweenCheckpoints：**5000
如果 **ConfigurationType** 設定為 `DEFAULT`，即使使用 或應用程式程式碼中的值設定為其他值 AWS Command Line Interface，也會使用上述值。
**注意**  
對於 Flink 1.15 以上版本，Managed Service for Apache Flink 將在自動建立快照期間 (也就是應用程式更新、擴展或停止時) 使用 `stop-with-savepoint`。
+ `MinPauseBetweenCheckpoints`：檢查點操作結束與另一個檢查點操作開始之間的最短時間 (毫秒)。設定此值可防止在檢查點操作時間超過 `CheckpointInterval` 時，應用程式持續執行檢查點。

## 檢閱檢查點 API 範例
<a name="how-fault-examples"></a>

本節包含針對設定應用程式檢查點之 API 動作的範例請求。如需如何使用 JSON 檔案作為 API 動作輸入的相關資訊，請參閱 [Managed Service for Apache Flink API 範例程式碼](api-examples.md)。

### 設定新應用程式的檢查點
<a name="how-fault-examples-create-config"></a>

[https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html) 動作的下列請求範例會在您建立應用程式時設定檢查點：

```
{
   "ApplicationName": "MyApplication",
   "RuntimeEnvironment":"FLINK-1_19",
   "ServiceExecutionRole":"arn:aws:iam::123456789123:role/myrole",
   "ApplicationConfiguration": { 
      "ApplicationCodeConfiguration":{
      "CodeContent":{
        "S3ContentLocation":{
          "BucketARN":"arn:aws:s3:::amzn-s3-demo-bucket",
          "FileKey":"myflink.jar",
          "ObjectVersion":"AbCdEfGhIjKlMnOpQrStUvWxYz12345"
        }
      },
      "FlinkApplicationConfiguration": { 
         "CheckpointConfiguration": { 
            "CheckpointingEnabled": "true",
            "CheckpointInterval": 20000,
            "ConfigurationType": "CUSTOM",
            "MinPauseBetweenCheckpoints": 10000
         }
      }
}
```

### 停用新應用程式的檢查點
<a name="how-fault-examples-create-disable"></a>

[https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html) 動作的下列請求範例會在您建立應用程式時停用檢查點：

```
{
   "ApplicationName": "MyApplication",
   "RuntimeEnvironment":"FLINK-1_19",
   "ServiceExecutionRole":"arn:aws:iam::123456789123:role/myrole",
   "ApplicationConfiguration": { 
      "ApplicationCodeConfiguration":{
      "CodeContent":{
        "S3ContentLocation":{
          "BucketARN":"arn:aws:s3:::amzn-s3-demo-bucket",
          "FileKey":"myflink.jar",
          "ObjectVersion":"AbCdEfGhIjKlMnOpQrStUvWxYz12345"
        }
      },
      "FlinkApplicationConfiguration": { 
         "CheckpointConfiguration": { 
            "CheckpointingEnabled": "false"
         }
      }
}
```

### 設定現有應用程式的檢查點
<a name="how-fault-examples-update-config"></a>

[https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html) 動作的下列範例請求會為現有的應用程式設定檢查點：

```
{
   "ApplicationName": "MyApplication",
   "ApplicationConfigurationUpdate": { 
      "FlinkApplicationConfigurationUpdate": { 
         "CheckpointConfigurationUpdate": { 
            "CheckpointingEnabledUpdate": true,
            "CheckpointIntervalUpdate": 20000,
            "ConfigurationTypeUpdate": "CUSTOM",
            "MinPauseBetweenCheckpointsUpdate": 10000
         }
      }
   }
}
```

### 停用現有應用程式的檢查點
<a name="how-fault-examples-update-update-disable"></a>

[https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html) 動作的下列範例請求會為現有的應用程式停用檢查點：

```
{
   "ApplicationName": "MyApplication",
   "ApplicationConfigurationUpdate": { 
      "FlinkApplicationConfigurationUpdate": { 
         "CheckpointConfigurationUpdate": { 
            "CheckpointingEnabledUpdate": false,
            "CheckpointIntervalUpdate": 20000,
            "ConfigurationTypeUpdate": "CUSTOM",
            "MinPauseBetweenCheckpointsUpdate": 10000
         }
      }
   }
}
```

# 使用快照管理應用程式備份
<a name="how-snapshots"></a>

*快照*是 Apache Flink *儲存點*的 Managed Service for Apache Flink 實作。快照是使用者或服務觸發、建立和管理的應用程式狀態備份。如需 Apache Flink 儲存點的相關資訊，請參閱 Apache Flink 文件中的[儲存點](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/ops/state/savepoints/)。使用快照，您可以從應用程式狀態的特定快照重新啟動應用程式。

**注意**  
建議您的應用程式每天建立數次快照，以便使用正確的狀態資料正確重新啟動。快照的正確頻率取決於應用程式的業務邏輯。經常拍攝快照可讓您復原較新的資料，但會增加成本並需要更多系統資源。

在 Managed Service for Apache Flink 中，您使用下列 API 動作來管理快照：
+ [https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplicationSnapshot.html](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplicationSnapshot.html)
+ [https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_DeleteApplicationSnapshot.html](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_DeleteApplicationSnapshot.html)
+ [https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_DescribeApplicationSnapshot.html](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_DescribeApplicationSnapshot.html)
+ [https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ListApplicationSnapshots.html](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ListApplicationSnapshots.html)

如需每個應用程式的快照數目限制，請參閱[Managed Service for Apache Flink 和 Studio 筆記本配額](limits.md)。如果您的應用程式達到快照數目限制，則手動建立快照會失敗，並顯示 `LimitExceededException`。

Managed Service for Apache Flink 永遠不會刪除快照。您必須使用 [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DeleteApplicationSnapshot.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DeleteApplicationSnapshot.html) 動作手動刪除快照。

若要在啟動應用程式時載入儲存的應用程式狀態快照，請使用 [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StartApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StartApplication.html) 或 [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html) 動作的 [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ApplicationRestoreConfiguration.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ApplicationRestoreConfiguration.html) 參數。

**Topics**
+ [管理自動建立快照](#how-fault-snapshot-update)
+ [從包含不相容狀態資料的快照還原](#how-fault-snapshot-restore)
+ [檢閱快照 API 範例](#how-fault-snapshot-examples)

## 管理自動建立快照
<a name="how-fault-snapshot-update"></a>

如果 `SnapshotsEnabled` 在應用程式的 [ApplicationSnapshotConfiguration](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ApplicationSnapshotConfiguration.html) 中設定為 `true`，Managed Service for Apache Flink 會在應用程式更新、擴展或停止時自動建立並使用快照，以提供恰好一次的處理語義。

**注意**  
將 `ApplicationSnapshotConfiguration::SnapshotsEnabled` 設定為 `false` 會導致應用程式更新期間資料遺失。

**注意**  
Managed Service for Apache Flink 會在快照建立期間觸發中繼儲存點。對於 Flink 版本 1.15 或更高版本，中繼儲存點不會再造成任何副作用。請參閱[觸發儲存點](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#triggering-savepoints)。

自動建立的快照具有下列特質：
+ 快照由服務管理，但您可以使用 [ListApplicationSnapshots](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ListApplicationSnapshots.html) 動作查看快照。自動建立的快照會根據您的快照限制計數。
+ 如果您的應用程式超過快照限制，手動建立的快照將會失敗，但 Managed Service for Apache Flink 服務在應用程式更新、擴展或停止時仍會成功建立快照。您必須先使用 [DeleteApplicationSnapshot](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_DeleteApplicationSnapshot.html) 動作手動刪除快照，然後才能手動建立更多快照。

## 從包含不相容狀態資料的快照還原
<a name="how-fault-snapshot-restore"></a>

由於快照包含運算子的資訊，因此從快照還原運算子的狀態資料 (自上一應用程式版本以來已變更) 可能會產生非預期的結果。如果應用程式嘗試從不對應於目前運算子的快照還原狀態資料，應用程式將會發生錯誤。錯誤的應用程式將卡在 `STOPPING` 或 `UPDATING` 狀態。

[若要允許應用程式從包含不相容狀態資料的快照還原，請使用 UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html) 動作將 [FlinkRunConfiguration](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_FlinkRunConfiguration.html) 的 `AllowNonRestoredState` 參數設定為 `true`。

從過時的快照還原應用程式時，您會看到下列行為：
+ **新增運算子：**如果新增了新的運算子，則儲存點沒有該新運算子的狀態資料。不會發生任何錯誤，並且不必設定 `AllowNonRestoredState`。
+ **刪除運算子：**如果現有的運算子被刪除，則儲存點會有該遺失運算子的狀態資料。除非將 `AllowNonRestoredState` 設定為 `true`，否則會發生錯誤。
+ **修改運算子：**如果進行了相容的變更，例如將參數類型變更為相容類型，應用程式就可以從過時的快照還原。如需從快照還原的詳細資訊，請參閱 Apache Flink 文件中的[儲存點](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/ops/state/savepoints/)。使用 Apache Flink 1.8 版或更新版本的應用程式可能可以從具有不同結構描述的快照還原。使用 Apache Flink 1.6 版的應用程式無法還原。對於two-phase-commit接收器，我們建議使用系統快照 (SwS)，而不是使用者建立的快照 (CreateApplicationSnapshot)。

  對於 Flink，Managed Service for Apache Flink 會在快照建立期間觸發中繼儲存點。對於 Flink 版本 1.15 以上版本，中繼儲存點不會再造成任何副作用。請參閱[觸發儲存點](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#triggering-savepoints)。

如果您需要恢復與現有儲存點資料不相容的應用程式，建議將 [StartApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StartApplication.html) 動作的 `ApplicationRestoreType` 參數設定為 `SKIP_RESTORE_FROM_SNAPSHOT`，以略過從快照還原。

如需 Apache Flink 如何處理不相容狀態資料的詳細資訊，請參閱《Apache Flink 文件》**中的[狀態結構描述演進](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/fault-tolerance/serialization/schema_evolution/)。

## 檢閱快照 API 範例
<a name="how-fault-snapshot-examples"></a>

本節包括將快照與應用程式搭配使用的 API 動作的範例請求。如需如何使用 JSON 檔案作為 API 動作輸入的相關資訊，請參閱 [Managed Service for Apache Flink API 範例程式碼](api-examples.md)。

### 啟用應用程式的快照
<a name="how-fault-savepoint-examples-enable"></a>

[https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html) 動作的下列請求範例可為應用程式啟用快照：

```
{
   "ApplicationName": "MyApplication",
   "CurrentApplicationVersionId": 1,
   "ApplicationConfigurationUpdate": { 
      "ApplicationSnapshotConfigurationUpdate": { 
         "SnapshotsEnabledUpdate": "true"
       }
    }
}
```

### 建立快照
<a name="how-fault-savepoint-examples-create"></a>

[https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplicationSnapshot.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplicationSnapshot.html) 動作的下列範例請求可建立目前應用程式狀態的快照：

```
{
   "ApplicationName": "MyApplication",
   "SnapshotName": "MyCustomSnapshot"
}
```

### 列出應用程式的快照
<a name="how-fault-snapshot-examples-list"></a>

[https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ListApplicationSnapshots.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ListApplicationSnapshots.html) 動作的下列範例請求會列出目前應用程式狀態的前 50 個快照：

```
{
   "ApplicationName": "MyApplication",
   "Limit": 50
}
```

### 列出應用程式快照的詳細資訊
<a name="how-fault-snapshot-examples-describe"></a>

[https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DescribeApplicationSnapshot.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DescribeApplicationSnapshot.html) 動作的下列請求範例會列出特定應用程式快照的詳細資訊：

```
{
   "ApplicationName": "MyApplication",
   "SnapshotName": "MyCustomSnapshot"
}
```

### 刪除快照
<a name="how-fault-snapshot-examples-delete"></a>

[https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DeleteApplicationSnapshot.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DeleteApplicationSnapshot.html) 動作的下列請求範例會刪除先前儲存的快照。您可以使用 [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ListApplicationSnapshots.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ListApplicationSnapshots.html)或 [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DeleteApplicationSnapshot.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DeleteApplicationSnapshot.html) 取得 `SnapshotCreationTimestamp` 值：

```
{
   "ApplicationName": "MyApplication",
   "SnapshotName": "MyCustomSnapshot",
   "SnapshotCreationTimestamp": 12345678901.0,
}
```

### 使用具名快照重新啟動應用程式
<a name="how-fault-snapshot-examples-load-custom"></a>

[https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StartApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StartApplication.html) 動作的下列請求範例會使用特定快照中的已儲存狀態來啟動應用程式：

```
{
   "ApplicationName": "MyApplication",
   "RunConfiguration": { 
      "ApplicationRestoreConfiguration": { 
         "ApplicationRestoreType": "RESTORE_FROM_CUSTOM_SNAPSHOT",
         "SnapshotName": "MyCustomSnapshot"
      }
   }
}
```

### 使用最新的快照重新啟動應用程式
<a name="how-fault-snapshot-examples-load-recent"></a>

[https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StartApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StartApplication.html) 動作的下列請求範例會使用最新快照來啟動應用程式：

```
{
   "ApplicationName": "MyApplication",
   "RunConfiguration": { 
      "ApplicationRestoreConfiguration": { 
         "ApplicationRestoreType": "RESTORE_FROM_LATEST_SNAPSHOT"
      }
   }
}
```

### 不使用快照重新啟動應用程式
<a name="how-fault-snapshot-examples-load-none"></a>

[https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StartApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StartApplication.html) 動作的下列請求範例會在不載入應用程式狀態的情況下啟動應用程式，即使有快照也是如此：

```
{
   "ApplicationName": "MyApplication",
   "RunConfiguration": { 
      "ApplicationRestoreConfiguration": { 
         "ApplicationRestoreType": "SKIP_RESTORE_FROM_SNAPSHOT"
      }
   }
}
```

# 針對 Apache Flink 使用就地版本升級
<a name="how-in-place-version-upgrades"></a>

透過 Apache Flink 的就地版本升級，您可以保留跨 Apache Flink 版本針對單一 ARN 的應用程式可追蹤性。這包括快照、日誌、指標、標籤、Flink 組態、資源限制增加、VPCs等。

您可以為 Apache Flink 執行就地版本升級，將現有應用程式升級至 Amazon Managed Service for Apache Flink 中的新 Flink 版本。若要執行此任務，您可以使用 AWS CLI、、 AWS CloudFormation AWS SDK 或 AWS 管理主控台。

**注意**  
您無法將 Apache Flink 的就地版本升級與 Amazon Managed Service for Apache Flink Studio 搭配使用。

**Topics**
+ [使用 Apache Flink 的就地版本升級來升級應用程式](upgrading-applications.md)
+ [將您的應用程式升級至新的 Apache Flink 版本](upgrading-application-new-version.md)
+ [復原應用程式升級](rollback.md)
+ [應用程式升級的一般最佳實務和建議](best-practices-recommendations.md)
+ [應用程式升級的注意事項和已知問題](precautions.md)
+ [升級到 Flink 2.2：完成指南](flink-2-2-upgrade-guide.md)
+ [Flink 2.2 升級的狀態相容性指南](state-compatibility.md)

# 使用 Apache Flink 的就地版本升級來升級應用程式
<a name="upgrading-applications"></a>

開始之前，建議您觀看此影片：[就地版本升級](https://www.youtube.com/watch?v=f1qGGdaP2XI)。

若要執行 Apache Flink 的就地版本升級，您可以使用 AWS CLI、、 AWS CloudFormation AWS SDK 或 AWS 管理主控台。您可以將此功能與 `READY`或 `RUNNING` 狀態的 Managed Service for Apache Flink 搭配使用的任何現有應用程式搭配使用。它使用 UpdateApplication API 來新增變更 Flink 執行時間的功能。

## 升級之前：更新您的 Apache Flink 應用程式
<a name="before-upgrading"></a>

當您寫入 Flink 應用程式時，您可以將它們與其相依性綁定到應用程式 JAR，並將 JAR 上傳到您的 Amazon S3 儲存貯體。從那裡，Amazon Managed Service for Apache Flink 會在您選取的新 Flink 執行時間中執行任務。您可能需要更新應用程式，才能與您要升級的 Flink 執行時間相容。Flink 版本之間可能存在導致版本升級失敗的不一致。最常見的是使用來源 （輸入） 或目的地 （接收器、輸出） 和 Scala 相依性的連接器。Managed Service for Apache Flink 中的 Flink 1.15 和更新版本與 Scala 無關，您的 JAR 必須包含您計劃使用的 Scala 版本。

**更新您的應用程式**

1. 閱讀 Flink 社群有關升級狀態應用程式的建議。請參閱[升級應用程式和 Flink 版本](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/upgrading/)。

1. 閱讀了解問題和限制的清單。請參閱 [應用程式升級的注意事項和已知問題](precautions.md)。

1. 更新您的相依性，並在本機測試您的應用程式。這些相依性通常為：

   1. Flink 執行期和 API。

   1. 連接器建議用於新的 Flink 執行時間。您可以在要更新的特定執行時間的[發行版本](https://docs.aws.amazon.com/managed-flink/latest/java/release-version-list.html)中找到這些項目。

   1. Scala – Apache Flink 從 Flink 1.15 開始和包含 Flink，與 Scala 無關。您必須包含要在應用程式 JAR 中使用的 Scala 相依性。

1. 在 zipfile 上建立新的應用程式 JAR，並將其上傳至 Amazon S3。我們建議您使用與先前 JAR/zipfile 不同的名稱。如果您需要轉返，您將使用此資訊。

1. 如果您執行具狀態應用程式，強烈建議您擷取目前應用程式的快照。如果您在升級期間或之後遇到問題，這可讓您以狀態復原。

# 將您的應用程式升級至新的 Apache Flink 版本
<a name="upgrading-application-new-version"></a>

您可以使用 [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html) 動作來升級 Flink 應用程式。

您可以透過多種方式呼叫 `UpdateApplication` API：
+ 在 上使用現有的**組態**工作流程 AWS 管理主控台。
  + 前往 上的應用程式頁面 AWS 管理主控台。
  + 選擇**設定**。
  + 選取您要從中開始的新執行期和快照，也稱為還原組態。使用最新的設定作為還原組態，從最新的快照啟動應用程式。指向 Amazon S3 上新升級的應用程式 JAR/zip。
+ 使用 AWS CLI [update-application](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/kinesisanalyticsv2/update-application.html) 動作。
+ Use CloudFormation (CFN)。
  + 更新 [RuntimeEnvironment](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-kinesisanalyticsv2-application.html#cfn-kinesisanalyticsv2-application-runtimeenvironment) 欄位。先前， CloudFormation 刪除應用程式並建立新的應用程式，導致您的快照和其他應用程式歷史記錄遺失。現在會 CloudFormation 更新您的 RuntimeEnvironment，而不會刪除您的應用程式。
+ 使用 AWS SDK。
  + 如需您選擇的程式設計語言，請參閱 SDK 文件。請參閱 [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html)。

您可以在應用程式處於 `RUNNING` 狀態或應用程式處於停止`READY`狀態時執行升級。Amazon Managed Service for Apache Flink 會驗證原始執行時間版本與目標執行時間版本之間的相容性。當您在 `RUNNING` 狀態執行 [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html) 時，或在`READY`處於 狀態時升級時，會在下一個 [StartApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StartApplication.html) 執行此相容性檢查。

## 升級處於 `RUNNING` 狀態的應用程式
<a name="upgrading-running"></a>

下列範例顯示使用 將`RUNNING`狀態為 的應用程式升級至美國東部 （維吉尼亞北部） 的 `UpgradeTest` Flink 1.18， AWS CLI 並從最新的快照啟動升級的應用程式。

```
            
aws --region us-east-1 kinesisanalyticsv2 update-application \
--application-name UpgradeTest --runtime-environment-update "FLINK-1_18" \
--application-configuration-update '{"ApplicationCodeConfigurationUpdate": '\
'{"CodeContentUpdate": {"S3ContentLocationUpdate": '\
'{"FileKeyUpdate": "flink_1_18_app.jar"}}}}' \
 --run-configuration-update '{"ApplicationRestoreConfiguration": '\
 '{"ApplicationRestoreType": "RESTORE_FROM_LATEST_SNAPSHOT"}}' \
 --current-application-version-id ${current_application_version}
```
+ 如果您啟用服務快照，並想要從最新的快照繼續應用程式，Amazon Managed Service for Apache Flink 會驗證目前`RUNNING`應用程式的執行時間是否與所選目標執行時間相容。
+ 如果您已指定要繼續目標執行時間的快照，Amazon Managed Service for Apache Flink 會驗證目標執行時間是否與指定的快照相容。如果相容性檢查失敗，您的更新請求會遭到拒絕，而且您的應用程式在 `RUNNING` 狀態中保持不變。
+ 如果您選擇在沒有快照的情況下啟動應用程式，Amazon Managed Service for Apache Flink 不會執行任何相容性檢查。
+ 如果您升級的應用程式失敗或卡在傳輸`UPDATING`狀態，請遵循 [復原應用程式升級](rollback.md)區段中的指示以返回運作狀態。

**執行狀態應用程式的處理流程**

![\[下圖代表在執行時升級應用程式的建議工作流程。我們假設應用程式具有狀態，且您已啟用快照。對於此工作流程，更新時，您可以從 Amazon Managed Service for Apache Flink 在更新之前自動拍攝的最新快照還原應用程式。\]](http://docs.aws.amazon.com/zh_tw/managed-flink/latest/java/images/in-place-update-while-running.png)


## 升級處於 **READY** 狀態的應用程式
<a name="upgrading-ready"></a>

下列範例顯示使用 將`READY`狀態為 的應用程式升級至美國東部 （維吉尼亞北部） 的 `UpgradeTest` Flink 1 AWS CLI.18。因為應用程式未執行，所以沒有指定的快照可啟動應用程式。您可以在發出啟動應用程式請求時指定快照。

```
            
aws --region us-east-1 kinesisanalyticsv2 update-application \
--application-name UpgradeTest --runtime-environment-update "FLINK-1_18" \
--application-configuration-update '{"ApplicationCodeConfigurationUpdate": '\
'{"CodeContentUpdate": {"S3ContentLocationUpdate": '\
'{"FileKeyUpdate": "flink_1_18_app.jar"}}}}' \
 --current-application-version-id ${current_application_version}
```
+ 您可以將處於 `READY` 狀態的應用程式執行時間更新為任何 Flink 版本。在您啟動應用程式之前，Amazon Managed Service for Apache Flink 不會執行任何檢查。
+  Amazon Managed Service for Apache Flink 只會針對您選擇啟動應用程式的快照執行相容性檢查。這些是遵循 [Flink 相容性資料表的基本相容性](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/upgrading/#compatibility-table)檢查。他們只會檢查拍攝快照的 Flink 版本，以及您要鎖定的 Flink 版本。如果所選快照的 Flink 執行時間與應用程式的新執行時間不相容，則啟動請求可能會被拒絕。

**就緒狀態應用程式的處理流程**

![\[下圖代表在就緒狀態時升級應用程式的建議工作流程。我們假設應用程式具有狀態，且您已啟用快照。對於此工作流程，更新時，您可以從應用程式停止時 Amazon Managed Service for Apache Flink 自動拍攝的最新快照還原應用程式。\]](http://docs.aws.amazon.com/zh_tw/managed-flink/latest/java/images/in-place-update-while-ready.png)


# 復原應用程式升級
<a name="rollback"></a>

如果您的應用程式發生問題，或在 Flink 版本之間發現應用程式程式碼不一致，您可以使用 AWS CLI、、 AWS CloudFormation AWS SDK 或 復原 AWS 管理主控台。下列範例顯示在不同失敗案例中復原的情況。

## 執行時間升級成功，應用程式處於 `RUNNING` 狀態，但任務失敗並持續重新啟動
<a name="succeeded-restarting"></a>

假設您嘗試將名為 的狀態應用程式`TestApplication`從 Flink 1.15 升級至美國東部 （維吉尼亞北部） 的 Flink 1.18。不過，即使應用程式處於 `RUNNING` 狀態，升級的 Flink 1.18 應用程式仍無法啟動或持續重新啟動。這是常見的失敗案例。為了避免進一步停機，建議您立即將應用程式復原至先前的執行版本 (Flink 1.15)，並在稍後診斷問題。

若要將應用程式復原至先前的執行版本，請使用 [Rollback-application](https://docs.aws.amazon.com/cli/latest/reference/kinesisanalyticsv2/rollback-application.html) AWS CLI 命令或 [RollbackApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_RollbackApplication.html) API 動作。此 API 動作會復原您所做的變更，進而產生最新版本。然後，它會使用最新的成功快照重新啟動您的應用程式。

強烈建議您先使用現有應用程式拍攝快照，再嘗試升級。這將有助於避免資料遺失或必須重新處理資料。

在此失敗案例中， CloudFormation 不會為您復原應用程式。您必須更新 CloudFormation 範本以指向先前的執行時間和先前的程式碼，以強制 CloudFormation 更新應用程式。否則，CloudFormation 會假設您的應用程式在轉換為 `RUNNING` 狀態時已更新。

## 復原卡在 中的應用程式 `UPDATING`
<a name="stuck-updating"></a>

如果您的應用程式在升級嘗試後卡在 `UPDATING`或 `AUTOSCALING` 狀態，Amazon Managed Service for Apache Flink 會提供[轉返應用程式](https://docs.aws.amazon.com/cli/latest/reference/kinesisanalyticsv2/rollback-application.html) AWS CLI 命令，或 [RollbackApplications](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_RollbackApplication.html) API 動作，可在卡住`UPDATING`或`AUTOSCALING`狀態之前將應用程式轉返至版本。此 API 會復原您所做的變更，導致應用程式卡在 `UPDATING`或 `AUTOSCALING` 傳輸狀態。

# 應用程式升級的一般最佳實務和建議
<a name="best-practices-recommendations"></a>
+ 在嘗試生產升級之前，在非生產環境中測試沒有狀態的新任務/執行時間。
+ 考慮先使用非生產應用程式測試有狀態升級。
+ 請確定您的新任務圖表具有與您用來啟動升級應用程式的快照相容的狀態。
  + 請確定存放在運算子狀態中的類型保持不變。如果類型已變更，則 Apache Flink 無法還原運算子狀態。
  + 請確定您使用 `uid`方法設定的運算IDs 保持不變。Apache Flink 強烈建議將唯一 IDs 指派給運算子。如需詳細資訊，請參閱 Apache Flink 文件中的[指派運算子 IDs](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#assigning-operator-ids)。

    如果您未將 IDs指派給運算子，Flink 會自動產生 ID。在這種情況下，它們可能取決於程式結構，如果變更，可能會導致相容性問題。Flink 使用運算子 IDs將快照中的狀態與運算子比對。變更運算子 IDs 會導致應用程式未啟動，或快照中存放的狀態遭到捨棄，而新的運算子在沒有狀態的情況下啟動。
  + 請勿變更用於存放金鑰狀態的金鑰。
  + 請勿修改狀態運算子的輸入類型，例如視窗或聯結。這會隱含地變更運算子的內部狀態類型，導致狀態不相容。

# 應用程式升級的注意事項和已知問題
<a name="precautions"></a>

## 代理程式重新啟動後，Kafka 遞交檢查點會重複失敗
<a name="apache-kafka-connector"></a>

Flink 1.15 版中的 Apache Kafka 連接器存在已知的開放原始碼 Apache Flink 問題，原因在於 Kafka 用戶端 2.8.1 中的重大開放原始碼 Kafka 用戶端錯誤。如需詳細資訊，請參閱在[代理程式重新啟動後，Kafka 遞交檢查點會重複失敗，](https://issues.apache.org/jira/browse/FLINK-28060)且在 [ commitOffsetAsync 例外狀況之後，KafkaConsumer 無法復原與群組協調器的連線](https://issues.apache.org/jira/browse/KAFKA-13840)。

為了避免此問題，建議您在 Amazon Managed Service for Apache Flink 中使用 Apache Flink 1.18 或更新版本。

## 狀態相容性的已知限制
<a name="state-precautions"></a>
+ 如果您使用的是資料表 API，Apache Flink 不保證 Flink 版本之間的狀態相容性。如需詳細資訊，請參閱 Apache Flink 文件中的[狀態升級和演變](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/overview/#stateful-upgrades-and-evolution)。
+ Flink 1.6 狀態與 Flink 1.18 不相容。如果您嘗試使用 狀態從 1.6 升級到 1.18 及更新版本，API 會拒絕您的請求。您可以升級至 1.8、1.11、1.13 和 1.15，並拍攝快照，然後升級至 1.18 和更新版本。如需詳細資訊，請參閱 Apache [Flink 文件中的升級應用程式和 Flink 版本](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/upgrading/)。

## Flink Kinesis 連接器的已知問題
<a name="kinesis-connector-precautions"></a>
+ 如果您使用的是 Flink 1.11 或更早版本，並使用`amazon-kinesis-connector-flink`連接器進行Enhanced-fan-out(EFO) 支援，則必須採取額外的步驟，將狀態升級到 Flink 1.13 或更新版本。這是因為連接器套件名稱的變更。如需詳細資訊，請參閱 [amazon-kinesis-connector-flink](https://github.com/awslabs/amazon-kinesis-connector-flink)。

  Flink 1.11 和更早版本的`amazon-kinesis-connector-flink`連接器使用封裝 `software.amazon.kinesis`，而 Flink 1.13 和更新版本的 Kinesis 連接器使用 。 `org.apache.flink.streaming.connectors.kinesis`使用此工具來支援遷移：[amazon-kinesis-connector-flink-state-migrator](https://github.com/awslabs/amazon-kinesis-connector-flink-state-migrator)。
+ 如果您使用 Flink 1.13 或更早版本搭配 `FlinkKinesisProducer` 並升級至 Flink 1.15 或更新版本，若要有狀態升級，您必須繼續`FlinkKinesisProducer`在 Flink 1.15 或更新版本中使用 ，而非較新的 。 `KinesisStreamsSink`不過，如果您的接收器上已有自訂`uid`設定，您應該可以切換到 ，`KinesisStreamsSink`因為 `FlinkKinesisProducer`不會保持狀態。Flink 會將其視為相同的運算子，因為`uid`已設定自訂。

## 以 Scala 撰寫的 Flink 應用程式
<a name="scala-precautions"></a>
+ 從 Flink 1.15 開始，Apache Flink 在執行時間中不包含 Scala。升級至 Flink 1.15 或更新版本時，您必須在程式碼 JAR/zip 中包含要使用的 Scala 版本和其他 Scala 相依性。如需詳細資訊，請參閱 [Amazon Managed Service for Apache Flink 1.15.2 版](https://docs.aws.amazon.com/managed-flink/latest/java/flink-1-15-2.html)。
+ 如果您的應用程式使用 Scala，而且您要從 Flink 1.11 或更早版本 (Scala 2.11) 升級至 Flink 1.13 (Scala 2.12)，請確定您的程式碼使用 Scala 2.12。否則，您的 Flink 1.13 應用程式可能無法在 Flink 1.13 執行時間中找到 Scala 2.11 類別。

## 降級 Flink 應用程式時應考量的事項
<a name="downgrading-precautions"></a>
+ 可以降級 Flink 應用程式，但僅限於應用程式之前使用舊版 Flink 執行的情況。對於狀態升級，Managed Service for Apache Flink 需要使用使用相符或更舊版本的快照進行降級
+ 如果您要將執行時間從 Flink 1.13 或更新版本更新為 Flink 1.11 或更新版本，而且如果您的應用程式使用 HashMap 狀態後端，您的應用程式將會持續失敗。

# 升級到 Flink 2.2：完成指南
<a name="flink-2-2-upgrade-guide"></a>

本指南提供step-by-step說明。這是主要版本升級，其中包含需要仔細規劃和測試的重大變更。

**主要版本升級是單向**  
升級操作可以使用狀態保留將您的應用程式從 Flink 1.x 移至 2.2，但您無法從 2.2 移至具有 2.2 狀態的 1.x。如果您的應用程式在升級後變得運作狀態不佳，請使用轉返 API，從最新的快照返回具有原始 1.x 狀態的 1.x 版本。

## 先決條件
<a name="upgrade-guide-prerequisites"></a>

開始升級之前：
+ 檢閱 [中斷變更和棄用](flink-2-2.md#flink-2-2-breaking-changes)
+ 檢閱 [Flink 2.2 升級的狀態相容性指南](state-compatibility.md)
+ 確保您擁有用於測試的非生產環境
+ 記錄您目前的應用程式組態和相依性

## 了解遷移路徑
<a name="upgrade-guide-migration-paths"></a>

您的升級體驗取決於應用程式與 Flink 2.2 的相容性。了解這些路徑可協助您適當地準備並設定逼真的期望。

**路徑 1：相容的二進位和應用程式狀態**

**預期事項：**
+ 叫用升級操作
+ 使用應用程式狀態轉換完成遷移至 2.2：`RUNNING`→ `UPDATING` → `RUNNING`
+ 保留所有應用程式狀態，而不會遺失資料或進行重新處理
+ 與次要版本遷移相同的體驗

最適合：無狀態應用程式或使用相容序列化的應用程式 (Avro、相容的 Protobuf 結構描述、不含集合POJOs)

**路徑 2：二進位不相容**

**預期事項：**
+ 叫用升級操作
+ 操作失敗，並透過操作 API 和日誌顯示二進位不相容
+ 啟用自動轉返：應用程式會在幾分鐘內自動轉返，無需您的介入
+ 停用自動轉返：應用程式在未處理資料的情況下仍處於執行中狀態；您可以手動轉返至較舊的版本
+ 修正二進位檔案後，請使用 [UpdateApplication API](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html) 來獲得類似路徑 1 的體驗

最適合：使用 Flink 任務啟動期間偵測到之已移除 APIs 的應用程式

**路徑 3：不相容的應用程式狀態**

**預期事項：**
+ 叫用升級操作
+ 遷移一開始似乎成功
+ 當狀態還原失敗時，應用程式會在幾秒鐘內進入重新啟動迴圈
+ 透過顯示持續重新啟動的 CloudWatch 指標偵測失敗
+ 手動叫用轉返操作
+ 啟動復原後幾分鐘內返回生產環境
+ [狀態遷移](state-compatibility.md#state-compat-migration) 檢閱您的應用程式

最適合：狀態序列化不相容的應用程式 （具有集合、特定 Kryo 序列化狀態的 POJOs)

**注意**  
強烈建議您建立生產應用程式的複本，並在複本上測試下列每個升級階段，然後再遵循生產應用程式的相同步驟。

## 階段 1：準備
<a name="upgrade-guide-phase-1"></a>

**更新應用程式程式碼**

更新您的應用程式程式碼，使其與 Flink 2.2 相容：
+ **在 或 中將 Flink 相依性**更新至 2.2.0 版 `pom.xml` `build.gradle`
+ 將**連接器相依性更新**至 Flink 2.2 相容版本 （請參閱 [連接器可用性](flink-2-2.md#flink-2-2-connectors))
+ **移除已棄用的 API 用量**：
  + 以 DataStream API 或資料表 API/SQL 取代 DataSet API DataStream 
  + 使用 FLIP-27 來源和 FLIP-143 接收器 APIs 取代舊版 `SourceFunction`/`SinkFunction` 
  + 以 Java API 取代 Scala API 用量
+ **更新至 Java 17**

**上傳更新的應用程式程式碼**
+ 使用 Flink 2.2 相依性建置您的應用程式 JAR
+ 使用與目前 JAR **不同的檔案名稱**上傳到 Amazon S3 （例如 `my-app-flink-2.2.jar`)
+ 請注意要在升級步驟中使用的 S3 儲存貯體和金鑰

## 階段 2：啟用自動轉返
<a name="upgrade-guide-phase-2"></a>

自動轉返可讓 Amazon Managed Service for Apache Flink 在升級失敗時自動還原至先前的版本。

**檢查自動轉返狀態**

*AWS 管理主控台:*

1. 導覽至您的應用程式

1. 選擇 **Configuration** (組態)。

1. 在**應用程式設定**下，確認**系統復原**已啟用

*AWS CLI:*

```
aws kinesisanalyticsv2 describe-application \
    --application-name MyApplication \
    --query 'ApplicationDetail.ApplicationConfigurationDescription.ApplicationSystemRollbackConfigurationDescription.RollbackEnabled'
```

**啟用自動轉返 （如果未啟用）**

```
aws kinesisanalyticsv2 update-application \
    --application-name MyApplication \
    --current-application-version-id <version-id> \
    --application-configuration-update '{
        "ApplicationSystemRollbackConfigurationUpdate": {
            "RollbackEnabledUpdate": true
        }
    }'
```

## 階段 3：拍攝快照 （選用）
<a name="upgrade-guide-phase-3"></a>

如果您的應用程式已啟用自動快照，您可以略過此步驟，否則請在升級之前擷取應用程式的快照以儲存應用程式的狀態。

**從執行中的應用程式擷取快照**

*AWS 管理主控台:*

1. 導覽至您的應用程式

1. 選擇**快照**

1. 選擇**建立快照**

1. 輸入快照名稱 （例如 `pre-flink-2.2-upgrade`)

1. 選擇 **Create** (建立)

*AWS CLI:*

```
aws kinesisanalyticsv2 create-application-snapshot \
    --application-name MyApplication \
    --snapshot-name pre-flink-2.2-upgrade
```

**驗證快照建立**

```
aws kinesisanalyticsv2 describe-application-snapshot \
    --application-name MyApplication \
    --snapshot-name pre-flink-2.2-upgrade
```

等到 `SnapshotStatus` 變成 `READY`再繼續。

## 階段 4：升級應用程式
<a name="upgrade-guide-phase-4"></a>

您可以使用 [https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html)動作升級 Flink 應用程式。

您可以透過多種方式呼叫 `UpdateApplication` API：
+ **使用 AWS 管理主控台。**
  + 前往 上的應用程式頁面 AWS 管理主控台。
  + 選擇**設定**。
  + 選取您要從中開始的新執行期和快照，也稱為還原組態。使用最新的設定作為還原組態，從最新的快照啟動應用程式。指向 Amazon S3 上新升級的應用程式 JAR/zip。
+ **使用 AWS CLI**[https://awscli.amazonaws.com/v2/documentation/api/latest/reference/kinesisanalyticsv2/update-application.html](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/kinesisanalyticsv2/update-application.html)動作。
+ **使用 CloudFormation。**
  + 更新 `RuntimeEnvironment` 欄位。先前， CloudFormation 已刪除應用程式並建立新的應用程式，導致您的快照和其他應用程式歷史記錄遺失。現在會 CloudFormation 更新您的`RuntimeEnvironment`就地，不會刪除您的應用程式。
+ **使用 AWS SDK。**
  + 如需您選擇的程式設計語言，請參閱 SDK 文件。請參閱 [https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html)。

您可以在應用程式處於 `RUNNING` 狀態或應用程式處於停止`READY`狀態時執行升級。Amazon Managed Service for Apache Flink 會驗證原始執行時間版本與目標執行時間版本之間的相容性。此相容性檢查會在處於 `UpdateApplication` `RUNNING` 狀態時執行，或在`READY`處於 狀態時升級`StartApplication`時於下一個執行。

**從 RUNNING 狀態升級**

```
aws kinesisanalyticsv2 update-application \
    --application-name MyApplication \
    --current-application-version-id <version-id> \
    --runtime-environment-update FLINK-2_2 \
    --application-configuration-update '{
        "ApplicationCodeConfigurationUpdate": {
            "CodeContentUpdate": {
                "S3ContentLocationUpdate": {
                    "FileKeyUpdate": "my-app-flink-2.2.jar"
                }
            }
        }
    }'
```

**從 READY 狀態升級**

```
aws kinesisanalyticsv2 update-application \
    --application-name MyApplication \
    --current-application-version-id <version-id> \
    --runtime-environment-update FLINK-2_2 \
    --application-configuration-update '{
        "ApplicationCodeConfigurationUpdate": {
            "CodeContentUpdate": {
                "S3ContentLocationUpdate": {
                    "FileKeyUpdate": "my-app-flink-2.2.jar"
                }
            }
        }
    }'
```

## 階段 5：監控升級
<a name="upgrade-guide-phase-5"></a>

**相容性檢查**
+ 使用 操作 API 檢查升級的狀態。如果任務啟動有二進位不相容或問題，升級操作將使用 日誌失敗。
+ 如果升級操作成功，但應用程式卡在重新啟動迴圈中，這表示狀態與新的 Flink 版本不相容，或更新的程式碼發生問題。[Flink 2.2 升級的狀態相容性指南](state-compatibility.md) 檢閱如何識別狀態不相容問題。

**監控應用程式運作狀態**

*應用程式狀態：*
+ 應用程式狀態應轉換：`RUNNING`→ `UPDATING` → `RUNNING`
+ 檢查應用程式的執行時間。如果是 2.2，表示升級操作成功。
+ 如果您的應用程式在 中，`RUNNING`但仍處於較舊的執行時間，則自動轉返會開始。操作 API 會將操作顯示為 `FAILED`。檢查日誌以尋找失敗的例外狀況。

此外，請在 CloudWatch 中監控這些指標：

*重新啟動指標：*
+ `numRestarts`：監控意外重新啟動 — 如果 `numRestarts`為零且 `uptime`或 `runningTime` 正在增加，則升級成功。

*檢查點指標：*
+ `lastCheckpointDuration`：應該類似於預先升級的值
+ `numberOfFailedCheckpoints`：應保持在 0

## 階段 6：驗證應用程式行為
<a name="upgrade-guide-phase-6"></a>

在 Flink 2.2 上執行應用程式之後：

**功能驗證**
+ 確認正在從來源讀取資料
+ 確認資料正在寫入目的地
+ 驗證業務邏輯產生預期結果
+ 比較輸出與升級前基準

**效能驗證**
+ 監控延遲指標 end-to-end處理時間）
+ 監控輸送量指標 （每秒記錄數）
+ 監控檢查點持續時間和大小
+ 監控記憶體和 CPU 使用率

**執行 24 小時以上**

允許應用程式在生產環境中執行至少 24 小時，以確保：
+ 沒有記憶體洩漏
+ 穩定檢查點行為
+ 沒有意外重新啟動
+ 一致的輸送量

## 階段 7：轉返程序
<a name="upgrade-guide-phase-7"></a>

如果升級失敗或應用程式正在執行但運作狀態不佳，請復原至先前的版本。

**自動轉返**

如果啟用自動轉返，且在啟動期間升級失敗，Amazon Managed Service for Apache Flink 會自動還原至先前的版本。

**手動轉返**

如果應用程式正在執行但運作狀態不佳，請使用 `RollbackApplication` API：

*AWS 管理主控台:*

1. 導覽至您的應用程式

1. 選擇**動作** → **復原**

1. 確認轉返

*AWS CLI:*

```
aws kinesisanalyticsv2 rollback-application \
    --application-name MyApplication \
    --current-application-version-id <version-id>
```

**復原期間會發生的情況：**
+ 應用程式停止
+ 執行時間會還原至先前的 Flink 版本
+ 應用程式程式碼會還原至先前的 JAR
+ 應用程式會從升級**前**取得的最後一個成功快照重新啟動

**重要**  
您無法還原 Flink 1.x 上的 Flink 2.2 快照
轉返使用升級之前拍攝的快照
升級之前一律拍攝快照 （階段 3)

## 後續步驟
<a name="upgrade-guide-next-steps"></a>

有關升級期間的問題，請參閱 [Managed Service for Apache Flink 故障診斷](troubleshooting.md)或聯絡 AWS Support。

# Flink 2.2 升級的狀態相容性指南
<a name="state-compatibility"></a>

從 Flink 1.x 升級到 Flink 2.2 時，狀態相容性問題可能會使您的應用程式無法從快照還原。本指南可協助您識別潛在的相容性問題，並提供遷移策略。

## 了解狀態相容性變更
<a name="state-compat-understanding"></a>

Amazon Managed Service for Apache Flink 2.2 引進數個會影響狀態相容性的序列化變更。以下是主要項目：
+ **Kryo 版本升級**：Apache Flink 2.2 會將綁定的 Kryo 序列化程式從第 2 版升級到第 5 版。由於 Kryo v5 使用與 Kryo v2 不同的二進位編碼格式，因此無法在 Flink 2.2 中還原透過 Kryo 在 Flink 1.x 儲存點中序列化的任何運算子狀態。
+ **Java 集合序列化**：在 Flink 1.x 中，POJOs 內的 Java 集合 （例如 `ArrayList`、 `HashMap`和 `HashSet`) 使用 Kryo 序列化。Flink 2.2 推出與來自 1.x 的 Kryo 序列化狀態不相容的集合特定最佳化序列化程式。在 1.x 中使用 Java 集合搭配 POJO 或 Kryo 序列化程式的應用程式無法在 Flink 2.2 中還原此狀態。如需資料類型和序列化的詳細資訊，請參閱 Flink [文件](https://nightlies.apache.org/flink/flink-docs-release-2.2/docs/dev/datastream/fault-tolerance/serialization/types_serialization/)。
+ **Kinesis 連接器相容性**：低於 5.0 的 Kinesis Data Streams (KDS) 連接器版本會維持與 Flink 2.2 Kinesis 連接器 6.0 版不相容的狀態。您必須先遷移至連接器 5.0 版或更新版本，才能升級。

## 序列化相容性參考
<a name="state-compat-reference"></a>

檢閱應用程式中的所有狀態宣告，並將序列化類型與下表比對。如果有任何狀態類型不相容，請先參閱[狀態遷移](#state-compat-migration)一節，再繼續升級。


**序列化相容性參考**  

| 序列化類型 | 相容？ | 詳細資訊 | 
| --- | --- | --- | 
| Avro (SpecificRecord、GenericRecord) | 是 | 使用自己的二進位格式，獨立於 Kryo。確保您使用的是 Flink 的原生 Avro 類型資訊，而不是註冊為 Kryo 序列化程式的 Avro。 | 
| Protobuf | 是 | 使用自己的二進位編碼，獨立於 Kryo。驗證結構描述變更遵循回溯相容的演變規則。 | 
| 沒有集合POJOs  | 是 | 由 Flink 的 POJO 序列化程式處理 — 但僅限於類別符合所有 POJO 條件時：公有類別、公有無參數建構函數、所有公開或可透過 getter/setters 存取的欄位，以及所有欄位類型本身可透過 Flink 序列化。違反上述任何一項的 POJO 無提示地落回 Kryo，並變得不相容。 | 
| 自訂 TypeSerializers | 是 | 只有在您的序列化程式未在內部委派給 Kryo 時才相容。 | 
| SQL 和資料表 API 狀態 | 是 （具有警告） | 使用 Flink 的內部序列化程式。不過，Apache Flink 不保證資料表 API 應用程式的主要版本之間的狀態相容性。首先在非生產環境中進行測試。 | 
| 具有 Java 集合的 POJOs (HashMap、ArrayList、HashSet) | 否 | 在 Flink 1.x 中，POJOs 內的集合是透過 Kryo v2 序列化。Flink 2.2 推出專用集合序列化程式，其二進位格式與 Kryo v2 格式不相容。 | 
| Scala 案例類別 | 否 | 在 Flink 1.x 中透過 Kryo 序列化。Kryo v2 至 v5 升級會變更二進位格式。 | 
| Java 記錄 | 否 | 通常回到 Flink 1.x 中的 Kryo 序列化。使用 進行測試來驗證 disableGenericTypes()。 | 
| 第三方程式庫類型 | 否 | 沒有已註冊自訂序列化程式的類型會回到 Kryo。Kryo v2 到 v5 二進位格式變更會中斷相容性。 | 
| 使用 Kryo 備用的任何類型 | 否 | 如果 Flink 無法處理具有內建或已註冊序列化程式的類型，則會回到 Kryo。來自 1.x 的所有 Kryo 序列化狀態與 2.2 不相容。 | 

## 診斷方法
<a name="state-compat-diagnostics"></a>

您可以在 [UpdateApplication API](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html) 操作之後查看應用程式日誌或檢查日誌，以主動識別狀態相容性問題。

**識別應用程式中的 Kryo 備用**

您可以在日誌中使用下列 regex 模式來識別應用程式中的 Kryo 備用：

```
Class class (?<className>[^\s]+) cannot be used as a POJO type
```

範例日誌：

```
Class class org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber
cannot be used as a POJO type because not all fields are valid POJO fields,
and must be processed as GenericType. Please read the Flink documentation on
"Data Types & Serialization" for details of the effect on performance and
schema evolution.
```

如果使用 UpdateApplication API 升級失敗，下列例外狀況可能表示您遇到序列化程式型狀態不相容：

**IndexOutOfBoundsException**

```
Caused by: java.lang.IndexOutOfBoundsException: Index 116 out of bounds for length 1
    at java.base/jdk.internal.util.Preconditions.outOfBounds(Unknown Source)
    at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Unknown Source)
    at java.base/jdk.internal.util.Preconditions.checkIndex(Unknown Source)
    at java.base/java.util.Objects.checkIndex(Unknown Source)
    at java.base/java.util.ArrayList.get(Unknown Source)
    at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:77)
    at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:923)
    ... 23 more
```

**StateMigrationException (POJOSerializer)**

```
Caused by: org.apache.flink.util.StateMigrationException: The new state serializer
(org.apache.flink.api.java.typeutils.runtime.PojoSerializer@8bf85b5d) must not be
incompatible with the old state serializer
(org.apache.flink.api.java.typeutils.runtime.PojoSerializer@3282ee3).
```

## 升級前檢查清單
<a name="state-compat-checklist"></a>
+ 檢閱應用程式中的所有狀態宣告
+ 使用集合 (`HashMap`、`ArrayList`、`HashSet`) 檢查 POJOs 
+ 驗證每種狀態類型的序列化方法
+ 在此複本上使用 UpdateApplication API 建立生產複本應用程式並測試狀態相容性
+ 如果狀態不相容，請從 選取策略 [狀態遷移](#state-compat-migration)
+ 在生產 Flink 應用程式組態中啟用自動轉返

## 狀態遷移
<a name="state-compat-migration"></a>

**重建完成狀態**

最適合可以從來源資料重建狀態的應用程式。

如果您的應用程式可以從來源資料重建狀態：

1. 停止 Flink 1.x 應用程式

1. 使用更新的程式碼升級至 Flink 2.x

1. 從 開始 `SKIP_RESTORE_FROM_SNAPSHOT`

1. 允許應用程式重建狀態

```
aws kinesisanalyticsv2 start-application \
    --application-name MyApplication \
    --run-configuration '{
        "ApplicationRestoreConfiguration": {
            "ApplicationRestoreType": "SKIP_RESTORE_FROM_SNAPSHOT"
        }
    }'
```

## 最佳實務
<a name="state-compat-best-practices"></a>

1. **一律將 Avro 或 Protobuf 用於複雜狀態** — 這些可提供結構描述演變，且與 Kryo 無關

1. **避免 POJOs 中的集合** — 請`MapState`改用 Flink 的原生 `ListState`和 。

1. 在**本機測試狀態還原** - 在生產升級之前，使用實際快照進行測試

1. **經常拍攝快照** - 特別是在主要版本升級之前

1. **啟用自動轉返** — 將您的 MSF 應用程式設定為在失敗時自動轉返

1. **記錄您的狀態類型** — 維護所有狀態類型及其序列化方法的文件

1. **監控檢查點大小** — 不斷增長的檢查點大小可能表示序列化問題

## 後續步驟
<a name="state-compat-next-steps"></a>

**規劃升級**：請參閱[升級到 Flink 2.2：完成指南](flink-2-2-upgrade-guide.md)。

如需遷移期間的問題，請參閱 [Managed Service for Apache Flink 故障診斷](troubleshooting.md)或聯絡 AWS Support。

# 在 Managed Service for Apache Flink 中實作應用程式擴展
<a name="how-scaling"></a>

您可以為 Amazon Managed Service for Apache Flink 設定任務的平行執行和資源配置，以實作擴展。如需有關 Apache Flink 如何排程任務平行執行個體的資訊，請參閱 Apache Flink 文件中的[平行執行](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/execution/parallel/)。

**Topics**
+ [設定應用程式平行處理和ParallelismPerKPU](#how-parallelism)
+ [配置 Kinesis 處理單元](#how-scaling-kpus)
+ [更新應用程式的平行處理](#how-scaling-howto)
+ [在 Managed Service for Apache Flink 中使用自動擴展](how-scaling-auto.md)
+ [maxParallelism 考量](#how-scaling-auto-max-parallelism)

## 設定應用程式平行處理和ParallelismPerKPU
<a name="how-parallelism"></a>

您可以使用下列 [https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ApplicationConfiguration.html](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ApplicationConfiguration.html) 屬性，為 Managed Service for Apache Flink 應用程式任務 (例如從來源讀取或執行運算子) 設定平行執行：
+ `Parallelism`：使用此屬性可設定預設的 Apache Flink 應用程式平行處理層級。除非在應用程式的程式碼中覆寫，否則所有運算子、來源和接收器都按此平行處理層級執行。預設值為 `1`，最大值為 `256`。
+ `ParallelismPerKPU`：使用此屬性設定依應用程式每 Kinesis 處理單元 (KPU) 可排程的平行任務數目。預設值為 `1`，最大值為 `8`。對於具有封鎖作業 (例如 I/O) 的應用程式，較高的 `ParallelismPerKPU` 值會導致 KPU 資源的完整使用率。

**注意**  
`Parallelism` 的限制等於 KPU 的限制 `ParallelismPerKPU` 乘以 (預設值為 64)。KPU 限制可透過請求提高限制來增加。如需如何請求提高限制的指示，請參閱 [Service Quotas](https://docs.aws.amazon.com/general/latest/gr/aws_service_limits.html) 中的「請求提高限制」。

如需為特定運算子設定任務平行處理的資訊，請參閱 Apache Flink 文件中的[設定平行處理：運算](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/execution/parallel/#operator-level)子。

## 配置 Kinesis 處理單元
<a name="how-scaling-kpus"></a>

Managed Service for Apache Flink 以 KPU 的形式佈建容量。單一 KPU 可為您提供 1 個 vCPU 和 4 GB 的記憶體。針對每個配置的 KPU，還會提供 50 GB 的執行中應用程式儲存體。

Managed Service for Apache Flink 會使用 `Parallelism` 和 `ParallelismPerKPU` 屬性計算執行應用程式所需的 KPU，如下所示：

```
Allocated KPUs for the application = Parallelism/ParallelismPerKPU
```

Managed Service for Apache Flink 可快速提供應用程式資源，以因應輸送量或處理活動尖峰。它會在活動尖峰過去後逐漸從應用程式中移除資源。若要停用資源的自動配置，請將 `AutoScalingEnabled` 值設定為 `false`，如稍後 [更新應用程式的平行處理](#how-scaling-howto) 中所述。

應用程式的 KPU 預設限制為 64。如需如何請求提高此限制的指示，請參閱 [Service Quotas](https://docs.aws.amazon.com/general/latest/gr/aws_service_limits.html) 中的「請求提高限制」。

**注意**  
額外的 KPU 需要為了協同運作目的付費。如需詳細資訊，請參閱 [Managed Service for Apache Flink 定價](https://aws.amazon.com/kinesis/data-analytics/pricing/)。

## 更新應用程式的平行處理
<a name="how-scaling-howto"></a>

本節包含設定應用程式平行處理之 API 動作的範例請求。如需如何將請求區塊與 API 動作搭配使用的更多範例和指示，請參閱[Managed Service for Apache Flink API 範例程式碼](api-examples.md)。

[https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html) 動作的下列請求範例會在您建立應用程式時設定平行處理：

```
{
   "ApplicationName": "string",
   "RuntimeEnvironment":"FLINK-1_18",
   "ServiceExecutionRole":"arn:aws:iam::123456789123:role/myrole",
   "ApplicationConfiguration": { 
      "ApplicationCodeConfiguration":{
      "CodeContent":{
         "S3ContentLocation":{
            "BucketARN":"arn:aws:s3:::amzn-s3-demo-bucket",
            "FileKey":"myflink.jar",
            "ObjectVersion":"AbCdEfGhIjKlMnOpQrStUvWxYz12345"
            }
         },
      "CodeContentType":"ZIPFILE"
   },   
      "FlinkApplicationConfiguration": { 
         "ParallelismConfiguration": { 
            "AutoScalingEnabled": "true",
            "ConfigurationType": "CUSTOM",
            "Parallelism": 4,
            "ParallelismPerKPU": 4
         }
      }
   }
}
```

[https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html) 動作的下列請求範例會為現有的應用程式時設定平行處理：

```
{
   "ApplicationName": "MyApplication",
   "CurrentApplicationVersionId": 4,
   "ApplicationConfigurationUpdate": { 
      "FlinkApplicationConfigurationUpdate": { 
         "ParallelismConfigurationUpdate": { 
            "AutoScalingEnabledUpdate": "true",
            "ConfigurationTypeUpdate": "CUSTOM",
            "ParallelismPerKPUUpdate": 4,
            "ParallelismUpdate": 4
         }
      }
   }
}
```

[https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html) 動作的下列請求範例會為現有的應用程式時停用平行處理：

```
{
   "ApplicationName": "MyApplication",
   "CurrentApplicationVersionId": 4,
   "ApplicationConfigurationUpdate": { 
      "FlinkApplicationConfigurationUpdate": { 
         "ParallelismConfigurationUpdate": { 
            "AutoScalingEnabledUpdate": "false"
         }
      }
   }
}
```

# 在 Managed Service for Apache Flink 中使用自動擴展
<a name="how-scaling-auto"></a>

Managed Service for Apache Flink 可彈性擴展應用程式的平行處理層級，以因應來源的資料輸送量和運算子在大多數情況下的複雜性。預設會啟用自動擴展。Managed Service for Apache Flink 會監控應用程式的資源 (CPU) 使用情況，並相應地彈性擴展應用程式的平行處理層級：
+ 如果 CloudWatch 指標最大值`containerCPUUtilization`大於 75% 或更高 15 分鐘，您的應用程式會縱向擴展 （增加平行處理）。這表示當 15 個連續資料點的 1 分鐘期間等於或超過 75% 時，就會啟動`ScaleUp`動作。`ScaleUp` 動作會將`CurrentParallelism`您應用程式的 加倍。 `ParallelismPerKPU` 不會修改。因此，配置KPUs 數量也會加倍。
+ 當 CPU 使用率維持在 10% 以下達 6小時時，應用程式會縮減規模 (減少平行處理層級)。這表示當有 360 個連續資料點且 1 分鐘期間小於 10% 時，就會啟動`ScaleDown`動作。`ScaleDown` 動作會減半 （四捨五入） 應用程式的平行處理。 `ParallelismPerKPU` 不會修改，配置KPUs 數量也會減半 （四捨五入）。

**注意**  
最多可以參考`containerCPUUtilization`超過 1 分鐘的期間，以尋找與用於擴展動作之資料點的關聯性，但不需要反映動作初始化時的確切時刻。

Managed Service for Apache Flink 不會將應用程式的 `CurrentParallelism` 值降低到小於應用程式的 `Parallelism` 設定。

當 Managed Service for Apache Flink 服務擴展應用程式時，應用程式將處於 `AUTOSCALING` 狀態。您可以使用 [DescribeApplication](https://docs.aws.amazon.com//managed-flink/latest/apiv2/API_DescribeApplication.html) 或 [ListApplications](https://docs.aws.amazon.com//managed-flink/latest/apiv2/API_ListApplications.html) 動作來檢查目前的應用程式狀態。當服務擴展您的應用程式時，您唯一可以使用的有效 API 動作是 [StopApplication](https://docs.aws.amazon.com//managed-flink/latest/apiv2/API_ListApplications.html)，且 `Force` 參數設定為 `true`。

您可以使用 `AutoScalingEnabled` 屬性 ([https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_FlinkApplicationConfiguration.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_FlinkApplicationConfiguration.html) 的一部分) 來啟用或停用自動擴展行為。 AWS 您的帳戶會針對 Managed Service for Apache Flink 佈建的 KPUs 收費，這是應用程式`parallelism`和`parallelismPerKPU`設定的函數。活動尖峰會增加您的 Managed Service for Apache Flink 成本。

如需定價相關的資訊，請參閱 [Amazon Managed Service for Apache Flink 定價](https://aws.amazon.com/kinesis/data-analytics/pricing/)。

請留意下列與應用程式擴展相關的資訊：
+ 預設會啟用自動擴展。
+ 擴展不適用於 Studio 筆記本。不過，如果您將 Studio 筆記本部署為具有持久狀態的應用程式，則擴展將套用到已部署的應用程式。
+ 應用程式的預設限制為 64 個 KPU。如需詳細資訊，請參閱[Managed Service for Apache Flink 和 Studio 筆記本配額](limits.md)。
+ 當自動擴展更新應用程式的平行處理層級時，應用程式會經歷停機。若要避免停機，請執行下列動作：
  + 停用自動擴展
  + 使用 [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html) 動作設定應用程式的 `parallelism` 和 `parallelismPerKPU`。如需設定應用程式平行處理設定的詳細資訊，請參閱 [更新應用程式的平行處理](how-scaling.md#how-scaling-howto)。
  + 定期監控應用程式的資源使用量，以確認應用程式的工作負載具有正確的平行處理設定。如需資源配置情況的相關資訊，請參閱[Managed Service for Apache Flink 中的指標和維度](metrics-dimensions.md)。

## 實作自訂自動擴展
<a name="how-scaling-custom-autoscaling"></a>

如果您想要對自動擴展進行更精細的控制，或使用 以外的觸發指標`containerCPUUtilization`，您可以使用此範例：
+ [AutoScaling](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/infrastructure/AutoScaling)

  此範例說明如何使用與 Apache Flink 應用程式不同的 CloudWatch 指標來擴展 Managed Service for Apache Flink 應用程式，包括使用 Amazon MSK 和 Amazon Kinesis Data Streams 做為來源或接收器的指標。

如需詳細資訊，請參閱 [Apache Flink 的增強型監控和自動擴展](https://aws.amazon.com/blogs/big-data/enhanced-monitoring-and-automatic-scaling-for-apache-flink/)。

## 實作排定的自動擴展
<a name="how-scaling-scheduled-autoscaling"></a>

如果您的工作負載隨著時間遵循可預測的設定檔，您可能偏好先行擴展 Apache Flink 應用程式。這會在排程時間擴展您的應用程式，而不是根據指標被動擴展。若要在一天中的固定時間設定擴展和縮減，您可以使用此範例：
+ [ScheduledScaling](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/infrastructure/ScheduledScaling)

## maxParallelism 考量
<a name="how-scaling-auto-max-parallelism"></a>

Flink 任務可以擴展的最大平行處理，受限於任務`maxParallelism`所有運算子的*最小值*。例如，如果您有一個只有來源和接收器的簡單任務，而來源的 為 `maxParallelism` 16，而目的地的 為 8，則應用程式無法擴展到超過 8 的平行處理。

若要了解如何計算 運算子`maxParallelism`的預設值，以及如何覆寫預設值，請參閱 Apache Flink 文件中的[設定最大平行處理](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/execution/parallel/#setting-the-maximum-parallelism)。

作為基本規則，請注意，如果您未`maxParallelism`為任何運算子定義 ，並且您以小於或等於 128 `maxParallelism`的平行處理啟動應用程式，則所有運算子的 都將是 128。

**注意**  
任務的最大平行處理是擴展應用程式保留 狀態的平行處理上限。  
如果您修改`maxParallelism`現有應用程式，則應用程式將無法從先前使用舊 拍攝的快照重新啟動`maxParallelism`。您只能在沒有快照的情況下重新啟動應用程式。  
如果您計劃將應用程式擴展到大於 128 的平行處理，您必須在`maxParallelism`應用程式中明確設定 。
+ 自動擴展邏輯可防止將 Flink 任務擴展到超過任務最大平行處理的平行處理。
+ 如果您使用自訂自動擴展或排程擴展，請將它們設定為不超過任務的最大平行處理。
+ 如果您手動擴展應用程式超過最大平行處理，則應用程式無法啟動。

# 將標籤新增至 Managed Service for Apache Flink 應用程式
<a name="how-tagging"></a>



本節說明如何將索引鍵-值中繼資料標籤新增至 Managed Service for Apache Flink 應用程式。這些標籤可用於下列目的：
+ 決定個別 Managed Service for Apache Flink 應用程式的帳單。如需詳細資訊，請參閱《帳單和成本管理使用者指南》中的[使用成本分配標籤](https://docs.aws.amazon.com/awsaccountbilling/latest/aboutv2/cost-alloc-tags.html)**。
+ 根據標籤控制對應用程式資源的存取。如需詳細資訊，請參閱《AWS Identity and Access Management 使用者指南》**中的[使用標籤控制存取權](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_tags.html)。
+ 使用者定義的目的。您可以根據使用者標籤的存在來定義應用程式的功能。

注意有關標記的以下資訊：
+ 應用程式標籤的數目上限包括系統標籤。使用者定義的應用程式的標籤數目上限為 50。
+ 如果動作包含具有重複 `Key` 值的標籤清單，則服務會擲出 `InvalidArgumentException`。

**Topics**
+ [建立應用程式時新增標籤](how-tagging-create.md)
+ [新增或更新現有應用程式的標籤](how-tagging-add.md)
+ [列出應用程式的標籤](how-tagging-list.md)
+ [從應用程式移除標籤](how-tagging-remove.md)

# 建立應用程式時新增標籤
<a name="how-tagging-create"></a>

您可以在使用 [CreateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplication.html) 動作的 `tags` 參數建立應用程式時新增標籤。

以下範例請求顯示 `CreateApplication` 請求的 `Tags` 節點：

```
"Tags": [ 
    { 
        "Key": "Key1",
        "Value": "Value1"
    },
    { 
        "Key": "Key2",
        "Value": "Value2"
    }
]
```

# 新增或更新現有應用程式的標籤
<a name="how-tagging-add"></a>

您可以使用 [TagResource](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_TagResource.html) 動作將標籤新增至應用程式。您無法使用 [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html) 動作為應用程式新增標籤。

若要更新現有的標籤，請使用與現有標籤相同的索引鍵新增標籤。

`TagResource` 動作的下列請求範例會新增標籤或更新現有標籤：

```
{
   "ResourceARN": "string",
   "Tags": [ 
      { 
         "Key": "NewTagKey",
         "Value": "NewTagValue"
      },
      { 
         "Key": "ExistingKeyOfTagToUpdate",
         "Value": "NewValueForExistingTag"
      }
   ]
}
```

# 列出應用程式的標籤
<a name="how-tagging-list"></a>

若要列出現有標籤，請使用 [ListTagsForResource](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ListTagsForResource.html) 動作。

`ListTagsForResource` 動作的下列請求範例可列出應用程式的標籤：

```
{
   "ResourceARN": "arn:aws:kinesisanalyticsus-west-2:012345678901:application/MyApplication"
}
```

# 從應用程式移除標籤
<a name="how-tagging-remove"></a>

若要從應用程式移除標籤，請使用 [UntagResource](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UntagResource.html) 動作。

`UntagResource` 動作的下列請求範例可移除應用程式的標籤：

```
{
   "ResourceARN": "arn:aws:kinesisanalyticsus-west-2:012345678901:application/MyApplication",
   "TagKeys": [ "KeyOfFirstTagToRemove", "KeyOfSecondTagToRemove" ]
}
```

# 搭配 Managed Service for Apache Flink 使用 CloudFormation
<a name="lambda-cfn-flink"></a>

下列練習示範如何在相同堆疊中使用 Lambda 函數啟動 CloudFormation 使用 建立的 Flink 應用程式。

## 開始之前
<a name="before-you-begin"></a>

開始本練習之前，請遵循 CloudFormation 在 [AWS::KinesisAnalytics::Application](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-kinesis-analyticsapplication.html) 使用 建立 Flink 應用程式的步驟。

## 撰寫 Lambda 函數
<a name="write-lambda-function"></a>

在建立或更新 Flink 應用程式後，若要啟動它，可以使用 kinesisanalyticsv2 [start-application](https://docs.aws.amazon.com/cli/latest/reference/kinesisanalyticsv2/start-application.html) API。建立 Flink 應用程式後， CloudFormation 事件會觸發呼叫。在本練習稍後部分，我們將討論如何設定堆疊以觸發 Lambda 函數，但我們先專注於 Lambda 函數宣告及其程式碼。我們在本範例中使用 `Python3.8` 執行期。

```
StartApplicationLambda:
    Type: AWS::Lambda::Function
    DependsOn: StartApplicationLambdaRole
    Properties:
      Description: Starts an application when invoked.
      Runtime: python3.8
      Role: !GetAtt StartApplicationLambdaRole.Arn
      Handler: index.lambda_handler
      Timeout: 30
      Code:
        ZipFile: |
          import logging
          import cfnresponse
          import boto3
          
          logger = logging.getLogger()
          logger.setLevel(logging.INFO)
          
          def lambda_handler(event, context):
            logger.info('Incoming CFN event {}'.format(event))
            
            try:
              application_name = event['ResourceProperties']['ApplicationName']
              
              # filter out events other than Create or Update,
              # you can also omit Update in order to start an application on Create only.
              if event['RequestType'] not in ["Create", "Update"]:
                logger.info('No-op for Application {} because CFN RequestType {} is filtered'.format(application_name, event['RequestType'])) 
                cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
                
                return
              
              # use kinesisanalyticsv2 API to start an application.
              client_kda = boto3.client('kinesisanalyticsv2', region_name=event['ResourceProperties']['Region'])
              
              # get application status.
              describe_response = client_kda.describe_application(ApplicationName=application_name)
              application_status = describe_response['ApplicationDetail']['ApplicationStatus']
              
              # an application can be started from 'READY' status only.
              if application_status != 'READY':
                logger.info('No-op for Application {} because ApplicationStatus {} is filtered'.format(application_name, application_status)) 
                cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
                
                return
              
              # create RunConfiguration. 
              run_configuration = { 
                'ApplicationRestoreConfiguration': {
                  'ApplicationRestoreType': 'RESTORE_FROM_LATEST_SNAPSHOT',
                }
              }
                            
              logger.info('RunConfiguration for Application {}: {}'.format(application_name, run_configuration)) 
              
              # this call doesn't wait for an application to transfer to 'RUNNING' state.
              client_kda.start_application(ApplicationName=application_name, RunConfiguration=run_configuration)
              
              logger.info('Started Application: {}'.format(application_name)) 
              cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
            except Exception as err:
              logger.error(err)
              cfnresponse.send(event,context, cfnresponse.FAILED, {"Data": str(err)})
```

在上述程式碼中，Lambda 會處理傳入 CloudFormation 事件、篩選除 `Create`和 之外的所有內容`Update`、取得應用程式狀態，並在狀態為 時啟動它`READY`。若要取得應用程式狀態，您必須建立 Lambda 角色，如下所示。

## 建立 Lambda 角色
<a name="create-lambda-role"></a>

您可以為 Lambda 建立角色，以便與應用程式成功「通話」並寫入日誌。此角色使用預設受管政策，但您可能想要將其縮小為使用自訂政策。

```
StartApplicationLambdaRole:
    Type: AWS::IAM::Role
    DependsOn: TestFlinkApplication
    Properties:
      Description: A role for lambda to use while interacting with an application.
      AssumeRolePolicyDocument:
        Version: '2012-10-17		 	 	 '
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/Amazonmanaged-flinkFullAccess
        - arn:aws:iam::aws:policy/CloudWatchLogsFullAccess
      Path: /
```

請注意，Lambda 資源將在建立 Flink 應用程式之後在同一堆疊中建立，因為它們依賴於它。

## 叫用 Lambda 函數
<a name="invoking-lambda-function"></a>

現在剩下要做的就是調用 Lambda 函數。您可以使用[自訂資源](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-cfn-customresource.html)來執行此操作。

```
StartApplicationLambdaInvoke:
    Description: Invokes StartApplicationLambda to start an application.
    Type: AWS::CloudFormation::CustomResource
    DependsOn: StartApplicationLambda
    Version: "1.0"
    Properties:
      ServiceToken: !GetAtt StartApplicationLambda.Arn
      Region: !Ref AWS::Region
      ApplicationName: !Ref TestFlinkApplication
```

以上是使用 Lambda 啟動 Flink 應用程式所需的一切。您現在可以建立自己的堆疊，也可以使用下面的完整範例來查看所有這些步驟的實際運作方式。

## 檢閱延伸範例
<a name="lambda-cfn-flink-full-example"></a>

下列範例是先前步驟的稍微擴充版本，並透過[範本參數](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/parameters-section-structure.html)完成額外的`RunConfiguration`調整。這是一個工作堆疊供您嘗試。請務必閱讀隨附的注意事項：

stack.yaml

```
Description: 'kinesisanalyticsv2 CloudFormation Test Application'
Parameters:
  ApplicationRestoreType:
    Description: ApplicationRestoreConfiguration option, can be SKIP_RESTORE_FROM_SNAPSHOT, RESTORE_FROM_LATEST_SNAPSHOT or RESTORE_FROM_CUSTOM_SNAPSHOT.
    Type: String
    Default: SKIP_RESTORE_FROM_SNAPSHOT
    AllowedValues: [ SKIP_RESTORE_FROM_SNAPSHOT, RESTORE_FROM_LATEST_SNAPSHOT, RESTORE_FROM_CUSTOM_SNAPSHOT ]
  SnapshotName:
    Description: ApplicationRestoreConfiguration option, name of a snapshot to restore to, used with RESTORE_FROM_CUSTOM_SNAPSHOT ApplicationRestoreType.
    Type: String
    Default: ''
  AllowNonRestoredState:
    Description: FlinkRunConfiguration option, can be true or false.
    Default: true
    Type: String
    AllowedValues: [ true, false ]
  CodeContentBucketArn:
    Description: ARN of a bucket with application code.
    Type: String
  CodeContentFileKey:
    Description: A jar filename with an application code inside a bucket.
    Type: String
Conditions:
  IsSnapshotNameEmpty: !Equals [ !Ref SnapshotName, '' ]
Resources:
  TestServiceExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17		 	 	 '
        Statement:
          - Effect: Allow
            Principal:
              Service: 
                - kinesisanlaytics.amazonaws.com
            Action: sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/AmazonKinesisFullAccess
        - arn:aws:iam::aws:policy/AmazonS3FullAccess
      Path: /
  InputKinesisStream:
    Type: AWS::Kinesis::Stream
    Properties:
      ShardCount: 1
  OutputKinesisStream:
    Type: AWS::Kinesis::Stream
    Properties:
      ShardCount: 1
  TestFlinkApplication:
    Type: 'AWS::kinesisanalyticsv2::Application'
    Properties:
      ApplicationName: 'CFNTestFlinkApplication'
      ApplicationDescription: 'Test Flink Application'
      RuntimeEnvironment: 'FLINK-1_18'
      ServiceExecutionRole: !GetAtt TestServiceExecutionRole.Arn
      ApplicationConfiguration:
        EnvironmentProperties:
          PropertyGroups:
            - PropertyGroupId: 'KinesisStreams'
              PropertyMap:
                INPUT_STREAM_NAME: !Ref InputKinesisStream
                OUTPUT_STREAM_NAME: !Ref OutputKinesisStream
                AWS_REGION: !Ref AWS::Region
        FlinkApplicationConfiguration:
          CheckpointConfiguration:
            ConfigurationType: 'CUSTOM'
            CheckpointingEnabled: True
            CheckpointInterval: 1500
            MinPauseBetweenCheckpoints: 500
          MonitoringConfiguration:
            ConfigurationType: 'CUSTOM'
            MetricsLevel: 'APPLICATION'
            LogLevel: 'INFO'
          ParallelismConfiguration:
            ConfigurationType: 'CUSTOM'
            Parallelism: 1
            ParallelismPerKPU: 1
            AutoScalingEnabled: True
        ApplicationSnapshotConfiguration:
          SnapshotsEnabled: True
        ApplicationCodeConfiguration:
          CodeContent:
            S3ContentLocation:
              BucketARN: !Ref CodeContentBucketArn
              FileKey: !Ref CodeContentFileKey
          CodeContentType: 'ZIPFILE'     
  StartApplicationLambdaRole:
    Type: AWS::IAM::Role
    DependsOn: TestFlinkApplication
    Properties:
      Description: A role for lambda to use while interacting with an application.
      AssumeRolePolicyDocument:
        Version: '2012-10-17		 	 	 '
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/Amazonmanaged-flinkFullAccess
        - arn:aws:iam::aws:policy/CloudWatchLogsFullAccess
      Path: /
  StartApplicationLambda:
    Type: AWS::Lambda::Function
    DependsOn: StartApplicationLambdaRole
    Properties:
      Description: Starts an application when invoked.
      Runtime: python3.8
      Role: !GetAtt StartApplicationLambdaRole.Arn
      Handler: index.lambda_handler
      Timeout: 30
      Code:
        ZipFile: |
          import logging
          import cfnresponse
          import boto3
          
          logger = logging.getLogger()
          logger.setLevel(logging.INFO)
          
          def lambda_handler(event, context):
            logger.info('Incoming CFN event {}'.format(event))
            
            try:
              application_name = event['ResourceProperties']['ApplicationName']
              
              # filter out events other than Create or Update,
              # you can also omit Update in order to start an application on Create only.
              if event['RequestType'] not in ["Create", "Update"]:
                logger.info('No-op for Application {} because CFN RequestType {} is filtered'.format(application_name, event['RequestType'])) 
                cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
                
                return
              
              # use kinesisanalyticsv2 API to start an application.
              client_kda = boto3.client('kinesisanalyticsv2', region_name=event['ResourceProperties']['Region'])
              
              # get application status.
              describe_response = client_kda.describe_application(ApplicationName=application_name)
              application_status = describe_response['ApplicationDetail']['ApplicationStatus']
              
              # an application can be started from 'READY' status only.
              if application_status != 'READY':
                logger.info('No-op for Application {} because ApplicationStatus {} is filtered'.format(application_name, application_status)) 
                cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
                
                return
              
              # create RunConfiguration from passed parameters. 
              run_configuration = { 
                'FlinkRunConfiguration': {
                  'AllowNonRestoredState': event['ResourceProperties']['AllowNonRestoredState'] == 'true'
                },
                'ApplicationRestoreConfiguration': {
                  'ApplicationRestoreType': event['ResourceProperties']['ApplicationRestoreType'],
                }
              }
              
              # add SnapshotName to RunConfiguration if specified.
              if event['ResourceProperties']['SnapshotName'] != '':
                run_configuration['ApplicationRestoreConfiguration']['SnapshotName'] = event['ResourceProperties']['SnapshotName']
              
              logger.info('RunConfiguration for Application {}: {}'.format(application_name, run_configuration)) 
              
              # this call doesn't wait for an application to transfer to 'RUNNING' state.
              client_kda.start_application(ApplicationName=application_name, RunConfiguration=run_configuration)
              
              logger.info('Started Application: {}'.format(application_name)) 
              cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
            except Exception as err:
              logger.error(err)
              cfnresponse.send(event,context, cfnresponse.FAILED, {"Data": str(err)})
  StartApplicationLambdaInvoke:
    Description: Invokes StartApplicationLambda to start an application.
    Type: AWS::CloudFormation::CustomResource
    DependsOn: StartApplicationLambda
    Version: "1.0"
    Properties:
      ServiceToken: !GetAtt StartApplicationLambda.Arn
      Region: !Ref AWS::Region
      ApplicationName: !Ref TestFlinkApplication
      ApplicationRestoreType: !Ref ApplicationRestoreType
      SnapshotName: !Ref SnapshotName
      AllowNonRestoredState: !Ref AllowNonRestoredState
```

您也可以調整 Lambda 的角色以及應用程式本身的角色。

在建立上面的堆疊之前，不要忘記指定參數。

parameters.json

```
[
  {
    "ParameterKey": "CodeContentBucketArn",
    "ParameterValue": "YOUR_BUCKET_ARN"
  },
  {
    "ParameterKey": "CodeContentFileKey",
    "ParameterValue": "YOUR_JAR"
  },
  {
    "ParameterKey": "ApplicationRestoreType",
    "ParameterValue": "SKIP_RESTORE_FROM_SNAPSHOT"
  },
  {
    "ParameterKey": "AllowNonRestoredState",
    "ParameterValue": "true"
  }
]
```

使用您的特定需求取代 `YOUR_BUCKET_ARN` 和 `YOUR_JAR`。您可以按照本[指南](https://docs.aws.amazon.com/managed-flink/latest/java/get-started-exercise.html)來建立 Amazon S3 儲存貯體和應用程式 jar。

現在建立堆疊 (使用您選擇的區域，例如 US-east-1，取代 YOUR\$1REGION)：

```
aws cloudformation create-stack --region YOUR_REGION --template-body "file://stack.yaml" --parameters "file://parameters.json" --stack-name "TestManaged Service for Apache FlinkStack" --capabilities CAPABILITY_NAMED_IAM
```

現在，您可以導覽到 [https://console.aws.amazon.com/cloudformation](https://console.aws.amazon.com/cloudformation) 並檢視進度。建立 Flink 應用程式後，您應該會看到該應用程式處於 `Starting` 狀態。可能需要幾分鐘的時間才開始 `Running`。

如需詳細資訊，請參閱下列內容：
+ [使用 AWS CloudFormation 擷取任何 AWS 服務屬性的四種方式 （第 1 部分，共 3 部分）](https://aws.amazon.com/blogs/mt/four-ways-to-retrieve-any-aws-service-property-using-aws-cloudformation-part-1/)。
+ [逐步導覽：查詢 Amazon Machine Image ID](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/walkthrough-custom-resources-lambda-lookup-amiids.html)。

# 使用 Apache Flink 儀表板搭配 Managed Service for Apache Flink
<a name="how-dashboard"></a>

您可以使用應用程式的 Apache Flink 儀表板來監控 Managed Service for Apache Flink 應用程式的運作狀態。應用程式的儀表板顯示下列資訊：
+ 使用中的資源，包括任務管理員和任務空位。
+ 作業資訊，包括正在執行、已完成、已取消和失敗的作業。

如需 Apache Flink 任務管理員、任務空位和作業的相關資訊，請參閱 Apache Flink 網站上的 [Apache Flink 架構](https://flink.apache.org/what-is-flink/flink-architecture/)。

請注意下列將 Apache Flink 儀表板用於 Managed Service for Apache Flink 應用程式的相關事項：
+ 用於 Managed Service for Apache Flink 應用程式的 Apache Flink 儀表板是唯讀的。您無法使用 Apache Flink 儀表板變更 Managed Service for Apache Flink 應用程式。
+ Apache Flink 儀表板與 Microsoft Internet Explorer 不相容。

## 存取應用程式的 Apache Flink 儀表板
<a name="how-dashboard-accessing"></a>

您可以透過 Managed Service for Apache Flink 主控台存取應用程式的 Apache Flink 儀表板，也可以透過使用 CLI 請求安全 URL 端點。

### 使用 Managed Service for Apache Flink 主控台存取應用程式的 Apache Flink 儀表板
<a name="how-dashboard-accessing-console"></a>

若要從主控台存取應用程式的 Apache Flink 儀表板，請在應用程式頁面上選擇 **Apache Flink 儀表板**。

**注意**  
從 Managed Service for Apache Flink 主控台開啟儀表板時，主控台會產生 URL，其有效時間為 12 小時。

### 使用 Managed Service for Apache Flink CLI 存取應用程式的 Apache Flink 儀表板
<a name="how-dashboard-accessing-cli"></a>

您可以使用 Managed Service for Apache Flink CLI 產生 URL 來存取應用程式儀表板。所產生的 URL 在指定的時間內有效。

**注意**  
如果您在三分鐘內未存取產生的 URL，則該 URL 將不再有效。

您可以使用 [CreateApplicationPresignedUrl](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplicationPresignedUrl.html) 動作來產生儀表板 URL。您可以為動作指定下列參數值：
+ 應用程式名稱
+ URL 有效時間 (秒)
+ 您可以指定 `FLINK_DASHBOARD_URL` 為 URL 類型。