翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
MSK
MSK
イベントソースタイプを説明するオブジェクトです。詳細については、「 AWS Lambda デベロッパーガイド」の「Amazon MSK AWS Lambda での の使用」を参照してください。
AWS Serverless Application Model (AWS SAM) このイベントタイプが設定されている場合、 は AWS::Lambda::EventSourceMappingリソースを生成します。
Schema Registry を使用するには、関数に特定の IAM ロールのアクセス許可を定義する必要があります。必要な設定の例については、「IAM ロールでのセットアップを完了する」を参照してください。
構文
AWS SAM テンプレートでこのエンティティを宣言するには、次の構文を使用します。
YAML
ConsumerGroupId:
String
DestinationConfig:DestinationConfig
FilterCriteria:FilterCriteria
KmsKeyArn:String
MaximumBatchingWindowInSeconds:Integer
ProvisionedPollerConfig:ProvisionedPollerConfig
SchemaRegistryConfig:SchemaRegistryConfig
SourceAccessConfigurations:SourceAccessConfigurations
StartingPosition:String
StartingPositionTimestamp:Double
Stream:String
Topics:List
プロパティ
-
ConsumerGroupId
-
Kafka トピックからイベントを読み取る方法を設定する文字列。
タイプ: 文字列
必須: いいえ
AWS CloudFormation 互換性: このプロパティは、
AWS::Lambda::EventSourceMapping
リソースのAmazonManagedKafkaConfiguration
プロパティに直接渡されます。 -
DestinationConfig
-
Lambda がイベントを処理した後のイベントの送信先を指定する構成オブジェクト。
このプロパティを使用して、Amazon MSK イベントソースから失敗した呼び出しの送信先を指定します。
タイプ: DestinationConfig
必須: いいえ
AWS CloudFormation 互換性: このプロパティは、
AWS::Lambda::EventSourceMapping
リソースのDestinationConfig
プロパティに直接渡されます。 -
FilterCriteria
-
Lambda がイベントを処理する必要があるかどうかを判断する基準を定義するオブジェクト。詳細については、AWS Lambda デベロッパーガイドの AWS Lambda イベントのフィルタリングを参照してください。
タイプ: FilterCriteria
必須: いいえ
AWS CloudFormation 互換性: このプロパティは、
AWS::Lambda::EventSourceMapping
リソースのFilterCriteria
プロパティに直接渡されます。 -
KmsKeyArn
-
このイベントに関連する情報を暗号化するためのキーの Amazon リソースネーム (ARN)。
タイプ: 文字列
必須: いいえ
AWS CloudFormation 互換性: このプロパティは、
AWS::Lambda::EventSourceMapping
リソースのKmsKeyArn
プロパティに直接渡されます。 -
MaximumBatchingWindowInSeconds
-
関数を呼び出すまでのレコード収集の最大時間 (秒) です。
タイプ: 整数
必須: いいえ
AWS CloudFormation 互換性: このプロパティは、
AWS::Lambda::EventSourceMapping
リソースのMaximumBatchingWindowInSeconds
プロパティに直接渡されます。 -
ProvisionedPollerConfig
-
イベントソースマッピングの計算に使用されるポーラーの量を増やすための設定。この設定では、最小 1 つのポーラーと最大 20 のポーラーを使用できます。例については、「」を参照してくださいProvisionedPollerConfig の例。
必須: いいえ
AWS CloudFormation 互換性: このプロパティは、
AWS::Lambda::EventSourceMapping
リソースのProvisionedPollerConfig
プロパティに直接渡されます。 SchemaRegistryConfig
-
Kafka イベントソースでスキーマレジストリを使用するための設定。
注記
この機能は
ProvisionedPollerConfig
設定する必要があります。タイプ: SchemaRegistryConfig
必須: いいえ
AWS CloudFormation 互換性: このプロパティは、
AWS::Lambda::EventSourceMapping
リソースのAmazonManagedKafkaEventSourceConfig
プロパティに直接渡されます。 -
SourceAccessConfigurations
-
認証プロトコルの配列、VPC コンポーネント、イベントソースを保護して定義する仮想ホスト。
有効な値:
CLIENT_CERTIFICATE_TLS_AUTH
型: SourceAccessConfiguration のリスト
必須: いいえ
AWS CloudFormation 互換性: このプロパティは、
AWS::Lambda::EventSourceMapping
リソースの AmazonManagedKafkaEventSourceConfig プロパティの一部です。 -
StartingPosition
-
読み取りを開始するストリームの場所です。
-
AT_TIMESTAMP
- レコードの読み取りを開始する時間を指定します。 -
LATEST
- 新しいレコードのみを読み込みます。 -
TRIM_HORIZON
- 使用可能なすべてのレコードを処理します。
有効な値:
AT_TIMESTAMP
|LATEST
|TRIM_HORIZON
タイプ: 文字列
必須: いいえ
AWS CloudFormation 互換性: このプロパティは、
AWS::Lambda::EventSourceMapping
リソースのStartingPosition
プロパティに直接渡されます。 -
-
StartingPositionTimestamp
-
Unix タイム秒単位で読み取りをスタートする時間。
StartingPosition
がAT_TIMESTAMP
として指定されている場合のStartingPositionTimestamp
を定義します。型: 倍精度
必須: いいえ
AWS CloudFormation 互換性: このプロパティは、
AWS::Lambda::EventSourceMapping
リソースのStartingPositionTimestamp
プロパティに直接渡されます。 -
Stream
-
データストリームまたはストリームコンシューマーの Amazon リソースネーム (ARN) です。
タイプ: 文字列
必須: はい
AWS CloudFormation 互換性: このプロパティは、
AWS::Lambda::EventSourceMapping
リソースのEventSourceArn
プロパティに直接渡されます。 -
Topics
-
Kafka トピックの名前です。
タイプ: リスト
必須: はい
AWS CloudFormation 互換性: このプロパティは、
AWS::Lambda::EventSourceMapping
リソースのTopics
プロパティに直接渡されます。
例
IAM ロールを使用してセットアップを完了する
次の例は、スキーマレジストリを使用するために必要な IAM ロール設定を含む完全なセットアップを示しています。
Parameters: PreCreatedSubnetOne: Type: String PreCreatedSubnetTwo: Type: String MskClusterName4: Type: String Resources: MyLambdaExecutionRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Action: [sts:AssumeRole] Effect: Allow Principal: Service: [lambda.amazonaws.com] Policies: - PolicyName: KafkaClusterPermissions PolicyDocument: Statement: - Action: [kafka:DescribeClusterV2, kafka:GetBootstrapBrokers] Effect: Allow Resource: 'arn:aws:kafka:us-east-1:123456789012:cluster/*' - PolicyName: KafkaAuthPolicy PolicyDocument: Statement: - Action: [secretsmanager:GetSecretValue, kms:Decrypt] Effect: "Allow" Resource: ['arn:aws:secretsmanager:us-west-2:123456789012:secret:kafkaSecret-******', 'arn:aws:kms:us-west-2:123456789012:key/keyId'] - PolicyName: ENIPolicy PolicyDocument: Statement: - Action: [ec2:CreateNetworkInterface, ec2:DescribeNetworkInterfaces, ec2:DescribeVpcs, ec2:DeleteNetworkInterface, ec2:DescribeSubnets, ec2:DescribeSecurityGroups] Effect: Allow Resource: '*' - PolicyName: SchemaRegistryPolicy PolicyDocument: Statement: - Action: [glue:GetRegistry] Effect: Allow Resource: 'arn:aws:glue:{region}:{account-id}:registry/{registry-name}' - PolicyName: SchemaVersionsPolicy PolicyDocument: Statement: - Action: [glue:GetSchemaVersions] Effect: Allow Resource: '*' ManagedPolicyArns: - !Sub arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole Tags: - {Value: SAM, Key: lambda:createdBy} MyMskCluster: Type: AWS::MSK::Cluster Properties: BrokerNodeGroupInfo: ClientSubnets: - Ref: PreCreatedSubnetOne - Ref: PreCreatedSubnetTwo InstanceType: kafka.t3.small StorageInfo: EBSStorageInfo: VolumeSize: 1 ClusterName: Ref: MskClusterName4 KafkaVersion: 3.8.x NumberOfBrokerNodes: 2 MyMskStreamProcessor: Type: AWS::Serverless::Function Properties: Runtime: nodejs18.x Handler: index.handler CodeUri: ${codeuri} Role: Fn::GetAtt: [MyLambdaExecutionRole, Arn] Events: MyMskEvent: Type: MSK Properties: StartingPosition: LATEST Stream: Ref: MyMskCluster SourceAccessConfigurations: - Type: SASL_SCRAM_512_AUTH URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c Topics: - SchemaRegistryTestTopic ProvisionedPollerConfig: MinimumPollers: 1 SchemaRegistryConfig: AccessConfigs: - Type: BASIC_AUTH URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c SchemaValidationConfigs: - Attribute: KEY EventRecordFormat: JSON SchemaRegistryURI: !Sub arn:${AWS::Partition}:glue:us-west-2:123456789012:registry/myregistry
ProvisionedPollerConfig の例
ProvisionedPollerConfig: MinimumPollers: 1 MaximumPollers: 20
既存のクラスターの Amazon MSK の例
以下は、 AWS アカウントに既に存在する Amazon MSK クラスター用の MSK
イベントソースタイプの例です。
YAML
Events: MSKEvent: Type: MSK Properties: StartingPosition: LATEST Stream: arn:aws:kafka:us-east-1:012345678012:cluster/exampleClusterName/abcdefab-1234-abcd-5678-cdef0123ab01-2 Topics: - MyTopic
同じテンプレートで宣言されたクラスターの Amazon MSK の例
以下は、同じテンプレートファイルで宣言されている Amazon MSK クラスター用の MSK
イベントソースタイプの例です。
YAML
Events: MSKEvent: Type: MSK Properties: StartingPosition: LATEST Stream: Ref: MyMskCluster # This must be the name of an MSK cluster declared in the same template file Topics: - MyTopic
スキーマレジストリを使用した MSK イベントソース
以下は、スキーマレジストリで設定されたMSK
イベントソースタイプの例です。
Events: MSKEvent: Type: MSK Properties: StartingPosition: LATEST Stream: Ref: MyMskCluster Topics: - SchemaRegistryTestTopic ProvisionedPollerConfig: MinimumPollers: 1 SchemaRegistryConfig: SchemaRegistryURI: !Sub arn:${AWS::Partition}:glue:us-west-2:123456789012:registry/myregistry EventRecordFormat: JSON SchemaValidationConfigs: - Attribute: KEY - Attribute: VALUE
Confluent Schema Registry を使用した MSK イベントソース
以下は、Confluent Schema Registry で設定されたMSK
イベントソースタイプの例です。
Events: MSKEvent: Type: MSK Properties: StartingPosition: LATEST Stream: Ref: MyMskCluster Topics: - SchemaRegistryTestTopic ProvisionedPollerConfig: MinimumPollers: 1 SchemaRegistryConfig: SchemaRegistryURI: https://my-schema-registry.confluent.cloud AccessConfigs: - Type: BASIC_AUTH URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:my-secret EventRecordFormat: JSON SchemaValidationConfigs: - Attribute: KEY - Attribute: VALUE