

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

# Configuration de Flink dans Amazon EMR
Configuration de Flink

## Configuration de Flink avec Hive Metastore et Glue Catalog
Hive et Glue

Les versions 6.9.0 et supérieures d'Amazon EMR prennent en charge à la fois Hive Metastore et AWS Glue Catalog avec le connecteur Apache Flink vers Hive. Cette section décrit les étapes nécessaires pour configurer [AWS Glue Catalog](#flink-configure-hive-glue) et [Hive Metastore](#flink-configure-hive-metastore) avec Flink.

**Topics**
+ [

### Utilisation de Hive Metastore
](#flink-configure-hive-metastore)
+ [

### Utiliser le catalogue AWS de données Glue
](#flink-configure-hive-glue)

### Utilisation de Hive Metastore
Hive Metastore

1. Créez un cluster EMR avec la version 6.9.0 ou supérieure et au moins deux applications : **Hive** et **Flink**. 

1. Utilisez [script runner](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-commandrunner.html) pour exécuter le script suivant en tant que fonction d'étape :

   `hive-metastore-setup.sh`

   ```
   sudo cp /usr/lib/hive/lib/antlr-runtime-3.5.2.jar /usr/lib/flink/lib 
   sudo cp /usr/lib/hive/lib/hive-exec-3.1.3*.jar /lib/flink/lib 
   sudo cp /usr/lib/hive/lib/libfb303-0.9.3.jar /lib/flink/lib 
   sudo cp /usr/lib/flink/opt/flink-connector-hive_2.12-1.15.2.jar /lib/flink/lib
   sudo chmod 755 /usr/lib/flink/lib/antlr-runtime-3.5.2.jar 
   sudo chmod 755 /usr/lib/flink/lib/hive-exec-3.1.3*.jar 
   sudo chmod 755 /usr/lib/flink/lib/libfb303-0.9.3.jar
   sudo chmod 755 /usr/lib/flink/lib/flink-connector-hive_2.12-1.15.2.jar
   ```  
![\[Form to add a Custom JAR step with fields for step type, name, JAR location, arguments, and failure action.\]](http://docs.aws.amazon.com/fr_fr/emr/latest/ReleaseGuide/images/hive.png)

### Utiliser le catalogue AWS de données Glue
AWS Catalogue de données Glue

1. Créez un cluster EMR avec la version 6.9.0 ou supérieure et au moins deux applications : **Hive** et **Flink**. 

1. Sélectionnez **Utiliser les métadonnées de la table Hive** dans les paramètres du catalogue de données AWS Glue pour activer le catalogue de données dans le cluster.

1. Utilisez le [script runner](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-commandrunner.html) pour exécuter le script suivant en tant que fonction d'étape : [Exécuter des commandes et des scripts sur un cluster Amazon EMR](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-commandrunner.html) :

   glue-catalog-setup.sh 

   ```
   sudo cp /usr/lib/hive/auxlib/aws-glue-datacatalog-hive3-client.jar /usr/lib/flink/lib 
   sudo cp /usr/lib/hive/lib/antlr-runtime-3.5.2.jar /usr/lib/flink/lib 
   sudo cp /usr/lib/hive/lib/hive-exec-3.1.3*.jar /lib/flink/lib 
   sudo cp /usr/lib/hive/lib/libfb303-0.9.3.jar /lib/flink/lib 
   sudo cp /usr/lib/flink/opt/flink-connector-hive_2.12-1.15.2.jar /lib/flink/lib
   sudo chmod 755 /usr/lib/flink/lib/aws-glue-datacatalog-hive3-client.jar 
   sudo chmod 755 /usr/lib/flink/lib/antlr-runtime-3.5.2.jar 
   sudo chmod 755 /usr/lib/flink/lib/hive-exec-3.1.3*.jar 
   sudo chmod 755 /usr/lib/flink/lib/libfb303-0.9.3.jar
   sudo chmod 755 /usr/lib/flink/lib/flink-connector-hive_2.12-1.15.2.jar
   ```  
![\[Form to add a Custom JAR step with fields for step type, name, JAR location, arguments, and failure action.\]](http://docs.aws.amazon.com/fr_fr/emr/latest/ReleaseGuide/images/hive.png)

## Configurer Flink avec un fichier de configuration
Fichier de configuration

Vous pouvez utiliser l'API de configuration Amazon EMR pour configurer Flink à l'aide d'un fichier de configuration. Les fichiers configurables dans l'API sont les suivants :
+ `flink-conf.yaml`
+ `log4j.properties`
+ `flink-log4j-session`
+ `log4j-cli.properties`

Le fichier de configuration principal de Flink est `flink-conf.yaml`. 

**Pour configurer le nombre de créneaux de tâches utilisés pour Flink à partir du AWS CLI**

1. Créez un fichier, `configurations.json`, contenant les éléments suivants :

   ```
   [
       {
         "Classification": "flink-conf",
         "Properties": {
           "taskmanager.numberOfTaskSlots":"2"
         }
       }
   ]
   ```

1. Créez ensuite un cluster à l'aide de la configuration suivante :

   ```
   aws emr create-cluster --release-label emr-7.12.0 \
   --applications Name=Flink \
   --configurations file://./configurations.json \
   --region us-east-1 \
   --log-uri s3://myLogUri \
   --instance-type m5.xlarge \
   --instance-count 2 \
   --service-role EMR_DefaultRole_V2 \ 
   --ec2-attributes KeyName=YourKeyName,InstanceProfile=EMR_EC2_DefaultRole
   ```

**Note**  
Vous pouvez également modifier certaines configurations avec l'API Flink. Pour plus d'informations, consultez [https://ci.apache.org/projects/flink/flink-docs-release-1.12/concepts/index.html](https://ci.apache.org/projects/flink/flink-docs-release-1.12/concepts/index.html) dans la documentation Flink.  
Avec la version 5.21.0 et ultérieures d'Amazon EMR, vous permet de remplacer les configurations de cluster et de spécifier des classifications de configuration supplémentaires pour chaque groupe d'instances dans un cluster en cours d'exécution. Pour ce faire, utilisez la console Amazon EMR, le AWS Command Line Interface (AWS CLI) ou le AWS SDK. Pour plus d'informations, consultez [Fourniture d'une configuration pour un groupe d'instances dans un cluster en cours d'exécution](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-configure-apps-running-cluster.html).

### Options de parallélisme


En tant que propriétaire de votre application, c'est vous qui savez le mieux quelles ressources attribuer aux tâches dans Flink. Pour les exemples de cette documentation, utilisez le même nombre de tâches que les instances de tâches que vous utilisez pour l'application. Généralement, nous recommandons cela pour le niveau initial de parallélisme, mais vous pouvez également augmenter la granularité de parallélisme à l'aide des emplacements de tâches, qui ne doivent généralement pas dépasser le nombre de [cœurs virtuels](https://aws.amazon.com/ec2/virtualcores/) par instance. Pour plus d'informations sur l'architecture de Flink, consultez [https://ci.apache.org/projects/flink/flink-docs-master/concepts/index.html](https://ci.apache.org/projects/flink/flink-docs-master/concepts/index.html) dans la documentation Flink.

## Configuration de Flink sur un cluster EMR doté de plusieurs nœuds primaires
Plusieurs nœuds primaires

Le JobManager de Flink reste disponible pendant le processus de basculement du nœud principal dans un cluster Amazon EMR comportant plusieurs nœuds principaux. À partir d'Amazon EMR 5.28.0, la JobManager haute disponibilité est également activée automatiquement. Aucune configuration manuelle n'est nécessaire.

Avec les versions 5.27.0 ou antérieures d'Amazon EMR, il s' JobManager agit d'un point de défaillance unique. En cas d' JobManager échec, il perd tous les états des tâches et ne reprend pas les tâches en cours d'exécution. Vous pouvez activer la JobManager haute disponibilité en configurant le nombre de tentatives d'application, le point de contrôle et ZooKeeper en activant le stockage d'état pour Flink, comme le montre l'exemple suivant :

```
[
  {
    "Classification": "yarn-site",
    "Properties": {
      "yarn.resourcemanager.am.max-attempts": "10"
    }
  },
  {
    "Classification": "flink-conf",
    "Properties": {
        "yarn.application-attempts": "10",
        "high-availability": "zookeeper",
        "high-availability.zookeeper.quorum": "%{hiera('hadoop::zk')}",
        "high-availability.storageDir": "hdfs:///user/flink/recovery",
        "high-availability.zookeeper.path.root": "/flink"
    }
  }
]
```

Vous devez configurer le nombre maximal de tentatives du maître d'application pour YARN et les tentatives d'application pour Flink. Pour plus d'informations, consultez [Configuration de la haute disponibilité du cluster YARN](https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/jobmanager_high_availability.html#maximum-application-master-attempts-yarn-sitexml). Vous pouvez également configurer le point de contrôle Flink pour que les tâches de JobManager restauration en cours d'exécution redémarrées soient restaurées à partir de points de contrôle précédemment terminés. Pour plus d'informations, consultez [Flink checkpointing](https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/checkpointing.html).

## Configuration de la taille du processus de mémoire
Taille du processus de mémoire

Pour les versions d'Amazon EMR qui utilisent Flink 1.11.x, vous devez configurer la taille totale du processus de mémoire pour () et JobManager (`jobmanager.memory.process.size`) dans. TaskManager `taskmanager.memory.process.size` `flink-conf.yaml` Vous pouvez définir ces valeurs soit en configurant le cluster à l'aide de l'API de configuration, soit en décommentant manuellement ces champs via SSH. Flink fournit les valeurs par défaut suivantes.
+ `jobmanager.memory.process.size` : 1600m
+ `taskmanager.memory.process.size` : 1728m

Pour exclure le métaspace JVM et la surcharge, utilisez la taille de mémoire totale de Flink (`taskmanager.memory.flink.size`) au lieu de `taskmanager.memory.process.size`. La valeur par défaut du paramètre `taskmanager.memory.process.size` est 1280m. Il n'est pas recommandé de définir à la fois `taskmanager.memory.process.size` et `taskmanager.memory.process.size`.

Toutes les versions d'Amazon EMR qui utilisent Flink 1.12.0 et versions ultérieures ont les valeurs par défaut répertoriées dans l'ensemble open source pour Flink comme valeurs par défaut sur Amazon EMR. Vous n'avez donc pas besoin de les configurer vous-même.

## Configuration de la taille du fichier de sortie du journal
Taille du fichier de sortie du journal

Les conteneurs d'applications Flink créent et écrivent dans trois types de fichiers journaux : fichiers `.out`, fichiers `.log` et fichiers `.err`. Seuls `.err` les fichiers sont compressés et supprimés du système de fichiers, tandis que les fichiers journaux `.log` et `.out` restent dans le système de fichiers. Pour garantir la gérabilité de ces fichiers de sortie et la stabilité du cluster, vous pouvez configurer la rotation des journaux dans `log4j.properties` afin de définir un nombre maximum de fichiers et de limiter leur taille.

**Amazon EMR versions 5.30.0 et ultérieures**

À partir d'Amazon EMR 5.30.0, Flink utilise le framework de journalisation log4j2 avec le nom de classification de configuration `flink-log4j.`. L'exemple de configuration suivant illustre le format log4j2.

```
[
  {
    "Classification": "flink-log4j",
    "Properties": {
      "appender.main.name": "MainAppender",
      "appender.main.type": "RollingFile",
      "appender.main.append" : "false",
      "appender.main.fileName" : "${sys:log.file}",
      "appender.main.filePattern" : "${sys:log.file}.%i",
      "appender.main.layout.type" : "PatternLayout",
      "appender.main.layout.pattern" : "%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n",
      "appender.main.policies.type" : "Policies",
      "appender.main.policies.size.type" : "SizeBasedTriggeringPolicy",
      "appender.main.policies.size.size" : "100MB",
      "appender.main.strategy.type" : "DefaultRolloverStrategy",
      "appender.main.strategy.max" : "10"
    },
  }
]
```

**Amazon EMR versions 5.29.0 et antérieures**

Avec les versions 5.29.0 et antérieures d'Amazon EMR, Flink utilise le framework de journalisation log4j. L'exemple de configuration suivant illustre le format log4j.

```
[
  {
    "Classification": "flink-log4j",
    "Properties": {
      "log4j.appender.file": "org.apache.log4j.RollingFileAppender",
      "log4j.appender.file.append":"true",
      # keep up to 4 files and each file size is limited to 100MB
      "log4j.appender.file.MaxFileSize":"100MB",
      "log4j.appender.file.MaxBackupIndex":4,
      "log4j.appender.file.layout":"org.apache.log4j.PatternLayout",
      "log4j.appender.file.layout.ConversionPattern":"%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n"
    },
  }
]
```

## Configurer Flink pour qu'il fonctionne avec Java 11
Java 11

Les versions 6.12.0 et supérieures d'Amazon EMR fournissent un support d'exécution Java 11 pour Flink. Les sections suivantes décrivent comment configurer le cluster pour fournir un support d'exécution Java 11 pour Flink.

**Topics**
+ [

### Configurer Flink pour Java 11 lorsque vous créez un cluster
](#flink-configure-java11-create)
+ [

### Configurer Flink pour Java 11 sur un cluster en cours d'exécution
](#flink-configure-java11-update)
+ [

### Confirmez le runtime Java pour Flink sur un cluster en cours d'exécution
](#flink-configure-java11-confirm)

### Configurer Flink pour Java 11 lorsque vous créez un cluster


Procédez comme suit pour créer un cluster EMR avec Flink et Java 11 Runtime. Le fichier de configuration dans lequel vous ajoutez la prise en charge de Java 11 est `flink-conf.yaml`.

------
#### [ Console ]

**Pour créer un cluster avec Flink et Java 11 Runtime dans la console**

1. [Connectez-vous au et ouvrez la AWS Management Console console Amazon EMR à l'adresse /emr. https://console.aws.amazon.com](https://console.aws.amazon.com/emr)

1. Choisissez **Clusters** sous **EMR sur EC2** dans le volet de navigation, puis **Créer un cluster**.

1. Sélectionnez Amazon EMR version 6.12.0 ou supérieure, puis choisissez d'installer l'application Flink. Sélectionnez les autres applications que vous souhaitez installer sur votre cluster.

1. Poursuivez la configuration de votre cluster. Dans la section facultatifs **Paramètres logiciels**, utilisez l'option par défaut **Entrer un configuration** et entrez la configuration suivante :

   ```
   [
       {
         "Classification": "flink-conf",
         "Properties": {
           "containerized.taskmanager.env.JAVA_HOME":"/usr/lib/jvm/jre-11",
           "containerized.master.env.JAVA_HOME":"/usr/lib/jvm/jre-11",
           "env.java.home":"/usr/lib/jvm/jre-11"
         }
       }
   ]
   ```

1. Continuez à configurer et à lancer votre cluster.

------
#### [ AWS CLI ]

**Pour créer un cluster avec Flink et Java 11 Runtime à partir de la CLI**

1. Créez un fichier de configuration `configurations.json` qui configure Flink pour utiliser Java 11. 

   ```
   [
       {
         "Classification": "flink-conf",
         "Properties": {
           "containerized.taskmanager.env.JAVA_HOME":"/usr/lib/jvm/jre-11",
           "containerized.master.env.JAVA_HOME":"/usr/lib/jvm/jre-11",
           "env.java.home":"/usr/lib/jvm/jre-11"
         }
       }
   ]
   ```

1. À partir de AWS CLI, créez un nouveau cluster EMR avec Amazon EMR version 6.12.0 ou ultérieure, et installez l'application Flink, comme indiqué dans l'exemple suivant :

   ```
   aws emr create-cluster --release-label emr-6.12.0 \ 
   --applications Name=Flink \ 
   --configurations file://./configurations.json \ 
   --region us-east-1 \ 
   --log-uri s3://myLogUri \ 
   --instance-type m5.xlarge \ 
   --instance-count 2 \ 
   --service-role EMR_DefaultRole_V2 \ 
   --ec2-attributes KeyName=YourKeyName,InstanceProfile=EMR_EC2_DefaultRole
   ```

------

### Configurer Flink pour Java 11 sur un cluster en cours d'exécution


Procédez comme suit pour mettre à jour un cluster EMR en cours d'exécution avec Flink et Java 11 Runtime. Le fichier de configuration dans lequel vous ajoutez la prise en charge de Java 11 est `flink-conf.yaml`.

------
#### [ Console ]

**Pour mettre à jour un cluster en cours d'exécution avec Flink et Java 11 Runtime dans la console**

1. [Connectez-vous au et ouvrez la AWS Management Console console Amazon EMR à l'adresse /emr. https://console.aws.amazon.com](https://console.aws.amazon.com/emr)

1. Sélectionnez **Clusters** sous **EMR sur EC2** dans le panneau de navigation, puis sélectionnez le cluster que vous voulez mettre à jour.
**Note**  
Le cluster doit utiliser Amazon EMR version 6.12.0 ou ultérieure pour prendre en charge Java 11.

1. Sélectionnez l'onglet **Configurations**.

1. Dans la section **Configurations du groupe d'instances**, sélectionnez le groupe d'instances **en cours d'exécution** que vous souhaitez mettre à jour, puis choisissez **Reconfigurer** dans le menu d'actions de la liste.

1. Reconfigurez le groupe d'instances avec l'option **Modifier les attributs** comme suit. Sélectionnez **Ajouter une nouvelle configuration** après chacune d'entre elles.    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/fr_fr/emr/latest/ReleaseGuide/flink-configure.html)

1. Sélectionnez **Enregistrer les modifications** pour ajouter les configurations.

------
#### [ AWS CLI ]

**Pour mettre à jour un cluster en cours d'exécution afin d'utiliser Flink et le runtime Java 11 à partir de la CLI**

Utilisez la commande `modify-instance-groups` pour spécifier une nouvelle configuration pour un groupe d'instances dans un cluster en cours d'exécution.

1. Créez d'abord un fichier de configuration `configurations.json` qui configure Flink pour utiliser Java 11. Dans l'exemple suivant, remplacez *ig-1xxxxxxx9* par l'ID du groupe d'instances que vous souhaitez reconfigurer. Enregistrez le fichier dans le même répertoire que celui où vous exécuterez la commande `modify-instance-groups`.

   ```
   [
      {
         "InstanceGroupId":"ig-1xxxxxxx9",
         "Configurations":[
            {
               "Classification":"flink-conf",
               "Properties":{
                 "containerized.taskmanager.env.JAVA_HOME":"/usr/lib/jvm/jre-11",
                 "containerized.master.env.JAVA_HOME":"/usr/lib/jvm/jre-11",
                 "env.java.home":"/usr/lib/jvm/jre-11"
               },
               "Configurations":[]
            }
         ]
      }
   ]
   ```

1. À partir de AWS CLI, exécutez la commande suivante. Remplacez l'ID du groupe d'instances que vous souhaitez reconfigurer :

   ```
   aws emr modify-instance-groups --cluster-id j-2AL4XXXXXX5T9 \
   --instance-groups file://configurations.json
   ```

------

### Confirmez le runtime Java pour Flink sur un cluster en cours d'exécution


Pour déterminer l'environnement d'exécution Java d'un cluster en cours d'exécution, connectez-vous au nœud primaire avec SSH, comme décrit dans [Connexion au nœud primaire avec SSH](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-connect-master-node-ssh.html). Ensuite, exécutez la commande suivante :

```
ps -ef | grep flink
```

La commande `ps` associée à l'option `-ef` répertorie tous les processus en cours d'exécution sur le système. Vous pouvez filtrer cette sortie avec `grep` pour trouver les mentions de la chaîne `flink`. Vérifiez le résultat pour la valeur de l'environnement d'exécution Java (JRE), `jre-XX`. Dans le résultat suivant, `jre-11` indique que Java 11 est activé lors de l'exécution de Flink.

```
flink    19130     1  0 09:17 ?        00:00:15 /usr/lib/jvm/jre-11/bin/java -Djava.io.tmpdir=/mnt/tmp -Dlog.file=/usr/lib/flink/log/flink-flink-historyserver-0-ip-172-31-32-127.log -Dlog4j.configuration=file:/usr/lib/flink/conf/log4j.properties -Dlog4j.configurationFile=file:/usr/lib/flink/conf/log4j.properties -Dlogback.configurationFile=file:/usr/lib/flink/conf/logback.xml -classpath /usr/lib/flink/lib/flink-cep-1.17.0.jar:/usr/lib/flink/lib/flink-connector-files-1.17.0.jar:/usr/lib/flink/lib/flink-csv-1.17.0.jar:/usr/lib/flink/lib/flink-json-1.17.0.jar:/usr/lib/flink/lib/flink-scala_2.12-1.17.0.jar:/usr/lib/flink/lib/flink-table-api-java-uber-1.17.0.jar:/usr/lib/flink/lib/flink-table-api-scala-bridge_2.12-1.17.0.
```

Vous pouvez également [vous connecter au nœud primaire avec SSH](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-connect-master-node-ssh.html) et démarrer une session Flink YARN avec une commande `flink-yarn-session -d`. La sortie montre la machine virtuelle Java (JVM) pour Flink, `java-11-amazon-corretto` dans l'exemple suivant :

```
2023-05-29 10:38:14,129 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: containerized.master.env.JAVA_HOME, /usr/lib/jvm/java-11-amazon-corretto.x86_64
```