

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 程式碼範例：加入和關聯化資料
<a name="aws-glue-programming-python-samples-legislators"></a>

本範例使用從 [http://everypolitician.org/](http://everypolitician.org/) 下載至 Amazon Simple Storage Service (Amazon S3) 的 `sample-dataset` 儲存貯體的資料集：`s3://awsglue-datasets/examples/us-legislators/all`。資料集包含美國國會議員和他們在美國眾議院和參議院內座位的 JSON 格式的資料，已針對教學用途稍作修改，並透過公有 Amazon S3 儲存貯體提供。

您可在 GitHub 網站 [AWS Glue 儲存庫範例](https://github.com/awslabs/aws-glue-samples)的 `join_and_relationalize.py` 檔案中找到此範例的原始程式碼。

本指南將利用這項資料告訴您如何執行下列動作：
+ 使用AWS Glue爬蟲程式來分類存放在公有 Amazon S3 儲存貯體中的物件，並將其結構描述儲存至 AWS Glue Data Catalog。
+ 檢查爬蟲程式所產生的資料表中繼資料和結構描述。
+ 編寫 Python 擷取、傳輸和載入 (ETL) 指令碼，使用 Data Catalog 中的中繼資料執行下列動作：
  + 將來自不同原始檔案的資料加入到單一資料表 (也就是將資料去正規化)。
  + 篩選加入的資料表，依國會議員類型放入不同的資料表。
  + 將產生的資料寫入到單獨的 Apache Parquet 檔案中，供以後分析之用。

在 上執行時偵錯 Python 或 PySpark 指令碼的偏好方法是 AWS 在 [Glue Studio AWS 上使用筆記本](https://docs.aws.amazon.com/glue/latest/ug/notebooks-chapter.html)。

## 步驟 1：在 Amazon S3 儲存貯體中網路爬取資料
<a name="aws-glue-programming-python-samples-legislators-crawling"></a>

1. 登入 AWS 管理主控台，並在 https：//[https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/) 開啟 AWS Glue主控台。

1. 遵循 中的步驟[設定編目程式](define-crawler.md)，建立新的爬蟲程式，可將`s3://awsglue-datasets/examples/us-legislators/all`資料集編目到 Glue Data Catalog `legislators`中名為 AWS 的資料庫。範例資料已放在這個公有 Amazon S3 儲存貯體中。

1. 執行新的爬蟲程式，接著查看 `legislators` 資料庫。

   爬蟲程式建立下列中繼資料資料表：
   + `persons_json`
   + `memberships_json`
   + `organizations_json`
   + `events_json`
   + `areas_json`
   + `countries_r_json`

   這是一個包含國會議員和其歷史的半標準化資料表集合。

## 步驟 2：新增樣板指令碼至開發端點筆記本
<a name="aws-glue-programming-python-samples-legislators-boilerplate"></a>

將以下樣板指令碼貼至開發端點以匯入您需要的 AWS Glue 程式庫，並且設定單一的 `GlueContext`：

```
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

glueContext = GlueContext(SparkContext.getOrCreate())
```

## 步驟 3：從資料目錄中的資料檢查結構描述
<a name="aws-glue-programming-python-samples-legislators-schemas"></a>

接下來，您可以從 Glue Data Catalog AWS 輕鬆建立檢查 DynamicFrame，並檢查資料的結構描述。例如，若要查看 `persons_json` 資料表的結構描述，請將下列內容新增到筆記本：

```
persons = glueContext.create_dynamic_frame.from_catalog(
             database="legislators",
             table_name="persons_json")
print "Count: ", persons.count()
persons.printSchema()
```

以下為列印呼叫的輸出：

```
Count:  1961
root
|-- family_name: string
|-- name: string
|-- links: array
|    |-- element: struct
|    |    |-- note: string
|    |    |-- url: string
|-- gender: string
|-- image: string
|-- identifiers: array
|    |-- element: struct
|    |    |-- scheme: string
|    |    |-- identifier: string
|-- other_names: array
|    |-- element: struct
|    |    |-- note: string
|    |    |-- name: string
|    |    |-- lang: string
|-- sort_name: string
|-- images: array
|    |-- element: struct
|    |    |-- url: string
|-- given_name: string
|-- birth_date: string
|-- id: string
|-- contact_details: array
|    |-- element: struct
|    |    |-- type: string
|    |    |-- value: string
|-- death_date: string
```

資料表中的每個人都是美國國會的成員。

若要檢視 `memberships_json` 資料表的結構描述，請輸入如下命令：

```
memberships = glueContext.create_dynamic_frame.from_catalog(
                 database="legislators",
                 table_name="memberships_json")
print "Count: ", memberships.count()
memberships.printSchema()
```

其輸出如下：

```
Count:  10439
root
|-- area_id: string
|-- on_behalf_of_id: string
|-- organization_id: string
|-- role: string
|-- person_id: string
|-- legislative_period_id: string
|-- start_date: string
|-- end_date: string
```

`organizations` 為政黨和參議院與眾議院這兩個議會殿堂。若要檢視 `organizations_json` 資料表的結構描述，請輸入如下命令：

```
orgs = glueContext.create_dynamic_frame.from_catalog(
           database="legislators",
           table_name="organizations_json")
print "Count: ", orgs.count()
orgs.printSchema()
```

其輸出如下：

```
Count:  13
root
|-- classification: string
|-- links: array
|    |-- element: struct
|    |    |-- note: string
|    |    |-- url: string
|-- image: string
|-- identifiers: array
|    |-- element: struct
|    |    |-- scheme: string
|    |    |-- identifier: string
|-- other_names: array
|    |-- element: struct
|    |    |-- lang: string
|    |    |-- note: string
|    |    |-- name: string
|-- id: string
|-- name: string
|-- seats: int
|-- type: string
```

## 步驟 4：篩選資料
<a name="aws-glue-programming-python-samples-legislators-filtering"></a>

接著，保留需要的欄位，將 `id` 重新命名為 `org_id`。資料集很小，可以從整體來檢視。

`toDF()` 會將 `DynamicFrame` 轉換為 Apache Spark `DataFrame`，因此您可套用 Apache Spark SQL 中現有的轉換：

```
orgs = orgs.drop_fields(['other_names',
                        'identifiers']).rename_field(
                            'id', 'org_id').rename_field(
                               'name', 'org_name')
orgs.toDF().show()
```

以下將顯示輸出：

```
+--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+
|classification|              org_id|            org_name|               links|seats|       type|               image|
+--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+
|         party|            party/al|                  AL|                null| null|       null|                null|
|         party|      party/democrat|            Democrat|[[website,http://...| null|       null|https://upload.wi...|
|         party|party/democrat-li...|    Democrat-Liberal|[[website,http://...| null|       null|                null|
|   legislature|d56acebe-8fdc-47b...|House of Represen...|                null|  435|lower house|                null|
|         party|   party/independent|         Independent|                null| null|       null|                null|
|         party|party/new_progres...|     New Progressive|[[website,http://...| null|       null|https://upload.wi...|
|         party|party/popular_dem...|    Popular Democrat|[[website,http://...| null|       null|                null|
|         party|    party/republican|          Republican|[[website,http://...| null|       null|https://upload.wi...|
|         party|party/republican-...|Republican-Conser...|[[website,http://...| null|       null|                null|
|         party|      party/democrat|            Democrat|[[website,http://...| null|       null|https://upload.wi...|
|         party|   party/independent|         Independent|                null| null|       null|                null|
|         party|    party/republican|          Republican|[[website,http://...| null|       null|https://upload.wi...|
|   legislature|8fa6c3d2-71dc-478...|              Senate|                null|  100|upper house|                null|
+--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+
```

輸入以下以檢視出現在 `memberships` 的 `organizations`：

```
memberships.select_fields(['organization_id']).toDF().distinct().show()
```

以下將顯示輸出：

```
+--------------------+
|     organization_id|
+--------------------+
|d56acebe-8fdc-47b...|
|8fa6c3d2-71dc-478...|
+--------------------+
```

## 步驟 5：全部整合為一
<a name="aws-glue-programming-python-samples-legislators-joining"></a>

現在，使用 AWS Glue 加入這些關聯式表格，並建立一份關於國會議員 `memberships` 及其對應的 `organizations` 的完整歷史記錄資料表。

1. 首先，加入 `persons` 和 `memberships` 的 `id` 和 `person_id`。

1. 接著，將結果加入到 `orgs` 的 `org_id` 和 `organization_id`。

1. 然後，捨棄冗餘欄位 `person_id` 和 `org_id`。

您可以在同一 (延伸) 指令碼行執行所有這些操作：

```
l_history = Join.apply(orgs,
                       Join.apply(persons, memberships, 'id', 'person_id'),
                       'org_id', 'organization_id').drop_fields(['person_id', 'org_id'])
print "Count: ", l_history.count()
l_history.printSchema()
```

其輸出如下：

```
Count:  10439
root
|-- role: string
|-- seats: int
|-- org_name: string
|-- links: array
|    |-- element: struct
|    |    |-- note: string
|    |    |-- url: string
|-- type: string
|-- sort_name: string
|-- area_id: string
|-- images: array
|    |-- element: struct
|    |    |-- url: string
|-- on_behalf_of_id: string
|-- other_names: array
|    |-- element: struct
|    |    |-- note: string
|    |    |-- name: string
|    |    |-- lang: string
|-- contact_details: array
|    |-- element: struct
|    |    |-- type: string
|    |    |-- value: string
|-- name: string
|-- birth_date: string
|-- organization_id: string
|-- gender: string
|-- classification: string
|-- death_date: string
|-- legislative_period_id: string
|-- identifiers: array
|    |-- element: struct
|    |    |-- scheme: string
|    |    |-- identifier: string
|-- image: string
|-- given_name: string
|-- family_name: string
|-- id: string
|-- start_date: string
|-- end_date: string
```

您現在取得最終的資料表，可用於分析。您可以用精巧、有效率的格式編寫，以用於分析 (也就是 Parquet)，在 AWS Glue、Amazon Athena 或 Amazon Redshift Spectrum 上執行 SQL。

以下呼叫將資料表編寫到多個檔案，在稍後執行分析時支援快速平行讀取：

```
glueContext.write_dynamic_frame.from_options(frame = l_history,
          connection_type = "s3",
          connection_options = {"path": "s3://glue-sample-target/output-dir/legislator_history"},
          format = "parquet")
```

若要將所有歷史記錄資料合併成單一檔案，您必須將其轉換為資料框架、分割並編寫：

```
s_history = l_history.toDF().repartition(1)
s_history.write.parquet('s3://glue-sample-target/output-dir/legislator_single')
```

或者，如果您希望將其分為參議院和眾議院：

```
l_history.toDF().write.parquet('s3://glue-sample-target/output-dir/legislator_part',
                               partitionBy=['org_name'])
```

## 步驟 6：轉換關聯式資料庫的資料
<a name="aws-glue-programming-python-samples-legislators-writing"></a>

AWS Glue 可讓您輕鬆地將資料寫入關聯式資料庫，例如 Amazon Redshift，即使是半結構化資料。其提供轉換 `relationalize`，可將 `DynamicFrames` 扁平化，無論框架中的物件多複雜。

使用本範例中的 `l_history` `DynamicFrame`，以根資料表 (`hist_root`) 的名稱和暫時任務路徑傳送至 `relationalize`。將傳回 `DynamicFrameCollection`。然後，您可以將 `DynamicFrames` 的名稱列在該集合中：

```
dfc = l_history.relationalize("hist_root", "s3://glue-sample-target/temp-dir/")
dfc.keys()
```

以下為 `keys` 呼叫的輸出：

```
[u'hist_root', u'hist_root_contact_details', u'hist_root_links',
 u'hist_root_other_names', u'hist_root_images', u'hist_root_identifiers']
```

`Relationalize` 將歷史記錄資料表分成六個新資料表：根資料表包含在 `DynamicFrame` 中的各物件記錄，和陣列的輔助資料表。關聯式資料庫中的陣列處理通常為次最佳化，尤其在這些陣列變得龐大時。將陣列分成不同的資料表，可加快查詢速度。

接著，檢查 `contact_details` 以查看分隔：

```
l_history.select_fields('contact_details').printSchema()
dfc.select('hist_root_contact_details').toDF().where("id = 10 or id = 75").orderBy(['id','index']).show()
```

以下為 `show` 呼叫的輸出：

```
root
|-- contact_details: array
|    |-- element: struct
|    |    |-- type: string
|    |    |-- value: string
+---+-----+------------------------+-------------------------+
| id|index|contact_details.val.type|contact_details.val.value|
+---+-----+------------------------+-------------------------+
| 10|    0|                     fax|                         |
| 10|    1|                        |             202-225-1314|
| 10|    2|                   phone|                         |
| 10|    3|                        |             202-225-3772|
| 10|    4|                 twitter|                         |
| 10|    5|                        |          MikeRossUpdates|
| 75|    0|                     fax|                         |
| 75|    1|                        |             202-225-7856|
| 75|    2|                   phone|                         |
| 75|    3|                        |             202-225-2711|
| 75|    4|                 twitter|                         |
| 75|    5|                        |                SenCapito|
+---+-----+------------------------+-------------------------+
```

`contact_details` 欄位為原始 `DynamicFrame` 中結構的陣列。這些陣列的每個元素都是輔助資料表中的單獨資料列，以 `index` 編製索引。此處的 `id` 是 `hist_root` 資料表的外部金鑰，金鑰為 `contact_details`：

```
dfc.select('hist_root').toDF().where(
    "contact_details = 10 or contact_details = 75").select(
       ['id', 'given_name', 'family_name', 'contact_details']).show()
```

以下為其輸出：

```
+--------------------+----------+-----------+---------------+
|                  id|given_name|family_name|contact_details|
+--------------------+----------+-----------+---------------+
|f4fc30ee-7b42-432...|      Mike|       Ross|             10|
|e3c60f34-7d1b-4c0...|   Shelley|     Capito|             75|
+--------------------+----------+-----------+---------------+
```

請注意，這些命令將使用 `toDF()`，然後是 `where` 表達式，來篩選您想要查看的資料列。

因此，加入 `hist_root` 資料表與輔助資料表可執行下列動作：
+ 無需陣列支援便能將資料載入到資料庫。
+ 使用 SQL 查詢陣列中的每個個別項目。

使用 AWS Glue 連線以安全存放和存取您的 Amazon Redshift 憑證。有關如何建立自己的連線的詳細資訊，請參閱 [連線至資料](glue-connections.md)。

您現已準備好透過一次循環一個 `DynamicFrames` 來將資料寫入連接器：

```
for df_name in dfc.keys():
  m_df = dfc.select(df_name)
  print "Writing to table: ", df_name
  glueContext.write_dynamic_frame.from_jdbc_conf(frame = m_df, connection settings here)
```

您的連接器設定將因您的關聯式資料庫類型而異：
+ 如需有關寫入 Amazon Redshift 的指示，請參閱[Redshift 連線](aws-glue-programming-etl-connect-redshift-home.md)。
+ 若為其他資料庫，請參閱 [AWS Glue for Spark 中 ETL 的連線類型和選項](aws-glue-programming-etl-connect.md)。

## 結論
<a name="aws-glue-programming-python-samples-legislators-conclusion"></a>

整體而言，AWS Glue 極具彈性。它讓您用幾行程式碼便能完成通常要好幾天才能完成撰寫的任務。您可在 GitHub [AWS Glue 範例](https://github.com/awslabs/aws-glue-samples)的 Python 檔案 `join_and_relationalize.py` 中找到完整從來源到目標的 ETL 指令碼。