

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

# Service géré pour Apache Flink : comment ça marche
<a name="how-it-works"></a>

Le service géré pour Apache Flink est un service Amazon entièrement géré qui vous permet d'utiliser une application Apache Flink pour traiter des données de streaming. Vous programmez d'abord votre application Apache Flink, puis vous créez votre application Managed Service for Apache Flink.

## Programmez votre application Apache Flink
<a name="how-it-works-programming"></a>

Une application Apache Flink est une application Java ou Scala créée avec l’environnement Apache Flink. Vous créez votre application Apache Flink localement. 

Les applications utilisent principalement l'[DataStream API](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/datastream_api.html) ou l'[API Table](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/). Les autres Apache Flink APIs sont également disponibles, mais ils sont moins couramment utilisés pour créer des applications de streaming.

Les caractéristiques des deux APIs sont les suivantes :

### DataStream API
<a name="how-it-works-prog-datastream"></a>

Le modèle de programmation de DataStream l'API Apache Flink repose sur deux composants :
+ **Flux de données :** représentation structurée d’un flux continu d’enregistrements de données.
+ **Opérateur de transformation :** prend un ou plusieurs flux de données en entrée et produit un ou plusieurs flux de données en sortie.

Les applications créées à l'aide de DataStream l'API effectuent les opérations suivantes :
+ Lire les données d’une source de données (telle qu’un flux Kinesis ou une rubrique Amazon MSK).
+ Appliquer des transformations aux données, telles que le filtrage, l’agrégation ou l’enrichissement.
+ Écrire les données transformées dans un récepteur de données.

Les applications qui utilisent l' DataStream API peuvent être écrites en Java ou en Scala, et peuvent être lues à partir d'un flux de données Kinesis, d'une rubrique Amazon MSK ou d'une source personnalisée.

Votre application traite les données à l’aide d’un *connecteur*. Apache Flink utilise les types de connecteurs suivants : 
+ **Source** : connecteur utilisé pour lire des données externes.
+ **Récepteur** : connecteur utilisé pour écrire sur des emplacements externes. 
+ **Opérateur** : connecteur utilisé pour traiter les données au sein de l’application.

Une application classique comprend au moins un flux de données avec une source, un flux de données avec un ou plusieurs opérateurs et au moins un récepteur de données.

Pour plus d'informations sur l'utilisation de l' DataStream API, consultez[Vérifier les composants de DataStream l'API](how-datastream.md).

### API de table
<a name="how-it-works-prog-table"></a>

Le modèle de programmation de l’API de table Apache Flink repose sur deux composants :
+ **Environnement de table :** interface permettant d’accéder aux données sous-jacentes que vous utilisez pour créer et héberger une ou plusieurs tables. 
+ **Table :** objet donnant accès à une table ou à une vue SQL.
+ **Source de table :** utilisée pour lire des données provenant d’une source externe, telle qu’une rubrique Amazon MSK.
+ **Fonction de table :** requête SQL ou appel d’API utilisé pour transformer des données.
+ **Récepteur de table :** utilisé pour écrire des données dans un emplacement externe, tel qu’un compartiment Amazon S3.

Les applications créées avec l’API de table effectuent les opérations suivantes :
+ Créer un `TableEnvironment` en vous connectant à une `Table Source`. 
+ Créer une table dans l’`TableEnvironment` à l’aide de requêtes SQL ou de fonctions de l’API de table.
+ Exécuter une requête sur la table à l’aide de l’API de table ou de SQL.
+ Appliquer des transformations aux résultats de la requête à l’aide de fonctions de table ou de requêtes SQL.
+ Écrire les résultats de la requête ou de la fonction dans un `Table Sink`.

Les applications qui utilisent l’API de table peuvent être écrites en Java ou en Scala et peuvent interroger des données à l’aide d’appels d’API ou de requêtes SQL. 

Pour plus d’informations sur l’utilisation de l’API de table, consultez [Composants de l'API Review Table](how-table.md).

## Créez votre service géré pour l'application Apache Flink
<a name="how-it-works-app"></a>

Le service géré pour Apache Flink est un AWS service qui crée un environnement pour héberger votre application Apache Flink et lui fournit les paramètres suivants :
+ **[Utiliser les propriétés d'exécution](how-properties.md) :** paramètres que vous pouvez fournir à votre application. Vous pouvez modifier ces paramètres sans recompiler le code de votre application.
+ **[Mettre en œuvre la tolérance aux pannes](how-fault.md)** : comment votre application se rétablit après une interruption ou un redémarrage.
+ **[Journalisation et surveillance dans Amazon Managed Service pour Apache Flink](monitoring-overview.md)**: la façon dont votre application enregistre les événements dans CloudWatch Logs. 
+ **[Mettre en œuvre le dimensionnement des applications](how-scaling.md)** : comment votre application provisionne les ressources informatiques.

Vous pouvez créer votre application de service géré pour Apache Flink à l’aide de la console ou de l’interface AWS CLI. Pour commencer à créer une application de service géré pour Apache Flink, consultez [Tutoriel : Commencez à utiliser l' DataStream API dans Managed Service pour Apache Flink](getting-started.md).

# Création d'un service géré pour l'application Apache Flink
<a name="how-creating-apps"></a>

Cette rubrique contient des informations sur la création d'un service géré pour l'application Apache Flink.

**Topics**
+ [Créez votre service géré pour le code d'application Apache Flink](#how-creating-apps-building)
+ [Créez votre service géré pour l'application Apache Flink](#how-creating-apps-creating)
+ [Utiliser des clés gérées par le client](#how-creating-apps-use-cmk)
+ [Démarrez votre application Managed Service for Apache Flink](#how-creating-apps-starting)
+ [Vérifiez votre service géré pour l'application Apache Flink](#how-creating-apps-verifying)
+ [Activez les annulations du système pour votre application Managed Service for Apache Flink](how-system-rollbacks.md)

## Créez votre service géré pour le code d'application Apache Flink
<a name="how-creating-apps-building"></a>

Cette section décrit les composants que vous utilisez pour créer le code d'application de votre application Managed Service for Apache Flink. 

Nous vous recommandons d’utiliser la dernière version prise en charge d’Apache Flink pour le code de votre application. Pour obtenir des informations sur la mise à niveau de l’application de service géré pour Apache Flink, consultez [Utiliser des mises à niveau de version sur place pour Apache Flink](how-in-place-version-upgrades.md). 

Vous créez le code de votre application à l’aide d’[Apache Maven](https://maven.apache.org/). Un projet Apache Maven utilise un fichier `pom.xml` pour spécifier les versions des composants qu’il utilise. 

**Note**  
Le service géré pour Apache Flink prend en charge les fichiers JAR d’une taille maximale de 512 Mo. Si vous utilisez un fichier JAR plus volumineux, votre application ne démarrera pas.

Les applications peuvent désormais utiliser l’API Java depuis n’importe quelle version de Scala. Vous devez regrouper la bibliothèque standard Scala de votre choix dans vos applications Scala.

Pour obtenir des informations sur la création d’une application de service géré pour Apache Flink utilisant **Apache Beam**, consultez [Utiliser Apache Beam avec un service géré pour les applications Apache Flink](how-creating-apps-beam.md).

### Spécifiez la version d'Apache Flink de votre application
<a name="how-creating-apps-building-flink"></a>

Lorsque vous utilisez l’exécution de service géré pour Apache Flink version 1.1.0 ou ultérieure, vous spécifiez la version d’Apache Flink utilisée par votre application lorsque vous compilez votre application. Vous fournissez la version d'Apache Flink avec le `-Dflink.version` paramètre. Par exemple, si vous utilisez Apache Flink 2.2.0, fournissez les informations suivantes :

```
mvn package -Dflink.version=2.2.0
```

Pour créer des applications avec des versions antérieures d'Apache Flink, consultez[Versions antérieures](earlier.md).

## Créez votre service géré pour l'application Apache Flink
<a name="how-creating-apps-creating"></a>

Après avoir créé le code de votre application, procédez comme suit pour créer votre application Managed Service for Apache Flink (Amazon MSF) :
+ **Charger votre code d’application** : chargez votre code d’application sur un compartiment Amazon S3. Vous spécifiez le nom du compartiment S3 et le nom d’objet du code d’application lorsque vous créez votre application. Pour un didacticiel expliquant comment télécharger le code de votre application, consultez le [Tutoriel : Commencez à utiliser l' DataStream API dans Managed Service pour Apache Flink](getting-started.md) didacticiel.
+ **Créez votre application Managed Service for Apache Flink** : utilisez l'une des méthodes suivantes pour créer votre application Amazon MSF :
**Note**  
Amazon MSF chiffre votre application par défaut à l'aide de. Clés détenues par AWS Vous pouvez également créer votre nouvelle application à l'aide de clés gérées par le AWS KMS client (CMKs) pour créer, posséder et gérer vous-même vos clés. Pour plus d'informations sur CMKs, voir[Gestion des clés dans Amazon Managed Service pour Apache Flink](key-management-flink.md).
  + **Créez votre application Amazon MSF à l'aide de la AWS console :** vous pouvez créer et configurer votre application à l'aide de la AWS console. 

    Lorsque vous créez votre application à l'aide de la console, les ressources dépendantes de votre application (telles que CloudWatch les flux de journaux, les rôles IAM et les politiques IAM) sont créées pour vous. 

    Lorsque vous créez votre application à l’aide de la console, vous spécifiez la version d’Apache Flink utilisée par votre application en la sélectionnant dans le menu déroulant de la page **Service géré pour Apache Flink - Créer une application**. 

    Pour un didacticiel sur l'utilisation de la console pour créer une application, consultez le [Tutoriel : Commencez à utiliser l' DataStream API dans Managed Service pour Apache Flink](getting-started.md) didacticiel.
  + **Créez votre application Amazon MSF à l'aide de la AWS CLI :** vous pouvez créer et configurer votre application à l'aide de la AWS CLI. 

    Lorsque vous créez votre application à l'aide de la CLI, vous devez également créer les ressources dépendantes de votre application (telles que CloudWatch les flux de journaux, les rôles IAM et les politiques IAM) manuellement.

    Lorsque vous créez votre application à l’aide de l’interface CLI, vous spécifiez la version d’Apache Flink utilisée par votre application en utilisant le paramètre `RuntimeEnvironment` de l’action `CreateApplication`.
**Note**  
Vous pouvez modifier `RuntimeEnvironment` une application existante. Pour savoir comment procéder, consultez [Utiliser des mises à niveau de version sur place pour Apache Flink](how-in-place-version-upgrades.md).

## Utiliser des clés gérées par le client
<a name="how-creating-apps-use-cmk"></a>

Dans Amazon MSF, les clés gérées par le client (CMKs) sont une fonctionnalité qui vous permet de chiffrer les données de votre application à l'aide d'une clé que vous créez, détenez et gérez AWS Key Management Service ()AWS KMS. Pour une application Amazon MSF, cela signifie que toutes les données soumises à un [point de contrôle](how-fault.md) ou à un [instantané](how-snapshots.md) Flink sont chiffrées avec une clé CMK que vous définissez pour cette application.

Pour utiliser la clé CMK avec votre application, vous devez d'abord [créer votre nouvelle application](#how-creating-apps-creating), puis appliquer une clé CMK. Pour plus d'informations sur l'utilisation CMKs, consultez[Gestion des clés dans Amazon Managed Service pour Apache Flink](key-management-flink.md).

## Démarrez votre application Managed Service for Apache Flink
<a name="how-creating-apps-starting"></a>

Après avoir créé le code de votre application, l’avoir chargé dans S3 et créé votre application de service géré pour Apache Flink, vous pouvez démarrer votre application. Le démarrage d’une application de service géré pour Apache Flink prend généralement plusieurs minutes.

Utilisez l’une des méthodes suivantes pour démarrer votre application :
+ **Démarrez votre application Managed Service for Apache Flink à l'aide de la AWS console :** vous pouvez exécuter votre application en choisissant **Exécuter** sur la page de votre application dans la AWS console.
+ **Démarrez votre application Managed Service for Apache Flink à l'aide de l' AWS API :** vous pouvez exécuter votre application à l'aide de l'[StartApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StartApplication.html)action. 

## Vérifiez votre service géré pour l'application Apache Flink
<a name="how-creating-apps-verifying"></a>

Vous pouvez vérifier que votre application fonctionne de la manière suivante :
+ **Utilisation CloudWatch des journaux :** vous pouvez utiliser CloudWatch Logs et CloudWatch Logs Insights pour vérifier que votre application fonctionne correctement. Pour plus d'informations sur l'utilisation CloudWatch des journaux avec votre application Managed Service for Apache Flink, consultez[Journalisation et surveillance dans Amazon Managed Service pour Apache Flink](monitoring-overview.md).
+ **Utilisation CloudWatch des métriques :** vous pouvez utiliser CloudWatch les métriques pour surveiller l'activité de votre application, ou l'activité des ressources qu'elle utilise pour les entrées ou les sorties (telles que les flux Kinesis, les flux Firehose ou les compartiments Amazon S3). Pour plus d'informations sur CloudWatch les métriques, consultez [Working with Metrics](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/working_with_metrics.html) dans le guide de CloudWatch l'utilisateur Amazon.
+ **Surveillance des emplacements de sortie :** si votre application écrit la sortie vers un emplacement (tel qu’un compartiment ou une base de données Amazon S3), vous pouvez surveiller cet emplacement pour détecter les données écrites.

# Activez les annulations du système pour votre application Managed Service for Apache Flink
<a name="how-system-rollbacks"></a>

Grâce à la fonctionnalité de restauration du système, vous pouvez améliorer la disponibilité de votre application Apache Flink en cours d'exécution sur Amazon Managed Service pour Apache Flink. Le fait d'opter pour cette configuration permet au service de rétablir automatiquement la version précédente de l'application lorsqu'une action telle que des bogues de code `UpdateApplication` ou de configuration `autoscaling` se heurte à de tels bogues.

**Note**  
Pour utiliser la fonction de restauration du système, vous devez vous y inscrire en mettant à jour votre application. Les applications existantes n'utiliseront pas automatiquement la restauration du système par défaut.

## Comment ça marche
<a name="how-rollback-works"></a>

Lorsque vous lancez une opération d'application, telle qu'une action de mise à jour ou de dimensionnement, Amazon Managed Service pour Apache Flink tente d'abord d'exécuter cette opération. S'il détecte des problèmes empêchant le succès de l'opération, tels que des bogues de code ou des autorisations insuffisantes, le service lance automatiquement une `RollbackApplication` opération.

L'annulation tente de restaurer la version précédente de l'application qui s'est exécutée avec succès, ainsi que l'état de l'application associé. Si la restauration est réussie, votre application continue de traiter les données avec un temps d'arrêt minimal en utilisant la version précédente. Si la restauration automatique échoue également, Amazon Managed Service pour Apache Flink fait passer l'application au `READY` statut, afin que vous puissiez prendre d'autres mesures, notamment corriger l'erreur et réessayer l'opération. 

Vous devez choisir d'utiliser les annulations automatiques du système. Vous pouvez l'activer à l'aide de la console ou de l'API pour toutes les opérations sur votre application à partir de maintenant. 

L'exemple de demande `UpdateApplication` d'action suivant permet d'annuler le système pour une application :

```
{
   "ApplicationName": "MyApplication",
   "CurrentApplicationVersionId": 1,
   "ApplicationConfigurationUpdate": { 
      "ApplicationSystemRollbackConfigurationUpdate": { 
         "RollbackEnabledUpdate": "true"
       }
    }
}
```

## Passez en revue les scénarios courants de restauration automatique du système
<a name="common-scenarios"></a>

Les scénarios suivants illustrent les avantages des annulations automatiques du système :
+ **Mises à jour de l'application :** si vous mettez à jour votre application avec un nouveau code comportant des bogues lors de l'initialisation de la tâche Flink par le biais de la méthode principale, le rollback automatique permet de restaurer la version fonctionnelle précédente. Parmi les autres scénarios de mise à jour dans lesquels les annulations du système sont utiles, citons :
  + [Si votre application est mise à jour pour fonctionner avec un parallélisme supérieur à MaxParallelism.](https://docs.aws.amazon.com/managed-flink/latest/java/how-scaling.html#how-scaling-auto)
  + Si votre application est mise à jour pour s'exécuter avec des sous-réseaux incorrects pour une application VPC, cela entraîne un échec lors du démarrage de la tâche Flink. 
+ **Mises à niveau de la version de Flink :** lorsque vous effectuez une mise à niveau vers une nouvelle version d'Apache Flink et que l'application mise à niveau rencontre un problème de compatibilité avec les snapshots, la restauration du système vous permet de revenir automatiquement à la version précédente de Flink. 
+ **AutoScaling:** Lorsque l'application prend de l'ampleur mais rencontre des problèmes lors de la restauration à partir d'un point de sauvegarde, en raison d'un décalage entre l'opérateur et le graphe de tâches Flink.

## Utiliser l'opération APIs pour les annulations du système
<a name="operation-apis"></a>

Pour offrir une meilleure visibilité, Amazon Managed Service pour Apache Flink propose deux services APIs liés aux opérations des applications qui peuvent vous aider à suivre les défaillances et les annulations de système associées.

`ListApplicationOperations`

Cette API répertorie toutes les opérations effectuées sur l'application, y compris`UpdateApplication`,`Maintenance`, et les autres`RollbackApplication`, dans l'ordre chronologique inverse. L'exemple de demande `ListApplicationOperations` d'action suivant répertorie les 10 premières opérations d'application pour l'application :

```
{
   "ApplicationName": "MyApplication",
   "Limit": 10
}
```

L'exemple de demande d'`ListApplicationOperations`aide suivant permet de filtrer la liste en fonction des mises à jour précédentes de l'application :

```
{
   "ApplicationName": "MyApplication",
   "operation": "UpdateApplication"
}
```

`DescribeApplicationOperation`

Cette API fournit des informations détaillées sur une opération spécifique répertoriée par`ListApplicationOperations`, y compris la raison de l'échec, le cas échéant. L'exemple de demande `DescribeApplicationOperation` d'action suivant répertorie les détails d'une opération d'application spécifique :

```
{
   "ApplicationName": "MyApplication",
   "OperationId": "xyzoperation"
}
```

Pour plus d’informations sur le dépannage, consultez [Meilleures pratiques en matière de restauration du système](troubleshooting-system-rollback.md).

# Exécuter un service géré pour l'application Apache Flink
<a name="how-running-apps"></a>

Cette rubrique contient des informations sur l’exécution d’un service géré pour Apache Flink.

Lorsque vous exécutez votre application de service géré pour Apache Flink, le service crée une tâche Apache Flink. Une tâche Apache Flink correspond au cycle de vie d’exécution de votre application de service géré pour Apache Flink. L’exécution de la tâche et les ressources qu’elle utilise sont gérées par le gestionnaire de tâches. Le gestionnaire de tâches divise l’exécution de l’application en tâches. Chaque tâche est gérée par un gestionnaire de tâches. Lorsque vous surveillez les performances de votre application, vous pouvez examiner les performances de chaque gestionnaire de tâches ou du gestionnaire de tâches global. 

Pour plus d'informations sur les tâches Apache Flink, consultez la section [Tâches et planification](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/internals/job_scheduling/) dans la documentation d'Apache Flink.

## Identifier la candidature et le statut du poste
<a name="how-running-job-status"></a>

Votre application et la tâche de l’application ont tous deux un état d’exécution actuel :
+ **État de l’application :** l’état actuel de votre application décrit sa phase d’exécution. Les états de l’application sont les suivants :
  + **États d’application stables :** votre application conserve généralement ces états jusqu’à ce que vous modifiiez son état :
    + **PRÊT :** une application nouvelle ou arrêtée affiche l’état PRÊT jusqu’à ce que vous l’exécutiez.
    + **EN COURS D’EXÉCUTION :** une application démarrée avec succès est en cours d’exécution.
  + **États d’application transitoires :** une application présentant ces états est généralement en train de passer à un autre état. Si une application reste dans un état transitoire pendant un certain temps, vous pouvez arrêter l'application à l'aide de l'[StopApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StopApplication.html)action dont le `Force` paramètre est défini sur. `true` Ces états incluent les éléments suivants :
    + `STARTING:`Survient après l'[StartApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StartApplication.html)action. L’application est en train de passer de l’état `READY` à l’état `RUNNING`.
    + `STOPPING:`Survient après l'[StopApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StopApplication.html)action. L’application est en train de passer de l’état `RUNNING` à l’état `READY`.
    + `DELETING:`Survient après l'[DeleteApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_DeleteApplication.html)action. L’application est en cours de suppression.
    + `UPDATING:`Survient après l'[UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html)action. L’application est en cours de mise à jour et va revenir à l’état `RUNNING` ou `READY`.
    + `AUTOSCALING:`L'application possède la `AutoScalingEnabled` propriété [ ParallelismConfiguration](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ParallelismConfiguration.html)set to`true`, et le service augmente le parallélisme de l'application. Lorsque l'application est dans cet état, la seule action d'API valide que vous pouvez utiliser est [StopApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StopApplication.html)celle dont le `Force` paramètre est défini sur`true`. Pour obtenir des informations sur la mise à l’échelle automatique, consultez [Utiliser le dimensionnement automatique dans Managed Service pour Apache Flink](how-scaling-auto.md).
    + `FORCE_STOPPING:`Survient après l'appel de l'[StopApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StopApplication.html)action avec le `Force` paramètre défini sur`true`. Un arrêt forcé de l’application est en cours. L’application est en train de passer de l’état `STARTING`, `UPDATING`, `STOPPING`, ou `AUTOSCALING` à l’état `READY`.
    + `ROLLING_BACK:`Survient après l'appel de [RollbackApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_RollbackApplication.html)l'action. L’application est en train de revenir à une version précédente. L’application est en train de passer de l’état `UPDATING` ou `AUTOSCALING` à l’état `RUNNING`.
    + `MAINTENANCE:` survient lorsque le service géré pour Apache Flink applique des correctifs à votre application. Pour de plus amples informations, veuillez consulter [Gestion des tâches de maintenance pour le service géré pour Apache Flink](maintenance.md).

  Vous pouvez vérifier l'état de votre application à l'aide de la console ou à l'aide de l'[DescribeApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_DescribeApplication.html)action.
+ **État de la tâche :** lorsque votre application est à l’état `RUNNING`, votre tâche a un état qui décrit sa phase d’exécution en cours. Une tâche commence avec le statut `CREATED`, puis passe à l’état `RUNNING` une fois qu’elle a démarré. En cas d’erreur, votre application passe au statut suivant : 
  + Pour les applications utilisant Apache Flink 1.11 et versions ultérieures, votre application entre dans l’état `RESTARTING`.
  + Pour les applications utilisant Apache Flink 1.8 et versions antérieures, votre application entre dans l’état `FAILING`.

  L’application passe ensuite à l’état `RESTARTING` ou `FAILED`, selon que la tâche peut être redémarrée ou non. 

  Vous pouvez vérifier le statut du poste en consultant le CloudWatch journal de votre candidature pour vérifier les changements de statut.

## Exécuter des charges de travail par lots
<a name="batch-workloads"></a>

Le service géré Apache Flink prend en charge l’exécution de charges de travail par lots Apache Flink. Dans un traitement par lots, lorsqu’une tâche Apache Flink atteint l’état **TERMINÉ**, l’état de l’application de service géré pour Apache Flink est défini sur **PRÊT**. Pour plus d’informations sur les états des tâches Flink, consultez la section [Jobs and Scheduling](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/internals/job_scheduling/).

# Consultez les ressources de l'application Managed Service for Apache Flink
<a name="how-resources"></a>

Cette section décrit les ressources système utilisées par votre application. Comprendre comment le service géré pour Apache Flink fournit et utilise les ressources vous aidera à concevoir, créer et maintenir un service géré performant et stable pour l’application Apache Flink.

## Service géré pour les ressources de l'application Apache Flink
<a name="how-resources-kda"></a>

Le service géré pour Apache Flink est un AWS service qui crée un environnement pour héberger votre application Apache Flink. Le service géré pour Apache Flink fournit des ressources à l'aide d'unités appelées **Kinesis Processing Units KPUs** ().

Un KPU représente les ressources système suivantes :
+ Un cœur de processeur
+ 4 Go de mémoire, dont 1 Go de mémoire native et 3 Go de mémoire de tas
+ 50 Go d’espace disque

KPUs exécuter des applications dans des unités d'exécution distinctes appelées **tâches** et **sous-tâches**. Vous pouvez comparer une sous-tâche à un fil d’actualité.

Le nombre de KPUs fichiers disponibles pour une application est égal au `Parallelism` paramètre de l'application, divisé par le `ParallelismPerKPU` paramètre de l'application. 

Pour de plus amples informations sur le parallélisme d’application, consultez [Mettre en œuvre le dimensionnement des applications](how-scaling.md).

## Ressources de l'application Apache Flink
<a name="how-resources-flink"></a>

L’environnement Apache Flink alloue des ressources à votre application à l’aide d’unités appelées **emplacements de tâche**. Lorsque le service géré pour Apache Flink alloue des ressources à votre application, il attribue un ou plusieurs emplacements de tâche Apache Flink à un seul KPU. Le nombre d’emplacements attribués à un seul KPU est égal au paramètre `ParallelismPerKPU` de votre application. Pour plus d'informations sur les créneaux de tâches, consultez la section [Planification des tâches](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/internals/job_scheduling/) dans la documentation d'Apache Flink.

### Parallélisme des opérateurs
<a name="how-resources-flink-operatorparallelism"></a>

Vous pouvez définir le nombre maximal de sous-tâches qu’un opérateur peut utiliser. Cette valeur s’appelle le **parallélisme de l’opérateur**. Par défaut, le parallélisme de chaque opérateur de votre application est égal au parallélisme de l’application. Cela signifie que par défaut, chaque opérateur de votre application peut utiliser toutes les sous-tâches disponibles dans l’application si nécessaire.

Vous pouvez définir le parallélisme des opérateurs de votre application à l’aide de la méthode `setParallelism`. Grâce à cette méthode, vous pouvez contrôler le nombre de sous-tâches que chaque opérateur peut utiliser simultanément.

Pour plus d'informations sur les opérateurs, consultez la section [Opérateurs](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/operators/overview/) dans la documentation d'Apache Flink.

### Chainage des opérateurs
<a name="how-resources-flink-operatorchaining"></a>

Normalement, chaque opérateur utilise une sous-tâche distincte à exécuter, mais si plusieurs opérateurs s’exécutent toujours en séquence, le moteur d’exécution peut les affecter tous à la même tâche. Ce processus s’appelle le **chaînage d’opérateurs**.

Plusieurs opérateurs séquentiels peuvent être enchaînés dans une même tâche s’ils opèrent tous sur les mêmes données. Voici quelques-uns des critères nécessaires pour que cela soit vrai :
+ Les opérateurs effectuent un transfert simple de 1 à 1.
+ Les opérateurs ont tous le même parallélisme d’opérateur.

Lorsque votre application regroupe les opérateurs dans une seule sous-tâche, elle économise les ressources du système, car le service n’a pas besoin d’effectuer des opérations réseau et d’allouer des sous-tâches à chaque opérateur. Pour déterminer si votre application utilise le chaînage d’opérateurs, examinez le graphique des tâches dans la console du service géré pour Apache Flink. Chaque sommet de l’application représente un ou plusieurs opérateurs. Le graphique montre les opérateurs qui ont été enchaînés en tant que sommet unique.

# Facturation à la seconde dans le service géré pour Apache Flink
<a name="how-pricing"></a>

Le service géré pour Apache Flink est désormais facturé par tranches d'une seconde. Il y a un minimum de dix minutes de frais par demande. La facturation à la seconde s'applique aux applications récemment lancées ou déjà en cours d'exécution. Cette section décrit comment Managed Service for Apache Flink mesure et facture votre utilisation. Pour en savoir plus sur la tarification du service géré pour Apache Flink, consultez la section Tarification [d'Amazon Managed Service pour Apache Flink](https://aws.amazon.com/managed-service-apache-flink/pricing/). 

## Comment ça marche
<a name="how-resources-kda"></a>

Le service géré pour Apache Flink vous facture la durée et le nombre d'**unités de traitement Kinesis KPUs (**) facturées par tranches d'une seconde dans la limite prise en charge. Régions AWS Un seul KPU comprend 1 vCPU de calcul et 4 Go de mémoire. Un taux horaire vous est facturé en fonction du nombre d'applications KPUs utilisées pour exécuter vos applications. 

Par exemple, une application exécutée pendant 20 minutes et 10 secondes sera facturée pendant 20 minutes et 10 secondes, multipliée par les ressources utilisées. Une application qui s'exécute pendant 5 minutes se verra facturer le minimum de dix minutes, multiplié par les ressources utilisées.

Le service géré pour Apache Flink indique l'utilisation en heures. Par exemple, 15 minutes correspondent à 0,25 heure. 

Pour les applications Apache Flink, un seul KPU supplémentaire par application vous est facturé, utilisé pour l'orchestration. Les applications sont également facturées pour l'exécution du stockage et les sauvegardes durables. Le stockage des applications en cours d'exécution est utilisé pour les capacités de traitement dynamique dans Managed Service for Apache Flink et est facturé par unité. GB/month. Durable backups are optional and provide point-in-time recovery for applications, charged per GB/month 

En mode streaming, le service géré pour Apache Flink adapte automatiquement le nombre de données KPUs requises par votre application de traitement de flux en fonction des besoins en mémoire et en calcul. Vous pouvez choisir de fournir à votre demande le nombre requis de KPUs. 

## Région AWS disponibilité
<a name="how-pricing-regions"></a>

**Note**  
Pour le moment, la facturation à la seconde n'est pas disponible dans les régions suivantes : AWS GovCloud (USA Est), AWS GovCloud (USA Ouest), Chine (Pékin) et Chine (Ningxia).

La facturation à la seconde est disponible dans les formats suivants Régions AWS : 
+ USA Est (Virginie du Nord) - us-east-1
+ USA Est (Ohio) - us-east-2
+ USA Ouest (Californie du Nord) – us-west-1
+ USA Ouest (Oregon) - us-west-2
+ Afrique (Le Cap) – af-south-1
+ Asie-Pacifique (Hong Kong) – ap-east-1
+ Asie-Pacifique (Hyderabad) - ap-south-1
+ Asie-Pacifique (Jakarta) – ap-southeast-3
+ Asie-Pacifique (Melbourne) - ap-southeast-4
+ Asie-Pacifique (Mumbai) – ap-south-1
+ Asie-Pacifique (Osaka) – ap-northeast-3
+ Asie-Pacifique (Séoul) – ap-northeast-2
+ Asie-Pacifique (Singapour) – ap-southeast-1
+ Asie-Pacifique (Sydney) - ap-southeast-2
+ Asie-Pacifique (Tokyo) - ap-northeast-1
+ Canada (Centre) – ca-central-1
+ Canada Ouest (Calgary) – ca-west-1
+ Europe (Francfort) eu-central-1
+ Europe (Irlande) – eu-west-1
+ Europe (Londres) – eu-west-2
+ Europe (Milan) – eu-south-1
+ Europe (Paris) – eu-west-3
+ Europe (Espagne) – eu-south-2
+ Europe (Stockholm) – eu-north-1
+ Europe (Zurich) – eu-central-2
+ Israël (Tel Aviv) - il-central-1
+ Moyen-Orient (Bahreïn) – me-south-1
+ Moyen-Orient (Émirats arabes unis) – me-central-1
+ Amérique du Sud (São Paulo) – sa-east-1

## Exemples de prix
<a name="how-pricing-examples"></a>

Vous trouverez des exemples de tarification sur la page de tarification du service géré pour Apache Flink. Pour plus d'informations, consultez la section [Tarification d'Amazon Managed Service pour Apache Flink](https://aws.amazon.com/managed-service-apache-flink/pricing/). Vous trouverez ci-dessous d'autres exemples illustrés par le rapport d'utilisation des coûts pour chacun d'entre eux.

### Une charge de travail longue et lourde
<a name="pricing-example-1"></a>

Vous êtes un important service de streaming vidéo et vous souhaitez élaborer une recommandation vidéo en temps réel basée sur les interactions de vos utilisateurs. Vous utilisez une application Apache Flink dans Managed Service for Apache Flink afin d'ingérer en permanence les événements d'interaction utilisateur provenant de plusieurs flux de données Kinesis et de traiter les événements en temps réel avant de les transmettre à un système en aval. Les événements d'interaction utilisateur sont transformés à l'aide de plusieurs opérateurs. Cela inclut le partitionnement des données par type d'événement, l'enrichissement des données avec des métadonnées supplémentaires, le tri des données par horodatage et la mise en mémoire tampon des données pendant 5 minutes avant la livraison. L'application comporte de nombreuses étapes de transformation qui nécessitent beaucoup de calcul et sont parallélisables. Votre application Flink est configurée pour fonctionner avec 20 afin de s'adapter KPUs à la charge de travail. Votre application utilise 1 Go de sauvegarde durable chaque jour. Les frais mensuels du service géré pour Apache Flink seront calculés comme suit :

**Charges mensuelles**

Le prix dans la région de l'est des États-Unis (Virginie du Nord) est de 0,11\$1 par KPU-heure. Le service géré pour Apache Flink alloue 50 Go de stockage d'applications en cours d'exécution par KPU et facture 0,10 USD par Go et par mois.
+ Frais KPU mensuels : 24 heures\$1 30 jours\$1 (20 KPUs \$11 KPU supplémentaire pour l'application de streaming) \$1 0,11 \$1/heure = 1 584,00\$1
+ Frais mensuels de stockage des applications : 30 jours\$1 20 KPUs \$1 50\$1 GB/KPUs \$1 0,10 \$1/Go par mois = 100,00\$1
+ Frais mensuels de stockage durable des applications : 30 jours\$1 1 Go \$1 0,023 Go/mois = 0,03 USD
+ **Total des frais : 1 584,00\$1 \$1 100\$1 \$1 0,03\$1 = 1 684,03\$1**

**Rapport d'utilisation des coûts pour Managed Service for Apache Flink sur la console Billing and Cost Management pour le mois**

Kinesis Analytics
+ 1 684,03 USD - Est des États-Unis (Virginie du Nord)
+ Amazon Kinesis Analytics CreateSnapshot
  + 0,023\$1 par Go par mois de sauvegardes d'applications durables
    + 1 Go par mois - 0,03 USD
+ Amazon Kinesis Analytics StartApplication
  + 0,10 USD par Go par mois de stockage d'applications en cours d'exécution
    + 1 000 Go par mois - 100 USD
  + 0,11 USD par heure d'unité de traitement Kinesis pour les applications Apache Flink
    + 15 120 KPU/heure - 1 584 USD

### Une charge de travail par lots qui s'exécute pendant environ 15 minutes par jour
<a name="pricing-example-2"></a>

Vous utilisez une application Apache Flink dans Managed Service for Apache Flink pour transformer les données de journal dans Amazon Simple Storage Service (Amazon S3) en mode batch. Les données du journal sont transformées à l'aide de plusieurs opérateurs. Cela inclut l'application d'un schéma aux différents événements du journal, le partitionnement des données par type d'événement et le tri des données par horodatage. L'application comporte de nombreuses étapes de transformation, mais aucune ne nécessite de calculs intensifs. Cette application ingère 2 000 données records/second pendant 15 minutes chaque jour pendant un mois de 30 jours. Vous ne créez aucune sauvegarde d'application durable. Les frais mensuels du service géré pour Apache Flink seront calculés comme suit :

**Charges mensuelles**

Le prix dans la région de l'est des États-Unis (Virginie du Nord) est de 0,11\$1 par KPU-heure. Le service géré pour Apache Flink alloue 50 Go de stockage d'applications en cours d'exécution par KPU et facture 0,10 USD par Go et par mois.
+ Charge de travail par lots : pendant les 15 minutes par jour, l'application Managed Service for Apache Flink traite 2 000 records/second, which takes 2KPUs. 30 days/month \$1 15 minutes/day = 450 minutes/month
+ Frais KPU mensuels : minutes/month 450\$1 (2 KPUs \$11 KPU supplémentaire pour l'application de streaming) \$1 0,11 \$1/heure = 2,48\$1
+ Frais mensuels de stockage des applications : 450 minutes/month \$1 2 \$1 50 KPUs GB/KPUs \$1 0,10 \$1/Go par mois = 0,11 dollar
+ **Total des frais : 2,48\$1 \$1 0,11 = 2,59\$1**

**Rapport d'utilisation des coûts pour Managed Service for Apache Flink sur la console Billing and Cost Management pour le mois**

Kinesis Analytics
+ 2,59 USD - Est des États-Unis (Virginie du Nord)
+ Amazon Kinesis Analytics StartApplication
  + 0,10 USD par Go par mois de sauvegarde des applications en cours
    + 1,042 Go par mois - 0,11 USD
  + 0,11 USD par heure d'unité de traitement Kinesis pour les applications Apache Flink
    + 22,5 KPU/heure - 2,48 USD

### Une application de test qui s'arrête et démarre en continu au cours de la même heure, entraînant plusieurs charges minimales
<a name="pricing-example-3"></a>

Vous êtes une grande plateforme de commerce électronique qui traite des millions de transactions chaque jour. Vous souhaitez développer la détection des fraudes en temps réel. Vous utilisez une application Apache Flink dans Managed Service for Apache Flink pour ingérer les événements de transaction provenant de Kinesis Data Streams et les traiter en temps réel en suivant différentes étapes de transformation. Cela inclut l'utilisation d'une fenêtre coulissante pour agréger les événements, le partitionnement des événements par type d'événement et l'application de règles de détection spécifiques pour différents types d'événements. Au cours du développement, vous démarrez et arrêtez votre application plusieurs fois pour tester et déboguer le comportement. Il arrive que votre application ne s'exécute que pendant quelques minutes. Il y a une heure pendant laquelle vous testez votre application avec 4 KPUs et celle-ci n'utilise aucune sauvegarde d'application durable :
+ À 10 h 05, vous démarrez votre application, qui s'exécute pendant 30 minutes avant de s'arrêter à 10 h 35.
+ À 10 h 40, vous redémarrez votre application, qui s'exécute pendant 5 minutes avant de s'arrêter à 10 h 45.
+ À 10 h 50, vous redémarrez l'application, qui s'exécute pendant 2 minutes avant de s'arrêter à 10 h 52.

Le service géré pour Apache Flink facture un minimum de 10 minutes d'utilisation chaque fois qu'une application démarre. L'utilisation mensuelle du service géré pour Apache Flink pour votre application sera calculée comme suit :
+ Premier démarrage et arrêt de votre application : 30 minutes d'utilisation
+ Deuxième démarrage et arrêt de votre application : 10 minutes d'utilisation (votre application fonctionne pendant 5 minutes arrondies à la charge minimale de 10 minutes)
+ Troisième démarrage et arrêt de votre application : 10 minutes d'utilisation (votre application fonctionne pendant 2 minutes, arrondie à la charge minimale de 10 minutes)

Au total, votre application sera facturée pour 50 minutes d'utilisation. À aucun autre moment du mois où votre application est exécutée, les frais mensuels du service géré pour Apache Flink seront calculés comme suit :

**Charges mensuelles**

Le prix dans la région de l'est des États-Unis (Virginie du Nord) est de 0,11\$1 par KPU-heure. Le service géré pour Apache Flink alloue 50 Go de stockage d'applications en cours d'exécution par KPU et facture 0,10 USD par Go et par mois.
+ Frais KPU mensuels : 50 minutes\$1 (4 KPUs \$11 KPU supplémentaire pour l'application de streaming) \$1 0,11 \$1/heure = 0,46\$1 (arrondi au centime le plus proche)
+ Frais mensuels de stockage des applications : 50 minutes\$1 4 KPUs \$1 50 \$1 0,10 \$1/Go GB/KPUs par mois = 0,03\$1 (arrondis au centime le plus proche)
+ **Total des frais : 0,46\$1 \$1 0,03 = 0,49\$1**

**Rapport d'utilisation des coûts pour Managed Service for Apache Flink sur la console Billing and Cost Management pour le mois**

Kinesis Analytics
+ 0,49 USD - Est des États-Unis (Virginie du Nord)
+ Amazon Kinesis Analytics StartApplication
  + 0,10 USD par Go par mois de stockage d'applications en cours d'exécution
    + 0,232 Go par mois - 0,03 USD
  + 0,11 USD par heure d'unité de traitement Kinesis pour les applications Apache Flink
    + 4,167 KPU/heure - 0,46 USD

# Vérifier les composants de DataStream l'API
<a name="how-datastream"></a>

Votre application Apache Flink utilise l'[ DataStream API Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/overview/) pour transformer les données en flux de données. 

Cette section décrit les différents composants qui déplacent, transforment et suivent les données :
+ [Utilisez des connecteurs pour déplacer des données dans le service géré pour Apache Flink avec l'API DataStream](how-connectors.md) : ces composants déplacent les données entre votre application et les sources de données et destinations externes.
+ [Transformez les données à l'aide d'opérateurs dans Managed Service pour Apache Flink avec l'API DataStream](how-operators.md) : ces composants transforment ou regroupent des éléments de données au sein de votre application.
+ [Suivez les événements dans le service géré pour Apache Flink à l'aide de l'API DataStream](how-time.md): cette rubrique décrit comment le service géré pour Apache Flink suit les événements lors de l'utilisation de l' DataStream API.

# Utilisez des connecteurs pour déplacer des données dans le service géré pour Apache Flink avec l'API DataStream
<a name="how-connectors"></a>

Dans l' DataStream API Amazon Managed Service for Apache Flink, les *connecteurs* sont des composants logiciels qui déplacent les données vers et depuis une application Managed Service for Apache Flink. Les connecteurs sont des intégrations flexibles qui vous permettent de lire des fichiers et des répertoires. Les connecteurs sont constitués de modules complets permettant d’interagir avec les services Amazon et les systèmes tiers.

Les types de connecteurs sont les suivants :
+ [Ajouter des sources de données de streaming](how-sources.md) : fournit des données à votre application à partir d’un flux de données Kinesis, d’un fichier ou d’une autre source de données.
+ [Écrire des données à l'aide de récepteurs](how-sinks.md): envoyez des données depuis votre application vers un flux de données Kinesis, un flux Firehose ou une autre destination de données.
+ [Utiliser des E/S asynchrones](how-async.md) : fournit un accès asynchrone à une source de données (telle qu’une base de données) pour enrichir les événements de flux. 

## Connecteurs disponibles
<a name="how-connectors-list"></a>

L’environnement Apache Flink contient des connecteurs permettant d’accéder aux données provenant de diverses sources. Pour obtenir des informations sur les connecteurs disponibles dans l’environnement Apache Flink, consultez [Connectors](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/connectors/) dans la [documentation Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.15/).

**Avertissement**  
Si vous avez des applications exécutées sur Flink 1.6, 1.8, 1.11 ou 1.13 et que vous souhaitez les exécuter dans les régions du Moyen-Orient (EAU), de l'Asie-Pacifique (Hyderabad), d'Israël (Tel Aviv), de l'Europe (Zurich), du Moyen-Orient (EAU), de l'Asie-Pacifique (Melbourne) ou de l'Asie-Pacifique (Jakarta), vous devrez peut-être reconstruire l'archive de vos applications avec un connecteur mis à jour ou passer à Flink 1.18.   
Les connecteurs Apache Flink sont stockés dans leurs propres référentiels open source. Si vous effectuez une mise à niveau vers la version 1.18 ou ultérieure, vous devez mettre à jour vos dépendances. Pour accéder au référentiel des AWS connecteurs Apache Flink, consultez [flink-connector-aws](https://github.com/apache/flink-connector-aws).  
L'ancienne source Kinesis `org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer` n'est plus disponible et pourrait être supprimée dans une future version de Flink. Utilisez [plutôt Kinesis Source](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kinesis/#kinesis-streams-source).  
Il n'y a aucune compatibilité entre les états `FlinkKinesisConsumer` et`KinesisStreamsSource`. Pour plus de détails, consultez la section [Migration de tâches existantes vers la nouvelle source Kinesis Streams](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kinesis/#migrating-existing-jobs-to-new-kinesis-streams-source-from-kinesis-consumer) dans la documentation d'Apache Flink.  
 Voici les directives recommandées :   


**Améliorations de connecteurs**  

| Version Flink | Connecteur utilisé | Résolution | 
| --- | --- | --- | 
| 1,19, 1,20 | Source de Kinesis |  Lors de la mise à niveau vers Managed Service for Apache Flink versions 1.19 et 1.20, assurez-vous que vous utilisez le connecteur source Kinesis Data Streams le plus récent. Il doit s'agir de n'importe quelle version 5.0.0 ou ultérieure. Pour plus d'informations, consultez [Amazon Kinesis Data Streams Connector](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kinesis/).  | 
| 1,19, 1,20 | Évier Kinesis |  Lors de la mise à niveau vers Managed Service for Apache Flink versions 1.19 et 1.20, assurez-vous que vous utilisez le connecteur récepteur Kinesis Data Streams le plus récent. Il doit s'agir de n'importe quelle version 5.0.0 ou ultérieure. Pour plus d'informations, consultez [Kinesis Streams Sink](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kinesis/#kinesis-streams-sink).  | 
| 1,19, 1,20 | Source des flux DynamoDB |  Lors de la mise à niveau vers Managed Service for Apache Flink versions 1.19 et 1.20, assurez-vous que vous utilisez le connecteur source DynamoDB Streams le plus récent. Il doit s'agir de n'importe quelle version 5.0.0 ou ultérieure. Pour plus d'informations, consultez [Amazon DynamoDB Connector](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/dynamodb/).  | 
| 1,19, 1,20 | Récepteur DynamoDB | Lors de la mise à niveau vers le service géré pour Apache Flink versions 1.19 et 1.20, assurez-vous que vous utilisez le connecteur récepteur DynamoDB le plus récent. Il doit s'agir de n'importe quelle version 5.0.0 ou ultérieure. Pour plus d'informations, consultez [Amazon DynamoDB Connector](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/dynamodb/). | 
| 1,19, 1,20 | Évier Amazon SQS |  Lors de la mise à niveau vers Managed Service for Apache Flink versions 1.19 et 1.20, assurez-vous que vous utilisez le connecteur récepteur Amazon SQS le plus récent. Il doit s'agir de n'importe quelle version 5.0.0 ou ultérieure. Pour plus d'informations, consultez [Amazon SQS Sink.](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/sqs/)  | 
| 1,19, 1,20 | Service géré par Amazon pour Prometheus Sink |  Lors de la mise à niveau vers Managed Service for Apache Flink versions 1.19 et 1.20, assurez-vous d'utiliser le connecteur récepteur Amazon Managed Service for Prometheus le plus récent. Il doit s'agir de n'importe quelle version 1.0.0 ou ultérieure. Pour plus d'informations, consultez [Prometheus Sink](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/prometheus/).  | 

# Ajouter des sources de données de streaming au service géré pour Apache Flink
<a name="how-sources"></a>

Apache Flink fournit des connecteurs pour lire à partir de fichiers, de sockets, de collections et de sources personnalisées. Dans le code de votre application, vous utilisez une [source Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/datastream_api.html#data-sources) pour recevoir les données d’un flux. Cette section décrit les sources disponibles pour les services Amazon.

## Utiliser les flux de données Kinesis
<a name="input-streams"></a>

`KinesisStreamsSource`fournit des données de streaming à votre application à partir d'un flux de données Amazon Kinesis. 

### Créer une `KinesisStreamsSource`
<a name="input-streams-create"></a>

L’exemple de code suivant illustre la création d’un `KinesisStreamsSource` :

```
// Configure the KinesisStreamsSource
Configuration sourceConfig = new Configuration();
sourceConfig.set(KinesisSourceConfigOptions.STREAM_INITIAL_POSITION, KinesisSourceConfigOptions.InitialPosition.TRIM_HORIZON); // This is optional, by default connector will read from LATEST

// Create a new KinesisStreamsSource to read from specified Kinesis Stream.
KinesisStreamsSource<String> kdsSource =
        KinesisStreamsSource.<String>builder()
                .setStreamArn("arn:aws:kinesis:us-east-1:123456789012:stream/test-stream")
                .setSourceConfig(sourceConfig)
                .setDeserializationSchema(new SimpleStringSchema())
                .setKinesisShardAssigner(ShardAssignerFactory.uniformShardAssigner()) // This is optional, by default uniformShardAssigner will be used.
                .build();
```

Pour plus d'informations sur l'utilisation d'un`KinesisStreamsSource`, consultez le connecteur [Amazon Kinesis Data Streams](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kinesis/) dans la documentation d'Apache Flink [et notre exemple KinesisConnectors public](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KinesisConnectors) sur Github.

### Créez un `KinesisStreamsSource` qui utilise un consommateur EFO
<a name="input-streams-efo"></a>

`KinesisStreamsSource`Il est désormais compatible avec [Enhanced Fan-Out (EFO)](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kinesis/). 

Si un client Kinesis utilise EFO, le service Kinesis Data Streams lui fournit sa propre bande passante dédiée, au lieu que le consommateur partage la bande passante fixe du flux avec les autres consommateurs lisant le flux.

Pour plus d'informations sur l'utilisation d'EFO avec les consommateurs Kinesis, [consultez FLIP-128 : Enhanced Fan Out](https://cwiki.apache.org/confluence/display/FLINK/FLIP-128%3A+Enhanced+Fan+Out+for+AWS+Kinesis+Consumers) for Kinesis Consumers. AWS 

Vous activez le consommateur EFO en définissant les paramètres suivants sur le consommateur Kinesis :
+ **READER\$1TYPE :** définissez ce paramètre **sur EFO** pour que votre application utilise un consommateur EFO pour accéder aux données Kinesis Data Stream. 
+ **EFO\$1CONSUMER\$1NAME :** définissez ce paramètre sur une valeur de chaîne unique parmi les consommateurs de ce flux. La réutilisation d’un nom de consommateur dans le même flux de données Kinesis entraînera la résiliation du client qui utilisait ce nom précédemment. 

Pour configurer un `KinesisStreamsSource` afin d’utiliser EFO, ajoutez les paramètres suivants au consommateur :

```
sourceConfig.set(KinesisSourceConfigOptions.READER_TYPE, KinesisSourceConfigOptions.ReaderType.EFO);
sourceConfig.set(KinesisSourceConfigOptions.EFO_CONSUMER_NAME, "my-flink-efo-consumer");
```

Pour un exemple de service géré pour une application Apache Flink utilisant un client EFO, consultez [notre exemple public de Kinesis Connectors](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KinesisConnectors) sur Github.

## Utiliser Amazon MSK
<a name="input-msk"></a>

La source `KafkaSource` fournit des données de streaming à votre application à partir d’une rubrique Amazon MSK. 

### Créer une `KafkaSource`
<a name="input-msk-create"></a>

L’exemple de code suivant illustre la création d’un `KafkaSource` :

```
KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers(brokers)
    .setTopics("input-topic")
    .setGroupId("my-group")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
```

Pour plus d’informations sur l’utilisation d’un `KafkaSource`, consultez [Réplication MSK](earlier.md#example-msk).

# Écrire des données à l'aide de récepteurs dans le service géré pour Apache Flink
<a name="how-sinks"></a>

Dans le code de votre application, vous pouvez utiliser n'importe quel connecteur [récepteur Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/overview/) pour écrire dans des systèmes externes, y compris AWS des services tels que Kinesis Data Streams et DynamoDB.

Apache Flink fournit également des récepteurs pour les fichiers et les sockets, et vous pouvez implémenter des récepteurs personnalisés. Parmi les différents éviers pris en charge, les suivants sont fréquemment utilisés :

## Utiliser les flux de données Kinesis
<a name="sinks-streams"></a>

Apache Flink fournit des informations sur le connecteur [Kinesis Data Streams](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/kinesis/) dans la documentation d’Apache Flink.

Pour un exemple d’application qui utilise un flux de données Kinesis pour l’entrée et la sortie, consultez [Tutoriel : Commencez à utiliser l' DataStream API dans Managed Service pour Apache Flink](getting-started.md).

## Utiliser Apache Kafka et Amazon Managed Streaming pour Apache Kafka (MSK)
<a name="sinks-MSK"></a>

Le [connecteur Apache Flink Kafka](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/kafka/#kafka-sink) fournit un support complet pour la publication de données sur Apache Kafka et Amazon MSK, y compris des garanties « une seule fois ». Pour savoir comment écrire dans Kafka, consultez les [exemples de connecteurs Kafka](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KafkaConnectors) dans la documentation d'Apache Flink.

## Utiliser Amazon S3
<a name="sinks-s3"></a>

Vous pouvez utiliser le `StreamingFileSink` Apache Flink pour écrire des objets dans un compartiment Amazon S3.

Pour un exemple sur la façon d’écrire des objets dans S3, consultez[Exemple : écriture dans un compartiment Amazon S3](earlier.md#examples-s3). 

## Utilisez Firehose
<a name="sinks-firehose"></a>

`FlinkKinesisFirehoseProducer`Il s'agit d'un récepteur Apache Flink fiable et évolutif permettant de stocker les résultats des applications à l'aide du service [Firehose](https://docs.aws.amazon.com/firehose/latest/dev/). Cette section décrit comment configurer un projet Maven pour créer et utiliser un `FlinkKinesisFirehoseProducer`.

**Topics**
+ [Créer une `FlinkKinesisFirehoseProducer`](#sinks-firehose-create)
+ [Exemple de code `FlinkKinesisFirehoseProducer`](#sinks-firehose-sample)

### Créer une `FlinkKinesisFirehoseProducer`
<a name="sinks-firehose-create"></a>

L’exemple de code suivant illustre la création d’un `FlinkKinesisFirehoseProducer` :

```
Properties outputProperties = new Properties();
outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);

FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName, new SimpleStringSchema(), outputProperties);
```

### Exemple de code `FlinkKinesisFirehoseProducer`
<a name="sinks-firehose-sample"></a>

L'exemple de code suivant montre comment créer et configurer un flux de données Apache Flink `FlinkKinesisFirehoseProducer` et comment envoyer des données au service Firehose.

```
 
package com.amazonaws.services.kinesisanalytics;

import com.amazonaws.services.kinesisanalytics.flink.connectors.config.ProducerConfigConstants;
import com.amazonaws.services.kinesisanalytics.flink.connectors.producer.FlinkKinesisFirehoseProducer;
import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;

import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

import java.io.IOException;
import java.util.Map;
import java.util.Properties;

public class StreamingJob {

	private static final String region = "us-east-1";
	private static final String inputStreamName = "ExampleInputStream";
	private static final String outputStreamName = "ExampleOutputStream";

	private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) {
		Properties inputProperties = new Properties();
		inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
		inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

		return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
	}

	private static DataStream<String> createSourceFromApplicationProperties(StreamExecutionEnvironment env)
			throws IOException {
		Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
		return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(),
				applicationProperties.get("ConsumerConfigProperties")));
	}

	private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromStaticConfig() {
		/*
		 * com.amazonaws.services.kinesisanalytics.flink.connectors.config.
		 * ProducerConfigConstants
		 * lists of all of the properties that firehose sink can be configured with.
		 */

		Properties outputProperties = new Properties();
		outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);

		FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName,
				new SimpleStringSchema(), outputProperties);
		ProducerConfigConstants config = new ProducerConfigConstants();
		return sink;
	}

	private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromApplicationProperties() throws IOException {
		/*
		 * com.amazonaws.services.kinesisanalytics.flink.connectors.config.
		 * ProducerConfigConstants
		 * lists of all of the properties that firehose sink can be configured with.
		 */

		Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
		FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName,
				new SimpleStringSchema(),
				applicationProperties.get("ProducerConfigProperties"));
		return sink;
	}

	public static void main(String[] args) throws Exception {
		// set up the streaming execution environment
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		/*
		 * if you would like to use runtime configuration properties, uncomment the
		 * lines below
		 * DataStream<String> input = createSourceFromApplicationProperties(env);
		 */

		DataStream<String> input = createSourceFromStaticConfig(env);

		// Kinesis Firehose sink
		input.addSink(createFirehoseSinkFromStaticConfig());

		// If you would like to use runtime configuration properties, uncomment the
		// lines below
		// input.addSink(createFirehoseSinkFromApplicationProperties());

		env.execute("Flink Streaming Java API Skeleton");
	}
}
```

Pour un didacticiel complet sur l'utilisation du lavabo Firehose, voir. [Exemple : écrire dans Firehose](earlier.md#get-started-exercise-fh)

# Utiliser l'asynchrone I/O dans le service géré pour Apache Flink
<a name="how-async"></a>

Un I/O opérateur asynchrone enrichit les données de flux à l'aide d'une source de données externe telle qu'une base de données. Le service géré pour Apache Flink enrichit les événements du flux de manière asynchrone afin que les demandes puissent être groupées pour une plus grande efficacité. 

Pour plus d'informations, consultez la section [E/S asynchrones dans la documentation d'](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/operators/asyncio/)Apache Flink.

# Transformez les données à l'aide d'opérateurs dans Managed Service pour Apache Flink avec l'API DataStream
<a name="how-operators"></a>

Pour transformer les données entrantes dans un service géré pour Apache Flink, vous devez utiliser un *opérateur* Apache Flink. Un opérateur Apache Flink transforme un ou plusieurs flux de données en un nouveau flux de données. Le nouveau flux de données contient des données modifiées par rapport au flux de données d’origine. Apache Flink fournit plus de 25 opérateurs de traitement de flux prédéfinis. Pour plus d'informations, consultez la section [Opérateurs](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/operators/overview/) dans la documentation d'Apache Flink.

**Topics**
+ [Utiliser des opérateurs de transformation](#how-operators-transform)
+ [Utiliser des opérateurs d'agrégation](#how-operators-agg)

## Utiliser des opérateurs de transformation
<a name="how-operators-transform"></a>

Voici un exemple de transformation de texte simple sur l’un des champs d’un flux de données JSON. 

Ce code crée un flux de données transformé. Le nouveau flux de données contient les mêmes données que le flux d’origine, la chaîne « ` Company` » étant ajoutée au contenu du champ `TICKER`.

```
DataStream<ObjectNode> output = input.map(
    new MapFunction<ObjectNode, ObjectNode>() {
        @Override
        public ObjectNode map(ObjectNode value) throws Exception {
            return value.put("TICKER", value.get("TICKER").asText() + " Company");
        }
    }
);
```

## Utiliser des opérateurs d'agrégation
<a name="how-operators-agg"></a>

Voici un exemple d’opérateur d’agrégation. Le code crée un flux de données agrégé. L’opérateur crée une fenêtre variable de 5 secondes et renvoie la somme des valeurs `PRICE` des enregistrements de la fenêtre avec la même valeur `TICKER`.

```
DataStream<ObjectNode> output = input.keyBy(node -> node.get("TICKER").asText())
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .reduce((node1, node2) -> {
        double priceTotal = node1.get("PRICE").asDouble() + node2.get("PRICE").asDouble();
        node1.replace("PRICE", JsonNodeFactory.instance.numberNode(priceTotal));
    return node1;
});
```

Pour plus d’exemples de code, consultez [Exemples de création et d'utilisation d'un service géré pour les applications Apache Flink](examples-collapsibles.md). 

# Suivez les événements dans le service géré pour Apache Flink à l'aide de l'API DataStream
<a name="how-time"></a>

Le service géré pour Apache Flink suit les événements à l’aide des horodatages suivants :
+ **Heure de traitement :** fait référence à l’heure système de la machine qui exécute l’opération correspondante.
+ **Heure de l’événement :** fait référence à l’heure à laquelle chaque événement individuel s’est produit sur son appareil producteur.
+ **Heure d’ingestion :** fait référence à l’heure à laquelle les événements entrent dans le service géré pour Apache Flink.

Vous réglez le temps utilisé par l'environnement de streaming à l'aide de`setStreamTimeCharacteristic`. 

```
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
```

Pour plus d'informations sur les horodatages, consultez la section [Génération de filigranes](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/event-time/generating_watermarks/) dans la documentation d'Apache Flink.

# Composants de l'API Review Table
<a name="how-table"></a>

Votre application Apache Flink utilise l’[API de table Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/tableapi/) pour interagir avec les données d’un flux à l’aide d’un modèle relationnel. Vous utilisez l’API de table pour accéder aux données à l’aide des sources de table, puis vous utilisez les fonctions de table pour transformer et filtrer les données de table. Vous pouvez transformer et filtrer les données de table à l’aide de fonctions d’API ou de commandes SQL. 

Cette section contient les rubriques suivantes :
+ [Connecteurs d'API de table](how-table-connectors.md) : ces composants déplacent les données entre votre application et les sources de données et destinations externes.
+ [Attributs temporels de l'API Table](how-table-timeattributes.md) : cette rubrique décrit comment le service géré pour Apache Flink suit les événements lors de l’utilisation de l’API de table.

# Connecteurs d'API de table
<a name="how-table-connectors"></a>

Dans le modèle de programmation Apache Flink, les connecteurs sont des composants que votre application utilise pour lire ou écrire des données provenant de sources externes, telles que d'autres AWS services.

Avec l’API de table Apache Flink, vous pouvez utiliser les types de connecteurs suivants :
+ [Sources d'API du tableau](#how-table-connectors-source) : vous utilisez les connecteurs source de l’API de table pour créer des tables dans votre `TableEnvironment` à l’aide d’appels d’API ou de requêtes SQL.
+ [Réservoirs d'API de table](#how-table-connectors-sink) : vous utilisez des commandes SQL pour écrire des données de table dans des sources externes telles qu’une rubrique Amazon MSK ou un compartiment Amazon S3.

## Sources d'API du tableau
<a name="how-table-connectors-source"></a>

Vous créez une source de table à partir d’un flux de données. Le code suivant crée une table à partir d’une rubrique Amazon MSK :

```
//create the table
    final FlinkKafkaConsumer<StockRecord> consumer = new FlinkKafkaConsumer<StockRecord>(kafkaTopic, new KafkaEventDeserializationSchema(), kafkaProperties);
    consumer.setStartFromEarliest();
    //Obtain stream
    DataStream<StockRecord> events = env.addSource(consumer);

    Table table = streamTableEnvironment.fromDataStream(events);
```

Pour plus d'informations sur les sources de tables, consultez la section [Connecteurs de table et SQL](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/overview/) dans la documentation d'Apache Flink.

## Réservoirs d'API de table
<a name="how-table-connectors-sink"></a>

Pour écrire des données de table dans un récepteur, vous créez le récepteur en SQL, puis vous exécutez le récepteur basé sur SQL sur l’objet `StreamTableEnvironment`.

L’exemple de code suivant illustre comment écrire des données de table sur un récepteur Amazon S3 :

```
final String s3Sink = "CREATE TABLE sink_table (" +
    "event_time TIMESTAMP," +
    "ticker STRING," +
    "price DOUBLE," +
    "dt STRING," +
    "hr STRING" +
    ")" +
    " PARTITIONED BY (ticker,dt,hr)" +
    " WITH" +
    "(" +
    " 'connector' = 'filesystem'," +
    " 'path' = '" + s3Path + "'," +
    " 'format' = 'json'" +
    ") ";

    //send to s3
    streamTableEnvironment.executeSql(s3Sink);
    filteredTable.executeInsert("sink_table");
```

 Vous pouvez utiliser le paramètre `format` pour contrôler le format utilisé par le service géré pour Apache Flink pour écrire la sortie sur le récepteur. Pour plus d'informations sur les formats, consultez la section [Connecteurs pris en charge](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/overview/) dans la documentation d'Apache Flink.

## Sources et récepteurs définis par l'utilisateur
<a name="how-table-connectors-userdef"></a>

Vous pouvez utiliser les connecteurs Apache Kafka existants pour envoyer des données vers et depuis d’autres services AWS , tels qu’Amazon MSK et Amazon S3. Pour interagir avec d’autres sources de données et destinations, vous pouvez définir vos propres sources et récepteurs. Pour plus d'informations, consultez la section [Sources et récepteurs définis par l'utilisateur dans](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/sourcessinks/) la documentation d'Apache Flink.

# Attributs temporels de l'API Table
<a name="how-table-timeattributes"></a>

Chaque enregistrement d’un flux de données possède plusieurs horodatages qui définissent le moment où les événements liés à l’enregistrement se sont produits :
+ **Heure de l’événement** : horodatage défini par l’utilisateur qui définit le moment où l’événement à l’origine de l’enregistrement s’est produit.
+ **Heure d’ingestion** : heure à laquelle votre application a extrait l’enregistrement du flux de données.
+ **Heure de traitement** : heure à laquelle votre demande a traité l’enregistrement.

Lorsque l'API Apache Flink Table crée des fenêtres basées sur des temps records, vous définissez lequel de ces horodatages elle utilise à l'aide de la méthode. `setStreamTimeCharacteristic` 

Pour plus d'informations sur l'utilisation des horodatages avec l'API Table, consultez la section [Attributs temporels](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/concepts/time_attributes/) et [traitement des flux en temps opportun](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/concepts/time/) dans la documentation d'Apache Flink.

# Utiliser Python avec le service géré pour Apache Flink
<a name="how-python"></a>

**Note**  
Si vous développez l'application Python Flink sur un nouveau Mac équipé d'une puce Apple Silicon, vous pouvez rencontrer des [problèmes connus liés aux](https://issues.apache.org/jira/browse/FLINK-26981) dépendances Python de la version PyFlink 1.15. Dans ce cas, nous recommandons d’exécuter l’interpréteur Python dans Docker. Pour step-by-step obtenir des instructions, reportez-vous à la section [Développement de la version PyFlink 1.15 sur Apple Silicon Mac](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/python/LocalDevelopmentOnAppleSilicon).

La version 2.2 d'Apache Flink inclut la prise en charge de la création d'applications à l'aide de Python version 3.12 ; la prise en charge de Python version 3.8 est supprimée. Pour plus d'informations, consultez [Flink Python Docs](https://nightlies.apache.org/flink/flink-docs-release-2.2/api/python/). Pour créer une application de service géré pour Apache Flink à l’aide de Python, procédez comme suit :
+ Créez le code de votre application Python sous forme de fichier texte avec une méthode `main`.
+ Regroupez le fichier de code de votre application et toutes les dépendances Python ou Java dans un fichier zip, puis chargez-le dans un compartiment Amazon S3.
+ Créez votre application de service géré pour Apache Flink, en spécifiant l’emplacement de votre code Amazon S3, les propriétés de l’application et les paramètres de l’application.

À un niveau élevé, l’API de table Python est un encapsuleur autour de l’API de table Java. Pour plus d'informations sur l'API Python Table, consultez le [didacticiel de l'API Table](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/python/table_api_tutorial/) dans la documentation Apache Flink.

# Programmez votre service géré pour l'application Apache Flink Python
<a name="how-python-programming"></a>

Vous codez votre service géré pour l’application Apache Flink pour Python à l’aide de l’API de table Apache Flink Python. Le moteur Apache Flink traduit les instructions de l’API de table Python (exécutées dans la machine virtuelle Python) en instructions de l’API de table Java (exécutées dans la machine virtuelle Java). 

Pour utiliser l’API de table Python, procédez comme suit :
+ Créez une référence vers l’`StreamTableEnvironment`.
+ Créez des objets `table` à partir de vos données de streaming source en exécutant des requêtes sur la référence `StreamTableEnvironment`.
+ Exécutez des requêtes sur vos objets `table` pour créer des tables de sortie.
+ Rédigez vos tables de sortie vers vos destinations à l’aide d’un `StatementSet`.

Pour commencer à utiliser l’API de table Python dans le service géré pour Apache Flink, consultez [Commencez avec Amazon Managed Service pour Apache Flink pour Python](gs-python.md).

## Lire et écrire des données de streaming
<a name="how-python-programming-readwrite"></a>

Pour lire et écrire des données en streaming, vous devez exécuter des requêtes SQL dans l’environnement de table.

### Création d’une table
<a name="how-python-programming-readwrite-createtable"></a>

L’exemple de code suivant illustre une fonction définie par l’utilisateur qui crée une requête SQL. La requête SQL crée une table qui interagit avec un flux Kinesis :

```
def create_table(table_name, stream_name, region, stream_initpos):
   return """ CREATE TABLE {0} (
                `record_id` VARCHAR(64) NOT NULL,
                `event_time` BIGINT NOT NULL,
                `record_number` BIGINT NOT NULL,
                `num_retries` BIGINT NOT NULL,
                `verified` BOOLEAN NOT NULL
              )
              PARTITIONED BY (record_id)
              WITH (
                'connector' = 'kinesis',
                'stream' = '{1}',
                'aws.region' = '{2}',
                'scan.stream.initpos' = '{3}',
                'sink.partitioner-field-delimiter' = ';',
                'sink.producer.collection-max-count' = '100',
                'format' = 'json',
                'json.timestamp-format.standard' = 'ISO-8601'
              ) """.format(table_name, stream_name, region, stream_initpos)
```

### Lire les données de streaming
<a name="how-python-programming-readwrite-read"></a>

L’exemple de code suivant montre comment utiliser la requête SQL `CreateTable` précédente sur une référence d’environnement de table pour lire des données :

```
   table_env.execute_sql(create_table(input_table, input_stream, input_region, stream_initpos))
```

### Écrire des données de streaming
<a name="how-python-programming-readwrite-write"></a>

L’exemple de code suivant montre comment utiliser la requête SQL de l’exemple `CreateTable` pour créer une référence de table de sortie, et comment utiliser un `StatementSet` pour interagir avec les tables afin d’écrire des données dans un flux Kinesis de destination :

```
   table_result = table_env.execute_sql("INSERT INTO {0} SELECT * FROM {1}"
                       .format(output_table_name, input_table_name))
```

## Lire les propriétés d'exécution
<a name="how-python-programming-properties"></a>

Vous pouvez utiliser les propriétés d’exécution pour configurer votre application sans changer le code de votre application.

Vous spécifiez les propriétés de votre application de la même manière qu’avec un service géré pour Apache Flink pour une application Java. Vous pouvez spécifier des propriétés d’exécution de différentes manières :
+ En utilisant l'[CreateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplication.html)action.
+ En utilisant l'[UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html)action.
+ En Configurant votre application à l’aide de la console.

Vous pouvez récupérer les propriétés de l’application dans le code en lisant un fichier JSON appelé `application_properties.json` créé par l’exécution du service géré pour Apache Flink.

L’exemple de code suivant montre comment lire les propriétés d’une application à partir du fichier `application_properties.json` :

```
file_path = '/etc/flink/application_properties.json'
   if os.path.isfile(file_path):
       with open(file_path, 'r') as file:
           contents = file.read()
           properties = json.loads(contents)
```

L’exemple de code de fonction défini par l’utilisateur suivant illustre la lecture d’un groupe de propriétés à partir de l’objet des propriétés de l’application : récupère :

```
def property_map(properties, property_group_id):
   for prop in props:
       if prop["PropertyGroupId"] == property_group_id:
           return prop["PropertyMap"]
```

L’exemple de code suivant illustre la lecture d’une propriété appelée INPUT\$1STREAM\$1KEY à partir d’un groupe de propriétés renvoyé par l’exemple précédent :

```
input_stream = input_property_map[INPUT_STREAM_KEY]
```

## Créez le package de code de votre application
<a name="how-python-programming-package"></a>

Une fois que vous avez créé votre application Python, vous regroupez votre fichier de code et ses dépendances dans un fichier zip.

Votre fichier zip doit contenir un script python avec une méthode `main` et peut éventuellement contenir les éléments suivants :
+ Fichiers de code Python supplémentaires
+ Code Java défini par l’utilisateur dans les fichiers JAR
+ Bibliothèques Java dans des fichiers JAR

**Note**  
Le fichier zip de votre application doit contenir toutes les dépendances de votre application. Vous ne pouvez pas référencer des bibliothèques provenant d’autres sources pour votre application.

# Créez votre service géré pour l'application Apache Flink Python
<a name="how-python-creating"></a>

## Spécifiez vos fichiers de code
<a name="how-python-creating-code"></a>

Une fois que vous avez créé le package de code de votre application, vous le chargez dans un compartiment Amazon S3. Vous créez ensuite votre application à l'aide de la console ou de l'[CreateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplication.html)action.

Lorsque vous créez votre application à l'aide de cette [CreateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplication.html)action, vous spécifiez les fichiers de code et les archives de votre fichier zip à l'aide d'un groupe de propriétés d'application spécial appelé`kinesis.analytics.flink.run.options`. Vous pouvez définir les types de fichiers suivants :
+ **python** : fichier texte contenant une méthode principale de Python.
+ **jarfile :** fichier JAR Java contenant des fonctions Java définies par l’utilisateur.
+ **pyFiles** : fichier de ressources Python contenant les ressources à utiliser par l’application.
+ **pyArchives** : fichier zip contenant les fichiers de ressources de l’application.

Pour plus d'informations sur les types de fichiers de code Python d'Apache Flink, consultez la section [Interface de ligne de commande](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/deployment/cli/) dans la documentation d'Apache Flink.

**Note**  
Le service géré pour Apache Flink ne prend pas en charge les types de fichiers `pyModule`, `pyExecutable` ou `pyRequirements`. L’ensemble du code, des exigences et des dépendances doivent se trouver dans votre fichier zip. Vous ne pouvez pas spécifier les dépendances à installer à l’aide de pip. 

L’exemple d’extrait de code JSON suivant montre comment spécifier l’emplacement des fichiers dans le fichier zip de votre application :

```
"ApplicationConfiguration": {
    "EnvironmentProperties": {
      "PropertyGroups": [
        {
          "PropertyGroupId": "kinesis.analytics.flink.run.options",
          "PropertyMap": {
            "python": "MyApplication/main.py",
            "jarfile": "MyApplication/lib/myJarFile.jar",
            "pyFiles": "MyApplication/lib/myDependentFile.py",
            "pyArchives": "MyApplication/lib/myArchive.zip"
          }
        },
```

# Surveillez votre service géré pour l'application Apache Flink Python
<a name="how-python-monitoring"></a>

Vous utilisez le CloudWatch journal de votre application pour surveiller votre application Managed Service for Apache Flink Python.

Le service géré pour Apache Flink enregistre les messages suivants pour les applications Python :
+ Messages écrits sur la console à l’aide de `print()` dans la méthode `main` de l’application.
+ Messages envoyés dans le cadre de fonctions définies par l’utilisateur à l’aide du package `logging`. L’exemple de code suivant illustre l’écriture dans le journal des applications à partir d’une fonction définie par l’utilisateur :

  ```
  import logging
  
  @udf(input_types=[DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
  def doNothingUdf(i):
      logging.info("Got {} in the doNothingUdf".format(str(i)))
      return i
  ```
+ Messages d’erreur émis par l’application.

  Si l’application génère une exception dans la fonction `main`, elle apparaîtra dans les journaux de votre application.

  L’exemple suivant illustre une entrée de journal pour une exception émise à partir du code Python :

  ```
  2021-03-15 16:21:20.000   --------------------------- Python Process Started --------------------------
  2021-03-15 16:21:21.000   Traceback (most recent call last):
  2021-03-15 16:21:21.000   "  File ""/tmp/flink-web-6118109b-1cd2-439c-9dcd-218874197fa9/flink-web-upload/4390b233-75cb-4205-a532-441a2de83db3_code/PythonKinesisSink/PythonUdfUndeclared.py"", line 101, in <module>"
  2021-03-15 16:21:21.000       main()
  2021-03-15 16:21:21.000   "  File ""/tmp/flink-web-6118109b-1cd2-439c-9dcd-218874197fa9/flink-web-upload/4390b233-75cb-4205-a532-441a2de83db3_code/PythonKinesisSink/PythonUdfUndeclared.py"", line 54, in main"
  2021-03-15 16:21:21.000   "    table_env.register_function(""doNothingUdf"", doNothingUdf)"
  2021-03-15 16:21:21.000   NameError: name 'doNothingUdf' is not defined
  2021-03-15 16:21:21.000   --------------------------- Python Process Exited ---------------------------
  2021-03-15 16:21:21.000   Run python process failed
  2021-03-15 16:21:21.000   Error occurred when trying to start the job
  ```

**Note**  
En raison de problèmes de performances, nous vous recommandons de n’utiliser que des messages de journal personnalisés lors du développement de l’application. 

## Journaux de requêtes avec CloudWatch Insights
<a name="how-python-monitoring-insights"></a>

La requête CloudWatch Insights suivante recherche les journaux créés par le point d'entrée Python lors de l'exécution de la fonction principale de votre application :

```
fields @timestamp, message
| sort @timestamp asc
| filter logger like /PythonDriver/
| limit 1000
```

# Utiliser les propriétés d'exécution dans Managed Service pour Apache Flink
<a name="how-properties"></a>

Vous pouvez utiliser les *propriétés d’exécution* pour configurer votre application sans recompiler le code de votre application. 

**Topics**
+ [Gérer les propriétés d'exécution à l'aide de la console](#how-properties-console)
+ [Gérer les propriétés d'exécution à l'aide de la CLI](#how-properties-cli)
+ [Accès aux propriétés d'exécution dans un service géré pour une application Apache Flink](#how-properties-access)

## Gérer les propriétés d'exécution à l'aide de la console
<a name="how-properties-console"></a>

Vous pouvez ajouter, mettre à jour ou supprimer des propriétés d'exécution de votre application Managed Service for Apache Flink à l'aide du AWS Management Console.

**Note**  
Si vous utilisez une version antérieure prise en charge d'Apache Flink et que vous souhaitez mettre à niveau vos applications existantes vers Apache Flink 1.19.1, vous pouvez le faire en utilisant des mises à niveau de version d'Apache Flink sur place. Grâce aux mises à niveau de version sur place, vous conservez la traçabilité des applications par rapport à un seul ARN pour toutes les versions d'Apache Flink, y compris les instantanés, les journaux, les métriques, les balises, les configurations Flink, etc. Vous pouvez utiliser cette fonctionnalité dans `RUNNING` et dans `READY` l'État. Pour de plus amples informations, veuillez consulter [Utiliser des mises à niveau de version sur place pour Apache Flink](how-in-place-version-upgrades.md).

**Mettre à jour les propriétés d’exécution d’une application de service géré pour l’application Apache Flink**

1. Connectez-vous à la AWS Management Console console Amazon MSF et ouvrez-la à https://console.aws.amazon.com l'adresse /flink.

1. Choisissez votre application de service géré pour Apache Flink. Choisissez **Détails de l’application**.

1. Sur la page de votre application, choisissez **Configurer**.

1. Développez la section **Propriétés**.

1. Utilisez les commandes de la section **Propriétés** pour définir un groupe de propriétés avec des paires clé-valeur. Utilisez ces commandes pour ajouter, mettre à jour ou supprimer des groupes de propriétés et des propriétés d’exécution.

1. Choisissez **Mettre à jour**.

## Gérer les propriétés d'exécution à l'aide de la CLI
<a name="how-properties-cli"></a>

Vous pouvez ajouter, mettre à jour ou supprimer des propriétés d’exécution à l’aide de l’interface [AWS CLI](https://docs.aws.amazon.com/cli). 

Cette section inclut des exemples de demandes d’actions d’API pour configurer les propriétés d’exécution pour une application. Pour obtenir des informations sur l’utilisation d’un fichier JSON comme entrée pour une action d’API, consultez [Exemple de code de service géré pour l'API Apache Flink](api-examples.md).

**Note**  
Remplacez l’exemple d’ID de compte (*`012345678901`*) dans les exemples suivants par votre ID de compte.

### Ajouter des propriétés d'exécution lors de la création d'une application
<a name="how-properties-create"></a>

L’exemple de demande d’action [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html) suivant ajoute deux groupes de propriétés d’exécution (`ProducerConfigProperties` et `ConsumerConfigProperties`) lorsque vous créez une application :

```
{
    "ApplicationName": "MyApplication",
    "ApplicationDescription": "my java test app",
    "RuntimeEnvironment": "FLINK-1_19",
    "ServiceExecutionRole": "arn:aws:iam::012345678901:role/MF-stream-rw-role",
    "ApplicationConfiguration": {
        "ApplicationCodeConfiguration": {
            "CodeContent": {
                "S3ContentLocation": {
                    "BucketARN": "arn:aws:s3:::ka-app-code-username",
                    "FileKey": "java-getting-started-1.0.jar"
                }
            },
            "CodeContentType": "ZIPFILE"
        },
        "EnvironmentProperties":  { 
         "PropertyGroups": [ 
            { 
               "PropertyGroupId": "ProducerConfigProperties",
               "PropertyMap" : {
                    "flink.stream.initpos" : "LATEST",
                    "aws.region" : "us-west-2",
                    "AggregationEnabled" : "false"
               }
            },
            { 
               "PropertyGroupId": "ConsumerConfigProperties",
               "PropertyMap" : {
                    "aws.region" : "us-west-2"
               }
            }
         ]
      }
    }
}
```

### Ajouter et mettre à jour les propriétés d'exécution dans une application existante
<a name="how-properties-update"></a>

L’exemple de demande d’action [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html) suivant ajoute ou met à jour les propriétés d’exécution d’une application existante :

```
{
  "ApplicationName": "MyApplication",
  "CurrentApplicationVersionId": 2,
  "ApplicationConfigurationUpdate": {
    "EnvironmentPropertyUpdates": {
      "PropertyGroups": [ 
        { 
          "PropertyGroupId": "ProducerConfigProperties",
          "PropertyMap" : {
            "flink.stream.initpos" : "LATEST",
            "aws.region" : "us-west-2",
            "AggregationEnabled" : "false"
          }
        },
        { 
          "PropertyGroupId": "ConsumerConfigProperties",
          "PropertyMap" : {
            "aws.region" : "us-west-2"
          }
        }
      ]
    }
  }
}
```

**Note**  
Si vous utilisez une clé qui n’a aucune propriété d’exécution correspondante dans un groupe de propriétés, le service géré pour Apache Flink ajoute la paire clé-valeur en tant que nouvelle propriété. Si vous utilisez une clé pour une propriété d’exécution existante dans un groupe de propriétés, le service géré pour Apache Flink met à jour la valeur de la propriété. 

### Supprimer les propriétés d'exécution
<a name="how-properties-remove"></a>

L’exemple de demande d’action [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html) suivant supprime toutes les propriétés d’exécution et tous les groupes de propriétés d’une application existante :

```
{
  "ApplicationName": "MyApplication",
  "CurrentApplicationVersionId": 3,
  "ApplicationConfigurationUpdate": {
    "EnvironmentPropertyUpdates": {
      "PropertyGroups": []
    }
  }
}
```

**Important**  
Si vous omettez un groupe de propriétés existant ou une clé de propriété existante dans un groupe de propriétés, ce groupe de propriétés ou cette propriété est supprimé.

## Accès aux propriétés d'exécution dans un service géré pour une application Apache Flink
<a name="how-properties-access"></a>

Vous pouvez récupérer les propriétés d’exécution dans le code de votre application Java à l’aide de la méthode `KinesisAnalyticsRuntime.getApplicationProperties()` statique, qui renvoie un objet `Map<String, Properties>`.

L’exemple de code Java suivant permet de récupérer les propriétés d’exécution pour votre application :

```
 Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
```

Vous pouvez récupérer un groupe de propriétés (sous forme d’objet `Java.Util.Properties`) comme suit :

```
Properties consumerProperties = applicationProperties.get("ConsumerConfigProperties");
```

Vous configurez généralement une source ou un récepteur Apache Flink en transmettant l’objet `Properties` sans avoir à récupérer les propriétés individuelles. L’exemple de code suivant montre comment créer une source Flink en transmettant un objet `Properties` extrait des propriétés d’exécution :

```
private static FlinkKinesisProducer<String> createSinkFromApplicationProperties() throws IOException {
  Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
  FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<String>(new SimpleStringSchema(),
    applicationProperties.get("ProducerConfigProperties"));

  sink.setDefaultStream(outputStreamName);
  sink.setDefaultPartition("0");
  return sink;
}
```

Pour des exemples de code, consultez [Exemples de création et d'utilisation d'un service géré pour les applications Apache Flink](examples-collapsibles.md).

# Utiliser les connecteurs Apache Flink avec le service géré pour Apache Flink
<a name="how-flink-connectors"></a>

Les connecteurs Apache Flink sont des composants logiciels qui transfèrent des données vers et depuis une application Amazon Managed Service pour Apache Flink. Les connecteurs sont des intégrations flexibles qui vous permettent de lire des fichiers et des répertoires. Les connecteurs sont constitués de modules complets permettant d’interagir avec les services Amazon et les systèmes tiers.

Les types de connecteurs sont les suivants :
+ **Sources :** fournissez des données à votre application à partir d'un flux de données Kinesis, d'un fichier, d'un sujet Apache Kafka, d'un fichier ou d'autres sources de données.
+ **Récepteurs :** envoyez des données depuis votre application vers un flux de données Kinesis, un flux Firehose, une rubrique Apache Kafka ou d'autres destinations de données.
+ **E/S asynchrones :** fournit un accès asynchrone à une source de données telle qu'une base de données pour enrichir les flux. 

Les connecteurs Apache Flink sont stockés dans leurs propres référentiels sources. La version et l'artefact des connecteurs Apache Flink changent en fonction de la version d'Apache Flink que vous utilisez et selon que vous utilisez l' DataStreamAPI Table ou SQL. 

Amazon Managed Service pour Apache Flink prend en charge plus de 40 connecteurs source et récepteur Apache Flink prédéfinis. Le tableau suivant fournit un résumé des connecteurs les plus courants et de leurs versions associées. Vous pouvez également créer des récepteurs personnalisés à l'aide du framework Async-sink. Pour plus d'informations, consultez [The Generic Asynchronous Base Sink](https://flink.apache.org/2022/03/16/the-generic-asynchronous-base-sink/) dans la documentation d'Apache Flink.

 Pour accéder au référentiel des AWS connecteurs Apache Flink, consultez [flink-connector-aws](https://github.com/apache/flink-connector-aws).

## Connecteurs pour Flink 2.2
<a name="connectors-flink-2-2"></a>

Lors de la mise à niveau vers Flink 2.2, vous devez mettre à jour les dépendances de votre connecteur vers des versions compatibles avec le runtime Flink 2.x. Les connecteurs Flink sont publiés indépendamment du moteur d'exécution Flink, et tous les connecteurs ne disposent pas encore d'une version compatible avec Flink 2.x. Le tableau suivant résume la disponibilité des connecteurs couramment utilisés dans Amazon Managed Service pour Apache Flink au moment de la rédaction de cet article :


**Connecteurs pour Flink 2.2**  

| Connecteur | Version 2.0 ou ultérieure de Flink | Remarques | 
| --- | --- | --- | 
| Apache Kafka | flink-connector-kafka 4,0,0-2,0 | Recommandé pour Flink 2.2 | 
| Kinesis Data Streams (source) | flink-connector-aws-kinesis-streams 6.0.0-2.0 | Recommandé pour Flink 2.2 | 
| Kinesis Data Streams (récepteur) | flink-connector-aws-kinesis-streams 6.0.0-2.0 | Recommandé pour Flink 2.2 | 
| FileSystem (S3, HDFS) | Fourni avec Flink | Intégré à la distribution Flink — toujours disponible | 
| JDBC | Pas encore sorti pour 2.x | Aucune version compatible avec Flink 2.x n'est disponible | 
| OpenSearch | Pas encore sorti pour 2.x | Aucune version compatible avec Flink 2.x n'est disponible | 
| Elasticsearch | Pas encore sorti pour 2.x | Envisagez de migrer vers le connecteur OpenSearch  | 
| Amazon Managed Service for Prometheus | Pas encore sorti pour 2.x | Aucune version compatible avec Flink 2.x au moment de la rédaction | 

Si votre application dépend d'un connecteur qui ne possède pas encore de version 2.2 de Flink, deux options s'offrent à vous : attendre que le connecteur publie une version compatible ou évaluer si vous pouvez le remplacer par une autre version (par exemple, en utilisant le catalogue JDBC ou un récepteur personnalisé).

**Problèmes connus**
+ Les applications utilisant le chemin `KinesisStreamsSource` with EFO (Enhanced Fan-Out/ SubscribeToShard) introduit dans les connecteurs v5.0.0 et v6.0.0 peuvent échouer lorsque les flux Kinesis sont repartagés. Il s'agit d'un problème connu au sein de la communauté. Pour plus d'informations, consultez [FLINK-37648](https://issues.apache.org/jira/browse/FLINK-37648).
+ Les applications utilisant le chemin `KinesisStreamsSource` with EFO (Enhanced Fan-Out/ SubscribeToShard) introduit dans les connecteurs v5.0.0 et v6.0.0 `KinesisStreamsSink` peuvent être bloquées si l'application Flink est soumise à une contre-pression, ce qui entraîne l'arrêt complet du traitement des données dans une ou plusieurs applications. TaskManagers Une opération d'arrêt forcé et une opération de démarrage de l'application sont nécessaires pour récupérer l'application. Il s'agit d'un sous-cas du problème connu dans la communauté : [FLINK-34071](https://issues.apache.org/jira/browse/FLINK-34071).

## Connecteurs pour les anciennes versions de Flink
<a name="connectors-older-versions"></a>


**Connecteurs pour les anciennes versions de Flink**  

| Connecteur | Version 1.15 de Flink | Version 1.18 de Flink | Versions 1.19 de Flink | Versions 1.20 de Flink | 
| --- | --- | --- | --- | --- | 
| Kinesis Data Stream (source) DataStream et API de table | flink-connector-kinesis, 1,15.4 | flink-connector-kinesis, 4,3,0-1,18 | flink-connector-kinesis, 5,0,0-1,19 | flink-connector-kinesis, 5,0,0-1,20 | 
| API Kinesis Data Stream - Sink - DataStream et Table | flink-connector-aws-kinesis-streams, 1.15.4 | flink-connector-aws-kinesis-streams, 4.3.0-1,18 | flink-connector-aws-kinesis-streams, 5.0.0-1,19 | flink-connector-aws-kinesis-streams, 5.0.0-1,20 | 
| Kinesis Data Streams Source/Sink - - SQL | flink-sql-connector-kinesis, 1,15.4 | flink-sql-connector-kinesis, 4,3,0-1,18 | flink-sql-connector-kinesis, 5,0,0-1,19 | flink-sql-connector-kinesis-streams, 5.0.0-1,20 | 
| API Kafka DataStream et Table | flink-connector-kafka, 1,15.4 | flink-connector-kafka, 3,2,0-1,18 | flink-connector-kafka, 3,3,0-1,19 | flink-connector-kafka, 3,3,0-1,20 | 
| Kafka - SQL | flink-sql-connector-kafka, 1,15.4 | flink-sql-connector-kafka, 3,2,0-1,18 | flink-sql-connector-kafka, 3,3,0-1,19 | flink-sql-connector-kafka, 3,3,0-1,20 | 
| Firehose - DataStream et API Table | flink-connector-aws-kinesis-lance à incendie, 1.15.4 | flink-connector-aws-firehose, 4,3,0-1,18 | flink-connector-aws-firehose, 5,0,0-1,19 | flink-connector-aws-firehose, 5,0,0-1,20 | 
| Firehose - SQL | flink-sql-connector-aws-kinesis-firehose, 1.15.4 | flink-sql-connector-aws-tuyau à incendie, 4.3.0-1,18 | flink-sql-connector-aws-tuyau à incendie, 5.0.0-1,19 | flink-sql-connector-aws-tuyau à incendie, 5,0,0-1,20 | 
| DynamoDB - et API de table DataStream  | flink-connector-dynamodb, 3,0,0-1,15 | flink-connector-dynamodb, 4,3,0-1,18 | flink-connector-dynamodb, 5,0,0-1,19 | flink-connector-dynamodb, 5,0,0-1,20 | 
| DynamoDB - SQL | flink-sql-connector-dynamodb, 3,0,0-1,15 | flink-sql-connector-dynamodb, 4,3,0-1,18 | flink-sql-connector-dynamodb, 5,0,0-1,19 | flink-sql-connector-dynamodb, 5,0,0-1,20 | 
| OpenSearch - DataStream et API Table | - | flink-connector-opensearch, 1,2,0-1,18 | flink-connector-opensearch, 1,2,0-1,19 | flink-connector-opensearch, 1,2,0-1,19 | 
| OpenSearch - SQL | - | flink-sql-connector-opensearch, 1,2,0-1,18 | flink-sql-connector-opensearch, 1,2,0-1,19 | flink-sql-connector-opensearch, 1,2,0-1,19 | 
| Amazon Managed Service pour Prometheus DataStream | - | flink-sql-connector-opensearch, 1,2,0-1,18 | flink-connector-prometheus, 1,0,0-1,19 | flink-connector-prometheus, 1,0-1,20 | 
| Amazon SQS DataStream et API de table | - | flink-sql-connector-opensearch, 1,2,0-1,18 | flink-connector-sqs, 5,0,0-1,19 | flink-connector-sqs, 5,0,0-1,20 | 

Pour en savoir plus sur les connecteurs dans Amazon Managed Service pour Apache Flink, consultez :
+ [DataStream Connecteurs API](https://docs.aws.amazon.com/managed-flink/latest/java/how-connectors.html)
+ [Connecteurs d'API de table](https://docs.aws.amazon.com/managed-flink/latest/java/how-table-connectors.html)

### Problèmes connus
<a name="connectors-known-issues"></a>

Il existe un problème connu d'Apache Flink open source avec le connecteur Apache Kafka dans Apache Flink 1.15. Ce problème est résolu dans les versions ultérieures d'Apache Flink. 

Pour de plus amples informations, veuillez consulter [Problèmes connus](flink-1-15-2.md#flink-1-15-known-issues). 

# Implémenter la tolérance aux pannes dans le service géré pour Apache Flink
<a name="how-fault"></a>

Le point de contrôle est la méthode utilisée pour implémenter la tolérance aux pannes dans le service géré Amazon pour Apache Flink. Un *point de contrôle* est une up-to-date sauvegarde d'une application en cours d'exécution qui est utilisée pour effectuer une restauration immédiate en cas d'interruption ou de basculement imprévu d'une application. 

Pour plus de détails sur le point de contrôle dans les applications Apache Flink, voir [Points de contrôle](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/ops/state/checkpoints/) dans la documentation d'Apache Flink.

Un *instantané* est une sauvegarde créée et gérée manuellement de l’état de l’application. Les instantanés vous permettent de restaurer l’état antérieur de votre application en appelant [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html). Pour de plus amples informations, veuillez consulter [Gérez les sauvegardes d'applications à l'aide de snapshots](how-snapshots.md).

Si le point de contrôle est activé pour votre application, le service assure la tolérance aux pannes en créant et en chargeant des sauvegardes des données de l’application en cas de redémarrage inattendu de l’application. Ces redémarrages inattendus d’application peuvent être provoqués par des redémarrages de tâche inattendus, des échecs d’instance, etc. Cela donne à l’application la même sémantique qu’une exécution sans échec lors de ces redémarrages. 

Si les instantanés sont activés pour l'application et configurés à l'aide de ceux de l'application [ApplicationRestoreConfiguration](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ApplicationRestoreConfiguration.html), le service fournit une sémantique de traitement unique lors des mises à jour de l'application, ou lors du dimensionnement ou de la maintenance liés au service.

## Configurer le point de contrôle dans le service géré pour Apache Flink
<a name="how-fault-configure"></a>

Vous pouvez configurer le comportement de point de contrôle de votre application. Vous pouvez définir si elle conserve l’état de point de contrôle, à quelle fréquence elle enregistre son état dans les points de contrôle et l’intervalle minimum entre la fin d’une opération de point de contrôle et le début d’une autre.

Vous configurez les paramètres suivants à l’aide des opérations d’API [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html) ou [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html) :
+ `CheckpointingEnabled` : indique si le point de contrôle est activé dans l’application.
+ `CheckpointInterval` : contient le temps en millisecondes entre les opérations de point de contrôle (persistance).
+ `ConfigurationType` : définissez cette valeur sur `DEFAULT` pour utiliser le comportement de point de contrôle par défaut. Définissez cette valeur sur `CUSTOM` pour configurer d’autres valeurs.
**Note**  
Le comportement du point de contrôle par défaut est le suivant :  
**CheckpointingEnabled:** vrai
**CheckpointInterval:** 60 000
**MinPauseBetweenCheckpoints:** 5000
S'il **ConfigurationType**est défini sur`DEFAULT`, les valeurs précédentes seront utilisées, même si elles sont définies sur d'autres valeurs en utilisant le AWS Command Line Interface ou en définissant les valeurs dans le code de l'application.
**Note**  
À partir de Flink 1.15, le service géré pour Apache Flink utilise `stop-with-savepoint` lors de la création automatique d’instantanés, c’est-à-dire lors de la mise à jour, de la mise à l’échelle ou de l’arrêt de l’application. 
+ `MinPauseBetweenCheckpoints` : durée minimale en millisecondes entre la fin d’une opération de point de contrôle et le début d’une autre. La définition de cette propriété empêche l’application de créer un point de contrôle continu lorsque l’opération de contrôle dure plus de temps que `CheckpointInterval`.

## Consultez les exemples d'API de point de contrôle
<a name="how-fault-examples"></a>

Cette section inclut des exemples de demandes d’actions d’API pour configurer les points de contrôle pour une application. Pour obtenir des informations sur l’utilisation d’un fichier JSON comme entrée pour une action d’API, consultez [Exemple de code de service géré pour l'API Apache Flink](api-examples.md).

### Configurer le point de contrôle pour une nouvelle application
<a name="how-fault-examples-create-config"></a>

L’exemple de demande d’action [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html) suivant configure les points de contrôle lorsque vous créez une application :

```
{
   "ApplicationName": "MyApplication",
   "RuntimeEnvironment":"FLINK-1_19",
   "ServiceExecutionRole":"arn:aws:iam::123456789123:role/myrole",
   "ApplicationConfiguration": { 
      "ApplicationCodeConfiguration":{
      "CodeContent":{
        "S3ContentLocation":{
          "BucketARN":"arn:aws:s3:::amzn-s3-demo-bucket",
          "FileKey":"myflink.jar",
          "ObjectVersion":"AbCdEfGhIjKlMnOpQrStUvWxYz12345"
        }
      },
      "FlinkApplicationConfiguration": { 
         "CheckpointConfiguration": { 
            "CheckpointingEnabled": "true",
            "CheckpointInterval": 20000,
            "ConfigurationType": "CUSTOM",
            "MinPauseBetweenCheckpoints": 10000
         }
      }
}
```

### Désactiver le point de contrôle pour une nouvelle application
<a name="how-fault-examples-create-disable"></a>

L’exemple de demande d’action [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html) suivant désactive les points de contrôle lorsque vous créez une application :

```
{
   "ApplicationName": "MyApplication",
   "RuntimeEnvironment":"FLINK-1_19",
   "ServiceExecutionRole":"arn:aws:iam::123456789123:role/myrole",
   "ApplicationConfiguration": { 
      "ApplicationCodeConfiguration":{
      "CodeContent":{
        "S3ContentLocation":{
          "BucketARN":"arn:aws:s3:::amzn-s3-demo-bucket",
          "FileKey":"myflink.jar",
          "ObjectVersion":"AbCdEfGhIjKlMnOpQrStUvWxYz12345"
        }
      },
      "FlinkApplicationConfiguration": { 
         "CheckpointConfiguration": { 
            "CheckpointingEnabled": "false"
         }
      }
}
```

### Configurer le point de contrôle pour une application existante
<a name="how-fault-examples-update-config"></a>

L’exemple de demande d’action [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html) suivant configure les points de contrôle pour une application existante :

```
{
   "ApplicationName": "MyApplication",
   "ApplicationConfigurationUpdate": { 
      "FlinkApplicationConfigurationUpdate": { 
         "CheckpointConfigurationUpdate": { 
            "CheckpointingEnabledUpdate": true,
            "CheckpointIntervalUpdate": 20000,
            "ConfigurationTypeUpdate": "CUSTOM",
            "MinPauseBetweenCheckpointsUpdate": 10000
         }
      }
   }
}
```

### Désactiver le point de contrôle pour une application existante
<a name="how-fault-examples-update-update-disable"></a>

L’exemple de demande d’action [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html) suivant désactive les points de contrôle pour une application existante :

```
{
   "ApplicationName": "MyApplication",
   "ApplicationConfigurationUpdate": { 
      "FlinkApplicationConfigurationUpdate": { 
         "CheckpointConfigurationUpdate": { 
            "CheckpointingEnabledUpdate": false,
            "CheckpointIntervalUpdate": 20000,
            "ConfigurationTypeUpdate": "CUSTOM",
            "MinPauseBetweenCheckpointsUpdate": 10000
         }
      }
   }
}
```

# Gérez les sauvegardes d'applications à l'aide de snapshots
<a name="how-snapshots"></a>

Un *instantané* est l’implémentation d’un *point de sauvegarde* Apache Flink par le service géré pour Apache Flink. Un instantané est une sauvegarde de l’état de l’application déclenchée, créée et gérée par un utilisateur ou un service. Pour plus d'informations sur les points de sauvegarde d'Apache Flink, consultez la section Points de [sauvegarde de la documentation](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/ops/state/savepoints/) d'Apache Flink. À l'aide des instantanés, vous pouvez redémarrer une application à partir d'un instantané spécifique de l'état de l'application.

**Note**  
Nous recommandons que votre application crée un instantané plusieurs fois par jour pour redémarrer correctement avec des données d’état correctes. La fréquence correcte pour vos instantanés dépend de la logique métier de votre application. La prise de snapshots fréquents vous permet de récupérer des données plus récentes, mais cela augmente les coûts et nécessite davantage de ressources système.

Dans le service géré pour Apache Flink, vous pouvez gérer les instantanés à l’aide des actions API suivantes :
+ [https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplicationSnapshot.html](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplicationSnapshot.html)
+ [https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_DeleteApplicationSnapshot.html](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_DeleteApplicationSnapshot.html)
+ [https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_DescribeApplicationSnapshot.html](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_DescribeApplicationSnapshot.html)
+ [https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ListApplicationSnapshots.html](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ListApplicationSnapshots.html)

Pour connaître la limite du nombre d’instantanés par application, consultez [Service géré pour Apache Flink et quota de blocs-notes Studio](limits.md). Si votre application atteint la limite d’intantanés, la création manuelle d’un instantané échoue avec une `LimitExceededException`. 

Le service géré pour Apache Flink ne supprime jamais les instantanés. Vous devez supprimer les instantanés manuellement à l’aide de l’action [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DeleteApplicationSnapshot.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DeleteApplicationSnapshot.html).

Pour charger un instantané enregistré de l’état de l’application lors du démarrage d’une application, utilisez le paramètre [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ApplicationRestoreConfiguration.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ApplicationRestoreConfiguration.html) de l’action [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StartApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StartApplication.html) ou [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html).

**Topics**
+ [Gérez la création automatique de snapshots](#how-fault-snapshot-update)
+ [Restaurer à partir d'un instantané contenant des données d'état incompatibles](#how-fault-snapshot-restore)
+ [Consultez des exemples d'API de capture instantanée](#how-fault-snapshot-examples)

## Gérez la création automatique de snapshots
<a name="how-fault-snapshot-update"></a>

Si `SnapshotsEnabled` ce paramètre est défini sur [ ApplicationSnapshotConfiguration](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ApplicationSnapshotConfiguration.html)pour l'application, Managed Service for Apache Flink crée et utilise automatiquement des instantanés lorsque l'application est mise à jour, redimensionnée ou arrêtée afin de fournir une sémantique de traitement unique. `true`

**Note**  
La définition de `ApplicationSnapshotConfiguration::SnapshotsEnabled` sur `false` entraînera une perte de données lors des mises à jour de l’application.

**Note**  
Le service géré pour Apache Flink déclenche des points de sauvegarde intermédiaires lors de la création automatique d’instantanés. Pour la version 1.15 ou ultérieure de Flink, les points de sauvegarde intermédiaires ne provoquent plus d’effets secondaires. Voir [Déclenchement de points de sauvegarde.](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#triggering-savepoints)

Les instantanés créés automatiquement présentent les qualités suivantes :
+ L'instantané est géré par le service, mais vous pouvez le voir à l'aide de l'[ ListApplicationSnapshots](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ListApplicationSnapshots.html)action. Les instantanés créés automatiquement sont pris en compte dans votre limite d’instantanés.
+ Si votre application dépasse la limite d’instantanés, les instantanés créés manuellement échoueront, mais le service géré pour Apache Flink créera toujours des instantanés lorsque l’application sera mise à jour, mise à l’échelle ou arrêtée. Vous devez supprimer manuellement les instantanés à l'aide de cette [ DeleteApplicationSnapshot](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_DeleteApplicationSnapshot.html)action avant de créer d'autres instantanés manuellement.

## Restaurer à partir d'un instantané contenant des données d'état incompatibles
<a name="how-fault-snapshot-restore"></a>

Les instantanés contenant des informations sur les opérateurs, la restauration des données d’état à partir d’un instantané d’un opérateur qui a changé depuis la version précédente de l’application peut avoir des résultats inattendus. Une application rencontrera un échec si elle tente de restaurer les données d’état à partir d’un instantané qui ne correspond pas à l’opérateur actuel. De plus, l’application sera bloquée à l’état `STOPPING` ou `UPDATING`. 

Pour autoriser une application à effectuer une restauration à partir d'un instantané contenant des données d'état incompatibles, définissez le `AllowNonRestoredState` paramètre de [FlinkRunConfiguration](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_FlinkRunConfiguration.html)à à à l'`true`aide de l'[UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html)action.

Vous constaterez le comportement suivant lorsqu’une application est restaurée à partir d’un instantané obsolète :
+ **Opérateur ajouté :** si un nouvel opérateur est ajouté, le point de sauvegarde ne contient aucune donnée d’état pour le nouvel opérateur. Aucun défaut ne se produira et il n’est pas nécessaire de définir `AllowNonRestoredState`.
+ **Opérateur supprimé :** si un opérateur existant est supprimé, le point de sauvegarde contient les données d’état de l’opérateur manquant. Une erreur se produira à moins que `AllowNonRestoredState` ne soit défini sur `true`.
+ **Modifié par l’opérateur :** si des modifications compatibles sont apportées, telles que le remplacement du type d’un paramètre par un type compatible, l’application peut effectuer une restauration à partir de l’instantané obsolète. Pour plus d'informations sur la restauration à partir de snapshots, consultez la section [Savepoints](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/ops/state/savepoints/) dans la documentation d'Apache Flink. Une application qui utilise Apache Flink version 1.8 ou ultérieure peut éventuellement être restaurée à partir d’un instantané avec un schéma différent. Une application qui utilise Apache Flink version 1.6 ne peut pas être restaurée. Pour les two-phase-commit récepteurs, nous recommandons d'utiliser un instantané du système (SwS) au lieu d'un instantané créé par l'utilisateur (CreateApplicationSnapshot).

  Pour Flink, le service géré pour Apache Flink déclenche des points de sauvegarde intermédiaires lors de la création automatique d’instantanés. À partir de la version 1.15 de Flink, les points de sauvegarde intermédiaires ne provoquent plus d’effets secondaires. Consultez [Triggering savepoints](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#triggering-savepoints).

Si vous devez reprendre une application incompatible avec les données de point de sauvegarde existantes, nous vous recommandons d'ignorer la restauration à partir de l'instantané en définissant le `ApplicationRestoreType` paramètre de l'[StartApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StartApplication.html)action sur. `SKIP_RESTORE_FROM_SNAPSHOT`

Pour plus d’informations sur la façon dont Apache Flink gère les données d’état incompatibles, consultez [State Schema Evolution](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/fault-tolerance/serialization/schema_evolution/) dans la *documentation Apache Flink*.

## Consultez des exemples d'API de capture instantanée
<a name="how-fault-snapshot-examples"></a>

Cette section inclut des exemples de demandes d’actions d’API pour utiliser des instantanés avec une application. Pour obtenir des informations sur l’utilisation d’un fichier JSON comme entrée pour une action d’API, consultez [Exemple de code de service géré pour l'API Apache Flink](api-examples.md).

### Activer les instantanés pour une application
<a name="how-fault-savepoint-examples-enable"></a>

L’exemple de demande suivant pour l’action [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html) active les instantanés pour une application :

```
{
   "ApplicationName": "MyApplication",
   "CurrentApplicationVersionId": 1,
   "ApplicationConfigurationUpdate": { 
      "ApplicationSnapshotConfigurationUpdate": { 
         "SnapshotsEnabledUpdate": "true"
       }
    }
}
```

### Créer un instantané
<a name="how-fault-savepoint-examples-create"></a>

L’exemple de code de demande suivant pour l’action [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplicationSnapshot.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplicationSnapshot.html) crée un instantané de l’état actuel de l’application :

```
{
   "ApplicationName": "MyApplication",
   "SnapshotName": "MyCustomSnapshot"
}
```

### Répertorier les instantanés d'une application
<a name="how-fault-snapshot-examples-list"></a>

L’exemple de code de demande suivant pour l’action [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ListApplicationSnapshots.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ListApplicationSnapshots.html) répertorie les 50 premiers instantanés de l’état actuel de l’application :

```
{
   "ApplicationName": "MyApplication",
   "Limit": 50
}
```

### Afficher les détails d'un instantané d'application
<a name="how-fault-snapshot-examples-describe"></a>

L’exemple de demande suivant pour l’action [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DescribeApplicationSnapshot.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DescribeApplicationSnapshot.html) répertorie les informations spécifiques à un instantané d’application :

```
{
   "ApplicationName": "MyApplication",
   "SnapshotName": "MyCustomSnapshot"
}
```

### Suppression d’un instantané
<a name="how-fault-snapshot-examples-delete"></a>

L’exemple de demande d’action [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DeleteApplicationSnapshot.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DeleteApplicationSnapshot.html) suivant supprime un instantané précédemment enregistré. Vous pouvez obtenir la valeur `SnapshotCreationTimestamp` en utilisant [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ListApplicationSnapshots.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ListApplicationSnapshots.html) ou [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DeleteApplicationSnapshot.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DeleteApplicationSnapshot.html) :

```
{
   "ApplicationName": "MyApplication",
   "SnapshotName": "MyCustomSnapshot",
   "SnapshotCreationTimestamp": 12345678901.0,
}
```

### Redémarrer une application à l'aide d'un instantané nommé
<a name="how-fault-snapshot-examples-load-custom"></a>

L’exemple de demande d’action [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StartApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StartApplication.html) suivant démarre l’application en utilisant l’état enregistré à partir d’un instantané spécifique :

```
{
   "ApplicationName": "MyApplication",
   "RunConfiguration": { 
      "ApplicationRestoreConfiguration": { 
         "ApplicationRestoreType": "RESTORE_FROM_CUSTOM_SNAPSHOT",
         "SnapshotName": "MyCustomSnapshot"
      }
   }
}
```

### Redémarrer une application à l'aide de l'instantané le plus récent
<a name="how-fault-snapshot-examples-load-recent"></a>

L’exemple de demande d’action [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StartApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StartApplication.html) suivant démarre l’application en utilisant l’instantané le plus récent :

```
{
   "ApplicationName": "MyApplication",
   "RunConfiguration": { 
      "ApplicationRestoreConfiguration": { 
         "ApplicationRestoreType": "RESTORE_FROM_LATEST_SNAPSHOT"
      }
   }
}
```

### Redémarrer une application sans capture instantanée
<a name="how-fault-snapshot-examples-load-none"></a>

L’exemple de demande d’action [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StartApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StartApplication.html) suivant démarre l’application sans charger l’état de l’application, même si un instantané est présent :

```
{
   "ApplicationName": "MyApplication",
   "RunConfiguration": { 
      "ApplicationRestoreConfiguration": { 
         "ApplicationRestoreType": "SKIP_RESTORE_FROM_SNAPSHOT"
      }
   }
}
```

# Utiliser des mises à niveau de version sur place pour Apache Flink
<a name="how-in-place-version-upgrades"></a>

Grâce aux mises à niveau de version sur place pour Apache Flink, vous conservez la traçabilité des applications par rapport à un seul ARN pour toutes les versions d'Apache Flink. Cela inclut les instantanés, les journaux, les métriques, les balises, les configurations Flink, les augmentations des limites de ressources VPCs, etc. 

Vous pouvez effectuer des mises à niveau de version sur place pour Apache Flink afin de mettre à niveau les applications existantes vers une nouvelle version de Flink dans Amazon Managed Service pour Apache Flink. Pour effectuer cette tâche, vous pouvez utiliser le AWS SDK AWS CLI AWS CloudFormation, ou le AWS Management Console.

**Note**  
Vous ne pouvez pas utiliser les mises à niveau de version sur place pour Apache Flink avec Amazon Managed Service pour Apache Flink Studio.

**Topics**
+ [Mettez à niveau les applications à l'aide de mises à niveau de version sur place pour Apache Flink](upgrading-applications.md)
+ [Mettez à niveau votre application vers une nouvelle version d'Apache Flink](upgrading-application-new-version.md)
+ [Annulation des mises à niveau des applications](rollback.md)
+ [Bonnes pratiques générales et recommandations pour les mises à niveau des applications](best-practices-recommendations.md)
+ [Précautions et problèmes connus liés aux mises à niveau des applications](precautions.md)
+ [Mise à niveau vers Flink 2.2 : guide complet](flink-2-2-upgrade-guide.md)
+ [Guide de compatibilité d'état pour les mises à niveau de Flink 2.2](state-compatibility.md)

# Mettez à niveau les applications à l'aide de mises à niveau de version sur place pour Apache Flink
<a name="upgrading-applications"></a>

Avant de commencer, nous vous recommandons de regarder cette vidéo : [Mises à niveau des versions sur place](https://www.youtube.com/watch?v=f1qGGdaP2XI).

Pour effectuer des mises à niveau de version sur place pour Apache Flink, vous pouvez utiliser le AWS CLI AWS SDK ou le. AWS CloudFormation AWS Management Console Vous pouvez utiliser cette fonctionnalité avec toutes les applications existantes que vous utilisez avec le service géré pour Apache Flink à `RUNNING` l'état `READY` or. Il utilise l' UpdateApplication API pour ajouter la possibilité de modifier le runtime de Flink.

## Avant la mise à niveau : mettez à jour votre application Apache Flink
<a name="before-upgrading"></a>

Lorsque vous écrivez vos applications Flink, vous les regroupez avec leurs dépendances dans un fichier JAR d'applications et vous téléchargez le fichier JAR dans votre compartiment Amazon S3. À partir de là, Amazon Managed Service pour Apache Flink exécute la tâche dans le nouveau moteur d'exécution Flink que vous avez sélectionné. Vous devrez peut-être mettre à jour vos applications pour assurer la compatibilité avec le moteur d'exécution Flink vers lequel vous souhaitez effectuer la mise à niveau. Des incohérences entre les versions de Flink peuvent entraîner l'échec de la mise à niveau de la version. Le plus souvent, cela se fera avec des connecteurs pour les sources (entrée) ou les destinations (récepteurs, sorties) et les dépendances Scala. Les versions 1.15 et ultérieures de Managed Service for Apache Flink sont indépendantes de Scala et votre fichier JAR doit contenir la version de Scala que vous prévoyez d'utiliser.

**Pour mettre à jour votre application**

1. Lisez les conseils de la communauté Flink sur la mise à niveau des applications avec State. Consultez la section [Mise à niveau des applications et des versions de Flink](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/upgrading/).

1. Consultez la liste des problèmes et des limites connus. Consultez [Précautions et problèmes connus liés aux mises à niveau des applications](precautions.md).

1. Mettez à jour vos dépendances et testez vos applications localement. Ces dépendances sont généralement les suivantes :

   1. Le runtime et l'API Flink.

   1. Connecteurs recommandés pour le nouveau moteur d'exécution de Flink. Vous pouvez les trouver dans [les versions Release](https://docs.aws.amazon.com/managed-flink/latest/java/release-version-list.html) du moteur d'exécution spécifique vers lequel vous souhaitez effectuer la mise à jour.

   1. Scala — Apache Flink est indépendant de Scala à partir de Flink 1.15 inclus. Vous devez inclure les dépendances Scala que vous souhaitez utiliser dans le JAR de votre application.

1. Créez un nouveau fichier JAR d'application sur un fichier zip et chargez-le sur Amazon S3. Nous vous recommandons d'utiliser un nom différent de celui du fichier JAR/ZIP précédent. Si vous devez revenir en arrière, vous utiliserez ces informations.

1. Si vous exécutez des applications dynamiques, nous vous recommandons vivement de prendre un instantané de votre application actuelle. Cela vous permet de revenir en arrière de manière dynamique si vous rencontrez des problèmes pendant ou après la mise à niveau. 

# Mettez à niveau votre application vers une nouvelle version d'Apache Flink
<a name="upgrading-application-new-version"></a>

Vous pouvez mettre à niveau votre application Flink en utilisant cette [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html)action.

Vous pouvez appeler l'`UpdateApplication`API de différentes manières :
+ Utilisez le flux de travail **de configuration** existant sur le AWS Management Console.
  + Accédez à la page de votre application sur le AWS Management Console.
  + Choisissez **Configurer**.
  + Sélectionnez le nouveau runtime et le snapshot à partir desquels vous souhaitez démarrer, également appelé configuration de restauration. Utilisez le dernier paramètre comme configuration de restauration pour démarrer l'application à partir du dernier instantané. Pointez sur la nouvelle application mise à niveau JAR/zip sur Amazon S3.
+ Utilisez l'action de AWS CLI [mise à jour de l'application](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/kinesisanalyticsv2/update-application.html).
+ Utiliser CloudFormation (CFN).
  + Mettez à jour le [RuntimeEnvironment](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-kinesisanalyticsv2-application.html#cfn-kinesisanalyticsv2-application-runtimeenvironment)champ. Auparavant, vous CloudFormation supprimiez l'application et créez-en une nouvelle, ce qui entraînait la perte de vos instantanés et de l'historique des autres applications. Met désormais CloudFormation à jour votre RuntimeEnvironment place et ne supprime pas votre application. 
+ Utilisez le AWS SDK.
  + Consultez la documentation du SDK pour le langage de programmation de votre choix. Consultez [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html). 

Vous pouvez effectuer la mise à niveau alors que l'application est en `RUNNING` état ou pendant que l'application est arrêtée. `READY` Amazon Managed Service pour Apache Flink effectue une validation afin de vérifier la compatibilité entre la version d'exécution d'origine et la version d'exécution cible. Ce contrôle de compatibilité s'exécute lorsque vous effectuez une mise à niveau [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html)alors que vous êtes dans `RUNNING` l'état ou le suivant [StartApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StartApplication.html)si vous effectuez une mise à niveau alors que vous êtes dans `READY` l'état. 

## Mettre à niveau une application en `RUNNING` état
<a name="upgrading-running"></a>

L'exemple suivant montre la mise à niveau d'une application dans `RUNNING` l'état nommé `UpgradeTest` Flink 1.18 dans l'est des États-Unis (Virginie du Nord) à l'aide de l'application mise à niveau AWS CLI et le démarrage de l'application mise à niveau à partir du dernier instantané. 

```
            
aws --region us-east-1 kinesisanalyticsv2 update-application \
--application-name UpgradeTest --runtime-environment-update "FLINK-1_18" \
--application-configuration-update '{"ApplicationCodeConfigurationUpdate": '\
'{"CodeContentUpdate": {"S3ContentLocationUpdate": '\
'{"FileKeyUpdate": "flink_1_18_app.jar"}}}}' \
 --run-configuration-update '{"ApplicationRestoreConfiguration": '\
 '{"ApplicationRestoreType": "RESTORE_FROM_LATEST_SNAPSHOT"}}' \
 --current-application-version-id ${current_application_version}
```
+ Si vous avez activé les instantanés de service et que vous souhaitez poursuivre l'application à partir du dernier instantané, Amazon Managed Service pour Apache Flink vérifie que le moteur d'exécution de l'`RUNNING`application en cours est compatible avec le moteur d'exécution cible sélectionné.
+ Si vous avez spécifié un instantané à partir duquel poursuivre le runtime cible, Amazon Managed Service pour Apache Flink vérifie que le runtime cible est compatible avec le snapshot spécifié. Si le contrôle de compatibilité échoue, votre demande de mise à jour est rejetée et votre application reste inchangée dans son `RUNNING` état.
+ Si vous choisissez de démarrer votre application sans capture instantanée, Amazon Managed Service pour Apache Flink n'effectue aucun contrôle de compatibilité.
+ Si votre application mise à niveau échoue ou reste bloquée dans un `UPDATING` état transitif, suivez les instructions de la [Annulation des mises à niveau des applications](rollback.md) section pour revenir à l'état sain. 

**Flux de processus pour exécuter des applications d'état**

![\[Le schéma suivant représente le flux de travail recommandé pour mettre à niveau l'application en cours d'exécution. Nous partons du principe que l'application est dynamique et que vous avez activé les instantanés. Pour ce flux de travail, lors de la mise à jour, vous restaurez l'application à partir du dernier instantané automatiquement pris par Amazon Managed Service pour Apache Flink avant la mise à jour.\]](http://docs.aws.amazon.com/fr_fr/managed-flink/latest/java/images/in-place-update-while-running.png)


## Mettre à niveau une application à l'état **PRÊT**
<a name="upgrading-ready"></a>

L'exemple suivant montre la mise à niveau d'une application dans `READY` l'état nommé `UpgradeTest` Flink 1.18 dans l'est des États-Unis (Virginie du Nord) à l'aide du. AWS CLI Aucun instantané n'est spécifié pour démarrer l'application car celle-ci n'est pas en cours d'exécution. Vous pouvez spécifier un instantané lorsque vous émettez la demande de démarrage de l'application.

```
            
aws --region us-east-1 kinesisanalyticsv2 update-application \
--application-name UpgradeTest --runtime-environment-update "FLINK-1_18" \
--application-configuration-update '{"ApplicationCodeConfigurationUpdate": '\
'{"CodeContentUpdate": {"S3ContentLocationUpdate": '\
'{"FileKeyUpdate": "flink_1_18_app.jar"}}}}' \
 --current-application-version-id ${current_application_version}
```
+ Vous pouvez mettre à jour le temps d'exécution de vos applications en fonction `READY` de n'importe quelle version de Flink. Amazon Managed Service pour Apache Flink n'exécute aucune vérification tant que vous n'avez pas démarré votre application.
+  Amazon Managed Service pour Apache Flink effectue des contrôles de compatibilité uniquement par rapport à l'instantané que vous avez sélectionné pour démarrer l'application. Il s'agit de contrôles de compatibilité de base conformes au [tableau de compatibilité de Flink](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/upgrading/#compatibility-table). Ils vérifient uniquement la version de Flink avec laquelle l'instantané a été pris et la version de Flink que vous ciblez. Si le runtime Flink du snapshot sélectionné est incompatible avec le nouveau runtime de l'application, la demande de démarrage peut être rejetée.

**Flux de processus pour les applications prêtes à l'emploi**

![\[Le schéma suivant représente le flux de travail recommandé pour mettre à niveau l'application lorsqu'elle est prête. Nous partons du principe que l'application est dynamique et que vous avez activé les instantanés. Pour ce flux de travail, lors de la mise à jour, vous restaurez l'application à partir du dernier instantané automatiquement pris par Amazon Managed Service pour Apache Flink lorsque l'application a été arrêtée.\]](http://docs.aws.amazon.com/fr_fr/managed-flink/latest/java/images/in-place-update-while-ready.png)


# Annulation des mises à niveau des applications
<a name="rollback"></a>

Si vous rencontrez des problèmes avec votre application ou si vous constatez des incohérences dans le code de votre application entre les versions de Flink, vous pouvez revenir en arrière à l'aide du AWS CLI AWS SDK ou du. AWS CloudFormation AWS Management Console Les exemples suivants montrent à quoi ressemble la rétrogradation dans différents scénarios de défaillance.

## La mise à niveau de l'exécution a réussi, `RUNNING` l'application est en cours, mais la tâche échoue et redémarre continuellement
<a name="succeeded-restarting"></a>

Supposons que vous essayez de mettre à niveau une application dynamique nommée Flink 1.15 `TestApplication` vers Flink 1.18 dans l'est des États-Unis (Virginie du Nord). Cependant, l'application Flink 1.18 mise à niveau ne démarre pas ou redémarre constamment, même si l'application est en cours. `RUNNING` Il s'agit d'un scénario de défaillance courant. Pour éviter de nouveaux temps d'arrêt, nous vous recommandons de rétablir immédiatement la version précédente de votre application (Flink 1.15) et de diagnostiquer le problème ultérieurement.

Pour rétablir la version précédente de l'application, utilisez la AWS CLI commande [rollback-application](https://docs.aws.amazon.com/cli/latest/reference/kinesisanalyticsv2/rollback-application.html) ou l'action [RollbackApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_RollbackApplication.html)API. Cette action d'API annule les modifications que vous avez apportées et qui ont abouti à la dernière version. Il redémarre ensuite votre application en utilisant le dernier instantané réussi. 

Nous vous recommandons vivement de prendre un instantané avec votre application existante avant de tenter de procéder à la mise à niveau. Cela permettra d'éviter la perte de données ou le retraitement des données. 

Dans ce scénario d'échec, l'application ne CloudFormation sera pas annulée à votre place. Vous devez mettre à jour le CloudFormation modèle pour qu'il pointe vers le runtime précédent et vers le code précédent pour forcer la mise CloudFormation à jour de l'application. Dans le cas contraire, CloudFormation suppose que votre application a été mise à jour lorsqu'elle passe à l'`RUNNING`état actuel.

## Annulation d'une application bloquée `UPDATING`
<a name="stuck-updating"></a>

Si votre application reste bloquée à l'`AUTOSCALING`état `UPDATING` ou après une tentative de mise à niveau, Amazon Managed Service pour Apache Flink propose la AWS CLI commande [rollback-applications](https://docs.aws.amazon.com/cli/latest/reference/kinesisanalyticsv2/rollback-application.html), ou l'action [RollbackApplications](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_RollbackApplication.html)API qui permet de rétablir la version de l'application avant le blocage `UPDATING` ou l'état. `AUTOSCALING` Cette API annule les modifications que vous avez apportées qui ont bloqué l'application `UPDATING` ou l'ont bloquée dans un état `AUTOSCALING` transitif.

# Bonnes pratiques générales et recommandations pour les mises à niveau des applications
<a name="best-practices-recommendations"></a>
+ Testez le nouvel état job/runtime sans état dans un environnement hors production avant de tenter une mise à niveau de production.
+ Envisagez de tester d'abord la mise à niveau dynamique avec une application hors production.
+ Assurez-vous que l'état de votre nouveau graphe de tâches est compatible avec l'instantané que vous utiliserez pour démarrer votre application mise à niveau.
  + Assurez-vous que les types enregistrés dans les états de l'opérateur restent les mêmes. Si le type a changé, Apache Flink ne peut pas restaurer l'état de l'opérateur.
  + Assurez-vous que l'opérateur que IDs vous avez défini à l'aide de la `uid` méthode reste le même. Apache Flink recommande vivement d'attribuer des données uniques IDs aux opérateurs. Pour plus d'informations, consultez la section [Affectation d'un opérateur IDs](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#assigning-operator-ids) dans la documentation d'Apache Flink.

    Si vous ne les attribuez pas IDs à vos opérateurs, Flink les génère automatiquement. Dans ce cas, elles peuvent dépendre de la structure du programme et, si elles sont modifiées, elles peuvent entraîner des problèmes de compatibilité. Flink utilise Operator IDs pour faire correspondre l'état de l'instantané à l'opérateur. Le changement IDs d'opérateur empêche le démarrage de l'application ou la suppression de l'état enregistré dans le cliché et le démarrage du nouvel opérateur sans état.
  + Ne modifiez pas la clé utilisée pour enregistrer l'état saisi.
  + Ne modifiez pas le type d'entrée des opérateurs dynamiques tels que window ou join. Cela change implicitement le type de l'état interne de l'opérateur, provoquant une incompatibilité d'état.

# Précautions et problèmes connus liés aux mises à niveau des applications
<a name="precautions"></a>

## Kafka Commit lors du point de contrôle échoue à plusieurs reprises après le redémarrage d'un broker
<a name="apache-kafka-connector"></a>

Il existe un problème connu d'Apache Flink open source avec le connecteur Apache Kafka dans la version 1.15 de Flink, causé par un bogue critique du client Kafka open source dans le client Kafka 2.8.1. Pour plus d'informations, consultez [Kafka Commit lorsque le point de contrôle échoue à plusieurs reprises après le redémarrage d'un broker](https://issues.apache.org/jira/browse/FLINK-28060) et [KafkaConsumer ne parvient pas à rétablir la connexion au coordinateur de groupe après commitOffsetAsync ](https://issues.apache.org/jira/browse/KAFKA-13840) une exception.

Pour éviter ce problème, nous vous recommandons d'utiliser Apache Flink 1.18 ou version ultérieure dans Amazon Managed Service pour Apache Flink.

## Limites connues de la compatibilité des états
<a name="state-precautions"></a>
+ Si vous utilisez l'API Table, Apache Flink ne garantit pas la compatibilité des états entre les versions de Flink. Pour plus d'informations, consultez [Stateful Upgrades and Evolution](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/overview/#stateful-upgrades-and-evolution) dans la documentation d'Apache Flink.
+ Les états de Flink 1.6 ne sont pas compatibles avec Flink 1.18. L'API rejette votre demande si vous essayez de passer de la version 1.6 à la version 1.18 ou ultérieure avec state. Vous pouvez effectuer une mise à niveau vers les versions 1.8, 1.11, 1.13 et 1.15 et prendre un instantané, puis passer à la version 1.18 ou ultérieure. Pour plus d'informations, consultez la section [Mise à niveau des applications et des versions de Flink](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/upgrading/) dans la documentation d'Apache Flink.

## Problèmes connus liés au connecteur Flink Kinesis
<a name="kinesis-connector-precautions"></a>
+ Si vous utilisez Flink 1.11 ou une version antérieure et que vous utilisez le `amazon-kinesis-connector-flink` connecteur pour le support Enhanced-fan-out (EFO), vous devez prendre des mesures supplémentaires pour effectuer une mise à niveau dynamique vers Flink 1.13 ou version ultérieure. Cela est dû à la modification du nom du package du connecteur. Pour de plus amples informations, veuillez consulter [amazon-kinesis-connector-flink](https://github.com/awslabs/amazon-kinesis-connector-flink).

  Le `amazon-kinesis-connector-flink` connecteur pour Flink 1.11 et versions antérieures utilise le package`software.amazon.kinesis`, tandis que le connecteur Kinesis pour Flink 1.13 et versions ultérieures l'utilise. `org.apache.flink.streaming.connectors.kinesis` Utilisez cet outil pour faciliter votre migration : [amazon-kinesis-connector-flink-state-migrator](https://github.com/awslabs/amazon-kinesis-connector-flink-state-migrator).
+ Si vous utilisez Flink 1.13 ou une version antérieure `FlinkKinesisProducer` et que vous effectuez une mise à niveau vers Flink 1.15 ou version ultérieure, pour une mise à niveau dynamique, vous devez continuer à utiliser Flink 1.15 ou version ultérieure, `FlinkKinesisProducer` au lieu de la version plus récente. `KinesisStreamsSink` Toutefois, si vous avez déjà un `uid` ensemble personnalisé sur votre évier, vous devriez pouvoir passer à un kit `KinesisStreamsSink` car il `FlinkKinesisProducer` ne conserve pas l'état. Flink le traitera comme le même opérateur car une personnalisation `uid` est définie.

## Applications Flink écrites en Scala
<a name="scala-precautions"></a>
+ Depuis Flink 1.15, Apache Flink n'inclut pas Scala dans le runtime. Vous devez inclure la version de Scala que vous souhaitez utiliser et les autres dépendances de Scala dans votre code JAR/zip lors de la mise à niveau vers Flink 1.15 ou version ultérieure. Pour plus d'informations, consultez la version [1.15.2 d'Amazon Managed Service pour Apache Flink pour Apache Flink](https://docs.aws.amazon.com/managed-flink/latest/java/flink-1-15-2.html).
+ Si votre application utilise Scala et que vous la mettez à niveau de Flink 1.11 ou version antérieure (Scala 2.11) vers Flink 1.13 (Scala 2.12), assurez-vous que votre code utilise Scala 2.12. Sinon, votre application Flink 1.13 risque de ne pas trouver les classes Scala 2.11 dans le runtime Flink 1.13.

## Points à prendre en compte lors de la rétrogradation de l'application Flink
<a name="downgrading-precautions"></a>
+ La rétrogradation des applications Flink est possible, mais limitée aux cas où l'application était précédemment exécutée avec l'ancienne version de Flink. Pour une mise à niveau dynamique, le service géré pour Apache Flink nécessitera l'utilisation d'un instantané pris avec une version correspondante ou antérieure pour la rétrogradation.
+ Si vous mettez à jour votre environnement d'exécution de Flink 1.13 ou version ultérieure vers Flink 1.11 ou version antérieure, et si votre application utilise le backend d' HashMap état, votre application échouera continuellement.

# Mise à niveau vers Flink 2.2 : guide complet
<a name="flink-2-2-upgrade-guide"></a>

Ce guide fournit des step-by-step instructions pour mettre à niveau votre application Amazon Managed Service for Apache Flink de Flink 1.x vers Flink 2.2. Il s'agit d'une mise à niveau majeure avec des modifications importantes qui nécessitent une planification et des tests minutieux.

**La mise à niveau de la version majeure est unidirectionnelle**  
L'opération de mise à niveau peut déplacer votre application de Flink 1.x vers la version 2.2 avec préservation de l'état, mais vous ne pouvez pas revenir de la version 2.2 à la version 1.x avec l'état 2.2. Si votre application ne fonctionne plus correctement après la mise à niveau, utilisez l'API Rollback pour revenir à la version 1.x avec l'état 1.x d'origine enregistré dans le dernier instantané.

## Conditions préalables
<a name="upgrade-guide-prerequisites"></a>

Avant de commencer votre mise à niveau :
+ Révision [Changements majeurs et dépréciations](flink-2-2.md#flink-2-2-breaking-changes)
+ Révision [Guide de compatibilité d'état pour les mises à niveau de Flink 2.2](state-compatibility.md)
+ Assurez-vous de disposer d'un environnement hors production pour les tests
+ Documentez la configuration et les dépendances actuelles de votre application

## Comprendre vos parcours de migration
<a name="upgrade-guide-migration-paths"></a>

Votre expérience de mise à niveau dépend de la compatibilité de votre application avec Flink 2.2. La compréhension de ces parcours vous aide à vous préparer de manière appropriée et à définir des attentes réalistes.

**Chemin 1 : Binaire compatible et état de l'application**

**À quoi s'attendre :**
+ Invoquez l'opération de mise à niveau
+ Terminez la migration vers la version 2.2 avec la transition du statut de l'application : `RUNNING` → → `UPDATING` `RUNNING`
+ Préservez l'état de toutes les applications sans perte de données ni retraitement
+ Même expérience que les migrations de versions mineures

Idéal pour : les applications apatrides ou les applications utilisant une sérialisation compatible (Avro, schémas Protobuf compatibles, sans collections) POJOs 

**Voie 2 : Incompatibilités binaires**

**À quoi s'attendre :**
+ Invoquez l'opération de mise à niveau
+ L'opération échoue et révèle l'incompatibilité binaire via l'API Operations et les journaux
+ Lorsque la restauration automatique est activée : les applications sont automatiquement annulées en quelques minutes sans votre intervention
+ Lorsque la restauration automatique est désactivée : les applications restent en cours d'exécution sans traitement des données ; vous revenez manuellement à l'ancienne version
+ Une fois le binaire corrigé, utilisez l'[UpdateApplication API](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html) pour une expérience similaire à Path 1

Idéal pour : les applications utilisant des applications supprimées APIs détectées lors du démarrage d'une tâche Flink

**Chemin 3 : État de l'application incompatible**

**À quoi s'attendre :**
+ Invoquez l'opération de mise à niveau
+ La migration semble réussir au début
+ Les applications entrent dans des boucles de redémarrage en quelques secondes lorsque la restauration de l'état échoue
+ Détectez les défaillances grâce à CloudWatch des métriques indiquant des redémarrages continus
+ Invoquer manuellement l'opération Rollback
+ Revenez à la production dans les minutes qui suivent le lancement du rollback
+ Révision [Migration d'État](state-compatibility.md#state-compat-migration) de votre candidature

Idéal pour : les applications présentant des incompatibilités de sérialisation d'états (POJOs avec des collections, certains états sérialisés en Kryo)

**Note**  
Il est vivement recommandé de créer une réplique de votre application de production et de tester chacune des phases suivantes de la mise à niveau sur la réplique avant de suivre les mêmes étapes pour votre application de production.

## Phase 1 : Préparation
<a name="upgrade-guide-phase-1"></a>

**Mettre à jour le code d'application**

Mettez à jour le code de votre application pour qu'il soit compatible avec Flink 2.2 :
+ **Mettez à jour les dépendances de Flink** vers la version 2.2.0 dans votre ou `pom.xml` `build.gradle`
+ **Mettre à jour les dépendances du connecteur** vers des versions compatibles avec Flink 2.2 (voir) [Disponibilité du connecteur](flink-2-2.md#flink-2-2-connectors)
+ **Supprimez l'utilisation obsolète de l'API** :
  + Remplacer DataSet l'API par une DataStream API ou une table API/SQL
  + Remplacez l'ancien`SourceFunction`//`SinkFunction`par la source FLIP-27 et le récepteur FLIP-143 APIs
  + Remplacez l'utilisation de l'API Scala par l'API Java
+ **Mise à jour vers Java 17**

**Téléchargez le code d'application mis à jour**
+ Créez le fichier JAR de votre application avec les dépendances de Flink 2.2
+ Chargez vers Amazon S3 avec un **nom de fichier différent** de celui de votre fichier JAR actuel (par exemple,`my-app-flink-2.2.jar`)
+ Notez le compartiment S3 et la clé à utiliser lors de l'étape de mise à niveau

## Phase 2 : activer la restauration automatique
<a name="upgrade-guide-phase-2"></a>

La restauration automatique permet à Amazon Managed Service pour Apache Flink de revenir automatiquement à la version précédente en cas d'échec de la mise à niveau.

**Vérifier l'état de la restauration automatique**

*AWS Management Console:*

1. Accédez à votre application

1. Choisissez **la configuration**

1. Sous **Paramètres de l'application**, vérifiez que la **restauration du système** est activée

*AWS CLI:*

```
aws kinesisanalyticsv2 describe-application \
    --application-name MyApplication \
    --query 'ApplicationDetail.ApplicationConfigurationDescription.ApplicationSystemRollbackConfigurationDescription.RollbackEnabled'
```

**Activer la restauration automatique (si elle n'est pas activée)**

```
aws kinesisanalyticsv2 update-application \
    --application-name MyApplication \
    --current-application-version-id <version-id> \
    --application-configuration-update '{
        "ApplicationSystemRollbackConfigurationUpdate": {
            "RollbackEnabledUpdate": true
        }
    }'
```

## Phase 3 : Prendre un instantané (facultatif)
<a name="upgrade-guide-phase-3"></a>

Si les instantanés automatiques sont activés pour votre application, vous pouvez ignorer cette étape. Sinon, prenez un instantané de votre application pour enregistrer son état avant de procéder à la mise à niveau.

**Prendre un instantané depuis une application en cours d'exécution**

*AWS Management Console:*

1. Accédez à votre application

1. Choisissez **Snapshots**

1. Choisissez **Créer un instantané**

1. Entrez le nom d'un instantané (par exemple,`pre-flink-2.2-upgrade`)

1. Sélectionnez **Create** (Créer).

*AWS CLI:*

```
aws kinesisanalyticsv2 create-application-snapshot \
    --application-name MyApplication \
    --snapshot-name pre-flink-2.2-upgrade
```

**Vérifier la création de snapshots**

```
aws kinesisanalyticsv2 describe-application-snapshot \
    --application-name MyApplication \
    --snapshot-name pre-flink-2.2-upgrade
```

Attendez jusqu'à ce que ce `SnapshotStatus` soit `READY` le cas avant de continuer.

## Phase 4 : mise à niveau de l'application
<a name="upgrade-guide-phase-4"></a>

Vous pouvez mettre à niveau votre application Flink à l'aide de cette [https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html)action.

Vous pouvez appeler l'`UpdateApplication`API de différentes manières :
+ **Utilisez le AWS Management Console.**
  + Accédez à la page de votre application sur le AWS Management Console.
  + Choisissez **Configurer**.
  + Sélectionnez le nouveau runtime et le snapshot à partir desquels vous souhaitez démarrer, également appelé configuration de restauration. Utilisez le dernier paramètre comme configuration de restauration pour démarrer l'application à partir du dernier instantané. Pointez sur la nouvelle application mise à niveau JAR/zip sur Amazon S3.
+ **Utilisez l' AWS CLI[https://awscli.amazonaws.com/v2/documentation/api/latest/reference/kinesisanalyticsv2/update-application.html](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/kinesisanalyticsv2/update-application.html)**action.
+ **Utiliser CloudFormation.**
  + Mettez à jour le `RuntimeEnvironment` champ. Auparavant, vous CloudFormation supprimiez l'application et créez-en une nouvelle, ce qui entraînait la perte de vos instantanés et de l'historique des autres applications. Met désormais CloudFormation à jour votre `RuntimeEnvironment` place et ne supprime pas votre application.
+ **Utilisez le AWS SDK.**
  + Consultez la documentation du SDK pour le langage de programmation de votre choix. Consultez [https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html).

Vous pouvez effectuer la mise à niveau pendant que l'application est en `RUNNING` état ou pendant que l'application est arrêtée. `READY` Amazon Managed Service pour Apache Flink valide la compatibilité entre la version d'exécution d'origine et la version d'exécution cible. Ce contrôle de compatibilité s'exécute lorsque vous effectuez une mise à niveau `UpdateApplication` alors que vous êtes dans `RUNNING` l'état ou le suivant `StartApplication` si vous effectuez une mise à niveau alors que vous êtes dans `READY` l'état.

**Mise à niveau depuis l'état RUNNING**

```
aws kinesisanalyticsv2 update-application \
    --application-name MyApplication \
    --current-application-version-id <version-id> \
    --runtime-environment-update FLINK-2_2 \
    --application-configuration-update '{
        "ApplicationCodeConfigurationUpdate": {
            "CodeContentUpdate": {
                "S3ContentLocationUpdate": {
                    "FileKeyUpdate": "my-app-flink-2.2.jar"
                }
            }
        }
    }'
```

**Mise à niveau depuis l'état PRÊT**

```
aws kinesisanalyticsv2 update-application \
    --application-name MyApplication \
    --current-application-version-id <version-id> \
    --runtime-environment-update FLINK-2_2 \
    --application-configuration-update '{
        "ApplicationCodeConfigurationUpdate": {
            "CodeContentUpdate": {
                "S3ContentLocationUpdate": {
                    "FileKeyUpdate": "my-app-flink-2.2.jar"
                }
            }
        }
    }'
```

## Phase 5 : mise à niveau du moniteur
<a name="upgrade-guide-phase-5"></a>

**Contrôle de compatibilité**
+ Utilisez l'API Operations pour vérifier l'état de la mise à niveau. En cas d'incompatibilités binaires ou de problèmes lors du démarrage de la tâche, l'opération de mise à niveau échouera avec les journaux.
+ Si l'opération de mise à niveau a réussi mais que l'application est bloquée dans des boucles de redémarrage, cela signifie que l'état est incompatible avec la nouvelle version de Flink ou qu'il y a un problème avec le code mis à jour. Découvrez [Guide de compatibilité d'état pour les mises à niveau de Flink 2.2](state-compatibility.md) comment identifier les problèmes d'incompatibilité entre états.

**Surveiller l'état des applications**

*État de l'application :*
+ Le statut de la candidature devrait changer : `RUNNING` → `UPDATING` → `RUNNING`
+ Vérifiez le temps d'exécution de l'application. S'il s'agit de la version 2.2, l'opération de mise à niveau a réussi.
+ Si votre application est active `RUNNING` mais qu'elle utilise toujours l'ancien environnement d'exécution, le rollback automatique est activé. L'API des opérations affichera l'opération sous la forme`FAILED`. Consultez les journaux pour trouver l'exception en cas d'échec.

En outre, surveillez ces indicateurs dans CloudWatch :

*Métrique de redémarrage :*
+ `numRestarts`: Surveillez les redémarrages inattendus : la mise à niveau est réussie si la valeur `numRestarts` est nulle `uptime` et/ou `runningTime` si elle augmente.

*Mesures relatives aux points de contrôle :*
+ `lastCheckpointDuration`: Doit être similaire aux valeurs d'avant la mise à niveau
+ `numberOfFailedCheckpoints`: Doit rester à 0

## Phase 6 : Valider le comportement de l'application
<a name="upgrade-guide-phase-6"></a>

Après l'exécution de l'application sur Flink 2.2 :

**Validation fonctionnelle**
+ Vérifiez que les données sont lues à partir des sources
+ Vérifiez que les données sont écrites sur les récepteurs
+ Vérifier que la logique métier produit les résultats attendus
+ Comparaison des résultats avec la base de référence avant la mise à niveau

**Validation des performances**
+ Surveiller les mesures de latence (temps end-to-end de traitement)
+ Surveiller les mesures de débit (enregistrements par seconde)
+ Surveiller la durée et la taille du point de contrôle
+ Surveiller l'utilisation de la mémoire et du processeur

**Courez pendant plus de 24 heures**

Permettez à l'application de fonctionner pendant au moins 24 heures en production afin de garantir :
+ Aucune fuite de mémoire
+ Comportement stable aux points de contrôle
+ Pas de redémarrages inattendus
+ Débit constant

## Phase 7 : Procédures de rétrogradation
<a name="upgrade-guide-phase-7"></a>

Si la mise à niveau échoue ou si l'application est en cours d'exécution mais ne fonctionne pas correctement, revenez à la version précédente.

**Annulation automatique**

Si la restauration automatique est activée et que la mise à niveau échoue au démarrage, Amazon Managed Service pour Apache Flink revient automatiquement à la version précédente.

**Annulation manuelle**

Si l'application est en cours d'exécution mais ne fonctionne pas correctement, utilisez l'`RollbackApplication`API :

*AWS Management Console:*

1. Accédez à votre application

1. Choisissez **Actions** **→ Annuler**

1. Confirmez le rollback

*AWS CLI:*

```
aws kinesisanalyticsv2 rollback-application \
    --application-name MyApplication \
    --current-application-version-id <version-id>
```

**Que se passe-t-il lors du rollback :**
+ L'application s'arrête
+ Runtime revient à la version précédente de Flink
+ Le code de l'application revient au JAR précédent
+ L'application redémarre à partir du dernier instantané réussi pris **avant** la mise à niveau

**Important**  
Vous ne pouvez pas restaurer un instantané Flink 2.2 sur Flink 1.x
Le rollback utilise le snapshot pris avant la mise à niveau
Prenez toujours un instantané avant de procéder à la mise à niveau (phase 3)

## Étapes suivantes
<a name="upgrade-guide-next-steps"></a>

Pour toute question ou problème lors de la mise à niveau, consultez le Support [Résoudre les problèmes liés au service géré pour Apache Flink](troubleshooting.md) ou contactez le AWS Support.

# Guide de compatibilité d'état pour les mises à niveau de Flink 2.2
<a name="state-compatibility"></a>

Lors de la mise à niveau de Flink 1.x vers Flink 2.2, des problèmes de compatibilité d'état peuvent empêcher la restauration de votre application à partir de snapshots. Ce guide vous aide à identifier les problèmes de compatibilité potentiels et propose des stratégies de migration.

## Comprendre les modifications de compatibilité des états
<a name="state-compat-understanding"></a>

Amazon Managed Service pour Apache Flink 2.2 introduit plusieurs modifications de sérialisation qui affectent la compatibilité des états. Les principaux sont les suivants :
+ **Mise à niveau de la version Kryo** : Apache Flink 2.2 met à niveau le sérialiseur Kryo fourni de la version 2 à la version 5. Comme Kryo v5 utilise un format de codage binaire différent de celui de Kryo v2, aucun état d'opérateur sérialisé via Kryo dans un point de sauvegarde Flink 1.x ne peut pas être restauré dans Flink 2.2.
+ **Sérialisation des collections Java** : Dans Flink 1.x, les collections Java (telles que `HashMap``ArrayList`, et`HashSet`) POJOs étaient sérialisées à l'aide de Kryo. Flink 2.2 introduit des sérialiseurs optimisés spécifiques à la collection qui sont incompatibles avec l'état sérialisé KRYO de la version 1.x. Les applications utilisant des collections Java avec des sérialiseurs POJO ou Kryo dans la version 1.x ne peuvent pas restaurer cet état dans Flink 2.2. Consultez la [documentation](https://nightlies.apache.org/flink/flink-docs-release-2.2/docs/dev/datastream/fault-tolerance/serialization/types_serialization/) de Flink pour plus de détails sur les types de données et la sérialisation.
+ **Compatibilité du connecteur Kinesis** : la version du connecteur Kinesis Data Streams (KDS) inférieure à 5.0 conserve un état incompatible avec le connecteur Flink 2.2 Kinesis version 6.0. Vous devez migrer vers la version 5.0 ou supérieure du connecteur avant de procéder à la mise à niveau.

## Référence de compatibilité de sérialisation
<a name="state-compat-reference"></a>

Passez en revue toutes les déclarations d'état de votre application et associez les types de sérialisation au tableau ci-dessous. Si un type d'état est incompatible, consultez la [Migration d'État](#state-compat-migration) section avant de procéder à la mise à niveau.


**Référence de compatibilité de sérialisation**  

| Type de sérialisation | Compatible ? | Détails | 
| --- | --- | --- | 
| Avril (SpecificRecord,GenericRecord) | Oui | Utilise son propre format binaire indépendant de Kryo. Assurez-vous d'utiliser les informations de type Avro natives de Flink, et non d'Avro enregistré en tant que sérialiseur Kryo. | 
| Protobuf | Oui | Utilise son propre codage binaire indépendant de Kryo. Vérifiez que les modifications du schéma respectent les règles d'évolution rétrocompatibles. | 
| POJOs sans collections | Oui | Géré par le sérialiseur POJO de Flink, mais uniquement si la classe répond à tous les critères POJO : classe publique, constructeur public no-arg, tous les champs publics ou accessibles via des getters/setters, et tous les types de champs eux-mêmes sérialisables par Flink. Un POJO qui enfreint l'une de ces règles revient silencieusement à Kryo et devient incompatible. | 
| Personnalisé TypeSerializers | Oui | Compatible uniquement si votre sérialiseur ne délègue pas à Kryo en interne. | 
| État des API SQL et Table | Oui (avec réserve) | Utilise les sérialiseurs internes de Flink. Cependant, Apache Flink ne garantit pas la compatibilité des états entre les versions principales des applications de l'API Table. Testez d'abord dans un environnement hors production. | 
| POJOs avec des collections Java (HashMap,ArrayList,HashSet) | Non | Dans Flink 1.x, les collections qu'il contient POJOs étaient sérialisées via Kryo v2. Flink 2.2 introduit des sérialiseurs de collection dédiés dont le format binaire est incompatible avec le format Kryo v2. | 
| Classes de cas Scala | Non | Sérialisé via Kryo dans Flink 1.x. La mise à niveau de Kryo v2 vers v5 modifie le format binaire. | 
| Enregistrements Java | Non | Revenez généralement à la sérialisation Kryo dans Flink 1.x. Vérifiez en testant avecdisableGenericTypes(). | 
| Types de bibliothèques tierces | Non | Les types sans sérialiseur personnalisé enregistré sont renvoyés à Kryo. Le changement de format binaire Kryo v2 à v5 rompt la compatibilité. | 
| Tout type utilisant Kryo fallback | Non | Si Flink ne peut pas gérer un type avec un sérialiseur intégré ou enregistré, il revient à Kryo. Tous les états sérialisés par Kryo à partir de la version 1.x sont incompatibles avec la version 2.2. | 

## Méthodes diagnostiques
<a name="state-compat-diagnostics"></a>

Vous pouvez identifier les problèmes de compatibilité des états de manière proactive en consultant les journaux des applications ou en les inspectant après l'opération de l'[UpdateApplication API](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html).

**Identifiez la solution de secours Kryo dans votre application**

Vous pouvez utiliser le modèle regex suivant dans vos journaux pour identifier la solution de repli Kryo dans votre application :

```
Class class (?<className>[^\s]+) cannot be used as a POJO type
```

Exemple de journal :

```
Class class org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber
cannot be used as a POJO type because not all fields are valid POJO fields,
and must be processed as GenericType. Please read the Flink documentation on
"Data Types & Serialization" for details of the effect on performance and
schema evolution.
```

Si la mise à niveau échoue à l'aide de l' UpdateApplication API, les exceptions suivantes peuvent indiquer que vous rencontrez une incompatibilité d'état liée au sérialiseur :

**IndexOutOfBoundsException**

```
Caused by: java.lang.IndexOutOfBoundsException: Index 116 out of bounds for length 1
    at java.base/jdk.internal.util.Preconditions.outOfBounds(Unknown Source)
    at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Unknown Source)
    at java.base/jdk.internal.util.Preconditions.checkIndex(Unknown Source)
    at java.base/java.util.Objects.checkIndex(Unknown Source)
    at java.base/java.util.ArrayList.get(Unknown Source)
    at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:77)
    at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:923)
    ... 23 more
```

**StateMigrationException (POJOSerializer)**

```
Caused by: org.apache.flink.util.StateMigrationException: The new state serializer
(org.apache.flink.api.java.typeutils.runtime.PojoSerializer@8bf85b5d) must not be
incompatible with the old state serializer
(org.apache.flink.api.java.typeutils.runtime.PojoSerializer@3282ee3).
```

## Liste de contrôle préalable à la mise à niveau
<a name="state-compat-checklist"></a>
+ Passez en revue toutes les déclarations d'État figurant dans votre demande
+ Vérifiez POJOs auprès des collections (`HashMap`,`ArrayList`,`HashSet`)
+ Vérifiez les méthodes de sérialisation pour chaque type d'état
+ Créez une application de réplication de production et testez la compatibilité des états à l'aide de l' UpdateApplication API sur cette réplique
+ Si l'état est incompatible, sélectionnez une stratégie parmi [Migration d'État](#state-compat-migration)
+ Activez la restauration automatique dans la configuration de votre application Flink de production

## Migration d'État
<a name="state-compat-migration"></a>

**Reconstruire l'état complet**

Idéal pour les applications où l'état peut être reconstruit à partir des données sources.

Si votre application peut rétablir son état à partir des données sources :

1. Arrêtez l'application Flink 1.x

1. Passez à Flink 2.x avec un code mis à jour

1. Commencez par `SKIP_RESTORE_FROM_SNAPSHOT`

1. Autoriser l'application à rétablir son état

```
aws kinesisanalyticsv2 start-application \
    --application-name MyApplication \
    --run-configuration '{
        "ApplicationRestoreConfiguration": {
            "ApplicationRestoreType": "SKIP_RESTORE_FROM_SNAPSHOT"
        }
    }'
```

## Bonnes pratiques
<a name="state-compat-best-practices"></a>

1. **Utilisez toujours Avro ou Protobuf pour les états complexes**. Ils permettent l'évolution du schéma et sont indépendants de Kryo

1. **Évitez les collections dans POJOs** — Utilisez le natif de Flink `ListState` et `MapState` à la place

1. **Testez la restauration de l'état localement** : avant la mise à niveau de production, effectuez un test avec des instantanés réels

1. **Prenez fréquemment des instantanés**, en particulier avant les mises à niveau majeures des versions

1. **Activer la restauration automatique** : configurez votre application MSF pour qu'elle soit automatiquement annulée en cas de défaillance

1. **Documentez vos types d'états** — Conservez la documentation de tous les types d'états et de leurs méthodes de sérialisation

1. **Surveillez la taille des points de contrôle : l'augmentation de la taille** des points de contrôle peut indiquer des problèmes de sérialisation

## Étapes suivantes
<a name="state-compat-next-steps"></a>

**Planifiez votre mise à niveau** : voir[Mise à niveau vers Flink 2.2 : guide complet](flink-2-2-upgrade-guide.md).

Pour toute question ou problème lors de la migration, consultez le Support [Résoudre les problèmes liés au service géré pour Apache Flink](troubleshooting.md) ou contactez le AWS Support.

# Implémenter le dimensionnement des applications dans le service géré pour Apache Flink
<a name="how-scaling"></a>

Vous pouvez configurer l’exécution parallèle des tâches et l’allocation de ressources pour le service géré Amazon pour Apache Flink afin d’implémenter la mise à l’échelle. Pour plus d'informations sur la façon dont Apache Flink planifie les instances parallèles de tâches, voir [Parallel Execution dans la documentation d'](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/execution/parallel/)Apache Flink.

**Topics**
+ [Configuration du parallélisme des applications et du KPU ParallelismPer](#how-parallelism)
+ [Allouer des unités de traitement Kinesis](#how-scaling-kpus)
+ [Mettez à jour le parallélisme de votre application](#how-scaling-howto)
+ [Utiliser le dimensionnement automatique dans Managed Service pour Apache Flink](how-scaling-auto.md)
+ [Considérations relatives à maxParallelism](#how-scaling-auto-max-parallelism)

## Configuration du parallélisme des applications et du KPU ParallelismPer
<a name="how-parallelism"></a>

Vous configurez l’exécution parallèle des tâches votre application de service géré pour Apache Flink (telles que la lecture depuis une source ou l’exécution d’un opérateur) à l’aide des propriétés [https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ApplicationConfiguration.html](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ApplicationConfiguration.html) suivantes : 
+ `Parallelism` : utilisez cette propriété pour définir le parallélisme par défaut de l’application Apache Flink. Tous les opérateurs, sources et récepteurs s’exécutent avec ce parallélisme, sauf s’ils sont remplacés dans le code de l’application. La valeur par défaut est `1`, et la valeur maximale est `256`.
+ `ParallelismPerKPU` : utilisez cette propriété pour définir le nombre de tâches parallèles qui peuvent être planifiées par unité de traitement Kinesis (KPU) de votre application. La valeur par défaut est `1`, et la valeur maximale est `8`. Pour les applications comportant des opérations de blocage (par exemple, des E/S), une valeur plus élevée de `ParallelismPerKPU` entraîne une utilisation complète des ressources KPU.

**Note**  
La limite pour `Parallelism` est égale au nombre de `ParallelismPerKPU` fois la limite pour KPUs (64 par défaut). La KPUs limite peut être augmentée en demandant une augmentation de limite. Pour des instructions pour demander une augmentation de cette limite, consultez « Pour demander une augmentation de limite » dans [Service Quotas](https://docs.aws.amazon.com/general/latest/gr/aws_service_limits.html).

Pour plus d'informations sur la définition du parallélisme des tâches pour un opérateur spécifique, voir [Configuration du parallélisme : opérateur dans la](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/execution/parallel/#operator-level) documentation d'Apache Flink.

## Allouer des unités de traitement Kinesis
<a name="how-scaling-kpus"></a>

Le service géré pour Apache Flink fournit la capacité en tant que KPUs. qUne seule unité KPU vous fournit 1 vCPU et 4 Go de mémoire. Pour chaque unité KPU allouée, 50 Go de stockage des applications en cours d’exécution sont également fournis. 

Le service géré pour Apache Flink calcule KPUs les éléments nécessaires pour exécuter votre application à l'aide des `ParallelismPerKPU` propriétés `Parallelism` et, comme suit :

```
Allocated KPUs for the application = Parallelism/ParallelismPerKPU
```

Le service géré pour Apache Flink fournit rapidement des ressources à vos applications en réponse aux pics de débit ou d’activité de traitement. Il supprime progressivement les ressources de votre application une fois le pic d’activité passé. Pour désactiver l’allocation automatique des ressources, définissez la valeur `AutoScalingEnabled` sur `false`, comme décrit plus loin dans [Mettez à jour le parallélisme de votre application](#how-scaling-howto). 

La limite par défaut KPUs pour votre application est de 64. Pour des instructions pour demander une augmentation de cette limite, consultez « Pour demander une augmentation de limite » dans [Service Quotas](https://docs.aws.amazon.com/general/latest/gr/aws_service_limits.html).

**Note**  
Un KPU supplémentaire est facturé à des fins d’orchestration. Pour plus d’informations, consultez [Tarification du service géré pour Apache Flink](https://aws.amazon.com/kinesis/data-analytics/pricing/).

## Mettez à jour le parallélisme de votre application
<a name="how-scaling-howto"></a>

Cette section contient des exemples de demande d’action d’API qui définissent le parallélisme d’une application. Pour plus d’exemples et d’instructions sur l’utilisation de blocs de requête avec des actions d’API, consultez [Exemple de code de service géré pour l'API Apache Flink](api-examples.md).

L’exemple de demande d’action [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html) suivant définit le parallélisme lorsque vous créez une application :

```
{
   "ApplicationName": "string",
   "RuntimeEnvironment":"FLINK-1_18",
   "ServiceExecutionRole":"arn:aws:iam::123456789123:role/myrole",
   "ApplicationConfiguration": { 
      "ApplicationCodeConfiguration":{
      "CodeContent":{
         "S3ContentLocation":{
            "BucketARN":"arn:aws:s3:::amzn-s3-demo-bucket",
            "FileKey":"myflink.jar",
            "ObjectVersion":"AbCdEfGhIjKlMnOpQrStUvWxYz12345"
            }
         },
      "CodeContentType":"ZIPFILE"
   },   
      "FlinkApplicationConfiguration": { 
         "ParallelismConfiguration": { 
            "AutoScalingEnabled": "true",
            "ConfigurationType": "CUSTOM",
            "Parallelism": 4,
            "ParallelismPerKPU": 4
         }
      }
   }
}
```

L’exemple de demande d’action [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html) suivant définit le parallélisme pour une application existante :

```
{
   "ApplicationName": "MyApplication",
   "CurrentApplicationVersionId": 4,
   "ApplicationConfigurationUpdate": { 
      "FlinkApplicationConfigurationUpdate": { 
         "ParallelismConfigurationUpdate": { 
            "AutoScalingEnabledUpdate": "true",
            "ConfigurationTypeUpdate": "CUSTOM",
            "ParallelismPerKPUUpdate": 4,
            "ParallelismUpdate": 4
         }
      }
   }
}
```

L’exemple de demande d’action [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html) suivant désactive le parallélisme pour une application existante :

```
{
   "ApplicationName": "MyApplication",
   "CurrentApplicationVersionId": 4,
   "ApplicationConfigurationUpdate": { 
      "FlinkApplicationConfigurationUpdate": { 
         "ParallelismConfigurationUpdate": { 
            "AutoScalingEnabledUpdate": "false"
         }
      }
   }
}
```

# Utiliser le dimensionnement automatique dans Managed Service pour Apache Flink
<a name="how-scaling-auto"></a>

Le service géré pour Apache Flink adapte de manière élastique le parallélisme de votre application pour s’adapter au débit de données de votre source et à la complexité de votre opérateur dans la plupart des scénarios. La mise à l’échelle automatique est activée par défaut. Le service géré pour Apache Flink surveille l’utilisation des ressources (processeur) de votre application et adapte de manière élastique le parallélisme de votre application à la hausse ou à la baisse en conséquence :
+ Votre application augmente (augmente le parallélisme) si le maximum CloudWatch métrique `containerCPUUtilization` est supérieur à 75 % ou plus pendant 15 minutes. Cela signifie que l'`ScaleUp`action est lancée lorsqu'il existe 15 points de données consécutifs avec une période d'une minute égale ou supérieure à 75 %. Une `ScaleUp` action double la valeur `CurrentParallelism` de votre application. `ParallelismPerKPU`n'est pas modifié. En conséquence, le nombre de personnes allouées double KPUs également. 
+ Votre application réduit (diminue le parallélisme) lorsque l’utilisation de votre processeur reste inférieure à 10 % pendant six heures. Cela signifie que l'`ScaleDown`action est lancée lorsqu'il existe 360 points de données consécutifs avec une période d'une minute inférieure à 10 %. Une `ScaleDown` action réduit de moitié (arrondi vers le haut) le parallélisme de l'application. `ParallelismPerKPU`n'est pas modifié, et le nombre de personnes allouées est KPUs également réduit de moitié (arrondi au chiffre supérieur). 

**Note**  
Une période maximale de `containerCPUUtilization` plus d'une minute peut être référencée pour trouver la corrélation avec un point de données utilisé pour l'action Scaling, mais il n'est pas nécessaire de refléter le moment exact où l'action est initialisée.

Le service géré pour Apache Flink ne réduira pas la valeur `CurrentParallelism` de votre application à un niveau inférieur au paramètre `Parallelism` de votre application.

Lorsque le service géré pour Apache Flink met à l’échelle votre application, elle passe à l’état `AUTOSCALING`. Vous pouvez vérifier l'état actuel de votre candidature à l'aide [ ListApplications](https://docs.aws.amazon.com//managed-flink/latest/apiv2/API_ListApplications.html)des actions [ DescribeApplication](https://docs.aws.amazon.com//managed-flink/latest/apiv2/API_DescribeApplication.html)ou. Pendant que le service fait évoluer votre application, la seule action d'API valide que vous pouvez utiliser est celle [ StopApplication](https://docs.aws.amazon.com//managed-flink/latest/apiv2/API_ListApplications.html)dont le `Force` paramètre est défini sur`true`.

Vous pouvez utiliser la propriété `AutoScalingEnabled` (partie de [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_FlinkApplicationConfiguration.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_FlinkApplicationConfiguration.html)) pour activer ou désactiver le comportement de l’autoscaling. Votre AWS compte est débité pour KPUs les prestations du service géré pour Apache Flink, qui dépendent de votre application `parallelism` et de vos `parallelismPerKPU` paramètres. Un pic d’activité augmente les coûts de votre service géré pour Apache Flink.

Pour obtenir des informations sur la tarification, consultez [Tarification du service géré pour Apache Flink](https://aws.amazon.com/kinesis/data-analytics/pricing/). 

Notez les informations suivantes à propos de la mise à l’échelle de l’application :
+ La mise à l’échelle automatique est activée par défaut.
+ La mise à l’échelle ne s’applique pas aux blocs-notes Studio. Toutefois, si vous déployez un bloc-notes Studio en tant qu’application à état durable, la mise à l’échelle s’appliquera à l’application déployée.
+ La limite par défaut de votre application est de 64 KPUs. Pour de plus amples informations, veuillez consulter [Service géré pour Apache Flink et quota de blocs-notes Studio](limits.md).
+ Lorsque la mise à l’échelle automatique met à jour le parallélisme des applications, celles-ci subissent des interruptions. Pour éviter ces interruptions, procédez comme suit :
  + Désactiver la mise à l’échelle automatique
  + Configurez `parallelism` et utilisez l'[UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html)action `parallelismPerKPU` de votre application. Pour plus d'informations sur la définition des paramètres de parallélisme de votre application, consultez. [Mettez à jour le parallélisme de votre application](how-scaling.md#how-scaling-howto)
  + Surveillez régulièrement l’utilisation des ressources de votre application pour vérifier qu’elle dispose des paramètres de parallélisme adaptés à sa charge de travail. Pour obtenir des informations sur la surveillance de l’allocation des ressources, consultez [Métriques et dimensions dans le service géré pour Apache Flink](metrics-dimensions.md).

## Implémenter un autoscaling personnalisé
<a name="how-scaling-custom-autoscaling"></a>

Si vous souhaitez un contrôle plus précis de la mise à l'échelle automatique ou si vous souhaitez utiliser d'autres mesures de déclenchement`containerCPUUtilization`, vous pouvez utiliser cet exemple : 
+ [AutoScaling](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/infrastructure/AutoScaling)

  Cet exemple montre comment faire évoluer votre service géré pour l'application Apache Flink à l'aide d'une CloudWatch métrique différente de celle de l'application Apache Flink, y compris les métriques d'Amazon MSK et d'Amazon Kinesis Data Streams, utilisées comme sources ou récepteurs.

Pour plus d'informations, voir [Surveillance améliorée et dimensionnement automatique pour Apache Flink](https://aws.amazon.com/blogs/big-data/enhanced-monitoring-and-automatic-scaling-for-apache-flink/).

## Implémenter l'autoscaling planifié
<a name="how-scaling-scheduled-autoscaling"></a>

Si votre charge de travail suit un profil prévisible au fil du temps, vous préférerez peut-être dimensionner votre application Apache Flink de manière préventive. Cela permet de faire évoluer votre application à une heure planifiée, par opposition à une mise à l'échelle réactive basée sur une métrique. Pour configurer le dimensionnement à la hausse ou à la baisse à des heures fixes de la journée, vous pouvez utiliser cet exemple :
+ [ScheduledScaling](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/infrastructure/ScheduledScaling)

## Considérations relatives à maxParallelism
<a name="how-scaling-auto-max-parallelism"></a>

Le parallélisme maximal qu'une tâche Flink peut ajuster est limité *au minimum* pour `maxParallelism` tous les opérateurs de la tâche. Par exemple, si vous avez une tâche simple avec uniquement une source et un récepteur, et que la source a 16 et le récepteur 8, l'application ne peut pas évoluer au-delà du parallélisme de 8. `maxParallelism`

Pour savoir comment la valeur par défaut `maxParallelism` d'un opérateur est calculée et comment la remplacer, reportez-vous à la section [Définition du parallélisme maximal dans la](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/execution/parallel/#setting-the-maximum-parallelism) documentation d'Apache Flink.

En règle générale, sachez que si vous ne définissez `maxParallelism` aucun opérateur et que vous démarrez votre application avec un parallélisme inférieur ou égal à 128, tous les opérateurs auront un `maxParallelism` de 128.

**Note**  
Le parallélisme maximal de la tâche est la limite supérieure du parallélisme pour faire évoluer votre application en conservant son état.   
Si vous modifiez `maxParallelism` une application existante, celle-ci ne pourra pas redémarrer à partir d'une capture d'écran précédente prise avec l'ancienne`maxParallelism`. Vous ne pouvez redémarrer l'application que sans capture d'écran.   
Si vous envisagez de dimensionner votre application à un parallélisme supérieur à 128, vous devez le définir explicitement `maxParallelism` dans votre application.
+ La logique de mise à l'échelle automatique empêchera le dimensionnement d'une tâche Flink jusqu'à un parallélisme supérieur au parallélisme maximal de la tâche.
+ Si vous utilisez un dimensionnement automatique personnalisé ou un dimensionnement planifié, configurez-les de manière à ce qu'ils ne dépassent pas le parallélisme maximal de la tâche.
+ Si vous redimensionnez manuellement votre application au-delà du parallélisme maximal, l'application ne démarre pas.

# Ajouter des balises au service géré pour les applications Apache Flink
<a name="how-tagging"></a>



Cette section décrit comment ajouter des balises de métadonnées clé-valeur à des applications de service géré pour Apache Flink. Ces balises peuvent être utilisées aux fins suivantes :
+ Déterminer la facturation des applications individuelles de service géré pour Apache Flink. Pour plus d’informations, consultez [Utilisation des balises de répartition des coûts](https://docs.aws.amazon.com/awsaccountbilling/latest/aboutv2/cost-alloc-tags.html) dans le *Guide Facturation et gestion des coûts*.
+ Contrôle de l'accès aux ressources d’application basé sur des balises. Pour plus d’informations, consultez [Contrôle de l’accès à l’aide de balises](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_tags.html) dans le *Gestion des identités et des accès AWS Guide de l’utilisateur*.
+ À des fins définies par l'utilisateur. Vous pouvez définir des fonctionnalités d'application en fonction de la présence de balises utilisateur.

Notez les informations suivantes concernant le balisage :
+ Le nombre maximal de balises d'application inclut les balises système. Le nombre maximal de balises d'application définies par l'utilisateur est de 50.
+ Si une action inclut une liste de balises qui comporte des valeurs `Key` en double, le service émet une `InvalidArgumentException`.

**Topics**
+ [Ajouter des balises lors de la création d'une application](how-tagging-create.md)
+ [Ajouter ou mettre à jour des balises pour une application existante](how-tagging-add.md)
+ [Répertorier les balises d'une application](how-tagging-list.md)
+ [Supprimer les tags d'une application](how-tagging-remove.md)

# Ajouter des balises lors de la création d'une application
<a name="how-tagging-create"></a>

Vous ajoutez des balises lors de la création d'une application en utilisant le `tags` paramètre de l'[CreateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplication.html)action.

L'exemple de demande suivant illustre le nœud `Tags` pour une demande `CreateApplication` :

```
"Tags": [ 
    { 
        "Key": "Key1",
        "Value": "Value1"
    },
    { 
        "Key": "Key2",
        "Value": "Value2"
    }
]
```

# Ajouter ou mettre à jour des balises pour une application existante
<a name="how-tagging-add"></a>

Vous pouvez ajouter des balises à une application à l'aide de l'[TagResource](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_TagResource.html)action. Vous ne pouvez pas ajouter de balises à une application à l'aide de [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html)cette action.

Pour mettre à jour une balise existante, ajoutez une balise avec la même clé que la balise existante.

L'exemple de demande suivant pour l'action `TagResource` ajoute de nouvelles balises ou met à jour des balises existantes :

```
{
   "ResourceARN": "string",
   "Tags": [ 
      { 
         "Key": "NewTagKey",
         "Value": "NewTagValue"
      },
      { 
         "Key": "ExistingKeyOfTagToUpdate",
         "Value": "NewValueForExistingTag"
      }
   ]
}
```

# Répertorier les balises d'une application
<a name="how-tagging-list"></a>

Pour répertorier les balises existantes, vous utilisez l'[ListTagsForResource](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ListTagsForResource.html)action.

L'exemple de demande suivant pour l'action `ListTagsForResource` répertorie les balises pour une application :

```
{
   "ResourceARN": "arn:aws:kinesisanalyticsus-west-2:012345678901:application/MyApplication"
}
```

# Supprimer les tags d'une application
<a name="how-tagging-remove"></a>

Pour supprimer des balises d'une application, vous devez utiliser l'[UntagResource](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UntagResource.html)action.

L'exemple de demande suivant pour l'action `UntagResource` supprime des balises d'une application :

```
{
   "ResourceARN": "arn:aws:kinesisanalyticsus-west-2:012345678901:application/MyApplication",
   "TagKeys": [ "KeyOfFirstTagToRemove", "KeyOfSecondTagToRemove" ]
}
```

# Utilisation CloudFormation avec le service géré pour Apache Flink
<a name="lambda-cfn-flink"></a>

L'exercice suivant montre comment démarrer une application Flink créée à l' CloudFormation aide d'une fonction Lambda dans la même pile. 

## Avant de commencer
<a name="before-you-begin"></a>

Avant de commencer cet exercice, suivez les étapes de création d'une application Flink à l'aide de CloudFormation at [AWS::KinesisAnalytics::Application](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-kinesis-analyticsapplication.html).

## Écrire une fonction Lambda
<a name="write-lambda-function"></a>

[Pour démarrer une application Flink après sa création ou sa mise à jour, nous utilisons l’API kinesisanalyticsv2 start-application](https://docs.aws.amazon.com/cli/latest/reference/kinesisanalyticsv2/start-application.html). L'appel sera déclenché par un CloudFormation événement après la création de l'application Flink. Nous verrons comment configurer la pile pour déclencher la fonction Lambda plus loin dans cet exercice, mais nous allons d’abord nous concentrer sur l’instruction de la fonction Lambda et son code. Nous utilisons l’exécution `Python3.8` dans cet exemple. 

```
StartApplicationLambda:
    Type: AWS::Lambda::Function
    DependsOn: StartApplicationLambdaRole
    Properties:
      Description: Starts an application when invoked.
      Runtime: python3.8
      Role: !GetAtt StartApplicationLambdaRole.Arn
      Handler: index.lambda_handler
      Timeout: 30
      Code:
        ZipFile: |
          import logging
          import cfnresponse
          import boto3
          
          logger = logging.getLogger()
          logger.setLevel(logging.INFO)
          
          def lambda_handler(event, context):
            logger.info('Incoming CFN event {}'.format(event))
            
            try:
              application_name = event['ResourceProperties']['ApplicationName']
              
              # filter out events other than Create or Update,
              # you can also omit Update in order to start an application on Create only.
              if event['RequestType'] not in ["Create", "Update"]:
                logger.info('No-op for Application {} because CFN RequestType {} is filtered'.format(application_name, event['RequestType'])) 
                cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
                
                return
              
              # use kinesisanalyticsv2 API to start an application.
              client_kda = boto3.client('kinesisanalyticsv2', region_name=event['ResourceProperties']['Region'])
              
              # get application status.
              describe_response = client_kda.describe_application(ApplicationName=application_name)
              application_status = describe_response['ApplicationDetail']['ApplicationStatus']
              
              # an application can be started from 'READY' status only.
              if application_status != 'READY':
                logger.info('No-op for Application {} because ApplicationStatus {} is filtered'.format(application_name, application_status)) 
                cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
                
                return
              
              # create RunConfiguration. 
              run_configuration = { 
                'ApplicationRestoreConfiguration': {
                  'ApplicationRestoreType': 'RESTORE_FROM_LATEST_SNAPSHOT',
                }
              }
                            
              logger.info('RunConfiguration for Application {}: {}'.format(application_name, run_configuration)) 
              
              # this call doesn't wait for an application to transfer to 'RUNNING' state.
              client_kda.start_application(ApplicationName=application_name, RunConfiguration=run_configuration)
              
              logger.info('Started Application: {}'.format(application_name)) 
              cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
            except Exception as err:
              logger.error(err)
              cfnresponse.send(event,context, cfnresponse.FAILED, {"Data": str(err)})
```

Dans le code précédent, Lambda traite les CloudFormation événements entrants, filtre tout le reste`Update`, obtient l'état de l'application et la démarre si c'est le cas. `Create` `READY` Pour obtenir l'état de l'application, vous devez créer le rôle Lambda, comme indiqué ci-dessous.

## Création d'un rôle Lambda
<a name="create-lambda-role"></a>

Vous créez un rôle pour que Lambda puisse « communiquer » avec l’application et écrire dans les journaux. Ce rôle utilise des politiques gérées par défaut, mais vous souhaiterez peut-être le limiter à l'utilisation de politiques personnalisées.

```
StartApplicationLambdaRole:
    Type: AWS::IAM::Role
    DependsOn: TestFlinkApplication
    Properties:
      Description: A role for lambda to use while interacting with an application.
      AssumeRolePolicyDocument:
        Version: '2012-10-17		 	 	 '
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/Amazonmanaged-flinkFullAccess
        - arn:aws:iam::aws:policy/CloudWatchLogsFullAccess
      Path: /
```

Notez que les ressources Lambda seront créées après la création de l’application Flink dans la même pile, car elles en dépendent.

## Appeler la fonction Lambda
<a name="invoking-lambda-function"></a>

Il ne reste plus qu’à invoquer la fonction Lambda. Pour ce faire, utilisez une [ressource personnalisée](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-cfn-customresource.html).

```
StartApplicationLambdaInvoke:
    Description: Invokes StartApplicationLambda to start an application.
    Type: AWS::CloudFormation::CustomResource
    DependsOn: StartApplicationLambda
    Version: "1.0"
    Properties:
      ServiceToken: !GetAtt StartApplicationLambda.Arn
      Region: !Ref AWS::Region
      ApplicationName: !Ref TestFlinkApplication
```

C’est tout ce dont vous avez besoin pour démarrer votre application Flink avec Lambda. Vous êtes maintenant prêt à créer votre propre pile ou à utiliser l’exemple complet ci-dessous pour voir comment toutes ces étapes fonctionnent dans la pratique.

## Passez en revue un exemple détaillé
<a name="lambda-cfn-flink-full-example"></a>

L'exemple suivant est une version légèrement étendue des étapes précédentes avec un `RunConfiguration` ajustement supplémentaire effectué via [les paramètres du modèle](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/parameters-section-structure.html). Il s’agit d’une pile fonctionnelle que vous pouvez essayer. Assurez-vous de lire les notes d’accompagnement : 

stack.yaml

```
Description: 'kinesisanalyticsv2 CloudFormation Test Application'
Parameters:
  ApplicationRestoreType:
    Description: ApplicationRestoreConfiguration option, can be SKIP_RESTORE_FROM_SNAPSHOT, RESTORE_FROM_LATEST_SNAPSHOT or RESTORE_FROM_CUSTOM_SNAPSHOT.
    Type: String
    Default: SKIP_RESTORE_FROM_SNAPSHOT
    AllowedValues: [ SKIP_RESTORE_FROM_SNAPSHOT, RESTORE_FROM_LATEST_SNAPSHOT, RESTORE_FROM_CUSTOM_SNAPSHOT ]
  SnapshotName:
    Description: ApplicationRestoreConfiguration option, name of a snapshot to restore to, used with RESTORE_FROM_CUSTOM_SNAPSHOT ApplicationRestoreType.
    Type: String
    Default: ''
  AllowNonRestoredState:
    Description: FlinkRunConfiguration option, can be true or false.
    Default: true
    Type: String
    AllowedValues: [ true, false ]
  CodeContentBucketArn:
    Description: ARN of a bucket with application code.
    Type: String
  CodeContentFileKey:
    Description: A jar filename with an application code inside a bucket.
    Type: String
Conditions:
  IsSnapshotNameEmpty: !Equals [ !Ref SnapshotName, '' ]
Resources:
  TestServiceExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17		 	 	 '
        Statement:
          - Effect: Allow
            Principal:
              Service: 
                - kinesisanlaytics.amazonaws.com
            Action: sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/AmazonKinesisFullAccess
        - arn:aws:iam::aws:policy/AmazonS3FullAccess
      Path: /
  InputKinesisStream:
    Type: AWS::Kinesis::Stream
    Properties:
      ShardCount: 1
  OutputKinesisStream:
    Type: AWS::Kinesis::Stream
    Properties:
      ShardCount: 1
  TestFlinkApplication:
    Type: 'AWS::kinesisanalyticsv2::Application'
    Properties:
      ApplicationName: 'CFNTestFlinkApplication'
      ApplicationDescription: 'Test Flink Application'
      RuntimeEnvironment: 'FLINK-1_18'
      ServiceExecutionRole: !GetAtt TestServiceExecutionRole.Arn
      ApplicationConfiguration:
        EnvironmentProperties:
          PropertyGroups:
            - PropertyGroupId: 'KinesisStreams'
              PropertyMap:
                INPUT_STREAM_NAME: !Ref InputKinesisStream
                OUTPUT_STREAM_NAME: !Ref OutputKinesisStream
                AWS_REGION: !Ref AWS::Region
        FlinkApplicationConfiguration:
          CheckpointConfiguration:
            ConfigurationType: 'CUSTOM'
            CheckpointingEnabled: True
            CheckpointInterval: 1500
            MinPauseBetweenCheckpoints: 500
          MonitoringConfiguration:
            ConfigurationType: 'CUSTOM'
            MetricsLevel: 'APPLICATION'
            LogLevel: 'INFO'
          ParallelismConfiguration:
            ConfigurationType: 'CUSTOM'
            Parallelism: 1
            ParallelismPerKPU: 1
            AutoScalingEnabled: True
        ApplicationSnapshotConfiguration:
          SnapshotsEnabled: True
        ApplicationCodeConfiguration:
          CodeContent:
            S3ContentLocation:
              BucketARN: !Ref CodeContentBucketArn
              FileKey: !Ref CodeContentFileKey
          CodeContentType: 'ZIPFILE'     
  StartApplicationLambdaRole:
    Type: AWS::IAM::Role
    DependsOn: TestFlinkApplication
    Properties:
      Description: A role for lambda to use while interacting with an application.
      AssumeRolePolicyDocument:
        Version: '2012-10-17		 	 	 '
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/Amazonmanaged-flinkFullAccess
        - arn:aws:iam::aws:policy/CloudWatchLogsFullAccess
      Path: /
  StartApplicationLambda:
    Type: AWS::Lambda::Function
    DependsOn: StartApplicationLambdaRole
    Properties:
      Description: Starts an application when invoked.
      Runtime: python3.8
      Role: !GetAtt StartApplicationLambdaRole.Arn
      Handler: index.lambda_handler
      Timeout: 30
      Code:
        ZipFile: |
          import logging
          import cfnresponse
          import boto3
          
          logger = logging.getLogger()
          logger.setLevel(logging.INFO)
          
          def lambda_handler(event, context):
            logger.info('Incoming CFN event {}'.format(event))
            
            try:
              application_name = event['ResourceProperties']['ApplicationName']
              
              # filter out events other than Create or Update,
              # you can also omit Update in order to start an application on Create only.
              if event['RequestType'] not in ["Create", "Update"]:
                logger.info('No-op for Application {} because CFN RequestType {} is filtered'.format(application_name, event['RequestType'])) 
                cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
                
                return
              
              # use kinesisanalyticsv2 API to start an application.
              client_kda = boto3.client('kinesisanalyticsv2', region_name=event['ResourceProperties']['Region'])
              
              # get application status.
              describe_response = client_kda.describe_application(ApplicationName=application_name)
              application_status = describe_response['ApplicationDetail']['ApplicationStatus']
              
              # an application can be started from 'READY' status only.
              if application_status != 'READY':
                logger.info('No-op for Application {} because ApplicationStatus {} is filtered'.format(application_name, application_status)) 
                cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
                
                return
              
              # create RunConfiguration from passed parameters. 
              run_configuration = { 
                'FlinkRunConfiguration': {
                  'AllowNonRestoredState': event['ResourceProperties']['AllowNonRestoredState'] == 'true'
                },
                'ApplicationRestoreConfiguration': {
                  'ApplicationRestoreType': event['ResourceProperties']['ApplicationRestoreType'],
                }
              }
              
              # add SnapshotName to RunConfiguration if specified.
              if event['ResourceProperties']['SnapshotName'] != '':
                run_configuration['ApplicationRestoreConfiguration']['SnapshotName'] = event['ResourceProperties']['SnapshotName']
              
              logger.info('RunConfiguration for Application {}: {}'.format(application_name, run_configuration)) 
              
              # this call doesn't wait for an application to transfer to 'RUNNING' state.
              client_kda.start_application(ApplicationName=application_name, RunConfiguration=run_configuration)
              
              logger.info('Started Application: {}'.format(application_name)) 
              cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
            except Exception as err:
              logger.error(err)
              cfnresponse.send(event,context, cfnresponse.FAILED, {"Data": str(err)})
  StartApplicationLambdaInvoke:
    Description: Invokes StartApplicationLambda to start an application.
    Type: AWS::CloudFormation::CustomResource
    DependsOn: StartApplicationLambda
    Version: "1.0"
    Properties:
      ServiceToken: !GetAtt StartApplicationLambda.Arn
      Region: !Ref AWS::Region
      ApplicationName: !Ref TestFlinkApplication
      ApplicationRestoreType: !Ref ApplicationRestoreType
      SnapshotName: !Ref SnapshotName
      AllowNonRestoredState: !Ref AllowNonRestoredState
```

Encore une fois, vous souhaiterez peut-être ajuster les rôles pour Lambda ainsi que pour une application elle-même.

Avant de créer la pile ci-dessus, n’oubliez pas de spécifier vos paramètres.

paramètres.json

```
[
  {
    "ParameterKey": "CodeContentBucketArn",
    "ParameterValue": "YOUR_BUCKET_ARN"
  },
  {
    "ParameterKey": "CodeContentFileKey",
    "ParameterValue": "YOUR_JAR"
  },
  {
    "ParameterKey": "ApplicationRestoreType",
    "ParameterValue": "SKIP_RESTORE_FROM_SNAPSHOT"
  },
  {
    "ParameterKey": "AllowNonRestoredState",
    "ParameterValue": "true"
  }
]
```

Remplacez `YOUR_BUCKET_ARN` et `YOUR_JAR` selon vos besoins spécifiques. Vous pouvez suivre ce [guide](https://docs.aws.amazon.com/managed-flink/latest/java/get-started-exercise.html) pour créer un compartiment Amazon S3 et un fichier JAR d’application.

Créez maintenant la pile (remplacez YOUR\$1REGION par une région de votre choix, par exemple us-east-1) :

```
aws cloudformation create-stack --region YOUR_REGION --template-body "file://stack.yaml" --parameters "file://parameters.json" --stack-name "TestManaged Service for Apache FlinkStack" --capabilities CAPABILITY_NAMED_IAM
```

Vous pouvez maintenant accéder à [https://console.aws.amazon.com/cloudformation](https://console.aws.amazon.com/cloudformation) et voir la progression. Une fois votre application Flink créée, vous devriez la voir à l’état `Starting`. Cela peut prendre quelques minutes avant qu’elle ne soit `Running`. 

Pour plus d’informations, consultez les ressources suivantes :
+ [Quatre méthodes pour récupérer n'importe quelle propriété de AWS service en utilisant AWS CloudFormation (partie 1 de 3)](https://aws.amazon.com/blogs/mt/four-ways-to-retrieve-any-aws-service-property-using-aws-cloudformation-part-1/).
+ [Procédure pas à pas : recherche d'une image IDs Amazon Machine](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/walkthrough-custom-resources-lambda-lookup-amiids.html).

# Utiliser le tableau de bord Apache Flink avec le service géré pour Apache Flink
<a name="how-dashboard"></a>

Vous pouvez utiliser le tableau de bord Apache Flink de votre application pour surveiller l’état de santé de votre application de service géré pour Apache Flink. Le tableau de bord de votre application affiche les informations suivantes :
+ Ressources utilisées, y compris les gestionnaires de tâches et les emplacements de tâches. 
+ Informations sur les tâches, y compris celles en cours d’exécution, terminées, annulées ou ayant échoué. 

Pour obtenir des informations sur les gestionnaires de tâches, les emplacements de tâches et les tâches d’Apache Flink, consultez [Apache Flink Architecture](https://flink.apache.org/what-is-flink/flink-architecture/) sur le site Web d’Apache Flink. 

Notez ce qui suit à propos de l’utilisation du tableau de bord Apache Flink avec le service géré pour Apache Flink :
+ Le tableau de bord Apache Flink pour les applications de service géré pour Apache Flink est en lecture seule. Vous ne pouvez pas modifier votre application de service géré pour Apache Flink à l’aide du tableau de bord Apache Flink.
+ Le tableau de bord Apache Flink n’est pas compatible avec Microsoft Internet Explorer.

## Accédez au tableau de bord Apache Flink de votre application
<a name="how-dashboard-accessing"></a>

Vous pouvez accéder au tableau de bord Apache Flink de votre application via la console du service géré pour Apache Flink ou en demandant un point de terminaison URL sécurisé à l’aide de l’interface CLI.

### Accédez au tableau de bord Apache Flink de votre application à l'aide du service géré pour la console Apache Flink
<a name="how-dashboard-accessing-console"></a>

Pour accéder au tableau de bord de votre application Apache Flink depuis la console, choisissez **Tableau de bord Apache Flink** sur la page de votre application.

**Note**  
Lorsque vous ouvrez le tableau de bord depuis la console du service géré pour Apache Flink, l’URL générée par la console sera valide pendant 12 heures.

### Accédez au tableau de bord Apache Flink de votre application à l'aide du service géré pour Apache Flink CLI
<a name="how-dashboard-accessing-cli"></a>

Vous pouvez utiliser l’interface CLI du service géré pour Apache Flink pour générer une URL permettant d’accéder au tableau de bord de votre application. L’URL que vous générez est valide pour une durée déterminée.

**Note**  
Si vous n’accédez pas à l’URL générée dans les trois minutes, elle ne sera plus valide.

Vous générez l'URL de votre tableau de bord à l'aide de l'[ CreateApplicationPresignedUrl](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplicationPresignedUrl.html)action. Vous pouvez spécifier les paramètres suivants pour l’action : 
+ Le nom de l’application
+ Durée en secondes pendant laquelle l’URL sera valide
+ Vous spécifiez `FLINK_DASHBOARD_URL` comme type d’URL.