MSK - AWS Serverless Application Model

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

MSK

L'objet décrivant un type de source d'événement MSK. Pour plus d'informations, consultez la section Utilisation AWS Lambda avec Amazon MSK dans le manuel du AWS Lambda développeur.

AWS Serverless Application Model (AWS SAM) génère une AWS::Lambda::EventSourceMappingressource lorsque ce type d'événement est défini.

Pour utiliser Schema Registry, vous devez définir des autorisations de rôle IAM spécifiques pour votre fonction. Voir Configuration complète avec des rôles IAM pour un exemple de configuration requise.

Syntaxe

Pour déclarer cette entité dans votre AWS SAM modèle, utilisez la syntaxe suivante.

Propriétés

ConsumerGroupId

Chaîne qui configure la façon dont les événements seront lus à partir des rubriques Kafka.

Type : chaîne

Obligatoire : non

AWS CloudFormation compatibilité : cette propriété est transmise directement à la AmazonManagedKafkaConfiguration propriété d'une AWS::Lambda::EventSourceMapping ressource.

DestinationConfig

Objet de configuration qui spécifie la destination d'un événement après son traitement par Lambda.

Utilisez cette propriété pour spécifier la destination des invocations ayant échoués à partir de la source d'événements Amazon MSK.

Type : DestinationConfig

Obligatoire : non

AWS CloudFormation compatibilité : cette propriété est transmise directement à la DestinationConfig propriété d'une AWS::Lambda::EventSourceMapping ressource.

FilterCriteria

Objet qui définit les critères permettant de déterminer si Lambda doit traiter un événement. Pour de plus amples informations, veuillez consulter AWS Lambda le filtrage d’événements dans le AWS Lambda Manuel du développeur.

Type : FilterCriteria

Obligatoire : non

AWS CloudFormation compatibilité : cette propriété est transmise directement à la FilterCriteria propriété d'une AWS::Lambda::EventSourceMapping ressource.

KmsKeyArn

Le nom de ressource Amazon (ARN) de la clé permettant de chiffrer les informations relatives à cet événement.

Type : chaîne

Obligatoire : non

AWS CloudFormation compatibilité : cette propriété est transmise directement à la KmsKeyArn propriété d'une AWS::Lambda::EventSourceMapping ressource.

MaximumBatchingWindowInSeconds

Intervalle de temps maximal (en secondes) pour collecter des enregistrements avant d’invoquer la fonction.

Type : entier

Obligatoire : non

AWS CloudFormation compatibilité : cette propriété est transmise directement à la MaximumBatchingWindowInSeconds propriété d'une AWS::Lambda::EventSourceMapping ressource.

ProvisionedPollerConfig

Configuration pour augmenter le nombre de sondeurs utilisés pour calculer les mappages de sources d'événements. Cette configuration permet un minimum de 1 sondeur et un maximum de 20 sondeurs. Pour un exemple, reportez-vous àProvisionedPollerConfig exemple.

Type : ProvisionedPollerConfig

Obligatoire : non

AWS CloudFormation compatibilité : cette propriété est transmise directement à la ProvisionedPollerConfig propriété d'une AWS::Lambda::EventSourceMapping ressource.

SchemaRegistryConfig

Configuration pour l'utilisation d'un registre de schémas avec la source d'événements Kafka.

Note

Cette fonctionnalité doit ProvisionedPollerConfig être configurée.

Type : SchemaRegistryConfig

Obligatoire : non

AWS CloudFormation compatibilité : cette propriété est transmise directement à la AmazonManagedKafkaEventSourceConfig propriété d'une AWS::Lambda::EventSourceMapping ressource.

SourceAccessConfigurations

Tableau du protocole d’authentification, composants VPC ou hôte virtuel pour sécuriser et définir votre source d’événement.

Valeurs valides : CLIENT_CERTIFICATE_TLS_AUTH

Type : liste de propriétés SourceAccessConfiguration

Obligatoire : non

AWS CloudFormation compatibilité : cette propriété fait partie de la AmazonManagedKafkaEventSourceConfigpropriété d'une AWS::Lambda::EventSourceMapping ressource.

StartingPosition

Position de début de la lecture dans le flux.

  • AT_TIMESTAMP : spécifier l'heure à partir de laquelle la lecture des enregistrements doit commencer.

  • LATEST : lire uniquement les nouveaux enregistrements.

  • TRIM_HORIZON : traiter tous les enregistrements disponibles.

Valeurs valides : AT_TIMESTAMP | LATEST | TRIM_HORIZON

Type : chaîne

Obligatoire : non

AWS CloudFormation compatibilité : cette propriété est transmise directement à la StartingPosition propriété d'une AWS::Lambda::EventSourceMapping ressource.

StartingPositionTimestamp

L'heure à partir de laquelle commencer la lecture, en secondes au format horaire Unix. Définissez StartingPositionTimestamp lorsque StartingPosition est défini sur AT_TIMESTAMP.

Type : double

Obligatoire : non

AWS CloudFormation compatibilité : cette propriété est transmise directement à la StartingPositionTimestamp propriété d'une AWS::Lambda::EventSourceMapping ressource.

Stream

L'Amazon Resource Name (ARN) du flux de données ou d'un consommateur de flux.

Type : chaîne

Obligatoire : oui

AWS CloudFormation compatibilité : cette propriété est transmise directement à la EventSourceArn propriété d'une AWS::Lambda::EventSourceMapping ressource.

Topics

Nom de la rubrique Kafka.

Type : liste

Obligatoire : oui

AWS CloudFormation compatibilité : cette propriété est transmise directement à la Topics propriété d'une AWS::Lambda::EventSourceMapping ressource.

Exemples

Configuration complète avec des rôles IAM

L'exemple suivant montre une configuration complète, y compris la configuration de rôle IAM requise pour utiliser Schema Registry :

Parameters: PreCreatedSubnetOne: Type: String PreCreatedSubnetTwo: Type: String MskClusterName4: Type: String Resources: MyLambdaExecutionRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Action: [sts:AssumeRole] Effect: Allow Principal: Service: [lambda.amazonaws.com] Policies: - PolicyName: KafkaClusterPermissions PolicyDocument: Statement: - Action: [kafka:DescribeClusterV2, kafka:GetBootstrapBrokers] Effect: Allow Resource: 'arn:aws:kafka:us-east-1:123456789012:cluster/*' - PolicyName: KafkaAuthPolicy PolicyDocument: Statement: - Action: [secretsmanager:GetSecretValue, kms:Decrypt] Effect: "Allow" Resource: ['arn:aws:secretsmanager:us-west-2:123456789012:secret:kafkaSecret-******', 'arn:aws:kms:us-west-2:123456789012:key/keyId'] - PolicyName: ENIPolicy PolicyDocument: Statement: - Action: [ec2:CreateNetworkInterface, ec2:DescribeNetworkInterfaces, ec2:DescribeVpcs, ec2:DeleteNetworkInterface, ec2:DescribeSubnets, ec2:DescribeSecurityGroups] Effect: Allow Resource: '*' - PolicyName: SchemaRegistryPolicy PolicyDocument: Statement: - Action: [glue:GetRegistry] Effect: Allow Resource: 'arn:aws:glue:{region}:{account-id}:registry/{registry-name}' - PolicyName: SchemaVersionsPolicy PolicyDocument: Statement: - Action: [glue:GetSchemaVersions] Effect: Allow Resource: '*' ManagedPolicyArns: - !Sub arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole Tags: - {Value: SAM, Key: lambda:createdBy} MyMskCluster: Type: AWS::MSK::Cluster Properties: BrokerNodeGroupInfo: ClientSubnets: - Ref: PreCreatedSubnetOne - Ref: PreCreatedSubnetTwo InstanceType: kafka.t3.small StorageInfo: EBSStorageInfo: VolumeSize: 1 ClusterName: Ref: MskClusterName4 KafkaVersion: 3.8.x NumberOfBrokerNodes: 2 MyMskStreamProcessor: Type: AWS::Serverless::Function Properties: Runtime: nodejs18.x Handler: index.handler CodeUri: ${codeuri} Role: Fn::GetAtt: [MyLambdaExecutionRole, Arn] Events: MyMskEvent: Type: MSK Properties: StartingPosition: LATEST Stream: Ref: MyMskCluster SourceAccessConfigurations: - Type: SASL_SCRAM_512_AUTH URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c Topics: - SchemaRegistryTestTopic ProvisionedPollerConfig: MinimumPollers: 1 SchemaRegistryConfig: AccessConfigs: - Type: BASIC_AUTH URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c SchemaValidationConfigs: - Attribute: KEY EventRecordFormat: JSON SchemaRegistryURI: !Sub arn:${AWS::Partition}:glue:us-west-2:123456789012:registry/myregistry

ProvisionedPollerConfig exemple

ProvisionedPollerConfig: MinimumPollers: 1 MaximumPollers: 20

Exemple Amazon MSK pour un cluster existant

Voici un exemple de type de source d'événement MSK pour un cluster Amazon MSK qui existe déjà dans un Compte AWS.

YAML

Events: MSKEvent: Type: MSK Properties: StartingPosition: LATEST Stream: arn:aws:kafka:us-east-1:012345678012:cluster/exampleClusterName/abcdefab-1234-abcd-5678-cdef0123ab01-2 Topics: - MyTopic

Exemple Amazon MSK pour un cluster déclaré dans le même modèle

Voici un exemple de type de source d'événement MSK pour un cluster Amazon MSK déclaré dans le même fichier modèle.

YAML

Events: MSKEvent: Type: MSK Properties: StartingPosition: LATEST Stream: Ref: MyMskCluster # This must be the name of an MSK cluster declared in the same template file Topics: - MyTopic

Source d'événements MSK avec registre de schémas

Voici un exemple de type de source d'MSKévénement configuré avec un registre de schémas.

Events: MSKEvent: Type: MSK Properties: StartingPosition: LATEST Stream: Ref: MyMskCluster Topics: - SchemaRegistryTestTopic ProvisionedPollerConfig: MinimumPollers: 1 SchemaRegistryConfig: SchemaRegistryURI: !Sub arn:${AWS::Partition}:glue:us-west-2:123456789012:registry/myregistry EventRecordFormat: JSON SchemaValidationConfigs: - Attribute: KEY - Attribute: VALUE

Source d'événements MSK avec registre de schémas Confluent

Voici un exemple de type de source d'MSKévénement configuré avec un registre de schémas Confluent.

Events: MSKEvent: Type: MSK Properties: StartingPosition: LATEST Stream: Ref: MyMskCluster Topics: - SchemaRegistryTestTopic ProvisionedPollerConfig: MinimumPollers: 1 SchemaRegistryConfig: SchemaRegistryURI: https://my-schema-registry.confluent.cloud AccessConfigs: - Type: BASIC_AUTH URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:my-secret EventRecordFormat: JSON SchemaValidationConfigs: - Attribute: KEY - Attribute: VALUE