

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

# Managed Service para Apache Flink: cómo funciona
<a name="how-it-works"></a>

Managed Service para Apache Flink es un servicio de Amazon completamente administrado, que permite usar una aplicación de Apache Flink para procesar datos de transmisión. Primero, se programa la aplicación Apache Flink y, a continuación, se crea la aplicación Managed Service para Apache Flink.

## Programación de la aplicación de Apache Flink
<a name="how-it-works-programming"></a>

Una aplicación Apache Flink es una aplicación Java o Scala que se crea con el marco Apache Flink. La aplicación Apache Flink se crea y compila de forma local. 

Las aplicaciones utilizan principalmente la [DataStream API](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/datastream_api.html) o la [API de tabla](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/). Los demás Apache Flink también APIs están disponibles para su uso, pero se utilizan con menos frecuencia para crear aplicaciones de streaming.

Las características de los dos APIs son las siguientes:

### DataStream API
<a name="how-it-works-prog-datastream"></a>

El modelo de programación de la DataStream API Apache Flink se basa en dos componentes:
+ **Flujo de datos:** la representación estructurada de un flujo continuo de registros de datos.
+ **Operador de transformación:** toma uno o más flujos de datos como entrada y produce uno o más flujos de datos como salida.

Las aplicaciones creadas con la DataStream API hacen lo siguiente:
+ Leen los datos de un origen de datos (como un flujo de Kinesis o un tema de Amazon MSK).
+ Aplican transformaciones a los datos, como el filtrado, la agregación o el enriquecimiento.
+ Escriben los datos transformados en un receptor de datos.

Las aplicaciones que utilizan la DataStream API se pueden escribir en Java o Scala y pueden leer desde una transmisión de datos de Kinesis, un tema de Amazon MSK o una fuente personalizada.

La aplicación procesa los datos mediante un *conector*. Apache Flink utiliza los siguientes tipos de conectores: 
+ **Origen**: conector que se utiliza para leer datos externos.
+ **Receptor**: conector que se utiliza para escribir en ubicaciones externas. 
+ **Operador**: conector que se utiliza para procesar datos dentro de la aplicación.

Una aplicación típica consta de al menos un flujo de datos con un origen, un flujo de datos con uno o más operadores y al menos un receptor de datos.

Para obtener más información sobre el uso de la DataStream API, consulte. [Revise los componentes DataStream de la API](how-datastream.md)

### API de tabla
<a name="how-it-works-prog-table"></a>

El modelo de programación de la API Apache Flink se basa en los siguientes componentes:
+ **Entorno de tablas:** una interfaz para los datos subyacentes que se utiliza para crear y alojar una o más tablas. 
+ **Tabla:** objeto que proporciona acceso a una tabla o vista de SQL.
+ **Origen de tabla:** se utiliza para leer datos de una fuente externa, como un tema de Amazon MSK.
+ **Función de tabla:** consulta SQL o llamada a la API que se utiliza para transformar datos.
+ **Receptor de tabla:** se utiliza para escribir datos en una ubicación externa, como un bucket de Amazon S3.

Las aplicaciones creadas con la API de tabla hacen lo siguiente:
+ Crean un `TableEnvironment` conectándose a un `Table Source`. 
+ Crean una tabla en el `TableEnvironment` mediante consultas SQL o funciones de la API de tablas.
+ Ejecutan una consulta en la tabla mediante la API de tabla o SQL.
+ Aplican transformaciones a los resultados de la consulta mediante funciones de tabla o consultas SQL.
+ Escriben los resultados de la consulta o función en un `Table Sink`.

Las aplicaciones que utilizan la API de tablas se pueden escribir en Java o Scala, y pueden consultar datos mediante llamadas a la API o consultas SQL. 

Para obtener más información sobre el uso de la API de tabla, consulte [Revisión de los componentes de la API de tabla](how-table.md).

## Creación de la aplicación de Managed Service para Apache Flink
<a name="how-it-works-app"></a>

El servicio gestionado para Apache Flink es un AWS servicio que crea un entorno para alojar la aplicación Apache Flink y le proporciona la siguiente configuración:
+ **[Uso de propiedades de tiempo de ejecución](how-properties.md):** parámetros que puede proporcionar a su aplicación. Puede cambiar estos parámetros sin tener que volver a compilar el código de la aplicación.
+ **[Implementación de tolerancia a errores](how-fault.md)**: cómo se recupera la aplicación de las interrupciones y se reinicia.
+ **[Registro y supervisión en Amazon Managed Service para Apache Flink](monitoring-overview.md)**: Cómo registra su aplicación los eventos en Logs. CloudWatch 
+ **[Implementación del escalado de aplicaciones](how-scaling.md)**: cómo aprovisiona su aplicación los recursos informáticos.

Crea una aplicación de Managed Service para Apache Flink mediante la consola o la AWS CLI. Para comenzar a crear una aplicación de Managed Service para Apache Flink, consulte [Tutorial: Comience a utilizar la DataStream API en Managed Service for Apache Flink](getting-started.md).

# Creación y ejecución de una aplicación de Managed Service para Apache Flink
<a name="how-creating-apps"></a>

En este tema, se incluye información acerca de cómo crear una aplicación de Managed Service para Apache Flink.

**Topics**
+ [Creación del código de su aplicación de Managed Service para Apache Flink](#how-creating-apps-building)
+ [Creación de la aplicación de Managed Service para Apache Flink](#how-creating-apps-creating)
+ [Uso de claves administradas por el cliente](#how-creating-apps-use-cmk)
+ [Inicio de su aplicación de Managed Service para Apache Flink](#how-creating-apps-starting)
+ [Verificación de su aplicación de Managed Service para Apache Flink](#how-creating-apps-verifying)
+ [Habilitación de las restauraciones del sistema para la aplicación Amazon Managed Service para Apache Flink](how-system-rollbacks.md)

## Creación del código de su aplicación de Managed Service para Apache Flink
<a name="how-creating-apps-building"></a>

En esta sección se describen los componentes que debe utilizar para crear el código de su aplicación Managed Service para Apache Flink. 

Le recomendamos que utilice la última versión compatible de Apache Flink para el código de la aplicación. Para obtener información sobre la actualización de aplicaciones Managed Service para Apache Flink, consulte [Uso de actualizaciones de versión locales para Apache Flink](how-in-place-version-upgrades.md). 

El código de la aplicación se debe crear con [Apache Maven](https://maven.apache.org/). Un proyecto de Apache Maven utiliza un archivo `pom.xml` para especificar las versiones de los componentes que utiliza. 

**nota**  
Managed Service para Apache Flink admite archivos JAR de hasta 512 MB de tamaño. Si utiliza un archivo JAR de un tamaño superior a este, la aplicación no podrá iniciarse.

Las aplicaciones ahora pueden usar la API de Java desde cualquier versión de Scala. Debe incluir la biblioteca estándar de Scala que elija en sus aplicaciones de Scala.

Para obtener información sobre cómo crear una aplicación de Managed Service para Apache Flink que utilice **Apache Beam**, consulte [Creación de aplicaciones de Managed Service para Apache Flink con Apache Beam](how-creating-apps-beam.md).

### Especificación de la versión de Apache Flink de su aplicación
<a name="how-creating-apps-building-flink"></a>

Al utilizar la versión 1.1.0 del tiempo de ejecución de Managed Service para Apache Flink y versiones posteriores, debe especificar la versión de Apache Flink que utilizará la aplicación al compilarla. Debe proporcionar la versión de Apache Flink con el parámetro `-Dflink.version`. Por ejemplo, si utiliza Apache Flink 2.2.0, proporcione lo siguiente:

```
mvn package -Dflink.version=2.2.0
```

Para crear aplicaciones con versiones anteriores de Apache Flink, consulte [Versiones anteriores](earlier.md).

## Creación de la aplicación de Managed Service para Apache Flink
<a name="how-creating-apps-creating"></a>

Una vez que haya creado el código de la aplicación, haga lo siguiente para crear su aplicación de Managed Service para Apache Flink (Amazon MSF):
+ **Cargue el código de la aplicación**: cargue el código de la aplicación en un bucket de Amazon S3. Al crear la aplicación, especifique el nombre del bucket de S3 y el nombre del objeto del código de la aplicación. Para ver un tutorial que muestra cómo cargar el código de la aplicación, consulte el tutorial [Tutorial: Comience a utilizar la DataStream API en Managed Service for Apache Flink](getting-started.md).
+ **Cree su aplicación de Managed Service para Apache Flink**: utilice uno de los siguientes métodos para crear su aplicación de Amazon MSF;
**nota**  
Amazon MSF cifra la aplicación de forma predeterminada mediante. Claves propiedad de AWS También puede crear su nueva aplicación con claves administradas por el AWS KMS cliente (CMKs) para crear, poseer y administrar sus claves usted mismo. Para obtener información al respecto CMKs, consulte[Administración de claves en Amazon Managed Service para Apache Flink](key-management-flink.md).
  + **Cree su aplicación Amazon MSF mediante la AWS consola:** puede crear y configurar su aplicación mediante la AWS consola. 

    Al crear la aplicación mediante la consola, se crean automáticamente los recursos dependientes de la aplicación (como los CloudWatch registros, las transmisiones, las funciones de IAM y las políticas de IAM). 

    Al crear la aplicación mediante la consola, debe especificar qué versión de Apache Flink utiliza la aplicación seleccionándola en el menú desplegable de la página **Managed Service para Apache Flink: Crear aplicación**. 

    Para ver un tutorial acerca de cómo utilizar la consola para crear una aplicación, consulte el tutorial [Tutorial: Comience a utilizar la DataStream API en Managed Service for Apache Flink](getting-started.md).
  + **Cree su aplicación Amazon MSF mediante la AWS CLI:** puede crear y configurar su aplicación mediante la AWS CLI. 

    Al crear la aplicación mediante la CLI, también debe crear los recursos dependientes de la aplicación (como las transmisiones de CloudWatch registros, las funciones de IAM y las políticas de IAM) de forma manual.

    Al crear la aplicación mediante la CLI, debe especificar qué versión de Apache Flink utiliza la aplicación mediante el parámetro `RuntimeEnvironment` de la acción `CreateApplication`.
**nota**  
Se puede cambiar el `RuntimeEnvironment` de una aplicación existente. Para aprender a hacerlo, consulte [Uso de actualizaciones de versión locales para Apache Flink](how-in-place-version-upgrades.md).

## Uso de claves administradas por el cliente
<a name="how-creating-apps-use-cmk"></a>

En Amazon MSF, las claves administradas por el cliente (CMKs) son una función mediante la cual puede cifrar los datos de su aplicación con una clave que usted crea, posee y administra en AWS Key Management Service ()AWS KMS. En el caso de una aplicación de Amazon MSF, esto significa que todos los datos sujetos a un [punto de control](how-fault.md) o una [instantánea](how-snapshots.md) de Flink se cifran con una CMK que se define para esa aplicación.

Para utilizar CMK con su aplicación, primero debe [crear la nueva aplicación](#how-creating-apps-creating) y, a continuación, aplicar una CMK. Para obtener más información sobre su uso CMKs, consulte. [Administración de claves en Amazon Managed Service para Apache Flink](key-management-flink.md)

## Inicio de su aplicación de Managed Service para Apache Flink
<a name="how-creating-apps-starting"></a>

Una vez que haya creado el código de la aplicación, lo haya cargado en S3 y creado la aplicación Managed Service para Apache Flink, inicie la aplicación. El inicio de una aplicación Managed Service para Apache Flink normalmente tarda varios minutos.

Utilice uno de los siguientes métodos para iniciar la aplicación:
+ **Inicie la aplicación Managed Service for Apache Flink mediante la AWS consola:** puede ejecutar la aplicación seleccionando **Ejecutar** en la página de la aplicación de la AWS consola.
+ **Inicie su aplicación Managed Service for Apache Flink mediante la AWS API:** puede ejecutar la aplicación mediante la [StartApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StartApplication.html)acción. 

## Verificación de su aplicación de Managed Service para Apache Flink
<a name="how-creating-apps-verifying"></a>

Puede comprobar que la aplicación funciona de las siguientes maneras:
+ **Uso de CloudWatch registros:** puede usar CloudWatch Logs y CloudWatch Logs Insights para comprobar que la aplicación se ejecuta correctamente. Para obtener información sobre el uso de CloudWatch Logs con su aplicación Managed Service for Apache Flink, consulte[Registro y supervisión en Amazon Managed Service para Apache Flink](monitoring-overview.md).
+ **Uso de CloudWatch métricas:** puede utilizar CloudWatch las métricas para supervisar la actividad de la aplicación o la actividad de los recursos que la aplicación utiliza como entrada o salida (como las transmisiones de Kinesis, las transmisiones de Firehose o los buckets de Amazon S3). Para obtener más información sobre CloudWatch las métricas, consulta [Cómo trabajar con métricas](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/working_with_metrics.html) en la Guía del CloudWatch usuario de Amazon.
+ **Supervisión de las ubicaciones de salida:** si la aplicación escribe la salida en una ubicación (como un bucket o una base de datos de Amazon S3), puede supervisar esa ubicación para localizar los datos escritos.

# Habilitación de las restauraciones del sistema para la aplicación Amazon Managed Service para Apache Flink
<a name="how-system-rollbacks"></a>

Con la función de restauración del sistema, puede lograr una mayor disponibilidad de la aplicación Apache Flink en ejecución en Amazon Managed Service para Apache Flink. Al optar por esta configuración, el servicio puede revertir de manera automática la aplicación a la versión que se estaba ejecutando anteriormente cuando una acción como `UpdateApplication` o `autoscaling` encuentra errores de código o configuración.

**nota**  
Para usar la característica de restauración del sistema, se la debe activar mediante la actualizando la aplicación. De forma predeterminada, las aplicaciones existentes no utilizarán de manera automática la restauración del sistema.

## Funcionamiento
<a name="how-rollback-works"></a>

Al iniciar una operación de aplicación, como una actualización o un escalado, Amazon Managed Service para Apache Flink primero intenta ejecutar esa operación. Si detecta problemas que impiden que la operación se lleve a cabo correctamente, por ejemplo, si hay errores de código o falta de permisos, el servicio inicia automáticamente una operación `RollbackApplication`.

La restauración intenta llevar la aplicación a la versión anterior que se estaba ejecutando correctamente, junto con el estado de la aplicación asociado. Si la restauración se realiza correctamente, la aplicación continúa procesando los datos con un tiempo de inactividad mínimo, mediante la versión anterior. Si la restauración automática también produce un error, Amazon Managed Service para Apache Flink pasa la aplicación al estado `READY` para que se puedan tomar más medidas, incluida la corrección del error y el reintento de la operación. 

Se debe optar por el uso de restauraciones automáticas del sistema. A partir de ahora, puede habilitarlo mediante la consola o la API para todas las operaciones de su aplicación. 

La siguiente solicitud de ejemplo de la acción `UpdateApplication` habilita las restauraciones del sistema para una aplicación:

```
{
   "ApplicationName": "MyApplication",
   "CurrentApplicationVersionId": 1,
   "ApplicationConfigurationUpdate": { 
      "ApplicationSystemRollbackConfigurationUpdate": { 
         "RollbackEnabledUpdate": "true"
       }
    }
}
```

## Revisión de las situaciones más comunes para la restauración automática del sistema
<a name="common-scenarios"></a>

Las siguientes situaciones ilustran los casos en los que las restauraciones automáticas del sistema son beneficiosas:
+ **Actualizaciones de la aplicación:** si se actualiza la aplicación con un código nuevo que contiene errores al inicializar la tarea de Flink mediante el método principal, la restauración automática permite restaurar la versión anterior en funcionamiento. Otros escenarios de actualización en los que las restauraciones del sistema son útiles incluyen los siguientes:
  + [Si la aplicación se actualiza para que se ejecute con un paralelismo superior al de MaxParallelism.](https://docs.aws.amazon.com/managed-flink/latest/java/how-scaling.html#how-scaling-auto)
  + Si la aplicación se actualiza para ejecutarse con subredes incorrectas para una aplicación de VPC, se produce un error durante el inicio del trabajo de Flink. 
+ **Actualizaciones de la versión de Flink:** cuando se actualiza a una nueva versión de Apache Flink y la aplicación actualizada detecta un problema de compatibilidad con las instantáneas, la restauración del sistema permite volver a la versión anterior de Flink de manera automática. 
+ **AutoScaling:** Cuando la aplicación se amplía, pero tiene problemas para restaurarse desde un punto de almacenamiento debido a que los operadores no coinciden entre la instantánea y el gráfico de tareas de Flink.

## Utilice la operación para revertir el sistema APIs
<a name="operation-apis"></a>

Para ofrecer una mejor visibilidad, Amazon Managed Service para Apache Flink tiene dos APIs relacionados con las operaciones de las aplicaciones que pueden ayudarle a realizar un seguimiento de los errores y las reversiones del sistema relacionadas.

`ListApplicationOperations`

Esta API enumera todas las operaciones realizadas en la aplicación, incluidas `UpdateApplication`, `Maintenance`, `RollbackApplication` y otras en orden cronológico inverso. La siguiente solicitud de ejemplo de la acción `ListApplicationOperations` muestra las 10 primeras instantáneas del estado actual de la aplicación:

```
{
   "ApplicationName": "MyApplication",
   "Limit": 10
}
```

El siguiente ejemplo de solicitud `ListApplicationOperations` ayuda a filtrar la lista para incluir actualizaciones anteriores de la aplicación:

```
{
   "ApplicationName": "MyApplication",
   "operation": "UpdateApplication"
}
```

`DescribeApplicationOperation`

Esta API proporciona información detallada sobre una operación específica enumerada por `ListApplicationOperations`, incluido el motivo del error, si corresponde. En la siguiente solicitud de ejemplo de la acción `DescribeApplicationOperation` se muestran los detalles de una operación específica de una aplicación:

```
{
   "ApplicationName": "MyApplication",
   "OperationId": "xyzoperation"
}
```

Para obtener información sobre la resolución de problemas, consulte [Prácticas recomendadas de reversión del sistema](troubleshooting-system-rollback.md).

# Ejecución de una aplicación de Managed Service para Apache Flink
<a name="how-running-apps"></a>

En este tema, se incluye información acerca de cómo ejecutar una aplicación de Managed Service para Apache Flink.

Al ejecutar la aplicación de Managed Service para Apache Flink, el servicio crea un trabajo de Apache Flink. Un trabajo de Apache Flink es el ciclo de vida de ejecución de la aplicación de Managed Service para Apache Flink. El administrador de trabajos administra la ejecución del trabajo y los recursos que utiliza. El administrador de trabajos divide la ejecución de la aplicación en tareas. Cada tarea es gestionada por un administrador de tareas. Al supervisar el rendimiento de la aplicación, se puede examinar el rendimiento de cada administrador de tareas o del administrador de trabajos en su conjunto. 

Para obtener más información sobre trabajos en Apache Flink, consulte [Job and Scheduling](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/internals/job_scheduling/) en la documentación de Apache Flink.

## Identificación del estado del trabajo y de la aplicación
<a name="how-running-job-status"></a>

Tanto su solicitud como el trabajo de la solicitud tienen un estado de ejecución actual:
+ **Estado de la solicitud:** su solicitud tiene un estado actual que describe su fase de ejecución. El estado de la aplicación puede ser cualquiera de los siguientes:
  + **Estado de solicitud estable:** la solicitud normalmente permanece en este estado hasta que se realiza un cambio de estado:
    + **LISTA:** una aplicación nueva o detenida se encuentra LISTA hasta que se la ejecuta.
    + **EN EJECUCIÓN:** una aplicación que se ha iniciado correctamente se encuentra EN EJECUCIÓN.
  + **Estado de aplicación transitorio:** una aplicación en este estado suele estar en proceso de transición a otro estado. Si una aplicación permanece en estado transitorio durante un período de tiempo, puede detener la aplicación mediante la [StopApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StopApplication.html)acción con el `Force` parámetro establecido en. `true` Este estado incluye los siguientes:
    + `STARTING:`Se produce después de la [StartApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StartApplication.html)acción. La aplicación está pasando del estado `READY` al `RUNNING`.
    + `STOPPING:`Se produce después de la [StopApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StopApplication.html)acción. La aplicación está pasando del estado `RUNNING` al `READY`.
    + `DELETING:`Se produce después de la [DeleteApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_DeleteApplication.html)acción. La aplicación está en proceso de ser eliminada.
    + `UPDATING:`Se produce después de la [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html)acción. La aplicación se está actualizando y volverá al estado `RUNNING` o `READY`.
    + `AUTOSCALING:`La aplicación tiene la `AutoScalingEnabled` propiedad de la aplicación [ ParallelismConfiguration](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ParallelismConfiguration.html)establecida en y el servicio aumenta el paralelismo de la aplicación. `true` Cuando la aplicación se encuentra en este estado, la única acción de API válida que puedes usar es la [StopApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StopApplication.html)acción con el `Force` parámetro establecido en. `true` Para obtener información sobre el escalado automático, consulte [Uso del escalado automático en Managed Service para Apache Flink](how-scaling-auto.md).
    + `FORCE_STOPPING:`Se produce después de llamar a la [StopApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StopApplication.html)acción con el `Force` parámetro establecido en`true`. La aplicación está en proceso de ser detenida forzosamente. La aplicación está pasando del estado `STARTING`, `UPDATING`, `STOPPING` o `AUTOSCALING` al `READY`.
    + `ROLLING_BACK:`Se produce después de llamar a la [RollbackApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_RollbackApplication.html)acción. La aplicación está en proceso de revertirse a una versión anterior. La aplicación está pasando del estado `UPDATING` o `AUTOSCALING` al `RUNNING`.
    + `MAINTENANCE:` se produce mientras Managed Service para Apache Flink aplica parches a la aplicación. Para obtener más información, consulte [Administración de las tareas de mantenimiento de Managed Service para Apache Flink](maintenance.md).

  Puede comprobar el estado de la aplicación mediante la consola o mediante la [DescribeApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_DescribeApplication.html)acción.
+ **Estado del trabajo:** cuando la solicitud está en el estado `RUNNING`, el trabajo tiene un estado que describe su fase de ejecución actual. El trabajo comienza en el estado `CREATED`y, a continuación, pasa al estado `RUNNING` cuando se inicia. Si se producen condiciones de error, la aplicación pasa al siguiente estado: 
  + En el caso de las aplicaciones que utilizan Apache Flink 1.11 y versiones posteriores, la aplicación ingresa al estado `RESTARTING`.
  + En el caso de las aplicaciones que utilizan Apache Flink 1.8 y versiones anteriores, la aplicación ingresa al estado `FAILING`.

  A continuación, la aplicación pasa al estado `RESTARTING` o `FAILED`, en función de si se puede reiniciar el trabajo. 

  Puedes comprobar el estado del trabajo examinando el CloudWatch registro de tu solicitud para ver si hay cambios de estado.

## Ejecución de cargas de trabajo en lotes
<a name="batch-workloads"></a>

Managed Service para Apache Flink admite la ejecución de cargas de trabajo por lotes de Apache Flink. En un trabajo por lotes, cuando un trabajo de Apache Flink pasa al estado **FINALIZADO**, el estado de la aplicación de Managed Service para Apache Flink se establece en **LISTO**. Para obtener más información sobre el estado de los trabajos de Flink, consulte [Job Scheduling](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/internals/job_scheduling/).

# Revisión de recursos de aplicaciones de Managed Service para Apache Flink
<a name="how-resources"></a>

En esta sección se describen los recursos del sistema que utiliza la aplicación. Comprender cómo Managed Service para Apache Flink aprovisiona y utiliza los recursos le ayudará a diseñar, crear y mantener una aplicación Managed Service para Apache Flink estable y de alto rendimiento.

## Recursos de aplicaciones de Managed Service para Apache Flink
<a name="how-resources-kda"></a>

El servicio gestionado para Apache Flink es un AWS servicio que crea un entorno para alojar la aplicación Apache Flink. El servicio Managed Service for Apache Flink proporciona recursos mediante unidades denominadas unidades de **procesamiento de Kinesis** (). KPUs

Una KPU representa los siguientes recursos del sistema:
+ Un núcleo de CPU
+ 4 GB de memoria, de los cuales un GB es memoria nativa y tres GB son memoria de pila
+ 50 GB de espacio en disco

KPUs **ejecuta aplicaciones en distintas unidades de ejecución denominadas **tareas y subtareas**.** Se puede pensar en una subtarea como el equivalente a un hilo.

El número de espacios KPUs disponibles para una aplicación es igual a la `Parallelism` configuración de la aplicación dividida por la `ParallelismPerKPU` configuración de la aplicación. 

Para obtener más información acerca del paralelismo de las aplicaciones, consulte [Implementación del escalado de aplicaciones](how-scaling.md).

## Recursos de la aplicación de Apache Flink
<a name="how-resources-flink"></a>

El entorno Apache Flink asigna los recursos a su aplicación mediante unidades denominadas **ranuras de tareas.** Cuando Managed Service para Apache Flink asigna recursos a su aplicación, asigna uno o más ranuras de tareas de Apache Flink a una sola KPU. La cantidad de ranuras asignadas a una sola KPU es igual a la configuración de su aplicación `ParallelismPerKPU`. Para obtener más información acerca de las ranuras de tareas, consulte [Job Scheduling](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/internals/job_scheduling/) en la documentación de Apache Flink.

### Paralelismo del operador
<a name="how-resources-flink-operatorparallelism"></a>

Se puede establecer el número máximo de subtareas que puede utilizar un operador. Este valor se denomina **Paralelismo del operador**. De forma predeterminada, el paralelismo de cada operador de su aplicación es igual al paralelismo de la aplicación. Esto significa que, de forma predeterminada, cada operador de su aplicación puede usar todas las subtareas disponibles en la aplicación si es necesario.

Se puede establecer el paralelismo de los operadores de su aplicación mediante el método `setParallelism`. Con este método, puede controlar la cantidad de subtareas que cada operador puede usar a la vez.

Para obtener más información, consulte [Operators](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/operators/overview/) en la documentación de Apache Flink.

### Encadenamiento de operadores
<a name="how-resources-flink-operatorchaining"></a>

Normalmente, cada operador utiliza una subtarea independiente para ejecutar, pero si varios operadores se ejecutan siempre en secuencia, el tiempo de ejecución puede asignarlos todos a la misma tarea. Este proceso se denomina **Encadenamiento de operadores**.

Se pueden encadenar varios operadores secuenciales en una sola tarea si todos funcionan con los mismos datos. A continuación se muestran algunos requisitos necesarios para que esto sea cierto:
+ Los operadores realizan un enrutamiento sencillo de uno a uno.
+ Todos los operadores tienen el mismo paralelismo de operadores.

Cuando su aplicación encadena a los operadores en una sola subtarea, conserva los recursos del sistema, ya que el servicio no necesita realizar operaciones de red ni asignar subtareas a cada operador. Para determinar si su aplicación utiliza el encadenamiento de operadores, consulte el gráfico de trabajo de la consola de Managed Service para Apache Flink. Cada vértice de la aplicación representa uno o más operadores. El gráfico muestra los operadores que se han encadenado como un solo vértice.

# Facturación por segundo en Managed Service para Apache Flink
<a name="how-pricing"></a>

Managed Service para Apache Flink ahora se factura en incrementos de un segundo. Hay un cargo mínimo de diez minutos por aplicación. La facturación por segundo se aplica a las aplicaciones recién lanzadas o que ya están en ejecución. En esta sección, se describe cómo Managed Service para Apache Flink mide y factura su uso. Para obtener más información sobre Amazon Managed Service para Apache Flink, consulte [Precios de Amazon Managed Service para Apache Flink](https://aws.amazon.com/managed-service-apache-flink/pricing/). 

## Funcionamiento
<a name="how-resources-kda"></a>

Managed Service for Apache Flink le cobra por la duración y el número de **unidades de procesamiento de Kinesis KPUs (**) que se facturan en incrementos de un segundo en las cantidades admitidas. Regiones de AWS Una sola KPU incluye 1 vCPU de cómputo y 4 GB de memoria. Se le cobrará una tarifa por hora en función del número de aplicaciones KPUs utilizadas para ejecutar sus aplicaciones. 

Por ejemplo, en el caso de una aplicación que se ejecute durante 20 minutos y 10 segundos se le cobrarán 20 minutos y 10 segundos, multiplicados por los recursos que haya utilizado. A una aplicación que se ejecute durante 5 minutos se le cobrará el mínimo de diez minutos, multiplicado por los recursos que se hayan utilizado.

Managed Service para Apache Flink indica el uso en horas. Por ejemplo, 15 minutos corresponde a 0,25 horas. 

En el caso de las aplicaciones de Apache Flink, se le cobrará una sola KPU adicional por aplicación, que se utilizará para la orquestación. A las aplicaciones también se les cobra por el almacenamiento y las copias de seguridad duraderas. El almacenamiento de aplicaciones en ejecución se utiliza para las capacidades de procesamiento con estado en Managed Service for Apache Flink y se cobra por cada. GB/month. Durable backups are optional and provide point-in-time recovery for applications, charged per GB/month 

En el modo de transmisión, Managed Service for Apache Flink ajusta automáticamente la cantidad de datos que KPUs necesita la aplicación de procesamiento de transmisiones a medida que fluctúan las demandas de memoria y procesamiento. Puede elegir aprovisionar su aplicación con la cantidad requerida de. KPUs 

## Región de AWS disponibilidad
<a name="how-pricing-regions"></a>

**nota**  
En este momento, la facturación por segundo no está disponible en las siguientes regiones: AWS GovCloud (EE. UU. Este), AWS GovCloud (EE. UU. Oeste), China (Pekín) y China (Ningxia).

La facturación por segundo está disponible en las siguientes Regiones de AWS: 
+ Este de EE. UU. (Norte de Virginia): us-east-1
+ Este de EE. UU. (Ohio): us-east-2
+ Oeste de EE. UU. (Norte de California): us-west-1
+ Oeste de EE. UU. (Oregón): us-west-2
+ África (Ciudad del Cabo): af-south-1
+ Asia-Pacífico (Hong Kong): ap-east-1
+ Asia Pacífico (Hyderabad) - ap-south-1
+ Asia-Pacífico (Yakarta): ap-southeast-3
+ Asia-Pacífico (Melbourne): ap-southeast-4
+ Asia-Pacífico (Mumbai): ap-south-1
+ Asia-Pacífico (Osaka): ap-northeast-3
+ Asia-Pacífico (Seúl): ap-northeast-2
+ Asia-Pacífico (Singapur): ap-southeast-1
+ Asia Pacífico (Sídney): ap-southeast-2
+ Asia Pacífico (Tokio): ap-northeast-1
+ Canadá (centro): ca-central-1
+ Oeste de Canadá (Calgary): ca-west-1
+ Europa (Fráncfort): eu-central-1
+ Europa (Irlanda): eu-west-1
+ Europa (Londres): eu-west-2
+ Europa (Milán): eu-south-1
+ Europa (París): eu-west-3
+ Europa (España): eu-south-2
+ Europa (Estocolmo): eu-north-1
+ Europa (Zúrich): eu-central-2
+ Israel (Tel Aviv) il-central-1
+ Medio Oriente (Baréin): me-south-1
+ Medio Oriente (EAU): me-central-1
+ América del Sur (São Paulo): sa-east-1

## Ejemplos de precios
<a name="how-pricing-examples"></a>

Se puede encontrar ejemplos de precios en la página de precios de Managed Service para Apache Flink. Para obtener más información, consulte [Precios de Amazon Managed Service para Apache Flink](https://aws.amazon.com/managed-service-apache-flink/pricing/). A continuación se muestran más ejemplos con ilustraciones del informe de costo-uso para cada uno.

### Una carga de trabajo pesada y de larga duración
<a name="pricing-example-1"></a>

Es un gran servicio de transmisión de video y le gustaría crear una recomendación de video en tiempo real basada en las interacciones de sus usuarios. Utiliza una aplicación Apache Flink en Managed Service para Apache Flink para incorporar continuamente los eventos de interacción de los usuarios de varios flujos de datos de Kinesis y procesar los eventos en tiempo real antes de enviarlos a un sistema posterior. Los eventos de interacción del usuario se transforman mediante varios operadores. Esto incluye dividir los datos por tipo de evento, enriquecerlos con metadatos adicionales, ordenarlos por marca de tiempo y almacenarlos en búfer durante 5 minutos antes de su entrega. La aplicación tiene muchos pasos de transformación que requieren un uso intensivo de informática y son paralelizables. Su aplicación Flink está configurada para ejecutarse con 20 para adaptarse a la carga de trabajo. KPUs Su aplicación utiliza 1 GB de copias de seguridad de aplicaciones duraderas todos los días. Los cargos mensuales por Managed Service para Apache Flink se calcularán de la siguiente manera:

**Cargos mensuales**

El precio en la región Este de EE. UU. (Norte de Virginia) es de 0,11 USD por KPU/hora. El servicio administrado para Apache Flink asigna 50 GB de almacenamiento de aplicaciones en ejecución por KPU y cobra 0,10 USD por GB al mes.
+ Cargos mensuales de KPU: 24 horas \$1 30 días \$1 (20 KPUs más 1 KPU adicional para la aplicación de streaming) \$1 0,11\$1 por hora = 1584,00\$1
+ Cargos mensuales por almacenamiento de aplicaciones en ejecución: 30 días \$1 20 x 50\$1 \$1 0,10\$1 por GB al mes = 100\$1 KPUs GB/KPUs 
+ Cargos mensuales por almacenamiento duradero de aplicaciones: 30 días \$1 1 GB \$1 0,023 GB/mes = 0,03 USD
+ Cargos totales: 1.584,00 USD \$1 100 USD \$1 0,03 USD = **1.684,03 USD**

**Informe de uso de costos del servicio administrado para Apache Flink en la consola Billing and Cost Management del mes**

Kinesis Analytics
+ 1,684.03 USD - Este de EE. UU. (Norte de Virginia)
+ Amazon Kinesis Analytics CreateSnapshot
  + 0,023 USD por GB/mes de copias de seguridad duraderas de aplicaciones
    + 1 GB al mes: 0,03 USD
+ Amazon Kinesis Analytics StartApplication
  + 0,10 USD por GB de almacenamiento de aplicaciones en ejecución al mes
    + 1000 GB /mes: 100 USD
  + 0,11 USD por unidad de procesamiento de Kinesis por hora para aplicaciones de Apache Flink
    + 15.120 KPU/hora: 1.584 USD

### Una carga de trabajo por lotes que se ejecuta durante aproximadamente 15 minutos todos los días
<a name="pricing-example-2"></a>

Utiliza una aplicación Apache Flink en Managed Service para Apache Flink para transformar los datos de registro en Amazon Simple Storage Service (Amazon S3) en modo por lotes. Los datos de registro se transforman mediante varios operadores. Esto incluye aplicar un esquema a los diferentes eventos de registro, particionar los datos por tipo de evento y ordenarlos por marca de tiempo. La aplicación tiene muchos pasos de transformación, pero ninguno es intensivo desde el punto de vista computacional. Esta aplicación ingiere 2000 records/second datos durante 15 minutos todos los días durante un mes de 30 días. No se crean copias de seguridad duraderas de las aplicaciones. Los cargos mensuales por Managed Service para Apache Flink se calcularán de la siguiente manera:

**Cargos mensuales**

El precio en la región Este de EE. UU. (Norte de Virginia) es de 0,11 USD por KPU/hora. El servicio administrado para Apache Flink asigna 50 GB de almacenamiento de aplicaciones en ejecución por KPU y cobra 0,10 USD por GB al mes.
+ Carga de trabajo por lotes: durante los 15 minutos diarios, la aplicación Managed Service for Apache Flink procesa 2000 records/second, which takes 2KPUs. 30 days/month \$1 15 minutes/day = 450 minutes/month
+ Cargos mensuales de KPU: 450 minutes/month dólares\$1 (2 KPUs \$1 1 KPU adicional para la aplicación de streaming) \$1 0,11 USD/hora = 2,48\$1
+ Cargos mensuales por almacenamiento de aplicaciones en ejecución: 450 minutes/month \$1 2 KPUs \$1 50 \$1 0,10 dólares/GB al mes = 0,11\$1 GB/KPUs 
+ Cargos totales: 2,48 USD \$1 0,11 USD = **2,59 USD**

**Informe de uso de costos del servicio administrado para Apache Flink en la consola Billing and Cost Management del mes**

Kinesis Analytics
+ 2,59 USD - Este de EE. UU. (Norte de Virginia)
+ Amazon Kinesis Analytics StartApplication
  + 0,10 USD por GB/mes de copias de seguridad de aplicaciones en ejecución
    + 1,042 GB/mes: 0,11 USD
  + 0,11 USD por unidad de procesamiento de Kinesis por hora para aplicaciones de Apache Flink
    + 22,5 KPU/hora: 2,48 USD

### Una aplicación de prueba que se detiene y se inicia de forma continua en la misma hora, con varios cargos mínimos
<a name="pricing-example-3"></a>

Es una gran plataforma de comercio electrónico que procesa millones de transacciones todos los días. Quiere desarrollar una detección de fraudes en tiempo real. Utiliza una aplicación Apache Flink en Managed Service para Apache Flink para ingerir eventos de transacciones de Kinesis Data Streams y procesar eventos en tiempo real con diferentes pasos de transformación. Esto incluye el uso de una ventana deslizante para agregar eventos, dividirlos por tipos de eventos y aplicar reglas de detección específicas para diferentes tipos de eventos. Durante el desarrollo, la aplicación se inicia y se detiene varias veces para probar y depurar su comportamiento. Hay ocasiones en las que la aplicación solo se ejecuta durante unos minutos. Hay una hora en la que está probando su aplicación con 4 KPUs y la aplicación no utiliza copias de seguridad duraderas:
+ A las 10:05 a.m., inicia la aplicación, que se ejecuta durante 30 minutos antes de detenerse a las 10:35 a.m.
+ A las 10:40 a. m., vuelve a iniciar la aplicación, que se ejecuta durante 5 minutos antes de detenerse a las 10:45 a. m.
+ A las 10:50 a. m., se vuelve a iniciar la aplicación, que se ejecuta durante 2 minutos antes de detenerse a las 10:52 a. m.

El servicio gestionado para Apache Flink cobra un mínimo de 10 minutos de uso cada vez que una aplicación comienza a ejecutarse. El uso mensual de Managed Service para Apache Flink para su aplicación se calculará de la siguiente manera:
+ La primera vez que se inicia y se detiene la aplicación: 30 minutos de uso
+ La segunda vez que se inicia y se detiene la aplicación: 10 minutos de uso (la aplicación se ejecuta durante 5 minutos redondeados al importe mínimo de 10 minutos)
+ La tercera vez que se inicia y se detiene la aplicación: 10 minutos de uso (la aplicación se ejecuta durante 2 minutos, redondeados al importe mínimo de 10 minutos)

En total, se cobrará a su aplicación por 50 minutos de uso. Si la aplicación no se ejecuta en ningún otro momento del mes, los cargos mensuales por Managed Service para Apache Flink se calcularán de la siguiente manera:

**Cargos mensuales**

El precio en la región Este de EE. UU. (Norte de Virginia) es de 0,11 USD por KPU/hora. El servicio administrado para Apache Flink asigna 50 GB de almacenamiento de aplicaciones en ejecución por KPU y cobra 0,10 USD por GB al mes.
+ Cargos mensuales de KPU: 50 minutos\$1 (4 KPUs más 1 KPU adicional para la aplicación de streaming) \$1 0,11 dólares/hora = 0,46\$1 (redondeados al céntimo más próximo)
+ Cargos mensuales de almacenamiento de aplicaciones en ejecución: 50 minutos \$1 4 KPUs x 50\$1 \$1 0,10\$1 por GB al mes = GB/KPUs 0,03\$1 (redondeados al céntimo más próximo)
+ Cargos totales: 0,46 USD \$1 0,03 = **0,49 USD**

**Informe de uso de costos del servicio administrado para Apache Flink en la consola Billing and Cost Management del mes**

Kinesis Analytics
+ 0,49 USD - Este de EE. UU. (Norte de Virginia)
+ Amazon Kinesis Analytics StartApplication
  + 0,10 USD por GB de almacenamiento de aplicaciones en ejecución al mes
    + 0,232 GB/mes: 0,03 USD
  + 0,11 USD por unidad de procesamiento de Kinesis por hora para aplicaciones de Apache Flink
    + 4,167 KPU/hora: 0,46 USD

# Revise los componentes DataStream de la API
<a name="how-datastream"></a>

Su aplicación Apache Flink usa la [ DataStream API Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/overview/) para transformar los datos en un flujo de datos. 

En esta sección se describen los diferentes componentes que mueven, transforman y rastrean los datos:
+ [Utilice conectores para mover datos en Managed Service for Apache Flink con la API DataStream](how-connectors.md): estos componentes mueven los datos entre la aplicación y el origen y destino de datos externos.
+ [Transforme los datos mediante operadores en Managed Service for Apache Flink con la API DataStream](how-operators.md): estos componentes transforman o agrupan los elementos de datos dentro de la aplicación.
+ [Realice un seguimiento de los eventos en Managed Service for Apache Flink mediante la API DataStream](how-time.md): En este tema se describe cómo Managed Service for Apache Flink rastrea los eventos cuando se usa la API. DataStream 

# Utilice conectores para mover datos en Managed Service for Apache Flink con la API DataStream
<a name="how-connectors"></a>

En la DataStream API de Amazon Managed Service for Apache Flink, los *conectores* son componentes de software que mueven datos hacia y desde una aplicación de Managed Service for Apache Flink. Los conectores son integraciones flexibles que permiten leer archivos y directorios. Los conectores constan de módulos completos para interactuar con los servicios de Amazon y los sistemas de terceros.

Entre los tipos de conectores, se incluyen:
+ [Agregación de orígenes de datos de streaming](how-sources.md): proporcione datos a su aplicación desde un flujo de datos de Kinesis, un archivo u otro origen de datos.
+ [Escritura de datos mediante receptores](how-sinks.md): envíe datos desde su aplicación a un flujo de datos de Kinesis, un flujo de Firehose u otro destino de datos.
+ [Uso de E/S asíncrona](how-async.md): Proporciona acceso asíncrono a un origen de datos (como una base de datos) para enriquecer los eventos de flujos. 

## Conectores disponibles
<a name="how-connectors-list"></a>

El marco de Apache Flink contiene conectores para acceder a los datos desde una variedad de fuentes. Para obtener información sobre los conectores disponibles en el marco Apache Flink, consulte la sección [Connectors](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/connectors/) de la [documentación de Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.15/).

**aviso**  
Si tiene aplicaciones que se ejecutan en Flink 1.6, 1.8, 1.11 o 1.13 y desea ejecutarlas en las regiones de Medio Oriente (EAU), Asia Pacífico (Hyderabad), Israel (Tel Aviv), Europa (Zúrich), Medio Oriente (EAU), Asia-Pacífico (Melbourne) o Asia Pacífico (Yakarta), puede que necesite volver a compilar el archivo de su aplicación con un conector actualizado o actualizar a Flink 1.18.   
Los conectores Apache Flink se almacenan en sus propios repositorios de código fuente. Si está actualizando a la versión 1.18 o posterior, debe actualizar sus dependencias. Para acceder al repositorio de los AWS conectores de Apache Flink, consulte. [flink-connector-aws](https://github.com/apache/flink-connector-aws)  
La anterior fuente de Kinesis `org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer` está descontinuada y podría eliminarse en una futura versión de Flink. En su lugar, utilice [Kinesis Source](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kinesis/#kinesis-streams-source).  
No hay compatibilidad de estados entre `FlinkKinesisConsumer` y `KinesisStreamsSource`. Para obtener más información, consulte [Migrating existing jobs to new Kinesis Streams Source](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kinesis/#migrating-existing-jobs-to-new-kinesis-streams-source-from-kinesis-consumer) en la documentación de Apache Flink.  
 Las siguientes son las pautas recomendadas:   


**Actualizaciones de conectores**  

| Versión de Flink | Conector utilizado | Resolución | 
| --- | --- | --- | 
| 1.19, 1.20 | Origen de Kinesis |  Al actualizar a Amazon Managed Service para Apache Flink a la versión 1.19 y 1.20, asegúrese de utilizar el conector de origen de Kinesis Data Streams más reciente. La versión debe ser 5.0.0 o posterior. Para obtener más información, consulte [Amazon Kinesis Data Streams Connector](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kinesis/).  | 
| 1.19, 1.20 | Sumidero Kinesis |  Al actualizar a Amazon Managed Service para Apache Flink a la versión 1.19 y 1.20, asegúrese de utilizar el concector de receptor de Kinesis Data Streams más reciente. La versión debe ser 5.0.0 o posterior. Para obtener más información, consulte [Kinesis Streams Sink](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kinesis/#kinesis-streams-sink).  | 
| 1.19, 1.20 | Origen de DynamoDB Streams |  Al actualizar a Amazon Managed Service para Apache Flink a la versión 1.19 y 1.20, asegúrese de utilizar el conector de origen de DynamoDB Streams más reciente. La versión debe ser 5.0.0 o posterior. Para obtener más información, consulte [Amazon DynamoDB Connector](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/dynamodb/).  | 
| 1.19, 1.20 | Receptor de DynamoDB | Al actualizar a Amazon Managed Service para Apache Flink a la versión 1.19 y 1.20, asegúrese de utilizar el conector de receptor de DynamoDB más reciente. La versión debe ser 5.0.0 o posterior. Para obtener más información, consulte [Amazon DynamoDB Connector](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/dynamodb/). | 
| 1.19, 1.20 | Receptor de Amazon SQS |  Al actualizar a Amazon Managed Service para Apache Flink a la versión 1.19 y 1.20, asegúrese de utilizar el conector de receptor de Amazon SQS más reciente. La versión debe ser 5.0.0 o posterior. Para obtener más información, consulte [Amazon SQS Sink](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/sqs/).  | 
| 1.19, 1.20 | Receptor de Amazon Managed Service para Prometheus |  Al actualizar a las versiones 1.19 y 1.20 de Managed Service para Apache Flink, asegúrese de utilizar el conector de receptor más reciente de Amazon Managed Service para Prometheus. La versión de debe ser 1.0.0 o posterior. Para obtener más información, consulte [Prometheus Sink](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/prometheus/).  | 

# Agregación de orígenes de datos de transmisión a Managed Service para Apache Flink
<a name="how-sources"></a>

Apache Flink proporciona conectores para leer archivos, sockets, colecciones y fuentes personalizadas. En el código de su aplicación, debe utilizar una [fuente de Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/datastream_api.html#data-sources) para recibir datos de un flujo. En esta sección se describen las fuentes disponibles para los servicios de Amazon.

## Uso de Kinesis Data Streams
<a name="input-streams"></a>

La `KinesisStreamsSource` proporciona datos de transmisión a su aplicación desde un flujo de datos de Amazon Kinesis. 

### Creación de un `KinesisStreamsSource`
<a name="input-streams-create"></a>

En el siguiente código de ejemplo se muestra la creación de un `KinesisStreamsSource`:

```
// Configure the KinesisStreamsSource
Configuration sourceConfig = new Configuration();
sourceConfig.set(KinesisSourceConfigOptions.STREAM_INITIAL_POSITION, KinesisSourceConfigOptions.InitialPosition.TRIM_HORIZON); // This is optional, by default connector will read from LATEST

// Create a new KinesisStreamsSource to read from specified Kinesis Stream.
KinesisStreamsSource<String> kdsSource =
        KinesisStreamsSource.<String>builder()
                .setStreamArn("arn:aws:kinesis:us-east-1:123456789012:stream/test-stream")
                .setSourceConfig(sourceConfig)
                .setDeserializationSchema(new SimpleStringSchema())
                .setKinesisShardAssigner(ShardAssignerFactory.uniformShardAssigner()) // This is optional, by default uniformShardAssigner will be used.
                .build();
```

Para obtener más información sobre el uso de un`KinesisStreamsSource`, consulte [Amazon Kinesis Data Streams](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kinesis/) Connector en la documentación de Apache Flink [y nuestro ejemplo KinesisConnectors público](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KinesisConnectors) en Github.

### Creación de un `KinesisStreamsSource` que utilice un consumidor de EFO
<a name="input-streams-efo"></a>

`KinesisStreamsSource` ahora es compatible con la [distribución mejorada (EFO)](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kinesis/). 

Si un consumidor de Kinesis usa EFO, el servicio Kinesis Data Streams le proporciona su propio ancho de banda dedicado, en lugar de hacer que el consumidor comparta el ancho de banda fijo del flujo con los demás consumidores que leen el flujo.

Para obtener más información sobre el uso de EFO con el Kinesis Consumer, [consulte FLIP-128: salida de ventilador mejorada](https://cwiki.apache.org/confluence/display/FLINK/FLIP-128%3A+Enhanced+Fan+Out+for+AWS+Kinesis+Consumers) para consumidores de Kinesis. AWS 

Para activar el consumidor EFO, configure los siguientes parámetros en el consumidor de Kinesis:
+ **READER\$1TYPE:** defina este parámetro en **EFO** para que su aplicación utilice un consumidor de EFO para acceder a los datos del flujo de datos de Kinesis. 
+ **EFO\$1CONSUMER\$1NAME:** defina este parámetro en un valor de cadena que sea único entre los consumidores de este flujo. La reutilización de un nombre de consumidor en el mismo flujo de datos de Kinesis provocará la cancelación del consumidor anterior que utilizó ese nombre. 

A fin de configurar un `KinesisStreamsSource` para que use EFO, añada los siguientes parámetros al consumidor:

```
sourceConfig.set(KinesisSourceConfigOptions.READER_TYPE, KinesisSourceConfigOptions.ReaderType.EFO);
sourceConfig.set(KinesisSourceConfigOptions.EFO_CONSUMER_NAME, "my-flink-efo-consumer");
```

Para ver un ejemplo de una aplicación de Managed Service para Apache Flink que utiliza un consumidor de EFO, consulte [nuestro ejemplo de conectores de Kinesis públicos en Github](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KinesisConnectors).

## Uso de Amazon MSK
<a name="input-msk"></a>

La fuente `KafkaSource` proporciona datos de streaming a su aplicación desde un tema de Amazon MSK. 

### Creación de un `KafkaSource`
<a name="input-msk-create"></a>

En el siguiente código de ejemplo se muestra la creación de un `KafkaSource`:

```
KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers(brokers)
    .setTopics("input-topic")
    .setGroupId("my-group")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
```

Para obtener más información sobre cómo usar un `KafkaSource`, consulte [Replicación MSK](earlier.md#example-msk).

# Escritura de datos mediante receptores en Managed Service para Apache Flink
<a name="how-sinks"></a>

En el código de la aplicación, se puede utilizar cualquier conector de[receptor de Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/overview/) para escribir en sistemas externos, incluidos los servicios AWS , como Kinesis Data Streams y DynamoDB.

Apache Flink también proporciona receptores para archivos y sockets, y se pueden implementar receptores personalizados. Entre los diversos receptores compatibles, se utilizan con frecuencia los siguientes:

## Uso de Kinesis Data Streams
<a name="sinks-streams"></a>

Apache Flink proporciona información sobre el [conector de Kinesis Data Streams](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/kinesis/) en la documentación de Apache Flink.

Para ver un ejemplo de una aplicación que utiliza un flujo de datos de Kinesis como entrada y salida, consulte [Tutorial: Comience a utilizar la DataStream API en Managed Service for Apache Flink](getting-started.md).

## Uso de Apache Kafka y Amazon Managed Streaming para Apache Kafka (MSK)
<a name="sinks-MSK"></a>

El [conector Apache Flink Kafka](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/kafka/#kafka-sink) ofrece un amplio soporte para publicar datos en Apache Kafka y Amazon MSK, incluidas las garantías de una sola vez. Para aprender a escribir en Kafka, consulte los [ejemplos de conectores Kafka](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KafkaConnectors) en la documentación de Apache Flink.

## Uso de Amazon S3.
<a name="sinks-s3"></a>

Puede utilizar el `StreamingFileSink` de Apache Flink para escribir objetos en un bucket de Amazon S3.

Para ver un ejemplo sobre cómo escribir objetos en S3, consulte [Ejemplo: escritura en un bucket de Amazon S3](earlier.md#examples-s3). 

## Uso de Firehose
<a name="sinks-firehose"></a>

El `FlinkKinesisFirehoseProducer` es un receptor de Apache Flink fiable y escalable para almacenar los resultados de las aplicaciones mediante el servicio [Firehose](https://docs.aws.amazon.com/firehose/latest/dev/). En esta sección se describe cómo configurar un proyecto de Maven para crear y utilizar un `FlinkKinesisFirehoseProducer`.

**Topics**
+ [Creación de un `FlinkKinesisFirehoseProducer`](#sinks-firehose-create)
+ [Ejemplo de código de `FlinkKinesisFirehoseProducer`](#sinks-firehose-sample)

### Creación de un `FlinkKinesisFirehoseProducer`
<a name="sinks-firehose-create"></a>

En el siguiente código de ejemplo se muestra la creación de un `FlinkKinesisFirehoseProducer`:

```
Properties outputProperties = new Properties();
outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);

FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName, new SimpleStringSchema(), outputProperties);
```

### Ejemplo de código de `FlinkKinesisFirehoseProducer`
<a name="sinks-firehose-sample"></a>

El siguiente ejemplo de código muestra cómo crear y configurar un `FlinkKinesisFirehoseProducer` y enviar datos desde un flujo de datos de Apache Flink al servicio Firehose.

```
 
package com.amazonaws.services.kinesisanalytics;

import com.amazonaws.services.kinesisanalytics.flink.connectors.config.ProducerConfigConstants;
import com.amazonaws.services.kinesisanalytics.flink.connectors.producer.FlinkKinesisFirehoseProducer;
import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;

import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

import java.io.IOException;
import java.util.Map;
import java.util.Properties;

public class StreamingJob {

	private static final String region = "us-east-1";
	private static final String inputStreamName = "ExampleInputStream";
	private static final String outputStreamName = "ExampleOutputStream";

	private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) {
		Properties inputProperties = new Properties();
		inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
		inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

		return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
	}

	private static DataStream<String> createSourceFromApplicationProperties(StreamExecutionEnvironment env)
			throws IOException {
		Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
		return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(),
				applicationProperties.get("ConsumerConfigProperties")));
	}

	private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromStaticConfig() {
		/*
		 * com.amazonaws.services.kinesisanalytics.flink.connectors.config.
		 * ProducerConfigConstants
		 * lists of all of the properties that firehose sink can be configured with.
		 */

		Properties outputProperties = new Properties();
		outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);

		FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName,
				new SimpleStringSchema(), outputProperties);
		ProducerConfigConstants config = new ProducerConfigConstants();
		return sink;
	}

	private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromApplicationProperties() throws IOException {
		/*
		 * com.amazonaws.services.kinesisanalytics.flink.connectors.config.
		 * ProducerConfigConstants
		 * lists of all of the properties that firehose sink can be configured with.
		 */

		Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
		FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName,
				new SimpleStringSchema(),
				applicationProperties.get("ProducerConfigProperties"));
		return sink;
	}

	public static void main(String[] args) throws Exception {
		// set up the streaming execution environment
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		/*
		 * if you would like to use runtime configuration properties, uncomment the
		 * lines below
		 * DataStream<String> input = createSourceFromApplicationProperties(env);
		 */

		DataStream<String> input = createSourceFromStaticConfig(env);

		// Kinesis Firehose sink
		input.addSink(createFirehoseSinkFromStaticConfig());

		// If you would like to use runtime configuration properties, uncomment the
		// lines below
		// input.addSink(createFirehoseSinkFromApplicationProperties());

		env.execute("Flink Streaming Java API Skeleton");
	}
}
```

Para ver un tutorial completo sobre cómo utilizar el receptor de Firehose, consulte [Ejemplo: escritura en Firehose](earlier.md#get-started-exercise-fh).

# Utilice el modo I/O asincrónico en el servicio gestionado de Apache Flink
<a name="how-async"></a>

Un I/O operador asíncrono enriquece los datos de la transmisión mediante una fuente de datos externa, como una base de datos. Managed Service para Apache Flink enriquece los eventos del flujo de forma asíncrona para que las solicitudes se puedan agrupar en lotes y aumentar la eficiencia. 

Para obtener más información, consulte [E/S asíncrona](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/operators/asyncio/) en la documentación de Apache Flink.

# Transforme los datos mediante operadores en Managed Service for Apache Flink con la API DataStream
<a name="how-operators"></a>

Para transformar los datos entrantes en un Managed Service para Apache Flink, utiliza un *operador* de Apache Flink. Un operador de Apache Flink transforma uno o más flujos de datos en un nuevo flujo de datos. El nuevo flujo de datos contiene datos modificados del flujo de datos original. Apache Flink proporciona más de 25 operadores de procesamiento de flujos prediseñados. Para obtener más información, consulte [Operators](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/operators/overview/) en la documentación de Apache Flink.

**Topics**
+ [Uso de operadores de transformación](#how-operators-transform)
+ [Uso de operadores de agregación](#how-operators-agg)

## Uso de operadores de transformación
<a name="how-operators-transform"></a>

El siguiente es un ejemplo de una transformación de texto simple en uno de los campos de un flujo de datos JSON. 

Este código crea un flujo de datos transformado. El nuevo flujo de datos tiene los mismos datos que el flujo original, con la cadena “` Company`” anexada al contenido del campo `TICKER`.

```
DataStream<ObjectNode> output = input.map(
    new MapFunction<ObjectNode, ObjectNode>() {
        @Override
        public ObjectNode map(ObjectNode value) throws Exception {
            return value.put("TICKER", value.get("TICKER").asText() + " Company");
        }
    }
);
```

## Uso de operadores de agregación
<a name="how-operators-agg"></a>

A continuación se muestra un ejemplo de operador de agregación. El código crea un flujo de datos agregado. El operador crea una ventana de caída de 5 segundos y devuelve la suma de los valores `PRICE` de los registros de la ventana con el mismo valor `TICKER`.

```
DataStream<ObjectNode> output = input.keyBy(node -> node.get("TICKER").asText())
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .reduce((node1, node2) -> {
        double priceTotal = node1.get("PRICE").asDouble() + node2.get("PRICE").asDouble();
        node1.replace("PRICE", JsonNodeFactory.instance.numberNode(priceTotal));
    return node1;
});
```

Para obtener ejemplos de código, consulte [Ejemplos de cómo crear y utilizar aplicaciones en Managed Service para Apache Flink](examples-collapsibles.md). 

# Realice un seguimiento de los eventos en Managed Service for Apache Flink mediante la API DataStream
<a name="how-time"></a>

Managed Service para Apache Flink realiza un seguimiento de los eventos mediante las siguientes marcas de tiempo:
+ **Hora de procesamiento:** se refiere a la hora del sistema de la máquina que está ejecutando la operación correspondiente.
+ **Hora del evento:** se refiere a la hora en que ocurrió cada evento individual en el dispositivo que lo produjo.
+ **Hora de adquisición de datos:** se refiere al momento en que los eventos ingresan al servicio Managed Service para Apache Flink.

El tiempo utilizado por el entorno de streaming se establece mediante`setStreamTimeCharacteristic`. 

```
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
```

Para obtener más información sobre marcas de tiempo, consulte [Generating Watermarks](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/event-time/generating_watermarks/) en la documentación de Apache Fink.

# Revisión de los componentes de la API de tabla
<a name="how-table"></a>

Su aplicación Apache Flink usa la [API de tabla de Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/tableapi/) para interactuar con los datos de un flujo mediante un modelo relacional. La API de tabla se usa para acceder a los datos mediante fuentes de tablas y, luego, las funciones de tabla se usan para transformar y filtrar los datos de las tablas. Los datos tabulares se pueden transformar y filtrar mediante funciones de la API, o bien, comandos SQL. 

Esta sección contiene los siguientes temas:
+ [Conectores de la API de tabla](how-table-connectors.md): estos componentes mueven los datos entre la aplicación y el origen y destino de datos externos.
+ [Atributos de tiempo de la API de tabla](how-table-timeattributes.md): este tema describe cómo Managed Service para Apache Flink realiza un seguimiento de los eventos cuando se usa la API de tabla.

# Conectores de la API de tabla
<a name="how-table-connectors"></a>

En el modelo de programación de Apache Flink, los conectores son componentes que la aplicación utiliza para leer o escribir datos de fuentes externas, como otros AWS servicios.

Con la API de tabla de Apache Flink, se pueden usar los siguientes tipos de conectores:
+ [Fuentes de la API de tabla](#how-table-connectors-source): se utilizan los conectores fuente de la API de tabla para crear tablas dentro de su `TableEnvironment` mediante solicitudes a la API o, bien, consultas SQL.
+ [Receptores de la API de tabla](#how-table-connectors-sink): se utilizan comandos SQL para escribir datos de tablas en fuentes externas, como un tema de Amazon MSK o un bucket de Amazon S3.

## Fuentes de la API de tabla
<a name="how-table-connectors-source"></a>

Se crea una fuente de tabla a partir de un flujo de datos. El siguiente código crea una tabla a partir de un tema de Amazon MSK:

```
//create the table
    final FlinkKafkaConsumer<StockRecord> consumer = new FlinkKafkaConsumer<StockRecord>(kafkaTopic, new KafkaEventDeserializationSchema(), kafkaProperties);
    consumer.setStartFromEarliest();
    //Obtain stream
    DataStream<StockRecord> events = env.addSource(consumer);

    Table table = streamTableEnvironment.fromDataStream(events);
```

Para obtener más información sobre fuentes de tablas, consulte [Table & SQL Connectors](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/overview/) en la documentación de Apache Flink.

## Receptores de la API de tabla
<a name="how-table-connectors-sink"></a>

Para escribir datos de tabla en un receptor, se crea el receptor en SQL y, a continuación, se ejecuta el receptor basado en SQL en el objeto `StreamTableEnvironment`.

En el siguiente código de ejemplo, se muestra cómo escribir datos de tablas a un receptor de Amazon S3:

```
final String s3Sink = "CREATE TABLE sink_table (" +
    "event_time TIMESTAMP," +
    "ticker STRING," +
    "price DOUBLE," +
    "dt STRING," +
    "hr STRING" +
    ")" +
    " PARTITIONED BY (ticker,dt,hr)" +
    " WITH" +
    "(" +
    " 'connector' = 'filesystem'," +
    " 'path' = '" + s3Path + "'," +
    " 'format' = 'json'" +
    ") ";

    //send to s3
    streamTableEnvironment.executeSql(s3Sink);
    filteredTable.executeInsert("sink_table");
```

 Se puede usar el parámetro `format` para controlar el formato que Managed Service para Apache Flink utiliza para escribir el resultado en el receptor. Para obtener información sobre formatos, consulte [Supported Connectors](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/overview/) en la documentación de Apache Flink.

## Orígenes y receptores definidos por el usuario
<a name="how-table-connectors-userdef"></a>

Se puede usar los conectores Apache Kafka existentes para enviar datos a otros servicios de AWS desde estos, como Amazon MSK y Amazon S3. Para interactuar con otros orígenes y destinos de datos, puede definir sus propios orígenes y receptores. Para obtener más información, consulte [User-Defined Sources and Sinks](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/sourcessinks/) en la documentación de Apache Flink.

# Atributos de tiempo de la API de tabla
<a name="how-table-timeattributes"></a>

Cada registro de un flujo de datos tiene varias marcas de tiempo que definen cuándo ocurrieron los eventos relacionados con el registro:
+ **Hora del evento**: una marca de tiempo definida por el usuario que define cuándo ocurrió el evento que creó el registro.
+ **Hora de adquisición de datos**: la hora en que la aplicación obtuvo el registro del flujo de datos.
+ **Hora de procesamiento**: la hora en que su solicitud procesó el registro.

Cuando la API de tabla de Apache Flink crea ventanas en función de los tiempos de registro, usted define cuáles de estas marcas de tiempo utiliza con el método `setStreamTimeCharacteristic`. 

Para obtener más información sobre el uso de marcas de tiempo con la API de tabla, consulte [Time Attributes](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/concepts/time_attributes/) y [Timely Stream Processing](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/concepts/time/) en la documentación de Apache Flink.

# Uso de Python con Managed Service para Apache Flink
<a name="how-python"></a>

**nota**  
Si está desarrollando la aplicación Python Flink en una Mac nueva con el chip Apple Silicon, es posible que encuentre algunos [problemas conocidos](https://issues.apache.org/jira/browse/FLINK-26981) con las dependencias de Python de PyFlink la versión 1.15. En este caso, recomendamos ejecutar el intérprete de Python en Docker. Para obtener step-by-step instrucciones, consulta el [desarrollo de la PyFlink versión 1.15 en Apple Silicon Mac](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/python/LocalDevelopmentOnAppleSilicon).

La versión 2.2 de Apache Flink incluye soporte para la creación de aplicaciones con la versión 3.12 de Python; se ha eliminado la compatibilidad con la versión 3.8 de Python. Para obtener más información, consulte [Flink Python Docs](https://nightlies.apache.org/flink/flink-docs-release-2.2/api/python/). Para crear una aplicación de Managed Service para Apache Flink mediante Python, haga lo siguiente:
+ Cree el código de su aplicación de Python como un archivo de texto con un método `main`.
+ Agrupe el archivo de código de la aplicación y cualquier dependencia de Python o Java en un archivo zip y cárguelo en un bucket de Amazon S3.
+ Cree su aplicación Managed Service para Apache Flink especificando la ubicación del código de Amazon S3, las propiedades y la configuración de la aplicación.

En un nivel alto, la API Python Table es un envoltorio alrededor de la API Java Table. Para obtener información sobre la API de tabla de Python, consulte [Table API Tutorial](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/python/table_api_tutorial/) en la documentación de Apache Flink.

# Programación de una aplicación de Managed Service para Apache Flink para Python
<a name="how-python-programming"></a>

Usted codifica su aplicación Managed Service para Apache Flink para Python mediante la API Apache Flink Python Table. El motor Apache Flink traduce las declaraciones de la API de tablas de Python (que se ejecutan en la máquina virtual de Python) en declaraciones de la API de tabla de Java (que se ejecutan en la máquina virtual de Java). 

Para utilizar la Python Table API, siga estos pasos:
+ Cree una referencia a `StreamTableEnvironment`.
+ Cree `table` objetos a partir de sus datos de transmisión de origen ejecutando consultas en la referencia `StreamTableEnvironment`.
+ Ejecute consultas en sus objetos `table` para crear tablas de salida.
+ Escriba sus tablas de salida en sus destinos utilizando un `StatementSet`.

Para empezar a utilizar la API de tabla de Python en Managed Service para Apache Flink, consulte [Introducción a Amazon Managed Service para Apache Flink para Python](gs-python.md).

## Lectura y escritura de datos de transmisión
<a name="how-python-programming-readwrite"></a>

Para leer y escribir datos de streaming, ejecute consultas SQL en el entorno de tablas.

### Creación de una tabla
<a name="how-python-programming-readwrite-createtable"></a>

El siguiente ejemplo de código muestra una función definida por el usuario que crea una consulta SQL. La consulta SQL crea una tabla que interactúa con un flujo de Kinesis:

```
def create_table(table_name, stream_name, region, stream_initpos):
   return """ CREATE TABLE {0} (
                `record_id` VARCHAR(64) NOT NULL,
                `event_time` BIGINT NOT NULL,
                `record_number` BIGINT NOT NULL,
                `num_retries` BIGINT NOT NULL,
                `verified` BOOLEAN NOT NULL
              )
              PARTITIONED BY (record_id)
              WITH (
                'connector' = 'kinesis',
                'stream' = '{1}',
                'aws.region' = '{2}',
                'scan.stream.initpos' = '{3}',
                'sink.partitioner-field-delimiter' = ';',
                'sink.producer.collection-max-count' = '100',
                'format' = 'json',
                'json.timestamp-format.standard' = 'ISO-8601'
              ) """.format(table_name, stream_name, region, stream_initpos)
```

### Lectura de datos de transmisión
<a name="how-python-programming-readwrite-read"></a>

El siguiente ejemplo de código muestra cómo utilizar la consulta SQL `CreateTable` anterior en una referencia de entorno de tabla para leer datos:

```
   table_env.execute_sql(create_table(input_table, input_stream, input_region, stream_initpos))
```

### Escritura de datos de transmisión
<a name="how-python-programming-readwrite-write"></a>

El siguiente ejemplo de código muestra cómo utilizar la consulta SQL del ejemplo `CreateTable` para crear una referencia a la tabla de salida y cómo utilizar un `StatementSet` para interactuar con las tablas y escribir datos en un flujo de Kinesis de destino:

```
   table_result = table_env.execute_sql("INSERT INTO {0} SELECT * FROM {1}"
                       .format(output_table_name, input_table_name))
```

## Lectura de propiedades de tiempo de ejecución
<a name="how-python-programming-properties"></a>

Puede usar las propiedades de tiempo de ejecución para configurar su aplicación sin tener que cambiar el código de la aplicación.

Las propiedades de la aplicación se especifican de la misma manera que en el caso de una aplicación de Managed Service para Apache Flink para Java. Puede especificar propiedades del tiempo de ejecución de las siguientes maneras:
+ Uso de la acción [CreateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplication.html).
+ Uso de la [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html)acción.
+ Configuración de su aplicación usando la consola

Puede recuperar las propiedades de la aplicación en el código leyendo un archivo json llamado `application_properties.json`, creado por el motor de ejecución Managed Service para Apache Flink.

El siguiente ejemplo de código demuestra las propiedades de aplicación de lectura desde el archivo `application_properties.json`:

```
file_path = '/etc/flink/application_properties.json'
   if os.path.isfile(file_path):
       with open(file_path, 'r') as file:
           contents = file.read()
           properties = json.loads(contents)
```

El siguiente ejemplo de código de función definido por el usuario muestra la lectura de un grupo de propiedades del objeto de propiedades de la aplicación: recupera:

```
def property_map(properties, property_group_id):
   for prop in props:
       if prop["PropertyGroupId"] == property_group_id:
           return prop["PropertyMap"]
```

En el siguiente ejemplo de código, se muestra la lectura de una propiedad denominada INPUT\$1STREAM\$1KEY de un grupo de propiedades que se devuelve en el ejemplo anterior:

```
input_stream = input_property_map[INPUT_STREAM_KEY]
```

## Creación del paquete de códigos de la aplicación
<a name="how-python-programming-package"></a>

Una vez que haya creado su aplicación Python, agrupe el archivo de código y las dependencias en un archivo zip.

El archivo zip debe contener una secuencia de comandos de Python con un método `main` y, de forma opcional, puede contener lo siguiente:
+ Archivos de código Python adicionales
+ Código Java definido por el usuario en archivos JAR
+ Bibliotecas Java en archivos JAR

**nota**  
El archivo zip de la aplicación debe contener todas las dependencias de la aplicación. No puede hacer referencia a bibliotecas de otros orígenes para su aplicación.

# Creación de su aplicación Python de Managed Service para Apache Flink
<a name="how-python-creating"></a>

## Especificación de sus archivos de código
<a name="how-python-creating-code"></a>

Una vez que se crea el paquete de código de la aplicación, se carga a un bucket de Amazon S3. A continuación, crea la aplicación mediante la consola o la [CreateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplication.html)acción.

Al crear la aplicación mediante la [CreateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplication.html)acción, se especifican los archivos de código y se archivan en el archivo zip mediante un grupo de propiedades de la aplicación especial denominado`kinesis.analytics.flink.run.options`. Puede definir los siguientes tipos de archivos:
+ **python**: archivo de texto que contiene un método principal de Python.
+ **jarfile**: archivo JAR de Java que contiene funciones de Java definidas por el usuario.
+ **PyFiles**: un archivo de recursos de Python que contiene los recursos que utilizará la aplicación.
+ **PyArchives**: un archivo zip que contiene archivos de recursos para la aplicación.

Para obtener más información sobre los tipos de archivos de código Python de Apache Flink, consulte [Command-Line Interface](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/deployment/cli/) en la documentación de Apache Flink.

**nota**  
Managed Service para Apache Flink no admite los tipos de archivo `pyModule`, `pyExecutable`, o `pyRequirements`. Todo el código, los requisitos y las dependencias deben estar en el archivo zip. No puede especificar las dependencias que se instalarán mediante pip. 

El siguiente fragmento json de ejemplo muestra cómo especificar las ubicaciones de los archivos dentro del archivo zip de la aplicación:

```
"ApplicationConfiguration": {
    "EnvironmentProperties": {
      "PropertyGroups": [
        {
          "PropertyGroupId": "kinesis.analytics.flink.run.options",
          "PropertyMap": {
            "python": "MyApplication/main.py",
            "jarfile": "MyApplication/lib/myJarFile.jar",
            "pyFiles": "MyApplication/lib/myDependentFile.py",
            "pyArchives": "MyApplication/lib/myArchive.zip"
          }
        },
```

# Monitoreo de su aplicación de Python de Managed Service para Apache Flink
<a name="how-python-monitoring"></a>

Utiliza el CloudWatch registro de su aplicación para supervisar su aplicación Managed Service for Apache Flink Python.

Managed Service para Apache Flink registra los siguientes mensajes para las aplicaciones Python:
+ Mensajes escritos en la consola utilizando `print()` en el método `main` de la aplicación.
+ Mensajes enviados en funciones definidas por el usuario mediante el paquete `logging`. El siguiente ejemplo de código muestra cómo escribir en el registro de la aplicación desde una función definida por el usuario:

  ```
  import logging
  
  @udf(input_types=[DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
  def doNothingUdf(i):
      logging.info("Got {} in the doNothingUdf".format(str(i)))
      return i
  ```
+ Mensajes de error emitidos por la aplicación.

  Si la aplicación lanza una excepción en la función `main`, aparecerá en los registros de la aplicación.

  El siguiente ejemplo muestra una entrada de registro para una excepción lanzada desde el código Python:

  ```
  2021-03-15 16:21:20.000   --------------------------- Python Process Started --------------------------
  2021-03-15 16:21:21.000   Traceback (most recent call last):
  2021-03-15 16:21:21.000   "  File ""/tmp/flink-web-6118109b-1cd2-439c-9dcd-218874197fa9/flink-web-upload/4390b233-75cb-4205-a532-441a2de83db3_code/PythonKinesisSink/PythonUdfUndeclared.py"", line 101, in <module>"
  2021-03-15 16:21:21.000       main()
  2021-03-15 16:21:21.000   "  File ""/tmp/flink-web-6118109b-1cd2-439c-9dcd-218874197fa9/flink-web-upload/4390b233-75cb-4205-a532-441a2de83db3_code/PythonKinesisSink/PythonUdfUndeclared.py"", line 54, in main"
  2021-03-15 16:21:21.000   "    table_env.register_function(""doNothingUdf"", doNothingUdf)"
  2021-03-15 16:21:21.000   NameError: name 'doNothingUdf' is not defined
  2021-03-15 16:21:21.000   --------------------------- Python Process Exited ---------------------------
  2021-03-15 16:21:21.000   Run python process failed
  2021-03-15 16:21:21.000   Error occurred when trying to start the job
  ```

**nota**  
Debido a problemas de rendimiento, se recomienda que solo utilice mensajes de registro personalizados durante el desarrollo de la aplicación. 

## Consulta los registros con Insights CloudWatch
<a name="how-python-monitoring-insights"></a>

La siguiente consulta de CloudWatch Insights busca los registros creados por el punto de entrada de Python mientras se ejecuta la función principal de la aplicación:

```
fields @timestamp, message
| sort @timestamp asc
| filter logger like /PythonDriver/
| limit 1000
```

# Uso de propiedades de tiempo de ejecución en Managed Service para Apache Flink
<a name="how-properties"></a>

Puede usar las *propiedades de tiempo de ejecución* para configurar su aplicación sin tener que volver a compilar el código de la aplicación. 

**Topics**
+ [Administración de las propiedades del tiempo de ejecución mediante la consola](#how-properties-console)
+ [Administración de las propiedades del tiempo de ejecución mediante la CLI](#how-properties-cli)
+ [Acceso a propiedades de tiempo de ejecución en un servicio gestionado para la aplicación Apache Flink](#how-properties-access)

## Administración de las propiedades del tiempo de ejecución mediante la consola
<a name="how-properties-console"></a>

Se pueden añadir, actualizar o eliminar propiedades de tiempo de ejecución de su aplicación Managed Service para Apache Flink mediante la Consola de administración de AWS.

**nota**  
Si se utiliza una versión anterior compatible de Apache Flink y desea actualizar sus aplicaciones actuales a Apache Flink 1.19.1, se puede hacer mediante las actualizaciones de versión integradas de Apache Flink. Con las actualizaciones de versión locales, se retiene la trazabilidad de las aplicaciones con respecto a un único ARN en todas las versiones de Apache Flink, incluidas las instantáneas, los registros, las métricas, las etiquetas, las configuraciones de Flink y más. Se puede utilizar esta función en el estado `RUNNING` y `READY`. Para obtener más información, consulte [Uso de actualizaciones de versión locales para Apache Flink](how-in-place-version-upgrades.md).

**Actualización de propiedades de tiempo de ejecución para un servicio gestionado para la aplicación Apache Flink**

1. Inicie sesión en y abra la Consola de administración de AWS consola de Amazon MSF en https://console.aws.amazon.com /flink.

1. Elija su Managed Service para la aplicación Apache Flink. Elija **Detalles de la aplicación**.

1. En la página de su aplicación, elija **Configurar**.

1. Amplíe la sección **Propiedades**.

1. Utilice los controles de la sección **Propiedades** para definir un grupo de propiedades con pares de valor clave. Utilice estos controles para añadir, actualizar o eliminar grupos de propiedades y propiedades de tiempo de ejecución.

1. Elija **Actualizar**.

## Administración de las propiedades del tiempo de ejecución mediante la CLI
<a name="how-properties-cli"></a>

Puede agregar, actualizar o eliminar propiedades en tiempo de ejecución mediante [AWS CLI](https://docs.aws.amazon.com/cli). 

En esta sección se incluyen ejemplos de solicitudes de acciones de API para configurar las propiedades de tiempo de ejecución de una aplicación. Para obtener información sobre cómo utilizar un archivo JSON como entrada para una acción de API, consulte [Ejemplo de código de API de Managed Service para Apache Flink](api-examples.md).

**nota**  
Sustituya el ID de cuenta de muestra (*`012345678901`*) en los siguientes ejemplos por su ID de cuenta.

### Agregación de propiedades de tiempo de ejecución al crear una aplicación
<a name="how-properties-create"></a>

El siguiente ejemplo de solicitud para la [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html)acción agrega dos grupos de propiedades de tiempo de ejecución (`ProducerConfigProperties` y `ConsumerConfigProperties`) al crear una aplicación:

```
{
    "ApplicationName": "MyApplication",
    "ApplicationDescription": "my java test app",
    "RuntimeEnvironment": "FLINK-1_19",
    "ServiceExecutionRole": "arn:aws:iam::012345678901:role/MF-stream-rw-role",
    "ApplicationConfiguration": {
        "ApplicationCodeConfiguration": {
            "CodeContent": {
                "S3ContentLocation": {
                    "BucketARN": "arn:aws:s3:::ka-app-code-username",
                    "FileKey": "java-getting-started-1.0.jar"
                }
            },
            "CodeContentType": "ZIPFILE"
        },
        "EnvironmentProperties":  { 
         "PropertyGroups": [ 
            { 
               "PropertyGroupId": "ProducerConfigProperties",
               "PropertyMap" : {
                    "flink.stream.initpos" : "LATEST",
                    "aws.region" : "us-west-2",
                    "AggregationEnabled" : "false"
               }
            },
            { 
               "PropertyGroupId": "ConsumerConfigProperties",
               "PropertyMap" : {
                    "aws.region" : "us-west-2"
               }
            }
         ]
      }
    }
}
```

### Agregación y actualización de propiedades de tiempo de ejecución en una aplicación existente
<a name="how-properties-update"></a>

El siguiente ejemplo de solicitud de la acción [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html) agrega o actualiza las propiedades de tiempo de ejecución de una aplicación existente:

```
{
  "ApplicationName": "MyApplication",
  "CurrentApplicationVersionId": 2,
  "ApplicationConfigurationUpdate": {
    "EnvironmentPropertyUpdates": {
      "PropertyGroups": [ 
        { 
          "PropertyGroupId": "ProducerConfigProperties",
          "PropertyMap" : {
            "flink.stream.initpos" : "LATEST",
            "aws.region" : "us-west-2",
            "AggregationEnabled" : "false"
          }
        },
        { 
          "PropertyGroupId": "ConsumerConfigProperties",
          "PropertyMap" : {
            "aws.region" : "us-west-2"
          }
        }
      ]
    }
  }
}
```

**nota**  
Si usa una clave que no tiene una propiedad de tiempo de ejecución correspondiente en un grupo de propiedades, Managed Service para Apache Flink agrega el par de valor clave como una nueva propiedad. Si usa una clave para una propiedad de tiempo de ejecución existente en un grupo de propiedades, Managed Service para Apache Flink actualiza el valor de la propiedad. 

### Eliminación de propiedades de tiempo de ejecución
<a name="how-properties-remove"></a>

El siguiente ejemplo de solicitud de la acción [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html) elimina todas las propiedades de tiempo de ejecución y grupos de propiedades de una aplicación existente:

```
{
  "ApplicationName": "MyApplication",
  "CurrentApplicationVersionId": 3,
  "ApplicationConfigurationUpdate": {
    "EnvironmentPropertyUpdates": {
      "PropertyGroups": []
    }
  }
}
```

**importante**  
Si omite un grupo de propiedades existente o una clave de propiedad existente en un grupo de propiedades, ese grupo de propiedades o propiedad se elimina.

## Acceso a propiedades de tiempo de ejecución en un servicio gestionado para la aplicación Apache Flink
<a name="how-properties-access"></a>

Las propiedades de tiempo de ejecución se recuperan en el código de la aplicación Java mediante el método estático `KinesisAnalyticsRuntime.getApplicationProperties()`, que devuelve un objeto `Map<String, Properties>`.

En el siguiente ejemplo de código Java, se recuperan las propiedades de tiempo de ejecución de su aplicación:

```
 Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
```

Puede recuperar un grupo de propiedades (como un objeto `Java.Util.Properties`) de la siguiente manera:

```
Properties consumerProperties = applicationProperties.get("ConsumerConfigProperties");
```

Por lo general, se configura una fuente o un receptor de Apache Flink pasando el objeto `Properties` sin necesidad de recuperar las propiedades individuales. El siguiente ejemplo de código muestra cómo crear una fuente de Flink pasando un objeto `Properties` recuperado de las propiedades de tiempo de ejecución:

```
private static FlinkKinesisProducer<String> createSinkFromApplicationProperties() throws IOException {
  Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
  FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<String>(new SimpleStringSchema(),
    applicationProperties.get("ProducerConfigProperties"));

  sink.setDefaultStream(outputStreamName);
  sink.setDefaultPartition("0");
  return sink;
}
```

Para ver ejemplos de código, consulte [Ejemplos de cómo crear y utilizar aplicaciones en Managed Service para Apache Flink](examples-collapsibles.md).

# Uso de los conectores de Apache Flink con Managed Service para Apache Flink
<a name="how-flink-connectors"></a>

Los conectores de Apache Flink son componentes de software que transfieren datos hacia y desde una aplicación de Amazon Managed Service para Apache Flink. Los conectores son integraciones flexibles que permiten leer archivos y directorios. Los conectores constan de módulos completos para interactuar con los servicios de Amazon y los sistemas de terceros.

Entre los tipos de conectores, se incluyen:
+ **Orígenes**: proporcione datos a su aplicación desde un flujo de datos de Kinesis, un archivo, un tema de Apache Kafka u otro origen de datos.
+ **Receptores**: envíe datos desde su aplicación a un flujo de datos de Kinesis, un flujo de Firehose, un tema de Apache Kafka u otro destino de datos.
+ **E/S asíncrona**: proporciona acceso asíncrono a un origen de datos (como una base de datos) para enriquecer los flujos. 

Los conectores Apache Flink se almacenan en sus propios repositorios de código fuente. La versión y el artefacto de los conectores de Apache Flink cambian en función de la versión de Apache Flink que utilice y de si utiliza la API DataStream Table o SQL. 

Amazon Managed Service para Apache Flink admite más de 40 conectores fuente y de recepción de Apache Flink prediseñados. En la siguiente tabla, se proporciona un resumen de los conectores más populares y sus versiones asociadas. También se pueden crear receptores personalizados con el marco Async-Sink. Para obtener más información, consulte [The Generic Asynchronous Base Sink](https://flink.apache.org/2022/03/16/the-generic-asynchronous-base-sink/) en la documentación de Apache Flink.

 Para acceder al repositorio de los conectores de Apache Flink AWS , consulte. [flink-connector-aws](https://github.com/apache/flink-connector-aws)

## Conectores para Flink 2.2
<a name="connectors-flink-2-2"></a>

Al actualizar a Flink 2.2, debe actualizar las dependencias de sus conectores a versiones que sean compatibles con el entorno de ejecución de Flink 2.x. Los conectores Flink se comercializan independientemente del entorno de ejecución de Flink, y no todos los conectores cuentan todavía con una versión compatible con Flink 2.x. En la siguiente tabla se resume la disponibilidad de los conectores más utilizados en Amazon Managed Service para Apache Flink en el momento de escribir este artículo:


**Conectores para Flink 2.2**  

| Connector | Versión Flink 2.0\$1 | Notas | 
| --- | --- | --- | 
| Apache Kafka | flink-connector-kafka 4.0.0-2.0 | Recomendado para Flink 2.2 | 
| Kinesis Data Streams (fuente) | flink-connector-aws-kinesis-streams 6.0.0-2.0 | Recomendado para Flink 2.2 | 
| Kinesis Data Streams (sumidero) | flink-connector-aws-kinesis-streams 6.0.0-2.0 | Recomendado para Flink 2.2 | 
| FileSystem (S3, HDFS) | Incluido con Flink | Integrado en la distribución Flink, siempre disponible | 
| JDBC | Aún no se ha publicado para la versión 2.x | No hay disponible ninguna versión compatible con Flink 2.x | 
| OpenSearch | Aún no se ha publicado para la versión 2.x | No hay disponible ninguna versión compatible con Flink 2.x | 
| Elasticsearch | Aún no se ha publicado para la versión 2.x | Considere la posibilidad de migrar al conector OpenSearch  | 
| Servicio administrado por Amazon para Prometheus | Aún no se ha publicado para la versión 2.x | No hay una versión compatible con Flink 2.x en el momento de escribir este artículo | 

Si su aplicación depende de un conector que aún no tiene una versión Flink 2.2, tiene dos opciones: esperar a que el conector publique una versión compatible o evaluar si puede sustituirlo por una alternativa (por ejemplo, utilizando el catálogo JDBC o un receptor personalizado).

**Problemas conocidos**
+ Las aplicaciones que utilizan la `KinesisStreamsSource` ruta EFO (Enhanced Fan-Out/ SubscribeToShard) introducida en los conectores v5.0.0 y v6.0.0 pueden fallar cuando las transmisiones de Kinesis se refragmentan. Se trata de un problema conocido en la comunidad. Para obtener más información, consulte [FLINK-37648](https://issues.apache.org/jira/browse/FLINK-37648).
+ Las aplicaciones que utilizan la `KinesisStreamsSource` ruta EFO (Enhanced Fan-Out/ SubscribeToShard) introducida en los conectores v5.0.0 y v6.0.0 `KinesisStreamsSink` pueden sufrir bloqueos si la aplicación Flink está sometida a una contrapresión, lo que provoca una interrupción total del procesamiento de datos en una o varias aplicaciones. TaskManagers Para recuperar la aplicación, es necesaria una operación de parada forzada y otra de inicio de la aplicación. Este es un subcaso de un problema conocido en la comunidad: [FLINK-34071](https://issues.apache.org/jira/browse/FLINK-34071).

## Conectores para versiones anteriores de Flink
<a name="connectors-older-versions"></a>


**Conectores para versiones anteriores de Flink**  

| Connector | Flink versión 1.15 | Flink versión 1.18 | Flink, versión 1.19 | Flink, versión 1.20 | 
| --- | --- | --- | --- | --- | 
| API de Kinesis Data Stream: fuente DataStream y tabla | flink-connector-kinesis, 1.15.4 | flink-connector-kinesis, 4,3,0-1,18 | flink-connector-kinesis, 5,0-1,19 | flink-connector-kinesis, 5,0-0-1,20 | 
| API de Kinesis Data Stream: receptáculo DataStream y tabla | flink-connector-aws-kinesis-streams, 1.15.4 | flink-connector-aws-kinesis-arroyos, 4.3.0-1.18 | flink-connector-aws-kinesis-arroyos, 5.0.0-1.19 | flink-connector-aws-kinesis-arroyos, 5.0.0-1.20 | 
| Kinesis Data Streams Source/Sink - - SQL | flink-sql-connector-kinesis, 1.15.4 | flink-sql-connector-kinesis, 4,3,0-1,18 | flink-sql-connector-kinesis, 5,0-1,19 | flink-sql-connector-kinesis-arroyos, 5.0.0-1.20 | 
| Kafka: y API de tablas DataStream  | flink-connector-kafka, 1.15.4 | flink-connector-kafka, 3.2,0-1,18 | flink-connector-kafka, 3,30-1,19 | flink-connector-kafka, 3,30-1,20 | 
| Kafka - SQL | flink-sql-connector-kafka, 1,15.4 | flink-sql-connector-kafka, 3.2,0-1,18 | flink-sql-connector-kafka, 3,30-1,19 | flink-sql-connector-kafka, 3,30-1,20 | 
| Firehose DataStream y API de tablas | flink-connector-aws-kinesis-firehose, 1.15.4 | flink-connector-aws-firehose, 4.3,0-1.18 | flink-connector-aws-firehose, 5,0-1,19 | flink-connector-aws-firehose, 5,0-0-1,20 | 
| Firehose - SQL | flink-sql-connector-aws-kinesis-manguera de incendios, 1.15.4 | flink-sql-connector-aws-manguera de incendios, 4.3.0-1.18 | flink-sql-connector-aws-manguera de incendios, 5.0.0-1.19 | flink-sql-connector-aws-manguera de incendios, 5.0.0-1.20 | 
| DynamoDB DataStream y API de tablas | flink-connector-dynamodb, 3.0.0-1.15 | flink-connector-dynamodb, 4,30-1,18 | flink-connector-dynamodb, 5,0-1,19 | flink-connector-dynamodb, 5,0-0-1,20 | 
| DynamoDB - SQL | flink-sql-connector-dynamodb, 3,0-1,15 | flink-sql-connector-dynamodb, 4,30-1,18 | flink-sql-connector-dynamodb, 5,0-1,19 | flink-sql-connector-dynamodb, 5,0-0-1,20 | 
| OpenSearch - DataStream y API de tablas | - | flink-connector-opensearch, 1.2.0-1.18 | flink-connector-opensearch, 1.2,0-1,19 | flink-connector-opensearch, 1.2,0-1,19 | 
| OpenSearch - SQL | - | flink-sql-connector-opensearch, 1.2.0-1.18 | flink-sql-connector-opensearch, 1.2,0-1,19 | flink-sql-connector-opensearch, 1.2,0-1,19 | 
| Amazon Managed Service para Prometheus DataStream | - | flink-sql-connector-opensearch, 1.2.0-1.18 | flink-connector-prometheus, 1,0-1,19 | flink-connector-prometheus, 1,0-0-1,20 | 
| Amazon SQS DataStream y API de tablas | - | flink-sql-connector-opensearch, 1.2.0-1.18 | flink-connector-sqs, 5,0-1,19 | flink-connector-sqs, 5,0-0-1,20 | 

Para obtener más información sobre los conectores de Amazon Managed Service para Apache Flink, consulte:
+ [DataStream Conectores API](https://docs.aws.amazon.com/managed-flink/latest/java/how-connectors.html)
+ [Conectores de la API de tabla](https://docs.aws.amazon.com/managed-flink/latest/java/how-table-connectors.html)

### Problemas conocidos
<a name="connectors-known-issues"></a>

Existe un problema conocido de código abierto en Apache Flink con el conector Apache Kafka de Apache Flink 1.15. Este problema se resolvió en versiones posteriores de Apache Flink. 

Para obtener más información, consulte [Problemas conocidos](flink-1-15-2.md#flink-1-15-known-issues). 

# Implementación de tolerancia a errores en Managed Service para Apache Flink
<a name="how-fault"></a>

Los puntos de control son el método que se utiliza para implementar la tolerancia a errores en Amazon Managed Service para Apache Flink. Un *punto de control* es una up-to-date copia de seguridad de una aplicación en ejecución que se utiliza para recuperarse inmediatamente de una interrupción inesperada de la aplicación o una conmutación por error. 

Para obtener más información sobre los puntos de control en las aplicaciones de Apache Flink, consulte [Checkpoints](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/ops/state/checkpoints/) en la documentación de Apache Flink.

Una *instantánea* es una copia de seguridad del estado de la aplicación creada y gestionada manualmente. Las instantáneas permiten restaurar la aplicación a un estado anterior mediante una llamada a [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html). Para obtener más información, consulte [Administración de copias de seguridad de aplicaciones con instantáneas](how-snapshots.md).

Si la aplicación tiene habilitados los puntos de control, el servicio ofrece tolerancia a errores al crear y cargar copias de seguridad de los datos de la aplicación en caso de que la aplicación se reinicie inesperadamente. Estos reinicios inesperados de las aplicaciones pueden deberse a reinicios inesperados de trabajos, errores en las instancias, etc. Esto proporciona a la aplicación la misma semántica que cuando se ejecuta sin errores durante estos reinicios. 

Si las instantáneas están habilitadas para la aplicación y se configuran con las de la aplicación, el servicio proporciona una semántica de [ApplicationRestoreConfiguration](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ApplicationRestoreConfiguration.html)procesamiento de una sola vez durante las actualizaciones de la aplicación o durante el escalado o el mantenimiento relacionados con el servicio.

## Configuración de los puntos de control en Managed Service para Apache Flink
<a name="how-fault-configure"></a>

Puede configurar el comportamiento de los puntos de control de la aplicación. Puede definir si mantiene el estado de los puntos de control, con qué frecuencia guarda su estado en los puntos de control y el intervalo mínimo entre el final de una operación de punto de control y el comienzo de otra.

Los siguientes ajustes se configuran mediante las operaciones de la API [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html) o [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html):
+ `CheckpointingEnabled`: indica si los puntos de control están habilitados en la aplicación.
+ `CheckpointInterval`: contiene el tiempo en milisegundos entre las operaciones de los puntos de control (persistencia).
+ `ConfigurationType`: establezca este valor en `DEFAULT` para utilizar el comportamiento predeterminado de los puntos de control. Establezca este valor en `CUSTOM` para configurar otros valores.
**nota**  
El comportamiento predeterminado de los puntos de control es el siguiente:  
**CheckpointingEnabled:** verdadero
**CheckpointInterval:** 60000
**MinPauseBetweenCheckpoints: 5000**
Si **ConfigurationType**se establece en`DEFAULT`, se utilizarán los valores anteriores, incluso si se establecen en otros valores utilizando el código de la AWS Command Line Interface aplicación o configurando los valores en el código de la aplicación.
**nota**  
A partir de la versión 1.15 de Flink, Managed Service para Apache Flink utilizará `stop-with-savepoint` durante la creación automática de instantáneas, es decir, durante la actualización, el escalado o la detención de la aplicación. 
+ `MinPauseBetweenCheckpoints`: el tiempo mínimo en milisegundos entre el final de una operación de punto de control y el inicio de otra. Establecer este valor impide que la aplicación realice un punto de control de forma continua cuando una operación de punto de control tarda más de `CheckpointInterval`.

## Revisión de ejemplos de la API de puntos de control
<a name="how-fault-examples"></a>

En esta sección se incluyen ejemplos de solicitudes de acciones de API para configurar los puntos de control de una aplicación. Para obtener información sobre cómo utilizar un archivo JSON como entrada para una acción de API, consulte [Ejemplo de código de API de Managed Service para Apache Flink](api-examples.md).

### Configuración de puntos de control para una nueva aplicación
<a name="how-fault-examples-create-config"></a>

La siguiente solicitud de ejemplo de la acción [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html) configura los puntos de control al crear una aplicación:

```
{
   "ApplicationName": "MyApplication",
   "RuntimeEnvironment":"FLINK-1_19",
   "ServiceExecutionRole":"arn:aws:iam::123456789123:role/myrole",
   "ApplicationConfiguration": { 
      "ApplicationCodeConfiguration":{
      "CodeContent":{
        "S3ContentLocation":{
          "BucketARN":"arn:aws:s3:::amzn-s3-demo-bucket",
          "FileKey":"myflink.jar",
          "ObjectVersion":"AbCdEfGhIjKlMnOpQrStUvWxYz12345"
        }
      },
      "FlinkApplicationConfiguration": { 
         "CheckpointConfiguration": { 
            "CheckpointingEnabled": "true",
            "CheckpointInterval": 20000,
            "ConfigurationType": "CUSTOM",
            "MinPauseBetweenCheckpoints": 10000
         }
      }
}
```

### Deshabilitación de los puntos de control para una nueva aplicación
<a name="how-fault-examples-create-disable"></a>

La siguiente solicitud de ejemplo de la acción [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html) deshabilita los puntos de control al crear una aplicación:

```
{
   "ApplicationName": "MyApplication",
   "RuntimeEnvironment":"FLINK-1_19",
   "ServiceExecutionRole":"arn:aws:iam::123456789123:role/myrole",
   "ApplicationConfiguration": { 
      "ApplicationCodeConfiguration":{
      "CodeContent":{
        "S3ContentLocation":{
          "BucketARN":"arn:aws:s3:::amzn-s3-demo-bucket",
          "FileKey":"myflink.jar",
          "ObjectVersion":"AbCdEfGhIjKlMnOpQrStUvWxYz12345"
        }
      },
      "FlinkApplicationConfiguration": { 
         "CheckpointConfiguration": { 
            "CheckpointingEnabled": "false"
         }
      }
}
```

### Configuración de los puntos de control para una aplicación existente
<a name="how-fault-examples-update-config"></a>

La siguiente solicitud de ejemplo de la acción [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html) configura los puntos de control para una aplicación existente:

```
{
   "ApplicationName": "MyApplication",
   "ApplicationConfigurationUpdate": { 
      "FlinkApplicationConfigurationUpdate": { 
         "CheckpointConfigurationUpdate": { 
            "CheckpointingEnabledUpdate": true,
            "CheckpointIntervalUpdate": 20000,
            "ConfigurationTypeUpdate": "CUSTOM",
            "MinPauseBetweenCheckpointsUpdate": 10000
         }
      }
   }
}
```

### Deshabilitación de los puntos de control para una aplicación existente
<a name="how-fault-examples-update-update-disable"></a>

La siguiente solicitud de ejemplo de la acción [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html) deshabilita los puntos de control para una aplicación existente:

```
{
   "ApplicationName": "MyApplication",
   "ApplicationConfigurationUpdate": { 
      "FlinkApplicationConfigurationUpdate": { 
         "CheckpointConfigurationUpdate": { 
            "CheckpointingEnabledUpdate": false,
            "CheckpointIntervalUpdate": 20000,
            "ConfigurationTypeUpdate": "CUSTOM",
            "MinPauseBetweenCheckpointsUpdate": 10000
         }
      }
   }
}
```

# Administración de copias de seguridad de aplicaciones con instantáneas
<a name="how-snapshots"></a>

Una *instantánea* es la implementación de Managed Service para Apache Flink en un *punto de guardado* de Apache Flink. Una instantánea es una copia de seguridad del estado de la aplicación activada, creada y gestionada por el usuario o el servicio. Para obtener información sobre los puntos de guardado de Apache Flink, consulte [Savepoints](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/ops/state/savepoints/) en la documentación de Apache Flink. Con las instantáneas, se puede reiniciar una aplicación a partir de una instantánea concreta del estado de la aplicación.

**nota**  
Se recomienda que la aplicación cree una instantánea varias veces al día para que se reinicie correctamente con los datos de estado correctos. La frecuencia correcta de las instantáneas depende de la lógica de negocios de la aplicación. Tomar instantáneas frecuentes permite recuperar datos más recientes, pero aumenta el costo y requiere más recursos del sistema.

En Managed Service para Apache Flink, las instantáneas se administran mediante las siguientes acciones de la API:
+ [https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplicationSnapshot.html](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplicationSnapshot.html)
+ [https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_DeleteApplicationSnapshot.html](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_DeleteApplicationSnapshot.html)
+ [https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_DescribeApplicationSnapshot.html](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_DescribeApplicationSnapshot.html)
+ [https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ListApplicationSnapshots.html](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ListApplicationSnapshots.html)

Para conocer el límite en el número de instantáneas por aplicación, consulte [Cuota de cuadernos de Managed Service para Apache Flink y cuadernos de Studio](limits.md). Si la aplicación alcanza el límite de instantáneas, la creación manual de una instantánea produce un error con una `LimitExceededException`. 

Managed Service para Apache Flink nunca borra las instantáneas. Usted debe eliminar las instantáneas manualmente usando la acción [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DeleteApplicationSnapshot.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DeleteApplicationSnapshot.html).

Para cargar una instantánea guardada del estado de la aplicación al iniciar una aplicación, utilice el parámetro [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ApplicationRestoreConfiguration.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ApplicationRestoreConfiguration.html) de la acción [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StartApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StartApplication.html) o [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html).

**Topics**
+ [Administración de creación de instantáneas automáticas](#how-fault-snapshot-update)
+ [Restauración de una instantánea que contiene datos de estado incompatibles](#how-fault-snapshot-restore)
+ [Revisión de ejemplos de la API de instantáneas](#how-fault-snapshot-examples)

## Administración de creación de instantáneas automáticas
<a name="how-fault-snapshot-update"></a>

Si `SnapshotsEnabled` se establece `true` en [ ApplicationSnapshotConfiguration](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ApplicationSnapshotConfiguration.html)para la aplicación, Managed Service for Apache Flink crea y usa instantáneas automáticamente cuando la aplicación se actualiza, escala o detiene para proporcionar una semántica de procesamiento de una sola vez.

**nota**  
Si `ApplicationSnapshotConfiguration::SnapshotsEnabled` se establece en `false`, se perderán datos durante las actualizaciones de la aplicación.

**nota**  
Managed Service para Apache Flink activa puntos de guardado intermedios durante la creación de instantáneas. En la versión 1.15 o superior de Flink, los puntos de guardado intermedios ya no producen efectos secundarios. Consulte [Triggering Savepoints](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#triggering-savepoints).

Las instantáneas creadas automáticamente tienen las siguientes cualidades:
+ El servicio administra la instantánea, pero puede verla mediante la acción. [ ListApplicationSnapshots](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ListApplicationSnapshots.html) Las instantáneas creadas automáticamente se tienen en cuenta para el límite de instantáneas.
+ Si la aplicación supera el límite de instantáneas, las instantáneas creadas manualmente fallarán, pero el servicio Managed Service para Apache Flink seguirá creando las instantáneas correctamente cuando la aplicación se actualice, escale o detenga. Debe eliminar manualmente las instantáneas mediante la [ DeleteApplicationSnapshot](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_DeleteApplicationSnapshot.html)acción antes de crear más instantáneas de forma manual.

## Restauración de una instantánea que contiene datos de estado incompatibles
<a name="how-fault-snapshot-restore"></a>

Como las instantáneas contienen información sobre los operadores, la restauración de los datos de estado de una instantánea para un operador que ha cambiado desde la versión anterior de la aplicación puede tener resultados inesperados. Una aplicación fallará si intenta restaurar los datos de estado de una instantánea que no corresponde al operador actual. La aplicación con errores se bloqueará en el estado `STOPPING` o `UPDATING`. 

Para permitir que una aplicación se restaure a partir de una instantánea que contiene datos de estado incompatibles, [FlinkRunConfiguration](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_FlinkRunConfiguration.html)defina el `AllowNonRestoredState` parámetro para `true` utilizar la [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html)acción.

Verá el siguiente comportamiento cuando se restaure una aplicación a partir de una instantánea obsoleta:
+ **Operador agregado:** si se agrega un operador nuevo, el punto de guardado no tiene datos de estado para el nuevo operador. No se producirá ningún error y no es necesario establecer `AllowNonRestoredState`.
+ **Operador eliminado:** si se elimina un operador existente, el punto de guardado contiene los datos de estado del operador que falta. Se producirá un error a menos que `AllowNonRestoredState` se establezca en `true`.
+ **Operador modificado:** si se realizan cambios compatibles, como cambiar el tipo de un parámetro por uno compatible, la aplicación puede realizar la restauración a partir de la instantánea obsoleta. Para obtener más información sobre la restauración a partir de instantáneas, consulte [Savepoints](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/ops/state/savepoints/) en la documentación de Apache Flink. Es posible restaurar una aplicación que utiliza la versión 1.8 o posterior de Apache Flink a partir de una instantánea con un esquema diferente. No se puede restaurar una aplicación que utilice la versión 1.6 de Apache Flink. En el two-phase-commit caso de los receptores, se recomienda utilizar una instantánea del sistema (SWs) en lugar de una instantánea creada por el usuario (). CreateApplicationSnapshot

  En el caso de Flink, Managed Service para Apache Flink activa puntos de guardado intermedios durante la creación de la instantánea. A partir de la versión 1.15 de Flink, los puntos de guardado intermedios ya no producen efectos secundarios. Consulte [Triggering Savepoints](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#triggering-savepoints).

Si necesita reanudar una aplicación que no es compatible con los datos de puntos de almacenamiento existentes, le recomendamos que omita la restauración desde la instantánea configurando el `ApplicationRestoreType` parámetro de la acción en. [StartApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StartApplication.html)`SKIP_RESTORE_FROM_SNAPSHOT`

Para obtener más información sobre cómo Apache Flink trata los datos de estado incompatibles, consulte [State Schema Evolution](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/fault-tolerance/serialization/schema_evolution/) en la *documentación de Apache Flink*.

## Revisión de ejemplos de la API de instantáneas
<a name="how-fault-snapshot-examples"></a>

En esta sección se incluyen solicitudes de ejemplo de acciones de API para usar instantáneas con una aplicación. Para obtener información sobre cómo utilizar un archivo JSON como entrada para una acción de API, consulte [Ejemplo de código de API de Managed Service para Apache Flink](api-examples.md).

### Habilitación de las instantáneas para una aplicación
<a name="how-fault-savepoint-examples-enable"></a>

La siguiente solicitud de ejemplo de la acción [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html) habilita las instantáneas para una aplicación:

```
{
   "ApplicationName": "MyApplication",
   "CurrentApplicationVersionId": 1,
   "ApplicationConfigurationUpdate": { 
      "ApplicationSnapshotConfigurationUpdate": { 
         "SnapshotsEnabledUpdate": "true"
       }
    }
}
```

### Crear una instantánea
<a name="how-fault-savepoint-examples-create"></a>

La siguiente solicitud de ejemplo de la acción [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplicationSnapshot.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplicationSnapshot.html) crea una instantánea del estado actual de la aplicación:

```
{
   "ApplicationName": "MyApplication",
   "SnapshotName": "MyCustomSnapshot"
}
```

### Lista de instantáneas de una aplicación
<a name="how-fault-snapshot-examples-list"></a>

La siguiente solicitud de ejemplo de la acción [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ListApplicationSnapshots.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ListApplicationSnapshots.html) muestra las 50 primeras instantáneas del estado actual de la aplicación:

```
{
   "ApplicationName": "MyApplication",
   "Limit": 50
}
```

### Lista de detalles de la instantánea de una aplicación
<a name="how-fault-snapshot-examples-describe"></a>

En la siguiente solicitud de ejemplo de la acción [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DescribeApplicationSnapshot.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DescribeApplicationSnapshot.html) se muestran los detalles de una instantánea específica de una aplicación:

```
{
   "ApplicationName": "MyApplication",
   "SnapshotName": "MyCustomSnapshot"
}
```

### Eliminar una instantánea
<a name="how-fault-snapshot-examples-delete"></a>

La siguiente solicitud de ejemplo de la acción [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DeleteApplicationSnapshot.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DeleteApplicationSnapshot.html) elimina una instantánea guardada anteriormente. Se puede obtener el valor de `SnapshotCreationTimestamp` usando [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ListApplicationSnapshots.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ListApplicationSnapshots.html) o [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DeleteApplicationSnapshot.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DeleteApplicationSnapshot.html):

```
{
   "ApplicationName": "MyApplication",
   "SnapshotName": "MyCustomSnapshot",
   "SnapshotCreationTimestamp": 12345678901.0,
}
```

### Reinicio de una aplicación usando una instantánea con nombre
<a name="how-fault-snapshot-examples-load-custom"></a>

La siguiente solicitud de ejemplo de la acción [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StartApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StartApplication.html) inicia la aplicación utilizando el estado guardado de una instantánea específica:

```
{
   "ApplicationName": "MyApplication",
   "RunConfiguration": { 
      "ApplicationRestoreConfiguration": { 
         "ApplicationRestoreType": "RESTORE_FROM_CUSTOM_SNAPSHOT",
         "SnapshotName": "MyCustomSnapshot"
      }
   }
}
```

### Reinicio de una aplicación usando la instantánea más reciente
<a name="how-fault-snapshot-examples-load-recent"></a>

La siguiente solicitud de ejemplo de la acción [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StartApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StartApplication.html) inicia la aplicación usando la instantánea más reciente:

```
{
   "ApplicationName": "MyApplication",
   "RunConfiguration": { 
      "ApplicationRestoreConfiguration": { 
         "ApplicationRestoreType": "RESTORE_FROM_LATEST_SNAPSHOT"
      }
   }
}
```

### Reinicio de una aplicación sin utilizar ninguna instantánea
<a name="how-fault-snapshot-examples-load-none"></a>

La siguiente solicitud de ejemplo de la acción [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StartApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StartApplication.html) inicia la aplicación sin cargar el estado de la aplicación, incluso si está presente una instantánea:

```
{
   "ApplicationName": "MyApplication",
   "RunConfiguration": { 
      "ApplicationRestoreConfiguration": { 
         "ApplicationRestoreType": "SKIP_RESTORE_FROM_SNAPSHOT"
      }
   }
}
```

# Uso de actualizaciones de versión locales para Apache Flink
<a name="how-in-place-version-upgrades"></a>

Con actualizaciones de versión locales para Apache Flink, se conserva la trazabilidad de las aplicaciones en relación a un único ARN en todas las versiones de Apache Flink. Esto incluye instantáneas, registros, métricas, etiquetas, configuraciones de Flink VPCs, aumentos del límite de recursos y más. 

Se pueden realizar actualizaciones de versión locales de Apache Flink para actualizar las aplicaciones existentes a una nueva versión de Flink en Amazon Managed Service para Apache Flink. Para realizar esta tarea, puede utilizar el AWS CLI AWS CloudFormation, el AWS SDK o el. Consola de administración de AWS

**nota**  
No se pueden utilizar las actualizaciones de las versiones in situ para Apache Flink con Amazon Managed Service para Apache Flink Studio.

**Topics**
+ [Actualización de aplicaciones mediante actualizaciones de las versiones in situ para Apache Flink](upgrading-applications.md)
+ [Actualice su aplicación a una nueva versión de Apache Flink](upgrading-application-new-version.md)
+ [Reversiones de las actualizaciones de aplicaciones](rollback.md)
+ [Prácticas recomendadas y sugerencias generales para actualizaciones de aplicaciones](best-practices-recommendations.md)
+ [Precauciones y problemas conocidos relacionados con actualizaciones de aplicaciones](precautions.md)
+ [Actualización a Flink 2.2: guía completa](flink-2-2-upgrade-guide.md)
+ [Guía de compatibilidad estatal para las actualizaciones de Flink 2.2](state-compatibility.md)

# Actualización de aplicaciones mediante actualizaciones de las versiones in situ para Apache Flink
<a name="upgrading-applications"></a>

Antes de comenzar, se recomienda que vea este video: [In-Place Version Upgrades](https://www.youtube.com/watch?v=f1qGGdaP2XI).

Para realizar actualizaciones locales de las versiones de Apache Flink, puede usar el AWS CLI, AWS CloudFormation, AWS SDK o. Consola de administración de AWS Se puede utilizar esta función con cualquier aplicación existente que se utilice con Managed Service para Apache Flink en un estado `READY` o estado .`RUNNING` Utiliza la UpdateApplication API para añadir la posibilidad de cambiar el tiempo de ejecución de Flink.

## Antes de actualización: actualice la aplicación Apache Flink
<a name="before-upgrading"></a>

Al escribir las aplicaciones de Flink, las agrupa con sus dependencias en una JAR de aplicaciones y carga la JAR en su bucket de Amazon S3. A partir de ahí, Amazon Managed Service para Apache Flink ejecuta el trabajo en el nuevo tiempo de ejecución de Flink que haya seleccionado. Quizá se deban actualizar las aplicaciones para lograr la compatibilidad con el entorno de ejecución de Flink al que desea actualizar. Se puede haber inconsistencias entre las versiones de Flink que provoquen un error en la actualización de la versión. Lo más habitual es que se utilice con conectores para fuentes (entrada) o destinos (receptores, egresos) y dependencias de Scala. Flink 1.15 y las versiones posteriores de Managed Service para Apache Flink son independientes de Scala, y su JAR debe contener la versión de Scala que planea usar.

**Cómo actualizar su aplicación**

1. Lea los consejos en la comunidad de Flink sobre cómo actualizar las aplicaciones con el estado. Consulte [Upgrading Applications and Flink Versions](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/upgrading/).

1. Lea la lista de problemas y limitaciones conocidos. Consulte [Precauciones y problemas conocidos relacionados con actualizaciones de aplicaciones](precautions.md).

1. Actualice sus dependencias y pruebe sus aplicaciones localmente. Estas dependencias suelen ser:

   1. El tiempo de ejecución y la API de Flink.

   1. Se recomiendan conectores para el nuevo tiempo de ejecución de Flink. Se puede encontrarlos en [las versiones de lanzamiento](https://docs.aws.amazon.com/managed-flink/latest/java/release-version-list.html) correspondientes al tiempo de ejecución específico al que desee actualizar.

   1. Scala: Apache Flink es independiente de Scala a partir de la versión 1.15 de Flink, incluida esta versión. Debe incluir las dependencias de Scala que desee utilizar en la JAR de la aplicación.

1. Cree una JAR de aplicación en un archivo zip y cárguelo en Amazon S3. Se recomienda que utilice un nombre diferente de la jar/zip anterior. Si necesita realizar una reversión, utilizará esta información.

1. Si se ejecutan aplicaciones con estado, se recomienda que tome una instantánea de la aplicación actual. Esto permite revertirla de forma automática si surgen problemas durante o después de la actualización. 

# Actualice su aplicación a una nueva versión de Apache Flink
<a name="upgrading-application-new-version"></a>

Puede actualizar su aplicación Flink mediante la [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html)acción.

Se puede llamar a la API de `UpdateApplication` de varias maneras:
+ Utilice el flujo de trabajo de **configuración** existente en la Consola de administración de AWS.
  + Vaya a la página de la aplicación en la Consola de administración de AWS.
  + Elija **Configurar**.
  + Seleccione el nuevo tiempo de ejecución y la instantánea desde la cual desee empezar, lo que también se conoce como configuración de restauración. Utilice la configuración más reciente como la configuración de restauración para iniciar la aplicación desde la última instantánea. Señale la nueva aplicación actualizada JAR/zip en Amazon S3.
+ Utilice la acción de AWS CLI [actualizar la aplicación.](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/kinesisanalyticsv2/update-application.html)
+ Utilice CloudFormation (CFN).
  + Actualice el [RuntimeEnvironment](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-kinesisanalyticsv2-application.html#cfn-kinesisanalyticsv2-application-runtimeenvironment)campo. Anteriormente, CloudFormation eliminaba la aplicación y creaba una nueva, lo que provocaba la pérdida de las instantáneas y el resto del historial de la aplicación. Ahora CloudFormation actualiza tu RuntimeEnvironment aplicación y no la borra. 
+ Usa el AWS SDK.
  + Consulte la documentación del SDK para obtener información sobre el lenguaje de programación de su elección. Consulte [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html). 

Se puede realizar la actualización mientras la aplicación está en estado `RUNNING` o mientras la aplicación está detenida en ese el estado `READY`. Amazon Managed Service para Apache Flink valida para verificar la compatibilidad entre la versión en tiempo de ejecución original y la versión en tiempo de ejecución de destino. Esta comprobación de compatibilidad se ejecuta cuando se está [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html)en el `RUNNING` estado o, al siguiente, [StartApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StartApplication.html)si se actualiza mientras se está en el `READY` estado. 

## Actualización de una aplicación en estado `RUNNING`
<a name="upgrading-running"></a>

En el siguiente ejemplo, se muestra la actualización de una aplicación en un `RUNNING` estado denominado `UpgradeTest` Flink 1.18 en EE. UU. Este (Virginia del Norte) mediante la aplicación actualizada AWS CLI y el inicio de la aplicación actualizada a partir de la última instantánea. 

```
            
aws --region us-east-1 kinesisanalyticsv2 update-application \
--application-name UpgradeTest --runtime-environment-update "FLINK-1_18" \
--application-configuration-update '{"ApplicationCodeConfigurationUpdate": '\
'{"CodeContentUpdate": {"S3ContentLocationUpdate": '\
'{"FileKeyUpdate": "flink_1_18_app.jar"}}}}' \
 --run-configuration-update '{"ApplicationRestoreConfiguration": '\
 '{"ApplicationRestoreType": "RESTORE_FROM_LATEST_SNAPSHOT"}}' \
 --current-application-version-id ${current_application_version}
```
+ Si se activaron las instantáneas de servicio y se desea continuar con la aplicación desde la última instantánea, Amazon Managed Service para Apache Flink comprueba que el tiempo de ejecución de la aplicación actual `RUNNING` es compatible con el tiempo de ejecución de destino seleccionado.
+ Si se ha especificado una instantánea desde la que continuar con el tiempo de ejecución de destino, Amazon Managed Service para Apache Flink comprueba que el tiempo de ejecución de destino es compatible con la instantánea especificada. Si se produce un error en la comprobación de compatibilidad, se rechaza la solicitud de actualización y la aplicación permanece intacta en el estado `RUNNING`.
+ Si se decide iniciar la aplicación sin una instantánea, Amazon Managed Service para Apache Flink no realiza ninguna comprobación de compatibilidad.
+ Si la aplicación actualizada falla o se queda atascada en un estado `UPDATING` transitivo, siga las instrucciones de la sección [Reversiones de las actualizaciones de aplicaciones](rollback.md) para volver al estado correcto. 

**Flujo de proceso para ejecutar aplicaciones en estado**

![\[En el siguiente diagrama se representa el flujo de trabajo recomendado para actualizar la aplicación mientras está en ejecución. Asumimos que la aplicación está en estado y que se han activado las instantáneas. Para este flujo de trabajo, durante la actualización, se restaura la aplicación a partir de la última instantánea que Amazon Managed Service para Apache Flink realizó automáticamente antes de la actualización.\]](http://docs.aws.amazon.com/es_es/managed-flink/latest/java/images/in-place-update-while-running.png)


## Actualización de una aplicación en estado **READY**
<a name="upgrading-ready"></a>

En el siguiente ejemplo se muestra la actualización de una aplicación en el estado `READY` denominado `UpgradeTest` a Flink 1.18 en Este de EE. UU. (Norte de Virginia) mediante la AWS CLI. No hay una instantánea específica para iniciar la aplicación porque la aplicación no se está ejecutando. Se puede especificar una instantánea al emitir la solicitud de inicio de la aplicación.

```
            
aws --region us-east-1 kinesisanalyticsv2 update-application \
--application-name UpgradeTest --runtime-environment-update "FLINK-1_18" \
--application-configuration-update '{"ApplicationCodeConfigurationUpdate": '\
'{"CodeContentUpdate": {"S3ContentLocationUpdate": '\
'{"FileKeyUpdate": "flink_1_18_app.jar"}}}}' \
 --current-application-version-id ${current_application_version}
```
+ Se puede actualizar el tiempo de ejecución de sus aplicaciones en estado `READY` a cualquier versión de Flink. Amazon Managed Service para Apache Flink no realiza ninguna comprobación hasta que se inicia la aplicación.
+  Amazon Managed Service para Apache Flink solo ejecuta comprobaciones de compatibilidad con la instantánea que se seleccionó para iniciar la aplicación. Se trata de comprobaciones de compatibilidad básicas que siguen la [tabla de compatibilidad de Flink](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/upgrading/#compatibility-table). Solo comprueban la versión de Flink con la que se tomó la instantánea y la versión de Flink objetivo. Si el tiempo de ejecución de Flink de la instantánea seleccionada no es compatible con el nuevo tiempo de ejecución de la aplicación, se puede rechazar la solicitud de inicio.

**Flujo de proceso para aplicaciones en estado Ready**

![\[En el siguiente diagrama se representa el flujo de trabajo recomendado para actualizar la aplicación mientras está en estado Ready. Asumimos que la aplicación está en estado y que se han activado las instantáneas. Para este flujo de trabajo, durante la actualización, se restaura la aplicación a partir de la última instantánea que Amazon Managed Service para Apache Flink tomó cuando se detuvo la aplicación.\]](http://docs.aws.amazon.com/es_es/managed-flink/latest/java/images/in-place-update-while-ready.png)


# Reversiones de las actualizaciones de aplicaciones
<a name="rollback"></a>

Si tiene problemas con la aplicación o encuentra incoherencias en el código de la aplicación entre las versiones de Flink, puede revertirla utilizando el AWS CLI AWS CloudFormation, AWS el SDK o el. Consola de administración de AWS En los siguientes ejemplos, se muestra el aspecto de la reversión en distintos escenarios de error.

## La actualización en tiempo de ejecución se realizó correctamente, la aplicación está en estado `RUNNING`, pero el trabajo falla y se reinicia continuamente
<a name="succeeded-restarting"></a>

Supongamos que se intenta actualizar una aplicación con estado denominada `TestApplication` de Flink 1.15 a Flink 1.18 en Este de EE. UU. (Norte de Virginia). Sin embargo, la aplicación Flink 1.18 actualizada no se inicia o se reinicia constantemente, aunque la aplicación esté en ese estado `RUNNING`. Este es un escenario de error común. Para evitar un mayor tiempo de inactividad, se recomienda que restablezca la aplicación inmediatamente a la versión en ejecución anterior (Flink 1.15) y que diagnostique el problema más adelante.

Para revertir la aplicación a la versión en ejecución anterior, usa el comando [rollback-application o la acción](https://docs.aws.amazon.com/cli/latest/reference/kinesisanalyticsv2/rollback-application.html) AWS CLI de la API. [RollbackApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_RollbackApplication.html) Esta acción de la API revierte los cambios realizados y que dieron como resultado la versión más reciente. A continuación, se reinicia la aplicación con la última instantánea correcta. 

Se recomienda encarecidamente que se tome una instantánea de la aplicación existente antes de intentar actualizarla. Esto ayudará a evitar la pérdida de datos o tener que volver a procesarlos. 

En este escenario de error, no CloudFormation revertirá la aplicación por usted. Debe actualizar la CloudFormation plantilla para que apunte al tiempo de ejecución anterior y al código anterior CloudFormation para forzar la actualización de la aplicación. De lo contrario, se CloudFormation supone que la aplicación se ha actualizado cuando pase al `RUNNING` estado.

## Reversión de una aplicación que está atascada en `UPDATING`
<a name="stuck-updating"></a>

Si tu aplicación se queda atascada en el `AUTOSCALING` estado `UPDATING` o después de un intento de actualización, Amazon Managed Service for Apache Flink ofrece el AWS CLI comando [rollback-applications](https://docs.aws.amazon.com/cli/latest/reference/kinesisanalyticsv2/rollback-application.html), o la acción de [RollbackApplications](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_RollbackApplication.html)API que puede revertir la aplicación a la versión anterior al bloqueo o estado. `UPDATING` `AUTOSCALING` Esta API revierte los cambios que se han realizado y que han provocado que la aplicación quede atascada o en un estado transitivo `UPDATING` o `AUTOSCALING`.

# Prácticas recomendadas y sugerencias generales para actualizaciones de aplicaciones
<a name="best-practices-recommendations"></a>
+ Pruebe la nueva versión job/runtime sin estado en un entorno que no sea de producción antes de intentar una actualización de producción.
+ Primero puede probar la actualización con estado con una aplicación que no sea de producción.
+ Asegúrese de que el nuevo gráfico de tareas tenga un estado compatible con la instantánea que se usará para iniciar la aplicación actualizada.
  + Asegúrese de que los tipos almacenados en los estados del operador permanezcan iguales. Si el tipo ha cambiado, Apache Flink no podrá restaurar el estado del operador.
  + Asegúrese de que el operador IDs que configuró con el `uid` método siga siendo el mismo. Apache Flink recomienda encarecidamente asignar elementos exclusivos IDs a los operadores. Para obtener más información, consulte [Asignación de operadores IDs](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#assigning-operator-ids) en la documentación de Apache Flink.

    Si no los asigna IDs a sus operadores, Flink los genera automáticamente. En ese caso, pueden depender de la estructura del programa y, si se modifican, pueden provocar problemas de compatibilidad. Flink usa Operator IDs para hacer coincidir el estado de la instantánea con el operador. Al cambiar IDs el operador, la aplicación no se inicia o se elimina el estado almacenado en la instantánea y el nuevo operador se inicia sin estado.
  + No cambie la clave utilizada para almacenar el estado de la clave.
  + No modifique el tipo de entrada de los operadores con estado, como window o join. Hacerlo cambia implícitamente el tipo de estado interno del operador y provoca una incompatibilidad de estados.

# Precauciones y problemas conocidos relacionados con actualizaciones de aplicaciones
<a name="precautions"></a>

## La confirmación de Kafka en los puntos de control falla repetidamente tras el reinicio de un agente
<a name="apache-kafka-connector"></a>

Existe un problema conocido de código abierto con Apache Flink en el conector Apache Kafka de la versión 1.15 de Flink causado por un error crítico de código abierto en el cliente de Kafka en Kafka Client 2.8.1. Para obtener más información, consulte [Kafka Commit sobre los puntos de control que fallan repetidamente tras el reinicio de un broker](https://issues.apache.org/jira/browse/FLINK-28060) y [KafkaConsumer no puede recuperar la conexión con el coordinador del grupo tras commitOffsetAsync ](https://issues.apache.org/jira/browse/KAFKA-13840) una excepción.

Para evitar este problema, se recomienda que utilice Apache Flink 1.18 o versiones posteriores en Amazon Managed Service para Apache Flink.

## Limitaciones conocidas de compatibilidad entre estados
<a name="state-precautions"></a>
+ Si se utiliza la API de tabla, Apache Flink no garantiza la compatibilidad de estado entre las versiones de Flink. Para obtener más información, consulte [Stateful Upgrades and Evolution](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/overview/#stateful-upgrades-and-evolution) en la documentación de Apache Flink.
+ Los estados de Flink 1.6 no son compatibles con Flink 1.18. La API rechazará su solicitud si intenta actualizar de la versión 1.6 a la versión 1.18 o versiones posteriores con el estado. Se puede actualizar a las versiones 1.8, 1.11, 1.13 y 1.15 y tomar una instantánea y, después, actualizar a la versión 1.18 y versiones posteriores. Para obtener más información, consulte [Upgrading Applications and Flink Versions](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/upgrading/) en la documentación de Apache Flink.

## Problemas conocidos con el conector Flink Kinesis
<a name="kinesis-connector-precautions"></a>
+ Si utiliza Flink 1.11 o una versión anterior y utiliza el `amazon-kinesis-connector-flink` conector de soporte Enhanced-fan-out (EFO), debe tomar medidas adicionales para realizar una actualización completa a Flink 1.13 o posterior. Esto se debe al cambio en el nombre del paquete del conector. Para obtener más información, consulte [amazon-kinesis-connector-flink](https://github.com/awslabs/amazon-kinesis-connector-flink).

  El conector `amazon-kinesis-connector-flink` de Flink 1.11 y versiones anteriores usa el paquete `software.amazon.kinesis`, mientras que el conector de Kinesis para Flink 1.13 y versiones posteriores usa `org.apache.flink.streaming.connectors.kinesis`. [Utilice esta herramienta para respaldar su migración: -state-migrator. amazon-kinesis-connector-flink](https://github.com/awslabs/amazon-kinesis-connector-flink-state-migrator)
+ Si se utiliza Flink 1.13 o una versión anterior con `FlinkKinesisProducer` y se la actualiza a Flink 1.15 o una versión posterior, para obtener una actualización completa se debe seguir usando `FlinkKinesisProducer` en Flink 1.15 o posterior, en lugar de utilizar la versión más reciente `KinesisStreamsSink`. Sin embargo, si ya tiene un conjunto `uid` personalizado en el receptor, se debe poder cambiarlo a `KinesisStreamsSink` porque `FlinkKinesisProducer` no mantiene el estado actual. Flink lo tratará como el mismo operador porque `uid` tiene una configuración personalizada.

## Aplicaciones de Flink escritas en Scala
<a name="scala-precautions"></a>
+ A partir de la versión 1.15, Apache Flink no incluye Scala en el tiempo de ejecución. Debe incluir la versión de Scala que desee usar y otras dependencias de Scala en su código JAR/zip cuando actualice a Flink 1.15 o una versión posterior. Para obtener más información, consulte [Amazon Managed Service para Apache Flink para Apache Flink versión 1.15.2](https://docs.aws.amazon.com/managed-flink/latest/java/flink-1-15-2.html)
+ Si la aplicación usa Scala y la está actualizando de Flink 1.11 o anterior (Scala 2.11) a Flink 1.13 (Scala 2.12), asegúrese de que su código use Scala 2.12. De lo contrario, es posible que la aplicación Flink 1.13 no encuentre las clases de Scala 2.11 en el tiempo de ejecución de Flink 1.13.

## Aspectos que se deben tener en cuenta al cambiar de aplicación Flink
<a name="downgrading-precautions"></a>
+ Es posible cambiar las aplicaciones de Flink, pero se limita a los casos en que la aplicación se ejecutaba anteriormente con la versión anterior de Flink. Para una actualización de estado completo, Managed Service para Apache Flink requerirá el uso de una instantánea tomada con una versión coincidente o anterior para el cambio
+ Si estás actualizando tu entorno de ejecución de Flink 1.13 o posterior a Flink 1.11 o anterior, y si tu aplicación usa el backend de estado, la HashMap aplicación fallará continuamente.

# Actualización a Flink 2.2: guía completa
<a name="flink-2-2-upgrade-guide"></a>

Esta guía proporciona step-by-step instrucciones para actualizar la aplicación Amazon Managed Service for Apache Flink de Flink 1.x a Flink 2.2. Se trata de una actualización de versión importante con cambios importantes que requieren una planificación y pruebas cuidadosas.

**La actualización de la versión principal es unidireccional**  
La operación de actualización puede hacer que la aplicación pase de la versión 1.x a la 2.2 manteniendo el estado, pero no puede volver de la versión 2.2 a la versión 1.x con el estado 2.2. Si su aplicación deja de funcionar correctamente tras la actualización, utilice la API Rollback para volver a la versión 1.x con el estado 1.x original de la última instantánea.

## Requisitos previos
<a name="upgrade-guide-prerequisites"></a>

Antes de comenzar la actualización:
+ Revisar [Cambios y desaprobaciones de última hora](flink-2-2.md#flink-2-2-breaking-changes)
+ Revisión [Guía de compatibilidad estatal para las actualizaciones de Flink 2.2](state-compatibility.md)
+ Asegúrese de tener un entorno que no sea de producción para las pruebas
+ Documente la configuración y las dependencias actuales de su aplicación

## Comprenda sus rutas de migración
<a name="upgrade-guide-migration-paths"></a>

Su experiencia de actualización depende de la compatibilidad de su aplicación con Flink 2.2. Comprender estos caminos le ayuda a prepararse adecuadamente y a establecer expectativas realistas.

**Ruta 1: estado del binario y de la aplicación compatibles**

**Qué esperar:**
+ Invoque la operación de actualización
+ Complete la migración a la versión 2.2 con la transición del estado de la aplicación: → → `RUNNING` `UPDATING` `RUNNING`
+ Conserve todo el estado de la aplicación sin perder ni reprocesar los datos
+ La misma experiencia que en las migraciones de versiones menores

Ideal para: aplicaciones sin estado o aplicaciones que utilizan una serialización compatible (Avro, esquemas de Protobuf compatibles, sin colecciones) POJOs 

**Ruta 2: Incompatibilidades binarias**

**Qué esperar:**
+ Invoque la operación de actualización
+ La operación falla y revela la incompatibilidad binaria a través de la API de operaciones y los registros
+ Con la reversión automática habilitada: las aplicaciones se restablecen automáticamente en cuestión de minutos sin su intervención
+ Con la reversión automática desactivada: las aplicaciones permanecen en estado de ejecución sin procesamiento de datos; se restablece manualmente a la versión anterior
+ Una vez que se haya corregido el binario, utilice la [UpdateApplication API](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html) para disfrutar de una experiencia similar a la de Path 1

Ideal para: aplicaciones eliminadas APIs que se detectan durante el inicio de un trabajo de Flink

**Ruta 3: estado de la aplicación incompatible**

**Qué esperar:**
+ Invoque la operación de actualización
+ Al principio parece que la migración se ha realizado correctamente
+ Las aplicaciones entran en ciclos de reinicio en cuestión de segundos cuando se produce un error en la restauración del estado
+ Detecte los fallos mediante CloudWatch métricas que muestran los reinicios continuos
+ Invoque manualmente la operación de reversión
+ Vuelva a la producción unos minutos después de iniciar la reversión
+ Revise [Migración estatal](state-compatibility.md#state-compat-migration) su solicitud

Ideal para: aplicaciones con incompatibilidades de serialización por estado (POJOs con colecciones, cierto estado de serialización criptográfica)

**nota**  
Se recomienda encarecidamente crear una réplica de la aplicación de producción y probar cada una de las siguientes fases de la actualización en la réplica antes de seguir los mismos pasos para la aplicación de producción.

## Fase 1: Preparación
<a name="upgrade-guide-phase-1"></a>

**Actualizar el código de la aplicación**

Actualice el código de su aplicación para que sea compatible con Flink 2.2:
+ **Actualice las dependencias de Flink** a la versión 2.2.0 en su o `pom.xml` `build.gradle`
+ **Actualice las dependencias de los conectores a las versiones compatibles** con Flink 2.2 (consulte) [Disponibilidad de conectores](flink-2-2.md#flink-2-2-connectors)
+ **Elimine el uso obsoleto** de la API:
  + Sustituya DataSet la API por una DataStream API o una tabla (API/SQL)
  + Sustituya la versión antigua`SourceFunction`/por FLIP-27 `SinkFunction` Source y FLIP-143 Sink APIs
  + Sustituya el uso de la API de Scala por la API de Java
+ **Actualización a Java 17**

**Cargue el código de aplicación actualizado**
+ Cree el JAR de su aplicación con las dependencias de Flink 2.2
+ Cargue en Amazon S3 con un **nombre de archivo diferente** al de su JAR actual (por ejemplo,`my-app-flink-2.2.jar`)
+ Anote el depósito y la clave de S3 que se utilizarán en el paso de actualización

## Fase 2: habilitar la reversión automática
<a name="upgrade-guide-phase-2"></a>

La reversión automática permite a Amazon Managed Service for Apache Flink volver automáticamente a la versión anterior si se produce un error en la actualización.

**Compruebe el estado de la reversión automática**

*Consola de administración de AWS:*

1. Navegue hasta su solicitud

1. Elija **Configuración**

1. En **Configuración de la aplicación**, compruebe que la **reversión del sistema esté habilitada**

*AWS CLI:*

```
aws kinesisanalyticsv2 describe-application \
    --application-name MyApplication \
    --query 'ApplicationDetail.ApplicationConfigurationDescription.ApplicationSystemRollbackConfigurationDescription.RollbackEnabled'
```

**Habilitar la reversión automática (si no está habilitada)**

```
aws kinesisanalyticsv2 update-application \
    --application-name MyApplication \
    --current-application-version-id <version-id> \
    --application-configuration-update '{
        "ApplicationSystemRollbackConfigurationUpdate": {
            "RollbackEnabledUpdate": true
        }
    }'
```

## Fase 3: tomar una instantánea (opcional)
<a name="upgrade-guide-phase-3"></a>

Si las instantáneas automáticas están habilitadas para su aplicación, puede omitir este paso; de lo contrario, tome una instantánea de la aplicación para guardar el estado de la aplicación antes de la actualización.

**Tome una instantánea de la aplicación en ejecución**

*Consola de administración de AWS:*

1. Navegue hasta su aplicación

1. Elige **Instantáneas**

1. Elija **Crear instantánea**

1. Introduzca el nombre de una instantánea (por ejemplo,`pre-flink-2.2-upgrade`)

1. Elija **Create (Creación de)**.

*AWS CLI:*

```
aws kinesisanalyticsv2 create-application-snapshot \
    --application-name MyApplication \
    --snapshot-name pre-flink-2.2-upgrade
```

**Verifique la creación de la instantánea**

```
aws kinesisanalyticsv2 describe-application-snapshot \
    --application-name MyApplication \
    --snapshot-name pre-flink-2.2-upgrade
```

Espere hasta que `SnapshotStatus` sea así `READY` antes de continuar.

## Fase 4: Actualizar la aplicación
<a name="upgrade-guide-phase-4"></a>

Puede actualizar su aplicación Flink mediante la [https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html)acción.

Se puede llamar a la API de `UpdateApplication` de varias maneras:
+ **Utilice la Consola de administración de AWS.**
  + Vaya a la página de la aplicación en la Consola de administración de AWS.
  + Elija **Configurar**.
  + Seleccione el nuevo tiempo de ejecución y la instantánea desde la cual desee empezar, lo que también se conoce como configuración de restauración. Utilice la configuración más reciente como la configuración de restauración para iniciar la aplicación desde la última instantánea. Señale la nueva aplicación actualizada JAR/zip en Amazon S3.
+ **Usa la AWS CLI[https://awscli.amazonaws.com/v2/documentation/api/latest/reference/kinesisanalyticsv2/update-application.html](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/kinesisanalyticsv2/update-application.html)**acción.
+ **Usa CloudFormation.**
  + Actualiza el `RuntimeEnvironment` campo. Anteriormente, CloudFormation eliminaba la aplicación y creaba una nueva, lo que provocaba la pérdida de las instantáneas y del resto del historial de la aplicación. Ahora CloudFormation actualiza tu `RuntimeEnvironment` aplicación y no la borra.
+ **Usa el AWS SDK.**
  + Consulte la documentación del SDK para obtener información sobre el lenguaje de programación de su elección. Consulte [https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html).

Se puede realizar la actualización mientras la aplicación está en estado `RUNNING` o mientras la aplicación está detenida en ese el estado `READY`. Amazon Managed Service for Apache Flink valida la compatibilidad entre la versión en tiempo de ejecución original y la versión en tiempo de ejecución de destino. Esta comprobación de compatibilidad se ejecuta cuando se está en `UpdateApplication` el `RUNNING` estado o, al siguiente, `StartApplication` si se actualiza mientras se está en `READY` el estado.

**Actualización desde el estado DE EJECUCIÓN**

```
aws kinesisanalyticsv2 update-application \
    --application-name MyApplication \
    --current-application-version-id <version-id> \
    --runtime-environment-update FLINK-2_2 \
    --application-configuration-update '{
        "ApplicationCodeConfigurationUpdate": {
            "CodeContentUpdate": {
                "S3ContentLocationUpdate": {
                    "FileKeyUpdate": "my-app-flink-2.2.jar"
                }
            }
        }
    }'
```

**Actualización desde el estado READY**

```
aws kinesisanalyticsv2 update-application \
    --application-name MyApplication \
    --current-application-version-id <version-id> \
    --runtime-environment-update FLINK-2_2 \
    --application-configuration-update '{
        "ApplicationCodeConfigurationUpdate": {
            "CodeContentUpdate": {
                "S3ContentLocationUpdate": {
                    "FileKeyUpdate": "my-app-flink-2.2.jar"
                }
            }
        }
    }'
```

## Fase 5: Actualización del monitor
<a name="upgrade-guide-phase-5"></a>

**Comprobación de compatibilidad**
+ Utilice la API de operaciones para comprobar el estado de la actualización. Si hay incompatibilidades binarias o problemas al iniciar el trabajo, la operación de actualización fallará con los registros.
+ Si la operación de actualización se ha realizado correctamente, pero la aplicación está atascada en los bucles de reinicio, significa que el estado no es compatible con la nueva versión de Flink o que hay un problema con el código actualizado. Revise [Guía de compatibilidad estatal para las actualizaciones de Flink 2.2](state-compatibility.md) cómo identificar los problemas de incompatibilidad entre estados.

**Controle el estado de las aplicaciones**

*Estado de la aplicación:*
+ El estado de la solicitud debe cambiar: `RUNNING` → `UPDATING` → `RUNNING`
+ Compruebe el tiempo de ejecución de la aplicación. Si es 2.2, la operación de actualización se realizó correctamente.
+ Si la aplicación está activa`RUNNING`, pero aún se encuentra en el tiempo de ejecución anterior, se activará la reversión automática. La API de operaciones mostrará la operación como. `FAILED` Compruebe los registros para encontrar la excepción en caso de error.

Además, monitorea estas métricas en CloudWatch:

*Reinicie la métrica:*
+ `numRestarts`: Supervise los reinicios inesperados: la actualización se realiza correctamente si el valor `numRestarts` es cero `uptime` o `runningTime` va en aumento.

*Métricas de puntos de control:*
+ `lastCheckpointDuration`: Debe ser similar a los valores anteriores a la actualización
+ `numberOfFailedCheckpoints`: Debe permanecer en 0

## Fase 6: Validar el comportamiento de la aplicación
<a name="upgrade-guide-phase-6"></a>

Una vez que la aplicación se ejecute en Flink 2.2:

**Validación funcional**
+ Compruebe que los datos se estén leyendo de las fuentes
+ Compruebe que los datos se estén escribiendo en los sumideros
+ Compruebe que la lógica empresarial produzca los resultados esperados
+ Compare los resultados con la línea base previa a la actualización

**Validación del rendimiento**
+ Supervise las métricas de latencia (tiempo end-to-end de procesamiento)
+ Supervise las métricas de rendimiento (registros por segundo)
+ Supervise la duración y el tamaño de los puntos de control
+ Supervise la utilización de la memoria y la CPU

**Funciona durante más de 24 horas**

Deje que la aplicación se ejecute durante al menos 24 horas en producción para garantizar que:
+ Sin pérdidas de memoria
+ Comportamiento estable de los puntos de control
+ Sin reinicios inesperados
+ Rendimiento constante

## Fase 7: procedimientos de reversión
<a name="upgrade-guide-phase-7"></a>

Si la actualización falla o la aplicación se está ejecutando pero no funciona correctamente, vuelva a la versión anterior.

**Reversión automática**

Si la reversión automática está habilitada y la actualización falla durante el inicio, Amazon Managed Service for Apache Flink vuelve automáticamente a la versión anterior.

**Reversión manual**

Si la aplicación se está ejecutando pero no funciona correctamente, usa la `RollbackApplication` API:

*Consola de administración de AWS:*

1. Navega hasta tu aplicación

1. Elija **Acciones** → **Revertir**

1. Confirme la reversión

*AWS CLI:*

```
aws kinesisanalyticsv2 rollback-application \
    --application-name MyApplication \
    --current-application-version-id <version-id>
```

**Qué ocurre durante la reversión:**
+ La aplicación se detiene
+ El tiempo de ejecución vuelve a la versión anterior de Flink
+ El código de la aplicación vuelve al JAR anterior
+ La aplicación se reinicia desde la última instantánea realizada correctamente **antes de** la actualización

**importante**  
No se puede restaurar una instantánea de Flink 2.2 en Flink 1.x
Rollback utiliza la instantánea tomada antes de la actualización
Realice siempre una instantánea antes de actualizar (fase 3)

## Siguientes pasos
<a name="upgrade-guide-next-steps"></a>

Si tiene preguntas o problemas durante la actualización, consulte [Resolución de problemas de Managed Service para Apache Flink](troubleshooting.md) o póngase en contacto con AWS Support.

# Guía de compatibilidad estatal para las actualizaciones de Flink 2.2
<a name="state-compatibility"></a>

Al actualizar de Flink 1.x a Flink 2.2, los problemas de compatibilidad de estado pueden impedir que la aplicación se restaure a partir de instantáneas. Esta guía le ayuda a identificar posibles problemas de compatibilidad y proporciona estrategias de migración.

## Comprender los cambios de compatibilidad de estados
<a name="state-compat-understanding"></a>

Amazon Managed Service para Apache Flink 2.2 introduce varios cambios de serialización que afectan a la compatibilidad de estados. Los principales son los siguientes:
+ **Actualización de la versión Kryo**: Apache Flink 2.2 actualiza el serializador Kryo incluido de la versión 2 a la versión 5. Como Kryo v5 utiliza un formato de codificación binaria diferente al de Kryo v2, cualquier estado del operador que se haya serializado mediante Kryo en un punto de almacenamiento de Flink 1.x no se puede restaurar en Flink 2.2.
+ **Serialización de colecciones de Java**: en Flink 1.x, las colecciones de Java (como, y) incluidas en ellas se serializaban mediante Kryo. `HashMap` `ArrayList` `HashSet` POJOs Flink 2.2 presenta serializadores optimizados para colecciones específicas que son incompatibles con el estado krio-serializado de la versión 1.x. Las aplicaciones que utilizan colecciones Java con serializadores POJO o Kryo en la versión 1.x no pueden restaurar este estado en Flink 2.2. Consulte la [documentación](https://nightlies.apache.org/flink/flink-docs-release-2.2/docs/dev/datastream/fault-tolerance/serialization/types_serialization/) de Flink para obtener más información sobre los tipos de datos y la serialización.
+ **Compatibilidad con el conector Kinesis**: la versión del conector Kinesis Data Streams (KDS) anterior a la 5.0 mantiene un estado que no es compatible con la versión 6.0 del conector Kinesis Flink 2.2. Debe migrar a la versión 5.0 o superior del conector antes de realizar la actualización.

## Referencia de compatibilidad de serialización
<a name="state-compat-reference"></a>

Revise todas las declaraciones estatales de su solicitud y compare los tipos de serialización con la tabla siguiente. Si algún tipo de estado no es compatible, consulte la [Migración estatal](#state-compat-migration) sección antes de continuar con la actualización.


**Referencia de compatibilidad de serialización**  

| Tipo de serialización | ¿Compatible? | Details | 
| --- | --- | --- | 
| Avro (SpecificRecord,GenericRecord) | Sí | Utiliza su propio formato binario independiente de Kryo. Asegúrese de utilizar la información de tipo Avro nativa de Flink, no la de Avro registrada como un serializador Kryo. | 
| Protobuf | Sí | Utiliza su propia codificación binaria independiente de Kryo. Compruebe que los cambios en el esquema sigan las reglas de evolución compatibles con versiones anteriores. | 
| POJOs sin colecciones | Sí | Se gestiona mediante el serializador POJO de Flink, pero solo si la clase cumple con todos los criterios de POJO: clase pública, constructor público sin arg, todos los campos son públicos o accesibles a través de getters/setters y Flink puede serializar todos los tipos de campo a su vez. El POJO que infrinja alguno de estos requisitos recaerá silenciosamente en manos de Kryo y pasará a ser incompatible. | 
| Personalizado TypeSerializers | Sí | Compatible solo si su serializador no delega en Kryo internamente. | 
| Estado de las API SQL y Table | Sí (con una advertencia) | Utiliza los serializadores internos de Flink. Sin embargo, Apache Flink no garantiza la compatibilidad estatal entre las versiones principales de las aplicaciones de la API de tablas. Primero, se deben realizar pruebas en un entorno que no sea de producción. | 
| POJOs con colecciones de Java (HashMap,ArrayList,HashSet) | No | En Flink 1.x, las colecciones incluidas POJOs se serializaban mediante Kryo v2. Flink 2.2 presenta serializadores de colecciones específicos cuyo formato binario es incompatible con el formato Kryo v2. | 
| Clases de casos de Scala | No | Serializado mediante Kryo en Flink 1.x. La actualización de Kryo v2 a v5 cambia el formato binario. | 
| Registros de Java | No | Normalmente recurre a la serialización de Kryo en Flink 1.x. Verifique probando con. disableGenericTypes() | 
| Tipos de bibliotecas de terceros | No | Los tipos sin un serializador personalizado registrado recurren a Kryo. El cambio de formato binario de Kryo v2 a v5 rompe la compatibilidad. | 
| Cualquier tipo que utilice Kryo fallback | No | Si Flink no puede gestionar un tipo con un serializador incorporado o registrado, recurre a Kryo. Todos los estados serializados con Kryo de la versión 1.x son incompatibles con la versión 2.2. | 

## Métodos de diagnóstico
<a name="state-compat-diagnostics"></a>

Puedes identificar los problemas de compatibilidad entre estados de forma proactiva consultando los registros de las aplicaciones o inspeccionándolos después de la operación de la [UpdateApplication API](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html).

**Identifique la alternativa de Kryo en su aplicación**

Puedes usar el siguiente patrón de expresiones regulares en tus registros para identificar el retroceso de Kryo en tu aplicación:

```
Class class (?<className>[^\s]+) cannot be used as a POJO type
```

Registros de ejemplo:

```
Class class org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber
cannot be used as a POJO type because not all fields are valid POJO fields,
and must be processed as GenericType. Please read the Flink documentation on
"Data Types & Serialization" for details of the effect on performance and
schema evolution.
```

Si se produce un error en la actualización mediante la UpdateApplication API, las siguientes excepciones podrían indicar que se está produciendo una incompatibilidad de estado basada en el serializador:

**IndexOutOfBoundsException**

```
Caused by: java.lang.IndexOutOfBoundsException: Index 116 out of bounds for length 1
    at java.base/jdk.internal.util.Preconditions.outOfBounds(Unknown Source)
    at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Unknown Source)
    at java.base/jdk.internal.util.Preconditions.checkIndex(Unknown Source)
    at java.base/java.util.Objects.checkIndex(Unknown Source)
    at java.base/java.util.ArrayList.get(Unknown Source)
    at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:77)
    at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:923)
    ... 23 more
```

**StateMigrationException (POJOSerializer)**

```
Caused by: org.apache.flink.util.StateMigrationException: The new state serializer
(org.apache.flink.api.java.typeutils.runtime.PojoSerializer@8bf85b5d) must not be
incompatible with the old state serializer
(org.apache.flink.api.java.typeutils.runtime.PojoSerializer@3282ee3).
```

## Lista de comprobación previa a la actualización
<a name="state-compat-checklist"></a>
+ Revise todas las declaraciones estatales de su solicitud
+ Consulte POJOs con las colecciones (`HashMap`,`ArrayList`,`HashSet`)
+ Verifique los métodos de serialización para cada tipo de estado
+ Cree una aplicación de réplica de producción y pruebe la compatibilidad de los estados mediante la UpdateApplication API de esta réplica
+ Si el estado no es compatible, seleccione una estrategia entre [Migración estatal](#state-compat-migration)
+ Habilite la reversión automática en la configuración de su aplicación Flink de producción

## Migración estatal
<a name="state-compat-migration"></a>

**Reconstruir el estado completo**

Ideal para aplicaciones en las que el estado se puede reconstruir a partir de los datos de origen.

Si la aplicación puede reconstruir el estado a partir de los datos de origen:

1. Detenga la aplicación Flink 1.x

1. Actualice a Flink 2.x con el código actualizado

1. Comience con `SKIP_RESTORE_FROM_SNAPSHOT`

1. Permitir que la aplicación reconstruya el estado

```
aws kinesisanalyticsv2 start-application \
    --application-name MyApplication \
    --run-configuration '{
        "ApplicationRestoreConfiguration": {
            "ApplicationRestoreType": "SKIP_RESTORE_FROM_SNAPSHOT"
        }
    }'
```

## Prácticas recomendadas
<a name="state-compat-best-practices"></a>

1. **Utilice siempre Avro o Protobuf para estados complejos**: estos proporcionan la evolución del esquema y son independientes de las criptomonedas

1. **Evite las colecciones en POJOs: utilice la versión nativa de Flink y, en** su lugar `ListState` `MapState`

1. **Pruebe la restauración del estado de forma local**: antes de la actualización de producción, pruébela con instantáneas reales

1. **Realice instantáneas con frecuencia**, especialmente antes de actualizar las versiones principales

1. **Habilitar la reversión automática**: configure su aplicación MSF para que se revierta automáticamente en caso de error

1. **Documente sus tipos de estado**: mantenga la documentación de todos los tipos de estados y sus métodos de serialización

1. **Supervise el tamaño de los puntos de control: el aumento del tamaño** de los puntos de control puede indicar problemas de serialización

## Siguientes pasos
<a name="state-compat-next-steps"></a>

**Planifique su actualización**: consulte. [Actualización a Flink 2.2: guía completa](flink-2-2-upgrade-guide.md)

Si tiene preguntas o problemas durante la migración, consulte [Resolución de problemas de Managed Service para Apache Flink](troubleshooting.md) o póngase en contacto con AWS Support.

# Implementación del escalado de aplicaciones en Managed Service para Apache Flink
<a name="how-scaling"></a>

Puede configurar la ejecución paralela de las tareas y la asignación de recursos para que Amazon Managed Service para Apache Flink implemente la reducción horizontal. Para obtener más información sobre cómo Apache Flink programa instancias paralelas de las tareas, consulte [Parallel Execution](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/execution/parallel/) en la documentación de Apache Flink.

**Topics**
+ [Configurar el paralelismo de aplicaciones y la KPU ParallelismPer](#how-parallelism)
+ [Asignación de unidades de procesamiento de Kinesis](#how-scaling-kpus)
+ [Actualización del paralelismo de la aplicación](#how-scaling-howto)
+ [Uso del escalado automático en Managed Service para Apache Flink](how-scaling-auto.md)
+ [Consideraciones sobre maxParallelism](#how-scaling-auto-max-parallelism)

## Configurar el paralelismo de aplicaciones y la KPU ParallelismPer
<a name="how-parallelism"></a>

Para configurar la ejecución paralela de las tareas de la aplicación de Managed Service para Apache Flink (como leer de una fuente o ejecutar un operador), se utilizan las siguientes propiedades [https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ApplicationConfiguration.html](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ApplicationConfiguration.html): 
+ `Parallelism`: utilice esta propiedad para establecer el paralelismo predeterminado de la aplicación de Apache Flink. Todos los operadores, las fuentes y los receptores se ejecutan con este paralelismo, a menos que estén anulados en el código de la aplicación. El valor predeterminado es `1`, y el valor máximo es `256`.
+ `ParallelismPerKPU`: utilice esta propiedad para configurar el número de tareas paralelas que se pueden programar por unidad de procesamiento de Kinesis (KPU) de la aplicación. El valor predeterminado es `1` y el máximo es `8`. En el caso de las aplicaciones que tienen operaciones de bloqueo (por ejemplo, E/S), un valor más alto de `ParallelismPerKPU` implica la plena utilización de los recursos de KPU.

**nota**  
El límite de `Parallelism` es igual a `ParallelismPerKPU` multiplicado por el límite de KPUs (que tiene un valor predeterminado de 64). El KPUs límite se puede aumentar solicitando un aumento del límite. Para obtener instrucciones sobre cómo solicitar un aumento de este límite, consulte “Para solicitar un aumento del límite” en [Service Quotas](https://docs.aws.amazon.com/general/latest/gr/aws_service_limits.html).

Para obtener información sobre cómo configurar el paralelismo de tareas para un operador específico, consulte [Setting the Parallelism: Operator](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/execution/parallel/#operator-level) en la documentación de Apache Flink.

## Asignación de unidades de procesamiento de Kinesis
<a name="how-scaling-kpus"></a>

El servicio gestionado para Apache Flink aprovisiona capacidad como KPUs. Una sola KPU le proporciona 1 vCPU y 4 GB de memoria. Por cada KPU asignada, también se proporcionan 50 GB de almacenamiento para aplicaciones en ejecución. 

Managed Service for Apache Flink calcula las KPUs propiedades necesarias para ejecutar la aplicación mediante las `ParallelismPerKPU` propiedades `Parallelism` y, de la siguiente manera:

```
Allocated KPUs for the application = Parallelism/ParallelismPerKPU
```

Managed Service para Apache Flink proporciona rápidamente recursos a las aplicaciones en respuesta a los picos de rendimiento o de la actividad de procesamiento. Elimina los recursos de la aplicación de forma gradual una vez que ha pasado el pico de actividad. Para deshabilitar la asignación automática de recursos, defina el valor `AutoScalingEnabled` en `false`, como se describe más adelante en [Actualización del paralelismo de la aplicación](#how-scaling-howto). 

El límite predeterminado KPUs para su aplicación es 64. Para obtener instrucciones sobre cómo solicitar un aumento de este límite, consulte “Para solicitar un aumento del límite” en [Service Quotas](https://docs.aws.amazon.com/general/latest/gr/aws_service_limits.html).

**nota**  
Se cobra una KPU adicional por motivos de orquestación. Para obtener más información, consulte [Precios de Managed Service para Apache Flink](https://aws.amazon.com/kinesis/data-analytics/pricing/).

## Actualización del paralelismo de la aplicación
<a name="how-scaling-howto"></a>

Esta sección contiene ejemplos de solicitudes de acciones de la API que establecen el paralelismo de una aplicación. Para ver más ejemplos e instrucciones sobre cómo usar los bloques de solicitudes con las acciones de la API, consulte [Ejemplo de código de API de Managed Service para Apache Flink](api-examples.md).

El siguiente ejemplo de solicitud de la acción [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html) establece el paralelismo al crear una aplicación:

```
{
   "ApplicationName": "string",
   "RuntimeEnvironment":"FLINK-1_18",
   "ServiceExecutionRole":"arn:aws:iam::123456789123:role/myrole",
   "ApplicationConfiguration": { 
      "ApplicationCodeConfiguration":{
      "CodeContent":{
         "S3ContentLocation":{
            "BucketARN":"arn:aws:s3:::amzn-s3-demo-bucket",
            "FileKey":"myflink.jar",
            "ObjectVersion":"AbCdEfGhIjKlMnOpQrStUvWxYz12345"
            }
         },
      "CodeContentType":"ZIPFILE"
   },   
      "FlinkApplicationConfiguration": { 
         "ParallelismConfiguration": { 
            "AutoScalingEnabled": "true",
            "ConfigurationType": "CUSTOM",
            "Parallelism": 4,
            "ParallelismPerKPU": 4
         }
      }
   }
}
```

El siguiente ejemplo de solicitud de la acción [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html) establece el paralelismo para una aplicación existente:

```
{
   "ApplicationName": "MyApplication",
   "CurrentApplicationVersionId": 4,
   "ApplicationConfigurationUpdate": { 
      "FlinkApplicationConfigurationUpdate": { 
         "ParallelismConfigurationUpdate": { 
            "AutoScalingEnabledUpdate": "true",
            "ConfigurationTypeUpdate": "CUSTOM",
            "ParallelismPerKPUUpdate": 4,
            "ParallelismUpdate": 4
         }
      }
   }
}
```

El siguiente ejemplo de solicitud de la acción [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html) deshabilita el paralelismo para una aplicación existente:

```
{
   "ApplicationName": "MyApplication",
   "CurrentApplicationVersionId": 4,
   "ApplicationConfigurationUpdate": { 
      "FlinkApplicationConfigurationUpdate": { 
         "ParallelismConfigurationUpdate": { 
            "AutoScalingEnabledUpdate": "false"
         }
      }
   }
}
```

# Uso del escalado automático en Managed Service para Apache Flink
<a name="how-scaling-auto"></a>

Managed Service para Apache Flink escala elásticamente el paralelismo de la aplicación para adaptarse al rendimiento de datos de su fuente y a la complejidad del operador en la mayoría de los escenarios. El escalado automático está habilitado de forma predeterminada. Managed Service para Apache Flink supervisa el uso de recursos (CPU) de la aplicación y amplía o reduce el paralelismo de la aplicación elásticamente en consecuencia:
+ La aplicación se amplía (aumenta el paralelismo) si el máximo CloudWatch métrico `containerCPUUtilization` es superior al 75 por ciento o superior durante 15 minutos. Esto significa que la acción de `ScaleUp` se activa cuando hay 15 puntos de datos consecutivos con un período de 1 minuto iguales o superiores al 75 por ciento. Una acción de `ScaleUp` duplica el `CurrentParallelism` de su aplicación. `ParallelismPerKPU` no se modifica. Como consecuencia, el número de asignadas KPUs también se duplica. 
+ La aplicación se reduce verticalmente (reduce el paralelismo) cuando el uso de la CPU permanece por debajo del 10 por ciento durante seis horas. Esto significa que la acción de `ScaleDown` se activa cuando hay 360 puntos de datos consecutivos con un período de 1 minuto inferiores al 10 por ciento. Una `ScaleDown` acción reduce a la mitad (redondeado hacia arriba) el paralelismo de la aplicación. `ParallelismPerKPU`no se modifica y el número de asignadas KPUs también se reduce a la mitad (redondeado al alza). 

**nota**  
Se puede hacer referencia a un período máximo de `containerCPUUtilization` de más de 1 minuto para encontrar la correlación con un punto de datos utilizado para la acción de escalado, pero no es necesario reflejar el momento exacto en que se inicia la acción.

Managed Service para Apache Flink no reducirá el valor `CurrentParallelism` de la aplicación a un valor inferior al de la configuración `Parallelism` de la aplicación.

Cuando el servicio de Managed Service para Apache Flink escale su aplicación, aparecerá en estado `AUTOSCALING`. Puede comprobar el estado actual de su solicitud mediante las [ ListApplications](https://docs.aws.amazon.com//managed-flink/latest/apiv2/API_ListApplications.html)acciones [ DescribeApplication](https://docs.aws.amazon.com//managed-flink/latest/apiv2/API_DescribeApplication.html)o. Mientras el servicio escala tu aplicación, la única acción válida de la API que puedes usar es [ StopApplication](https://docs.aws.amazon.com//managed-flink/latest/apiv2/API_ListApplications.html)establecer el `Force` parámetro en`true`.

Puede usar la propiedad `AutoScalingEnabled` (parte de [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_FlinkApplicationConfiguration.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_FlinkApplicationConfiguration.html)) para habilitar o deshabilitar el comportamiento de escalado automático. Se le cobrará a su AWS cuenta las prestaciones del servicio gestionado por Apache Flink, que depende de la `parallelismPerKPU` configuración `parallelism` y de la aplicación. KPUs Si se produce un pico de actividad, ello aumentará los costos de Managed Service para Apache Flink.

Para obtener más información sobre precios, consulte [Precios de Amazon Managed Service para Apache Flink](https://aws.amazon.com/kinesis/data-analytics/pricing/). 

Tenga en cuenta lo siguiente en relación con el escalado de la aplicación:
+ El escalado automático está habilitado de forma predeterminada.
+ El escalado no se aplica a los cuadernos de Studio. Sin embargo, si implementa un cuaderno de Studio como una aplicación de estado perdurable, el escalado se aplicará a la aplicación implementada.
+ Su aplicación tiene un límite predeterminado de 64 KPUs. Para obtener más información, consulte [Cuota de cuadernos de Managed Service para Apache Flink y cuadernos de Studio](limits.md).
+ Cuando el escalado automático actualiza el paralelismo de la aplicación, la aplicación sufre un tiempo de inactividad. Para evitar este tiempo de inactividad, haga lo siguiente:
  + Deshabilite el escalado automático.
  + Configura el `parallelism` y `parallelismPerKPU` con la [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html)acción de tu aplicación. Para obtener más información sobre cómo realizar la configuración de paralelismo de la aplicación, consulte [Actualización del paralelismo de la aplicación](how-scaling.md#how-scaling-howto).
  + Supervise periódicamente el uso de los recursos de la aplicación para comprobar que la aplicación tenga una configuración de paralelismo correcta adecuada para su carga de trabajo. Para obtener información sobre supervisión del uso de recursos de asignación, consulte [Métricas y dimensiones en Managed Service para Apache Flink](metrics-dimensions.md).

## Implementación del escalado automático personalizado
<a name="how-scaling-custom-autoscaling"></a>

Si quiere tener un control más preciso del escalado automático o utilizar otras métricas de activación que no sean `containerCPUUtilization`, se puede utilizar este ejemplo: 
+ [AutoScaling](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/infrastructure/AutoScaling)

  Este ejemplo ilustra cómo escalar la aplicación Managed Service for Apache Flink utilizando una CloudWatch métrica diferente de la aplicación Apache Flink, incluidas las métricas de Amazon MSK y Amazon Kinesis Data Streams, utilizadas como fuentes o receptáculos.

Para obtener información adicional, consulte [Enhanced monitoring and automatic scaling for Apache Flink](https://aws.amazon.com/blogs/big-data/enhanced-monitoring-and-automatic-scaling-for-apache-flink/)

## Implementación del escalado automático programado
<a name="how-scaling-scheduled-autoscaling"></a>

Si su carga de trabajo sigue un perfil predecible a lo largo del tiempo, tal vez prefiera escalar la aplicación Apache Flink de forma preventiva. Esto escala la aplicación a una hora programada, en lugar de escalarla de forma reactiva en función de una métrica. Para configurar el escalado ascendente y descendente a horas fijas del día, puede usar este ejemplo:
+ [ScheduledScaling](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/infrastructure/ScheduledScaling)

## Consideraciones sobre maxParallelism
<a name="how-scaling-auto-max-parallelism"></a>

El paralelismo máximo que puede escalar una tarea de Flink está limitado por el `maxParallelism` *mínimo* en todos los operadores de la tarea. Por ejemplo, en caso de un trabajo simple con solo un origen y un receptor, y el origen tiene un `maxParallelism` de 16 y el receptor tiene 8, la aplicación no podrá escalar más allá del paralelismo de 8.

Para saber cómo se calcula el valor predeterminado `maxParallelism` de un operador y cómo anularlo, consulte [Setting the Maximum Parallelism](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/execution/parallel/#setting-the-maximum-parallelism) en la documentación de Apache Flink.

Como regla básica, tenga en cuenta que si no define un `maxParallelism` para ningún operador e inicia la aplicación con un paralelismo menor o igual a 128, todos los operadores tendrán un `maxParallelism` de 128.

**nota**  
El paralelismo máximo del trabajo es el límite superior del paralelismo para escalar la aplicación y retener el estado.   
Si se modifica el `maxParallelism` de una aplicación existente, la aplicación no podrá reiniciarse a partir de una instantánea anterior tomada con el antiguo `maxParallelism`. Solo puede reiniciar la aplicación sin una instantánea.   
Si planea escalar la aplicación a un paralelismo superior a 128, debe configurar de forma explícita el `maxParallelism` en la aplicación.
+ La lógica del escalado automático evitará escalar una tarea de Flink a un paralelismo que supere el paralelismo máximo de la tarea.
+ Si se utiliza un escalado automático personalizado o un escalado programado, configúrelos para que no superen el paralelismo máximo del trabajo.
+ Si escala de manera manual la aplicación más allá del paralelismo máximo, la aplicación no se iniciará.

# Agregación de etiquetas a aplicaciones Managed Service para Apache Flink
<a name="how-tagging"></a>



En esta sección, se describe cómo añadir etiquetas de metadatos de clave-valor a las aplicaciones de Managed Service para Apache Flink. Estas etiquetas se pueden utilizar para lo siguiente:
+ Determinación de la facturación para aplicaciones individuales de Managed Service para Apache Flink Para obtener más información, consulte [Using Cost Allocation Tags](https://docs.aws.amazon.com/awsaccountbilling/latest/aboutv2/cost-alloc-tags.html) en la *Guía de administración de costos y facturación*.
+ Controlar el acceso a los recursos de la aplicación en función de las etiquetas. Para obtener más información, consulte [Controlling Access Using Tags](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_tags.html) en la *Guía del usuario de AWS Identity and Access Management *.
+ Fines definidos por el usuario. Se puede definir la funcionalidad de la aplicación en función de la presencia de etiquetas del usuario.

Tenga en cuenta la siguiente información sobre el etiquetado:
+ El número máximo de etiquetas de la aplicación incluye las etiquetas del sistema. El número máximo de etiquetas de la aplicación definidas por el usuario es 50.
+ Si una acción incluye una lista de etiquetas que tiene valores `Key` duplicados, el servicio genera una `InvalidArgumentException`.

**Topics**
+ [Agregación de etiquetas al crear una aplicación](how-tagging-create.md)
+ [Agregación o actualización de etiquetas para una aplicación existente](how-tagging-add.md)
+ [Muestra de las etiquetas de una aplicación](how-tagging-list.md)
+ [Eliminación de etiquetas de una aplicación](how-tagging-remove.md)

# Agregación de etiquetas al crear una aplicación
<a name="how-tagging-create"></a>

Las etiquetas se añaden al crear una aplicación mediante el `tags` parámetro de la [CreateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplication.html)acción.

En la siguiente solicitud de ejemplo, se muestra el nodo `Tags` de una solicitud `CreateApplication`:

```
"Tags": [ 
    { 
        "Key": "Key1",
        "Value": "Value1"
    },
    { 
        "Key": "Key2",
        "Value": "Value2"
    }
]
```

# Agregación o actualización de etiquetas para una aplicación existente
<a name="how-tagging-add"></a>

Las etiquetas se añaden a una aplicación mediante la [TagResource](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_TagResource.html)acción. No puede añadir etiquetas a una aplicación mediante la [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html)acción.

Para actualizar una etiqueta existente, añada otra etiqueta con la misma clave.

En la siguiente solicitud de ejemplo de la acción `TagResource`, se añaden nuevas etiquetas o se actualizan las etiquetas existentes:

```
{
   "ResourceARN": "string",
   "Tags": [ 
      { 
         "Key": "NewTagKey",
         "Value": "NewTagValue"
      },
      { 
         "Key": "ExistingKeyOfTagToUpdate",
         "Value": "NewValueForExistingTag"
      }
   ]
}
```

# Muestra de las etiquetas de una aplicación
<a name="how-tagging-list"></a>

Para enumerar las etiquetas existentes, utilice la [ListTagsForResource](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ListTagsForResource.html)acción.

En la siguiente solicitud de ejemplo de la acción `ListTagsForResource`, se muestran las etiquetas de una aplicación:

```
{
   "ResourceARN": "arn:aws:kinesisanalyticsus-west-2:012345678901:application/MyApplication"
}
```

# Eliminación de etiquetas de una aplicación
<a name="how-tagging-remove"></a>

Para eliminar etiquetas de una aplicación, utilice la [UntagResource](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UntagResource.html)acción.

En la siguiente solicitud de ejemplo de la acción `UntagResource`, se eliminan etiquetas de una aplicación:

```
{
   "ResourceARN": "arn:aws:kinesisanalyticsus-west-2:012345678901:application/MyApplication",
   "TagKeys": [ "KeyOfFirstTagToRemove", "KeyOfSecondTagToRemove" ]
}
```

# Uso CloudFormation con el servicio gestionado para Apache Flink
<a name="lambda-cfn-flink"></a>

El siguiente ejercicio muestra cómo iniciar una aplicación Flink creada CloudFormation con una función Lambda en la misma pila. 

## Antes de empezar
<a name="before-you-begin"></a>

Antes de comenzar este ejercicio, siga los pasos para crear una aplicación Flink con at. CloudFormation [AWS::KinesisAnalytics::Application](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-kinesis-analyticsapplication.html)

## Escribir una función de Lambda
<a name="write-lambda-function"></a>

Para iniciar una aplicación de Flink después de crearla o actualizarla, utilizamos la API kinesisanalyticsv2 [start-application](https://docs.aws.amazon.com/cli/latest/reference/kinesisanalyticsv2/start-application.html). La llamada se activará mediante un CloudFormation evento después de la creación de la aplicación Flink. Explicaremos cómo configurar la pila para activar la función de Lambda más adelante en este ejercicio, pero primero nos centraremos en la declaración de la función de Lambda y su código. En este ejemplo, utilizamos el tiempo de ejecución de `Python3.8`. 

```
StartApplicationLambda:
    Type: AWS::Lambda::Function
    DependsOn: StartApplicationLambdaRole
    Properties:
      Description: Starts an application when invoked.
      Runtime: python3.8
      Role: !GetAtt StartApplicationLambdaRole.Arn
      Handler: index.lambda_handler
      Timeout: 30
      Code:
        ZipFile: |
          import logging
          import cfnresponse
          import boto3
          
          logger = logging.getLogger()
          logger.setLevel(logging.INFO)
          
          def lambda_handler(event, context):
            logger.info('Incoming CFN event {}'.format(event))
            
            try:
              application_name = event['ResourceProperties']['ApplicationName']
              
              # filter out events other than Create or Update,
              # you can also omit Update in order to start an application on Create only.
              if event['RequestType'] not in ["Create", "Update"]:
                logger.info('No-op for Application {} because CFN RequestType {} is filtered'.format(application_name, event['RequestType'])) 
                cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
                
                return
              
              # use kinesisanalyticsv2 API to start an application.
              client_kda = boto3.client('kinesisanalyticsv2', region_name=event['ResourceProperties']['Region'])
              
              # get application status.
              describe_response = client_kda.describe_application(ApplicationName=application_name)
              application_status = describe_response['ApplicationDetail']['ApplicationStatus']
              
              # an application can be started from 'READY' status only.
              if application_status != 'READY':
                logger.info('No-op for Application {} because ApplicationStatus {} is filtered'.format(application_name, application_status)) 
                cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
                
                return
              
              # create RunConfiguration. 
              run_configuration = { 
                'ApplicationRestoreConfiguration': {
                  'ApplicationRestoreType': 'RESTORE_FROM_LATEST_SNAPSHOT',
                }
              }
                            
              logger.info('RunConfiguration for Application {}: {}'.format(application_name, run_configuration)) 
              
              # this call doesn't wait for an application to transfer to 'RUNNING' state.
              client_kda.start_application(ApplicationName=application_name, RunConfiguration=run_configuration)
              
              logger.info('Started Application: {}'.format(application_name)) 
              cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
            except Exception as err:
              logger.error(err)
              cfnresponse.send(event,context, cfnresponse.FAILED, {"Data": str(err)})
```

En el código anterior, Lambda procesa los CloudFormation eventos entrantes, filtra todo lo demás `Create` y `Update` obtiene el estado de la aplicación y la inicia si el estado es. `READY` Para obtener el estado de la aplicación, debe crear el rol de Lambda, como se muestra a continuación:

## Creación de un rol de Lambda
<a name="create-lambda-role"></a>

Deberá crear un rol para que Lambda “hable” correctamente con la aplicación y escriba registros. Este rol usa políticas administradas predeterminadas, pero es posible que desee restringirlo usando políticas personalizadas.

```
StartApplicationLambdaRole:
    Type: AWS::IAM::Role
    DependsOn: TestFlinkApplication
    Properties:
      Description: A role for lambda to use while interacting with an application.
      AssumeRolePolicyDocument:
        Version: '2012-10-17		 	 	 '
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/Amazonmanaged-flinkFullAccess
        - arn:aws:iam::aws:policy/CloudWatchLogsFullAccess
      Path: /
```

Tenga en cuenta que los recursos de Lambda se crearán después de crear la aplicación Flink en la misma pila, ya que dependen de ella.

## Cómo invocar la función de Lambda
<a name="invoking-lambda-function"></a>

Ahora solo queda invocar la función de Lambda. Esto se hace mediante un [recurso personalizado](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-cfn-customresource.html).

```
StartApplicationLambdaInvoke:
    Description: Invokes StartApplicationLambda to start an application.
    Type: AWS::CloudFormation::CustomResource
    DependsOn: StartApplicationLambda
    Version: "1.0"
    Properties:
      ServiceToken: !GetAtt StartApplicationLambda.Arn
      Region: !Ref AWS::Region
      ApplicationName: !Ref TestFlinkApplication
```

Esto es todo lo que necesita para iniciar la aplicación Flink con Lambda. Ahora puede crear su propia pila o utilizar el ejemplo completo que aparece a continuación para ver cómo funcionan todos esos pasos en la práctica.

## Revisión de un ejemplo extendido
<a name="lambda-cfn-flink-full-example"></a>

El siguiente ejemplo es una versión ligeramente ampliada de los pasos anteriores con un ajuste `RunConfiguration` adicional realizado mediante los [parámetros de la plantilla](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/parameters-section-structure.html). Esta es una pila funcional para que la pruebe. Asegúrese de leer las notas adjuntas: 

stack.yaml

```
Description: 'kinesisanalyticsv2 CloudFormation Test Application'
Parameters:
  ApplicationRestoreType:
    Description: ApplicationRestoreConfiguration option, can be SKIP_RESTORE_FROM_SNAPSHOT, RESTORE_FROM_LATEST_SNAPSHOT or RESTORE_FROM_CUSTOM_SNAPSHOT.
    Type: String
    Default: SKIP_RESTORE_FROM_SNAPSHOT
    AllowedValues: [ SKIP_RESTORE_FROM_SNAPSHOT, RESTORE_FROM_LATEST_SNAPSHOT, RESTORE_FROM_CUSTOM_SNAPSHOT ]
  SnapshotName:
    Description: ApplicationRestoreConfiguration option, name of a snapshot to restore to, used with RESTORE_FROM_CUSTOM_SNAPSHOT ApplicationRestoreType.
    Type: String
    Default: ''
  AllowNonRestoredState:
    Description: FlinkRunConfiguration option, can be true or false.
    Default: true
    Type: String
    AllowedValues: [ true, false ]
  CodeContentBucketArn:
    Description: ARN of a bucket with application code.
    Type: String
  CodeContentFileKey:
    Description: A jar filename with an application code inside a bucket.
    Type: String
Conditions:
  IsSnapshotNameEmpty: !Equals [ !Ref SnapshotName, '' ]
Resources:
  TestServiceExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17		 	 	 '
        Statement:
          - Effect: Allow
            Principal:
              Service: 
                - kinesisanlaytics.amazonaws.com
            Action: sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/AmazonKinesisFullAccess
        - arn:aws:iam::aws:policy/AmazonS3FullAccess
      Path: /
  InputKinesisStream:
    Type: AWS::Kinesis::Stream
    Properties:
      ShardCount: 1
  OutputKinesisStream:
    Type: AWS::Kinesis::Stream
    Properties:
      ShardCount: 1
  TestFlinkApplication:
    Type: 'AWS::kinesisanalyticsv2::Application'
    Properties:
      ApplicationName: 'CFNTestFlinkApplication'
      ApplicationDescription: 'Test Flink Application'
      RuntimeEnvironment: 'FLINK-1_18'
      ServiceExecutionRole: !GetAtt TestServiceExecutionRole.Arn
      ApplicationConfiguration:
        EnvironmentProperties:
          PropertyGroups:
            - PropertyGroupId: 'KinesisStreams'
              PropertyMap:
                INPUT_STREAM_NAME: !Ref InputKinesisStream
                OUTPUT_STREAM_NAME: !Ref OutputKinesisStream
                AWS_REGION: !Ref AWS::Region
        FlinkApplicationConfiguration:
          CheckpointConfiguration:
            ConfigurationType: 'CUSTOM'
            CheckpointingEnabled: True
            CheckpointInterval: 1500
            MinPauseBetweenCheckpoints: 500
          MonitoringConfiguration:
            ConfigurationType: 'CUSTOM'
            MetricsLevel: 'APPLICATION'
            LogLevel: 'INFO'
          ParallelismConfiguration:
            ConfigurationType: 'CUSTOM'
            Parallelism: 1
            ParallelismPerKPU: 1
            AutoScalingEnabled: True
        ApplicationSnapshotConfiguration:
          SnapshotsEnabled: True
        ApplicationCodeConfiguration:
          CodeContent:
            S3ContentLocation:
              BucketARN: !Ref CodeContentBucketArn
              FileKey: !Ref CodeContentFileKey
          CodeContentType: 'ZIPFILE'     
  StartApplicationLambdaRole:
    Type: AWS::IAM::Role
    DependsOn: TestFlinkApplication
    Properties:
      Description: A role for lambda to use while interacting with an application.
      AssumeRolePolicyDocument:
        Version: '2012-10-17		 	 	 '
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/Amazonmanaged-flinkFullAccess
        - arn:aws:iam::aws:policy/CloudWatchLogsFullAccess
      Path: /
  StartApplicationLambda:
    Type: AWS::Lambda::Function
    DependsOn: StartApplicationLambdaRole
    Properties:
      Description: Starts an application when invoked.
      Runtime: python3.8
      Role: !GetAtt StartApplicationLambdaRole.Arn
      Handler: index.lambda_handler
      Timeout: 30
      Code:
        ZipFile: |
          import logging
          import cfnresponse
          import boto3
          
          logger = logging.getLogger()
          logger.setLevel(logging.INFO)
          
          def lambda_handler(event, context):
            logger.info('Incoming CFN event {}'.format(event))
            
            try:
              application_name = event['ResourceProperties']['ApplicationName']
              
              # filter out events other than Create or Update,
              # you can also omit Update in order to start an application on Create only.
              if event['RequestType'] not in ["Create", "Update"]:
                logger.info('No-op for Application {} because CFN RequestType {} is filtered'.format(application_name, event['RequestType'])) 
                cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
                
                return
              
              # use kinesisanalyticsv2 API to start an application.
              client_kda = boto3.client('kinesisanalyticsv2', region_name=event['ResourceProperties']['Region'])
              
              # get application status.
              describe_response = client_kda.describe_application(ApplicationName=application_name)
              application_status = describe_response['ApplicationDetail']['ApplicationStatus']
              
              # an application can be started from 'READY' status only.
              if application_status != 'READY':
                logger.info('No-op for Application {} because ApplicationStatus {} is filtered'.format(application_name, application_status)) 
                cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
                
                return
              
              # create RunConfiguration from passed parameters. 
              run_configuration = { 
                'FlinkRunConfiguration': {
                  'AllowNonRestoredState': event['ResourceProperties']['AllowNonRestoredState'] == 'true'
                },
                'ApplicationRestoreConfiguration': {
                  'ApplicationRestoreType': event['ResourceProperties']['ApplicationRestoreType'],
                }
              }
              
              # add SnapshotName to RunConfiguration if specified.
              if event['ResourceProperties']['SnapshotName'] != '':
                run_configuration['ApplicationRestoreConfiguration']['SnapshotName'] = event['ResourceProperties']['SnapshotName']
              
              logger.info('RunConfiguration for Application {}: {}'.format(application_name, run_configuration)) 
              
              # this call doesn't wait for an application to transfer to 'RUNNING' state.
              client_kda.start_application(ApplicationName=application_name, RunConfiguration=run_configuration)
              
              logger.info('Started Application: {}'.format(application_name)) 
              cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
            except Exception as err:
              logger.error(err)
              cfnresponse.send(event,context, cfnresponse.FAILED, {"Data": str(err)})
  StartApplicationLambdaInvoke:
    Description: Invokes StartApplicationLambda to start an application.
    Type: AWS::CloudFormation::CustomResource
    DependsOn: StartApplicationLambda
    Version: "1.0"
    Properties:
      ServiceToken: !GetAtt StartApplicationLambda.Arn
      Region: !Ref AWS::Region
      ApplicationName: !Ref TestFlinkApplication
      ApplicationRestoreType: !Ref ApplicationRestoreType
      SnapshotName: !Ref SnapshotName
      AllowNonRestoredState: !Ref AllowNonRestoredState
```

Una vez más, es posible que desee ajustar los roles de Lambda y de la propia aplicación.

Antes de crear la pila anterior, no olvide especificar los parámetros.

parameters.json

```
[
  {
    "ParameterKey": "CodeContentBucketArn",
    "ParameterValue": "YOUR_BUCKET_ARN"
  },
  {
    "ParameterKey": "CodeContentFileKey",
    "ParameterValue": "YOUR_JAR"
  },
  {
    "ParameterKey": "ApplicationRestoreType",
    "ParameterValue": "SKIP_RESTORE_FROM_SNAPSHOT"
  },
  {
    "ParameterKey": "AllowNonRestoredState",
    "ParameterValue": "true"
  }
]
```

Reemplace `YOUR_BUCKET_ARN` y `YOUR_JAR` con sus requerimientos específicos. Se puede seguir esta [guía](https://docs.aws.amazon.com/managed-flink/latest/java/get-started-exercise.html) para crear un bucket de Amazon S3 y un jar de aplicación.

Ahora cree la pila (sustituya YOUR\$1REGION por la región que prefiera, por ejemplo, us-east-1):

```
aws cloudformation create-stack --region YOUR_REGION --template-body "file://stack.yaml" --parameters "file://parameters.json" --stack-name "TestManaged Service for Apache FlinkStack" --capabilities CAPABILITY_NAMED_IAM
```

Ahora puede ir a [https://console.aws.amazon.com/cloudformation](https://console.aws.amazon.com/cloudformation) y ver el progreso. Una vez creada, debería ver su aplicación de Flink en el estado `Starting`. Se puede que tarde algunos minutos en comenzar a `Running`. 

Para obtener más información, consulte los siguientes temas:
+ [Cuatro formas de recuperar cualquier propiedad AWS de un servicio mediante AWS CloudFormation (parte 1 de 3](https://aws.amazon.com/blogs/mt/four-ways-to-retrieve-any-aws-service-property-using-aws-cloudformation-part-1/)).
+ [Tutorial: Búsqueda de imágenes IDs de Amazon Machine](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/walkthrough-custom-resources-lambda-lookup-amiids.html).

# Uso del panel de control de Apache Flink con Managed Service para Apache Flink
<a name="how-dashboard"></a>

Se puede utilizar el panel de control Apache Flink de su aplicación para supervisar el estado de la aplicación Managed Service para Apache Flink. El panel de control de su aplicación muestra la siguiente información:
+ Recursos en uso, incluidos los administradores de tareas y los espacios de tareas. 
+ Información sobre los trabajos, incluidos los que están en ejecución, los que se han completado, los cancelados y los que han fallado. 

Para obtener información sobre los administradores de tareas, los espacios de tareas y los trabajos de Apache Flink, consulte [Apache Flink Architecture](https://flink.apache.org/what-is-flink/flink-architecture/) en el sitio web de Apache Flink. 

Note lo siguiente sobre el uso del panel de control de Apache Flink con Managed Service para Apache Flink
+ El panel de control de Apache Flink para Managed Service para Apache Flink es de solo lectura. No puede realizar cambios a Managed Service para Apache Flink usando el panel de control de Apache Flink.
+ El panel de control de Apache Flink no es compatible con Microsoft Internet Explorer.

## Acceso al panel de control de Apache Flink de su aplicación
<a name="how-dashboard-accessing"></a>

Se puede acceder al panel de control de Apache Flink de su aplicación a través de la consola de Managed Service para Apache Flink o solicitando un punto de conexión de URL seguro mediante la CLI.

### Acceso al panel de control de Apache Flink con la consola de Managed Service para Apache Flink.
<a name="how-dashboard-accessing-console"></a>

Para acceder al panel de control de Apache Flink de su aplicación desde la consola, seleccione el **panel de control de Apache Flink** en la página de su aplicación.

**nota**  
Al abrir el panel desde la consola de Managed Service para Apache Flink, la URL que genere la consola será válida durante 12 horas.

### Acceso al panel de control de Apache Flink con la CLI de Managed Service para Apache Flink
<a name="how-dashboard-accessing-cli"></a>

Se puede utilizar la CLI de Managed Service para Apache Flink para generar una URL que le permita acceder al panel de control de la aplicación. La URL que genere es válida durante un período especificado.

**nota**  
Si no accede a la URL generada en tres minutos, dejará de ser válida.

La URL del panel se genera mediante la [ CreateApplicationPresignedUrl](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplicationPresignedUrl.html)acción. Se puede especificar los siguientes valores para la acción: 
+ El nombre de la aplicación.
+ El tiempo en segundos durante el que la URL será válida
+ Se especifica `FLINK_DASHBOARD_URL` como tipo de URL.