本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
SelfManagedKafka
描述 SelfManagedKafka 事件源类型的对象。有关更多信息,请参阅《开发人员指南》中的AWS Lambda 与自行管理的 Apache Kafka 配合使用。AWS Lambda
AWS Serverless Application Model (AWS SAM) 在设置此事件类型时生成AWS::Lambda::EventSourceMapping资源。
要使用架构注册表,您需要为函数定义特定的 IAM 角色权限。有关所需配置的示例,请参阅使用 IAM 角色完成设置。
语法
要在 AWS SAM 模板中声明此实体,请使用以下语法。
YAML
BatchSize:IntegerBisectBatchOnFunctionError:BooleanConsumerGroupId:StringDestinationConfig:DestinationConfigEnabled:BooleanFilterCriteria:FilterCriteriaKafkaBootstrapServers:ListFunctionResponseTypes:ListKmsKeyArn:StringMaximumRecordAgeInSeconds:IntegerMaximumRetryAttempts:IntegerProvisionedPollerConfig:ProvisionedPollerConfigSchemaRegistryConfig:SchemaRegistryConfigSourceAccessConfigurations:SourceAccessConfigurationsStartingPosition:StringStartingPositionTimestamp:DoubleTopics:List
Properties
-
BatchSize -
Lambda 从流中提取并发送到函数的每个批处理中的最大记录数。
类型:整数
必需:否
默认值:100
CloudFormation 兼容性:此属性直接传递给
AWS::Lambda::EventSourceMapping资源的BatchSize属性。最小值:
1最大值:
10000 -
BisectBatchOnFunctionError -
如果函数返回错误,则将批次拆分为两批并重试。
类型:布尔值
必需:否
CloudFormation 兼容性:此属性直接传递给
AWS::Lambda::EventSourceMapping资源的BisectBatchOnFunctionError属性。 -
ConsumerGroupId -
用于配置如何从 Kafka 主题中读取事件的字符串。
类型:字符串
必需:否
CloudFormation 兼容性:此属性直接传递给
AWS::Lambda::EventSourceMapping资源的SelfManagedKafkaConfiguration属性。 -
DestinationConfig -
一个配置对象,用于在 Lambda 处理事件后指定事件目的地。
使用此属性指定来自管理 Kafka 事件源的失败调用的目的地。
必需:否
CloudFormation 兼容性:此属性直接传递给
AWS::Lambda::EventSourceMapping资源的DestinationConfig属性。 -
Enabled -
禁用事件源映射以暂停轮询和调用。
类型:布尔值
必需:否
CloudFormation 兼容性:此属性直接传递给
AWS::Lambda::EventSourceMapping资源的Enabled属性。 -
FilterCriteria -
定义用于确定 Lambda 是否应处理事件的条件的对象。有关更多信息,请参阅《AWS Lambda 开发人员指南》中的 AWS Lambda 事件筛选。
必需:否
CloudFormation 兼容性:此属性直接传递给
AWS::Lambda::EventSourceMapping资源的FilterCriteria属性。 -
KafkaBootstrapServers -
Kafka 代理的引导服务器列表。包括端口,例如
broker.example.com:xxxx类型:列表
必需:否
CloudFormation 兼容性:此属性是独有的 AWS SAM ,没有 CloudFormation 等效属性。
-
FunctionResponseTypes -
当前应用于事件源映射的响应类型的列表。有关详细信息,请参阅《AWS Lambda 开发人员指南》中的报告批处理项目失败。
有效值:
ReportBatchItemFailures类型:列表
必需:否
CloudFormation 兼容性:此属性直接传递给
AWS::Lambda::EventSourceMapping资源的FunctionResponseTypes属性。 -
KmsKeyArn -
用于加密与此事件相关信息的密钥的 Amazon 资源名称(ARN)。
类型:字符串
必需:否
CloudFormation 兼容性:此属性直接传递给
AWS::Lambda::EventSourceMapping资源的KmsKeyArn属性。 -
MaximumRecordAgeInSeconds -
Lambda 发送到函数以进行处理的记录的最长期限。
类型:整数
必需:否
CloudFormation 兼容性:此属性直接传递给
AWS::Lambda::EventSourceMapping资源的MaximumRecordAgeInSeconds属性。 -
MaximumRetryAttempts -
在函数返回错误时重试的最大次数。
类型:整数
必需:否
CloudFormation 兼容性:此属性直接传递给
AWS::Lambda::EventSourceMapping资源的MaximumRetryAttempts属性。 -
ProvisionedPollerConfig -
用于增加计算事件源映射所使用的轮询器数量的配置。此配置允许最少 1 个轮询器和最多 2000 个轮询器。有关具体示例,请参阅 ProvisionedPollerConfig 示例。
必需:否
CloudFormation 兼容性:此属性直接传递给
AWS::Lambda::EventSourceMapping资源的ProvisionedPollerConfig属性。 SchemaRegistryConfig-
将架构注册表与自托管 Kafka 事件源配合使用的配置。
注意
此功能需要配置
ProvisionedPollerConfig。类型: SchemaRegistryConfig
必需:否
CloudFormation 兼容性:此属性直接传递给
AWS::Lambda::EventSourceMapping资源的SelfManagedKafkaEventSourceConfig属性。 -
SourceAccessConfigurations -
用于保护与定义事件源的身份验证协议数组 VPC 组件或虚拟化主机。
有效值:
BASIC_AUTH | CLIENT_CERTIFICATE_TLS_AUTH | SASL_SCRAM_256_AUTH | SASL_SCRAM_512_AUTH | SERVER_ROOT_CA_CERTIFICATE类型:SourceAccessConfiguration 列表
必需:是
CloudFormation 兼容性:此属性是
AWS::Lambda::EventSourceMapping资源SelfManagedKafkaEventSourceConfig属性的一部分。 -
StartingPosition -
在流中开始读取数据的位置。
-
AT_TIMESTAMP– 指定开始读取记录的时间。 -
LATEST- 仅读取新记录。 -
TRIM_HORIZON- 处理所有可用的记录。
有效值:
AT_TIMESTAMP|LATEST|TRIM_HORIZON类型:字符串
必需:否
CloudFormation 兼容性:此属性直接传递给
AWS::Lambda::EventSourceMapping资源的StartingPosition属性。 -
-
StartingPositionTimestamp -
开始读取的时间(以 Unix 时间秒为单位) 在
StartingPosition被指定为AT_TIMESTAMP的情况下定义StartingPositionTimestamp。类型:双精度
必需:否
CloudFormation 兼容性:此属性直接传递给
AWS::Lambda::EventSourceMapping资源的StartingPositionTimestamp属性。 -
Topics -
Kafka 主题的名称。
类型:列表
必需:是
CloudFormation 兼容性:此属性直接传递给
AWS::Lambda::EventSourceMapping资源的Topics属性。
示例
通过 IAM 角色完成设置
以下示例展示了完整的配置,包括使用架构注册表所需的 IAM 角色配置:
Parameters: PreCreatedSubnetOne: Type: String PreCreatedSubnetTwo: Type: String Resources: MyLambdaExecutionRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: '2012-10-17 ' Statement: - Action: [sts:AssumeRole] Effect: Allow Principal: Service: [lambda.amazonaws.com] Policies: - PolicyName: KafkaAuthPolicy PolicyDocument: Statement: - Action: [secretsmanager:GetSecretValue, kms:Decrypt] Effect: "Allow" Resource: ['arn:aws:secretsmanager:us-west-2:123456789012:secret:kafkaSecret-******', 'arn:aws:kms:us-west-2:123456789012:key/keyId'] - PolicyName: ENIPolicy PolicyDocument: Statement: - Action: [ec2:CreateNetworkInterface, ec2:DescribeNetworkInterfaces, ec2:DescribeVpcs, ec2:DeleteNetworkInterface, ec2:DescribeSubnets, ec2:DescribeSecurityGroups] Effect: Allow Resource: '*' - PolicyName: SchemaRegistryPolicy PolicyDocument: Statement: - Action: [glue:GetRegistry] Effect: Allow Resource: 'arn:aws:glue:{region}:{account-id}:registry/{registry-name}' - PolicyName: SchemaVersionsPolicy PolicyDocument: Statement: - Action: [glue:GetSchemaVersions] Effect: Allow Resource: '*' ManagedPolicyArns: - !Sub arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole Tags: - {Value: SAM, Key: lambda:createdBy} MyKafkaProcessor: Type: AWS::Serverless::Function Properties: Runtime: nodejs18.x Handler: index.handler CodeUri: ${codeuri} Role: Fn::GetAtt: [MyLambdaExecutionRole, Arn] Events: SelfManagedKafkaEvent: Type: SelfManagedKafka Properties: KafkaBootstrapServers: - my-kafka-broker-1:9092 - my-kafka-broker-2:9092 Topics: - SchemaRegistryTestTopic StartingPosition: LATEST SourceAccessConfigurations: - Type: VPC_SUBNET URI: subnet:subnet-12345678 - Type: VPC_SECURITY_GROUP URI: security_group:sg-12345678 - Type: BASIC_AUTH URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c ProvisionedPollerConfig: MinimumPollers: 1 SchemaRegistryConfig: AccessConfigs: - Type: BASIC_AUTH URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c SchemaValidationConfigs: - Attribute: KEY EventRecordFormat: JSON SchemaRegistryURI: !Sub arn:${AWS::Partition}:glue:us-west-2:123456789012:registry/myregistry
ProvisionedPollerConfig 示例
ProvisionedPollerConfig: MinimumPollers: 1 MaximumPollers: 200
自行管理的 Kafka 事件源
以下是 SelfManagedKafka 事件源类型的示例。
YAML
Events: SelfManagedKafkaEvent: Type: SelfManagedKafka Properties: BatchSize: 1000 Enabled: true KafkaBootstrapServers: - abc.xyz.com:xxxx SourceAccessConfigurations: - Type: BASIC_AUTH URI: arn:aws:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c Topics: - MyKafkaTopic
带有 AWS Glue 架构注册表的自管理 Kafka 事件源
以下是使用 AWS Glue 架构注册表配置SelfManagedKafka的事件源类型的示例。
Events: SelfManagedKafkaEvent: Type: SelfManagedKafka Properties: KafkaBootstrapServers: - abc.xyz.com:9092 Topics: - SchemaRegistryTestTopic StartingPosition: LATEST ProvisionedPollerConfig: MinimumPollers: 1 SchemaRegistryConfig: SchemaRegistryURI: !Sub arn:${AWS::Partition}:glue:us-west-2:123456789012:registry/myregistry EventRecordFormat: JSON SchemaValidationConfigs: - Attribute: KEY - Attribute: VALUE SourceAccessConfigurations: - Type: VPC_SUBNET URI: subnet:subnet-12345678 - Type: VPC_SECURITY_GROUP URI: security_group:sg-12345678
自托管 Kafka 事件源与 Confluent 架构注册表
以下是一个配置了 Confluent 架构注册表的 SelfManagedKafka 事件源类型的示例。
Events: SelfManagedKafkaEvent: Type: SelfManagedKafka Properties: KafkaBootstrapServers: - abc.xyz.com:9092 Topics: - SchemaRegistryTestTopic StartingPosition: LATEST ProvisionedPollerConfig: MinimumPollers: 1 SchemaRegistryConfig: SchemaRegistryURI: https://my-schema-registry.confluent.cloud AccessConfigs: - Type: BASIC_AUTH URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:my-secret EventRecordFormat: JSON SchemaValidationConfigs: - Attribute: KEY - Attribute: VALUE SourceAccessConfigurations: - Type: VPC_SUBNET URI: subnet:subnet-12345678 - Type: VPC_SECURITY_GROUP URI: security_group:sg-12345678 - Type: BASIC_AUTH URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:kafka-secret