

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# Flink 2.2 升级的状态兼容性指南
<a name="state-compatibility"></a>

从 Flink 1.x 升级到 Flink 2.2 时，状态兼容性问题可能会使您的应用程序无法从快照中恢复。本指南可帮助您识别潜在的兼容性问题并提供迁移策略。

## 了解状态兼容性变化
<a name="state-compat-understanding"></a>

适用于 Apache Flink 2.2 的亚马逊托管服务引入了几项影响状态兼容性的序列化更改。以下是主要的：
+ **Kryo 版本升级**：Apache Flink 2.2 将捆绑的 Kryo 序列化程序从版本 2 升级到版本 5。由于 Kryo v5 使用的二进制编码格式与 Kryo v2 不同，因此在 Flink 1.x 保存点中通过 Kryo 序列化的任何运算符状态都无法在 Flink 2.2 中恢复。
+ **Java 集合序列化**：在 Flink 1.x 中，其中的 Java 集合（例如`HashMap``ArrayList`、和`HashSet`）是使用 Kryo 进行序列化 POJOs 的。Flink 2.2 引入了特定于集合的优化序列化器，这些序列化器与 1.x 中的 Kryo 序列化状态不兼容。在 1.x 中使用带有 POJO 或 Kryo 序列化器的 Java 集合的应用程序无法在 Flink 2.2 中恢复此状态。有关数据类型和序列化的更多详细信息，请参阅 Flink [文档](https://nightlies.apache.org/flink/flink-docs-release-2.2/docs/dev/datastream/fault-tolerance/serialization/types_serialization/)。
+ **Kinesis 连接器兼容性**：低于 5.0 的 Kinesis Data Streams (KDS) 连接器版本保持与 Flink 2.2 Kinesis 连接器版本 6.0 不兼容的状态。升级之前，必须迁移到连接器版本 5.0 或更高版本。

## 序列化兼容性参考
<a name="state-compat-reference"></a>

查看应用程序中的所有状态声明，并将序列化类型与下表进行匹配。如果有任何状态类型不兼容，请在继续升级之前参阅[州移民](#state-compat-migration)部分。


**序列化兼容性参考**  

| 序列化类型 | 兼容？ | Details | 
| --- | --- | --- | 
| Avro (SpecificRecord,GenericRecord) | 是 | 使用自己独立于 Kryo 的二进制格式。确保你使用的是 Flink 的原生 Avro 类型信息，而不是注册为 Kryo 序列化器的 Avro。 | 
| Protobuf | 是 | 使用自己独立于 Kryo 的二进制编码。验证架构更改是否遵循向后兼容的演变规则。 | 
| POJOs 没有收藏 | 是 | 由 Flink 的 POJO 序列化器处理——但前提是该类符合所有 POJO 标准：公共类、公共无参数构造函数、所有公共字段或可通过 getter/setter 访问的字段，以及所有字段类型本身均可由 Flink 序列化。违反其中任何一项的 POJO 都会默默地退回 Kryo 并变得不兼容。 | 
| 自定义 TypeSerializers | 是 | 仅当你的序列化器不在内部委托给 Kryo 时才兼容。 | 
| SQL 和表 API 状态 | 是（需要注意的是） | 使用 Flink 的内部序列化器。但是，Apache Flink 不保证 Table API 应用程序的主要版本之间的状态兼容性。请先在非生产环境中测试。 | 
| POJOs 使用 Java 集合 (HashMap、ArrayList、HashSet) | 否 | 在 Flink 1.x 中，其中的集合通过 Kry POJOs o v2 进行序列化。Flink 2.2 引入了专用的集合序列化器，其二进制格式与 Kryo v2 格式不兼容。 | 
| Scala 案例课 | 否 | 在 Flink 1.x 中通过 Kryo 进行序列化。Kryo v2 到 v5 的升级更改了二进制格式。 | 
| Java 记录 | 否 | 通常在 Flink 1.x 中回退到 Kryo 序列化。通过测试进行验证disableGenericTypes()。 | 
| 第三方库类型 | 否 | 没有注册的自定义序列化程序的类型可以追溯到 Kryo。Kryo v2 到 v5 二进制格式的更改破坏了兼容性。 | 
| 任何使用 Kryo 回退的类型 | 否 | 如果 Flink 无法使用内置或注册的序列化器处理类型，则会回退到 Kryo。1.x 中的所有 Kryo 序列化状态都与 2.2 不兼容。 | 

## 诊断方法
<a name="state-compat-diagnostics"></a>

您可以通过查看应用程序日志或在 [UpdateApplication API](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html) 操作之后检查日志来主动识别状态兼容性问题。

**在你的应用程序中识别 Kryo 的后备方案**

你可以在日志中使用以下正则表达式模式来识别应用程序中的 Kryo fallback：

```
Class class (?<className>[^\s]+) cannot be used as a POJO type
```

日志示例：

```
Class class org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber
cannot be used as a POJO type because not all fields are valid POJO fields,
and must be processed as GenericType. Please read the Flink documentation on
"Data Types & Serialization" for details of the effect on performance and
schema evolution.
```

如果使用 UpdateApplication API 升级失败，则以下异常可能表明您遇到了基于序列化程序的状态不兼容问题：

**IndexOutOfBoundsException**

```
Caused by: java.lang.IndexOutOfBoundsException: Index 116 out of bounds for length 1
    at java.base/jdk.internal.util.Preconditions.outOfBounds(Unknown Source)
    at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Unknown Source)
    at java.base/jdk.internal.util.Preconditions.checkIndex(Unknown Source)
    at java.base/java.util.Objects.checkIndex(Unknown Source)
    at java.base/java.util.ArrayList.get(Unknown Source)
    at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:77)
    at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:923)
    ... 23 more
```

**StateMigrationException (POJOSerializer)**

```
Caused by: org.apache.flink.util.StateMigrationException: The new state serializer
(org.apache.flink.api.java.typeutils.runtime.PojoSerializer@8bf85b5d) must not be
incompatible with the old state serializer
(org.apache.flink.api.java.typeutils.runtime.PojoSerializer@3282ee3).
```

## 升级前清单
<a name="state-compat-checklist"></a>
+ 查看申请中的所有州声明
+ 查看馆藏 (`HashMap`,`ArrayList`,`HashSet`) POJOs 
+ 验证每种状态类型的序列化方法
+ 创建生产副本应用程序并在此副本上使用 UpdateApplication API 测试状态兼容性
+ 如果状态不兼容，请从中选择策略 [州移民](#state-compat-migration)
+ 在生产 Flink 应用程序配置中启用自动回滚

## 州移民
<a name="state-compat-migration"></a>

**重建完成状态**

最适合可以从源数据重建状态的应用程序。

如果您的应用程序可以从源数据重建状态：

1. 停止 Flink 1.x 应用程序

1. 使用更新的代码升级到 Flink 2.x

1. 从... 开始 `SKIP_RESTORE_FROM_SNAPSHOT`

1. 允许应用程序重建状态

```
aws kinesisanalyticsv2 start-application \
    --application-name MyApplication \
    --run-configuration '{
        "ApplicationRestoreConfiguration": {
            "ApplicationRestoreType": "SKIP_RESTORE_FROM_SNAPSHOT"
        }
    }'
```

## 最佳实践
<a name="state-compat-best-practices"></a>

1. **始终使用 Avro 或 Protobuf 处理复杂状态** — 它们提供架构演变并且与 Kryo 无关

1. **避免在里面收藏 POJOs** — 改用 Flink 的原`ListState`生版本 `MapState`

1. 在@@ **本地测试状态恢复** — 在生产升级之前，使用实际快照进行测试

1. **经常拍摄快照** — 尤其是在主要版本升级之前

1. **启用自动回滚**-将 MSF 应用程序配置为在出现故障时自动回滚

1. **记录您的状态类型** — 维护所有状态类型及其序列化方法的文档

1. **监视检查点大小** — 检查点大小增加可能表明存在序列化问题

## 后续步骤
<a name="state-compat-next-steps"></a>

**计划升级**：请参阅[升级到 Flink 2.2：完整指南](flink-2-2-upgrade-guide.md)。

有关迁移过程中的疑问或问题，请参阅[Managed Service for Apache Flink 的故障排除](troubleshooting.md)或联系 Supp AWS ort。