View a markdown version of this page

AWS Glue Concepts de diffusion - AWS Glue

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

AWS Glue Concepts de diffusion

Les sections suivantes fournissent des informations sur les concepts du AWS Glue streaming.

Anatomie d'une tâche AWS Glue de streaming

AWS Glue les jobs de streaming fonctionnent selon le paradigme de streaming Spark et tirent parti du streaming structuré à partir du framework Spark. Les tâches de streaming interrogent en permanence la source de données de streaming, à un intervalle de temps spécifique, pour récupérer les enregistrements sous forme de microlots. Les sections suivantes examinent les différentes parties d'une tâche de AWS Glue streaming.

La capture d'écran montre un journal de Amazon CloudWatch surveillance, dans l' AWS Glue exemple fourni ci-dessus, qui examine le nombre d'exécuteurs nécessaires (ligne orange) et redimensionne les exécuteurs (ligne bleue) pour qu'ils correspondent à ce nombre sans nécessiter de réglage manuel.

forEachBatch

La forEachBatch méthode est le point d'entrée d'une tâche de AWS Glue streaming exécutée. AWS Glue les tâches de streaming utilisent la forEachBatch méthode d'interrogation des données qui fonctionne comme un itérateur qui reste actif pendant le cycle de vie de la tâche de streaming, interroge régulièrement la source de streaming à la recherche de nouvelles données et traite les données les plus récentes par microlots.

glueContext.forEachBatch( frame=dataFrame_AmazonKinesis_node1696872487972, batch_function=processBatch, options={ "windowSize": "100 seconds", "checkpointLocation": args["TempDir"] + "/" + args["JOB_NAME"] + "/checkpoint/", }, )

Configurez la propriété frame de forEachBatch pour spécifier une source de streaming. Dans cet exemple, le nœud source que vous avez créé dans le canevas vierge lors de la création de la tâche est renseigné avec la valeur par défaut DataFrame de la tâche. Définissez la propriété batch_function comme la function que vous décidez d’invoquer pour chaque opération de microlot. Vous devez définir une fonction pour gérer la transformation par lots des données entrantes.

Source

Dans la première étape de la processBatch fonction, le programme vérifie le nombre d'enregistrements de DataFrame ce que vous avez défini comme propriété de cadre deforEachBatch. Le programme ajoute un horodatage d'ingestion à un fichier non DataFrame vide. La clause data_frame.count()>0 détermine si le dernier microlot n’est pas vide et est prêt pour un traitement ultérieur.

def processBatch(data_frame, batchId): if data_frame.count() >0: AmazonKinesis_node1696872487972 = DynamicFrame.fromDF( glueContext.add_ingestion_time_columns(data_frame, "hour"), glueContext, "from_data_frame", )

Mappage

La section suivante du programme consiste à appliquer le mappage. La Mapping.apply méthode sur une étincelle vous DataFrame permet de définir une règle de transformation autour des éléments de données. Vous pouvez généralement renommer, modifier le type de données ou appliquer une fonction personnalisée à la colonne de données source et les faire correspondre aux colonnes cibles.

#Script generated for node ChangeSchema ChangeSchema_node16986872679326 = ApplyMapping.apply( frame = AmazonKinesis_node1696872487972, mappings = [ ("eventtime", "string", "eventtime", "string"), ("manufacturer", "string", "manufacturer", "string"), ("minutevolume", "long", "minutevolume", "int"), ("o2stats", "long", "OxygenSaturation", "int"), ("pressurecontrol", "long", "pressurecontrol", "int"), ("serialnumber", "string", "serialnumber", "string"), ("ventilatorid", "long", "ventilatorid", "long"), ("ingest_year", "string", "ingest_year", "string"), ("ingest_month", "string", "ingest_month", "string"), ("ingest_day", "string", "ingest_day", "string"), ("ingest_hour", "string", "ingest_hour", "string"), ], transformation_ctx="ChangeSchema_node16986872679326", ) )

Sink

Dans cette section, le jeu de données entrant provenant de la source de streaming est stocké dans un emplacement cible. Dans cet exemple, nous écrivons les données dans un emplacement Amazon S3. Les détails de la propriété AmazonS3_node_path sont préremplis en fonction des paramètres que vous avez utilisés lors de la création de tâches à partir du canevas. Vous pouvez définir le updateBehavior en fonction de votre cas d’utilisation et décider soit de ne pas mettre à jour la table du catalogue de données, soit de créer le catalogue de données et de mettre à jour le schéma du catalogue de données lors des exécutions suivantes, soit de créer une table de catalogue sans mettre à jour la définition du schéma lors des exécutions suivantes.

La propriété partitionKeys définit l’option de partition de stockage. Le comportement par défaut consiste à partitionner les données conformément aux ingestion_time_columns mises à disposition dans la section source. La propriété compression vous permet de définir l’algorithme de compression à appliquer lors de l’écriture cible. Vous avez la possibilité de définir Snappy, LZO ou GZIP comme technique de compression. La propriété enableUpdateCatalog détermine si la table du catalogue AWS Glue doit être mise à jour. Les options disponibles pour cette propriété sont True ou False.

#Script generated for node Amazon S3 AmazonS3_node1696872743449 = glueContext.getSink( path = AmazonS3_node1696872743449_path, connection_type = "s3", updateBehavior = "UPDATE_IN_DATABASE", partitionKeys = ["ingest_year", "ingest_month", "ingest_day", "ingest_hour"], compression = "snappy", enableUpdateCatalog = True, transformation_ctx = "AmazonS3_node1696872743449", )

AWS Glue évier de catalogue

Cette section de la tâche contrôle le comportement de mise à jour des tables de AWS Glue catalogue. Définissez catalogDatabase et catalogTableName propriétés selon le nom de votre base de données AWS Glue Catalog et le nom de table associé à la AWS Glue tâche que vous concevez. Vous pouvez définir le format de fichier des données cibles via la propriété setFormat. Dans cet exemple, nous allons stocker les données au format parquet.

Une fois que vous avez configuré et exécuté la tâche de AWS Glue streaming en vous référant à ce didacticiel, les données de streaming produites sur le site Amazon S3 Amazon Kinesis Data Streams seront stockées sur le site Amazon S3 dans un format parquet avec une compression rapide. En cas d’exécution réussie de la tâche de streaming, vous pourrez interroger les données via Amazon Athena.

AmazonS3_node1696872743449 = setCatalogInfo( catalogDatabase = "demo", catalogTableName = "demo_stream_transform_result" ) AmazonS3_node1696872743449.setFormat("glueparquet") AmazonS3_node1696872743449.writeFormat("ChangeSchema_node16986872679326") )