Rubrique Utiliser le stockage offset personnalisé - Amazon Managed Streaming for Apache Kafka

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Rubrique Utiliser le stockage offset personnalisé

Pour assurer la continuité des décalages entre les connecteurs source, vous pouvez utiliser une rubrique de stockage des décalages de votre choix au lieu de la rubrique par défaut. La spécification d'une rubrique de stockage des décalages vous aide à accomplir des tâches telles que la création d'un connecteur source qui reprend la lecture à partir du dernier décalage d'un connecteur précédent.

Pour spécifier une rubrique de stockage des décalages, vous devez fournir une valeur pour la propriété offset.storage.topic dans votre configuration de worker avant de créer un connecteur. Si vous souhaitez réutiliser la rubrique de stockage des décalages pour utiliser les décalages d'un connecteur créé précédemment, vous devez donner au nouveau connecteur le même nom que l'ancien connecteur. Si vous créez une rubrique de stockage de décalages personnalisée, vous devez définir cleanup.policy sur compact dans la configuration de votre rubrique.

Note

Si vous spécifiez une rubrique de stockage des décalages lorsque vous créez un connecteur récepteur, MSK Connect crée la rubrique si elle n'existe pas déjà. Toutefois, cette rubrique ne sera pas utilisée pour enregistrer les décalages du connecteur.

Les décalages du connecteur récepteur sont plutôt gérés à l'aide du protocole de groupe de consommateurs Kafka. Chaque connecteur récepteur crée un groupe nommé connect-{CONNECTOR_NAME}. Tant que le groupe de consommateurs existe, tous les connecteurs récepteurs successifs que vous créez avec la même valeur CONNECTOR_NAME seront maintenus à partir du dernier décalage validé.

Important

Si vous souhaitez mettre à jour une configuration de connecteur existante tout en préservant la continuité du décalage, utilisez l' UpdateConnector API. Pour de plus amples informations, veuillez consulter Mettre à jour un connecteur.

Exemple: Spécification d'un sujet de stockage offset lors de la recréation d'un connecteur source

Si vous devez supprimer et recréer un connecteur tout en maintenant la continuité du décalage, vous pouvez spécifier un sujet de stockage du décalage dans votre configuration de travail. Supposons, par exemple, que vous disposiez d'un connecteur de capture des données modifiées (CDC) et que vous souhaitiez le recréer sans perdre votre place dans le flux CDC. Les étapes suivantes expliquent comment effectuer cette tâche.

  1. Sur votre machine cliente, exécutez la commande suivante pour trouver le nom de la rubrique de stockage des décalages de votre connecteur. Remplacez <bootstrapBrokerString> par la chaîne de l'agent d'amorçage de votre cluster. Pour obtenir des instructions sur l'obtention de votre chaîne de l'agent d'amorçage, consultez Trouvez les courtiers bootstrap pour un cluster Amazon MSK.

    <path-to-your-kafka-installation>/bin/kafka-topics.sh --list --bootstrap-server <bootstrapBrokerString>

    La sortie suivante présente une liste de toutes les rubriques du cluster, y compris les rubriques du connecteur interne par défaut. Dans cet exemple, le connecteur CDC existant utilise la rubrique de stockage des décalages par défaut créée par MSK Connect. C'est pourquoi la rubrique de stockage des décalages est appelée __amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2.

    __consumer_offsets __amazon_msk_canary __amazon_msk_connect_configs_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2 __amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2 __amazon_msk_connect_status_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2 my-msk-topic-1 my-msk-topic-2
  2. Ouvrez la console Amazon MSK à l'adresse https://console.aws.amazon.com/msk/.

  3. Choisissez votre connecteur dans la liste des connecteurs. Copiez et enregistrez le contenu du champ de configuration du connecteur afin de pouvoir le modifier et l'utiliser pour créer le nouveau connecteur.

  4. Pour supprimer la connecteur, choisissez Supprimer. Puis saisissez le nom du connecteur dans le champ d'entrée du texte pour confirmer la suppression.

  5. Créez une configuration de worker personnalisée avec des valeurs adaptées à votre scénario. Pour obtenir des instructions, veuillez consulter Création d'une configuration de travail personnalisée.

    Dans votre configuration de worker, vous devez spécifier le nom de la rubrique de stockage des décalages que vous avez précédemment récupérée en tant que valeur pour offset.storage.topic comme dans la configuration suivante.

    config.providers.secretManager.param.aws.region=eu-west-3 key.converter=<org.apache.kafka.connect.storage.StringConverter> value.converter=<org.apache.kafka.connect.storage.StringConverter> config.providers.secretManager.class=com.github.jcustenborder.kafka.config.aws.SecretsManagerConfigProvider config.providers=secretManager offset.storage.topic=__amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2
  6. Important

    Vous devez donner à votre nouveau connecteur le même nom que l'ancien connecteur.

    Créez un nouveau connecteur à l'aide de la configuration de worker que vous avez configurée à l'étape précédente. Pour obtenir des instructions, veuillez consulter Création d’un connecteur.