Utilisation de Lambda pour traiter les enregistrements d'Amazon Kinesis Data Streams - AWS Lambda

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Utilisation de Lambda pour traiter les enregistrements d'Amazon Kinesis Data Streams

Vous pouvez utiliser une fonction Lambda pour traiter des enregistrements dans un flux de données Amazon Kinesis. Vous pouvez mapper une fonction Lambda à un consommateur à débit partagé (itérateur standard) Kinesis Data Streams ou à un consommateur à débit dédié avec diffusion améliorée. Pour les itérateurs standard, Lambda interroge chaque partition de votre flux Kinesis pour des enregistrements qui utilisent le protocole HTTP. Le mappage de source d’événement partage le débit de lecture avec d’autres utilisateurs de la partition.

Pour plus d’informations sur les flux de données Kinesis, consultez Lecture de données à partir d’Amazon Kinesis Data Streams.

Note

Kinesis facture chaque partition et, pour les diffusions améliorées, les données lues à partir du flux. Pour obtenir des informations de tarification, consultez Tarification Amazon Kinesis.

Flux d’interrogation et de mise en lots

Lambda lit les enregistrements du flux de données et invoque votre fonction de manière synchrone avec un événement contenant des enregistrements de flux. Lambda lit les registres par lots et invoque votre fonction pour les traiter. Chaque lot contient des enregistrements provenant d'un seul shard/data flux.

Votre fonction Lambda est une application consommateur pour votre flux de données. Elle traite un lot d’enregistrements à la fois à partir de chaque partition. Vous pouvez mapper une fonction Lambda à un consommateur à débit partagé (itérateur standard) ou à un consommateur à débit dédié avec diffusion améliorée.

  • Itérateur standard : Lambda interroge chaque partition de votre flux Kinesis afin d’obtenir des enregistrements à une fréquence de base d’une fois par seconde. Lorsque d’autres enregistrements sont disponibles, Lambda continue de traiter les lots jusqu’à ce que la fonction rattrape le flux. Le mappage de source d’événement partage le débit de lecture avec d’autres utilisateurs de la partition.

  • Diffusion améliorée : pour réduire la latence et optimiser le débit en lecture, créez un consommateur de flux de données avec diffusion améliorée. Les consommateurs avec diffusion améliorée obtiennent une connexion dédiée pour chaque partition qui n’a pas d’impact sur les autres applications lisant sur le flux. Les consommateurs de flux utilisent HTTP/2 afin de réduire la latence en transférant les enregistrements à Lambda via une connexion longue durée et en comprimant les en-têtes de requête. Vous pouvez créer un consommateur de flux à l'aide de l'API Kinesis RegisterStreamConsumer.

aws kinesis register-stream-consumer \ --consumer-name con1 \ --stream-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream

Vous devriez voir la sortie suivante :

{
    "Consumer": {
        "ConsumerName": "con1",
        "ConsumerARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream/consumer/con1:1540591608",
        "ConsumerStatus": "CREATING",
        "ConsumerCreationTimestamp": 1540591608.0
    }
}

Pour augmenter la vitesse à laquelle votre fonction traite les enregistrements, ajoutez des partitions à votre flux de données. Lambda traite les enregistrements de chaque partition dans l’ordre. Il arrête de traiter les enregistrements supplémentaires d’une partition si votre fonction renvoie une erreur. Plus de partitions signifient plus de lots traités en une seule fois, ce qui réduit l’impact des erreurs sur la simultanéité.

Si votre fonction ne peut pas augmenter sa capacité pour traiter le nombre total de lots simultanés, demandez une augmentation de quota ou réservez de la simultanéité pour votre fonction.

Par défaut, Lambda invoque votre fonction dès que des enregistrements sont disponibles. Si le lot que Lambda lit à partir de la source d’événements ne comprend qu’un seul enregistrement, Lambda envoie un seul registre à la fonction. Pour éviter d’invoquer la fonction avec un petit nombre de registres, vous pouvez indiquer à la source d’événement de les mettre en mémoire tampon pendant 5 minutes en configurant une fenêtre de traitement par lots. Avant d’invoquer la fonction, Lambda continue de lire les registres de la source d’événements jusqu’à ce qu’il ait rassemblé un lot complet, que la fenêtre de traitement par lot expire ou que le lot atteigne la limite de charge utile de 6 Mo. Pour de plus amples informations, veuillez consulter Comportement de traitement par lots.

Avertissement

Les mappages des sources d’événements Lambda traitent chaque événement au moins une fois, et le traitement des enregistrements peut être dupliqué. Pour éviter les problèmes potentiels liés à des événements dupliqués, nous vous recommandons vivement de rendre votre code de fonction idempotent. Pour en savoir plus, consultez Comment rendre ma fonction Lambda idempotente dans le Knowledge Center. AWS

Lambda envoi le prochain lot pour traitement sans attendre que les extensions configurées soient terminées. En d’autres termes, vos extensions peuvent continuer à s’exécuter pendant que Lambda traite le prochain lot d’enregistrements. Cela peut causer des problèmes de limitation si vous enfreignez l’un des paramètres ou l’une des limites de simultanéité de votre compte. Pour détecter s’il s’agit d’un problème éventuel, surveillez vos fonctions et vérifiez si vous observez des métriques de simultanéité plus élevées que prévu pour votre mappage des sources d’événements. En raison de la brièveté des intervalles entre les invocations, Lambda peut brièvement signaler une utilisation simultanée supérieure au nombre de partitions. Cela peut être vrai même pour les fonctions Lambda sans extensions.

Configurez le ParallelizationFactorparamètre pour traiter une partition d'un flux de données Kinesis avec plusieurs invocations Lambda simultanément. Vous pouvez spécifier le nombre de lots simultanés que Lambda interroge à partir d’une partition via un facteur de parallélisation de 1 (par défaut) à 10. Par exemple, quand vous définissez ParallelizationFactor sur 2, vous pouvez avoir jusqu’à 200 invocations Lambda simultanés pour traiter 100 partitions de données Kinesis (bien que, dans la réalité, la métrique ConcurrentExecutions puisse indiquer une valeur différente). Cela permet d’augmenter le débit de traitement quand le volume de données est volatil et que la valeur du paramètre IteratorAge est élevée. Lorsque vous augmentez le nombre de lots simultanés par partition, Lambda assure toujours un traitement dans l’ordre au niveau clé de partition.

Vous pouvez également utiliser ParallelizationFactor avec l’agrégation Kinesis. Le comportement du mappage des sources d’événements varie selon que vous utilisez ou non une diffusion améliorée :

  • Sans diffusion améliorée : tous les événements d’un événement agrégé doivent avoir la même clé de partition. La clé de partition doit également correspondre à celle de l’événement agrégé. Si les événements contenus dans l’événement agrégé ont des clés de partition différentes, Lambda ne peut garantir le traitement dans l’ordre des événements par clé de partition.

  • Avec une diffusion améliorée : Lambda décode d’abord l’événement agrégé en événements individuels. L’événement agrégé peut avoir une clé de partition différente de celle des événements qu’il contient. Cependant, les événements qui ne correspondent pas à la clé de partition sont supprimés et perdus. Lambda ne traite pas ces événements et ne les envoie pas vers une destination en cas de panne configurée.

Exemple d’évènement

{ "Records": [ { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "approximateArrivalTimestamp": 1545084650.987 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" }, { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692540925702759324208523137515618", "data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=", "approximateArrivalTimestamp": 1545084711.166 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" } ] }