

# Use in-place version upgrades for Apache Flink
<a name="how-in-place-version-upgrades"></a>

With in-place version upgrades for Apache Flink, you retain application traceability against a single ARN across Apache Flink versions. This includes snapshots, logs, metrics, tags, Flink configurations, resource limit increases, VPCs, and more. 

You can perform in-place version upgrades for Apache Flink to upgrade existing applications to a new Flink version in Amazon Managed Service for Apache Flink. To perform this task, you can use the AWS CLI, AWS CloudFormation, AWS SDK, or the AWS Management Console.

**Note**  
You can't use in-place version upgrades for Apache Flink with Amazon Managed Service for Apache Flink Studio.

**Topics**
+ [Upgrade applications using in-place version upgrades for Apache Flink](upgrading-applications.md)
+ [Upgrade your application to a new Apache Flink version](upgrading-application-new-version.md)
+ [Roll back application upgrades](rollback.md)
+ [General best practices and recommendations for application upgrades](best-practices-recommendations.md)
+ [Precautions and known issues with application upgrades](precautions.md)
+ [Upgrading to Flink 2.2: Complete guide](flink-2-2-upgrade-guide.md)
+ [State compatibility guide for Flink 2.2 upgrades](state-compatibility.md)

# Upgrade applications using in-place version upgrades for Apache Flink
<a name="upgrading-applications"></a>

Before you begin, we recommend that you watch this video: [In-Place Version Upgrades](https://www.youtube.com/watch?v=f1qGGdaP2XI).

To perform in-place version upgrades for Apache Flink, you can use the AWS CLI, AWS CloudFormation, AWS SDK, or the AWS Management Console. You can use this feature with any existing application that you use with Managed Service for Apache Flink in a `READY` or `RUNNING` state. The feature uses the `UpdateApplication` API, which adds the ability to change the Flink runtime.

## Before upgrading: Update your Apache Flink application
<a name="before-upgrading"></a>

When you write your Flink applications, you bundle them with their dependencies into an application JAR and upload the JAR to your Amazon S3 bucket. From there, Amazon Managed Service for Apache Flink runs the job in the new Flink runtime that you've selected. You might have to update your applications to achieve compatibility with the Flink runtime you want to upgrade to. There can be inconsistencies between Flink versions that cause the version upgrade to fail. Most commonly, these are connectors for sources (ingress) or destinations (sinks, egress) and Scala dependencies. Flink 1.15 and later versions in Managed Service for Apache Flink are Scala-agnostic, so your JAR must contain the version of Scala you plan to use.

**To update your application**

1. Read the advice from the Flink community on upgrading applications with state. See [Upgrading Applications and Flink Versions](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/upgrading/).

1. Read the list of known issues and limitations. See [Precautions and known issues with application upgrades](precautions.md).

1. Update your dependencies and test your applications locally. These dependencies typically are:

   1. The Flink runtime and API.

   1. Connectors recommended for the new Flink runtime. You can find these on [Release versions](https://docs.aws.amazon.com/managed-flink/latest/java/release-version-list.html) for the specific runtime you want to update to.

   1. Scala – Apache Flink is Scala-agnostic starting with and including Flink 1.15. You must include the Scala dependencies you want to use in your application JAR.

1. Build a new application JAR or zip file and upload it to Amazon S3. We recommend that you use a different name from the previous JAR or zip file. If you need to roll back, you will use this information.

1. If you are running stateful applications, we strongly recommend that you take a snapshot of your current application. This lets you roll back statefully if you encounter issues during or after the upgrade. 
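
The naming advice in step 4 can be automated with a small helper. The following is an illustrative sketch; the naming scheme and function name are our own, not a service convention:

```python
def versioned_code_key(app_name, runtime):
    """Build an S3 object key that embeds the target runtime, so a new
    application JAR never overwrites the one the current runtime uses."""
    # For example, "FLINK-1_18" becomes "flink-1-18".
    suffix = runtime.lower().replace("_", "-")
    return f"{app_name}-{suffix}.jar"

print(versioned_code_key("UpgradeTest", "FLINK-1_18"))  # UpgradeTest-flink-1-18.jar
```

Keeping the old key untouched means the rollback path always has a known-good artifact to point back to.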

# Upgrade your application to a new Apache Flink version
<a name="upgrading-application-new-version"></a>

You can upgrade your Flink application by using the [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html) action.

You can call the `UpdateApplication` API in multiple ways:
+ Use the existing **Configuration** workflow on the AWS Management Console.
  + Go to your app page on the AWS Management Console.
  + Choose **Configure**.
  + Select the new runtime and the snapshot that you want to start from, also known as restore configuration. Use the latest setting as the restore configuration to start the app from the latest snapshot. Point to the new upgraded application JAR/zip on Amazon S3.
+ Use the AWS CLI [update-application](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/kinesisanalyticsv2/update-application.html) action.
+ Use CloudFormation (CFN).
  + Update the [RuntimeEnvironment](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-kinesisanalyticsv2-application.html#cfn-kinesisanalyticsv2-application-runtimeenvironment) field. Previously, CloudFormation deleted the application and created a new one, causing your snapshots and other app history to be lost. Now CloudFormation updates your RuntimeEnvironment in place and does not delete your application. 
+ Use the AWS SDK.
  + Consult the SDK documentation for the programming language of your choice. See [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html). 

You can perform the upgrade while the application is in `RUNNING` state or while the application is stopped in `READY` state. Amazon Managed Service for Apache Flink validates the compatibility between the original runtime version and the target runtime version. This compatibility check runs when you perform [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html) while in `RUNNING` state, or at the next [StartApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StartApplication.html) if you upgrade while in `READY` state.

## Upgrade an application in `RUNNING` state
<a name="upgrading-running"></a>

The following example shows upgrading an app in `RUNNING` state named `UpgradeTest` to Flink 1.18 in US East (N. Virginia) using the AWS CLI and starting the upgraded app from the latest snapshot. 

```
aws --region us-east-1 kinesisanalyticsv2 update-application \
    --application-name UpgradeTest \
    --runtime-environment-update "FLINK-1_18" \
    --application-configuration-update '{
        "ApplicationCodeConfigurationUpdate": {
            "CodeContentUpdate": {
                "S3ContentLocationUpdate": {
                    "FileKeyUpdate": "flink_1_18_app.jar"
                }
            }
        }
    }' \
    --run-configuration-update '{
        "ApplicationRestoreConfiguration": {
            "ApplicationRestoreType": "RESTORE_FROM_LATEST_SNAPSHOT"
        }
    }' \
    --current-application-version-id ${current_application_version}
```
+ If you enabled service snapshots and want to continue the application from the latest snapshot, Amazon Managed Service for Apache Flink verifies that the current `RUNNING` application's runtime is compatible with the selected target runtime.
+ If you have specified a snapshot from which to continue the target runtime, Amazon Managed Service for Apache Flink verifies that the target runtime is compatible with the specified snapshot. If the compatibility check fails, your update request is rejected and your application remains untouched in the `RUNNING` state.
+ If you choose to start your application without a snapshot, Amazon Managed Service for Apache Flink doesn't run any compatibility checks.
+ If your upgraded application fails or gets stuck in a transient `UPDATING` state, follow the instructions in the [Roll back application upgrades](rollback.md) section to return to a healthy state. 
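
After issuing `update-application` against a running app, you can watch for the status to return to `RUNNING`. The following is a hedged sketch of such a watcher; the status getter is injected so you can back it with `describe-application` output (the AWS call itself is not shown, and all names here are illustrative):

```python
import time

def wait_for_status(get_status, target="RUNNING", timeout_s=600, poll_s=10):
    """Poll a zero-argument status getter until the application reaches
    `target`, raising if it leaves the expected transient states."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        status = get_status()
        if status == target:
            return status
        # UPDATING and AUTOSCALING are the expected in-between states.
        if status not in ("UPDATING", "AUTOSCALING"):
            raise RuntimeError(f"unexpected status during upgrade: {status}")
        time.sleep(poll_s)
    raise TimeoutError(f"application did not reach {target} within {timeout_s}s")
```

If the watcher times out while the application is still `UPDATING`, that is the stuck case described in the rollback guidance.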

**Process flow for running state applications**

![The following diagram represents the recommended workflow to upgrade the application while running. We assume that the application is stateful and that you enabled snapshots. For this workflow, on update, you restore the application from the latest snapshot that was automatically taken by Amazon Managed Service for Apache Flink before updating.](http://docs.aws.amazon.com/managed-flink/latest/java/images/in-place-update-while-running.png)


## Upgrade an application in `READY` state
<a name="upgrading-ready"></a>

The following example shows upgrading an app in `READY` state named `UpgradeTest` to Flink 1.18 in US East (N. Virginia) using the AWS CLI. There is no specified snapshot to start the app because the application is not running. You can specify a snapshot when you issue the start application request.

```
aws --region us-east-1 kinesisanalyticsv2 update-application \
    --application-name UpgradeTest \
    --runtime-environment-update "FLINK-1_18" \
    --application-configuration-update '{
        "ApplicationCodeConfigurationUpdate": {
            "CodeContentUpdate": {
                "S3ContentLocationUpdate": {
                    "FileKeyUpdate": "flink_1_18_app.jar"
                }
            }
        }
    }' \
    --current-application-version-id ${current_application_version}
```
+ You can update the runtime of your applications in `READY` state to any Flink version. Amazon Managed Service for Apache Flink does not run any checks until you start your application.
+ Amazon Managed Service for Apache Flink only runs compatibility checks against the snapshot you selected to start the app. These are basic compatibility checks following the [Flink Compatibility Table](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/upgrading/#compatibility-table). They only check the Flink version with which the snapshot was taken and the Flink version you are targeting. If the Flink runtime of the selected snapshot is incompatible with the app's new runtime, the start request might be rejected.

**Process flow for ready state applications**

![The following diagram represents the recommended workflow to upgrade the application while in ready state. We assume that the application is stateful and that you enabled snapshots. For this workflow, on update, you restore the application from the latest snapshot that was automatically taken by Amazon Managed Service for Apache Flink when the application was stopped.](http://docs.aws.amazon.com/managed-flink/latest/java/images/in-place-update-while-ready.png)


# Roll back application upgrades
<a name="rollback"></a>

If you have issues with your application or find inconsistencies in your application code between Flink versions, you can roll back using the AWS CLI, AWS CloudFormation, AWS SDK, or the AWS Management Console. The following examples show what rolling back looks like in different failure scenarios.

## Runtime upgrade succeeded, the application is in `RUNNING` state, but the job is failing and continuously restarting
<a name="succeeded-restarting"></a>

Assume you are trying to upgrade a stateful application named `TestApplication` from Flink 1.15 to Flink 1.18 in US East (N. Virginia). However, the upgraded Flink 1.18 application is failing to start or is constantly restarting, even though the application is in `RUNNING` state. This is a common failure scenario. To avoid further downtime, we recommend that you roll back your application immediately to the previous running version (Flink 1.15), and diagnose the issue later.

To roll back the application to the previous running version, use the [rollback-application](https://docs.aws.amazon.com/cli/latest/reference/kinesisanalyticsv2/rollback-application.html) AWS CLI command or the [RollbackApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_RollbackApplication.html) API action. This API action rolls back the changes you've made that resulted in the latest version. Then it restarts your application using the latest successful snapshot. 

We strongly recommend that you take a snapshot of your existing app before you attempt to upgrade. This helps you avoid data loss or having to reprocess data. 

In this failure scenario, CloudFormation will not roll back the application for you. You must update the CloudFormation template to point to the previous runtime and to the previous code to force CloudFormation to update the application. Otherwise, CloudFormation assumes that your application has been updated when it transitions to the `RUNNING` state.

## Rolling back an application that is stuck in `UPDATING`
<a name="stuck-updating"></a>

If your application gets stuck in the `UPDATING` or `AUTOSCALING` state after an upgrade attempt, Amazon Managed Service for Apache Flink offers the [rollback-application](https://docs.aws.amazon.com/cli/latest/reference/kinesisanalyticsv2/rollback-application.html) AWS CLI command and the [RollbackApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_RollbackApplication.html) API action, which roll back the application to the version before the stuck `UPDATING` or `AUTOSCALING` state. This API rolls back the changes that you've made that caused the application to get stuck in the transient `UPDATING` or `AUTOSCALING` state.

# General best practices and recommendations for application upgrades
<a name="best-practices-recommendations"></a>
+ Test the new job/runtime without state on a non-production environment before attempting a production upgrade.
+ Consider testing the stateful upgrade with a non-production application first.
+ Make sure that your new job graph's state is compatible with the snapshot you will use to start your upgraded application.
  + Make sure that the types stored in operator states stay the same. If the type has changed, Apache Flink can't restore the operator state.
  + Make sure that the Operator IDs you set using the `uid` method remain the same. Apache Flink has a strong recommendation for assigning unique IDs to operators. For more information, see [Assigning Operator IDs](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#assigning-operator-ids) in the Apache Flink documentation.

    If you don't assign IDs to your operators, Flink automatically generates them. In that case, the IDs might depend on the program structure and, if the structure changes, can cause compatibility issues. Flink uses operator IDs to match the state in a snapshot to operators. Changing operator IDs results in the application not starting, or in state stored in the snapshot being dropped and the new operator starting without state.
  + Don't change the key used to store the keyed state.
  + Don't modify the input type of stateful operators like window or join. This implicitly changes the type of the internal state of the operator, causing a state incompatibility.
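
The operator-ID rule above can be illustrated with a small sketch of the matching Flink performs when restoring a snapshot. This models only the rule described here, not Flink's actual implementation:

```python
def match_state(snapshot_ids, job_graph_ids):
    """Pair snapshot state with operators by ID, mirroring how Flink
    matches state to operators on restore (illustrative model only)."""
    snapshot, job = set(snapshot_ids), set(job_graph_ids)
    return {
        "restored": sorted(snapshot & job),   # state handed to matching operators
        "orphaned": sorted(snapshot - job),   # state with no operator: restore fails
                                              # unless non-restored state is allowed
        "stateless": sorted(job - snapshot),  # operators that start without state
    }

# Renaming a uid from "dedupe-v1" to "dedupe-v2" orphans that operator's state:
result = match_state({"source", "dedupe-v1"}, {"source", "dedupe-v2"})
```

In this example only `source` keeps its state, which is why keeping `uid` values stable across versions matters.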

# Precautions and known issues with application upgrades
<a name="precautions"></a>

## Kafka Commit on checkpointing fails repeatedly after a broker restart
<a name="apache-kafka-connector"></a>

There is a known open source Apache Flink issue with the Apache Kafka connector in Flink version 1.15 caused by a critical open source Kafka Client bug in Kafka Client 2.8.1. For more information, see [Kafka Commit on checkpointing fails repeatedly after a broker restart](https://issues.apache.org/jira/browse/FLINK-28060) and [KafkaConsumer is unable to recover connection to group coordinator after commitOffsetAsync exception](https://issues.apache.org/jira/browse/KAFKA-13840).

To avoid this issue, we recommend that you use Apache Flink 1.18 or later in Amazon Managed Service for Apache Flink.

## Known limitations of state compatibility
<a name="state-precautions"></a>
+ If you are using the Table API, Apache Flink doesn't guarantee state compatibility between Flink versions. For more information, see [Stateful Upgrades and Evolution](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/overview/#stateful-upgrades-and-evolution) in the Apache Flink documentation.
+ Flink 1.6 states are not compatible with Flink 1.18. The API rejects your request if you try to upgrade from 1.6 to 1.18 and later with state. You can upgrade to 1.8, 1.11, 1.13 and 1.15 and take a snapshot, and then upgrade to 1.18 and later. For more information, see [Upgrading Applications and Flink Versions](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/upgrading/) in the Apache Flink documentation.
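
The 1.6 limitation above implies a two-step path. Here is a hedged sketch of that planning logic; the version pairs and helper are hard-coded from the limitation described here for illustration, not a service API:

```python
# Direct stateful hops the limitation above says are rejected.
DIRECT_INCOMPATIBLE = {("1.6", "1.18")}
# Intermediate runtimes you can snapshot on before continuing.
INTERMEDIATES = ("1.8", "1.11", "1.13", "1.15")

def stateful_upgrade_path(current, target):
    """Return the sequence of runtime hops for a stateful upgrade."""
    if (current, target) not in DIRECT_INCOMPATIBLE:
        return [current, target]
    # Upgrade to an intermediate runtime, take a snapshot, then continue.
    return [current, INTERMEDIATES[-1], target]
```

For example, `stateful_upgrade_path("1.6", "1.18")` plans the hop through 1.15, while compatible pairs upgrade directly.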

## Known issues with the Flink Kinesis Connector
<a name="kinesis-connector-precautions"></a>
+ If you are using Flink 1.11 or earlier and using the `amazon-kinesis-connector-flink` connector for Enhanced-fan-out (EFO) support, you must take extra steps for a stateful upgrade to Flink 1.13 or later. This is because of the change in the package name of the connector. For more information, see [amazon-kinesis-connector-flink](https://github.com/awslabs/amazon-kinesis-connector-flink).

  The `amazon-kinesis-connector-flink` connector for Flink 1.11 and earlier uses the packaging `software.amazon.kinesis`, whereas the Kinesis connector for Flink 1.13 and later uses `org.apache.flink.streaming.connectors.kinesis`. Use this tool to support your migration: [amazon-kinesis-connector-flink-state-migrator](https://github.com/awslabs/amazon-kinesis-connector-flink-state-migrator).
+ If you are using Flink 1.13 or earlier with `FlinkKinesisProducer` and upgrading to Flink 1.15 or later, for a stateful upgrade you must continue to use `FlinkKinesisProducer` in Flink 1.15 or later, instead of the newer `KinesisStreamsSink`. However, if you already have a custom `uid` set on your sink, you should be able to switch to `KinesisStreamsSink` because `FlinkKinesisProducer` doesn't keep state. Flink will treat it as the same operator because a custom `uid` is set.

## Flink applications written in Scala
<a name="scala-precautions"></a>
+ As of Flink 1.15, Apache Flink doesn't include Scala in the runtime. You must include the version of Scala you want to use and other Scala dependencies in your code JAR/zip when upgrading to Flink 1.15 or later. For more information, see [Amazon Managed Service for Apache Flink for Apache Flink 1.15.2 release](https://docs.aws.amazon.com/managed-flink/latest/java/flink-1-15-2.html).
+ If your application uses Scala and you are upgrading it from Flink 1.11 or earlier (Scala 2.11) to Flink 1.13 (Scala 2.12), make sure that your code uses Scala 2.12. Otherwise, your Flink 1.13 application may fail to find Scala 2.11 classes in the Flink 1.13 runtime.

## Things to consider when downgrading Flink applications
<a name="downgrading-precautions"></a>
+ Downgrading Flink applications is possible, but only to a version that the application previously ran on. For a stateful downgrade, Managed Service for Apache Flink requires a snapshot taken with a matching or earlier version.
+ If you are updating your runtime from Flink 1.13 or later to Flink 1.11 or earlier, and if your app uses the HashMap state backend, your application will continuously fail.

# Upgrading to Flink 2.2: Complete guide
<a name="flink-2-2-upgrade-guide"></a>

This guide provides step-by-step instructions for upgrading your Amazon Managed Service for Apache Flink application from Flink 1.x to Flink 2.2. This is a major version upgrade with breaking changes that require careful planning and testing.

**Major version upgrade is uni-directional**  
The Upgrade operation can move your application from Flink 1.x to 2.2 with state preservation, but you cannot move back from 2.2 to 1.x with 2.2 state. If your application becomes unhealthy after upgrading, use the Rollback API to return to the 1.x version with your original 1.x state from the latest snapshot.

## Prerequisites
<a name="upgrade-guide-prerequisites"></a>

Before beginning your upgrade:
+ Review [Breaking changes and deprecations](flink-2-2.md#flink-2-2-breaking-changes)
+ Review [State compatibility guide for Flink 2.2 upgrades](state-compatibility.md)
+ Ensure you have a non-production environment for testing
+ Document your current application configuration and dependencies

## Understanding your migration paths
<a name="upgrade-guide-migration-paths"></a>

Your upgrade experience depends on your application's compatibility with Flink 2.2. Understanding these paths helps you prepare appropriately and set realistic expectations.

**Path 1: Compatible binary and application state**

**What to expect:**
+ Invoke the Upgrade operation
+ Complete the migration to 2.2 with the application status transitioning: `RUNNING` → `UPDATING` → `RUNNING`
+ Preserve all application state without data loss or reprocessing
+ Same experience as minor version migrations

Best for: Stateless applications or applications using compatible serialization (Avro, compatible Protobuf schemas, POJOs without collections)

**Path 2: Binary incompatibilities**

**What to expect:**
+ Invoke the Upgrade operation
+ Operation fails and surfaces the binary incompatibility through Operations API and logs
+ With auto-rollback enabled: Applications automatically roll back within minutes without your intervention
+ With auto-rollback disabled: Applications remain in running state without data processing; you manually roll back to older version
+ Once the binary is fixed, use the [UpdateApplication API](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html) for an experience similar to Path 1

Best for: Applications using removed APIs that are detected during Flink job startup

**Path 3: Incompatible application state**

**What to expect:**
+ Invoke the Upgrade operation
+ Migration appears to succeed initially
+ Applications enter restart loops within seconds as state restoration fails
+ Detect failures through CloudWatch Metrics showing continuous restarts
+ Manually invoke the Rollback operation
+ Return to production within minutes after initiating rollback
+ Review [State migration](state-compatibility.md#state-compat-migration) for your application

Best for: Applications with state serialization incompatibilities (POJOs with collections, certain Kryo-serialized state)

**Note**  
We highly recommend that you create a replica of your production application and test each of the following upgrade phases on the replica before you follow the same steps for your production application.

## Phase 1: Preparation
<a name="upgrade-guide-phase-1"></a>

**Update application code**

Update your application code to be compatible with Flink 2.2:
+ **Update Flink dependencies** to version 2.2.0 in your `pom.xml` or `build.gradle`
+ **Update connector dependencies** to Flink 2.2-compatible versions (see [Connector availability](flink-2-2.md#flink-2-2-connectors))
+ **Remove deprecated API usage**:
  + Replace DataSet API with DataStream API or Table API/SQL
  + Replace legacy `SourceFunction`/`SinkFunction` with FLIP-27 Source and FLIP-143 Sink APIs
  + Replace Scala API usage with Java API
+ **Update to Java 17**

**Upload updated application code**
+ Build your application JAR with Flink 2.2 dependencies
+ Upload to Amazon S3 with a **different file name** than your current JAR (for example, `my-app-flink-2.2.jar`)
+ Note the S3 bucket and key for use in the upgrade step

## Phase 2: Enable auto-rollback
<a name="upgrade-guide-phase-2"></a>

Auto-rollback allows Amazon Managed Service for Apache Flink to automatically revert to the previous version if the upgrade fails.

**Check auto-rollback status**

*AWS Management Console:*

1. Navigate to your application

1. Choose **Configuration**

1. Under **Application settings**, verify **System rollback** is enabled

*AWS CLI:*

```
aws kinesisanalyticsv2 describe-application \
    --application-name MyApplication \
    --query 'ApplicationDetail.ApplicationConfigurationDescription.ApplicationSystemRollbackConfigurationDescription.RollbackEnabled'
```

**Enable auto-rollback (if not enabled)**

```
aws kinesisanalyticsv2 update-application \
    --application-name MyApplication \
    --current-application-version-id <version-id> \
    --application-configuration-update '{
        "ApplicationSystemRollbackConfigurationUpdate": {
            "RollbackEnabledUpdate": true
        }
    }'
```

## Phase 3: Take snapshot (optional)
<a name="upgrade-guide-phase-3"></a>

If automatic snapshots are enabled for your application, you can skip this step. Otherwise, take a snapshot of your application to save its state before upgrading.

**Take snapshot from running application**

*AWS Management Console:*

1. Navigate to your application

1. Choose **Snapshots**

1. Choose **Create snapshot**

1. Enter a snapshot name (for example, `pre-flink-2.2-upgrade`)

1. Choose **Create**

*AWS CLI:*

```
aws kinesisanalyticsv2 create-application-snapshot \
    --application-name MyApplication \
    --snapshot-name pre-flink-2.2-upgrade
```

**Verify snapshot creation**

```
aws kinesisanalyticsv2 describe-application-snapshot \
    --application-name MyApplication \
    --snapshot-name pre-flink-2.2-upgrade
```

Wait until `SnapshotStatus` is `READY` before proceeding.

## Phase 4: Upgrade application
<a name="upgrade-guide-phase-4"></a>

You can upgrade your Flink application by using the [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html) action.

You can call the `UpdateApplication` API in multiple ways:
+ **Use the AWS Management Console.**
  + Go to your app page on the AWS Management Console.
  + Choose **Configure**.
  + Select the new runtime and the snapshot that you want to start from, also known as restore configuration. Use the latest setting as the restore configuration to start the app from the latest snapshot. Point to the new upgraded application JAR/zip on Amazon S3.
+ **Use the AWS CLI** [update-application](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/kinesisanalyticsv2/update-application.html) action.
+ **Use CloudFormation.**
  + Update the `RuntimeEnvironment` field. Previously, CloudFormation deleted the application and created a new one, causing your snapshots and other app history to be lost. Now CloudFormation updates your `RuntimeEnvironment` in place and does not delete your application.
+ **Use the AWS SDK.**
  + Consult the SDK documentation for the programming language of your choice. See [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html).

You can perform the upgrade while the application is in `RUNNING` state or while the application is stopped in `READY` state. Amazon Managed Service for Apache Flink validates the compatibility between the original runtime version and the target runtime version. This compatibility check runs when you perform `UpdateApplication` while in `RUNNING` state or at the next `StartApplication` if you upgrade while in `READY` state.

**Upgrade from RUNNING state**

```
aws kinesisanalyticsv2 update-application \
    --application-name MyApplication \
    --current-application-version-id <version-id> \
    --runtime-environment-update FLINK-2_2 \
    --application-configuration-update '{
        "ApplicationCodeConfigurationUpdate": {
            "CodeContentUpdate": {
                "S3ContentLocationUpdate": {
                    "FileKeyUpdate": "my-app-flink-2.2.jar"
                }
            }
        }
    }'
```

**Upgrade from READY state**

```
aws kinesisanalyticsv2 update-application \
    --application-name MyApplication \
    --current-application-version-id <version-id> \
    --runtime-environment-update FLINK-2_2 \
    --application-configuration-update '{
        "ApplicationCodeConfigurationUpdate": {
            "CodeContentUpdate": {
                "S3ContentLocationUpdate": {
                    "FileKeyUpdate": "my-app-flink-2.2.jar"
                }
            }
        }
    }'
```

## Phase 5: Monitor upgrade
<a name="upgrade-guide-phase-5"></a>

**Compatibility check**
+ Use the Operations API to check the status of the upgrade. If there are binary incompatibilities or issues with job startup, the upgrade operation fails and surfaces the error in the logs.
+ If the Upgrade operation succeeded but the application is stuck in restart loops, the state is incompatible with the new Flink version or there is a problem with the updated code. Review the [State compatibility guide for Flink 2.2 upgrades](state-compatibility.md) for how to identify state incompatibility issues.

**Monitor application health**

*Application state:*
+ Application status should transition: `RUNNING` → `UPDATING` → `RUNNING`
+ Check the runtime of the application. If it is 2.2, the upgrade operation was successful.
+ If your application is in `RUNNING` but still on the older runtime, auto-rollback kicked in. The Operations API will show the operation as `FAILED`. Check the logs to find the exception that caused the failure.

In addition, monitor these metrics in CloudWatch:

*Restart metric:*
+ `numRestarts`: Monitor for unexpected restarts — the upgrade is successful if `numRestarts` is zero and `uptime` or `runningTime` is increasing.

*Checkpoint metrics:*
+ `lastCheckpointDuration`: Should be similar to pre-upgrade values
+ `numberOfFailedCheckpoints`: Should remain at 0
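
The restart and checkpoint signals above combine into a simple health predicate. This is an illustrative heuristic over CloudWatch samples that we sketch here, not a service-provided check:

```python
def upgrade_looks_healthy(num_restarts, failed_checkpoints, uptime_samples):
    """True when the signals above all agree: no restarts, no failed
    checkpoints, and uptime strictly increasing between samples."""
    uptime_increasing = len(uptime_samples) >= 2 and all(
        earlier < later
        for earlier, later in zip(uptime_samples, uptime_samples[1:])
    )
    return num_restarts == 0 and failed_checkpoints == 0 and uptime_increasing
```

A falling `uptime` series or any nonzero `numRestarts` means the job is cycling and the upgrade needs investigation.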

## Phase 6: Validate application behavior
<a name="upgrade-guide-phase-6"></a>

After the application is running on Flink 2.2:

**Functional validation**
+ Verify data is being read from sources
+ Verify data is being written to sinks
+ Verify business logic produces expected results
+ Compare output with pre-upgrade baseline

**Performance validation**
+ Monitor latency metrics (end-to-end processing time)
+ Monitor throughput metrics (records per second)
+ Monitor checkpoint duration and size
+ Monitor memory and CPU utilization

**Run for 24 hours**

Allow the application to run for at least 24 hours in production to ensure:
+ No memory leaks
+ Stable checkpoint behavior
+ No unexpected restarts
+ Consistent throughput

## Phase 7: Rollback procedures
<a name="upgrade-guide-phase-7"></a>

If the upgrade fails or the application is running but unhealthy, roll back to the previous version.

**Automatic rollback**

If auto-rollback is enabled and the upgrade fails during startup, Amazon Managed Service for Apache Flink automatically reverts to the previous version.

**Manual rollback**

If the application is running but unhealthy, use the `RollbackApplication` API:

*AWS Management Console:*

1. Navigate to your application

1. Choose **Actions** → **Roll back**

1. Confirm the rollback

*AWS CLI:*

```
aws kinesisanalyticsv2 rollback-application \
    --application-name MyApplication \
    --current-application-version-id <version-id>
```

**What happens during rollback:**
+ Application stops
+ Runtime reverts to previous Flink version
+ Application code reverts to previous JAR
+ Application restarts from the last successful snapshot taken **before** the upgrade

**Important**  
You cannot restore a Flink 2.2 snapshot on Flink 1.x. Rollback uses the snapshot taken **before** the upgrade. Always take a snapshot before upgrading (Phase 3).

## Next steps
<a name="upgrade-guide-next-steps"></a>

For questions or issues during upgrade, see [Troubleshoot Managed Service for Apache Flink](troubleshooting.md) or contact AWS Support.

# State compatibility guide for Flink 2.2 upgrades
<a name="state-compatibility"></a>

When upgrading from Flink 1.x to Flink 2.2, state compatibility issues may prevent your application from restoring from snapshots. This guide helps you identify potential compatibility issues and provides migration strategies.

## Understanding state compatibility changes
<a name="state-compat-understanding"></a>

Amazon Managed Service for Apache Flink 2.2 introduces several serialization changes that affect state compatibility. The following are the major ones:
+ **Kryo Version Upgrade**: Apache Flink 2.2 upgrades the bundled Kryo serializer from version 2 to version 5. Because Kryo v5 uses a different binary encoding format than Kryo v2, any operator state that was serialized via Kryo in a Flink 1.x savepoint cannot be restored in Flink 2.2.
+ **Java Collections Serialization**: In Flink 1.x, Java collections (such as `HashMap`, `ArrayList`, and `HashSet`) within POJOs were serialized using Kryo. Flink 2.2 introduces collection-specific optimized serializers that are incompatible with the Kryo-serialized state from 1.x. Applications using Java collections with POJO or Kryo serializers in 1.x cannot restore this state in Flink 2.2. See Flink [documentation](https://nightlies.apache.org/flink/flink-docs-release-2.2/docs/dev/datastream/fault-tolerance/serialization/types_serialization/) for more details on data types and serialization.
+ **Kinesis Connector Compatibility**: Kinesis Data Streams (KDS) connector versions lower than 5.0 maintain state that is not compatible with the Flink 2.2 Kinesis connector version 6.0. You must migrate to connector version 5.0 or greater before you upgrade.
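
To illustrate the Java collections change, consider a hypothetical state class like the following. It satisfies Flink's POJO rules, but in Flink 1.x the `Map` field is serialized through Kryo v2, so a savepoint containing this state cannot be restored on Flink 2.2, which uses a dedicated map serializer with a different binary format:

```
import java.util.HashMap;
import java.util.Map;

// Hypothetical state class. The class itself is a valid POJO, but in
// Flink 1.x the Map field falls back to Kryo v2 serialization, making
// its savepoint state incompatible with Flink 2.2's map serializer.
public class SensorAggregate {
    public String sensorId;
    public Map<String, Long> countsByWindow = new HashMap<>();

    public SensorAggregate() {}  // public no-arg constructor (POJO rule)

    public static void main(String[] args) {
        SensorAggregate agg = new SensorAggregate();
        agg.sensorId = "sensor-1";
        agg.countsByWindow.put("2024-01-01T00", 42L);
        System.out.println(agg.countsByWindow.get("2024-01-01T00"));
    }
}
```

If you control the class, replacing the in-POJO map with Flink's native `MapState` avoids the Kryo fallback entirely.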

## Serialization compatibility reference
<a name="state-compat-reference"></a>

Review all state declarations in your application and match serialization types to the table below. If any state type is incompatible, see the [State migration](#state-compat-migration) section before proceeding with your upgrade.


**Serialization compatibility reference**  

| Serialization Type | Compatible? | Details | 
| --- | --- | --- | 
| Avro (SpecificRecord, GenericRecord) | Yes | Uses its own binary format independent of Kryo. Ensure you are using Flink's native Avro type information, not Avro registered as a Kryo serializer. | 
| Protobuf | Yes | Uses its own binary encoding independent of Kryo. Verify schema changes follow backward-compatible evolution rules. | 
| POJOs without collections | Yes | Handled by Flink's POJO serializer — but only if the class meets all POJO criteria: public class, public no-arg constructor, all fields either public or accessible via getters/setters, and all field types themselves serializable by Flink. A POJO that violates any of these silently falls back to Kryo and becomes incompatible. | 
| Custom TypeSerializers | Yes | Compatible only if your serializer does not delegate to Kryo internally. | 
| SQL and Table API state | Yes (with caveat) | Uses Flink's internal serializers. However, Apache Flink does not guarantee state compatibility between major versions for Table API applications. Test in a non-production environment first. | 
| POJOs with Java collections (HashMap, ArrayList, HashSet) | No | In Flink 1.x, collections within POJOs were serialized via Kryo v2. Flink 2.2 introduces dedicated collection serializers whose binary format is incompatible with the Kryo v2 format. | 
| Scala case classes | No | Serialized via Kryo in Flink 1.x. The Kryo v2 to v5 upgrade changes the binary format. | 
| Java records | No | Typically fall back to Kryo serialization in Flink 1.x. Verify by testing with `disableGenericTypes()`. | 
| Third-party library types | No | Types without a registered custom serializer fall back to Kryo. The Kryo v2 to v5 binary format change breaks compatibility. | 
| Any type using Kryo fallback | No | If Flink cannot handle a type with a built-in or registered serializer, it falls back to Kryo. All Kryo-serialized state from 1.x is incompatible with 2.2. | 
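
Two of the POJO rules in the table (public class, public no-arg constructor) can be checked mechanically with plain reflection and no Flink dependency. The following is a minimal sketch with hypothetical example classes; field accessibility and field-type serializability still need manual review:

```
import java.lang.reflect.Modifier;

public class PojoRuleCheck {
    // Returns true if the class passes two easily verifiable POJO rules:
    // it is public and it has a public no-argument constructor. A class
    // failing either rule silently falls back to Kryo serialization.
    static boolean looksLikePojo(Class<?> c) {
        if (!Modifier.isPublic(c.getModifiers())) return false;
        try {
            return Modifier.isPublic(c.getConstructor().getModifiers());
        } catch (NoSuchMethodException e) {
            return false;  // no public no-arg constructor
        }
    }

    public static class GoodState {   // public, implicit no-arg constructor
        public String id;
    }

    public static class BadState {    // only a one-arg constructor: not a POJO
        public final String id;
        public BadState(String id) { this.id = id; }
    }

    public static void main(String[] args) {
        System.out.println(looksLikePojo(GoodState.class));  // true
        System.out.println(looksLikePojo(BadState.class));   // false
    }
}
```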

## Diagnostic methods
<a name="state-compat-diagnostics"></a>

You can identify state compatibility issues proactively by reviewing your application logs, or reactively by inspecting logs after calling the [UpdateApplication API](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html) operation.

**Identify Kryo fallback in your application**

You can use the following regex pattern in your logs to identify Kryo fallback in your application:

```
Class class (?<className>[^\s]+) cannot be used as a POJO type
```

Sample log:

```
Class class org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber
cannot be used as a POJO type because not all fields are valid POJO fields,
and must be processed as GenericType. Please read the Flink documentation on
"Data Types & Serialization" for details of the effect on performance and
schema evolution.
```
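
Java's `java.util.regex` supports the `(?<className>…)` named-group syntax shown above directly, so you can scan exported log lines for Kryo fallback with a small sketch like this:

```
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class KryoFallbackScanner {
    // Same pattern as above; the named group captures the class that
    // falls back to Kryo (GenericType) serialization.
    static final Pattern POJO_FALLBACK = Pattern.compile(
        "Class class (?<className>[^\\s]+) cannot be used as a POJO type");

    public static String findFallbackClass(String logLine) {
        Matcher m = POJO_FALLBACK.matcher(logLine);
        return m.find() ? m.group("className") : null;
    }

    public static void main(String[] args) {
        String log = "Class class org.apache.flink.streaming.connectors."
            + "kinesis.model.SequenceNumber cannot be used as a POJO type "
            + "because not all fields are valid POJO fields";
        System.out.println(findFallbackClass(log));
    }
}
```

If you adapt this to another tool, note that Python's `re` module spells the same named group as `(?P<className>…)`.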

If the upgrade fails after you call the UpdateApplication API, the following exceptions might signal serializer-based state incompatibility:

**IndexOutOfBoundsException**

```
Caused by: java.lang.IndexOutOfBoundsException: Index 116 out of bounds for length 1
    at java.base/jdk.internal.util.Preconditions.outOfBounds(Unknown Source)
    at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Unknown Source)
    at java.base/jdk.internal.util.Preconditions.checkIndex(Unknown Source)
    at java.base/java.util.Objects.checkIndex(Unknown Source)
    at java.base/java.util.ArrayList.get(Unknown Source)
    at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:77)
    at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:923)
    ... 23 more
```

**StateMigrationException (POJOSerializer)**

```
Caused by: org.apache.flink.util.StateMigrationException: The new state serializer
(org.apache.flink.api.java.typeutils.runtime.PojoSerializer@8bf85b5d) must not be
incompatible with the old state serializer
(org.apache.flink.api.java.typeutils.runtime.PojoSerializer@3282ee3).
```

## Pre-upgrade checklist
<a name="state-compat-checklist"></a>
+ Review all state declarations in your application
+ Check for POJOs with collections (`HashMap`, `ArrayList`, `HashSet`)
+ Verify serialization methods for each state type
+ Create a replica of your production application and test state compatibility by calling the UpdateApplication API on the replica
+ If state is incompatible, select a strategy from [State migration](#state-compat-migration)
+ Enable auto-rollback in your production Flink application configuration

## State migration
<a name="state-compat-migration"></a>

**Rebuild complete state**

Use this strategy when your application can rebuild its state from source data:

1. Stop the Flink 1.x application

1. Upgrade to Flink 2.x with updated code

1. Start with `SKIP_RESTORE_FROM_SNAPSHOT`

1. Allow the application to rebuild state

```
aws kinesisanalyticsv2 start-application \
    --application-name MyApplication \
    --run-configuration '{
        "ApplicationRestoreConfiguration": {
            "ApplicationRestoreType": "SKIP_RESTORE_FROM_SNAPSHOT"
        }
    }'
```

## Best practices
<a name="state-compat-best-practices"></a>

1. **Always use Avro or Protobuf for complex state** — These provide schema evolution and are Kryo-independent

1. **Avoid collections in POJOs** — Use Flink's native `ListState` and `MapState` instead

1. **Test state restoration locally** — Before production upgrade, test with actual snapshots

1. **Take snapshots frequently** — Especially before major version upgrades

1. **Enable auto-rollback** — Configure your Managed Service for Apache Flink application to automatically roll back on failure

1. **Document your state types** — Maintain documentation of all state types and their serialization methods

1. **Monitor checkpoint sizes** — Growing checkpoint sizes may indicate serialization issues

## Next steps
<a name="state-compat-next-steps"></a>

**Plan your upgrade**: See [Upgrading to Flink 2.2: Complete guide](flink-2-2-upgrade-guide.md).

For questions or issues during migration, see [Troubleshoot Managed Service for Apache Flink](troubleshooting.md) or contact AWS Support.