Esempio di codice: unione e relazioni dei dati
In questo esempio viene usato un set di dati scaricato da http://everypolitician.org/sample-dataset in Amazon Simple Storage Service (Amazon S3): s3://awsglue-datasets/examples/us-legislators/all. Il set di dati contiene i dati in formato JSON sui legislatori degli Stati Uniti e sui seggi che hanno occupato nella Camera dei rappresentanti e al Senato che sono stati modificati leggermente e resi disponibili in un bucket Amazon S3 pubblico a fini di questo tutorial.
Puoi trovare il codice sorgente di questo esempio nel file join_and_relationalize.py presente nel repository degli esempi di AWS Glue
L'esercitazione illustra con questi dati come:
Usa un crawler AWS Glue per classificare gli oggetti archiviati in un bucket Amazon S3 pubblico e salvare i relativi schemi nel catalogo dati di AWS Glue.
Esaminare gli schemi e i metadati della tabella restituiti dal crawling.
-
Scrivere uno script di estrazione, trasferimento e caricamento (ETL) Python che usa i metadati del catalogo dati per:
Unire insieme i dati dei diversi file di origine in un'unica tabella di dati (ovvero denormalizzare i dati).
Filtrare la tabella unita in tabelle separate in base al tipo di legislatore.
Scrivere i dati risultanti per separare i file di Apache Parquet per analisi successive.
Il modo più indicato per eseguire il debug degli script Python o PySpark durante l'esecuzione su AWS consiste nell'utilizzare Notebook su AWS Glue Studio.
Fase 1: esecuzione del crawling sui dati nel bucket Amazon S3
-
Accedi alla AWS Management Console e apri la console AWS Glue all'indirizzo https://console.aws.amazon.com/glue/
. -
Seguendo le fasi in Configurazione di un crawler, crea un nuovo crawler che esegue il crawling del set di dati
s3://awsglue-datasets/examples/us-legislators/allin un database denominatolegislatorsnel in AWS Glue Data Catalog. I dati di esempio sono già in questo bucket Amazon S3 pubblico. -
Esegui il nuovo crawler e controlla il database
legislators.Il crawler crea le seguenti tabelle di metadati:
-
persons_json -
memberships_json -
organizations_json -
events_json -
areas_json -
countries_r_json
Si tratta di una raccolta di tabelle semi-normalizzata contenenti i legislatori e le relative storie.
-
Fase 2: aggiunta dello script Boilerplate al notebook degli endpoint di sviluppo
Incolla lo script boilerplate seguente nel notebook degli endpoint di sviluppo per importare le librerie AWS Glue necessarie e configurare un singolo oggetto GlueContext:
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job glueContext = GlueContext(SparkContext.getOrCreate())
Fase 3: esame degli schemi dai dati nel catalogo dati
Successivamente, è possibile creare facilmente un DynamicFrame da AWS Glue Data Catalog ed esaminare gli schemi dei dati. Ad esempio, per visualizzare lo schema della tabella persons_json, aggiungi quanto segue nel notebook:
persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json") print "Count: ", persons.count() persons.printSchema()
Ecco l'output dalle chiamate di stampa:
Count: 1961
root
|-- family_name: string
|-- name: string
|-- links: array
| |-- element: struct
| | |-- note: string
| | |-- url: string
|-- gender: string
|-- image: string
|-- identifiers: array
| |-- element: struct
| | |-- scheme: string
| | |-- identifier: string
|-- other_names: array
| |-- element: struct
| | |-- note: string
| | |-- name: string
| | |-- lang: string
|-- sort_name: string
|-- images: array
| |-- element: struct
| | |-- url: string
|-- given_name: string
|-- birth_date: string
|-- id: string
|-- contact_details: array
| |-- element: struct
| | |-- type: string
| | |-- value: string
|-- death_date: string
Ogni persona nella tabella è membro di alcuni enti del Congresso degli Stati Uniti.
Per visualizzare lo schema della tabella memberships_json, digita quando segue:
memberships = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="memberships_json") print "Count: ", memberships.count() memberships.printSchema()
L'output è il seguente:
Count: 10439
root
|-- area_id: string
|-- on_behalf_of_id: string
|-- organization_id: string
|-- role: string
|-- person_id: string
|-- legislative_period_id: string
|-- start_date: string
|-- end_date: string
Gli elementi organizations sono i partiti e le due camere del Congresso, il Senato e la Camera dei rappresentanti. Per visualizzare lo schema della tabella organizations_json, digita quando segue:
orgs = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="organizations_json") print "Count: ", orgs.count() orgs.printSchema()
L'output è il seguente:
Count: 13
root
|-- classification: string
|-- links: array
| |-- element: struct
| | |-- note: string
| | |-- url: string
|-- image: string
|-- identifiers: array
| |-- element: struct
| | |-- scheme: string
| | |-- identifier: string
|-- other_names: array
| |-- element: struct
| | |-- lang: string
| | |-- note: string
| | |-- name: string
|-- id: string
|-- name: string
|-- seats: int
|-- type: string
Fase 4: filtrare i dati
A questo punto mantieni solo i campi che desideri e rinomina id in org_id. Il set di dati è sufficientemente piccolo da poterlo visualizzare tutto insieme.
L'elemento toDF() converte un oggetto DynamicFrame in un elemento DataFrame di Apache Spark in modo da poter applicare le trasformazioni già esistenti in Apache Spark SQL:
orgs = orgs.drop_fields(['other_names', 'identifiers']).rename_field( 'id', 'org_id').rename_field( 'name', 'org_name') orgs.toDF().show()
Di seguito è riportato l'output:
+--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+
|classification| org_id| org_name| links|seats| type| image|
+--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+
| party| party/al| AL| null| null| null| null|
| party| party/democrat| Democrat|[[website,http://...| null| null|https://upload.wi...|
| party|party/democrat-li...| Democrat-Liberal|[[website,http://...| null| null| null|
| legislature|d56acebe-8fdc-47b...|House of Represen...| null| 435|lower house| null|
| party| party/independent| Independent| null| null| null| null|
| party|party/new_progres...| New Progressive|[[website,http://...| null| null|https://upload.wi...|
| party|party/popular_dem...| Popular Democrat|[[website,http://...| null| null| null|
| party| party/republican| Republican|[[website,http://...| null| null|https://upload.wi...|
| party|party/republican-...|Republican-Conser...|[[website,http://...| null| null| null|
| party| party/democrat| Democrat|[[website,http://...| null| null|https://upload.wi...|
| party| party/independent| Independent| null| null| null| null|
| party| party/republican| Republican|[[website,http://...| null| null|https://upload.wi...|
| legislature|8fa6c3d2-71dc-478...| Senate| null| 100|upper house| null|
+--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+
Digita quanto segue per visualizzare gli elementi organizations presenti nell'oggetto memberships:
memberships.select_fields(['organization_id']).toDF().distinct().show()
Di seguito è riportato l'output:
+--------------------+
| organization_id|
+--------------------+
|d56acebe-8fdc-47b...|
|8fa6c3d2-71dc-478...|
+--------------------+
Fase 5: unione dei dati
Usa quindi AWS Glue per unire queste tabelle relazionali e creare una tabella con la cronologia completa per l'oggetto memberships dei legislatori e i relativi elementi organizations.
-
In primo luogo, unisci
personsemembershipsinideperson_id. -
Quindi, unisci il risultato a
orgsinorg_ideorganization_id. -
Quindi, rilascia i campi ridondanti,
person_ideorg_id.
Puoi eseguire tutte queste operazioni in una sola riga di codice estesa:
l_history = Join.apply(orgs, Join.apply(persons, memberships, 'id', 'person_id'), 'org_id', 'organization_id').drop_fields(['person_id', 'org_id']) print "Count: ", l_history.count() l_history.printSchema()
L'output è il seguente:
Count: 10439
root
|-- role: string
|-- seats: int
|-- org_name: string
|-- links: array
| |-- element: struct
| | |-- note: string
| | |-- url: string
|-- type: string
|-- sort_name: string
|-- area_id: string
|-- images: array
| |-- element: struct
| | |-- url: string
|-- on_behalf_of_id: string
|-- other_names: array
| |-- element: struct
| | |-- note: string
| | |-- name: string
| | |-- lang: string
|-- contact_details: array
| |-- element: struct
| | |-- type: string
| | |-- value: string
|-- name: string
|-- birth_date: string
|-- organization_id: string
|-- gender: string
|-- classification: string
|-- death_date: string
|-- legislative_period_id: string
|-- identifiers: array
| |-- element: struct
| | |-- scheme: string
| | |-- identifier: string
|-- image: string
|-- given_name: string
|-- family_name: string
|-- id: string
|-- start_date: string
|-- end_date: string
Ora hai la tabella finale che puoi utilizzare per l'analisi. Puoi scrivere in un formato compatto ed efficiente per le analisi dei dati, vale a dire Parquet, che puoi usare per eseguire SQL in AWS Glue, Amazon Athena o Amazon Redshift Spectrum.
La seguente chiamata scrive la tabella in più file per supportare le operazioni di lettura parallela veloce nella fase di analisi successiva:
glueContext.write_dynamic_frame.from_options(frame = l_history, connection_type = "s3", connection_options = {"path": "s3://glue-sample-target/output-dir/legislator_history"}, format = "parquet")
Per inserire tutti i dati cronologici in un singolo file, devi convertirli in un frame di dati, suddividerlo in partizioni e scriverlo:
s_history = l_history.toDF().repartition(1) s_history.write.parquet('s3://glue-sample-target/output-dir/legislator_single')
In alternativa, se vuoi separarlo dal Senato e dalla Camera:
l_history.toDF().write.parquet('s3://glue-sample-target/output-dir/legislator_part', partitionBy=['org_name'])
Fase 6: trasformare i dati per i database relazionali
AWS Glue permette di scrivere in modo semplice i dati, anche semistrutturati, in database relazionali come Amazon Redshift,. Offre una trasformazione di tipo relationalize, che appiattisce gli elementi DynamicFrames indipendentemente dalla complessità degli oggetti in frame.
Utilizzando l_history DynamicFrame in questo esempio, passi il nome di una tabella radice (hist_root) e un percorso temporaneo a relationalize. Viene restituito un elemento DynamicFrameCollection. Puoi quindi elencare i nomi degli elementi DynamicFrames nella raccolta:
dfc = l_history.relationalize("hist_root", "s3://glue-sample-target/temp-dir/") dfc.keys()
Di seguito è riportato l'output della chiamata keys:
[u'hist_root', u'hist_root_contact_details', u'hist_root_links', u'hist_root_other_names', u'hist_root_images', u'hist_root_identifiers']
Relationalize suddivide la tabella della cronologia in sei nuove tabelle: una tabella radice che contiene un record per ogni oggetto dell'elemento DynamicFrame e le tabelle ausiliarie per le matrici. La gestione delle matrici nei database relazionali spesso non è ottimali, soprattutto quando le matrici diventano grandi. Separando le matrici in tabelle diverse velocizza l'esecuzione delle query.
A questo punto, controlla la separazione esaminando contact_details:
l_history.select_fields('contact_details').printSchema() dfc.select('hist_root_contact_details').toDF().where("id = 10 or id = 75").orderBy(['id','index']).show()
Di seguito è riportato l'output della chiamata show:
root
|-- contact_details: array
| |-- element: struct
| | |-- type: string
| | |-- value: string
+---+-----+------------------------+-------------------------+
| id|index|contact_details.val.type|contact_details.val.value|
+---+-----+------------------------+-------------------------+
| 10| 0| fax| |
| 10| 1| | 202-225-1314|
| 10| 2| phone| |
| 10| 3| | 202-225-3772|
| 10| 4| twitter| |
| 10| 5| | MikeRossUpdates|
| 75| 0| fax| |
| 75| 1| | 202-225-7856|
| 75| 2| phone| |
| 75| 3| | 202-225-2711|
| 75| 4| twitter| |
| 75| 5| | SenCapito|
+---+-----+------------------------+-------------------------+
Il campo contact_details era una matrice di strutture nell'elemento DynamicFrame originale. Ogni elemento di tali matrici è una riga separata nella tabella ausiliaria, indicizzata da index. L'id qui è una chiave esterna nella tabella hist_root con la chiave contact_details:
dfc.select('hist_root').toDF().where( "contact_details = 10 or contact_details = 75").select( ['id', 'given_name', 'family_name', 'contact_details']).show()
Di seguito è riportato l'output:
+--------------------+----------+-----------+---------------+
| id|given_name|family_name|contact_details|
+--------------------+----------+-----------+---------------+
|f4fc30ee-7b42-432...| Mike| Ross| 10|
|e3c60f34-7d1b-4c0...| Shelley| Capito| 75|
+--------------------+----------+-----------+---------------+
In questi comandi vengono utilizzati toDF() e un'espressione where per filtrare le righe che vuoi vedere.
Quindi, unendo la tabella hist_root con le tabelle ausiliarie ti consente di effettuare le operazioni descritte di seguito.
Caricare i dati nei database senza il supporto di matrici.
Eseguire la query di ogni singolo elemento in una matrice con SQL.
Archivia e accedi in modo sicuro alle tue credenziali Amazon Redshift con una connessione AWS Glue. Per informazioni su come creare la tua connessione, vedi Connessione ai dati.
Siete ora pronti a scrivere i vostri dati su una connessione, scorrendo uno alla volta i DynamicFrames:
for df_name in dfc.keys(): m_df = dfc.select(df_name) print "Writing to table: ", df_name glueContext.write_dynamic_frame.from_jdbc_conf(frame = m_df,connection settings here)
Le impostazioni di connessione variano in base al tipo di database relazionale:
-
Per istruzioni su come scrivere su Amazon Redshift, consultare Connessioni Redshift.
-
Per altri database, consultare Tipi e opzioni di connessione per ETL in AWS Glue per Spark.
Conclusioni
Complessivamente, AWS Glue è molto flessibile. Con poche righe di codice, ti consente di eseguire ciò che normalmente ti potrebbe richiedere giorni di scrittura. Puoi trovare gli interi script ETL origine-destinazione nel file Python join_and_relationalize.py negli esempi di AWS Glue