Uso de Lambda para procesar los registros de Amazon Kinesis Data Streams
Puede utilizar una función de Lambda para procesar los registros de un flujo de datos de Amazon Kinesis. Puede asignar una función de Lambda a un consumidor de rendimiento compartido (iterador estándar) de Kinesis Data Streams o a un consumidor de rendimiento dedicado con distribución ramificada mejorada. Para iteradores estándar, Lambda sondea cada partición de la secuencia de Kinesis en busca de registros utilizando el protocolo HTTP. El mapeo de origen de eventos comparte el rendimiento de lectura con otros consumidores de la partición.
Para obtener información detallada sobre los flujos de datos de Kinesis, consulte Lectura de datos de Amazon Kinesis Data Streams.
nota
Kinesis cobra por cada partición y, para una distribución ramificada mejorada, por los datos leídos desde el flujo. Para obtener más información sobre precios, consulte Precios de Amazon Kinesis
Flujos de sondeo y procesamiento por lotes
Lambda lee los registros del flujo de datos e invoca la función sincrónicamente con un evento que contiene registros de flujo. Lambda lee los registros por lotes e invoca la función para procesar los registros del lote. Cada lote contiene registros de una única partición o flujo de datos.
Su función de Lambda es una aplicación consumidora para su flujo de datos. Procesa un lote de registros a la vez desde cada partición. Puede asignar una función Lambda a un consumidor de rendimiento compartido (iterador estándar) o a un consumidor de rendimiento dedicado con distribución ramificada mejorada.
-
Iterador estándar: Lambda sondea cada partición del flujo de Kinesis y busca registros a una velocidad base de una vez por segundo. Cuando hay más registros disponibles, Lambda sigue procesando lotes hasta que la función se pone al día con el flujo. El mapeo de origen de eventos comparte el rendimiento de lectura con otros consumidores de la partición.
-
Distribución ramificada mejorada: para minimizar la latencia y maximizar el rendimiento de lectura, cree un consumidor de flujo de datos con distribución ramificada mejorada. Los consumidores con distribución ramificada mejorada obtienen una conexión dedicada a cada partición que no afecta a las demás aplicaciones que leen el flujo. Los consumidores de flujos utilizan HTTP/2 para reducir la latencia enviando los registros a Lambda a través de una conexión de larga duración y mediante la compresión de los encabezados de las solicitudes. Es posible crear un consumidor de flujos con la API RegisterStreamConsumer de Kinesis.
aws kinesis register-stream-consumer \ --consumer-name con1 \ --stream-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream
Debería ver los siguientes datos de salida:
{ "Consumer": { "ConsumerName": "con1", "ConsumerARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream/consumer/con1:1540591608", "ConsumerStatus": "CREATING", "ConsumerCreationTimestamp": 1540591608.0 } }
Para incrementar la velocidad con la que la función procesa los registros, agregue particiones al flujo de datos
Si la función no puede aumentar para administrar el número total de lotes simultáneos, solicite un aumento de cuota o reserve la simultaneidad para la función.
De forma predeterminada, Lambda invoca su función tan pronto como los registros estén disponibles. Si el lote que Lambda lee del origen de eventos solo tiene un registro, Lambda envía solo un registro a la función. Para evitar invocar la función con un número de registros pequeño, puede indicar al origen de eventos que almacene en búfer registros durante hasta 5 minutos configurando un plazo de procesamiento por lotes. Antes de invocar la función, Lambda continúa leyendo los registros del origen de eventos hasta que haya recopilado un lote completo, venza el plazo de procesamiento por lotes o el lote alcance el límite de carga de 6 MB. Para obtener más información, consulte Comportamiento de procesamiento por lotes.
aviso
Las asignaciones de orígenes de eventos de Lambda procesan cada evento al menos una vez, y puede producirse un procesamiento duplicado de registros. Para evitar posibles problemas relacionados con la duplicación de eventos, le recomendamos encarecidamente que haga que el código de la función sea idempotente. Para obtener más información, consulte ¿Cómo puedo hacer que mi función de Lambda sea idempotente?
Lambda no espera a que se completen las extensiones configuradas antes de enviar el siguiente lote para su procesamiento. En otras palabras, las extensiones pueden seguir ejecutándose mientras Lambda procesa el siguiente lote de registros. Esto puede provocar problemas de limitación si infringe alguno de los ajustes o límites de simultaneidad de la cuenta. Para detectar si se trata de un posible problema, supervise sus funciones y compruebe si ve métricas de simultaneidad más elevadas de lo esperado para la asignación de orígenes de eventos. Debido a los tiempos cortos entre invocaciones, Lambda puede informar brevemente un uso de simultaneidad superior al número de particiones. Esto puede ser cierto incluso para las funciones de Lambda sin extensiones.
Defina la configuración ParallelizationFactor para procesar una partición de un flujo de datos de Kinesis con más de una invocación de Lambda simultáneamente. Puede especificar el número de lotes simultáneos que Lambda sondea desde una partición a través de un factor de paralelización de 1 (predeterminado) a 10. Por ejemplo, cuando establece ParallelizationFactor en 2, puede tener un máximo de 200 invocaciones de Lambda simultáneas para procesar 100 particiones de datos de Kinesis (aunque, en la práctica, es posible que observe diferentes valores para la métrica ConcurrentExecutions). Esto ayuda a escalar verticalmente el rendimiento de procesamiento cuando el volumen de datos es volátil y el IteratorAge es alto. Cuando aumenta el número de lotes simultáneos por partición, Lambda sigue garantizando el procesamiento en orden a nivel de clave de partición.
También puede utilizar ParallelizationFactor con la agregación de Kinesis. El comportamiento de la asignación de orígenes de eventos depende de si utiliza la distribución ramificada mejorada:
-
Sin distribución ramificada mejorada: todos los eventos incluidos en un evento agregado deben tener la misma clave de partición. La clave de partición también debe coincidir con la del evento agregado. Si los eventos incluidos en el evento agregado tienen claves de partición diferentes, Lambda no puede garantizar el procesamiento de los eventos ordenados por clave de partición.
-
Con distribución ramificada mejorada: en primer lugar, Lambda decodifica el evento agregado en sus eventos individuales. El evento agregado puede tener una clave de partición diferente a la de los eventos que contiene. Sin embargo, los eventos que no se corresponden con la clave de partición se eliminan y se pierden
. Lambda no procesa estos eventos ni los envía a un destino de error configurado.
Evento de ejemplo
{ "Records": [ { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "approximateArrivalTimestamp": 1545084650.987 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" }, { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692540925702759324208523137515618", "data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=", "approximateArrivalTimestamp": 1545084711.166 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" } ] }