SelfManagedKafka - AWS Serverless Application Model

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

SelfManagedKafka

描述 SelfManagedKafka 事件源类型的对象。有关更多信息,请参阅《开发人员指南》中的AWS Lambda 与自行管理的 Apache Kafka 配合使用。AWS Lambda

AWS Serverless Application Model (AWS SAM) 在设置此事件类型时生成AWS::Lambda::EventSourceMapping资源。

要使用架构注册表,您需要为函数定义特定的 IAM 角色权限。有关所需配置的示例,请参阅使用 IAM 角色完成设置

语法

要在 AWS SAM 模板中声明此实体,请使用以下语法。

属性

BatchSize

Lambda 从流中提取并发送到函数的每个批处理中的最大记录数。

类型:整数

必需:否

默认值:100

AWS CloudFormation 兼容性:此属性直接传递给AWS::Lambda::EventSourceMapping资源的BatchSize属性。

最小值1

最大值10000

ConsumerGroupId

用于配置如何从 Kafka 主题中读取事件的字符串。

类型:字符串

必需:否

AWS CloudFormation 兼容性:此属性直接传递给AWS::Lambda::EventSourceMapping资源的SelfManagedKafkaConfiguration属性。

DestinationConfig

一个配置对象,用于在 Lambda 处理事件后指定事件目的地。

使用此属性指定来自管理 Kafka 事件源的失败调用的目的地。

类型DestinationConfig

必需:否

AWS CloudFormation 兼容性:此属性直接传递给AWS::Lambda::EventSourceMapping资源的 DestinationConfig属性。

Enabled

禁用事件源映射以暂停轮询和调用。

类型:布尔值

必需:否

AWS CloudFormation 兼容性:此属性直接传递给AWS::Lambda::EventSourceMapping资源的Enabled属性。

FilterCriteria

定义用于确定 Lambda 是否应处理事件的条件的对象。有关更多信息,请参阅《AWS Lambda 开发人员指南》中的 AWS Lambda 事件筛选

类型FilterCriteria

必需:否

AWS CloudFormation 兼容性:此属性直接传递给AWS::Lambda::EventSourceMapping资源的FilterCriteria属性。

KafkaBootstrapServers

Kafka 代理的引导服务器列表。包括端口,例如 broker.example.com:xxxx

类型:列表

必需:否

AWS CloudFormation 兼容性:此属性是独有的 AWS SAM ,没有 AWS CloudFormation 等效属性。

KmsKeyArn

用于加密与此事件相关信息的密钥的 Amazon 资源名称(ARN)。

类型:字符串

必需:否

AWS CloudFormation 兼容性:此属性直接传递给AWS::Lambda::EventSourceMapping资源的KmsKeyArn属性。

ProvisionedPollerConfig

用于增加用于计算事件源映射的轮询器数量的配置。此配置允许最少 1 个轮询器和最多 20 个轮询器。有关示例,请参阅 ProvisionedPollerConfig 示例

类型ProvisionedPollerConfig

必需:否

AWS CloudFormation 兼容性:此属性直接传递给AWS::Lambda::EventSourceMapping资源的ProvisionedPollerConfig属性。

SchemaRegistryConfig

用于将架构注册表与自管理 Kafka 事件源配合使用的配置。

注意

需要配置ProvisionedPollerConfig此功能。

类型: SchemaRegistryConfig

必需:否

AWS CloudFormation 兼容性:此属性直接传递给AWS::Lambda::EventSourceMapping资源的SelfManagedKafkaEventSourceConfig属性。

SourceAccessConfigurations

用于保护与定义事件源的身份验证协议数组 VPC 组件或虚拟化主机。

有效值BASIC_AUTH | CLIENT_CERTIFICATE_TLS_AUTH | SASL_SCRAM_256_AUTH | SASL_SCRAM_512_AUTH | SERVER_ROOT_CA_CERTIFICATE

类型SourceAccessConfiguration 列表

必需:是

AWS CloudFormation 兼容性:此属性是AWS::Lambda::EventSourceMapping资源SelfManagedKafkaEventSourceConfig属性的一部分。

StartingPosition

在流中开始读取数据的位置。

  • AT_TIMESTAMP – 指定开始读取记录的时间。

  • LATEST - 仅读取新记录。

  • TRIM_HORIZON - 处理所有可用的记录。

有效值AT_TIMESTAMP | LATEST | TRIM_HORIZON

类型:字符串

必需:否

AWS CloudFormation 兼容性:此属性直接传递给AWS::Lambda::EventSourceMapping资源的StartingPosition属性。

StartingPositionTimestamp

开始读取的时间(以 Unix 时间秒为单位) 在 StartingPosition 被指定为 AT_TIMESTAMP 的情况下定义 StartingPositionTimestamp

类型:双精度

必需:否

AWS CloudFormation 兼容性:此属性直接传递给AWS::Lambda::EventSourceMapping资源的StartingPositionTimestamp属性。

Topics

Kafka 主题的名称。

类型:列表

必需:是

AWS CloudFormation 兼容性:此属性直接传递给AWS::Lambda::EventSourceMapping资源的Topics属性。

示例

使用 IAM 角色完成设置

以下示例显示了完整的设置,包括使用架构注册表所需的 IAM 角色配置:

Parameters: PreCreatedSubnetOne: Type: String PreCreatedSubnetTwo: Type: String Resources: MyLambdaExecutionRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Action: [sts:AssumeRole] Effect: Allow Principal: Service: [lambda.amazonaws.com] Policies: - PolicyName: KafkaAuthPolicy PolicyDocument: Statement: - Action: [secretsmanager:GetSecretValue, kms:Decrypt] Effect: "Allow" Resource: ['arn:aws:secretsmanager:us-west-2:123456789012:secret:kafkaSecret-******', 'arn:aws:kms:us-west-2:123456789012:key/keyId'] - PolicyName: ENIPolicy PolicyDocument: Statement: - Action: [ec2:CreateNetworkInterface, ec2:DescribeNetworkInterfaces, ec2:DescribeVpcs, ec2:DeleteNetworkInterface, ec2:DescribeSubnets, ec2:DescribeSecurityGroups] Effect: Allow Resource: '*' - PolicyName: SchemaRegistryPolicy PolicyDocument: Statement: - Action: [glue:GetRegistry] Effect: Allow Resource: 'arn:aws:glue:{region}:{account-id}:registry/{registry-name}' - PolicyName: SchemaVersionsPolicy PolicyDocument: Statement: - Action: [glue:GetSchemaVersions] Effect: Allow Resource: '*' ManagedPolicyArns: - !Sub arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole Tags: - {Value: SAM, Key: lambda:createdBy} MyKafkaProcessor: Type: AWS::Serverless::Function Properties: Runtime: nodejs18.x Handler: index.handler CodeUri: ${codeuri} Role: Fn::GetAtt: [MyLambdaExecutionRole, Arn] Events: SelfManagedKafkaEvent: Type: SelfManagedKafka Properties: KafkaBootstrapServers: - my-kafka-broker-1:9092 - my-kafka-broker-2:9092 Topics: - SchemaRegistryTestTopic StartingPosition: LATEST SourceAccessConfigurations: - Type: VPC_SUBNET URI: subnet:subnet-12345678 - Type: VPC_SECURITY_GROUP URI: security_group:sg-12345678 - Type: BASIC_AUTH URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c ProvisionedPollerConfig: MinimumPollers: 1 SchemaRegistryConfig: AccessConfigs: - Type: BASIC_AUTH URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c SchemaValidationConfigs: - Attribute: KEY EventRecordFormat: JSON SchemaRegistryURI: !Sub arn:${AWS::Partition}:glue:us-west-2:123456789012:registry/myregistry

ProvisionedPollerConfig 示例

ProvisionedPollerConfig: MinimumPollers: 1 MaximumPollers: 20

自行管理的 Kafka 事件源

以下是 SelfManagedKafka 事件源类型的示例。

YAML

Events: SelfManagedKafkaEvent: Type: SelfManagedKafka Properties: BatchSize: 1000 Enabled: true KafkaBootstrapServers: - abc.xyz.com:xxxx SourceAccessConfigurations: - Type: BASIC_AUTH URI: arn:aws:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c Topics: - MyKafkaTopic

带有 AWS Glue 架构注册表的自管理 Kafka 事件源

以下是使用 AWS Glue 架构注册表配置SelfManagedKafka的事件源类型的示例。

Events: SelfManagedKafkaEvent: Type: SelfManagedKafka Properties: KafkaBootstrapServers: - abc.xyz.com:9092 Topics: - SchemaRegistryTestTopic StartingPosition: LATEST ProvisionedPollerConfig: MinimumPollers: 1 SchemaRegistryConfig: SchemaRegistryURI: !Sub arn:${AWS::Partition}:glue:us-west-2:123456789012:registry/myregistry EventRecordFormat: JSON SchemaValidationConfigs: - Attribute: KEY - Attribute: VALUE SourceAccessConfigurations: - Type: VPC_SUBNET URI: subnet:subnet-12345678 - Type: VPC_SECURITY_GROUP URI: security_group:sg-12345678

带有 Confluent 架构注册表的自管理 Kafka 事件源

以下是使用 Confluent 架构注册表配置SelfManagedKafka的事件源类型的示例。

Events: SelfManagedKafkaEvent: Type: SelfManagedKafka Properties: KafkaBootstrapServers: - abc.xyz.com:9092 Topics: - SchemaRegistryTestTopic StartingPosition: LATEST ProvisionedPollerConfig: MinimumPollers: 1 SchemaRegistryConfig: SchemaRegistryURI: https://my-schema-registry.confluent.cloud AccessConfigs: - Type: BASIC_AUTH URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:my-secret EventRecordFormat: JSON SchemaValidationConfigs: - Attribute: KEY - Attribute: VALUE SourceAccessConfigurations: - Type: VPC_SUBNET URI: subnet:subnet-12345678 - Type: VPC_SECURITY_GROUP URI: security_group:sg-12345678 - Type: BASIC_AUTH URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:kafka-secret