本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
SelfManagedKafka
描述SelfManagedKafka
事件來源類型的物件。如需詳細資訊,請參閱《 AWS Lambda 開發人員指南》中的使用 AWS Lambda 搭配自我管理的 Apache Kafka。
AWS Serverless Application Model (AWS SAM) 會在設定此事件類型時產生 AWS::Lambda::EventSourceMapping 資源。
若要使用結構描述登錄檔,您需要定義函數的特定 IAM 角色許可。如需所需組態的範例,請參閱使用 IAM 角色完成設定。
語法
若要在 AWS SAM 範本中宣告此實體,請使用下列語法。
YAML
BatchSize:
Integer
ConsumerGroupId:String
DestinationConfig:DestinationConfig
Enabled:Boolean
FilterCriteria:FilterCriteria
KafkaBootstrapServers:List
KmsKeyArn:String
ProvisionedPollerConfig:ProvisionedPollerConfig
SchemaRegistryConfig:SchemaRegistryConfig
SourceAccessConfigurations:SourceAccessConfigurations
StartingPosition:String
StartingPositionTimestamp:Double
Topics:List
屬性
-
BatchSize
-
Lambda 從串流提取並傳送至函數的每個批次中的記錄數目上限。
類型:整數
必要:否
預設值:100
AWS CloudFormation 相容性:此屬性會直接傳遞至
AWS::Lambda::EventSourceMapping
資源的BatchSize
屬性。下限:
1
上限:
10000
-
ConsumerGroupId
-
設定如何從 Kafka 主題讀取事件的字串。
類型:字串
必要:否
AWS CloudFormation 相容性:此屬性會直接傳遞至
AWS::Lambda::EventSourceMapping
資源的SelfManagedKafkaConfiguration
屬性。 -
DestinationConfig
-
組態物件,指定在 Lambda 處理過後事件的目標。
使用此屬性可指定從自我管理 Kafka 事件來源呼叫失敗的目的地。
必要:否
AWS CloudFormation 相容性:此屬性會直接傳遞至
AWS::Lambda::EventSourceMapping
資源的DestinationConfig
屬性。 -
Enabled
-
停用事件來源映射以暫停輪詢和叫用。
類型:布林值
必要:否
AWS CloudFormation 相容性:此屬性會直接傳遞至
AWS::Lambda::EventSourceMapping
資源的Enabled
屬性。 -
FilterCriteria
-
定義判斷 Lambda 是否應處理事件之條件的物件。如需詳細資訊,請參閱《 AWS Lambda 開發人員指南》中的AWS Lambda 事件篩選。
必要:否
AWS CloudFormation 相容性:此屬性會直接傳遞至
AWS::Lambda::EventSourceMapping
資源的FilterCriteria
屬性。 -
KafkaBootstrapServers
-
Kafka 代理程式的引導伺服器清單。包含連接埠,例如
broker.example.com:
xxxx
類型:列出
必要:否
AWS CloudFormation 相容性:此屬性對 是唯一的 AWS SAM ,並且沒有 AWS CloudFormation 同等的。
-
KmsKeyArn
-
金鑰的 Amazon Resource Name (ARN),用於加密與此事件相關的資訊。
類型:字串
必要:否
AWS CloudFormation 相容性:此屬性會直接傳遞至
AWS::Lambda::EventSourceMapping
資源的KmsKeyArn
屬性。 -
ProvisionedPollerConfig
-
用於增加用於計算事件來源映射之輪詢器數量的組態。此組態允許最少 1 個輪詢器,最多 20 個輪詢器。如需範例,請參閱 ProvisionedPollerConfig 範例
必要:否
AWS CloudFormation 相容性:此屬性會直接傳遞至
AWS::Lambda::EventSourceMapping
資源的ProvisionedPollerConfig
屬性。 SchemaRegistryConfig
-
搭配自我管理 Kafka 事件來源使用結構描述登錄檔的組態。
注意
此功能
ProvisionedPollerConfig
需要設定。類型: SchemaRegistryConfig
必要:否
AWS CloudFormation 相容性:此屬性會直接傳遞至
AWS::Lambda::EventSourceMapping
資源的SelfManagedKafkaEventSourceConfig
屬性。 -
SourceAccessConfigurations
-
保護和定義事件來源的身分驗證協定、VPC 元件或虛擬主機。
有效值:
BASIC_AUTH | CLIENT_CERTIFICATE_TLS_AUTH | SASL_SCRAM_256_AUTH | SASL_SCRAM_512_AUTH | SERVER_ROOT_CA_CERTIFICATE
類型:SourceAccessConfiguration 清單
必要:是
AWS CloudFormation 相容性:此屬性是
AWS::Lambda::EventSourceMapping
資源的 SelfManagedKafkaEventSourceConfig 屬性的一部分。 -
StartingPosition
-
要從中開始讀取的串流位置。
-
AT_TIMESTAMP
– 指定從中開始讀取記錄的時間。 -
LATEST
– 唯讀新記錄。 -
TRIM_HORIZON
– 處理所有可用的記錄。
有效值:
AT_TIMESTAMP
|LATEST
|TRIM_HORIZON
類型:字串
必要:否
AWS CloudFormation 相容性:此屬性會直接傳遞至
AWS::Lambda::EventSourceMapping
資源的StartingPosition
屬性。 -
-
StartingPositionTimestamp
-
開始讀取的時間,以 Unix 時間秒為單位。定義
StartingPositionTimestamp
StartingPosition
何時指定為AT_TIMESTAMP
。類型:Double
必要:否
AWS CloudFormation 相容性:此屬性會直接傳遞至
AWS::Lambda::EventSourceMapping
資源的StartingPositionTimestamp
屬性。 -
Topics
-
Kafka 主題名稱。
類型:列出
必要:是
AWS CloudFormation 相容性:此屬性會直接傳遞至
AWS::Lambda::EventSourceMapping
資源的Topics
屬性。
範例
使用 IAM 角色完成設定
下列範例顯示完整的設定,包括使用結構描述登錄所需的 IAM 角色組態:
Parameters: PreCreatedSubnetOne: Type: String PreCreatedSubnetTwo: Type: String Resources: MyLambdaExecutionRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Action: [sts:AssumeRole] Effect: Allow Principal: Service: [lambda.amazonaws.com] Policies: - PolicyName: KafkaAuthPolicy PolicyDocument: Statement: - Action: [secretsmanager:GetSecretValue, kms:Decrypt] Effect: "Allow" Resource: ['arn:aws:secretsmanager:us-west-2:123456789012:secret:kafkaSecret-******', 'arn:aws:kms:us-west-2:123456789012:key/keyId'] - PolicyName: ENIPolicy PolicyDocument: Statement: - Action: [ec2:CreateNetworkInterface, ec2:DescribeNetworkInterfaces, ec2:DescribeVpcs, ec2:DeleteNetworkInterface, ec2:DescribeSubnets, ec2:DescribeSecurityGroups] Effect: Allow Resource: '*' - PolicyName: SchemaRegistryPolicy PolicyDocument: Statement: - Action: [glue:GetRegistry] Effect: Allow Resource: 'arn:aws:glue:{region}:{account-id}:registry/{registry-name}' - PolicyName: SchemaVersionsPolicy PolicyDocument: Statement: - Action: [glue:GetSchemaVersions] Effect: Allow Resource: '*' ManagedPolicyArns: - !Sub arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole Tags: - {Value: SAM, Key: lambda:createdBy} MyKafkaProcessor: Type: AWS::Serverless::Function Properties: Runtime: nodejs18.x Handler: index.handler CodeUri: ${codeuri} Role: Fn::GetAtt: [MyLambdaExecutionRole, Arn] Events: SelfManagedKafkaEvent: Type: SelfManagedKafka Properties: KafkaBootstrapServers: - my-kafka-broker-1:9092 - my-kafka-broker-2:9092 Topics: - SchemaRegistryTestTopic StartingPosition: LATEST SourceAccessConfigurations: - Type: VPC_SUBNET URI: subnet:subnet-12345678 - Type: VPC_SECURITY_GROUP URI: security_group:sg-12345678 - Type: BASIC_AUTH URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c ProvisionedPollerConfig: MinimumPollers: 1 SchemaRegistryConfig: AccessConfigs: - Type: BASIC_AUTH URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c SchemaValidationConfigs: - Attribute: KEY EventRecordFormat: JSON SchemaRegistryURI: !Sub arn:${AWS::Partition}:glue:us-west-2:123456789012:registry/myregistry
ProvisionedPollerConfig 範例
ProvisionedPollerConfig: MinimumPollers: 1 MaximumPollers: 20
自我管理的 Kafka 事件來源
以下是SelfManagedKafka
事件來源類型的範例。
YAML
Events: SelfManagedKafkaEvent: Type: SelfManagedKafka Properties: BatchSize: 1000 Enabled: true KafkaBootstrapServers: - abc.xyz.com:xxxx SourceAccessConfigurations: - Type: BASIC_AUTH URI: arn:aws:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c Topics: - MyKafkaTopic
具有 AWS Glue 結構描述登錄檔的自我管理 Kafka 事件來源
以下是使用 AWS Glue 結構描述登錄檔設定SelfManagedKafka
的事件來源類型範例。
Events: SelfManagedKafkaEvent: Type: SelfManagedKafka Properties: KafkaBootstrapServers: - abc.xyz.com:9092 Topics: - SchemaRegistryTestTopic StartingPosition: LATEST ProvisionedPollerConfig: MinimumPollers: 1 SchemaRegistryConfig: SchemaRegistryURI: !Sub arn:${AWS::Partition}:glue:us-west-2:123456789012:registry/myregistry EventRecordFormat: JSON SchemaValidationConfigs: - Attribute: KEY - Attribute: VALUE SourceAccessConfigurations: - Type: VPC_SUBNET URI: subnet:subnet-12345678 - Type: VPC_SECURITY_GROUP URI: security_group:sg-12345678
具有 Confluent Schema Registry 的自我管理 Kafka 事件來源
以下是使用 Confluent Schema Registry 設定SelfManagedKafka
的事件來源類型範例。
Events: SelfManagedKafkaEvent: Type: SelfManagedKafka Properties: KafkaBootstrapServers: - abc.xyz.com:9092 Topics: - SchemaRegistryTestTopic StartingPosition: LATEST ProvisionedPollerConfig: MinimumPollers: 1 SchemaRegistryConfig: SchemaRegistryURI: https://my-schema-registry.confluent.cloud AccessConfigs: - Type: BASIC_AUTH URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:my-secret EventRecordFormat: JSON SchemaValidationConfigs: - Attribute: KEY - Attribute: VALUE SourceAccessConfigurations: - Type: VPC_SUBNET URI: subnet:subnet-12345678 - Type: VPC_SECURITY_GROUP URI: security_group:sg-12345678 - Type: BASIC_AUTH URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:kafka-secret