MSK - AWS Serverless Application Model

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

MSK

MSK イベントソースタイプを説明するオブジェクトです。詳細については、「 AWS Lambda デベロッパーガイド」の「Amazon MSK AWS Lambda での の使用」を参照してください。

AWS Serverless Application Model (AWS SAM) このイベントタイプが設定されている場合、 は AWS::Lambda::EventSourceMappingリソースを生成します。

Schema Registry を使用するには、関数に特定の IAM ロールのアクセス許可を定義する必要があります。必要な設定の例については、「IAM ロールでのセットアップを完了する」を参照してください。

構文

AWS SAM テンプレートでこのエンティティを宣言するには、次の構文を使用します。

プロパティ

ConsumerGroupId

Kafka トピックからイベントを読み取る方法を設定する文字列。

タイプ: 文字列

必須: いいえ

AWS CloudFormation 互換性: このプロパティは、 AWS::Lambda::EventSourceMappingリソースの AmazonManagedKafkaConfigurationプロパティに直接渡されます。

DestinationConfig

Lambda がイベントを処理した後のイベントの送信先を指定する構成オブジェクト。

このプロパティを使用して、Amazon MSK イベントソースから失敗した呼び出しの送信先を指定します。

タイプ: DestinationConfig

必須: いいえ

AWS CloudFormation 互換性: このプロパティは、 AWS::Lambda::EventSourceMappingリソースの DestinationConfigプロパティに直接渡されます。

FilterCriteria

Lambda がイベントを処理する必要があるかどうかを判断する基準を定義するオブジェクト。詳細については、AWS Lambda デベロッパーガイドAWS Lambda イベントのフィルタリングを参照してください。

タイプ: FilterCriteria

必須: いいえ

AWS CloudFormation 互換性: このプロパティは、 AWS::Lambda::EventSourceMappingリソースの FilterCriteriaプロパティに直接渡されます。

KmsKeyArn

このイベントに関連する情報を暗号化するためのキーの Amazon リソースネーム (ARN)。

タイプ: 文字列

必須: いいえ

AWS CloudFormation 互換性: このプロパティは、 AWS::Lambda::EventSourceMappingリソースの KmsKeyArnプロパティに直接渡されます。

MaximumBatchingWindowInSeconds

関数を呼び出すまでのレコード収集の最大時間 (秒) です。

タイプ: 整数

必須: いいえ

AWS CloudFormation 互換性: このプロパティは、 AWS::Lambda::EventSourceMappingリソースの MaximumBatchingWindowInSecondsプロパティに直接渡されます。

ProvisionedPollerConfig

イベントソースマッピングの計算に使用されるポーラーの量を増やすための設定。この設定では、最小 1 つのポーラーと最大 20 のポーラーを使用できます。例については、「」を参照してくださいProvisionedPollerConfig の例

タイプ: ProvisionedPollerConfig

必須: いいえ

AWS CloudFormation 互換性: このプロパティは、 AWS::Lambda::EventSourceMappingリソースの ProvisionedPollerConfigプロパティに直接渡されます。

SchemaRegistryConfig

Kafka イベントソースでスキーマレジストリを使用するための設定。

注記

この機能はProvisionedPollerConfig設定する必要があります。

タイプ: SchemaRegistryConfig

必須: いいえ

AWS CloudFormation 互換性: このプロパティは、 AWS::Lambda::EventSourceMappingリソースの AmazonManagedKafkaEventSourceConfigプロパティに直接渡されます。

SourceAccessConfigurations

認証プロトコルの配列、VPC コンポーネント、イベントソースを保護して定義する仮想ホスト。

有効な値: CLIENT_CERTIFICATE_TLS_AUTH

型: SourceAccessConfiguration のリスト

必須: いいえ

AWS CloudFormation 互換性: このプロパティは、 AWS::Lambda::EventSourceMappingリソースの AmazonManagedKafkaEventSourceConfig プロパティの一部です。

StartingPosition

読み取りを開始するストリームの場所です。

  • AT_TIMESTAMP - レコードの読み取りを開始する時間を指定します。

  • LATEST - 新しいレコードのみを読み込みます。

  • TRIM_HORIZON - 使用可能なすべてのレコードを処理します。

有効な値: AT_TIMESTAMP | LATEST | TRIM_HORIZON

タイプ: 文字列

必須: いいえ

AWS CloudFormation 互換性: このプロパティは、 AWS::Lambda::EventSourceMappingリソースの StartingPositionプロパティに直接渡されます。

StartingPositionTimestamp

Unix タイム秒単位で読み取りをスタートする時間。StartingPositionAT_TIMESTAMP として指定されている場合の StartingPositionTimestamp を定義します。

型: 倍精度

必須: いいえ

AWS CloudFormation 互換性: このプロパティは、 AWS::Lambda::EventSourceMappingリソースの StartingPositionTimestampプロパティに直接渡されます。

Stream

データストリームまたはストリームコンシューマーの Amazon リソースネーム (ARN) です。

タイプ: 文字列

必須: はい

AWS CloudFormation 互換性: このプロパティは、 AWS::Lambda::EventSourceMappingリソースの EventSourceArnプロパティに直接渡されます。

Topics

Kafka トピックの名前です。

タイプ: リスト

必須: はい

AWS CloudFormation 互換性: このプロパティは、 AWS::Lambda::EventSourceMappingリソースの Topicsプロパティに直接渡されます。

IAM ロールを使用してセットアップを完了する

次の例は、スキーマレジストリを使用するために必要な IAM ロール設定を含む完全なセットアップを示しています。

Parameters: PreCreatedSubnetOne: Type: String PreCreatedSubnetTwo: Type: String MskClusterName4: Type: String Resources: MyLambdaExecutionRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Action: [sts:AssumeRole] Effect: Allow Principal: Service: [lambda.amazonaws.com] Policies: - PolicyName: KafkaClusterPermissions PolicyDocument: Statement: - Action: [kafka:DescribeClusterV2, kafka:GetBootstrapBrokers] Effect: Allow Resource: 'arn:aws:kafka:us-east-1:123456789012:cluster/*' - PolicyName: KafkaAuthPolicy PolicyDocument: Statement: - Action: [secretsmanager:GetSecretValue, kms:Decrypt] Effect: "Allow" Resource: ['arn:aws:secretsmanager:us-west-2:123456789012:secret:kafkaSecret-******', 'arn:aws:kms:us-west-2:123456789012:key/keyId'] - PolicyName: ENIPolicy PolicyDocument: Statement: - Action: [ec2:CreateNetworkInterface, ec2:DescribeNetworkInterfaces, ec2:DescribeVpcs, ec2:DeleteNetworkInterface, ec2:DescribeSubnets, ec2:DescribeSecurityGroups] Effect: Allow Resource: '*' - PolicyName: SchemaRegistryPolicy PolicyDocument: Statement: - Action: [glue:GetRegistry] Effect: Allow Resource: 'arn:aws:glue:{region}:{account-id}:registry/{registry-name}' - PolicyName: SchemaVersionsPolicy PolicyDocument: Statement: - Action: [glue:GetSchemaVersions] Effect: Allow Resource: '*' ManagedPolicyArns: - !Sub arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole Tags: - {Value: SAM, Key: lambda:createdBy} MyMskCluster: Type: AWS::MSK::Cluster Properties: BrokerNodeGroupInfo: ClientSubnets: - Ref: PreCreatedSubnetOne - Ref: PreCreatedSubnetTwo InstanceType: kafka.t3.small StorageInfo: EBSStorageInfo: VolumeSize: 1 ClusterName: Ref: MskClusterName4 KafkaVersion: 3.8.x NumberOfBrokerNodes: 2 MyMskStreamProcessor: Type: AWS::Serverless::Function Properties: Runtime: nodejs18.x Handler: index.handler CodeUri: ${codeuri} Role: Fn::GetAtt: [MyLambdaExecutionRole, Arn] Events: MyMskEvent: Type: MSK Properties: StartingPosition: LATEST Stream: Ref: MyMskCluster SourceAccessConfigurations: - Type: SASL_SCRAM_512_AUTH URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c Topics: - SchemaRegistryTestTopic ProvisionedPollerConfig: MinimumPollers: 1 SchemaRegistryConfig: AccessConfigs: - Type: BASIC_AUTH URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c SchemaValidationConfigs: - Attribute: KEY EventRecordFormat: JSON SchemaRegistryURI: !Sub arn:${AWS::Partition}:glue:us-west-2:123456789012:registry/myregistry

ProvisionedPollerConfig の例

ProvisionedPollerConfig: MinimumPollers: 1 MaximumPollers: 20

既存のクラスターの Amazon MSK の例

以下は、 AWS アカウントに既に存在する Amazon MSK クラスター用の MSK イベントソースタイプの例です。

YAML

Events: MSKEvent: Type: MSK Properties: StartingPosition: LATEST Stream: arn:aws:kafka:us-east-1:012345678012:cluster/exampleClusterName/abcdefab-1234-abcd-5678-cdef0123ab01-2 Topics: - MyTopic

同じテンプレートで宣言されたクラスターの Amazon MSK の例

以下は、同じテンプレートファイルで宣言されている Amazon MSK クラスター用の MSK イベントソースタイプの例です。

YAML

Events: MSKEvent: Type: MSK Properties: StartingPosition: LATEST Stream: Ref: MyMskCluster # This must be the name of an MSK cluster declared in the same template file Topics: - MyTopic

スキーマレジストリを使用した MSK イベントソース

以下は、スキーマレジストリで設定されたMSKイベントソースタイプの例です。

Events: MSKEvent: Type: MSK Properties: StartingPosition: LATEST Stream: Ref: MyMskCluster Topics: - SchemaRegistryTestTopic ProvisionedPollerConfig: MinimumPollers: 1 SchemaRegistryConfig: SchemaRegistryURI: !Sub arn:${AWS::Partition}:glue:us-west-2:123456789012:registry/myregistry EventRecordFormat: JSON SchemaValidationConfigs: - Attribute: KEY - Attribute: VALUE

Confluent Schema Registry を使用した MSK イベントソース

以下は、Confluent Schema Registry で設定されたMSKイベントソースタイプの例です。

Events: MSKEvent: Type: MSK Properties: StartingPosition: LATEST Stream: Ref: MyMskCluster Topics: - SchemaRegistryTestTopic ProvisionedPollerConfig: MinimumPollers: 1 SchemaRegistryConfig: SchemaRegistryURI: https://my-schema-registry.confluent.cloud AccessConfigs: - Type: BASIC_AUTH URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:my-secret EventRecordFormat: JSON SchemaValidationConfigs: - Attribute: KEY - Attribute: VALUE