本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
MSK
描述MSK
事件來源類型的物件。如需詳細資訊,請參閱《 AWS Lambda 開發人員指南》中的搭配使用 AWS Lambda 與 Amazon MSK。
AWS Serverless Application Model (AWS SAM) 會在設定此事件類型時產生 AWS::Lambda::EventSourceMapping 資源。
若要使用結構描述登錄檔,您需要定義函數的特定 IAM 角色許可。如需所需組態的範例,請參閱使用 IAM 角色完成設定。
語法
若要在 AWS SAM 範本中宣告此實體,請使用下列語法。
YAML
ConsumerGroupId:
String
DestinationConfig:DestinationConfig
FilterCriteria:FilterCriteria
KmsKeyArn:String
MaximumBatchingWindowInSeconds:Integer
ProvisionedPollerConfig:ProvisionedPollerConfig
SchemaRegistryConfig:SchemaRegistryConfig
SourceAccessConfigurations:SourceAccessConfigurations
StartingPosition:String
StartingPositionTimestamp:Double
Stream:String
Topics:List
屬性
-
ConsumerGroupId
-
設定如何從 Kafka 主題讀取事件的字串。
類型:字串
必要:否
AWS CloudFormation 相容性:此屬性會直接傳遞至
AWS::Lambda::EventSourceMapping
資源的AmazonManagedKafkaConfiguration
屬性。 -
DestinationConfig
-
組態物件,指定在 Lambda 處理過後事件的目標。
使用此屬性可指定來自 Amazon MSK 事件來源的失敗調用目的地。
必要:否
AWS CloudFormation 相容性:此屬性會直接傳遞至
AWS::Lambda::EventSourceMapping
資源的DestinationConfig
屬性。 -
FilterCriteria
-
定義判斷 Lambda 是否應處理事件之條件的物件。如需詳細資訊,請參閱《 AWS Lambda 開發人員指南》中的AWS Lambda 事件篩選。
必要:否
AWS CloudFormation 相容性:此屬性會直接傳遞至
AWS::Lambda::EventSourceMapping
資源的FilterCriteria
屬性。 -
KmsKeyArn
-
金鑰的 Amazon Resource Name (ARN),用於加密與此事件相關的資訊。
類型:字串
必要:否
AWS CloudFormation 相容性:此屬性會直接傳遞至
AWS::Lambda::EventSourceMapping
資源的KmsKeyArn
屬性。 -
MaximumBatchingWindowInSeconds
-
調用函式前收集記錄的最長時間 (單位為秒)。
類型:整數
必要:否
AWS CloudFormation 相容性:此屬性會直接傳遞至
AWS::Lambda::EventSourceMapping
資源的MaximumBatchingWindowInSeconds
屬性。 -
ProvisionedPollerConfig
-
用於增加用於計算事件來源映射之輪詢器數量的組態。此組態允許最少 1 個輪詢器,最多 20 個輪詢器。如需範例,請參閱 ProvisionedPollerConfig 範例。
必要:否
AWS CloudFormation 相容性:此屬性會直接傳遞至
AWS::Lambda::EventSourceMapping
資源的ProvisionedPollerConfig
屬性。 SchemaRegistryConfig
-
搭配 Kafka 事件來源使用結構描述登錄檔的組態。
注意
此功能
ProvisionedPollerConfig
需要設定。類型: SchemaRegistryConfig
必要:否
AWS CloudFormation 相容性:此屬性會直接傳遞至
AWS::Lambda::EventSourceMapping
資源的AmazonManagedKafkaEventSourceConfig
屬性。 -
SourceAccessConfigurations
-
保護和定義事件來源的身分驗證協定、VPC 元件或虛擬主機。
有效值:
CLIENT_CERTIFICATE_TLS_AUTH
類型:SourceAccessConfiguration 清單
必要:否
AWS CloudFormation 相容性:此屬性是
AWS::Lambda::EventSourceMapping
資源的 AmazonManagedKafkaEventSourceConfig 屬性的一部分。 -
StartingPosition
-
要從中開始讀取的串流位置。
-
AT_TIMESTAMP
– 指定從中開始讀取記錄的時間。 -
LATEST
– 唯讀新記錄。 -
TRIM_HORIZON
– 處理所有可用的記錄。
有效值:
AT_TIMESTAMP
|LATEST
|TRIM_HORIZON
類型:字串
必要:否
AWS CloudFormation 相容性:此屬性會直接傳遞至
AWS::Lambda::EventSourceMapping
資源的StartingPosition
屬性。 -
-
StartingPositionTimestamp
-
開始讀取的時間,以 Unix 時間秒為單位。定義
StartingPositionTimestamp
StartingPosition
何時指定為AT_TIMESTAMP
。類型:Double
必要:否
AWS CloudFormation 相容性:此屬性會直接傳遞至
AWS::Lambda::EventSourceMapping
資源的StartingPositionTimestamp
屬性。 -
Stream
-
資料串流或串流消費者的 Amazon Resource Name (ARN)。
類型:字串
必要:是
AWS CloudFormation 相容性:此屬性會直接傳遞至
AWS::Lambda::EventSourceMapping
資源的EventSourceArn
屬性。 -
Topics
-
Kafka 主題名稱。
類型:清單
必要:是
AWS CloudFormation 相容性:此屬性會直接傳遞至
AWS::Lambda::EventSourceMapping
資源的Topics
屬性。
範例
使用 IAM 角色完成設定
下列範例顯示完整的設定,包括使用結構描述登錄所需的 IAM 角色組態:
Parameters: PreCreatedSubnetOne: Type: String PreCreatedSubnetTwo: Type: String MskClusterName4: Type: String Resources: MyLambdaExecutionRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Action: [sts:AssumeRole] Effect: Allow Principal: Service: [lambda.amazonaws.com] Policies: - PolicyName: KafkaClusterPermissions PolicyDocument: Statement: - Action: [kafka:DescribeClusterV2, kafka:GetBootstrapBrokers] Effect: Allow Resource: 'arn:aws:kafka:us-east-1:123456789012:cluster/*' - PolicyName: KafkaAuthPolicy PolicyDocument: Statement: - Action: [secretsmanager:GetSecretValue, kms:Decrypt] Effect: "Allow" Resource: ['arn:aws:secretsmanager:us-west-2:123456789012:secret:kafkaSecret-******', 'arn:aws:kms:us-west-2:123456789012:key/keyId'] - PolicyName: ENIPolicy PolicyDocument: Statement: - Action: [ec2:CreateNetworkInterface, ec2:DescribeNetworkInterfaces, ec2:DescribeVpcs, ec2:DeleteNetworkInterface, ec2:DescribeSubnets, ec2:DescribeSecurityGroups] Effect: Allow Resource: '*' - PolicyName: SchemaRegistryPolicy PolicyDocument: Statement: - Action: [glue:GetRegistry] Effect: Allow Resource: 'arn:aws:glue:{region}:{account-id}:registry/{registry-name}' - PolicyName: SchemaVersionsPolicy PolicyDocument: Statement: - Action: [glue:GetSchemaVersions] Effect: Allow Resource: '*' ManagedPolicyArns: - !Sub arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole Tags: - {Value: SAM, Key: lambda:createdBy} MyMskCluster: Type: AWS::MSK::Cluster Properties: BrokerNodeGroupInfo: ClientSubnets: - Ref: PreCreatedSubnetOne - Ref: PreCreatedSubnetTwo InstanceType: kafka.t3.small StorageInfo: EBSStorageInfo: VolumeSize: 1 ClusterName: Ref: MskClusterName4 KafkaVersion: 3.8.x NumberOfBrokerNodes: 2 MyMskStreamProcessor: Type: AWS::Serverless::Function Properties: Runtime: nodejs18.x Handler: index.handler CodeUri: ${codeuri} Role: Fn::GetAtt: [MyLambdaExecutionRole, Arn] Events: MyMskEvent: Type: MSK Properties: StartingPosition: LATEST Stream: Ref: MyMskCluster SourceAccessConfigurations: - Type: SASL_SCRAM_512_AUTH URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c Topics: - SchemaRegistryTestTopic ProvisionedPollerConfig: MinimumPollers: 1 SchemaRegistryConfig: AccessConfigs: - Type: BASIC_AUTH URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c SchemaValidationConfigs: - Attribute: KEY EventRecordFormat: JSON SchemaRegistryURI: !Sub arn:${AWS::Partition}:glue:us-west-2:123456789012:registry/myregistry
ProvisionedPollerConfig 範例
ProvisionedPollerConfig: MinimumPollers: 1 MaximumPollers: 20
現有叢集的 Amazon MSK 範例
以下是 中已存在之 Amazon MSK 叢集MSK
的事件來源類型範例 AWS 帳戶。
YAML
Events: MSKEvent: Type: MSK Properties: StartingPosition: LATEST Stream: arn:aws:kafka:us-east-1:012345678012:cluster/exampleClusterName/abcdefab-1234-abcd-5678-cdef0123ab01-2 Topics: - MyTopic
相同範本中宣告之叢集的 Amazon MSK 範例
以下是在相同範本檔案中宣告之 Amazon MSK 叢集MSK
的事件來源類型範例。
YAML
Events: MSKEvent: Type: MSK Properties: StartingPosition: LATEST Stream: Ref: MyMskCluster # This must be the name of an MSK cluster declared in the same template file Topics: - MyTopic
具有結構描述登錄檔的 MSK 事件來源
以下是使用結構描述登錄檔設定MSK
的事件來源類型範例。
Events: MSKEvent: Type: MSK Properties: StartingPosition: LATEST Stream: Ref: MyMskCluster Topics: - SchemaRegistryTestTopic ProvisionedPollerConfig: MinimumPollers: 1 SchemaRegistryConfig: SchemaRegistryURI: !Sub arn:${AWS::Partition}:glue:us-west-2:123456789012:registry/myregistry EventRecordFormat: JSON SchemaValidationConfigs: - Attribute: KEY - Attribute: VALUE
具有 Confluent 結構描述登錄檔的 MSK 事件來源
以下是使用 Confluent Schema Registry 設定MSK
的事件來源類型範例。
Events: MSKEvent: Type: MSK Properties: StartingPosition: LATEST Stream: Ref: MyMskCluster Topics: - SchemaRegistryTestTopic ProvisionedPollerConfig: MinimumPollers: 1 SchemaRegistryConfig: SchemaRegistryURI: https://my-schema-registry.confluent.cloud AccessConfigs: - Type: BASIC_AUTH URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:my-secret EventRecordFormat: JSON SchemaValidationConfigs: - Attribute: KEY - Attribute: VALUE