Como usar o Lambda para processar registros do Amazon Kinesis Data Streams
Você pode usar uma função do Lambda para processar registros em um fluxo de dados do Amazon Kinesis. É possível mapear uma função do Lambda em um consumidor de throughput compartilhado (iterador padrão) do Kinesis Data Stream ou em um consumidor de throughput dedicado com distribuição avançada. Para iteradores padrão, o Lambda sonda cada fragmento no stream do Kinesis em busca de registros usando o protocolo HTTP. O mapeamento da origem do evento compartilha a throughput de leitura com outros consumidores do fragmento.
Para obter detalhes sobre transmissões de dados do Kinesis, consulteLer dados do Amazon Kinesis Data Streams.
nota
O Kinesis cobra por cada fragmento e, para distribuição avançada, dados lidos da transmissão. Para obter detalhes de preço, consulte Preço do Amazon Kinesis
Fluxos de sondagem e agrupamento em lotes
O Lambda lê registros do fluxo de dados e invoca sua função de maneira síncrona com um evento que contém registros de transmissão. O Lambda lê registros em lotes e invoca sua função para processar registros do lote. Cada lote contém registros de um único fragmento/fluxo de dados.
Sua função do Lambda é uma aplicação de consumidor para seu fluxo de dados. Ele processa um lote de registros por vez de cada estilhaço. É possível mapear uma função Lambda para um consumidor de throughput compartilhado (iterador padrão) ou para um consumidor de throughput dedicado com distribuição avançada.
-
Iterador padrão: o Lambda sonda cada fragmento em seu fluxo do Kinesis para identificar registros a uma taxa básica de uma vez por segundo. Quando houver mais registros disponíveis, o Lambda mantém lotes de processamento até que a função alcance o fluxo. O mapeamento da origem do evento compartilha a throughput de leitura com outros consumidores do fragmento.
-
Distribuição avançada: para minimizar a latência e maximizar o throughput da leitura, crie um consumidor de fluxo de dados com distribuição avançada. Os consumidores da distribuição avançada obtêm uma conexão dedicada para cada fragmento que não afeta outros aplicativos que fazem leitura do stream. Os consumidores de fluxos usam HTTP/2 para reduzir a latência, enviando os registros para o Lambda por meio de uma conexão de longa duração e compactando cabeçalhos de solicitação. Você pode criar um consumidor de fluxo com a API RegisterStreamConsumer do Kinesis.
aws kinesis register-stream-consumer \ --consumer-name con1 \ --stream-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream
A seguinte saída deverá ser mostrada:
{ "Consumer": { "ConsumerName": "con1", "ConsumerARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream/consumer/con1:1540591608", "ConsumerStatus": "CREATING", "ConsumerCreationTimestamp": 1540591608.0 } }
Para aumentar a velocidade com que sua função processa registros, adicione fragmentos ao fluxo de dados
Se a função não conseguir se expandir para lidar com o número total de lotes simultâneos, solicite um aumento de cota ou reserve simultaneidade para a função.
Por padrão, o Lambda invoca a função assim que os registros estão disponíveis. Se o lote que o Lambda lê da fonte de eventos tiver apenas um registro, o Lambda enviará apenas um registro à função. Para evitar a invocação da função com poucos registros, instrua a fonte de eventos para armazenar os registros em buffer por até cinco minutos, configurando uma janela de lotes. Antes de invocar a função, o Lambda continua a ler registros da fonte de eventos até coletar um lote inteiro, até que a janela de lote expire ou até que o lote atinja o limite de carga útil de 6 MB. Para obter mais informações, consulte Comportamento de lotes.
Atenção
Os mapeamentos da origem do evento do Lambda processam cada evento ao menos uma vez, podendo haver o processamento duplicado de registros. Para evitar possíveis problemas relacionados a eventos duplicados, é altamente recomendável tornar o código da função idempotente. Para saber mais, consulte Como tornar minha função do Lambda idempotente
O Lambda não espera a conclusão de nenhuma extensãoconfigurada para enviar o próximo lote para processamento. Em outras palavras, suas extensões podem continuar sendo executadas enquanto o Lambda processa o próximo lote de registros. Isso pode causar problemas de controle de utilização se você violar quaisquer configurações ou limites de simultaneidade de sua conta. Para detectar se esse é um problema em potencial, monitore suas funções e verifique se você está vendo métricas de simultaneidade mais altas do que o esperado para o seu mapeamento da origem do evento. Devido ao curto intervalo entre as invocações, o Lambda pode relatar brevemente um uso de simultaneidade maior do que o número de fragmentos. Isso pode ser verdadeiro até mesmo para funções do Lambda sem extensões.
Configure a opção ParallelizationFactor para processar um fragmento de um fluxo de dados do Kinesis com mais de uma invocação do Lambda simultaneamente. Você pode especificar o número de lotes simultâneos que o Lambda pesquisa de um fragmento por meio de um fator de paralelização de 1 (padrão) a 10. Por exemplo, quando você define ParallelizationFactor como 2, pode ter até 200 invocações simultâneas do Lambda para processar 100 fragmentos de dados do Kinesis (embora, na prática, você possa ver valores diferentes para a métrica ConcurrentExecutions). Isso ajuda a aumentar o throughput de processamento quando o volume de dados é volátil e o valor de IteratorAge é alto. Quando você aumenta o número de lotes simultâneos por fragmento, o Lambda ainda garante o processamento em ordem no nível de chave de partição.
Você também pode usar ParallelizationFactor com a agregação do Kinesis. O comportamento do mapeamento da origem do evento depende de você estar ou não usando fan-out avançado:
-
Sem fan-out avançado: todos os eventos dentro de um evento agregado devem ter a mesma chave de partição. A chave de partição também deve ser igual à do evento agregado. Se os eventos dentro do evento agregado tiverem chaves de partição diferentes, o Lambda não poderá garantir o processamento dos eventos na ordem, por chave de partição.
-
Com fan-out avançado: primeiro o Lambda decodifica o evento agregado em eventos individuais. O evento agregado pode ter uma chave de partição diferente da chave dos eventos que ele contém. Porém, os eventos que não correspondem à chave de partição são descartados e perdidos
. O Lambda não processa esses eventos e não os envia para um destino de falha configurado.
Evento de exemplo
{ "Records": [ { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "approximateArrivalTimestamp": 1545084650.987 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" }, { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692540925702759324208523137515618", "data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=", "approximateArrivalTimestamp": 1545084711.166 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" } ] }