Traitement des enregistrements Amazon Kinesis Data Streams avec Lambda - AWS Lambda

Traitement des enregistrements Amazon Kinesis Data Streams avec Lambda

Pour traiter les enregistrements Amazon Kinesis Data Streams avec Lambda, créez un mappage des sources d’événements Lambda. Vous pouvez mapper une fonction Lambda à un flux de données itérateur standard ou à un consommateur de diffusion améliorée. Pour de plus amples informations, consultez Flux d’interrogation et de mise en lots.

Créer un mappage des sources d’événements Kinesis

Pour invoquer votre fonction Lambda avec des enregistrements provenant de votre flux de données, créez un mappage des sources d’événements. Vous pouvez créer plusieurs mappages de source d’événement pour traiter les mêmes données avec plusieurs fonctions Lambda, ou pour traiter des éléments en provenance de plusieurs flux de données avec une seule fonction. Lorsque vous traitez des éléments à partir de plusieurs flux, chaque lot ne contient que des enregistrements provenant d’une seule partition ou d’un seul flux.

Vous pouvez configurer des mappages de sources d’événements pour traiter les enregistrements d’un flux dans un autre Compte AWS. Pour en savoir plus, veuillez consulter la section Création d’un mappage des sources d’événements entre comptes.

Avant de créer un mappage des sources d’événements, vous devez autoriser votre fonction Lambda à lire à partir d’un flux de données Kinesis. Lambda a besoin des autorisations suivantes pour gérer les ressources liées à votre flux de données Kinesis :

La politique AWSLambdaKinesisExecutionRole gérée par AWS inclut ces autorisations. Ajoutez cette politique gérée à votre fonction comme décrit dans la procédure suivante.

Note
  • Vous n’avez pas besoin de l’autorisation kinesis:ListStreams pour créer et gérer des mappages des sources d’événements pour Kinesis. Toutefois, si vous créez un mappage des sources d’événements dans la console et que vous ne disposez pas de cette autorisation, vous ne pourrez pas sélectionner un flux Kinesis dans une liste déroulante et la console affichera un message d’erreur. Pour créer le mappage des sources d’événements, vous devez saisir manuellement l’Amazon Resource Name (ARN) de votre flux.

  • Lambda effectue des appels d’API kinesis:GetRecords et kinesis:GetShardIterator lorsqu’il tente à nouveau des appels qui ont échoué.

AWS Management Console
Pour ajouter des autorisations Kinesis à votre fonction
  1. Ouvrez la page Fonctions de la console Lambda et choisissez votre fonction.

  2. Sous l’onglet Configuration, sélectionnez Autorisations.

  3. Dans le volet Rôle d’exécution, sous Nom du rôle, choisissez le lien vers le rôle d’exécution de votre fonction. Ce lien ouvre la page de ce rôle dans la console IAM.

  4. Dans le volet Politiques d’autorisations, choisissez Ajouter des autorisations, puis sélectionnez Attacher des politiques.

  5. Dans le champ de recherche, entrez AWSLambdaKinesisExecutionRole.

  6. Cochez la case en regard de la politique, puis choisissez Ajouter une autorisation.

AWS CLI
Pour ajouter des autorisations Kinesis à votre fonction
  • Exécutez la commande de la CLI suivante pour ajouter la politique AWSLambdaKinesisExecutionRole au rôle d’exécution de votre fonction :

    aws iam attach-role-policy \ --role-name MyFunctionRole \ --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaKinesisExecutionRole
AWS SAM
Pour ajouter des autorisations Kinesis à votre fonction
  • Dans la définition de votre fonction, ajoutez la propriété Policies comme indiqué dans l’exemple ci-dessous :

    Resources: MyFunction: Type: AWS::Serverless::Function Properties: CodeUri: ./my-function/ Handler: index.handler Runtime: nodejs22.x Policies: - AWSLambdaKinesisExecutionRole

Après avoir configuré les autorisations requises, créez le mappage des sources d’événements.

AWS Management Console
Pour créer le mappage des sources d’événements Kinesis
  1. Ouvrez la page Fonctions de la console Lambda et choisissez votre fonction.

  2. Dans le volet de Présentation de la fonction, choisissez Ajouter un déclencheur.

  3. Sous Configuration du déclencheur, pour la source, sélectionnez Kinesis.

  4. Sélectionnez le flux Kinesis pour lequel vous souhaitez créer le mappage des sources d’événements et, éventuellement, un consommateur de votre flux.

  5. (Facultatif) Modifiez la Taille de lot, la Position de départ et la Fenêtre de traitement par lot de votre mappage des sources d’événements.

  6. Choisissez Ajouter.

Lorsque vous créez votre mappage des sources d’événements à partir de la console, votre rôle IAM doit disposer des autorisations kinesis:ListStreams et kinesis:ListStreamConsumers.

AWS CLI
Pour créer le mappage des sources d’événements Kinesis
  • Exécutez la commande de la CLI suivante pour créer un mappage des sources d’événements Kinesis. Choisissez votre propre taille de lot et votre position de départ en fonction de votre cas d’utilisation.

    aws lambda create-event-source-mapping \ --function-name MyFunction \ --event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream \ --starting-position LATEST \ --batch-size 100

Pour spécifier une fenêtre de traitement par lot, ajoutez l’option --maximum-batching-window-in-seconds. Pour plus d’informations sur ce paramètre et d’autres, consultez create-event-source-mapping dans la Référence des commandes de l’AWS CLI.

AWS SAM
Pour créer le mappage des sources d’événements Kinesis
  • Dans la définition de votre fonction, ajoutez la propriété KinesisEvent comme indiqué dans l’exemple ci-dessous :

    Resources: MyFunction: Type: AWS::Serverless::Function Properties: CodeUri: ./my-function/ Handler: index.handler Runtime: nodejs22.x Policies: - AWSLambdaKinesisExecutionRole Events: KinesisEvent: Type: Kinesis Properties: Stream: !GetAtt MyKinesisStream.Arn StartingPosition: LATEST BatchSize: 100 MyKinesisStream: Type: AWS::Kinesis::Stream Properties: ShardCount: 1

Pour en savoir plus sur la création d’un mappage des sources d’événements pour Kinesis Data Streams dans AWS SAM, consultez Kinesis dans le guide du développeur AWS Serverless Application Model.

Position de départ du sondage et du stream

Sachez que l’interrogation des flux lors des mises à jour et de la création du mappage des sources d’événements est finalement cohérente.

  • Lors de la création du mappage des sources d’événements, le démarrage de l’interrogation des événements depuis le flux peut prendre plusieurs minutes.

  • Lors des mises à jour du mappage des sources d’événements, l’arrêt et le redémarrage de l’interrogation des événements depuis le flux peuvent prendre plusieurs minutes.

Ce comportement signifie que si vous spécifiez LATEST comme position de départ du flux, le mappage des sources d’événements peut manquer des événements lors de la création ou des mises à jour. Pour vous assurer de ne manquer aucun événement, spécifiez la position de départ du flux comme TRIM_HORIZON ou AT_TIMESTAMP.

Création d’un mappage des sources d’événements entre comptes

Amazon Kinesis Data Streams prend en charge les politiques basées sur les ressources. De ce fait, vous pouvez traiter les données ingérées dans un flux dans un Compte AWS à l’aide d’une fonction Lambda dans un autre compte.

Pour créer un mappage des sources d’événements pour votre fonction Lambda à l’aide d’un flux Kinesis dans un autre Compte AWS, vous devez configurer le flux à l’aide d’une politique basée sur les ressources afin d’autoriser votre fonction Lambda à lire des éléments. Pour savoir comment configurer votre flux de manière à autoriser l’accès intercomptes, consultez la rubrique Sharing access with cross-account AWS Lambda functions du guide du développeur Amazon Kinesis Streams.

Une fois que vous avez configuré votre flux avec une politique basée sur les ressources qui donne à votre fonction Lambda les autorisations requises, créez le mappage des sources d’événements à l’aide de l’une des méthodes décrites dans la section précédente.

Si vous choisissez de créer votre mappage des sources d’événements à l’aide de la console Lambda, collez l’ARN de votre flux directement dans la zone de saisie. Si vous souhaitez spécifier un consommateur pour votre flux, le champ du flux est automatiquement rempli lorsque l’ARN du consommateur est collé.