Conceitos do AWS Glue Streaming
As seções apresentadas a seguir fornecem informações sobre os conceitos do AWS Glue Streaming.
Anatomia de um trabalho de streaming do AWS Glue
AWS GlueOs trabalhos de streaming do operam no paradigma de streaming do Spark e aproveitam o streaming estruturado da estrutura do Spark. Os trabalhos de streaming pesquisam constantemente a fonte de dados de streaming, em um intervalo de tempo específico, para buscar registros como micro lotes. As seções apresentadas a seguir examinam as diferentes partes de um trabalho de streaming do AWS Glue.
forEachBatch
O método forEachBatch é o ponto de entrada de uma execução de trabalho de streaming do AWS Glue. Os trabalhos de streaming do AWS Glue usam o método forEachBatch para pesquisar dados. Ele funciona como um iterador que permanece ativo durante o ciclo de vida do trabalho de streaming e pesquisa regularmente a fonte de streaming em busca de novos dados, além de processar os dados mais recentes em micro lotes.
glueContext.forEachBatch( frame=dataFrame_AmazonKinesis_node1696872487972, batch_function=processBatch, options={ "windowSize": "100 seconds", "checkpointLocation": args["TempDir"] + "/" + args["JOB_NAME"] + "/checkpoint/", }, )
Configure a propriedade frame de forEachBatch para especificar uma fonte de streaming. Neste exemplo, o nó de origem que você criou na tela em branco durante a criação do trabalho é preenchido com o DataFrame padrão do trabalho. Configure a propriedade batch_function como a function que você decide invocar para cada operação de micro lote. Você deve definir uma função para tratar da transformação em lote relacionada aos dados de entrada.
Origem
Na primeira etapa da função processBatch, o programa verifica a contagem de registros do DataFrame que você definiu como a propriedade frame de forEachBatch. O programa acrescenta um carimbo de data/hora de ingestão a um DataFrame que não está vazio. A cláusula data_frame.count()>0 determina se o micro lote mais recente não está vazio e se está pronto para o processamento adicional.
def processBatch(data_frame, batchId): if data_frame.count() >0: AmazonKinesis_node1696872487972 = DynamicFrame.fromDF( glueContext.add_ingestion_time_columns(data_frame, "hour"), glueContext, "from_data_frame", )
Mapeamento
A próxima seção do programa corresponde à aplicação do mapeamento. O método Mapping.apply em um DataFrame do Spark permite definir regras de transformação em torno de elementos de dados. Normalmente, é possível renomear, alterar o tipo de dados ou aplicar uma função personalizada na coluna de dados da fonte e mapeá-los para as colunas de destino.
#Script generated for node ChangeSchema ChangeSchema_node16986872679326 = ApplyMapping.apply( frame = AmazonKinesis_node1696872487972, mappings = [ ("eventtime", "string", "eventtime", "string"), ("manufacturer", "string", "manufacturer", "string"), ("minutevolume", "long", "minutevolume", "int"), ("o2stats", "long", "OxygenSaturation", "int"), ("pressurecontrol", "long", "pressurecontrol", "int"), ("serialnumber", "string", "serialnumber", "string"), ("ventilatorid", "long", "ventilatorid", "long"), ("ingest_year", "string", "ingest_year", "string"), ("ingest_month", "string", "ingest_month", "string"), ("ingest_day", "string", "ingest_day", "string"), ("ingest_hour", "string", "ingest_hour", "string"), ], transformation_ctx="ChangeSchema_node16986872679326", ) )
Sink
Nesta seção, o conjunto de dados de entrada da fonte de streaming é armazenado em um local de destino. Neste exemplo, gravaremos os dados em um local do Amazon S3. Os detalhes da propriedade AmazonS3_node_path são preenchidos previamente, conforme determinado pelas configurações usadas durante a criação do trabalho na tela em branco. É possível definir updateBehavior com base em seu caso de uso e decidir Não atualizar a tabela do catálogo de dados ou Criar um catálogo de dados e atualizar o esquema do catálogo de dados em execuções subsequentes, ou criar uma tabela do catálogo e não atualizar a definição do esquema em execuções subsequentes.
A propriedade partitionKeys define a opção de partição de armazenamento. O comportamento padrão é particionar os dados de acordo com o ingestion_time_columns que foi disponibilizado na seção da fonte. A propriedade compression permite definir o algoritmo de compactação a ser aplicado durante a gravação de destino. Você tem as opções Snappy, LZO ou GZIP para definir uma técnica de compactação. A propriedade enableUpdateCatalog controla se a tabela do catálogo do AWS Glue precisa ser atualizada. As opções disponíveis para essa propriedade são True ou False.
#Script generated for node Amazon S3 AmazonS3_node1696872743449 = glueContext.getSink( path = AmazonS3_node1696872743449_path, connection_type = "s3", updateBehavior = "UPDATE_IN_DATABASE", partitionKeys = ["ingest_year", "ingest_month", "ingest_day", "ingest_hour"], compression = "snappy", enableUpdateCatalog = True, transformation_ctx = "AmazonS3_node1696872743449", )
AWS GlueColetor do catálogo do
Esta seção do trabalho controla o comportamento de atualização da tabela do catálogo do AWS Glue. Defina as propriedades catalogDatabase and catalogTableName de acordo com o nome do banco de dados do Catálogo do AWS Glue e o nome da tabela associada ao trabalho do AWS Glue que você está projetando. É possível definir o formato do arquivo dos dados de destino por meio da propriedade setFormat. Neste exemplo, armazenaremos os dados no formato Parquet.
Depois de configurar e executar o trabalho de streaming do AWS Glue referente a este tutorial, os dados de streaming produzidos no Amazon Kinesis Data Streams serão armazenados no local do Amazon S3 em formato Parquet com compactação rápida. Em execuções com êxito do trabalho de streaming, será possível consultar os dados por meio do Amazon Athena.
AmazonS3_node1696872743449 = setCatalogInfo( catalogDatabase = "demo", catalogTableName = "demo_stream_transform_result" ) AmazonS3_node1696872743449.setFormat("glueparquet") AmazonS3_node1696872743449.writeFormat("ChangeSchema_node16986872679326") )