

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

# Comprendre MSK Connect
<a name="msk-connect"></a>

MSK Connect est une fonctionnalité d'Amazon MSK qui permet aux développeurs de diffuser facilement des données vers et depuis leurs clusters Apache Kafka. MSK Connect utilise les versions 2.7.1 ou 3.7.x de Kafka Connect, qui sont des frameworks open source permettant de connecter des clusters Apache Kafka à des systèmes externes tels que des bases de données, des index de recherche et des systèmes de fichiers. Avec MSK Connect, vous pouvez déployer des connecteurs entièrement gérés conçus pour Kafka Connect qui transfèrent des données vers ou extraient des données depuis des magasins de données populaires tels qu'Amazon S3 et Amazon OpenSearch Service. Vous pouvez déployer des connecteurs développés par des tiers comme Debezium pour diffuser les journaux des modifications des bases de données vers un cluster Apache Kafka ou déployer un connecteur existant sans modification de code. Les connecteurs sont automatiquement mis à l'échelle pour s'adapter à l'évolution de la charge et vous ne payez que pour les ressources que vous utilisez.

Utilisez des connecteurs sources pour importer des données provenant de systèmes externes dans vos rubriques. Grâce aux connecteurs récepteurs, vous pouvez exporter les données de vos rubriques vers des systèmes externes.

MSK Connect prend en charge les connecteurs pour tout cluster Apache Kafka connecté à un Amazon VPC, qu'il s'agisse d'un cluster MSK ou d'un cluster Apache Kafka hébergé indépendamment. 

MSK Connect surveille en permanence l'état de santé et l'état de livraison des connecteurs, applique les correctifs et gère le matériel sous-jacent, et adapte automatiquement les connecteurs en fonction de l'évolution du débit.

Pour commencer à utiliser la console, consultez [Commencer à utiliser MSK Connect](msk-connect-getting-started.md). 

Pour en savoir plus sur les AWS ressources que vous pouvez créer avec MSK Connect, consultez [Comprendre les connecteurs](msk-connect-connectors.md)[Créez des plugins personnalisés](msk-connect-plugins.md), et[Comprenez les employés de MSK Connect](msk-connect-workers.md).

Pour plus d'informations sur l'API MSK Connect, consultez le manuel [Référence de l'API Amazon MSK Connect](https://docs.aws.amazon.com/MSKC/latest/mskc/Welcome.html). 

## Avantages liés à l'utilisation d'Amazon MSK Connect
<a name="msk-connect-benefits"></a>

Apache Kafka est l'une des plateformes de streaming open source les plus largement adoptées pour l'ingestion et le traitement de flux de données en temps réel. Avec Apache Kafka, vous pouvez dissocier et dimensionner indépendamment vos applications productrices et consommatrices de données.

Kafka Connect est un élément important de la création et de l'exécution d'applications de streaming avec Apache Kafka. Kafka Connect fournit un moyen standardisé de transférer des données entre Kafka et des systèmes externes. Kafka Connect est hautement évolutif et peut gérer de gros volumes de données Kafka Connect fournit un ensemble puissant d'opérations d'API et d'outils pour configurer, déployer et surveiller les connecteurs qui déplacent les données entre les sujets Kafka et les systèmes externes. Vous pouvez utiliser ces outils pour personnaliser et étendre les fonctionnalités de Kafka Connect afin de répondre aux besoins spécifiques de votre application de streaming.

Vous pouvez rencontrer des difficultés lorsque vous utilisez des clusters Apache Kafka Connect de manière autonome ou lorsque vous essayez de migrer des applications Apache Kafka Connect open source vers. AWS Ces défis incluent le temps nécessaire à la configuration de l'infrastructure et au déploiement des applications, les obstacles techniques liés à la configuration de clusters Apache Kafka Connect autogérés et les frais administratifs opérationnels.

Pour relever ces défis, nous vous recommandons d'utiliser Amazon Managed Streaming for Apache Kafka Connect (Amazon MSK Connect) pour migrer vos applications open source Apache Kafka Connect vers. AWS Amazon MSK Connect simplifie l'utilisation de Kafka Connect pour diffuser des données depuis et vers des clusters Apache Kafka et des systèmes externes, tels que des bases de données, des index de recherche et des systèmes de fichiers.

Voici certains des avantages de la migration vers Amazon MSK Connect :
+ **Élimination de la charge opérationnelle** : Amazon MSK Connect allège la charge opérationnelle associée à l'application de correctifs, au provisionnement et à la mise à l'échelle des clusters Apache Kafka Connect. Amazon MSK Connect surveille en permanence l'état de vos clusters Connect et automatise les correctifs et les mises à niveau de version sans perturber vos charges de travail.
+ **Redémarrage automatique des tâches Connect** : Amazon MSK Connect peut récupérer automatiquement les tâches ayant échoué afin de réduire les interruptions de production. Les échecs de tâches peuvent être provoqués par des erreurs temporaires, telles que le dépassement de la limite de connexion TCP pour Kafka ou le rééquilibrage des tâches lorsque de nouveaux collaborateurs rejoignent le groupe de consommateurs pour les connecteurs récepteurs.
+ Mise à **l'échelle horizontale et verticale automatique** : Amazon MSK Connect permet à l'application du connecteur de s'adapter automatiquement pour prendre en charge des débits plus élevés. Amazon MSK Connect gère le dimensionnement pour vous. Il vous suffit de spécifier le nombre de travailleurs dans le groupe de mise à l'échelle automatique et les seuils d'utilisation. Vous pouvez utiliser le fonctionnement de l'`UpdateConnector`API Amazon MSK Connect pour augmenter ou diminuer verticalement le v CPUs entre 1 et 8 v afin de prendre en CPUs charge un débit variable.
+ **Connectivité réseau privée** — Amazon MSK Connect se connecte de manière privée aux systèmes source et récepteur en utilisant AWS PrivateLink des noms DNS privés.

# Commencer à utiliser MSK Connect
<a name="msk-connect-getting-started"></a>

Il s'agit d'un step-by-step didacticiel qui utilise le AWS Management Console pour créer un cluster MSK et un connecteur récepteur qui envoie des données du cluster vers un compartiment S3.

**Topics**
+ [Configuration des ressources requises pour MSK Connect](mkc-tutorial-setup.md)
+ [Créer un plugin personnalisé](mkc-create-plugin.md)
+ [Création d'une machine cliente et d'une rubrique Apache Kafka](mkc-create-topic.md)
+ [Créer un connecteur](mkc-create-connector.md)
+ [Envoyer des données au cluster MSK](mkc-send-data.md)

# Configuration des ressources requises pour MSK Connect
<a name="mkc-tutorial-setup"></a>

Au cours de cette étape, vous créez les ressources suivantes dont vous avez besoin pour ce scénario de mise en route :
+ Un compartiment Amazon S3 destiné à servir de destination pour recevoir les données du connecteur.
+ Un cluster MSK auquel vous allez envoyer des données. Le connecteur lira ensuite les données de ce cluster et les enverra au compartiment S3 de destination.
+ Une politique IAM qui contient les autorisations d'écriture dans le compartiment S3 de destination.
+ Un rôle IAM qui permet au connecteur d'écrire dans le compartiment S3 de destination. Vous allez ajouter la politique IAM que vous créez à ce rôle.
+ Le point de terminaison d'un VPC Amazon permettant d'envoyer des données depuis le VPC Amazon qui possède le cluster et le connecteur vers Amazon S3.

**Pour créer le compartiment S3**

1. Connectez-vous à la console Amazon S3 AWS Management Console et ouvrez-la à l'adresse [https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/).

1. Choisissez **Créer un compartiment**.

1. Pour le nom du compartiment, entrez un nom descriptif, tel que `amzn-s3-demo-bucket-mkc-tutorial`.

1. Faites défiler l'écran vers le bas et choisissez **Créer un compartiment**.

1. Dans la liste des compartiments, choisissez le compartiment que vous venez de créer.

1. Choisissez **Créer un dossier**.

1. Entrez `tutorial` comme nom de dossier, puis faites défiler l'écran vers le bas et choisissez **Créer un dossier**.

**Pour créer le cluster**

1. Vous voulez ouvrir la console Amazon MSK [https://console.aws.amazon.com/msk/chez vous ? region=us-east-1\$1/home/](https://console.aws.amazon.com/msk/home?region=us-east-1#/home/).

1. Dans le volet de gauche, sous **Clusters MSK**, sélectionnez **Clusters**.

1. Choisissez **Créer un cluster**.

1. Dans **Méthode de création**, choisissez **Création personnalisée**.

1. Comme nom de cluster, saisissez **mkc-tutorial-cluster**.

1. Dans **Type de cluster**, choisissez **Provisioned.**

1. Choisissez **Suivant**.

1. Sous **Mise en réseau**, choisissez un VPC Amazon. Sélectionnez ensuite les zones de disponibilité et les sous-réseaux que vous voulez utiliser. N'oubliez pas le IDs VPC Amazon et les sous-réseaux que vous avez sélectionnés car vous en aurez besoin plus tard dans ce didacticiel.

1. Choisissez **Suivant**.

1. Sous **Méthodes de contrôle d'accès**, assurez-vous que seul l'**accès non authentifié** est sélectionné.

1. Sous **Chiffrement**, assurez-vous que seul le **texte brut** est sélectionné.

1. Continuez à utiliser l'assistant, puis choisissez **Créer un cluster**. Vous accédez alors à la page Détails du cluster. Sur cette page, sous **Groupes de sécurité appliqués**, recherchez l'ID du groupe de sécurité. N'oubliez pas cet ID, car vous en aurez besoin ultérieurement dans ce didacticiel.

**Pour créer une politique IAM avec des autorisations d'écriture dans le compartiment S3**

1. Ouvrez la console IAM à l’adresse [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/).

1. Dans le panneau de navigation, choisissez **Politiques**.

1. Choisissez **Create Policy** (Créer une politique).

1. Dans l'**éditeur de politiques**, choisissez **JSON**, puis remplacez le JSON dans la fenêtre de l'éditeur par le JSON suivant.

   Dans l'exemple suivant, remplacez *<amzn-s3-demo-bucket-my-tutorial>* par le nom de votre compartiment S3.

------
#### [ JSON ]

****  

   ```
   {
     "Version":"2012-10-17",		 	 	 
     "Statement": [
       {
         "Sid": "AllowListBucket",
         "Effect": "Allow",
         "Action": [
           "s3:ListBucket",
           "s3:GetBucketLocation"
         ],
         "Resource": "arn:aws:s3:::<amzn-s3-demo-bucket-my-tutorial>"
       },
       {
         "Sid": "AllowObjectActions",
         "Effect": "Allow",
         "Action": [
           "s3:PutObject",
           "s3:GetObject",
           "s3:DeleteObject",
           "s3:AbortMultipartUpload",
           "s3:ListMultipartUploadParts",
           "s3:ListBucketMultipartUploads"
         ],
         "Resource": "arn:aws:s3:::<amzn-s3-demo-bucket-my-tutorial>/*"
       }
     ]
   }
   ```

------

   Pour obtenir des instructions sur la façon de rédiger des politiques de sécurité, consultez[Contrôle d'accès IAM](iam-access-control.md).

1. Choisissez **Suivant**.

1. Sur la page **Examiner et créer**, procédez comme suit :

   1. Pour **Nom de la politique**, entrez un nom descriptif, tel que**mkc-tutorial-policy**.

   1. Dans **Autorisations définies dans cette politique**, passez en revue et and/or modifiez les autorisations définies dans votre politique.

   1. (Facultatif) Pour identifier, organiser ou rechercher la politique, choisissez **Ajouter une nouvelle balise pour ajouter** des balises sous forme de paires clé-valeur. Par exemple, ajoutez une balise à votre politique avec la paire clé-valeur et. **Environment** **Test**

      Pour plus d'informations sur l'utilisation des balises, consultez la section [Balises pour les Gestion des identités et des accès AWS ressources](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_tags.html) dans le *guide de l'utilisateur IAM*.

1. Choisissez **Create Policy** (Créer une politique).

**Pour créer le rôle IAM capable d'écrire dans le compartiment de destination**

1. Dans le volet de navigation de la console IAM, choisissez **Roles**, puis **Create role**.

1. Sur la page **Select trusted entity** (Sélectionner une entité de confiance), procédez comme suit :

   1. Pour **Trusted entity** (Entité de confiance), choisissez **Service AWS**.

   1. Pour **Service ou cas d'utilisation**, choisissez **S3**.

   1. Sous **Cas d'utilisation**, choisissez **S3**.

1. Choisissez **Suivant**.

1. Sur la page **Add permissions** (Ajouter des autorisations), procédez comme suit :

   1. Dans le champ de recherche sous **Politiques d'autorisations**, entrez le nom de la politique que vous avez créée précédemment pour ce didacticiel. Par exemple, **mkc-tutorial-policy**. Choisissez ensuite la case située à gauche du nom de la politique.

   1. (Facultatif) Définissez une [limite d'autorisations](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_boundaries.html). Il s’agit d’une fonctionnalité avancée disponible pour les fonctions de service, mais pas pour les rôles liés à un service. Pour plus d'informations sur la définition d'une limite d'autorisations, voir [Création de rôles et attachement de politiques (console)](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_job-functions_create-policies.html) dans le *guide de l'utilisateur IAM*.

1. Choisissez **Suivant**.

1. Sur la page **Name, review, and create** (Nommer, vérifier et créer), procédez comme suit :

   1. Pour **Nom du rôle**, entrez un nom descriptif, tel que**mkc-tutorial-role**.
**Important**  
Lorsque vous nommez un rôle, notez ce qui suit :  
Les noms de rôles doivent être uniques au sein du Compte AWS vôtre et ne peuvent pas être rendus uniques au cas par cas.  
Par exemple, ne créez pas deux rôles nommés **PRODROLE** et **prodrole**. Lorsqu’un nom de rôle est utilisé dans une politique ou dans le cadre d’un ARN, le nom de rôle est sensible à la casse. Cependant, lorsqu’un nom de rôle apparaît aux clients dans la console, par exemple lors de la procédure d’ouverture de session, le nom de rôle est insensible à la casse.
Vous ne pouvez pas modifier le nom du rôle après sa création, car d’autres entités pourraient y faire référence.

   1. (Facultatif) Pour **Description**, saisissez la description du rôle.

   1. (Facultatif) Pour modifier les cas d'utilisation et les autorisations du rôle, à l'**étape 1 : Sélectionnez les entités de confiance** ou à l'**étape 2 : Ajouter des sections d'autorisations**, choisissez **Modifier**.

   1. (Facultatif) Pour identifier, organiser ou rechercher le rôle, choisissez **Ajouter une nouvelle balise pour ajouter** des balises sous forme de paires clé-valeur. Par exemple, ajoutez une balise à votre rôle avec la paire clé-valeur et. **ProductManager** **John**

      Pour plus d'informations sur l'utilisation des balises, consultez la section [Balises pour les Gestion des identités et des accès AWS ressources](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_tags.html) dans le *guide de l'utilisateur IAM*.

1. Passez en revue les informations du rôle, puis choisissez **Create role** (Créer un rôle).

**Pour autoriser MSK Connect à assumer le rôle**

1. Dans le volet gauche de la console IAM, sous **Gestion des accès**, sélectionnez **Rôles**.

1. Recherchez `mkc-tutorial-role` et choisissez-le.

1. Sur la page **Récapitulatif** du rôle, choisissez l'onglet **Relations d'approbation**.

1. Choisissez **Modifier la relation d’approbation**.

1. Remplacez la politique d'approbation existante par le code JSON suivant.

------
#### [ JSON ]

****  

   ```
   {
     "Version":"2012-10-17",		 	 	 
     "Statement": [
       {
         "Effect": "Allow",
         "Principal": {
           "Service": "kafkaconnect.amazonaws.com"
         },
         "Action": "sts:AssumeRole"
       }
     ]
   }
   ```

------

1. Choisissez **Mettre à jour la politique d'approbation**.

**Pour créer un point de terminaison d'un VPC Amazon à partir du VPC du cluster vers Amazon S3**

1. Ouvrez la console Amazon VPC à l’adresse [https://console.aws.amazon.com/vpc/](https://console.aws.amazon.com/vpc/).

1. Dans le volet gauche, sélectionnez **Points de terminaison**.

1. Choisissez **Créer un point de terminaison**.

1. Sous **Nom du service**, sélectionnez le service **com.amazonaws.us-east-1.s3** et le type de **passerelle**.

1. Choisissez le VPC du cluster, puis cochez la case située à gauche de la table de routage associée aux sous-réseaux du cluster.

1. Choisissez **Créer un point de terminaison**.

**Étape suivante**

[Créer un plugin personnalisé](mkc-create-plugin.md)

# Créer un plugin personnalisé
<a name="mkc-create-plugin"></a>

Un plugin contient le code qui définit la logique du connecteur. Au cours de cette étape, vous allez créer un plugin personnalisé contenant le code du connecteur récepteur Lenses Amazon S3. Dans une étape ultérieure, lorsque vous créerez le connecteur MSK, vous spécifierez que son code se trouve dans ce plugin personnalisé. Vous pouvez utiliser le même plugin pour créer plusieurs connecteurs MSK avec des configurations différentes.

**Pour créer le connecteur personnalisé**

1. Téléchargez le [connecteur S3](https://www.confluent.io/hub/confluentinc/kafka-connect-s3).

1. Chargez le fichier ZIP vers un compartiment S3 auquel vous avez accès. Pour plus d'informations sur le chargement de fichiers sur Amazon S3, consultez la section [Chargement d'objets](https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html) dans le guide de l'utilisateur d'Amazon S3.

1. Ouvrez la console Amazon MSK à l'adresse [https://console.aws.amazon.com/msk/](https://console.aws.amazon.com/msk/).

1. Dans le volet de gauche, développez **MSK Connect**, puis choisissez **Plugins personnalisés**.

1. Choisissez **Créer un plugin personnalisé**.

1. Choisissez **Parcourir S3**.

1. Dans la liste des compartiments, recherchez le compartiment dans lequel vous avez chargé le fichier ZIP, puis sélectionnez-le.

1. Dans la liste des objets du compartiment, sélectionnez le bouton radio situé à gauche du fichier ZIP, puis cliquez sur le bouton intitulé **Choisir**.

1. Entrez `mkc-tutorial-plugin` comme nom de plugin personnalisé, puis choisissez **Créer un plugin personnalisé**.

La création du plugin personnalisé peut prendre AWS quelques minutes. Une fois le processus de création terminé, le message suivant apparaît dans une bannière en haut de la fenêtre du navigateur.

```
Custom plugin mkc-tutorial-plugin was successfully created
The custom plugin was created. You can now create a connector using this custom plugin.
```

**Étape suivante**

[Création d'une machine cliente et d'une rubrique Apache Kafka](mkc-create-topic.md)

# Création d'une machine cliente et d'une rubrique Apache Kafka
<a name="mkc-create-topic"></a>

Dans cette étape, vous créez une instance Amazon EC2 à utiliser comme instance de client Apache Kafka. Vous utilisez ensuite cette instance pour créer une rubrique sur le cluster.

**Pour créer un ordinateur client**

1. Ouvrez la console Amazon EC2 à l’adresse [https://console.aws.amazon.com/ec2/](https://console.aws.amazon.com/ec2/).

1. Sélectionnez **Lancer des instances**.

1. Saisissez un **Nom** pour votre ordinateur client, tel que **mkc-tutorial-client**.

1. Laissez **Amazon Linux 2 AMI (HVM) - Kernel 5.10, type de volume SSD** sélectionné pour le **type Amazon Machine Image (AMI)**.

1. Choisissez le type d'instance **t2.xlarge**.

1. Sous **Paire de clés (connexion)**, choisissez **Créer une nouvelle paire de clés**. Saisissez **mkc-tutorial-key-pair** dans **Nom de la paire de clés**, puis choisissez **Télécharger la paire de clés**. Vous pouvez utiliser également une paire de clés existante.

1. Choisissez **Lancer l'instance**.

1. Choisissez **View Instances (Afficher les instances)**. Ensuite, dans la colonne **Groupes de sécurité**, choisissez le groupe de sécurité associé à votre nouvelle instance. Copiez l'ID du groupe de sécurité et enregistrez-le pour plus tard.

**Pour autoriser le client nouvellement créé à envoyer des données au cluster**

1. Ouvrez la console Amazon VPC à l’adresse [https://console.aws.amazon.com/vpc/](https://console.aws.amazon.com/vpc/).

1. Dans le volet de gauche, sélectionnez **Groupes de sécurité** sous l'onglet **SÉCURITÉ**. Dans la colonne **ID du groupe de sécurité)**, recherchez le groupe de sécurité du cluster. Vous avez enregistré l'ID de ce groupe de sécurité lorsque vous avez créé le cluster dans [Configuration des ressources requises pour MSK Connect](mkc-tutorial-setup.md). Choisissez ce groupe de sécurité en cochant la case située à gauche de sa ligne. Assurez-vous qu'aucun autre groupe de sécurité n'est sélectionné simultanément.

1. Dans la moitié inférieure de l'écran, choisissez l'onglet **Règles entrantes**.

1. Choisissez **Modifier les règles entrantes**.

1. Dans le coin inférieur gauche de l'écran, choisissez **Ajouter une règle**.

1. Dans la nouvelle règle, choisissez **Tout le trafic** dans la colonne **Type** . Dans le champ à droite de la colonne **Source**, entrez l'ID du groupe de sécurité de l'ordinateur client. Il s'agit de l'ID du groupe de sécurité que vous avez enregistré après avoir créé l'ordinateur client.

1. Sélectionnez **Enregistrer les règles**. Votre cluster MSK accepte désormais tout le trafic provenant du client que vous avez créé dans la procédure précédente.

**Pour créer une rubrique**

1. Ouvrez la console Amazon EC2 à l’adresse [https://console.aws.amazon.com/ec2/](https://console.aws.amazon.com/ec2/).

1. Dans le tableau des instances, sélectionnez `mkc-tutorial-client`.

1. En haut de l'écran, choisissez **Connexion**, puis suivez les instructions pour vous connecter à l'instance.

1. Installez Java sur l'instance client en exécutant la commande suivante :

   ```
   sudo yum install java-1.8.0
   ```

1. Exécutez la commande suivante pour télécharger Apache Kafka. 

   ```
   wget https://archive.apache.org/dist/kafka/2.2.1/kafka_2.12-2.2.1.tgz
   ```
**Note**  
Si vous souhaitez utiliser un site miroir autre que celui utilisé dans cette commande, vous pouvez en choisir un autre sur le site web [Apache](https://www.apache.org/dyn/closer.cgi?path=/kafka/1.1.1/kafka_2.11-1.1.1.tgz).

1. Exécutez la commande suivante dans le répertoire où vous avez téléchargé le fichier TAR à l'étape précédente.

   ```
   tar -xzf kafka_2.12-2.2.1.tgz
   ```

1. Accédez au répertoire **kafka\$12.12-2.2.1**.

1. Vous voulez ouvrir la console Amazon MSK [https://console.aws.amazon.com/msk/chez vous ? region=us-east-1\$1/home/](https://console.aws.amazon.com/msk/home?region=us-east-1#/home/).

1. Dans le volet de gauche, choisissez **Clusters**, puis choisissez le nom`mkc-tutorial-cluster`.

1. Choisissez **Afficher les informations sur le client**.

1. Copiez la chaîne de connexion en **texte brut**.

1. Sélectionnez **Exécuté**.

1. Exécutez la commande suivante sur l'instance cliente (`mkc-tutorial-client`), en la *bootstrapServerString* remplaçant par la valeur que vous avez enregistrée lorsque vous avez consulté les informations client du cluster.

   ```
   <path-to-your-kafka-installation>/bin/kafka-topics.sh --create --bootstrap-server bootstrapServerString --replication-factor 2 --partitions 1 --topic mkc-tutorial-topic
   ```

   Si la commande réussit, le message suivant s'affiche : `Created topic mkc-tutorial-topic.`

**Étape suivante**

[Créer un connecteur](mkc-create-connector.md)

# Créer un connecteur
<a name="mkc-create-connector"></a>

Cette procédure décrit comment créer un connecteur à l'aide du AWS Management Console.

**Pour créer le connecteur**

1. Connectez-vous à la AWS Management Console console Amazon MSK et ouvrez-la [https://console.aws.amazon.com/msk/chez vous ? region=us-east-1\$1/home/](https://console.aws.amazon.com/msk/home?region=us-east-1#/home/).

1. Dans le volet gauche, développez **MSK Connect**, puis choisissez **Connecteurs**.

1. Sélectionnez **Créer un connecteur**.

1. Dans la liste des plugins, choisissez `mkc-tutorial-plugin`, puis **Suivant**.

1. Pour le nom du connecteur, entrez `mkc-tutorial-connector`.

1. Dans la liste des clusters, choisissez `mkc-tutorial-cluster`.

1. Dans la section **Paramètres réseau du connecteur**, choisissez l'une des options suivantes pour le type de réseau :
   + **IPv4**(par défaut) - Pour la connectivité aux destinations IPv4 uniquement
   + **Double pile** : pour la connectivité aux destinations via les deux IPv4 et IPv6 (disponible uniquement si des blocs IPv6 CIDR sont IPv4 associés à vos sous-réseaux)

1. Copiez la configuration suivante et collez-la dans le champ de configuration du connecteur.

   Assurez-vous de remplacer la région par le code de l' Région AWS endroit où vous créez le connecteur. Remplacez également le nom du compartiment Amazon S3 *<amzn-s3-demo-bucket-my-tutorial>* par le nom de votre compartiment dans l'exemple suivant.

   ```
   connector.class=io.confluent.connect.s3.S3SinkConnector
   s3.region=us-east-1
   format.class=io.confluent.connect.s3.format.json.JsonFormat
   flush.size=1
   schema.compatibility=NONE
   tasks.max=2
   topics=mkc-tutorial-topic
   partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
   storage.class=io.confluent.connect.s3.storage.S3Storage
   s3.bucket.name=<amzn-s3-demo-bucket-my-tutorial>
   topics.dir=tutorial
   ```

1. Sous **Autorisations d'accès**, choisissez `mkc-tutorial-role`.

1. Choisissez **Suivant**. Sur la page **Sécurité**, sélectionnez à nouveau **Suivant**.

1. Sur la page **Journaux**, choisissez **Suivant**.

1. Sur la page **Réviser et créer**, passez en revue la configuration de votre connecteur et choisissez **Create connector**.

**Étape suivante**

[Envoyer des données au cluster MSK](mkc-send-data.md)

# Envoyer des données au cluster MSK
<a name="mkc-send-data"></a>

Au cours de cette étape, vous envoyez des données à la rubrique Apache Kafka que vous avez créée précédemment, puis vous recherchez ces mêmes données dans le compartiment S3 de destination.

**Pour envoyer des données au cluster MSK**

1. Dans le dossier `bin` de l'installation d'Apache Kafka sur l'instance cliente, créez un fichier texte nommé `client.properties` avec le contenu suivant.

   ```
   security.protocol=SASL_SSL
   sasl.mechanism=AWS_MSK_IAM
   ```

1. Exécutez la commande suivante pour créer un producteur de console. Remplacez *BootstrapBrokerString* par la valeur que vous avez obtenue lors de l'exécution de la commande précédente.

   ```
   <path-to-your-kafka-installation>/bin/kafka-console-producer.sh --broker-list BootstrapBrokerString --producer.config client.properties --topic mkc-tutorial-topic
   ```

1. Saisissez le message souhaité, puis appuyez sur **Entrée**. Répétez cette étape deux ou trois fois. Chaque fois que vous entrez une ligne et appuyez sur **Entrée**, cette ligne est envoyée à votre cluster Apache Kafka sous forme de message distinct.

1. Recherchez dans le compartiment Amazon S3 de destination les messages que vous avez envoyés à l'étape précédente.

# Comprendre les connecteurs
<a name="msk-connect-connectors"></a>

Un connecteur intègre des systèmes externes et des services Amazon à Apache Kafka en copiant en continu les données de streaming d'une source de données vers votre cluster Apache Kafka, ou en copiant en continu les données de votre cluster vers un récepteur de données. Un connecteur peut également exécuter une logique légère telle que la transformation, la conversion de format ou le filtrage des données avant de les livrer à une destination. Les connecteurs source extraient les données d'une source de données et les transmettent au cluster, tandis que les connecteurs récepteurs extraient les données du cluster et les transfèrent vers un récepteur de données.

Le diagramme suivant illustre l'architecture d'un connecteur. Un worker est un processus de machine virtuelle Java (JVM) qui exécute la logique du connecteur. Chaque worker crée un ensemble de tâches qui s'exécutent dans des threads parallèles et se chargent de copier les données. Les tâches ne stockent pas l'état et peuvent donc être démarrées, arrêtées ou redémarrées à tout moment afin de fournir un pipeline de données résilient et évolutif.

![\[Diagramme montrant l'architecture d'un cluster de connecteur.\]](http://docs.aws.amazon.com/fr_fr/msk/latest/developerguide/images/mkc-worker-architecture.png)


# Comprendre la capacité du connecteur
<a name="msk-connect-capacity"></a>

La capacité totale d'un connecteur dépend du nombre de travailleurs qu'il possède, ainsi que du nombre d'unités MSK Connect (MCUs) par travailleur. Chaque MCU représente 1 vCPU de calcul et 4 Gio de mémoire. La mémoire MCU correspond à la mémoire totale d'une instance de worker et non à la mémoire de tas utilisée.

Les employés de MSK Connect utilisent les adresses IP des sous-réseaux fournis par le client. Chaque travailleur utilise une adresse IP provenant de l'un des sous-réseaux fournis par le client. Vous devez vous assurer que vous disposez d'un nombre suffisant d'adresses IP disponibles dans les sous-réseaux fournis à une CreateConnector demande pour tenir compte de leur capacité spécifiée, en particulier lors du dimensionnement automatique des connecteurs où le nombre de travailleurs peut fluctuer.

Pour créer un connecteur, vous devez choisir l'un des deux modes de capacité suivants.
+ *Provisionné* : choisissez ce mode si vous connaissez les exigences de capacité de votre connecteur. Vous spécifiez deux valeurs :
  + Le nombre de workers.
  + Le nombre de MCUs par travailleur.
+ *Mis à l'échelle automatiquement* : choisissez ce mode si les exigences de capacité de votre connecteur sont variables ou si vous ne les connaissez pas à l'avance. Lorsque vous utilisez le mode autoscaled, Amazon MSK Connect remplace la propriété `tasks.max` de votre connecteur par une valeur proportionnelle au nombre de travailleurs exécutant le connecteur et au nombre MCUs de travailleurs par travailleur. 

  Vous spécifiez trois ensembles de valeurs :
  + Le nombre minimal et maximal de workers.
  + Les pourcentages d'utilisation de la mise à l'échelle horizontale et de la montée en charge du processeur, qui sont déterminés par la métrique `CpuUtilization`. Lorsque la métrique `CpuUtilization` du connecteur dépasse le pourcentage de montée en charge, MSK Connect augmente le nombre de workers qui utilisent le connecteur. Lorsque la métrique `CpuUtilization` passe en dessous du pourcentage de la mise à l'échelle horizontale, MSK Connect réduit le nombre de workers. Le nombre de workers reste toujours dans les limites des nombres minimum et maximum que vous spécifiez lors de la création du connecteur.
  + Le nombre de MCUs par travailleur.
  + (Facultatif) *Nombre maximal de tâches de mise à l'échelle automatique* : nombre maximal de tâches allouées au connecteur lors des opérations de mise à l'échelle automatique. Ce paramètre vous permet de définir une limite supérieure pour la création de tâches, afin de mieux contrôler l'utilisation des ressources et le parallélisme par rapport à vos partitions de sujets Kafka.

Pour plus d'informations sur les travailleurs, voir[Comprenez les employés de MSK Connect](msk-connect-workers.md), et pour plus d'informations sur le nombre maximal de tâches de mise à l'échelle automatique, voir[Comprendre le nombre maximal de tâches de mise à l'échelle automatique](msk-connect-max-autoscaling-task-count.md). Pour en savoir plus sur les métriques MSK Connect, consultez [Surveillance d'Amazon MSK Connect](mkc-monitoring-overview.md).

# Comprendre le nombre maximal de tâches de mise à l'échelle automatique
<a name="msk-connect-max-autoscaling-task-count"></a>

Le `maxAutoscalingTaskCount` paramètre est un champ de capacité facultatif disponible pour les connecteurs de mise à l'échelle automatique dans Amazon MSK Connect. Ce paramètre vous permet de définir une limite supérieure au nombre maximum de tâches pouvant être créées lors des opérations de mise à l'échelle automatique du connecteur, afin de mieux contrôler l'utilisation des ressources et les performances.

Lorsque vous utilisez le mode de capacité autoscalée, Amazon MSK Connect remplace automatiquement la propriété `tasks.max` de votre connecteur par une valeur proportionnelle au nombre de travailleurs MCUs et par travailleur. Le `maxAutoscalingTaskCount` paramètre fournit une option configurable supplémentaire pour limiter le nombre maximum de tâches créées pour votre connecteur.

Cette fonctionnalité est particulièrement utile lorsque vous souhaitez contrôler le niveau de parallélisme par rapport au nombre de partitions thématiques de votre cluster Kafka. En définissant cette limite, vous pouvez optimiser les performances et éviter une distribution inefficace des tâches qui pourrait se produire lorsque le nombre de tâches calculé automatiquement dépasse vos exigences en matière de charge de travail.

## Exigences de configuration
<a name="msk-connect-max-autoscaling-task-count-requirements"></a>

Le `maxAutoscalingTaskCount` paramètre doit répondre aux exigences suivantes :

```
maxAutoscalingTaskCount ≥ maxWorkerCount
```

Cette exigence garantit une utilisation efficace des ressources en maintenant au moins une tâche par travailleur. Le système applique ce minimum afin d'optimiser la fonctionnalité du connecteur.

Lorsque vous spécifiez`maxAutoscalingTaskCount`, la limite est appliquée immédiatement lors de la création du connecteur et lors de tous les événements de dimensionnement ultérieurs. À mesure que le nombre de travailleurs augmente ou diminue pendant les opérations de mise à l'échelle automatique, le système continue de respecter cette limite. La `tasks.max` valeur s'ajuste proportionnellement au nombre de travailleurs et MCUs par travailleur, mais ne dépasse jamais la valeur configurée`maxAutoscalingTaskCount`.

Si vous ne spécifiez pas ce paramètre, le connecteur utilise le calcul standard sans limite : `tasks.max = workerCount × mcuCount × tasksPerMcu` (où tasksPerMcu est 2). 

## Quand utiliser maxAutoscalingTask Count
<a name="msk-connect-max-autoscaling-task-count-when-to-use"></a>

Envisagez `maxAutoscalingTaskCount` de l'utiliser dans les scénarios suivants :
+ *Nombre de partitions limité* : lorsque vos sujets Kafka comportent un nombre fixe de partitions inférieur au nombre de tâches calculé automatiquement, la définition d'une limite empêche la création de tâches inactives sans aucune tâche à effectuer.
+ *Optimisation des performances* : lorsque vous avez identifié qu'un nombre de tâches spécifique fournit un débit optimal pour votre charge de travail, vous pouvez limiter le nombre maximum de tâches afin de maintenir des performances constantes.
+ *Gestion des ressources* : lorsque vous souhaitez contrôler le parallélisme et la consommation de ressources maximaux de votre connecteur, quel que soit le nombre de travailleurs actifs.

## Exemple
<a name="msk-connect-max-autoscaling-task-count-example"></a>

Pour un connecteur avec la configuration suivante :

```
minWorkerCount: 1
maxWorkerCount: 4
mcuCount: 8
maxAutoscalingTaskCount: 15
```

Sans cela`maxAutoscalingTaskCount`, lorsqu'il est étendu à 4 travailleurs, le connecteur créerait 64 tâches (4 travailleurs × 8 MCUs × 2 tâches par microcontrôleur). Si ce paramètre est `maxAutoscalingTaskCount` défini sur 15, le connecteur ne crée que 15 tâches, ce qui peut être plus approprié si votre sujet Kafka comporte 15 partitions ou moins.

# Configuration du type de réseau à double pile
<a name="msk-connect-dual-stack"></a>

Amazon MSK Connect prend en charge le type de réseau à double pile pour les nouveaux connecteurs. Grâce au réseau à double pile, vos connecteurs peuvent se connecter à des destinations à la fois via IPv4 et IPv6. Notez que la IPv6 connectivité n'est disponible qu'en mode double pile (IPv4 \$1 IPv6) ; IPv6 seule la mise en réseau n'est pas prise en charge.

Par défaut, les nouveaux connecteurs utilisent le type de IPv4 réseau. Pour créer un connecteur avec un type de réseau à double pile, assurez-vous de remplir les conditions requises décrites dans la section suivante. Notez qu'une fois que vous avez créé un connecteur utilisant un type de réseau à double pile, vous ne pouvez pas modifier son type de réseau. Pour modifier le type de réseau, vous devez supprimer et recréer le connecteur.

Amazon MSK Connect prend également en charge la connectivité des points de terminaison des API de service à la fois sur IPv6 et IPv4. Pour utiliser la IPv6 connectivité pour les appels d'API, vous devez utiliser les points de terminaison à double pile. Pour plus d'informations sur les points de terminaison du service MSK Connect, consultez Points de terminaison et [quotas Amazon MSK Connect](https://docs.aws.amazon.com/general/latest/gr/msk-connect.html).

## Conditions préalables à l'utilisation du type de réseau à double pile
<a name="dual-stack-prerequisites"></a>

Avant de configurer le type de réseau à double pile pour vos connecteurs, assurez-vous que tous les sous-réseaux que vous fournissez lors de la création du connecteur disposent à la fois de blocs IPv4 CIDR IPv6 et de blocs CIDR.

## Considérations relatives à l'utilisation du type de réseau à double pile
<a name="dual-stack-considerations"></a>
+ IPv6 le support n'est actuellement disponible qu'en mode double pile (IPv4 \$1 IPv6), et non en tant que IPv6 -only
+ Les connecteurs dotés de la technologie Dual-Stack peuvent se connecter à la IPv4 fois IPv6 aux systèmes de données MSK et Sink ou Source
+ Le type de réseau ne peut pas être modifié après la création du connecteur. Vous devez supprimer et recréer le connecteur pour changer de type de réseau
+ Tous les sous-réseaux spécifiés lors de la création du connecteur doivent prendre en charge la double pile pour que la création du connecteur réussisse avec le type de réseau à double pile
+ Si vous utilisez des sous-réseaux à double pile mais qu'aucun type de réseau n'est spécifié, le connecteur utilisera par défaut la valeur IPv4 -only pour des raisons de rétrocompatibilité
+ Pour les connecteurs existants, vous ne pouvez pas mettre à jour le type de réseau. Vous devez supprimer et recréer le connecteur pour modifier le type de réseau
+ L'utilisation d'un réseau à double pile n'entraîne aucun coût supplémentaire

# Création d’un connecteur
<a name="mkc-create-connector-intro"></a>

Cette procédure décrit comment créer un connecteur à l'aide du AWS Management Console.

**Création d'un connecteur à l'aide du AWS Management Console**

1. Ouvrez la console Amazon MSK à l'adresse [https://console.aws.amazon.com/msk/](https://console.aws.amazon.com/msk/).

1. Dans le volet gauche, sous **MSK Connect**, choisissez **Connecteurs**.

1. Sélectionnez **Créer un connecteur**.

1. Vous pouvez choisir entre utiliser un plugin personnalisé existant pour créer le connecteur ou créer d'abord un nouveau plugin personnalisé. Pour plus d'informations sur les plugins personnalisés et sur la façon de les créer, consultez [Créez des plugins personnalisés](msk-connect-plugins.md). Dans cette procédure, supposons que vous souhaitez utiliser un plugin personnalisé. Dans la liste des plugins personnalisés, recherchez celui que vous souhaitez utiliser, cochez la case située à gauche, puis choisissez **Suivant**.

1. Entrez un nom et, éventuellement, une description.

1. Choisissez le cluster auquel vous souhaitez vous connecter.

1. Dans la section **Paramètres réseau du connecteur**, choisissez l'une des options suivantes pour le type de réseau :
   + **IPv4**(par défaut) - Pour la connectivité aux destinations IPv4 uniquement
   + **Double pile** : pour la connectivité aux destinations via les deux IPv4 et IPv6 (disponible uniquement si des blocs IPv6 CIDR sont IPv4 associés à vos sous-réseaux)

1. Spécifiez la configuration du connecteur. Les paramètres de configuration que vous devez spécifier dépendent du type de connecteur que vous souhaitez créer. Cependant, certains paramètres sont communs à tous les connecteurs, par exemple les paramètres `connector.class` et `tasks.max`. Voici un exemple de configuration pour le [connecteur récepteur Confluent Amazon S3](https://www.confluent.io/hub/confluentinc/kafka-connect-s3).

   ```
   connector.class=io.confluent.connect.s3.S3SinkConnector
   tasks.max=2
   topics=my-example-topic
   s3.region=us-east-1
   s3.bucket.name=amzn-s3-demo-bucket
   flush.size=1
   storage.class=io.confluent.connect.s3.storage.S3Storage
   format.class=io.confluent.connect.s3.format.json.JsonFormat
   partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
   key.converter=org.apache.kafka.connect.storage.StringConverter
   value.converter=org.apache.kafka.connect.storage.StringConverter
   schema.compatibility=NONE
   ```

1. Vous configurez ensuite la capacité de votre connecteur. Vous pouvez choisir entre deux modes de capacité : provisionné et mis à l'échelle automatiquement. Pour plus d'informations sur ces deux options, consultez la section [Comprendre la capacité du connecteur](msk-connect-capacity.md).

1. (Facultatif) Dans la section **Nombre maximal de tâches de mise à l'échelle automatique**, utilisez le champ Nombre maximal de tâches de mise à l'échelle automatique pour saisir le nombre maximum de tâches que vous souhaitez allouer au connecteur lors des opérations de mise à l'échelle automatique. La valeur doit être au moins égale à votre nombre maximum de travailleurs. Si vous ne spécifiez aucune valeur, le connecteur utilise le calcul standard sans limite. Pour de plus amples informations, veuillez consulter [Comprendre le nombre maximal de tâches de mise à l'échelle automatique](msk-connect-max-autoscaling-task-count.md).

1. Choisissez la configuration de worker par défaut ou une configuration de worker personnalisée. Pour plus d'informations sur la création des configurations de worker par défaut, consultez [Comprenez les employés de MSK Connect](msk-connect-workers.md).

1. Vous spécifiez ensuite le rôle d'exécution du service. Il doit s'agir d'un rôle IAM que MSK Connect peut assumer et qui accorde au connecteur toutes les autorisations dont il a besoin pour accéder aux ressources nécessaires AWS . Ces autorisations dépendent de la logique du connecteur. Pour plus d'informations sur la création de ce rôle, consultez [Comprendre le rôle d'exécution des services](msk-connect-service-execution-role.md).

1. Choisissez **Suivant**, passez en revue les informations de sécurité, puis choisissez à nouveau **Suivant**.

1. Définissez les options de journalisation souhaitées, puis sélectionnez **Suivant**. Pour plus d’informations sur la journalisation, consultez [Journalisation pour MSK Connect](msk-connect-logging.md).

1. Sur la page **Réviser et créer**, passez en revue la configuration de votre connecteur et choisissez **Create connector**.

Pour utiliser l'API MSK Connect afin de créer un connecteur, consultez [CreateConnector](https://docs.aws.amazon.com/MSKC/latest/mskc/API_CreateConnector.html). 

Vous pouvez utiliser `UpdateConnector` l'API pour modifier la configuration du connecteur. Pour de plus amples informations, veuillez consulter [Mettre à jour un connecteur](mkc-update-connector.md).

# Mettre à jour un connecteur
<a name="mkc-update-connector"></a>

Cette procédure décrit comment mettre à jour la configuration d'un connecteur MSK Connect existant à l'aide du AWS Management Console.

**Mettre à jour la configuration du connecteur à l'aide du AWS Management Console**

1. Ouvrez la console Amazon MSK à l'adresse [https://console.aws.amazon.com/msk/](https://console.aws.amazon.com/msk/).

1. Dans le volet gauche, sous **MSK Connect**, choisissez **Connecteurs**.

1. Sélectionnez un connecteur existant.

1. Choisissez **Modifier la configuration du connecteur**.

1. Mettez à jour la configuration du connecteur. Vous ne pouvez pas annuler l'`connector.class`utilisation UpdateConnector de. L'exemple suivant montre un exemple de configuration pour le connecteur Confluent Amazon S3 Sink. 

   ```
   connector.class=io.confluent.connect.s3.S3SinkConnector
   tasks.max=2
   topics=my-example-topic
   s3.region=us-east-1
   s3.bucket.name=amzn-s3-demo-bucket
   flush.size=1
   storage.class=io.confluent.connect.s3.storage.S3Storage
   format.class=io.confluent.connect.s3.format.json.JsonFormat
   partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
   key.converter=org.apache.kafka.connect.storage.StringConverter
   value.converter=org.apache.kafka.connect.storage.StringConverter
   schema.compatibility=NONE
   ```

1. Sélectionnez **Soumettre**.

1. Vous pouvez ensuite surveiller l'état actuel de l'opération dans l'onglet **Opérations** du connecteur. 

Pour utiliser l'API MSK Connect afin de mettre à jour la configuration d'un connecteur, consultez [UpdateConnector](https://docs.aws.amazon.com/MSKC/latest/mskc/API_UpdateConnector.html).

# Connexion à partir de connecteurs
<a name="msk-connect-from-connectors"></a>

Les bonnes pratiques suivantes peuvent améliorer les performances de votre connectivité à Amazon MSK Connect.

## Ne vous chevauchez pas IPs pour Amazon VPC peering ou Transit Gateway
<a name="CIDR-ip-ranges"></a>

Si vous utilisez Amazon VPC peering ou Transit Gateway avec Amazon MSK Connect, ne configurez pas votre connecteur pour accéder aux ressources VPC peered comprises dans les plages CIDR : IPs 
+ "10.99.0.0/16"
+ "192.168.0.0/16"
+ "172.21.0.0/16"

# Créez des plugins personnalisés
<a name="msk-connect-plugins"></a>

Un plugin est une AWS ressource qui contient le code qui définit la logique de votre connecteur. Vous chargez un fichier JAR (ou un fichier ZIP contenant un ou plusieurs fichiers JAR) dans un compartiment S3 et vous spécifiez l'emplacement du compartiment lorsque vous créez le plugin. Lorsque le plugin est créé, MSK Connect copie le contenu de l'objet S3 à ce moment-là. Il ne maintient pas de lien vers l'objet S3, de sorte que les modifications ultérieures apportées à l'objet n'affecteront ni le plugin ni ses connecteurs. Lorsque vous créez un connecteur, vous spécifiez le plugin que MSK Connect doit utiliser. La relation entre les plugins et les connecteurs est la suivante one-to-many : vous pouvez créer un ou plusieurs connecteurs à partir du même plugin.

**Note**  
Les plugins personnalisés ne peuvent pas être mis à jour sur place. Pour utiliser une nouvelle version du code de votre plugin, supprimez tous les connecteurs qui font référence au plugin, supprimez le plugin, puis recréez-le.

**Packaging de dépendances pour les plugins personnalisés**  
Nous vous recommandons d'inclure tous les fichiers JAR et dépendances requis pour votre plugin. Package de votre connecteur sous la forme de l'un des éléments suivants :  
Un fichier ZIP contenant tous les fichiers JAR et les dépendances requis pour le plugin.
Un seul uber JAR contenant tous les fichiers de classe du plugin et de ses dépendances.
Le fait de ne pas regrouper les dépendances de vos plugins peut avoir un impact sur la disponibilité ou la compatibilité dans l'environnement d'exécution et provoquer des erreurs inattendues.

Pour plus d'informations sur le développement du code d'un connecteur, consultez le [Guide de développement des connecteurs](https://kafka.apache.org/documentation/#connect_development) dans la documentation d'Apache Kafka.

**Création d'un plugin personnalisé à l'aide du AWS Management Console**

1. Ouvrez la console Amazon MSK à l'adresse [https://console.aws.amazon.com/msk/](https://console.aws.amazon.com/msk/).

1. Dans le volet de gauche, sous **MSK Connect**, choisissez **Plugins personnalisés**.

1. Choisissez **Créer un plugin personnalisé**.

1. Choisissez **Parcourir S3**.

1. Dans la liste des compartiments S3, choisissez le compartiment contenant le fichier JAR ou ZIP du plugin.

1. Dans la liste des objets, cochez la case située à gauche du fichier JAR ou ZIP du plugin, puis sélectionnez **Choisir**.

1. Choisissez **Créer un plugin personnalisé**.

Pour utiliser l'API MSK Connect afin de créer un plugin personnalisé, consultez [CreateCustomPlugin](https://docs.aws.amazon.com/MSKC/latest/mskc/API_CreateCustomPlugin.html).

# Comprenez les employés de MSK Connect
<a name="msk-connect-workers"></a>

Un worker est un processus de machine virtuelle Java (JVM) qui exécute la logique du connecteur. Chaque worker crée un ensemble de tâches qui s'exécutent dans des threads parallèles et se chargent de copier les données. Les tâches ne stockent pas l'état et peuvent donc être démarrées, arrêtées ou redémarrées à tout moment afin de fournir un pipeline de données résilient et évolutif. Les modifications du nombre de workers, qu'elles soient dues à un événement de mise à l'échelle ou à des défaillances inattendues, sont automatiquement détectées par les autres workers. Ils se coordonnent pour rééquilibrer les tâches entre les workers restants. Les workers de Connect utilisent les groupes de consommateurs d'Apache Kafka pour coordonner et rééquilibrer leurs tâches.

Si les exigences de capacité de votre connecteur sont variables ou difficiles à estimer, vous pouvez laisser MSK Connect ajuster le nombre de workers selon vos besoins entre une limite inférieure et une limite supérieure que vous spécifiez. Vous pouvez également spécifier le nombre exact de workers que vous souhaitez exécuter sur votre logique de connecteur. Pour de plus amples informations, veuillez consulter [Comprendre la capacité du connecteur](msk-connect-capacity.md).

**Les employés de MSK Connect consomment des adresses IP**  
Les employés de MSK Connect consomment des adresses IP dans les sous-réseaux fournis par le client. Chaque travailleur utilise une adresse IP provenant de l'un des sous-réseaux fournis par le client. Vous devez vous assurer que vous disposez d'un nombre suffisant d'adresses IP disponibles dans les sous-réseaux fournis à une CreateConnector demande pour tenir compte de leur capacité spécifiée, en particulier lors du dimensionnement automatique des connecteurs où le nombre de travailleurs peut fluctuer.

## Configuration du processus worker par défaut
<a name="msk-connect-default-worker-config"></a>

MSK Connect fournit la configuration de worker par défaut suivante :

```
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
```

# Propriétés de configuration de l'environnement de worker compatibles
<a name="msk-connect-supported-worker-config-properties"></a>

MSK Connect fournit une configuration de worker par défaut. Vous avez également la possibilité de créer une configuration de worker personnalisée à utiliser avec vos connecteurs. La liste suivante inclut des informations sur les propriétés de configuration de worker prises en charge ou non par Amazon MSK Connect.
+ Seules les propriétés `key.converter` et `value.converter` sont obligatoires.
+ MSK Connect prend en charge les propriétés de configuration `producer.` suivantes.

  ```
  producer.acks
  producer.batch.size
  producer.buffer.memory
  producer.compression.type
  producer.enable.idempotence
  producer.key.serializer
  producer.linger.ms
  producer.max.request.size
  producer.metadata.max.age.ms
  producer.metadata.max.idle.ms
  producer.partitioner.class
  producer.reconnect.backoff.max.ms
  producer.reconnect.backoff.ms
  producer.request.timeout.ms
  producer.retry.backoff.ms
  producer.value.serializer
  ```
+ MSK Connect prend en charge les propriétés de configuration `consumer.` suivantes.

  ```
  consumer.allow.auto.create.topics
  consumer.auto.offset.reset
  consumer.check.crcs
  consumer.fetch.max.bytes
  consumer.fetch.max.wait.ms
  consumer.fetch.min.bytes
  consumer.heartbeat.interval.ms
  consumer.key.deserializer
  consumer.max.partition.fetch.bytes
  consumer.max.poll.interval.ms
  consumer.max.poll.records
  consumer.metadata.max.age.ms
  consumer.partition.assignment.strategy
  consumer.reconnect.backoff.max.ms
  consumer.reconnect.backoff.ms
  consumer.request.timeout.ms
  consumer.retry.backoff.ms
  consumer.session.timeout.ms
  consumer.value.deserializer
  ```
+ Toutes les autres propriétés de configuration qui ne commencent pas par les préfixes `producer.` ou sont `consumer.` prises en charge, *sauf* les propriétés suivantes. 

  ```
  access.control.
  admin.
  admin.listeners.https.
  client.
  connect.
  inter.worker.
  internal.
  listeners.https.
  metrics.
  metrics.context.
  rest.
  sasl.
  security.
  socket.
  ssl.
  topic.tracking.
  worker.
  bootstrap.servers
  config.storage.topic
  connections.max.idle.ms
  connector.client.config.override.policy
  group.id
  listeners
  metric.reporters
  plugin.path
  receive.buffer.bytes
  response.http.headers.config
  scheduled.rebalance.max.delay.ms
  send.buffer.bytes
  status.storage.topic
  ```

Pour plus d'informations sur les propriétés de configuration de worker et ce qu'elles représentent, consultez [Kafka Connect Configs](https://kafka.apache.org/documentation/#connectconfigs) dans la documentation Apache Kafka.

# Création d'une configuration de travail personnalisée
<a name="msk-connect-create-custom-worker-config"></a>

Cette procédure décrit comment créer une configuration de travail personnalisée à l'aide du AWS Management Console.

**Création d'une configuration de travail personnalisée à l'aide du AWS Management Console**

1. Ouvrez la console Amazon MSK à l'adresse [https://console.aws.amazon.com/msk/](https://console.aws.amazon.com/msk/).

1. Dans le volet gauche, sous **MSK Connect**, choisissez **Configurations de worker**.

1. Choisissez **Créer une configuration de worker**.

1. Entrez un nom et une description facultative, puis ajoutez les propriétés et les valeurs que vous souhaitez leur attribuer.

1. Choisissez **Créer une configuration de worker**.

Pour utiliser l'API MSK Connect afin de créer une configuration de travail, consultez [CreateWorkerConfiguration](https://docs.aws.amazon.com/MSKC/latest/mskc/API_CreateWorkerConfiguration.html).

# Gérez les décalages du connecteur source à l'aide de `offset.storage.topic`
<a name="msk-connect-manage-connector-offsets"></a>

Cette section fournit des informations qui vous aideront à gérer les décalages des connecteurs source à l'aide de la *rubrique Stockage des décalages*. La rubrique du stockage des décalages est une rubrique interne que Kafka Connect utilise pour stocker les décalages de configuration des connecteurs et des tâches.

## Considérations
<a name="msk-connect-manage-connector-offsets-considerations"></a>

Tenez compte des éléments suivants lorsque vous gérez les décalages du connecteur source.
+ Pour spécifier une rubrique de stockage des décalages, indiquez le nom de la rubrique Kafka dans lequel les décalages des connecteurs sont stockés en tant que valeur pour `offset.storage.topic` dans votre configuration de worker.
+ Soyez prudent lorsque vous modifiez la configuration d'un connecteur. La modification des valeurs de configuration peut entraîner un comportement involontaire du connecteur si un connecteur source utilise les valeurs de la configuration pour saisir des enregistrements de décalage. Pour plus d'informations, nous vous recommandons de consulter la documentation de votre plugin.
+ **Personnaliser le nombre de partitions par défaut** : en plus de personnaliser la configuration de worker en ajoutant `offset.storage.topic`, vous pouvez personnaliser le nombre de partitions pour les rubriques de stockage des décalages et des statuts. Les partitions par défaut pour les rubriques internes sont les suivantes.
  + `config.storage.topic` : 1, non configurable, doit être une rubrique à partition unique
  + `offset.storage.topic` : 25, configurable en fournissant `offset.storage.partitions`
  + `status.storage.topic` : 5, configurable en fournissant `status.storage.partitions`
+ **Suppression manuelle des rubriques** : Amazon MSK Connect crée de nouvelles rubriques internes à Kafka Connect (le nom de la rubrique commence par `__amazon_msk_connect`) à chaque déploiement de connecteurs. Les anciennes rubriques attachées à des connecteurs supprimés ne sont pas automatiquement supprimées car les rubriques internes, telles que `offset.storage.topic`, peuvent être réutilisées entre les connecteurs. Cependant, vous pouvez supprimer manuellement les rubriques internes non utilisées créées par MSK Connect. Les rubriques internes sont nommées selon le format `__amazon_msk_connect_<offsets|status|configs>_connector_name_connector_id`.

  L'expression régulière `__amazon_msk_connect_<offsets|status|configs>_connector_name_connector_id` peut être utilisée pour supprimer les rubriques internes. Vous ne devez pas supprimer une rubrique interne actuellement utilisée par un connecteur en cours d'exécution.
+ **Utilisation du même nom pour les rubriques internes créées par MSK Connect**  : si vous souhaitez réutiliser la rubrique de stockage des décalages pour utiliser les décalages d'un connecteur créé précédemment, vous devez donner au nouveau connecteur le même nom que l'ancien connecteur. La propriété `offset.storage.topic` peut être définie à l'aide de la configuration de worker pour attribuer le même nom à `offset.storage.topic` et réutilisée entre différents connecteurs. Cette configuration est décrite dans [Gestion des décalages de connecteurs](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-workers.html#msk-connect-create-custom-worker-config). MSK Connect n'autorise pas que les différents connecteurs partagent `config.storage.topic` et`status.storage.topic`. Ces rubriques sont créées chaque fois que vous créez un nouveau connecteur dans MSKC. Ils sont automatiquement nommés selon le format `__amazon_msk_connect_<status|configs>_connector_name_connector_id` et sont donc différents selon les connecteurs que vous créez.

# Utiliser la rubrique de stockage offset par défaut
<a name="msk-connect-default-offset-storage-topic"></a>

Par défaut, Amazon MSK Connect génère une nouvelle rubrique de stockage des décalages sur votre cluster Kafka pour chaque connecteur que vous créez. MSK construit le nom de rubrique par défaut en utilisant des parties de l'ARN du connecteur. Par exemple, `__amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2`. 

# Rubrique Utiliser le stockage offset personnalisé
<a name="msk-connect-set-offset-storage-topic"></a>

Pour assurer la continuité des décalages entre les connecteurs source, vous pouvez utiliser une rubrique de stockage des décalages de votre choix au lieu de la rubrique par défaut. La spécification d'une rubrique de stockage des décalages vous aide à accomplir des tâches telles que la création d'un connecteur source qui reprend la lecture à partir du dernier décalage d'un connecteur précédent.

Pour spécifier une rubrique de stockage des décalages, vous devez fournir une valeur pour la propriété `offset.storage.topic` dans votre configuration de worker avant de créer un connecteur. Si vous souhaitez réutiliser la rubrique de stockage des décalages pour utiliser les décalages d'un connecteur créé précédemment, vous devez donner au nouveau connecteur le même nom que l'ancien connecteur. Si vous créez une rubrique de stockage de décalages personnalisée, vous devez définir [https://kafka.apache.org/27/documentation.html#topicconfigs_cleanup.policy](https://kafka.apache.org/27/documentation.html#topicconfigs_cleanup.policy) sur `compact` dans la configuration de votre rubrique.

**Note**  
Si vous spécifiez une rubrique de stockage des décalages lorsque vous créez un connecteur *récepteur*, MSK Connect crée la rubrique si elle n'existe pas déjà. Toutefois, cette rubrique ne sera pas utilisée pour enregistrer les décalages du connecteur.   
Les décalages du connecteur récepteur sont plutôt gérés à l'aide du protocole de groupe de consommateurs Kafka. Chaque connecteur récepteur crée un groupe nommé `connect-{CONNECTOR_NAME}`. Tant que le groupe de consommateurs existe, tous les connecteurs récepteurs successifs que vous créez avec la même valeur `CONNECTOR_NAME` seront maintenus à partir du dernier décalage validé.

**Important**  
Si vous souhaitez mettre à jour une configuration de connecteur existante tout en préservant la continuité du décalage, utilisez l' UpdateConnector API. Pour de plus amples informations, veuillez consulter [Mettre à jour un connecteur](mkc-update-connector.md).

**Example : Spécification d'un sujet de stockage offset lors de la recréation d'un connecteur source**  
Si vous devez supprimer et recréer un connecteur tout en maintenant la continuité du décalage, vous pouvez spécifier un sujet de stockage du décalage dans votre configuration de travail. Supposons, par exemple, que vous disposiez d'un connecteur de capture des données modifiées (CDC) et que vous souhaitiez le recréer sans perdre votre place dans le flux CDC. Les étapes suivantes expliquent comment effectuer cette tâche.  

1. Sur votre machine cliente, exécutez la commande suivante pour trouver le nom de la rubrique de stockage des décalages de votre connecteur. Remplacez `<bootstrapBrokerString>` par la chaîne de l'agent d'amorçage de votre cluster. Pour obtenir des instructions sur l'obtention de votre chaîne de l'agent d'amorçage, consultez [Obtenez les courtiers bootstrap pour un cluster Amazon MSK](msk-get-bootstrap-brokers.md).

   ```
   <path-to-your-kafka-installation>/bin/kafka-topics.sh --list --bootstrap-server <bootstrapBrokerString>
   ```

   La sortie suivante présente une liste de toutes les rubriques du cluster, y compris les rubriques du connecteur interne par défaut. Dans cet exemple, le connecteur CDC existant utilise la [rubrique de stockage des décalages par défaut](msk-connect-default-offset-storage-topic.md) créée par MSK Connect. C'est pourquoi la rubrique de stockage des décalages est appelée `__amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2`.

   ```
   __consumer_offsets
   __amazon_msk_canary
   __amazon_msk_connect_configs_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2
   __amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2
   __amazon_msk_connect_status_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2
   my-msk-topic-1
   my-msk-topic-2
   ```

1. Ouvrez la console Amazon MSK à l'adresse [https://console.aws.amazon.com/msk/](https://console.aws.amazon.com/msk).

1. Choisissez votre connecteur dans la liste des **connecteurs**. Copiez et enregistrez le contenu du champ de **configuration du connecteur** afin de pouvoir le modifier et l'utiliser pour créer le nouveau connecteur.

1. Pour supprimer la connecteur, choisissez **Supprimer**. Puis saisissez le nom du connecteur dans le champ d'entrée du texte pour confirmer la suppression.

1. Créez une configuration de worker personnalisée avec des valeurs adaptées à votre scénario. Pour obtenir des instructions, veuillez consulter [Création d'une configuration de travail personnalisée](msk-connect-create-custom-worker-config.md).

   Dans votre configuration de worker, vous devez spécifier le nom de la rubrique de stockage des décalages que vous avez précédemment récupérée en tant que valeur pour `offset.storage.topic` comme dans la configuration suivante. 

   ```
   config.providers.secretManager.param.aws.region=eu-west-3
   key.converter=<org.apache.kafka.connect.storage.StringConverter>
   value.converter=<org.apache.kafka.connect.storage.StringConverter>
   config.providers.secretManager.class=com.github.jcustenborder.kafka.config.aws.SecretsManagerConfigProvider
   config.providers=secretManager
   offset.storage.topic=__amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2
   ```

1. 
**Important**  
Vous devez donner à votre nouveau connecteur le même nom que l'ancien connecteur.

   Créez un nouveau connecteur à l'aide de la configuration de worker que vous avez configurée à l'étape précédente. Pour obtenir des instructions, veuillez consulter [Création d’un connecteur](mkc-create-connector-intro.md).

# Tutoriel : Externalisation d'informations sensibles à l'aide de fournisseurs de configuration
<a name="msk-connect-config-provider"></a>

Cet exemple montre comment externaliser des informations sensibles pour Amazon MSK Connect à l'aide d'un fournisseur de configuration open source. Un fournisseur de configuration vous permet de spécifier des variables plutôt que du texte brut dans une configuration de connecteur ou de worker, et les workers exécutés dans votre connecteur résolvent ces variables au moment de l'exécution. Cela empêche le stockage d'informations d'identification et d'autres secrets en texte brut. Dans cet exemple, le fournisseur de configuration prend en charge la récupération des paramètres de configuration depuis AWS Secrets Manager, Amazon S3 et Systems Manager (SSM). À [l'étape 2](#msk-connect-config-providers), vous pouvez voir comment configurer le stockage et la récupération d'informations sensibles pour le service que vous souhaitez configurer.

## Considérations
<a name="msk-connect-config-providers-considerations"></a>

Tenez compte des points suivants lorsque vous utilisez le fournisseur de configuration MSK avec Amazon MSK Connect :
+ Attribuez les autorisations appropriées lors de l'utilisation des fournisseurs de configuration au rôle d'exécution du service IAM.
+ Définissez les fournisseurs de configuration dans les configurations du worker et leur implémentation dans la configuration du connecteur.
+ Des valeurs de configuration sensibles peuvent apparaître dans les journaux des connecteurs si un plugin ne définit pas ces valeurs comme secrètes. Kafka Connect traite les valeurs de configuration non définies de la même manière que toute autre valeur en texte brut. Pour en savoir plus, veuillez consulter la section [Empêcher l'apparition de secrets dans les journaux des connecteurs](msk-connect-logging.md#msk-connect-logging-secrets).
+ Par défaut, MSK Connect redémarre fréquemment un connecteur lorsque celui-ci utilise un fournisseur de configuration. Pour désactiver ce comportement de redémarrage, vous pouvez définir la valeur `config.action.reload` sur `none` dans la configuration de votre connecteur.

## Créez un plugin personnalisé et téléchargez-le sur S3
<a name="msk-connect-config-providers-create-custom-plugin"></a>

 Pour créer un plugin personnalisé, créez un fichier zip contenant le connecteur msk-config-provider en exécutant les commandes suivantes sur votre machine locale.

**Pour créer un plugin personnalisé en utilisant une fenêtre de terminal et Debezium comme connecteur**

Utilisez la AWS CLI pour exécuter des commandes en tant que superutilisateur avec des informations d'identification vous permettant d'accéder à votre compartiment AWS S3. Pour plus d'informations sur l'installation et la configuration de la AWS CLI, consultez la section [Getting started with the AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-getting-started.html) dans le *guide de AWS Command Line Interface l'utilisateur*. Pour plus d'informations sur l'utilisation de l' AWS interface de ligne de commande avec Amazon S3, consultez la section [Utilisation d'Amazon S3 avec l' AWS interface de ligne de commande](https://docs.aws.amazon.com/cli/latest/userguide/cli-services-s3.html) dans le *guide de AWS Command Line Interface l'utilisateur*.

1. Dans une fenêtre de terminal, créez un dossier nommé `custom-plugin` dans votre espace de travail à l'aide de la commande suivante.

   ```
   mkdir custom-plugin && cd custom-plugin
   ```

1. Téléchargez la dernière version stable du **plug-in du connecteur MySQL** depuis le [site Debezium](https://debezium.io/releases/) à l'aide de la commande suivante.

   ```
   wget https://repo1.maven.org/maven2/io/debezium/debezium-connectormysql/
   2.2.0.Final/debezium-connector-mysql-2.2.0.Final-plugin.tar.gz
   ```

   Extrayez le fichier gzip téléchargé dans le dossier `custom-plugin` à l'aide de la commande suivante.

   ```
   tar xzf debezium-connector-mysql-2.2.0.Final-plugin.tar.gz
   ```

1. Téléchargez le [fichier zip du fournisseur de configuration MSK](https://github.com/aws-samples/msk-config-providers/releases/download/r0.4.0/msk-config-providers-0.4.0-with-dependencies.zip) à l'aide de la commande suivante.

   ```
   wget https://github.com/aws-samples/msk-config-providers/releases/download/r0.4.0/msk-config-providers-0.4.0-with-dependencies.zip
   ```

   Extrayez le fichier zip téléchargé dans le dossier `custom-plugin` à l'aide de la commande suivante.

   ```
   unzip msk-config-providers-0.4.0-with-dependencies.zip
   ```

1. Compressez le contenu du fournisseur de configuration MSK à partir de l'étape ci-dessus et du connecteur personnalisé dans un seul fichier nommé `custom-plugin.zip`.

   ```
   zip -r ../custom-plugin.zip * 
   ```

1. Chargez le fichier sur S3 pour référence ultérieure.

   ```
   aws s3 cp ../custom-plugin.zip s3:<S3_URI_BUCKET_LOCATION>
   ```

1. Sur la console Amazon MSK, dans la section **MSK Connect**, choisissez **Custom Plugin**, puis **Create custom plugin** et parcourez le bucket **s3 : < *S3\$1URI\$1BUCKET\$1LOCATION* >** S3 pour sélectionner le fichier ZIP du plugin personnalisé que vous venez de télécharger.  
![\[Amazon S3 bucket interface showing a single custom-plugin.zip file in the debezium folder.\]](http://docs.aws.amazon.com/fr_fr/msk/latest/developerguide/images/s3-object-browser.png)

1. Entrez **debezium-custom-plugin** comme nom du plugin. Si vous le souhaitez, saisissez une description et choisissez **Créer un plugin personnalisé**.  
![\[Amazon S3 bucket interface showing a single custom-plugin.zip file in the debezium folder.\]](http://docs.aws.amazon.com/fr_fr/msk/latest/developerguide/images/create-custom-plugin.png)

## Configuration des paramètres et des autorisations pour différents fournisseurs
<a name="msk-connect-config-providers"></a>

Vous pouvez configurer les valeurs des paramètres dans les trois services suivants :
+ Secrets Manager
+ Systems Manager Parameter Store
+ S3 - Simple Storage Service

Choisissez l'un des onglets ci-dessous pour obtenir des instructions sur la configuration des paramètres et des autorisations pertinentes pour ce service.

------
#### [ Configure in Secrets Manager ]

**Pour configurer les valeurs de paramètres dans Secrets Manager**

1. Ouvrez la [console Secrets Manager](https://console.aws.amazon.com/secretsmanager/).

1. Créez un nouveau secret pour stocker vos informations d'identification ou vos secrets. Pour obtenir des instructions, voir [Création d'un AWS Secrets Manager secret](https://docs.aws.amazon.com/secretsmanager/latest/userguide/create_secret.html) dans le *guide de AWS Secrets Manager l'utilisateur*.

1. Copiez l'ARN de votre secret.

1. Ajoutez les autorisations Secrets Manager de l'exemple de politique suivant à votre [rôle d'exécution du service](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-service-execution-role.html). Remplacez l'exemple d'ARN`arn:aws:secretsmanager:us-east-1:123456789012:secret:MySecret-1234`, par l'ARN de votre secret.

1. Ajoutez la configuration du worker et les instructions relatives au connecteur.  
****  

   ```
   {
           "Version":"2012-10-17",		 	 	 
           "Statement": [
               {
                   "Effect": "Allow",
                   "Action": [
                       "secretsmanager:GetResourcePolicy",
                       "secretsmanager:GetSecretValue",
                       "secretsmanager:DescribeSecret",
                       "secretsmanager:ListSecretVersionIds"
                   ],
                   "Resource": [
                   "arn:aws:secretsmanager:us-east-1:123456789012:secret:MySecret-1234"
                   ]
               }
           ]
       }
   ```

1. Pour utiliser le fournisseur de configuration Secrets Manager, copiez les lignes de code suivantes dans la zone de texte de configuration du worker à l'étape 3 :

   ```
   # define name of config provider:
   
   config.providers = secretsmanager
   
   # provide implementation classes for secrets manager:
   
   config.providers.secretsmanager.class = com.amazonaws.kafka.config.providers.SecretsManagerConfigProvider
   
   # configure a config provider (if it needs additional initialization), for example you can provide a region where the secrets or parameters are located:
   
   config.providers.secretsmanager.param.region = us-east-1
   ```

1. Pour utiliser le fournisseur de configuration Secrets Manager, copiez les lignes de code suivantes dans la zone de texte de configuration du worker à l'étape 4.

   ```
   #Example implementation for secrets manager variable
   database.user=${secretsmanager:MSKAuroraDBCredentials:username}
   database.password=${secretsmanager:MSKAuroraDBCredentials:password}
   ```

Vous pouvez également utiliser l'étape ci-dessus avec d'autres fournisseurs de configuration.

------
#### [ Configure in Systems Manager Parameter Store ]

**Pour configurer et utiliser des valeurs de paramètres dans Systems Manager Parameter Store**

1. Ouvrez la [console Systems Manager](https://console.aws.amazon.com/systems-manager/).

1. Dans le panneau de navigation, choisissez **Stockage de paramètres**.

1. Créez un nouveau paramètre à stocker dans le Systems Manager. Pour obtenir des instructions, reportez-vous à la section [Créer un paramètre Systems Manager (console)](https://docs.aws.amazon.com/systems-manager/latest/userguide/parameter-create-console.html) dans le Guide de AWS Systems Manager l'utilisateur.

1. Copiez l'ARN de votre paramètre.

1. Ajoutez les autorisations Systems Manager de l'exemple de politique suivant à votre [rôle d'exécution du service](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-service-execution-role.html). *<arn:aws:ssm:us-east-1:123456789000:parameter/MyParameterName>*Remplacez-le par l'ARN de votre paramètre.  
****  

   ```
   {
           "Version":"2012-10-17",		 	 	 
           "Statement": [
               {
                   "Sid": "VisualEditor0",
                   "Effect": "Allow",
                   "Action": [
                       "ssm:GetParameterHistory",
                       "ssm:GetParametersByPath",
                       "ssm:GetParameters",
                       "ssm:GetParameter"
                   ],
                   "Resource": "arn:aws:ssm:us-east-1:123456789000:parameter/MyParameterName"
               }
           ]
       }
   ```

1. Pour utiliser le fournisseur de configuration Parameter Store, copiez les lignes de code suivantes dans la zone de texte de configuration du worker à l'étape 3 :

   ```
   # define name of config provider:
   
   config.providers = ssm
   
   # provide implementation classes for parameter store:
   
   config.providers.ssm.class = com.amazonaws.kafka.config.providers.SsmParamStoreConfigProvider
   
   # configure a config provider (if it needs additional initialization), for example you can provide a region where the secrets or parameters are located:
   
   config.providers.ssm.param.region = us-east-1
   ```

1. Pour utiliser le fournisseur de configuration Parameter Store, copiez les lignes de code suivantes dans la zone de texte de configuration du worker à l'étape 5.

   ```
   #Example implementation for parameter store variable
   schema.history.internal.kafka.bootstrap.servers=${ssm::MSKBootstrapServerAddress}
   ```

   Vous pouvez également grouper les deux étapes ci-dessus avec d'autres fournisseurs de configuration.

------
#### [ Configure in Amazon S3 ]

**Pour configurer objects/files dans Amazon S3**

1. Ouvrez la [console Amazon S3](https://console.aws.amazon.com/s3/).

1. Chargez votre objet dans un compartiment dans S3. Pour obtenir des instructions, consultez [Chargement d'objets](https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html).

1. Copiez l'ARN de votre objet.

1. Ajoutez les autorisations de lecture d'objets Amazon S3 de l'exemple de politique suivant à votre [rôle d'exécution du service](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-service-execution-role.html). Remplacez l'exemple d'ARN`arn:aws:s3:::<MY_S3_BUCKET/path/to/custom-plugin.zip>`, par l'ARN de votre objet.  
****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
               {
                   "Sid": "VisualEditor0",
                   "Effect": "Allow",
                   "Action": "s3:GetObject",
                   "Resource": "arn:aws:s3:::<MY_S3_BUCKET/path/to/custom-plugin.zip>"
               }
           ]
       }
   ```

1. Pour utiliser le fournisseur de configuration Amazon S3, copiez les lignes de code suivantes dans la zone de texte de configuration du worker à l'étape 3 :

   ```
   # define name of config provider:
   
   config.providers = s3import
   # provide implementation classes for S3:
   
   config.providers.s3import.class = com.amazonaws.kafka.config.providers.S3ImportConfigProvider
   ```

1. Pour utiliser le fournisseur de configuration Amazon S3, copiez les lignes de code suivantes dans la zone de texte de configuration du worker à l'étape 4.

   ```
   #Example implementation for S3 object
   
   database.ssl.truststore.location = ${s3import:us-west-2:my_cert_bucket/path/to/trustore_unique_filename.jks}
   ```

   Vous pouvez également grouper les deux étapes ci-dessus avec d'autres fournisseurs de configuration.

------

## Créez une configuration de worker personnalisée avec des informations sur votre fournisseur de configuration
<a name="msk-connect-config-providers-create-custom-config"></a>

1. Sélectionnez **Configurations de worker** dans la section **Amazon MSK Connect**.

1. Sélectionnez **Créer une configuration de worker**.

1. Entrez `SourceDebeziumCustomConfig` dans la zone de texte Nom de la configuration du worker. La description est facultative.

1. Copiez le code de configuration approprié en fonction des fournisseurs souhaités, puis collez-le dans la zone de texte **Configuration du worker**.

1. Voici un exemple de configuration de worker pour les trois fournisseurs :

   ```
   key.converter=org.apache.kafka.connect.storage.StringConverter
   key.converter.schemas.enable=false
   value.converter=org.apache.kafka.connect.json.JsonConverter
   value.converter.schemas.enable=false
   offset.storage.topic=offsets_my_debezium_source_connector
   
   # define names of config providers:
   
   config.providers=secretsmanager,ssm,s3import
   
   # provide implementation classes for each provider:
   
   config.providers.secretsmanager.class    = com.amazonaws.kafka.config.providers.SecretsManagerConfigProvider
   config.providers.ssm.class               = com.amazonaws.kafka.config.providers.SsmParamStoreConfigProvider
   config.providers.s3import.class          = com.amazonaws.kafka.config.providers.S3ImportConfigProvider
   
   # configure a config provider (if it needs additional initialization), for example you can provide a region where the secrets or parameters are located:
   
   
   config.providers.secretsmanager.param.region = us-east-1
   config.providers.ssm.param.region = us-east-1
   ```

1. Cliquez sur Créer une configuration de worker.

## Création du connecteur
<a name="msk-connect-config-providers-create-connector"></a>

1. Créez un nouveau connecteur en suivant les instructions de la section [Créer un nouveau connecteur](https://docs.aws.amazon.com/msk/latest/developerguide/mkc-create-connector.html).

1. Choisissez le fichier `custom-plugin.zip` que vous avez chargé dans votre compartiment S3 dans la section [Créez un plugin personnalisé et téléchargez-le sur S3](#msk-connect-config-providers-create-custom-plugin) comme source du plugin personnalisé.

1. Copiez le code de configuration approprié en fonction des fournisseurs souhaités, puis collez-le dans le champ Configuration du connecteur.

1. Voici un exemple de configuration de connecteur pour les trois fournisseurs :

   ```
   #Example implementation for parameter store variable
   schema.history.internal.kafka.bootstrap.servers=${ssm::MSKBootstrapServerAddress}
   
   #Example implementation for secrets manager variable
   database.user=${secretsmanager:MSKAuroraDBCredentials:username}
   database.password=${secretsmanager:MSKAuroraDBCredentials:password}
   
   #Example implementation for Amazon S3 file/object
   database.ssl.truststore.location = ${s3import:us-west-2:my_cert_bucket/path/to/trustore_unique_filename.jks}
   ```

1. Sélectionnez **Utiliser une configuration personnalisée** et choisissez dans le **SourceDebeziumCustomConfig**menu déroulant **Configuration du travailleur**.

1. Suivez les étapes restantes indiquées dans les instructions de la section [Créer un connecteur](https://docs.aws.amazon.com/msk/latest/developerguide/mkc-create-connector.html).

# Rôles et politiques IAM pour MSK Connect
<a name="msk-connect-iam"></a>

Cette section vous aide à configurer les politiques et les rôles IAM appropriés pour déployer et gérer Amazon MSK Connect en toute sécurité au sein de votre AWS environnement. Les sections suivantes expliquent le rôle d'exécution du service qui doit être utilisé avec MSK Connect, y compris la politique de confiance requise et les autorisations supplémentaires nécessaires lors de la connexion à un cluster MSK authentifié par IAM. La page fournit également des exemples de politiques IAM complètes permettant d'accorder un accès complet aux fonctionnalités de MSK Connect, ainsi que des détails sur les politiques AWS gérées disponibles pour le service. 

**Topics**
+ [Comprendre le rôle d'exécution des services](msk-connect-service-execution-role.md)
+ [Exemple de politique IAM pour MSK Connect](mkc-iam-policy-examples.md)
+ [Prévenir le problème des adjoints confus entre les services](cross-service-confused-deputy-prevention.md)
+ [AWS politiques gérées pour MSK Connect](mkc-security-iam-awsmanpol.md)
+ [Utiliser des rôles liés à un service pour MSK Connect](mkc-using-service-linked-roles.md)

# Comprendre le rôle d'exécution des services
<a name="msk-connect-service-execution-role"></a>

**Note**  
Amazon MSK Connect ne prend pas en charge l'utilisation du [rôle lié au service](mkc-using-service-linked-roles.md) comme rôle d'exécution du service. Vous devez créer un rôle d'exécution de service distinct. Pour savoir comment créer un rôle IAM personnalisé, consultez la section [Création d'un rôle pour déléguer des autorisations à un AWS service](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-service.html) dans le Guide de l'*utilisateur IAM*.

Lorsque vous créez un connecteur avec MSK Connect, vous devez spécifier un rôle Gestion des identités et des accès AWS (IAM) à utiliser avec celui-ci. Votre rôle d'exécution du service doit avoir la politique d'approbation suivante pour que MSK Connect puisse l'assumer. Pour plus d'informations sur les clés de contexte de condition de cette politique, veuillez consulter [Prévenir le problème des adjoints confus entre les services](cross-service-confused-deputy-prevention.md).

------
#### [ JSON ]

****  

```
{
  "Version":"2012-10-17",		 	 	 
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "kafkaconnect.amazonaws.com"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "aws:SourceAccount": "123456789012"
        },
        "ArnLike": {
          "aws:SourceArn": "arn:aws:kafkaconnect:us-east-1:123456789012:connector/myConnector/abc12345-abcd-4444-a8b9-123456f513ed-2"
        }
      }
    }   
  ]
}
```

------

Si le cluster Amazon MSK que vous souhaitez utiliser avec votre connecteur est un cluster qui utilise l'authentification IAM, vous devez ajouter la politique d'autorisation suivante au rôle d'exécution du service du connecteur. Pour plus d'informations sur la manière de trouver l'UUID de votre cluster et sur la manière de créer un sujet ARNs, consultez. [Ressources relatives aux politiques d'autorisation](kafka-actions.md#msk-iam-resources)

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:Connect",
                "kafka-cluster:DescribeCluster"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:000000000001:cluster/testClusterName/300d0000-0000-0005-000f-00000000000b-1"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:ReadData",
                "kafka-cluster:DescribeTopic"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:123456789012:topic/myCluster/300a0000-0000-0003-000a-00000000000b-6/__amazon_msk_connect_read"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:WriteData",
                "kafka-cluster:DescribeTopic"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:123456789012:topic/testCluster/300f0000-0000-0008-000d-00000000000m-7/__amazon_msk_connect_write"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:CreateTopic",
                "kafka-cluster:WriteData",
                "kafka-cluster:ReadData",
                "kafka-cluster:DescribeTopic"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:123456789012:topic/testCluster/300f0000-0000-0008-000d-00000000000m-7/__amazon_msk_connect_*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:AlterGroup",
                "kafka-cluster:DescribeGroup"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:123456789012:group/testCluster/300d0000-0000-0005-000f-00000000000b-1/__amazon_msk_connect_*",
                "arn:aws:kafka:us-east-1:123456789012:group/testCluster/300d0000-0000-0005-000f-00000000000b-1/connect-*"
            ]
        }
    ]
}
```

------

Selon le type de connecteur, vous devrez peut-être également associer au rôle d'exécution du service une politique d'autorisation lui permettant d'accéder aux AWS ressources. Par exemple, si votre connecteur doit envoyer des données à un compartiment S3, le rôle d'exécution du service doit disposer d'une politique d'autorisations autorisant l'écriture dans ce compartiment. À des fins de test, vous pouvez utiliser l'une des politiques IAM prédéfinies qui offrent un accès complet, comme `arn:aws:iam::aws:policy/AmazonS3FullAccess`. Toutefois, pour des raisons de sécurité, nous vous recommandons d'utiliser la politique la plus restrictive qui permette à votre connecteur de lire depuis la AWS source ou d'écrire sur le AWS récepteur.

# Exemple de politique IAM pour MSK Connect
<a name="mkc-iam-policy-examples"></a>

Pour donner à un utilisateur non administrateur un accès complet à toutes les fonctionnalités de MSK Connect, associez une politique telle que la suivante au rôle IAM de l'utilisateur.

------
#### [ JSON ]

****  

```
{
  "Version":"2012-10-17",		 	 	 
  "Statement": [
    {
      "Sid": "MSKConnectFullAccess",
      "Effect": "Allow",
      "Action": [
        "kafkaconnect:CreateConnector",
        "kafkaconnect:DeleteConnector",
        "kafkaconnect:DescribeConnector",
        "kafkaconnect:ListConnectors",
        "kafkaconnect:UpdateConnector",
        "kafkaconnect:CreateCustomPlugin",
        "kafkaconnect:DeleteCustomPlugin",
        "kafkaconnect:DescribeCustomPlugin",
        "kafkaconnect:ListCustomPlugins",
        "kafkaconnect:CreateWorkerConfiguration",
        "kafkaconnect:DeleteWorkerConfiguration",
        "kafkaconnect:DescribeWorkerConfiguration",
        "kafkaconnect:ListWorkerConfigurations"
      ],
      "Resource": "*"
    },
    {
      "Sid": "IAMPassRole",
      "Effect": "Allow",
      "Action": "iam:PassRole",
      "Resource": "arn:aws:iam::123456789012:role/MSKConnectServiceRole",
      "Condition": {
        "StringEquals": {
          "iam:PassedToService": "kafkaconnect.amazonaws.com"
        }
      }
    },
    {
      "Sid": "EC2NetworkAccess",
      "Effect": "Allow",
      "Action": [
        "ec2:CreateNetworkInterface",
        "ec2:DescribeNetworkInterfaces",
        "ec2:DeleteNetworkInterface",
        "ec2:DescribeVpcs",
        "ec2:DescribeSubnets",
        "ec2:DescribeSecurityGroups"
      ],
      "Resource": "*"
    },
    {
      "Sid": "MSKClusterAccess",
      "Effect": "Allow",
      "Action": [
        "kafka:DescribeCluster",
        "kafka:DescribeClusterV2",
        "kafka:GetBootstrapBrokers"
      ],
      "Resource": "arn:aws:kafka:us-east-1:123456789012:cluster/myCluster/"
    },
    {
      "Sid": "MSKLogGroupAccess",
      "Effect": "Allow",
      "Action": [
        "logs:CreateLogGroup",
        "logs:CreateLogStream",
        "logs:PutLogEvents",
        "logs:DescribeLogStreams",
        "logs:DescribeLogGroups"
      ],
      "Resource": [
        "arn:aws:logs:us-east-1:123456789012:log-group:/aws/msk-connect/*"
      ]
    },
    {
      "Sid": "S3PluginAccess",
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:ListBucket",
        "s3:PutObject"
      ],
      "Resource": [
        "arn:aws:s3:::amzn-s3-demo-bucket1-custom-plugins",
        "arn:aws:s3:::amzn-s3-demo-bucket1-custom-plugins/*"
      ]
    }
  ]
}
```

------

# Prévenir le problème des adjoints confus entre les services
<a name="cross-service-confused-deputy-prevention"></a>

Le problème de député confus est un problème de sécurité dans lequel une entité qui n’est pas autorisée à effectuer une action peut contraindre une entité plus privilégiée à le faire. En AWS, l'usurpation d'identité interservices peut entraîner un problème de confusion chez les adjoints. L’usurpation d’identité entre services peut se produire lorsqu’un service (le *service appelant*) appelle un autre service (le *service appelé*). Le service appelant peut être manipulé et ses autorisations utilisées pour agir sur les ressources d’un autre client auxquelles on ne serait pas autorisé à accéder autrement. Pour éviter cela, AWS fournit des outils qui vous aident à protéger vos données pour tous les services avec des principaux de service qui ont eu accès aux ressources de votre compte. 

Nous vous recommandons d'utiliser les clés de contexte de condition globale [https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_condition-keys.html#condition-keys-sourcearn](https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_condition-keys.html#condition-keys-sourcearn) et [https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_condition-keys.html#condition-keys-sourceaccount](https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_condition-keys.html#condition-keys-sourceaccount) dans les politiques de ressources afin de limiter les autorisations à la ressource octroyées par MSK Connect à un autre service. Si la valeur `aws:SourceArn` ne contient pas l'ID de compte (par exemple, un Amazon Resource Name (ARN) d'un compartiment Amazon S3 ne contient pas l'ID de compte), vous devez utiliser les deux clés de contexte de condition globale pour limiter les autorisations. Si vous utilisez les deux clés de contexte de condition globale et que la valeur `aws:SourceArn` contient l'ID de compte, la valeur `aws:SourceAccount` et le compte dans la valeur `aws:SourceArn` doivent utiliser le même ID de compte lorsqu'ils sont utilisés dans la même instruction de politique. Utilisez `aws:SourceArn` si vous souhaitez qu'une seule ressource soit associée à l'accès entre services. Utilisez `aws:SourceAccount` si vous souhaitez autoriser l’association d’une ressource de ce compte à l’utilisation interservices.

Dans le cas de MSK Connect, la valeur de `aws:SourceArn` doit être un connecteur MSK.

Le moyen le plus efficace de se protéger contre le problème de député confus consiste à utiliser la clé de contexte de condition globale `aws:SourceArn` avec l’ARN complet de la ressource. Si vous ne connaissez pas l’ARN complet de la ressource ou si vous spécifiez plusieurs ressources, utilisez la clé de contexte de condition globale `aws:SourceArn` avec des caractères génériques (`*`) pour les parties inconnues de l’ARN. Par exemple, *arn:aws:kafkaconnect:us-east-1:123456789012:connector/\$1* représente tous les connecteurs appartenant au compte portant l'ID 123456789012 dans la région USA Est (Virginie du Nord).

L'exemple suivant montre comment utiliser les clés de contexte de condition globale `aws:SourceArn` et `aws:SourceAccount` dans MSK Connect afin d'éviter le problème de l'adjoint confus. Remplacez *123456789012* et arn:aws:kafkaconnect : *us-east-1* ::connector//par vos informations de connecteur et de connexion. *123456789012* *my-S3-Sink-Connector* *abcd1234-5678-90ab-cdef-1234567890ab* Compte AWS 

------
#### [ JSON ]

****  

```
{
  "Version":"2012-10-17",		 	 	 
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": " kafkaconnect.amazonaws.com"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "aws:SourceAccount": "123456789012"
        },
        "ArnLike": {
        "aws:SourceArn": "arn:aws:kafkaconnect:us-east-1:123456789012:connector/my-S3-Sink-Connector/abcd1234-5678-90ab-cdef-1234567890ab"
        }
      }
    }   
  ]
}
```

------

# AWS politiques gérées pour MSK Connect
<a name="mkc-security-iam-awsmanpol"></a>

Une politique AWS gérée est une politique autonome créée et administrée par AWS. AWS les politiques gérées sont conçues pour fournir des autorisations pour de nombreux cas d'utilisation courants afin que vous puissiez commencer à attribuer des autorisations aux utilisateurs, aux groupes et aux rôles.

N'oubliez pas que les politiques AWS gérées peuvent ne pas accorder d'autorisations de moindre privilège pour vos cas d'utilisation spécifiques, car elles sont accessibles à tous les AWS clients. Nous vous recommandons de réduire encore les autorisations en définissant des [politiques gérées par le client](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_managed-vs-inline.html#customer-managed-policies) qui sont propres à vos cas d’utilisation.

Vous ne pouvez pas modifier les autorisations définies dans les politiques AWS gérées. Si les autorisations définies dans une politique AWS gérée sont AWS mises à jour, la mise à jour affecte toutes les identités principales (utilisateurs, groupes et rôles) auxquelles la politique est attachée. AWS est le plus susceptible de mettre à jour une politique AWS gérée lorsqu'une nouvelle politique Service AWS est lancée ou lorsque de nouvelles opérations d'API sont disponibles pour les services existants.

Pour plus d’informations, consultez [Politiques gérées par AWS](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_managed-vs-inline.html#aws-managed-policies) dans le *Guide de l’utilisateur IAM*.

## AWS politique gérée : Amazon MSKConnect ReadOnlyAccess
<a name="security-iam-awsmanpol-AmazonMSKConnectReadOnlyAccess"></a>

Cette politique accorde à l'utilisateur les autorisations nécessaires pour répertorier et décrire les ressources MSK Connect.

Vous pouvez associer la politique `AmazonMSKConnectReadOnlyAccess` à vos identités IAM.

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kafkaconnect:ListConnectors",
                "kafkaconnect:ListCustomPlugins",
                "kafkaconnect:ListWorkerConfigurations"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafkaconnect:DescribeConnector"
            ],
            "Resource": [
                "arn:aws:kafkaconnect:*:*:connector/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafkaconnect:DescribeCustomPlugin"
            ],
            "Resource": [
                "arn:aws:kafkaconnect:*:*:custom-plugin/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafkaconnect:DescribeWorkerConfiguration"
            ],
            "Resource": [
                "arn:aws:kafkaconnect:*:*:worker-configuration/*"
            ]
        }
    ]
}
```

------

## AWS politique gérée : KafkaConnectServiceRolePolicy
<a name="security-iam-awsmanpol-KafkaConnectServiceRolePolicy"></a>

Cette politique accorde au service  Connect les autorisations nécessaires pour créer et gérer les interfaces réseau dotées de la balise `AmazonMSKConnectManaged:true`. Ces interfaces réseau donnent à MSK Connect l'accès réseau aux ressources de votre VPC Amazon, telles qu'un cluster Apache Kafka, une source ou un récepteur.

Vous ne pouvez pas vous associer KafkaConnectServiceRolePolicy à vos entités IAM. Cette politique est attachée à un rôle lié au service qui permet à MSK Connect d'effectuer des actions en votre nom.

------
#### [ JSON ]

****  

```
{
	"Version":"2012-10-17",		 	 	 
	"Statement": [
		{
			"Effect": "Allow",
			"Action": [
				"ec2:CreateNetworkInterface"
			],
			"Resource": "arn:aws:ec2:*:*:network-interface/*",
			"Condition": {
				"StringEquals": {
					"aws:RequestTag/AmazonMSKConnectManaged": "true"
				},
				"ForAllValues:StringEquals": {
					"aws:TagKeys": "AmazonMSKConnectManaged"
				}
			}
		},
		{
			"Effect": "Allow",
			"Action": [
				"ec2:CreateNetworkInterface"
			],
			"Resource": [
				"arn:aws:ec2:*:*:subnet/*",
				"arn:aws:ec2:*:*:security-group/*"
			]
		},
		{
			"Effect": "Allow",
			"Action": [
				"ec2:CreateTags"
			],
			"Resource": "arn:aws:ec2:*:*:network-interface/*",
			"Condition": {
				"StringEquals": {
					"ec2:CreateAction": "CreateNetworkInterface"
				}
			}
		},
		{
			"Effect": "Allow",
			"Action": [
				"ec2:DescribeNetworkInterfaces",
				"ec2:CreateNetworkInterfacePermission",
				"ec2:AttachNetworkInterface",
				"ec2:DetachNetworkInterface",
				"ec2:DeleteNetworkInterface"
			],
			"Resource": "arn:aws:ec2:*:*:network-interface/*",
			"Condition": {
				"StringEquals": {
					"ec2:ResourceTag/AmazonMSKConnectManaged": "true"
				}
			}
		}
	]
}
```

------

## Mises à jour des politiques AWS gérées par MSK Connect
<a name="security-iam-awsmanpol-updates"></a>

Consultez les détails des mises à jour apportées aux politiques AWS gérées pour MSK Connect depuis que ce service a commencé à suivre ces modifications.


| Modifier | Description | Date | 
| --- | --- | --- | 
|  Mise à jour de la politique en lecture seule de MSK Connect  |  MSK Connect a mis à jour la MSKConnect ReadOnlyAccess politique d'Amazon afin de supprimer les restrictions relatives aux opérations de mise en vente.  | 13 octobre 2021 | 
|  MSK Connect a commencé à suivre les modifications  |  MSK Connect a commencé à suivre les modifications apportées à ses politiques AWS gérées.  | 14 septembre 2021 | 

# Utiliser des rôles liés à un service pour MSK Connect
<a name="mkc-using-service-linked-roles"></a>

Amazon MSK Connect utilise des rôles liés à un [service Gestion des identités et des accès AWS](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_terms-and-concepts.html#iam-term-service-linked-role) (IAM). Un rôle lié à un service est un type unique de rôle IAM lié directement à MSK Connect. Les rôles liés au service sont prédéfinis par MSK Connect et incluent toutes les autorisations dont le service a besoin pour appeler AWS d'autres services en votre nom. 

Un rôle lié à un service simplifie la configuration de MSK Connect, car vous n'avez pas besoin d'ajouter manuellement les autorisations requises. MSK Connect définit les autorisations de ses rôles liés à un service ; sauf définition contraire, seul MSK Connect peut endosser ses rôles. Les autorisations définies comprennent la politique d'approbation et la politique d'autorisation. De plus, cette politique d'autorisation ne peut pas être attachée à une autre entité IAM.

Pour plus d'informations sur les autres services qui prennent en charge les rôles liés à un service, veuillez consulter [Services AWS qui fonctionnent avec IAM](https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_aws-services-that-work-with-iam.html) et recherchez les services où **Oui **figure dans la colonne **Rôle lié à un service**. Choisissez un **Oui** ayant un lien permettant de consulter les détails du rôle pour ce service.

## Autorisations de rôle lié à un service pour MSK Connect
<a name="slr-permissions"></a>

MSK Connect utilise le rôle lié au service nommé — **AWSServiceRoleForKafkaConnect**Autorise Amazon MSK Connect à accéder aux ressources Amazon en votre nom.

Le rôle AWSService RoleForKafkaConnect lié au service fait confiance au `kafkaconnect.amazonaws.com` service pour assumer le rôle.

Pour obtenir des informations sur la politique d'autorisations utilisé par le rôle, consultez [AWS politique gérée : KafkaConnectServiceRolePolicy](mkc-security-iam-awsmanpol.md#security-iam-awsmanpol-KafkaConnectServiceRolePolicy).

Vous devez configurer les autorisations de manière à permettre à une entité IAM (comme un utilisateur, un groupe ou un rôle) de créer, modifier ou supprimer un rôle lié à un service. Pour plus d'informations, consultez [Autorisations du rôle lié à un service](https://docs.aws.amazon.com/IAM/latest/UserGuide/using-service-linked-roles.html#service-linked-role-permissions) dans le *Guide de l'utilisateur IAM*.

## Création d'un rôle lié à un service pour MSK Connect
<a name="create-slr"></a>

Vous n’avez pas besoin de créer manuellement un rôle lié à un service. Lorsque vous créez un connecteur dans le AWS Management Console, le ou l' AWS API AWS CLI, MSK Connect crée le rôle lié au service pour vous. 

Si vous supprimez ce rôle lié à un service et que vous avez ensuite besoin de le recréer, vous pouvez utiliser la même procédure pour recréer le rôle dans votre compte. Lorsque vous créez un connecteur, MSK Connect recrée automatiquement le rôle lié au service. 

## Modification d'un rôle lié à un service pour MSK Connect
<a name="edit-slr"></a>

MSK Connect ne vous permet pas de modifier le rôle lié au AWSService RoleForKafkaConnect service. Une fois que vous avez créé un rôle lié à un service, vous ne pouvez pas changer le nom du rôle, car plusieurs entités peuvent faire référence au rôle. Néanmoins, vous pouvez modifier la description du rôle à l’aide d’IAM. Pour plus d'informations, consultez [Modification d'un rôle lié à un service](https://docs.aws.amazon.com/IAM/latest/UserGuide/using-service-linked-roles.html#edit-service-linked-role) dans le *Guide de l'utilisateur IAM*.

## Suppression d'un rôle lié à un service pour MSK Connect
<a name="delete-slr"></a>

Vous pouvez utiliser la console IAM, AWS CLI ou l' AWS API pour supprimer manuellement le rôle lié à un service. Pour ce faire, vous devez d'abord supprimer manuellement tous vos connecteurs MSK Connect, puis supprimer manuellement le rôle. Pour plus d'informations, veuillez consulter [Suppression d'un rôle lié à un service](https://docs.aws.amazon.com/IAM/latest/UserGuide/using-service-linked-roles.html#delete-service-linked-role) dans le *Guide de l'utilisateur IAM*.

## Régions prises en charge pour les rôles liés à un service MSK Connect
<a name="slr-regions"></a>

MSK Connect prend en charge l'utilisation des rôles liés à un service dans toutes les régions où le service est disponible. Pour plus d'informations, consultez [Régions et Points de terminaison AWS](https://docs.aws.amazon.com/general/latest/gr/rande.html).

# Activer l'accès à Internet pour Amazon MSK Connect
<a name="msk-connect-internet-access"></a>

Si votre connecteur pour Amazon MSK Connect a besoin d'un accès à Internet, nous vous recommandons d'utiliser les paramètres Amazon Virtual Private Cloud (VPC) suivants pour activer cet accès.
+ Configurez votre connecteur avec des sous-réseaux privés.
+ Créez une [passerelle NAT](https://docs.aws.amazon.com/vpc/latest/userguide/vpc-nat-gateway.html) publique ou une [instance NAT](https://docs.aws.amazon.com/vpc/latest/userguide/VPC_NAT_Instance.html) pour votre VPC dans un sous-réseau public. Pour plus d'informations, consultez la page [Connect subnets to the Internet or other VPCs using NAT devices](https://docs.aws.amazon.com/vpc/latest/userguide/vpc-nat-gateway.html) du *Guide de l'*Amazon Virtual Private Cloud*utilisateur*. 
+ Autorisez le trafic sortant de vos sous-réseaux privés vers votre passerelle ou instance NAT.

# Configuration d'une passerelle NAT pour Amazon MSK Connect
<a name="msk-connect-internet-access-private-subnets-example"></a>

Les étapes suivantes vous montrent comment configurer une passerelle NAT pour permettre à un connecteur d'accéder à Internet. Vous devez effectuer ces étapes avant de créer un connecteur dans un sous-réseau privé.

## Conditions préalables complètes pour configurer une passerelle NAT
<a name="msk-connect-internet-access-private-subnets-prereq"></a>

Vérifiez que vous avez les éléments suivants.
+ L'ID du Amazon Virtual Private Cloud (VPC) associé à votre cluster. Par exemple, *vpc-123456ab*.
+ Les IDs sous-réseaux privés de votre VPC. Par exemple, *subnet-a1b2c3de*, *subnet-f4g5h6ij*, etc. Vous devez configurer votre connecteur avec des sous-réseaux privés.

## Étapes pour activer l'accès à Internet pour votre connecteur
<a name="msk-connect-internet-access-private-subnets-steps"></a>

**Pour activer l'accès à Internet pour votre connecteur**

1. Ouvrez la Amazon Virtual Private Cloud console à l'adresse [https://console.aws.amazon.com/vpc/](https://console.aws.amazon.com/vpc/).

1. Créez un sous-réseau public pour votre passerelle NAT avec un nom descriptif et notez l'ID du sous-réseau. Pour obtenir des informations détaillées, veuillez consulter [Créer un sous-réseau dans votre VPC](https://docs.aws.amazon.com/vpc/latest/userguide/working-with-vpcs.html#AddaSubnet).

1. Créez une passerelle Internet afin que votre VPC puisse communiquer avec Internet et notez l'ID de passerelle. Attachez la passerelle Internet à votre VPC. Pour de plus amples informations, consultez [Créer et attacher une passerelle Internet](https://docs.aws.amazon.com/vpc/latest/userguide/VPC_Internet_Gateway.html#Add_IGW_Attach_Gateway).

1. Provisionnez une passerelle NAT publique afin que les hôtes de vos sous-réseaux privés puissent accéder à votre sous-réseau public. Lorsque vous créez la passerelle NAT, sélectionnez le sous-réseau public que vous avez créé précédemment. Pour obtenir des informations, consultez [Créer une passerelle NAT](https://docs.aws.amazon.com/vpc/latest/userguide/vpc-nat-gateway.html#nat-gateway-creating).

1. Configurez vos tables de routage. Vous devez disposer de deux tables de routage au total pour terminer cette configuration. Vous devriez déjà avoir une table de routage principale créée automatiquement en même temps que votre VPC. Au cours de cette étape, vous créez une table de routage supplémentaire pour votre sous-réseau public.

   1. Utilisez les paramètres suivants pour modifier la table de routage principale de votre VPC afin que vos sous-réseaux privés acheminent le trafic vers votre passerelle NAT. Pour obtenir des informations, consultez la section [Utiliser des tables de routage](https://docs.aws.amazon.com/vpc/latest/userguide/WorkWithRouteTables.html) dans le *Guide de l'utilisateur *Amazon Virtual Private Cloud**.  
**Table de routage MSKC privée**    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/fr_fr/msk/latest/developerguide/msk-connect-internet-access-private-subnets-example.html)

   1. Suivez les instructions de la section [Créer une table de routage personnalisée](https://docs.aws.amazon.com/vpc/latest/userguide/VPC_Internet_Gateway.html#Add_IGW_Routing) pour créer une table de routage pour votre sous-réseau public. Lorsque vous créez la table, entrez un nom descriptif dans le champ **Identification de nom** pour vous aider à identifier le sous-réseau auquel la table est associée. Par exemple, **MSKC public**.

   1. Configurez votre table de routage **MSKC public** à l'aide des paramètres suivants.  
****    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/fr_fr/msk/latest/developerguide/msk-connect-internet-access-private-subnets-example.html)

# Comprendre les noms d'hôte DNS privés
<a name="msk-connect-dns"></a>

Grâce à la prise en charge des noms d'hôte DNS privés dans MSK Connect, vous pouvez configurer des connecteurs pour référencer des noms de domaine publics ou privés. Cette prise en charge dépend des serveurs DNS spécifiés dans le *jeu d'options DHCP* du VPC.

Un jeu d'options DHCP est un groupe de configurations réseau utilisé par les instances EC2 dans un VPC pour communiquer sur votre réseau VPC. Chaque VPC possède un jeu d'options DHCP par défaut, mais vous pouvez créer un jeu d'options DHCP personnalisé si, par exemple, vous souhaitez que les instances d'un VPC utilisent un serveur DNS différent pour la résolution des noms de domaine plutôt que le serveur DNS fourni par Amazon. Voir [Jeux d'options DHCP dans le VPC Amazon](https://docs.aws.amazon.com/vpc/latest/userguide/VPC_DHCP_Options.html).

Avant que la résolution DNS privée ne capability/feature soit incluse dans MSK Connect, les connecteurs utilisaient les résolveurs DNS VPC du service pour les requêtes DNS provenant d'un connecteur client. Les connecteurs n'utilisaient pas les serveurs DNS définis dans les jeux d'options DHCP du VPC du client pour la résolution DNS.

Les connecteurs pouvaient uniquement faire référence à des noms d'hôtes dans des configurations de connecteurs clients ou à des plugins pouvant être résolus publiquement. Ils ne pouvaient pas résoudre les noms d'hôtes privés définis dans une zone hébergée en privé ou utiliser les serveurs DNS d'un autre réseau client.

Sans le DNS privé, les clients qui choisissaient de rendre leurs bases de données, leurs entrepôts de données et leurs systèmes tels que le Secrets Manager dans leur propre VPC inaccessibles à Internet ne pouvaient pas utiliser les connecteurs MSK. Les clients utilisent souvent des noms d'hôte DNS privés pour se conformer à la politique de sécurité de l'entreprise.

# Configurer un ensemble d'options DHCP VPC pour votre connecteur
<a name="msk-connect-dns-config-dhcp"></a>

Les connecteurs utilisent automatiquement les serveurs DNS définis dans leur jeu d'options DHCP du VPC lors de la création du connecteur. Avant de créer un connecteur, assurez-vous de configurer l'option DHCP du VPC définie pour les exigences de résolution du nom d'hôte DNS de votre connecteur.

Les connecteurs que vous avez créés avant que la fonctionnalité de nom d'hôte DNS privé ne soit disponible dans MSK Connect continuent d'utiliser la configuration de résolution DNS précédente sans qu'aucune modification ne soit requise.

Si vous avez uniquement besoin d'une résolution de nom d'hôte DNS pouvant être résolue publiquement dans votre connecteur, pour faciliter la configuration, nous vous recommandons d'utiliser le VPC par défaut de votre compte lorsque vous créez le connecteur. Consultez [Serveur Amazon DNS](https://docs.aws.amazon.com/vpc/latest/userguide/vpc-dns.html#AmazonDNS) dans le *Guide de l'utilisateur du VPC Amazon* pour plus d'informations sur le serveur DNS fourni par Amazon ou Amazon Route 53 Resolver.

Si vous devez résoudre des noms d'hôtes DNS privés, assurez-vous que les options DHCP du VPC transmis lors de la création du connecteur sont correctement configurées. Pour plus d'informations, consultez [Travailler avec des jeux d'options DHCP](https://docs.aws.amazon.com/vpc/latest/userguide/DHCPOptionSet.html) dans le *Guide de l'utilisateur du VPC Amazon*.

Lorsque vous configurez un jeu d'options DHCP pour la résolution de noms d'hôtes DNS privés, assurez-vous que le connecteur peut atteindre les serveurs DNS personnalisés que vous configurez dans le jeu d'options DHCP. Dans le cas contraire, la création de votre connecteur échouera.

Après avoir personnalisé le jeu d'options DHCP du VPC, les connecteurs créés ultérieurement dans ce VPC utilisent les serveurs DNS que vous avez spécifiés dans le jeu d'options. Si vous modifiez le jeu d'options après avoir créé un connecteur, le connecteur adopte les paramètres du nouveau jeu d'options en quelques minutes.

# Configuration des attributs DNS pour votre VPC
<a name="msk-connect-dns-attributes"></a>

Assurez-vous que les attributs DNS du VPC sont correctement configurés, comme décrit dans la section [Attributs DNS de votre VPC](https://docs.aws.amazon.com/vpc/latest/userguide/vpc-dns.html#vpc-dns-support) et [Noms d'hôte DNS](https://docs.aws.amazon.com/vpc/latest/userguide/vpc-dns.html#vpc-dns-hostnames) dans le *Guide de l'utilisateur du VPC Amazon*.

Consultez la section [Résolution des requêtes DNS entre VPCs et votre réseau](https://docs.aws.amazon.com/Route53/latest/DeveloperGuide/resolver.html) dans le *guide du développeur Amazon Route 53* pour plus d'informations sur l'utilisation des points de terminaison de résolution entrants et sortants pour connecter d'autres réseaux à votre VPC afin de fonctionner avec votre connecteur.

# Gérer les échecs liés à la création de connecteurs
<a name="msk-connect-dns-failure-handling"></a>

Cette section décrit les échecs éventuels de création de connecteurs associés à la résolution DNS et les actions suggérées pour résoudre les problèmes.


| Échec | Action suggérée | 
| --- | --- | 
|  La création du connecteur échoue si une requête de résolution DNS échoue ou si les serveurs DNS sont inaccessibles depuis le connecteur.  |  Vous pouvez voir des échecs de création de connecteurs dus à des requêtes de résolution DNS infructueuses dans vos CloudWatch journaux, si vous avez configuré ces journaux pour votre connecteur. Vérifiez les configurations des serveurs DNS et assurez-vous de la connectivité réseau avec les serveurs DNS à partir du connecteur.  | 
|  Si vous modifiez la configuration des serveurs DNS dans votre jeu d'options DHCP du VPC alors qu'un connecteur est en cours d'exécution, les requêtes de résolution DNS provenant du connecteur peuvent échouer. Si la résolution DNS échoue, certaines tâches du connecteur peuvent passer à l'état d'échec.  |  Vous pouvez voir des échecs de création de connecteurs dus à des requêtes de résolution DNS infructueuses dans vos CloudWatch journaux, si vous avez configuré ces journaux pour votre connecteur. Les tâches ayant échoué devraient redémarrer automatiquement pour rétablir le connecteur. Si cela ne se produit pas, vous pouvez contacter le support pour redémarrer les tâches ayant échoué pour leur connecteur ou vous pouvez recréer le connecteur.  | 

# Sécurité pour MSK Connect
<a name="msk-connect-security"></a>

Vous pouvez utiliser un point de terminaison VPC d'interface, alimenté par AWS PrivateLink, pour empêcher le trafic entre votre Amazon VPC et Amazon MSK-Connect APIs compatible de quitter le réseau Amazon. Les points de terminaison VPC d'interface ne nécessitent pas de passerelle Internet, de périphérique NAT, de connexion VPN ou de connexion. AWS Direct Connect Pour de plus amples informations, veuillez consulter [Utiliser Amazon MSK APIs avec des points de terminaison VPC d'interface](privatelink-vpc-endpoints.md).

# Journalisation pour MSK Connect
<a name="msk-connect-logging"></a>

MSK Connect peut écrire des événements de journal que vous pouvez utiliser pour déboguer votre connecteur. Lorsque vous créez un connecteur, vous pouvez spécifier aucune ou plusieurs des destinations de journal suivantes :
+ Amazon CloudWatch Logs : vous spécifiez le groupe de journaux auquel vous souhaitez que MSK Connect envoie les événements de journal de votre connecteur. Pour plus d'informations sur la création d'un groupe de journaux, voir [Création d'un groupe de journaux](https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/Working-with-log-groups-and-streams.html#Create-Log-Group) dans le *Guide de l'utilisateur CloudWatch des journaux*.
+ Amazon S3 : vous spécifiez le compartiment S3 auquel vous souhaitez que MSK Connect envoie les événements de journal de votre connecteur. Pour plus d'informations sur la façon de créer un compartiment S3, consultez [Création d'un compartiment](https://docs.aws.amazon.com/AmazonS3/latest/userguide/create-bucket-overview.html) dans le *Guide de l'utilisateur Amazon S3*.
+ Amazon Data Firehose : vous spécifiez le flux de diffusion vers lequel vous souhaitez que MSK Connect envoie les événements du journal de votre connecteur. Pour plus d'informations sur la création d'un flux de diffusion, consultez la section [Création d'un flux de diffusion Amazon Data Firehose dans le guide](https://docs.aws.amazon.com/firehose/latest/dev/basic-create.html) de l'utilisateur de *Firehose*.

Pour en savoir plus sur la configuration de la journalisation, consultez la section [Activation de la journalisation à partir de certains services AWS](https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/AWS-logs-and-resource-policy.html) dans le *Guide de l'utilisateur Amazon CloudWatch Logs *.

MSK Connect émet les types d'événements de journal suivants :


****  

| Niveau | Description | 
| --- | --- | 
| INFO | Événements d'exécution présentant un intérêt au démarrage et à l'arrêt. | 
| WARN | Situations d'exécution qui ne sont pas des erreurs mais qui sont indésirables ou inattendues. | 
| FATAL | Erreurs graves entraînant un arrêt prématuré. | 
| ERROR | Conditions inattendues et erreurs d'exécution qui ne sont pas fatales. | 

Voici un exemple d'événement de journal envoyé à CloudWatch Logs :

```
[Worker-0bb8afa0b01391c41] [2021-09-06 16:02:54,151] WARN [Producer clientId=producer-1] Connection to node 1 (b-1.my-test-cluster.twwhtj.c2.kafka.us-east-1.amazonaws.com/INTERNAL_IP) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient:782)
```

## Empêcher l'apparition de secrets dans les journaux des connecteurs
<a name="msk-connect-logging-secrets"></a>

**Note**  
Des valeurs de configuration sensibles peuvent apparaître dans les journaux des connecteurs si un plugin ne définit pas ces valeurs comme secrètes. Kafka Connect traite les valeurs de configuration non définies de la même manière que toute autre valeur en texte brut.

Si votre plugin définit une propriété comme secrète, Kafka Connect supprime la valeur de la propriété des journaux du connecteur. Par exemple, les journaux du connecteur suivants montrent que si un plugin définit `aws.secret.key` comme un type `PASSWORD`, sa valeur est alors remplacée par `[hidden]`.

```
    2022-01-11T15:18:55.000+00:00    [Worker-05e6586a48b5f331b] [2022-01-11 15:18:55,150] INFO SecretsManagerConfigProviderConfig values:
    2022-01-11T15:18:55.000+00:00    [Worker-05e6586a48b5f331b] aws.access.key = my_access_key
    2022-01-11T15:18:55.000+00:00    [Worker-05e6586a48b5f331b] aws.region = us-east-1
    2022-01-11T15:18:55.000+00:00    [Worker-05e6586a48b5f331b] aws.secret.key = [hidden]
    2022-01-11T15:18:55.000+00:00    [Worker-05e6586a48b5f331b] secret.prefix =
    2022-01-11T15:18:55.000+00:00    [Worker-05e6586a48b5f331b] secret.ttl.ms = 300000
    2022-01-11T15:18:55.000+00:00    [Worker-05e6586a48b5f331b] (com.github.jcustenborder.kafka.config.aws.SecretsManagerConfigProviderConfig:361)
```

Pour éviter que des secrets n'apparaissent dans les fichiers journaux du connecteur, le développeur d'un plugin doit utiliser la constante d'énumération de Kafka Connect [https://kafka.apache.org/27/javadoc/org/apache/kafka/common/config/ConfigDef.Type.html#PASSWORD](https://kafka.apache.org/27/javadoc/org/apache/kafka/common/config/ConfigDef.Type.html#PASSWORD) pour définir les propriétés sensibles. Lorsqu'une propriété est du type `ConfigDef.Type.PASSWORD`, Kafka Connect exclut sa valeur des journaux du connecteur, même si la valeur est envoyée sous forme de texte brut. 

# Surveillance d'Amazon MSK Connect
<a name="mkc-monitoring-overview"></a>

La surveillance joue un rôle important dans le maintien de la fiabilité, de la disponibilité et des performances de MSK Connect et de vos autres AWS solutions. Amazon CloudWatch surveille vos AWS ressources et les applications que vous exécutez AWS en temps réel. Vous pouvez collecter et suivre les métriques, créer des tableaux de bord personnalisés, et définir des alarmes qui vous informent ou prennent des mesures lorsqu’une métrique spécifique atteint un seuil que vous spécifiez. Par exemple, vous pouvez CloudWatch suivre l'utilisation du processeur ou d'autres mesures de votre connecteur, afin de pouvoir augmenter sa capacité si nécessaire. Pour plus d'informations, consultez le [guide de CloudWatch l'utilisateur Amazon](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/).

Vous pouvez utiliser les opérations d'API suivantes :
+ `DescribeConnectorOperation`: Surveillez l'état des opérations de mise à jour du connecteur.
+ `ListConnectorOperations`: suivez les mises à jour précédentes exécutées sur votre connecteur.

Le tableau suivant indique les métriques que MSK Connect envoie CloudWatch sous la `ConnectorName` dimension. MSK Connect fournit ces métriques par défaut et sans frais supplémentaires. CloudWatch conserve ces indicateurs pendant 15 mois, afin que vous puissiez accéder aux informations historiques et avoir une meilleure idée des performances de vos connecteurs. Vous pouvez également définir des alarmes qui surveillent certains seuils et envoient des notifications ou prennent des mesures lorsque ces seuils sont atteints. Pour plus d'informations, consultez le [guide de CloudWatch l'utilisateur Amazon](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/).


| Nom des métriques | Description | 
| --- | --- | 
| CpuUtilization | Pourcentage de la consommation du processeur par système et par utilisateur. | 
| ErroredTaskCount | Nombre de tâches qui ont des erreurs. | 
| MemoryUtilization | Pourcentage de la mémoire totale d'une instance de worker, et pas seulement la mémoire de tas de la machine virtuelle Java (JVM) actuellement utilisée. La JVM ne restitue généralement pas de mémoire au système d'exploitation. Ainsi, la taille de tas de la JVM (MemoryUtilization) commence généralement par une taille de tas minimale qui augmente progressivement jusqu'à un maximum stable d'environ 80 à 90 %. L'utilisation du tas de mémoire de la JVM peut augmenter ou diminuer à mesure que l'utilisation réelle de la mémoire du connecteur change. | 
| RebalanceCompletedTotal | Nombre total de rééquilibres effectués par ce connecteur. | 
| RebalanceTimeAvg | Durée moyenne (en millisecondes) que le connecteur consacre au rééquilibrage.  | 
| RebalanceTimeMax | Durée maximale (en millisecondes) que le connecteur consacre au rééquilibrage. | 
| RebalanceTimeSinceLast |  Durée (en millisecondes) écoulée depuis que ce connecteur a effectué le dernier rééquilibrage.  | 
| RunningTaskCount | Nombre de tâches en cours d'exécution dans le connecteur. | 
| SinkConsumerByteRate | Nombre moyen d'octets consommés par seconde par le consommateur Sink du framework Kafka Connect avant que des transformations ne soient appliquées aux données. | 
| SinkRecordReadRate | Nombre moyen (par seconde) d'enregistrements lus depuis le cluster Apache Kafka ou Amazon MSK. | 
| SinkRecordSendRate | Nombre moyen (par seconde) d'enregistrements générés par les transformations et envoyés à la destination. Ce nombre n'inclut pas les enregistrements filtrés. | 
| SourceRecordPollRate | Nombre moyen (par seconde) d'enregistrements produits ou interrogés. | 
| SourceProducerByteRate | Le nombre moyen d'octets produits par seconde par le producteur Source du framework Kafka Connect après l'application de toute transformation aux données. | 
| SourceRecordWriteRate | Nombre moyen (par seconde) d'enregistrements générés par les transformations et écrits sur le cluster Apache Kafka ou Amazon MSK. | 
| TaskStartupAttemptsTotal | Nombre total de démarrages de tâches que le connecteur a tentés. Vous pouvez utiliser cette métrique pour identifier les anomalies lors des tentatives de démarrage de tâches. | 
| TaskStartupSuccessPercentage | Pourcentage moyen de démarrages de tâches réussis pour le connecteur. Vous pouvez utiliser cette métrique pour identifier les anomalies lors des tentatives de démarrage de tâches. | 
| WorkerCount | Nombre de workers en cours d'exécution dans le connecteur. | 
| BytesInPerSec | Octets de métadonnées transférés vers le framework Kafka Connect pour la communication entre les travailleurs. | 
| BytesOutPerSec | Octets de métadonnées transférés depuis le framework Kafka Connect pour la communication entre les travailleurs. | 

# Exemples de configuration des ressources Amazon MSK Connect
<a name="msk-connect-examples"></a>

Cette section contient des exemples destinés à vous aider à configurer les ressources Amazon MSK Connect, comme les connecteurs tiers et les fournisseurs de configuration courants.

**Topics**
+ [Configuration du connecteur de réception Amazon S3](mkc-S3sink-connector-example.md)
+ [Configurer le connecteur d'évier EventBridge Kafka pour MSK Connect](mkc-eventbridge-kafka-connector.md)
+ [Utiliser le connecteur source Debezium avec le fournisseur de configuration](mkc-debeziumsource-connector-example.md)

# Configuration du connecteur de réception Amazon S3
<a name="mkc-S3sink-connector-example"></a>

Cet exemple montre comment utiliser le [connecteur récepteur Amazon S3 Confluent et comment AWS CLI créer un connecteur](https://www.confluent.io/hub/confluentinc/kafka-connect-s3) récepteur Amazon S3 dans MSK Connect.

1. Copiez le code JSON et collez-le dans un nouveau fichier. Remplacez les chaînes d'espace réservé par des valeurs correspondant à la chaîne de connexion aux serveurs bootstrap de votre cluster Amazon MSK ainsi qu'au sous-réseau et au groupe de sécurité du cluster. IDs Pour plus d'informations sur la configuration d'un rôle d'exécution de service, consultez [Rôles et politiques IAM pour MSK Connect](msk-connect-iam.md).

   ```
   {
       "connectorConfiguration": {
           "connector.class": "io.confluent.connect.s3.S3SinkConnector",
           "s3.region": "us-east-1",
           "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
           "flush.size": "1",
           "schema.compatibility": "NONE",
           "topics": "my-test-topic",
           "tasks.max": "2",
           "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
           "storage.class": "io.confluent.connect.s3.storage.S3Storage",
           "s3.bucket.name": "amzn-s3-demo-bucket"
       },
       "connectorName": "example-S3-sink-connector",
       "kafkaCluster": {
           "apacheKafkaCluster": {
               "bootstrapServers": "<cluster-bootstrap-servers-string>",
               "vpc": {
                   "subnets": [
                       "<cluster-subnet-1>",
                       "<cluster-subnet-2>",
                       "<cluster-subnet-3>"
                   ],
                   "securityGroups": ["<cluster-security-group-id>"]
               }
           }
       },
       "capacity": {
           "provisionedCapacity": {
               "mcuCount": 2,
               "workerCount": 4
           }
       },
       "kafkaConnectVersion": "2.7.1",
       "serviceExecutionRoleArn": "<arn-of-a-role-that-msk-connect-can-assume>",
       "plugins": [
           {
               "customPlugin": {
                   "customPluginArn": "<arn-of-custom-plugin-that-contains-connector-code>",
                   "revision": 1
               }
           }
       ],
       "kafkaClusterEncryptionInTransit": {"encryptionType": "PLAINTEXT"},
       "kafkaClusterClientAuthentication": {"authenticationType": "NONE"}
   }
   ```

1. Exécutez la AWS CLI commande suivante dans le dossier dans lequel vous avez enregistré le fichier JSON à l'étape précédente.

   ```
   aws kafkaconnect create-connector --cli-input-json file://connector-info.json
   ```

   Voici un exemple du résultat que vous obtenez lorsque vous exécutez la commande.

   ```
   {
       "ConnectorArn": "arn:aws:kafkaconnect:us-east-1:123450006789:connector/example-S3-sink-connector/abc12345-abcd-4444-a8b9-123456f513ed-2", 
       "ConnectorState": "CREATING", 
       "ConnectorName": "example-S3-sink-connector"
   }
   ```

# Configurer le connecteur d'évier EventBridge Kafka pour MSK Connect
<a name="mkc-eventbridge-kafka-connector"></a>

Cette rubrique explique comment configurer le [connecteur EventBridge Kafka Sink](https://github.com/awslabs/eventbridge-kafka-connector) pour MSK Connect. Ce connecteur vous permet d'envoyer des événements depuis votre cluster MSK vers des [bus d' EventBridge événements](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-event-bus.html). Cette rubrique décrit le processus de création des ressources requises et de configuration du connecteur pour permettre un flux de données fluide entre Kafka et EventBridge. 

**Topics**
+ [Conditions préalables](#mkc-eb-kafka-prerequisites)
+ [Configuration des ressources requises pour MSK Connect](#mkc-eb-kafka-set-up-resources)
+ [Création du connecteur](#mkc-eb-kafka-create-connector)
+ [Envoyer des messages à Kafka](#mkc-eb-kafka-send-json-encoded-messages)

## Conditions préalables
<a name="mkc-eb-kafka-prerequisites"></a>

Avant de déployer le connecteur, assurez-vous de disposer des ressources suivantes :
+ Cluster **Amazon MSK : cluster** MSK actif pour produire et consommer des messages Kafka.
+ Bus **d' EventBridge événements Amazon : bus** d' EventBridge événements destiné à recevoir les événements liés aux sujets Kafka.
+ **Rôles IAM** : créez des rôles IAM avec les autorisations nécessaires pour MSK Connect et EventBridge le connecteur.
+ [Accès à l'Internet public](msk-connect-internet-access.md) à partir de MSK Connect ou d'un point de terminaison EventBridge d'[interface VPC](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-related-service-vpc.html) créé dans le VPC et le sous-réseau de votre cluster MSK. Cela vous permet d'éviter de traverser l'Internet public sans avoir besoin de passerelles NAT.
+ Une [machine cliente](create-serverless-cluster-client.md), telle qu'une instance Amazon EC2 ou [AWS CloudShell](https://aws.amazon.com/cloudshell/), pour créer des sujets et envoyer des enregistrements à Kafka.

## Configuration des ressources requises pour MSK Connect
<a name="mkc-eb-kafka-set-up-resources"></a>

Vous créez un rôle IAM pour le connecteur, puis vous créez le connecteur. Vous créez également une EventBridge règle pour filtrer les événements Kafka envoyés au bus d' EventBridge événements.

**Topics**
+ [Rôle IAM pour le connecteur](#mkc-eb-kafka-iam-role-connector)
+ [Une EventBridge règle pour les événements entrants](#mkc-eb-kafka-create-rule)

### Rôle IAM pour le connecteur
<a name="mkc-eb-kafka-iam-role-connector"></a>

Le rôle IAM que vous associez au connecteur doit être [PutEvents](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-permissions-reference.html)autorisé à envoyer des événements à EventBridge. L'exemple de politique IAM suivant vous autorise à envoyer des événements à un bus d'événements nommé. `example-event-bus` Dans l'exemple suivant, assurez-vous de remplacer l'ARN de la ressource par l'ARN de votre bus d'événements.

------
#### [ JSON ]

****  

```
{
  "Version":"2012-10-17",		 	 	 
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "events:PutEvents"
      ],
      "Resource": "arn:aws:events:us-east-1:123456789012:event-bus/example-event-bus"
    }
  ]
}
```

------

En outre, vous devez vous assurer que votre rôle IAM pour le connecteur contient la politique de confiance suivante.

------
#### [ JSON ]

****  

```
{
  "Version":"2012-10-17",		 	 	 
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "kafkaconnect.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
```

------

### Une EventBridge règle pour les événements entrants
<a name="mkc-eb-kafka-create-rule"></a>

Vous créez des [règles](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-rules.html) qui font correspondre les événements entrants aux critères de données d'événements, connus sous le nom de [https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-event-patterns.html](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-event-patterns.html). Avec un modèle d'événements, vous pouvez définir les critères pour filtrer les événements entrants et déterminer quels événements doivent déclencher une règle particulière et être ensuite acheminés vers une [cible](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-targets.html) désignée. L'exemple suivant de modèle d'événement correspond aux événements Kafka envoyés au bus d' EventBridge événements.

```
{
  "detail": {
    "topic": ["msk-eventbridge-tutorial"]
  }
}
```

Voici un exemple d'événement envoyé par Kafka à l' EventBridge aide du connecteur Kafka Sink.

```
{
  "version": "0",
  "id": "dbc1c73a-c51d-0c0e-ca61-ab9278974c57",
  "account": "123456789012",
  "time": "2025-03-26T10:15:00Z",
  "region": "us-east-1",
  "detail-type": "msk-eventbridge-tutorial",
  "source": "kafka-connect.msk-eventbridge-tutorial",
  "resources": [],
  "detail": {
    "topic": "msk-eventbridge-tutorial",
    "partition": 0,
    "offset": 0,
    "timestamp": 1742984100000,
    "timestampType": "CreateTime",
    "headers": [],
    "key": "order-1",
    "value": {
      "orderItems": [
        "item-1",
        "item-2"
      ],
      "orderCreatedTime": "Wed Mar 26 10:15:00 UTC 2025"
    }
  }
}
```

Dans la EventBridge console, [créez une règle](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-create-rule.html) sur le bus d'événements à l'aide de cet exemple de modèle et spécifiez une cible, telle qu'un groupe CloudWatch Logs. La EventBridge console configurera automatiquement la politique d'accès nécessaire pour le groupe CloudWatch Logs.

## Création du connecteur
<a name="mkc-eb-kafka-create-connector"></a>

Dans la section suivante, vous allez créer et déployer le [connecteur EventBridge Kafka Sink](https://github.com/awslabs/eventbridge-kafka-connector) à l'aide du AWS Management Console.

**Topics**
+ [Étape 1 : Téléchargez le connecteur](#mkc-eb-kafka-download-connector)
+ [Étape 2 : créer un compartiment Amazon S3](#mkc-eb-kafka-s3-bucket-create)
+ [Étape 3 : Création d'un plugin dans MSK Connect](#mkc-eb-kafka-create-plugin)
+ [Étape 4 : Créer un connecteur](#mkc-eb-kafka-create-connector)

### Étape 1 : Téléchargez le connecteur
<a name="mkc-eb-kafka-download-connector"></a>

Téléchargez le dernier fichier JAR du EventBridge connecteur depuis la [page GitHub des versions](https://github.com/awslabs/eventbridge-kafka-connector/releases) du connecteur EventBridge Kafka. Par exemple, pour télécharger la version v1.4.1, cliquez sur le lien du fichier JAR pour télécharger le connecteur. `kafka-eventbridge-sink-with-dependencies.jar` Enregistrez ensuite le fichier à l'emplacement de votre choix sur votre machine.

### Étape 2 : créer un compartiment Amazon S3
<a name="mkc-eb-kafka-s3-bucket-create"></a>

1. Pour stocker le fichier JAR dans Amazon S3 afin de l'utiliser avec MSK Connect, ouvrez le AWS Management Console, puis choisissez Amazon S3.

1. Dans la console Amazon S3, choisissez **Create bucket** et entrez un nom de bucket unique. Par exemple, **amzn-s3-demo-bucket1-eb-connector**.

1. Choisissez une région appropriée pour votre compartiment Amazon S3. Assurez-vous qu'il correspond à la région dans laquelle votre cluster MSK est déployé.

1. Pour les **paramètres du bucket**, conservez les sélections par défaut ou ajustez-les selon les besoins.

1. Choisissez **Créer un compartiment**.

1. Téléchargez le fichier JAR dans le compartiment Amazon S3.

### Étape 3 : Création d'un plugin dans MSK Connect
<a name="mkc-eb-kafka-create-plugin"></a>

1. Ouvrez le AWS Management Console, puis accédez à **MSK Connect**.

1. Dans le volet de navigation de gauche, choisissez **Plug-ins personnalisés**.

1. Choisissez **Create plugin**, puis entrez un **nom de plugin**. Par exemple, **eventbridge-sink-plugin**.

1. Pour l'**emplacement personnalisé du plugin**, collez l'**URL de l'objet S3**.

1. Ajoutez une description facultative pour le plugin.

1. Choisissez **Create plugin**.

Une fois le plugin créé, vous pouvez l'utiliser pour configurer et déployer le connecteur EventBridge Kafka dans MSK Connect.

### Étape 4 : Créer un connecteur
<a name="mkc-eb-kafka-create-connector"></a>

Avant de créer le connecteur, nous vous recommandons de créer le sujet Kafka requis pour éviter les erreurs de connecteur. Pour créer le sujet, utilisez votre machine cliente.

1. Dans le volet gauche de la console MSK, choisissez **Connectors**, puis **Create connector**.

1. Dans la liste des plugins, choisissez **eventbridge-sink-plugin**, puis **Suivant**.

1. Pour le nom du connecteur, entrez**EventBridgeSink**.

1. Dans la liste des clusters, choisissez votre cluster MSK.

1. <a name="connector-ex"></a>Copiez la configuration suivante pour le connecteur et collez-la dans le champ **Configuration du connecteur**

   Remplacez les espaces réservés dans la configuration suivante, selon les besoins.
   + `aws.eventbridge.endpoint.uri`Supprimez-le si votre cluster MSK dispose d'un accès Internet public.
   + Si vous vous connectez PrivateLink en toute sécurité depuis MSK à EventBridge, remplacez la partie DNS suivante par le nom DNS privé correct du point de terminaison de l'interface VPC (facultatif) EventBridge pour celui que vous avez créé précédemment. `https://`
   + Remplacez l'ARN du bus d' EventBridge événements dans la configuration suivante par l'ARN de votre bus d'événements.
   + Mettez à jour les valeurs spécifiques à une région.

   ```
   {
     "connector.class": "software.amazon.event.kafkaconnector.EventBridgeSinkConnector",
     "aws.eventbridge.connector.id": "msk-eventbridge-tutorial",
     "topics": "msk-eventbridge-tutorial",
     "tasks.max": "1",
     "aws.eventbridge.endpoint.uri": "https://events.us-east-1.amazonaws.com",
     "aws.eventbridge.eventbus.arn": "arn:aws:events:us-east-1:123456789012:event-bus/example-event-bus",
     "value.converter.schemas.enable": "false",
     "value.converter": "org.apache.kafka.connect.json.JsonConverter",
     "aws.eventbridge.region": "us-east-1",
     "auto.offset.reset": "earliest",
     "key.converter": "org.apache.kafka.connect.storage.StringConverter"
   }
   ```

   Pour plus d'informations sur la configuration du connecteur, consultez [eventbridge-kafka-connector](https://github.com/awslabs/eventbridge-kafka-connector).

   Si nécessaire, modifiez les paramètres relatifs aux travailleurs et à l'autoscaling. Nous vous recommandons également d'utiliser la dernière version disponible (recommandée) d'Apache Kafka Connect dans la liste déroulante. Sous **Autorisations d'accès**, utilisez le rôle créé précédemment. Nous recommandons également d'activer la connexion à des CloudWatch fins d'observabilité et de résolution des problèmes. Ajustez les autres paramètres facultatifs, tels que les balises, en fonction de vos besoins. Déployez ensuite le connecteur et attendez que le statut passe à l'état Exécutif.

## Envoyer des messages à Kafka
<a name="mkc-eb-kafka-send-json-encoded-messages"></a>

Vous pouvez configurer les codages des messages, tels qu'Apache Avro et JSON, en spécifiant différents convertisseurs à l'aide des `key.converter` paramètres `value.converter` et, éventuellement, disponibles dans Kafka Connect.

[connector example](#connector-ex)Dans cette rubrique, il est configuré pour fonctionner avec des messages codés en JSON, comme indiqué par l'utilisation de `org.apache.kafka.connect.json.JsonConverter` for. `value converter` Lorsque le connecteur est en cours d'exécution, envoyez des enregistrements au sujet `msk-eventbridge-tutorial` Kafka depuis votre machine cliente.

# Utiliser le connecteur source Debezium avec le fournisseur de configuration
<a name="mkc-debeziumsource-connector-example"></a>

Cet exemple explique comment utiliser le plugin du connecteur Debezium MySQL avec une base de données [Amazon Aurora](https://aws.amazon.com/rds/aurora/) compatible avec MySQL comme source. Dans cet exemple, nous avons également configuré le [fournisseur de configuration AWS Secrets Manager](https://github.com/jcustenborder/kafka-config-provider-aws) open source pour externaliser les informations d'identification de la base de données dans AWS Secrets Manager. Pour en savoir plus sur les fournisseurs de configuration, consultez [Tutoriel : Externalisation d'informations sensibles à l'aide de fournisseurs de configuration](msk-connect-config-provider.md).

**Important**  
Le plugin du connecteur Debezium MySQL [ne prend en charge qu'une seule tâche](https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-property-tasks-max) et ne fonctionne pas avec le mode de capacité de mise à l'échelle automatique pour Amazon MSK Connect. Vous devez plutôt utiliser le mode de capacité provisionné et définir la valeur de `workerCount` égale à un dans la configuration de votre connecteur. Pour en savoir plus sur les modes de capacité de MSK Connect, consultez [Comprendre la capacité du connecteur](msk-connect-capacity.md).

# Conditions préalables complètes pour utiliser le connecteur source Debezium
<a name="mkc-debeziumsource-connector-example-prereqs"></a>

Votre connecteur doit être en mesure d'accéder à Internet afin de pouvoir interagir avec des services tels AWS Secrets Manager que ceux situés en dehors du vôtre Amazon Virtual Private Cloud. Les étapes décrites dans cette section vous aident à effectuer les tâches suivantes pour activer l'accès à Internet.
+ Configurez un sous-réseau public qui héberge une passerelle NAT et achemine le trafic vers une passerelle Internet dans votre VPC.
+ Créez une route par défaut qui dirige le trafic de votre sous-réseau privé vers votre passerelle NAT.

Pour de plus amples informations, veuillez consulter [Activer l'accès à Internet pour Amazon MSK Connect](msk-connect-internet-access.md).

**Conditions préalables**

Avant de pouvoir activer l'accès à Internet, vous devez disposer des éléments suivants :
+ L'ID du Amazon Virtual Private Cloud (VPC) associé à votre cluster. Par exemple, *vpc-123456ab*.
+ Les IDs sous-réseaux privés de votre VPC. Par exemple, *subnet-a1b2c3de*, *subnet-f4g5h6ij*, etc. Vous devez configurer votre connecteur avec des sous-réseaux privés.

**Pour activer l'accès à Internet pour votre connecteur**

1. Ouvrez la Amazon Virtual Private Cloud console à l'adresse [https://console.aws.amazon.com/vpc/](https://console.aws.amazon.com/vpc/).

1. Créez un sous-réseau public pour votre passerelle NAT avec un nom descriptif et notez l'ID du sous-réseau. Pour obtenir des informations détaillées, veuillez consulter [Créer un sous-réseau dans votre VPC](https://docs.aws.amazon.com/vpc/latest/userguide/working-with-vpcs.html#AddaSubnet).

1. Créez une passerelle Internet afin que votre VPC puisse communiquer avec Internet et notez l'ID de passerelle. Attachez la passerelle Internet à votre VPC. Pour de plus amples informations, consultez [Créer et attacher une passerelle Internet](https://docs.aws.amazon.com/vpc/latest/userguide/VPC_Internet_Gateway.html#Add_IGW_Attach_Gateway).

1. Provisionnez une passerelle NAT publique afin que les hôtes de vos sous-réseaux privés puissent accéder à votre sous-réseau public. Lorsque vous créez la passerelle NAT, sélectionnez le sous-réseau public que vous avez créé précédemment. Pour obtenir des informations, consultez [Créer une passerelle NAT](https://docs.aws.amazon.com/vpc/latest/userguide/vpc-nat-gateway.html#nat-gateway-creating).

1. Configurez vos tables de routage. Vous devez disposer de deux tables de routage au total pour terminer cette configuration. Vous devriez déjà avoir une table de routage principale créée automatiquement en même temps que votre VPC. Au cours de cette étape, vous créez une table de routage supplémentaire pour votre sous-réseau public.

   1. Utilisez les paramètres suivants pour modifier la table de routage principale de votre VPC afin que vos sous-réseaux privés acheminent le trafic vers votre passerelle NAT. Pour obtenir des informations, consultez la section [Utiliser des tables de routage](https://docs.aws.amazon.com/vpc/latest/userguide/WorkWithRouteTables.html) dans le *Guide de l'utilisateur *Amazon Virtual Private Cloud**.  
**Table de routage MSKC privée**    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/fr_fr/msk/latest/developerguide/mkc-debeziumsource-connector-example-prereqs.html)

   1. Suivez les instructions de la section [Créer une table de routage personnalisée](https://docs.aws.amazon.com/vpc/latest/userguide/VPC_Internet_Gateway.html#Add_IGW_Routing) pour créer une table de routage pour votre sous-réseau public. Lorsque vous créez la table, entrez un nom descriptif dans le champ **Identification de nom** pour vous aider à identifier le sous-réseau auquel la table est associée. Par exemple, **MSKC public**.

   1. Configurez votre table de routage **MSKC public** à l'aide des paramètres suivants.  
****    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/fr_fr/msk/latest/developerguide/mkc-debeziumsource-connector-example-prereqs.html)

Maintenant que vous avez activé l'accès à Internet pour Amazon MSK Connect, vous êtes prêt à créer un connecteur.

# Création d'un connecteur source Debezium
<a name="msk-connect-debeziumsource-connector-example-steps"></a>

Cette procédure décrit comment créer un connecteur source Debezium.

1. 

**Créez un plugin personnalisé**

   1. Téléchargez la dernière version stable du plugin du connecteur MySQL depuis le site [Debezium](https://debezium.io/releases/). Notez la version de Debezium que vous téléchargez (version 2.x ou ancienne série 1.x). Vous allez créer ultérieurement un connecteur basé sur votre version de Debezium.

   1. Téléchargez et extrayez le [fournisseur de configuration AWS Secrets Manager](https://www.confluent.io/hub/jcustenborder/kafka-config-provider-aws).

   1. Placez les archives suivantes dans le même répertoire :
      + Le dossier `debezium-connector-mysql`
      + Le dossier `jcusten-border-kafka-config-provider-aws-0.1.1`

   1. Compressez le répertoire que vous avez créé à l'étape précédente dans un fichier ZIP, puis chargez ce dernier dans un compartiment S3. Pour obtenir des informations, consultez [Chargement d'objets](https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html) dans le *Guide de l'utilisateur Amazon S3.*

   1. Copiez le code JSON et collez-le dans un fichier. Par exemple, `debezium-source-custom-plugin.json`. Remplacez-le *<example-custom-plugin-name>* par le nom que vous souhaitez attribuer au plugin, *<amzn-s3-demo-bucket-arn>* par l'ARN du compartiment Amazon S3 dans lequel vous avez chargé le fichier ZIP et `<file-key-of-ZIP-object>` par la clé de fichier de l'objet ZIP que vous avez chargé sur S3.

      ```
      {
          "name": "<example-custom-plugin-name>",
          "contentType": "ZIP",
          "location": {
              "s3Location": {
                  "bucketArn": "<amzn-s3-demo-bucket-arn>",
                  "fileKey": "<file-key-of-ZIP-object>"
              }
          }
      }
      ```

   1. Exécutez la AWS CLI commande suivante depuis le dossier dans lequel vous avez enregistré le fichier JSON pour créer un plugin.

      ```
      aws kafkaconnect create-custom-plugin --cli-input-json file://<debezium-source-custom-plugin.json>
      ```

      Vous devez voir un résultat similaire à ce qui suit.

      ```
      {
          "CustomPluginArn": "arn:aws:kafkaconnect:us-east-1:012345678901:custom-plugin/example-custom-plugin-name/abcd1234-a0b0-1234-c1-12345678abcd-1",
          "CustomPluginState": "CREATING",
          "Name": "example-custom-plugin-name",
          "Revision": 1
      }
      ```

   1. Exécutez la commande suivante pour vérifier le statut du plugin. Le statut doit passer de `CREATING` à `ACTIVE`. Remplacez l'espace réservé de l'ARN par l'ARN que vous avez obtenu dans le résultat de la commande précédente.

      ```
      aws kafkaconnect describe-custom-plugin --custom-plugin-arn "<arn-of-your-custom-plugin>"
      ```

1. 

**Configurer AWS Secrets Manager et créer un secret pour les informations d'identification de votre base de données**

   1. Ouvrez la console Secrets Manager à l'adresse [https://console.aws.amazon.com/secretsmanager/](https://console.aws.amazon.com/secretsmanager/).

   1. Créez un nouveau secret pour stocker les informations d'identification de connexion à votre base de données. Pour obtenir des informations, consultez la section [Créer un secret](https://docs.aws.amazon.com/secretsmanager/latest/userguide/manage_create-basic-secret.html) dans le Guide de l'utilisateur *AWS Secrets Manager*.

   1. Copiez l'ARN de votre secret.

   1. Ajoutez les autorisations Secrets Manager de l'exemple de politique suivant à votre [Comprendre le rôle d'exécution des services](msk-connect-service-execution-role.md). *<arn:aws:secretsmanager:us-east-1:123456789000:secret:MySecret-1234>*Remplacez-le par l'ARN de votre secret.

------
#### [ JSON ]

****  

      ```
      {
        "Version":"2012-10-17",		 	 	 
        "Statement": [
          {
            "Effect": "Allow",
            "Action": [
              "secretsmanager:GetResourcePolicy",
              "secretsmanager:GetSecretValue",
              "secretsmanager:DescribeSecret",
              "secretsmanager:ListSecretVersionIds"
            ],
            "Resource": [
            "arn:aws:secretsmanager:us-east-1:123456789012:secret:MySecret-1234"
            ]
          }
        ]
      }
      ```

------

      Pour obtenir des informations sur la façon d'ajouter des autorisations IAM, consultez [Ajout et suppression d'autorisations basées sur l'identité IAM](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_manage-attach-detach.html) dans le *Guide d'utilisateur IAM*.

1. 

**Créez une configuration de worker personnalisée avec des informations sur votre fournisseur de configuration**

   1. Copiez les propriétés de configuration du worker suivantes dans un fichier, en remplaçant les chaînes d'espace réservé par des valeurs correspondant à votre scénario. Pour en savoir plus sur les propriétés de configuration du fournisseur de configuration AWS Secrets Manager, consultez [SecretsManagerConfigProvider](https://jcustenborder.github.io/kafka-connect-documentation/projects/kafka-config-provider-aws/configProviders/SecretsManagerConfigProvider.html)la documentation du plugin.

      ```
      key.converter=<org.apache.kafka.connect.storage.StringConverter>
      value.converter=<org.apache.kafka.connect.storage.StringConverter>
      config.providers.secretManager.class=com.github.jcustenborder.kafka.config.aws.SecretsManagerConfigProvider
      config.providers=secretManager
      config.providers.secretManager.param.aws.region=<us-east-1>
      ```

   1. Exécutez la AWS CLI commande suivante pour créer votre configuration de travail personnalisée. 

      Remplacez les valeurs suivantes :
      + *<my-worker-config-name>*- un nom descriptif pour votre configuration de travail personnalisée
      + *<encoded-properties-file-content-string>*- une version codée en base64 des propriétés en texte brut que vous avez copiées à l'étape précédente

      ```
      aws kafkaconnect create-worker-configuration --name <my-worker-config-name> --properties-file-content <encoded-properties-file-content-string>
      ```

1. 

**Créez un connecteur**

   1. Copiez le code JSON suivant qui correspond à votre version de Debezium (2.x ou 1.x) et collez-le dans un nouveau fichier. Remplacez les chaînes `<placeholder>` par des valeurs correspondant à votre scénario. Pour plus d'informations sur la configuration d'un rôle d'exécution de service, consultez [Rôles et politiques IAM pour MSK Connect](msk-connect-iam.md).

      Notez que la configuration utilise des variables comme `${secretManager:MySecret-1234:dbusername}` plutôt que du texte brut pour spécifier les informations d'identification de la base de données. Remplacez `MySecret-1234` par le nom de votre secret, puis indiquez le nom de la clé que vous souhaitez récupérer. Vous devez également remplacer `<arn-of-config-provider-worker-configuration>` par l'ARN de votre configuration de worker personnalisée.

------
#### [ Debezium 2.x ]

      Pour les versions 2.x de Debezium, copiez le code JSON suivant et collez-le dans un nouveau fichier. Remplacez les chaînes *<placeholder>* par des valeurs correspondant à votre scénario.

      ```
      {
      	"connectorConfiguration": {
      		"connector.class": "io.debezium.connector.mysql.MySqlConnector",
      		"tasks.max": "1",
      		"database.hostname": "<aurora-database-writer-instance-endpoint>",
      		"database.port": "3306",
      		"database.user": "<${secretManager:MySecret-1234:dbusername}>",
      		"database.password": "<${secretManager:MySecret-1234:dbpassword}>",
      		"database.server.id": "123456",
      		"database.include.list": "<list-of-databases-hosted-by-specified-server>",
      		"topic.prefix": "<logical-name-of-database-server>",
      		"schema.history.internal.kafka.topic": "<kafka-topic-used-by-debezium-to-track-schema-changes>",
      		"schema.history.internal.kafka.bootstrap.servers": "<cluster-bootstrap-servers-string>",
      		"schema.history.internal.consumer.security.protocol": "SASL_SSL",
      		"schema.history.internal.consumer.sasl.mechanism": "AWS_MSK_IAM",
      		"schema.history.internal.consumer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;",
      		"schema.history.internal.consumer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
      		"schema.history.internal.producer.security.protocol": "SASL_SSL",
      		"schema.history.internal.producer.sasl.mechanism": "AWS_MSK_IAM",
      		"schema.history.internal.producer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;",
      		"schema.history.internal.producer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
      		"include.schema.changes": "true"
      	},
      	"connectorName": "example-Debezium-source-connector",
      	"kafkaCluster": {
      		"apacheKafkaCluster": {
      			"bootstrapServers": "<cluster-bootstrap-servers-string>",
      			"vpc": {
      				"subnets": [
      					"<cluster-subnet-1>",
      					"<cluster-subnet-2>",
      					"<cluster-subnet-3>"
      				],
      				"securityGroups": ["<id-of-cluster-security-group>"]
      			}
      		}
      	},
      	"capacity": {
      		"provisionedCapacity": {
      			"mcuCount": 2,
      			"workerCount": 1
      		}
      	},
      	"kafkaConnectVersion": "2.7.1",
      	"serviceExecutionRoleArn": "<arn-of-service-execution-role-that-msk-connect-can-assume>",
      	"plugins": [{
      		"customPlugin": {
      			"customPluginArn": "<arn-of-msk-connect-plugin-that-contains-connector-code>",
      			"revision": 1
      		}
      	}],
      	"kafkaClusterEncryptionInTransit": {
      		"encryptionType": "TLS"
      	},
      	"kafkaClusterClientAuthentication": {
      		"authenticationType": "IAM"
      	},
      	"workerConfiguration": {
      		"workerConfigurationArn": "<arn-of-config-provider-worker-configuration>",
      		"revision": 1
      	}
      }
      ```

------
#### [ Debezium 1.x ]

      Pour les versions 1.x de Debezium, copiez le code JSON suivant et collez-le dans un nouveau fichier. Remplacez les chaînes *<placeholder>* par des valeurs correspondant à votre scénario.

      ```
      {
      	"connectorConfiguration": {
      		"connector.class": "io.debezium.connector.mysql.MySqlConnector",
      		"tasks.max": "1",
      		"database.hostname": "<aurora-database-writer-instance-endpoint>",
      		"database.port": "3306",
      		"database.user": "<${secretManager:MySecret-1234:dbusername}>",
      		"database.password": "<${secretManager:MySecret-1234:dbpassword}>",
      		"database.server.id": "123456",
      		"database.server.name": "<logical-name-of-database-server>",
      		"database.include.list": "<list-of-databases-hosted-by-specified-server>",
      		"database.history.kafka.topic": "<kafka-topic-used-by-debezium-to-track-schema-changes>",
      		"database.history.kafka.bootstrap.servers": "<cluster-bootstrap-servers-string>",
      		"database.history.consumer.security.protocol": "SASL_SSL",
      		"database.history.consumer.sasl.mechanism": "AWS_MSK_IAM",
      		"database.history.consumer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;",
      		"database.history.consumer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
      		"database.history.producer.security.protocol": "SASL_SSL",
      		"database.history.producer.sasl.mechanism": "AWS_MSK_IAM",
      		"database.history.producer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;",
      		"database.history.producer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
      		"include.schema.changes": "true"
      	},
      	"connectorName": "example-Debezium-source-connector",
      	"kafkaCluster": {
      		"apacheKafkaCluster": {
      			"bootstrapServers": "<cluster-bootstrap-servers-string>",
      			"vpc": {
      				"subnets": [
      					"<cluster-subnet-1>",
      					"<cluster-subnet-2>",
      					"<cluster-subnet-3>"
      				],
      				"securityGroups": ["<id-of-cluster-security-group>"]
      			}
      		}
      	},
      	"capacity": {
      		"provisionedCapacity": {
      			"mcuCount": 2,
      			"workerCount": 1
      		}
      	},
      	"kafkaConnectVersion": "2.7.1",
      	"serviceExecutionRoleArn": "<arn-of-service-execution-role-that-msk-connect-can-assume>",
      	"plugins": [{
      		"customPlugin": {
      			"customPluginArn": "<arn-of-msk-connect-plugin-that-contains-connector-code>",
      			"revision": 1
      		}
      	}],
      	"kafkaClusterEncryptionInTransit": {
      		"encryptionType": "TLS"
      	},
      	"kafkaClusterClientAuthentication": {
      		"authenticationType": "IAM"
      	},
      	"workerConfiguration": {
      		"workerConfigurationArn": "<arn-of-config-provider-worker-configuration>",
      		"revision": 1
      	}
      }
      ```

------

   1. Exécutez la AWS CLI commande suivante dans le dossier dans lequel vous avez enregistré le fichier JSON à l'étape précédente.

      ```
      aws kafkaconnect create-connector --cli-input-json file://connector-info.json
      ```

      Voici un exemple du résultat que vous obtenez lorsque vous exécutez la commande.

      ```
      {
          "ConnectorArn": "arn:aws:kafkaconnect:us-east-1:123450006789:connector/example-Debezium-source-connector/abc12345-abcd-4444-a8b9-123456f513ed-2", 
          "ConnectorState": "CREATING", 
          "ConnectorName": "example-Debezium-source-connector"
      }
      ```

# Mettre à jour la configuration d'un connecteur Debezium
<a name="mkc-debeziumsource-connector-update"></a>

Pour mettre à jour la configuration du connecteur Debezium, procédez comme suit : 

1. Copiez le JSON suivant et collez-le dans un nouveau fichier. Remplacez les chaînes `<placeholder>` par des valeurs correspondant à votre scénario.

   ```
   {
      "connectorArn": <connector_arn>,
      "connectorConfiguration": <new_configuration_in_json>,
      "currentVersion": <current_version>
   }
   ```

1. Exécutez la AWS CLI commande suivante dans le dossier dans lequel vous avez enregistré le fichier JSON à l'étape précédente.

   ```
   aws kafkaconnect update-connector --cli-input-json file://connector-info.json
   ```

   Voici un exemple du résultat lorsque vous exécutez la commande avec succès.

   ```
   {
       "connectorArn": "arn:aws:kafkaconnect:us-east-1:123450006789:connector/example-Debezium-source-connector/abc12345-abcd-4444-a8b9-123456f513ed-2",
       "connectorOperationArn": "arn:aws:kafkaconnect:us-east-1:123450006789:connector-operation/example-Debezium-source-connector/abc12345-abcd-4444-a8b9-123456f513ed-2/41b6ad56-3184-479b-850a-a8bedd5a02f3",
       "connectorState": "UPDATING"
   }
   ```

1. Vous pouvez désormais exécuter la commande suivante pour contrôler l'état actuel de l'opération :

   ```
   aws kafkaconnect describe-connector-operation --connector-operation-arn <operation_arn>
   ```

Pour avoir un exemple de connecteur Debezium avec des étapes détaillées, consultez [Présentation d'Amazon MSK Connect - Diffusez des données vers et depuis vos clusters Apache Kafka à l'aide de connecteurs gérés](https://aws.amazon.com/blogs/aws/introducing-amazon-msk-connect-stream-data-to-and-from-your-apache-kafka-clusters-using-managed-connectors/).

# Migrer vers Amazon MSK Connect
<a name="msk-connect-migrating"></a>

Cette section explique comment migrer votre application de connecteur Apache Kafka vers Amazon Managed Streaming for Apache Kafka Connect (Amazon MSK Connect). Pour en savoir plus sur les avantages de la migration vers Amazon MSK Connect, consultez. [Avantages liés à l'utilisation d'Amazon MSK Connect](msk-connect.md#msk-connect-benefits)

Cette section décrit également les rubriques de gestion d'état utilisées par Kafka Connect et Amazon MSK Connect et couvre les procédures de migration des connecteurs source et récepteur.

# Comprendre les sujets internes utilisés par Kafka Connect
<a name="msk-connect-kafka-connect-topics"></a>

Une application Apache Kafka Connect exécutée en mode distribué mémorise son état en utilisant des rubriques internes du cluster Kafka et l'appartenance à un groupe. Les valeurs de configuration suivantes correspondent aux rubriques internes utilisées pour les applications Kafka Connect :
+ Rubrique de configuration, spécifiée par `config.storage.topic`

  Dans la rubrique consacrée à la configuration, Kafka Connect enregistre la configuration de tous les connecteurs et tâches lancés par les utilisateurs. Chaque fois que les utilisateurs mettent à jour la configuration d'un connecteur ou lorsqu'un connecteur demande une reconfiguration (par exemple, le connecteur détecte qu'il peut démarrer d'autres tâches), un enregistrement est envoyé à cette rubrique. Cette rubrique est activée pour le compactage, elle conserve donc toujours le dernier état de chaque entité.
+ Rubrique sur les décalages, spécifiée par `offset.storage.topic`

  Dans la rubrique sur les décalages, Kafka Connect enregistre les décalages des connecteurs source. Tout comme le sujet de configuration, le sujet des décalages est activé pour le compactage. Cette rubrique est utilisée pour écrire les positions des sources uniquement pour les connecteurs sources qui produisent des données destinées à Kafka à partir de systèmes externes. Les connecteurs Sink, qui lisent les données de Kafka et les envoient à des systèmes externes, stockent leurs offsets en utilisant les groupes de consommateurs Kafka habituels.
+ Sujet de statut, spécifié par `status.storage.topic`

  Dans la rubrique consacrée au statut, Kafka Connect enregistre l'état actuel des connecteurs et des tâches. Cette rubrique est utilisée comme emplacement central pour les données demandées par les utilisateurs de l'API REST. Cette rubrique permet aux utilisateurs d'interroger n'importe quel worker tout en obtenant l'état de tous les plugins en cours d'exécution. Tout comme les rubriques relatives à la configuration et aux décalages, la rubrique d'état est également activée pour le compactage.

Outre ces sujets, Kafka Connect utilise largement l'API d'adhésion aux groupes de Kafka. Les groupes sont nommés d'après le nom du connecteur. Par exemple, pour un connecteur nommé file-sink, le groupe est nommé. connect-file-sink Chaque consommateur du groupe fournit des enregistrements pour une seule tâche. Ces groupes et leurs compensations peuvent être récupérés à l'aide d'outils classiques de groupes de consommateurs, tels que`Kafka-consumer-group.sh`. Pour chaque connecteur récepteur, le moteur d'exécution Connect exécute un groupe de consommateurs normal qui extrait les enregistrements de Kafka.

# Gestion de l'état des applications Amazon MSK Connect
<a name="msk-connect-state-management"></a>

Par défaut, Amazon MSK Connect crée trois rubriques distinctes dans le cluster Kafka pour chaque connecteur Amazon MSK afin de stocker la configuration, le décalage et l'état du connecteur. Les noms des rubriques par défaut sont structurés comme suit :
+ \$1\$1msk\$1connect\$1configs\$1 \$1 *connector-name* *connector-id*
+ \$1\$1msk\$1connect\$1status\$1 \$1 *connector-name* *connector-id*
+ \$1\$1msk\$1connect\$1offsets\$1 \$1 *connector-name* *connector-id*

**Note**  
Pour assurer la continuité du décalage entre les connecteurs source, vous pouvez utiliser une rubrique de stockage de décalage de votre choix au lieu de la rubrique par défaut. La spécification d'une rubrique de stockage des décalages vous aide à accomplir des tâches telles que la création d'un connecteur source qui reprend la lecture à partir du dernier décalage d'un connecteur précédent. Pour spécifier un sujet de stockage offset, entrez une valeur pour la [https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-workers.html#msk-connect-manage-connector-offsets](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-workers.html#msk-connect-manage-connector-offsets)propriété dans la configuration du worker Amazon MSK Connect avant de créer le connecteur.

# Migrer les connecteurs source vers Amazon MSK Connect
<a name="msk-connect-migrate-source-connectors"></a>

Les connecteurs source sont des applications Apache Kafka Connect qui importent des enregistrements depuis des systèmes externes dans Kafka. Cette section décrit le processus de migration des applications du connecteur source Apache Kafka Connect qui exécutent des clusters Kafka Connect sur site ou des clusters Kafka Connect autogérés qui s'exécutent sur AWS Amazon MSK Connect.

L'application du connecteur source Kafka Connect stocke les décalages dans une rubrique nommée avec la valeur définie pour la propriété de configuration. `offset.storage.topic` Voici des exemples de messages de décalage pour un connecteur JDBC exécutant deux tâches qui importent des données à partir de deux tables différentes nommées `movies` et. `shows` La dernière ligne importée depuis le tableau des films possède un identifiant principal de`18343`. La dernière ligne importée depuis le tableau des shows possède un ID principal de`732`.

```
["jdbcsource",{"protocol":"1","table":"sample.movies"}] {"incrementing":18343}
["jdbcsource",{"protocol":"1","table":"sample.shows"}] {"incrementing":732}
```

Pour migrer les connecteurs source vers Amazon MSK Connect, procédez comme suit :

1. Créez un [plugin personnalisé](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-plugins.html) Amazon MSK Connect en extrayant les bibliothèques de connecteurs de votre cluster Kafka Connect sur site ou autogéré.

1. Créez les [propriétés du worker](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-config-provider.html#msk-connect-config-providers-create-custom-config) Amazon MSK Connect et définissez les propriétés `key.converter``value.converter`, avec les mêmes valeurs que celles définies pour le connecteur Kafka qui s'exécute dans votre cluster Kafka Connect existant. `offset.storage.topic`

1. Suspendez l'application du connecteur sur le cluster existant en effectuant une `PUT /connectors/connector-name/pause` demande sur le cluster Kafka Connect existant.

1. Assurez-vous que toutes les tâches de l'application du connecteur sont complètement arrêtées. Vous pouvez arrêter les tâches soit en faisant une `GET /connectors/connector-name/status` demande sur le cluster Kafka Connect existant, soit en consommant les messages du nom de rubrique défini pour la propriété`status.storage.topic`.

1. Obtenez la configuration du connecteur à partir du cluster existant. Vous pouvez obtenir la configuration du connecteur soit en faisant une `GET /connectors/connector-name/config/` demande sur le cluster existant, soit en consommant les messages du nom de rubrique défini pour la propriété`config.storage.topic`.

1. Créez un nouveau [connecteur Amazon MSK](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-connectors.html) portant le même nom qu'un cluster existant. Créez ce connecteur à l'aide du plug-in personnalisé que vous avez créé à l'étape 1, des propriétés de travail que vous avez créées à l'étape 2 et de la configuration du connecteur que vous avez extraite à l'étape 5.

1. Lorsque le statut du connecteur Amazon MSK est défini`active`, consultez les journaux pour vérifier que le connecteur a commencé à importer des données depuis le système source.

1. Supprimez le connecteur du cluster existant en effectuant une `DELETE /connectors/connector-name` demande.

# Migrer les connecteurs récepteurs vers Amazon MSK Connect
<a name="msk-connect-migrate-sink-connectors"></a>

Les connecteurs Sink sont des applications Apache Kafka Connect qui exportent des données de Kafka vers des systèmes externes. Cette section décrit le processus de migration des applications du connecteur récepteur Apache Kafka Connect qui exécutent des clusters Kafka Connect sur site ou des clusters Kafka Connect autogérés qui s'exécutent sur AWS Amazon MSK Connect.

Les connecteurs de réception Kafka Connect utilisent l'API d'adhésion au groupe Kafka et stockent les offsets dans les mêmes `__consumer_offset` rubriques qu'une application grand public classique. Ce comportement simplifie la migration du connecteur récepteur d'un cluster autogéré vers Amazon MSK Connect.

Pour migrer les connecteurs récepteurs vers Amazon MSK Connect, procédez comme suit :

1. Créez un [plugin personnalisé](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-plugins.html) Amazon MSK Connect en extrayant les bibliothèques de connecteurs de votre cluster Kafka Connect sur site ou autogéré.

1. Créez les [propriétés du worker](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-config-provider.html#msk-connect-config-providers-create-custom-config) Amazon MSK Connect et définissez les propriétés `key.converter` et `value.converter` les mêmes valeurs que celles définies pour le connecteur Kafka qui s'exécute dans votre cluster Kafka Connect existant.

1. Suspendez l'application du connecteur sur votre cluster existant en effectuant une `PUT /connectors/connector-name/pause` demande sur le cluster Kafka Connect existant.

1. Assurez-vous que toutes les tâches de l'application du connecteur sont complètement arrêtées. Vous pouvez arrêter les tâches soit en faisant une `GET /connectors/connector-name/status` demande sur le cluster Kafka Connect existant, soit en consommant les messages du nom de rubrique défini pour la propriété`status.storage.topic`.

1. Obtenez la configuration du connecteur à partir du cluster existant. Vous pouvez obtenir la configuration du connecteur soit en faisant une `GET /connectors/connector-name/config` demande sur le cluster existant, soit en consommant les messages du nom de rubrique défini pour la propriété`config.storage.topic`.

1. Créez un nouveau [connecteur Amazon MSK](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-connectors.html) portant le même nom que le cluster existant. Créez ce connecteur à l'aide du plug-in personnalisé que vous avez créé à l'étape 1, des propriétés de travail que vous avez créées à l'étape 2 et de la configuration du connecteur que vous avez extraite à l'étape 5.

1. Lorsque le statut du connecteur Amazon MSK est défini`active`, consultez les journaux pour vérifier que le connecteur a commencé à importer des données depuis le système source.

1. Supprimez le connecteur du cluster existant en effectuant une `DELETE /connectors/connector-name` demande.

# Résoudre les problèmes dans Amazon MSK Connect
<a name="msk-connect-troubleshooting"></a>

La documentation suivante peut vous aider à résoudre les problèmes que vous pouvez rencontrer lors de l'utilisation de MSK Connect. Vous pouvez également publier votre problème sur [AWS re:Post](https://repost.aws/).

**Le connecteur ne parvient pas à accéder aux ressources hébergées sur l'Internet public**  
Consultez [Activation de l'accès à Internet pour Amazon MSK Connect](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-internet-access.html).

**Le nombre de tâches en cours d'exécution du connecteur n'est pas égal au nombre de tâches spécifié dans tasks.max**  
Voici quelques raisons pour lesquelles un connecteur peut utiliser moins de tâches que la configuration tasks.max spécifiée :
+ Certaines implémentations de connecteurs limitent le nombre de tâches pouvant être utilisées. Par exemple, le connecteur Debezium pour MySQL est limité à l'utilisation d'une seule tâche.
+ Lorsque vous utilisez le mode de capacité autoscalée, Amazon MSK Connect remplace la propriété tasks.max d'un connecteur par une valeur proportionnelle au nombre de travailleurs exécutant le connecteur et au nombre de par travailleur. MCUs Si vous avez configuré le `maxAutoscalingTaskCount` paramètre facultatif, la `tasks.max` valeur ne dépassera pas cette limite. Pour plus d'informations, voir [Comprendre le nombre maximal de tâches de mise à l'échelle automatique](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-connectors.html#msk-connect-max-autoscaling-task-count).
+ Pour les connecteurs récepteurs, le niveau de parallélisme (nombre de tâches) ne peut pas être supérieur au nombre de partitions de rubrique. Bien que vous puissiez définir une valeur supérieure à tasks.max, une seule partition n'est jamais traitée par plus d'une tâche à la fois.
+ Dans Kafka Connect 2.7.x, l'assignateur de partition client par défaut est `RangeAssignor`. Le comportement de cet assignateur consiste à donner la première partition de chaque rubrique à un seul consommateur, la deuxième partition de chaque rubrique à un seul consommateur, etc. Cela signifie que le nombre maximum de tâches actives utilisées par un connecteur récepteur `RangeAssignor` est égal au nombre maximal de partitions utilisées dans un même sujet. Si cela ne fonctionne pas pour votre cas d'utilisation, vous devez [créer une configuration de worker](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-workers.html#msk-connect-create-custom-worker-config) dans laquelle la propriété `consumer.partition.assignment.strategy` est définie sur un assignateur de partition consommateur plus approprié. Voir [Interface Kafka 2.7 ConsumerPartitionAssignor : *toutes les classes d'implémentation connues*](https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.html).