本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
使用自定义偏移存储主题
要在源连接器之间提供偏移连续性,您可以使用自己选择的偏移存储主题来代替默认主题。指定偏移存储主题可以帮助您完成创建源连接器之类的任务,该连接器可从上一个连接器的最后一个偏移恢复读取。
要指定偏移存储主题,请在创建连接器之前在工作程序配置中为 offset.storage.topic 属性提供一个值。如果要重复使用偏移存储主题来消耗先前创建的连接器的偏移,则必须为新连接器指定与旧连接器相同的名称。如果您创建自定义偏移存储主题,则必须在主题配置中将 cleanup.policycompact。
注意
如果您在创建接收器连接器时指定了偏移存储主题,若该主题尚不存在,则 MSK Connect 会创建该主题。但是,该主题不会用于存储连接器偏移,
而是使用 Kafka 使用器组协议来管理接收器连接器偏移。每个接收器连接器都会创建一个名为 connect-{CONNECTOR_NAME} 的组。只要使用器组存在,您创建的任何具有相同 CONNECTOR_NAME 值的连续接收器连接器都将从上次提交的偏移继续。
重要
如果您想在保持偏移连续性的同时更新现有连接器配置,请使用 UpdateConnector API。有关更多信息,请参阅 更新连接器。
例:在重新创建源连接器时指定偏移存储主题
如果您需要在保持偏移连续性的同时删除和重新创建连接器,则可以在工作器配置中指定偏移存储主题。例如,假设您有一个变更数据捕获 (CDC) 连接器,并且您想在不丢失 CDC 数据流中的位置的情况下重新创建它。以下步骤演示如何完成此任务。
-
在您的客户端计算机上,运行以下命令以查找连接器偏移存储主题的名称。将
替换为集群的引导代理字符串。有关获取引导代理字符串的说明,请参阅获取 Amazon MSK 集群的引导代理。<bootstrapBrokerString><path-to-your-kafka-installation>/bin/kafka-topics.sh --list --bootstrap-server<bootstrapBrokerString>以下输出显示了所有集群主题的列表,包括所有默认的内部连接器主题。在此示例中,现有 CDC 连接器使用由 MSK Connect 创建的默认偏移存储主题。这就是偏移存储主题名为
__amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2的原因。__consumer_offsets __amazon_msk_canary __amazon_msk_connect_configs_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2 __amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2 __amazon_msk_connect_status_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2 my-msk-topic-1 my-msk-topic-2 -
在 https://console.aws.amazon.com/msk/
打开 Amazon MSK 控制台。 -
从连接器列表中选择您的连接器。复制并保存连接器配置字段的内容,以便您可以对其进行修改并使用它来创建新连接器。
-
要删除连接器,请选择删除。然后在文本输入字段中输入连接器名称,以确认删除。
-
使用适合您场景的值创建自定义工作程序配置。有关说明,请参阅创建自定义工作程序配置。
在工作程序配置中,必须将之前检索到的偏移存储主题的名称指定为类似于以下配置中
offset.storage.topic的值。config.providers.secretManager.param.aws.region=eu-west-3 key.converter=<org.apache.kafka.connect.storage.StringConverter> value.converter=<org.apache.kafka.connect.storage.StringConverter> config.providers.secretManager.class=com.github.jcustenborder.kafka.config.aws.SecretsManagerConfigProvider config.providers=secretManager offset.storage.topic=__amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2 -
重要
必须为新连接器指定与旧连接器相同的名称。
使用在上一步中设置的工作程序配置创建新连接器。有关说明,请参阅创建 连接器。