本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
为 MSK Connect 设置 EventBridge Kafka 水槽连接器
本主题向您展示如何为 MSK Connect 设置 EventBridge Kafka 接收器连接器
先决条件
在部署连接器之前,请确保您拥有以下资源:
- 
                    Amazon MSK 集群:用于生成和使用 Kafka 消息的主动 MSK 集群。 
- 
                    Amazon EventBridge 活动总线:用于接收来自 Kafka 主题的事件的事件的活动总线。 EventBridge 
- 
                    IAM 角色:创建具有 MSK Connect 和连接 EventBridge 器所需权限的 IAM 角色。 
- 
                    通过 MSK Connect 或在 MSK 集群的 VPC 和子网中 EventBridge 创建的 VPC 接口终端节点访问公共互联网。这可以帮助您避免在不需要 NAT 网关的情况下穿越公共互联网。 
- 
                    用于创建主题和向 Kafka 发送记录的客户机器 AWS CloudShell, EC2 例如 Amazon 实例或。 
设置 MSK Connect 所需的资源
您为连接器创建 IAM 角色,然后创建连接器。您还可以创建 EventBridge 规则来筛选发送到事件总线的 Kafka EventBridge 事件。
连接器的 IAM 角色
您与连接器关联的 IAM 角色必须具有允许向其发送事件的PutEvents权限 EventBridge。以下 IAM 策略示例授予您向名example-event-bus为的事件总线发送事件的权限。确保将以下示例中的资源 ARN 替换为事件总线的 ARN。
此外,您必须确保连接器的 IAM 角色包含以下信任策略。
传入事件的 EventBridge 规则
您可以创建将传入事件与事件数据标准(称为事件模式)进行匹配的规则。使用事件模式,您可以定义筛选传入事件的标准,并确定哪些事件应触发特定规则并随后路由到指定目标。以下事件模式示例与发送到事件总线的 Kafka 事件相匹配。 EventBridge
{ "detail": { "topic": ["msk-eventbridge-tutorial"] } }
以下是 EventBridge 使用 Kafka 接收器连接器从 Kafka 发送到的事件的示例。
{ "version": "0", "id": "dbc1c73a-c51d-0c0e-ca61-ab9278974c57", "account": "123456789012", "time": "2025-03-26T10:15:00Z", "region": "us-east-1", "detail-type": "msk-eventbridge-tutorial", "source": "kafka-connect.msk-eventbridge-tutorial", "resources": [], "detail": { "topic": "msk-eventbridge-tutorial", "partition": 0, "offset": 0, "timestamp": 1742984100000, "timestampType": "CreateTime", "headers": [], "key": "order-1", "value": { "orderItems": [ "item-1", "item-2" ], "orderCreatedTime": "Wed Mar 26 10:15:00 UTC 2025" } } }
在 EventBridge 控制台中,使用此示例模式在事件总线上创建规则并指定目标,例如 CloudWatch 日志组。 EventBridge 控制台将自动为 CloudWatch 日志组配置必要的访问策略。
创建连接器
在下一节中,您将使用创建和部署 EventBridge Kafka 接收器连接器
步骤 1:下载连接器
从 Ka  EventBridge  fka  EventBridge  连接器的GitHub 版本页面kafka-eventbridge-sink-with-dependencies.jar器。然后,将文件保存到计算机上的首选位置。
步骤 2:创建 Amazon S3 存储桶
- 
                        要将 JAR 文件存储在 Amazon S3 中以用于 MSK Connect,请打开 AWS Management Console,然后选择 Amazon S3。 
- 
                        在 Amazon S3 控制台中,选择创建存储桶,然后输入唯一的存储桶名称。例如 amzn-s3-demo-bucket1-eb-connector。
- 
                        为您的 Amazon S3 存储桶选择合适的区域。确保它与部署您的 MSK 集群的区域相匹配。 
- 
                        对于 Bucket 设置,请保留默认选项或根据需要进行调整。 
- 
                        选择创建存储桶。 
- 
                        将 JAR 文件上传到亚马逊 S3 存储桶。 
第 3 步:在 MSK Connect 中创建插件
- 
                        打开 AWS Management Console,然后导航到 MSK Connect。 
- 
                        在左侧导航窗格中,选择自定义插件。 
- 
                        选择 “创建插件”,然后输入插件名称。例如 eventbridge-sink-plugin。
- 
                        要查看自定义插件位置,请粘贴 S3 对象 URL。 
- 
                        为插件添加可选描述。 
- 
                        选择 “创建插件”。 
创建插件后,您可以使用它在 MSK Connect 中配置和部署 EventBridge Kafka 连接器。
步骤 4:创建连接器
在创建连接器之前,我们建议创建所需的 Kafka 主题以避免连接器错误。要创建主题,请使用您的客户端计算机。
- 
                        在 MSK 控制台的左侧窗格中,选择连接器,然后选择创建连接器。 
- 
                        在插件列表中,选择 eventbridge-sink-plugin,然后选择下一步。 
- 
                        对于连接器名称,请输入 EventBridgeSink。
- 
                        在集群列表中,选择您的 MSK 集群。 
- 
                        复制连接器的以下配置并将其粘贴到 “连接器配置” 字段中 根据需要替换以下配置中的占位符。 - 
                                aws.eventbridge.endpoint.uri如果您的 MSK 集群具有公共互联网访问权限,请将其删除。
- 
                                如果您使用 PrivateLink 安全地从 MSK 连接到 EventBridge,请将后 https://面的 DNS 部分替换为您之前创建的(可选)VPC 接口终端节点的 EventBridge 正确私有 DNS 名称。
- 
                                将以下配置中的 EventBridge 事件总线 ARN 替换为事件总线的 ARN。 
- 
                                更新任何特定于区域的值。 
 { "connector.class": "software.amazon.event.kafkaconnector.EventBridgeSinkConnector", "aws.eventbridge.connector.id": "msk-eventbridge-tutorial", "topics": "msk-eventbridge-tutorial", "tasks.max": "1", "aws.eventbridge.endpoint.uri": "https://events.us-east-1.amazonaws.com", "aws.eventbridge.eventbus.arn": "arn:aws:events:us-east-1:123456789012:event-bus/example-event-bus", "value.converter.schemas.enable": "false", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "aws.eventbridge.region": "us-east-1", "auto.offset.reset": "earliest", "key.converter": "org.apache.kafka.connect.storage.StringConverter" }有关连接器配置的更多信息,请参见eventbridge-kafka-connector 。 如果需要,请更改工作人员和自动缩放的设置。我们还建议使用下拉列表中最新可用(推荐)的 Apache Kafka Connect 版本。在 “访问权限” 下,使用之前创建的角色。我们还建议启用日志功能,以实现可 CloudWatch 观察性和故障排除。根据需要调整其他可选设置,例如标签。然后,部署连接器并等待状态进入运行状态。 
- 
                                
向 Kafka 发送消息
您可以使用 Kafka Connect 中提供的和(可选)key.converter设置来指定不同的转换器,从而配置消息编码,例如 Apache Avro value.converter 和 JSON。
如使用 for 所示,本主题connector example中的配置为处理 JSON 编码的消息。org.apache.kafka.connect.json.JsonConverter value converter当连接器处于 “运行” 状态时,从您的客户端计算机向 msk-eventbridge-tutorial Kafka 主题发送记录。