

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# Flink 2.2 升級的狀態相容性指南
<a name="state-compatibility"></a>

從 Flink 1.x 升級到 Flink 2.2 時，狀態相容性問題可能會使您的應用程式無法從快照還原。本指南可協助您識別潛在的相容性問題，並提供遷移策略。

## 了解狀態相容性變更
<a name="state-compat-understanding"></a>

Amazon Managed Service for Apache Flink 2.2 引進數個會影響狀態相容性的序列化變更。以下是主要項目：
+ **Kryo 版本升級**：Apache Flink 2.2 會將綁定的 Kryo 序列化程式從第 2 版升級到第 5 版。由於 Kryo v5 使用與 Kryo v2 不同的二進位編碼格式，因此無法在 Flink 2.2 中還原透過 Kryo 在 Flink 1.x 儲存點中序列化的任何運算子狀態。
+ **Java 集合序列化**：在 Flink 1.x 中，POJOs 內的 Java 集合 （例如 `ArrayList`、 `HashMap`和 `HashSet`) 使用 Kryo 序列化。Flink 2.2 推出與來自 1.x 的 Kryo 序列化狀態不相容的集合特定最佳化序列化程式。在 1.x 中使用 Java 集合搭配 POJO 或 Kryo 序列化程式的應用程式無法在 Flink 2.2 中還原此狀態。如需資料類型和序列化的詳細資訊，請參閱 Flink [文件](https://nightlies.apache.org/flink/flink-docs-release-2.2/docs/dev/datastream/fault-tolerance/serialization/types_serialization/)。
+ **Kinesis 連接器相容性**：低於 5.0 的 Kinesis Data Streams (KDS) 連接器版本會維持與 Flink 2.2 Kinesis 連接器 6.0 版不相容的狀態。您必須先遷移至連接器 5.0 版或更新版本，才能升級。

## 序列化相容性參考
<a name="state-compat-reference"></a>

檢閱應用程式中的所有狀態宣告，並將序列化類型與下表比對。如果有任何狀態類型不相容，請先參閱[狀態遷移](#state-compat-migration)一節，再繼續升級。


**序列化相容性參考**  

| 序列化類型 | 相容？ | 詳細資訊 | 
| --- | --- | --- | 
| Avro (SpecificRecord、GenericRecord) | 是 | 使用自己的二進位格式，獨立於 Kryo。確保您使用的是 Flink 的原生 Avro 類型資訊，而不是註冊為 Kryo 序列化程式的 Avro。 | 
| Protobuf | 是 | 使用自己的二進位編碼，獨立於 Kryo。驗證結構描述變更遵循回溯相容的演變規則。 | 
| 沒有集合POJOs  | 是 | 由 Flink 的 POJO 序列化程式處理 — 但僅限於類別符合所有 POJO 條件時：公有類別、公有無參數建構函數、所有公開或可透過 getter/setters 存取的欄位，以及所有欄位類型本身可透過 Flink 序列化。違反上述任何一項的 POJO 無提示地落回 Kryo，並變得不相容。 | 
| 自訂 TypeSerializers | 是 | 只有在您的序列化程式未在內部委派給 Kryo 時才相容。 | 
| SQL 和資料表 API 狀態 | 是 （具有警告） | 使用 Flink 的內部序列化程式。不過，Apache Flink 不保證資料表 API 應用程式的主要版本之間的狀態相容性。首先在非生產環境中進行測試。 | 
| 具有 Java 集合的 POJOs (HashMap、ArrayList、HashSet) | 否 | 在 Flink 1.x 中，POJOs 內的集合是透過 Kryo v2 序列化。Flink 2.2 推出專用集合序列化程式，其二進位格式與 Kryo v2 格式不相容。 | 
| Scala 案例類別 | 否 | 在 Flink 1.x 中透過 Kryo 序列化。Kryo v2 至 v5 升級會變更二進位格式。 | 
| Java 記錄 | 否 | 通常回到 Flink 1.x 中的 Kryo 序列化。使用 進行測試來驗證 disableGenericTypes()。 | 
| 第三方程式庫類型 | 否 | 沒有已註冊自訂序列化程式的類型會回到 Kryo。Kryo v2 到 v5 二進位格式變更會中斷相容性。 | 
| 使用 Kryo 備用的任何類型 | 否 | 如果 Flink 無法處理具有內建或已註冊序列化程式的類型，則會回到 Kryo。來自 1.x 的所有 Kryo 序列化狀態與 2.2 不相容。 | 

## 診斷方法
<a name="state-compat-diagnostics"></a>

您可以在 [UpdateApplication API](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html) 操作之後查看應用程式日誌或檢查日誌，以主動識別狀態相容性問題。

**識別應用程式中的 Kryo 備用**

您可以在日誌中使用下列 regex 模式來識別應用程式中的 Kryo 備用：

```
Class class (?<className>[^\s]+) cannot be used as a POJO type
```

範例日誌：

```
Class class org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber
cannot be used as a POJO type because not all fields are valid POJO fields,
and must be processed as GenericType. Please read the Flink documentation on
"Data Types & Serialization" for details of the effect on performance and
schema evolution.
```

如果使用 UpdateApplication API 升級失敗，下列例外狀況可能表示您遇到序列化程式型狀態不相容：

**IndexOutOfBoundsException**

```
Caused by: java.lang.IndexOutOfBoundsException: Index 116 out of bounds for length 1
    at java.base/jdk.internal.util.Preconditions.outOfBounds(Unknown Source)
    at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Unknown Source)
    at java.base/jdk.internal.util.Preconditions.checkIndex(Unknown Source)
    at java.base/java.util.Objects.checkIndex(Unknown Source)
    at java.base/java.util.ArrayList.get(Unknown Source)
    at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:77)
    at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:923)
    ... 23 more
```

**StateMigrationException (POJOSerializer)**

```
Caused by: org.apache.flink.util.StateMigrationException: The new state serializer
(org.apache.flink.api.java.typeutils.runtime.PojoSerializer@8bf85b5d) must not be
incompatible with the old state serializer
(org.apache.flink.api.java.typeutils.runtime.PojoSerializer@3282ee3).
```

## 升級前檢查清單
<a name="state-compat-checklist"></a>
+ 檢閱應用程式中的所有狀態宣告
+ 使用集合 (`HashMap`、`ArrayList`、`HashSet`) 檢查 POJOs 
+ 驗證每種狀態類型的序列化方法
+ 在此複本上使用 UpdateApplication API 建立生產複本應用程式並測試狀態相容性
+ 如果狀態不相容，請從 選取策略 [狀態遷移](#state-compat-migration)
+ 在生產 Flink 應用程式組態中啟用自動轉返

## 狀態遷移
<a name="state-compat-migration"></a>

**重建完成狀態**

最適合可以從來源資料重建狀態的應用程式。

如果您的應用程式可以從來源資料重建狀態：

1. 停止 Flink 1.x 應用程式

1. 使用更新的程式碼升級至 Flink 2.x

1. 從 開始 `SKIP_RESTORE_FROM_SNAPSHOT`

1. 允許應用程式重建狀態

```
aws kinesisanalyticsv2 start-application \
    --application-name MyApplication \
    --run-configuration '{
        "ApplicationRestoreConfiguration": {
            "ApplicationRestoreType": "SKIP_RESTORE_FROM_SNAPSHOT"
        }
    }'
```

## 最佳實務
<a name="state-compat-best-practices"></a>

1. **一律將 Avro 或 Protobuf 用於複雜狀態** — 這些可提供結構描述演變，且與 Kryo 無關

1. **避免 POJOs 中的集合** — 請`MapState`改用 Flink 的原生 `ListState`和 。

1. 在**本機測試狀態還原** - 在生產升級之前，使用實際快照進行測試

1. **經常拍攝快照** - 特別是在主要版本升級之前

1. **啟用自動轉返** — 將您的 MSF 應用程式設定為在失敗時自動轉返

1. **記錄您的狀態類型** — 維護所有狀態類型及其序列化方法的文件

1. **監控檢查點大小** — 不斷增長的檢查點大小可能表示序列化問題

## 後續步驟
<a name="state-compat-next-steps"></a>

**規劃升級**：請參閱[升級到 Flink 2.2：完成指南](flink-2-2-upgrade-guide.md)。

如需遷移期間的問題，請參閱 [Managed Service for Apache Flink 故障診斷](troubleshooting.md)或聯絡 AWS Support。