

# 代码示例：对数据进行联接和关系化
<a name="aws-glue-programming-python-samples-legislators"></a>

本示例使用从 [http://everypolitician.org/](http://everypolitician.org/) 下载到 Amazon Simple Storage Service（Amazon S3）中的 `sample-dataset` 存储桶的数据集：`s3://awsglue-datasets/examples/us-legislators/all`。该数据集包含有关美国议员及其在美国众议院和参议院中占有的席位的数据（JSON 格式），并且已针对本教程进行了轻微修改且在公共 Amazon S3 存储桶中提供。

您可以在 GitHub 网站上 [AWS Glue 示例存储库](https://github.com/awslabs/aws-glue-samples)的 `join_and_relationalize.py` 文件中找到本示例的源代码。

利用此数据，本教程将介绍如何执行以下操作：
+ 使用 AWS Glue 爬网程序对存储在公有 Amazon S3 存储桶中的对象进行分类并将其架构保存到 AWS Glue Data Catalog。
+ 检查生成自爬网的表元数据和架构。
+ 编写 Python 提取、转移和加载（ETL）脚本，该脚本使用数据目录中的元数据执行以下操作：
  + 将不同源文件中的数据加入到单个数据表中 (即，使数据非规范化)。
  + 按议员类型筛选已加入到单独的表中的表。
  + 将生成的数据写入单独的 Apache Parquet 文件以供日后分析。

在 AWS 上运行时，调试 Python 或 PySpark 脚本的首选方法是[在 AWS Glue Studio 上使用笔记本](https://docs.aws.amazon.com/glue/latest/ug/notebooks-chapter.html)。

## 步骤 1：爬取 Amazon S3 存储桶中的数据
<a name="aws-glue-programming-python-samples-legislators-crawling"></a>

1. 登录 AWS 管理控制台 并打开位于 [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/) 的 AWS Glue 控制台。

1. 按照 [配置爬网程序](define-crawler.md) 中的步骤操作，创建可将 `s3://awsglue-datasets/examples/us-legislators/all` 数据集网络爬取到 AWS Glue Data Catalog 中名为 `legislators` 的数据库的新爬网程序。示例数据已位于此公共 Amazon S3 存储桶中。

1. 运行新爬网程序，然后检查 `legislators` 数据库。

   该爬网程序将创建以下元数据表：
   + `persons_json`
   + `memberships_json`
   + `organizations_json`
   + `events_json`
   + `areas_json`
   + `countries_r_json`

   这是一个包含议员及其历史记录的半规范化表集合。

## 步骤 2：向开发终端节点笔记本中添加样板文件脚本
<a name="aws-glue-programming-python-samples-legislators-boilerplate"></a>

将以下样板文件脚本粘贴到开发终端节点笔记本中以导入所需的 AWS Glue 库，然后设置单个 `GlueContext`：

```
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

glueContext = GlueContext(SparkContext.getOrCreate())
```

## 步骤 3：检查数据目录中数据的架构
<a name="aws-glue-programming-python-samples-legislators-schemas"></a>

接下来，您可以轻松地从 AWS Glue Data Catalog 检查 DynamicFrame，并检查数据的架构。例如，要查看 `persons_json` 表的架构，请在您的笔记本中添加以下内容：

```
persons = glueContext.create_dynamic_frame.from_catalog(
             database="legislators",
             table_name="persons_json")
print "Count: ", persons.count()
persons.printSchema()
```

下面是来自打印调用的输出：

```
Count:  1961
root
|-- family_name: string
|-- name: string
|-- links: array
|    |-- element: struct
|    |    |-- note: string
|    |    |-- url: string
|-- gender: string
|-- image: string
|-- identifiers: array
|    |-- element: struct
|    |    |-- scheme: string
|    |    |-- identifier: string
|-- other_names: array
|    |-- element: struct
|    |    |-- note: string
|    |    |-- name: string
|    |    |-- lang: string
|-- sort_name: string
|-- images: array
|    |-- element: struct
|    |    |-- url: string
|-- given_name: string
|-- birth_date: string
|-- id: string
|-- contact_details: array
|    |-- element: struct
|    |    |-- type: string
|    |    |-- value: string
|-- death_date: string
```

表中的每个人都是某个美国国会机构的成员。

要查看 `memberships_json` 表的架构，请键入以下内容：

```
memberships = glueContext.create_dynamic_frame.from_catalog(
                 database="legislators",
                 table_name="memberships_json")
print "Count: ", memberships.count()
memberships.printSchema()
```

您可以在一个 (扩展) 代码行中执行所有这些操作：

```
Count:  10439
root
|-- area_id: string
|-- on_behalf_of_id: string
|-- organization_id: string
|-- role: string
|-- person_id: string
|-- legislative_period_id: string
|-- start_date: string
|-- end_date: string
```

`organizations` 是美国国会的党派和两大议院，即参议院和众议院。要查看 `organizations_json` 表的架构，请键入以下内容：

```
orgs = glueContext.create_dynamic_frame.from_catalog(
           database="legislators",
           table_name="organizations_json")
print "Count: ", orgs.count()
orgs.printSchema()
```

您可以在一个 (扩展) 代码行中执行所有这些操作：

```
Count:  13
root
|-- classification: string
|-- links: array
|    |-- element: struct
|    |    |-- note: string
|    |    |-- url: string
|-- image: string
|-- identifiers: array
|    |-- element: struct
|    |    |-- scheme: string
|    |    |-- identifier: string
|-- other_names: array
|    |-- element: struct
|    |    |-- lang: string
|    |    |-- note: string
|    |    |-- name: string
|-- id: string
|-- name: string
|-- seats: int
|-- type: string
```

## 步骤 4：筛选数据
<a name="aws-glue-programming-python-samples-legislators-filtering"></a>

接下来，仅保留您需要的字段，然后将 `id` 重命名为 `org_id`。该数据集足够小，方便您完整查看。

`toDF()` 将 `DynamicFrame` 转换为 Apache Spark `DataFrame`，因此您可以应用已存在于 Apache Spark SQL 中的转换：

```
orgs = orgs.drop_fields(['other_names',
                        'identifiers']).rename_field(
                            'id', 'org_id').rename_field(
                               'name', 'org_name')
orgs.toDF().show()
```

下面显示了输出：

```
+--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+
|classification|              org_id|            org_name|               links|seats|       type|               image|
+--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+
|         party|            party/al|                  AL|                null| null|       null|                null|
|         party|      party/democrat|            Democrat|[[website,http://...| null|       null|https://upload.wi...|
|         party|party/democrat-li...|    Democrat-Liberal|[[website,http://...| null|       null|                null|
|   legislature|d56acebe-8fdc-47b...|House of Represen...|                null|  435|lower house|                null|
|         party|   party/independent|         Independent|                null| null|       null|                null|
|         party|party/new_progres...|     New Progressive|[[website,http://...| null|       null|https://upload.wi...|
|         party|party/popular_dem...|    Popular Democrat|[[website,http://...| null|       null|                null|
|         party|    party/republican|          Republican|[[website,http://...| null|       null|https://upload.wi...|
|         party|party/republican-...|Republican-Conser...|[[website,http://...| null|       null|                null|
|         party|      party/democrat|            Democrat|[[website,http://...| null|       null|https://upload.wi...|
|         party|   party/independent|         Independent|                null| null|       null|                null|
|         party|    party/republican|          Republican|[[website,http://...| null|       null|https://upload.wi...|
|   legislature|8fa6c3d2-71dc-478...|              Senate|                null|  100|upper house|                null|
+--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+
```

键入以下内容以查看显示在 `memberships` 中的 `organizations`：

```
memberships.select_fields(['organization_id']).toDF().distinct().show()
```

下面显示了输出：

```
+--------------------+
|     organization_id|
+--------------------+
|d56acebe-8fdc-47b...|
|8fa6c3d2-71dc-478...|
+--------------------+
```

## 步骤 5：整合内容
<a name="aws-glue-programming-python-samples-legislators-joining"></a>

现在，使用 AWS Glue 联接这些关系表并创建一个包含议员 `memberships` 及其对应的 `organizations` 的完整历史记录表。

1. 首先，联接 `id` 和 `person_id` 上的 `persons` 和 `memberships`。

1. 接下来，将结果与 `org_id` 和 `organization_id` 上的 `orgs` 联接。

1. 然后，删除多余的字段 `person_id` 和 `org_id`。

您可以在一个 (扩展) 代码行中执行所有这些操作：

```
l_history = Join.apply(orgs,
                       Join.apply(persons, memberships, 'id', 'person_id'),
                       'org_id', 'organization_id').drop_fields(['person_id', 'org_id'])
print "Count: ", l_history.count()
l_history.printSchema()
```

您可以在一个 (扩展) 代码行中执行所有这些操作：

```
Count:  10439
root
|-- role: string
|-- seats: int
|-- org_name: string
|-- links: array
|    |-- element: struct
|    |    |-- note: string
|    |    |-- url: string
|-- type: string
|-- sort_name: string
|-- area_id: string
|-- images: array
|    |-- element: struct
|    |    |-- url: string
|-- on_behalf_of_id: string
|-- other_names: array
|    |-- element: struct
|    |    |-- note: string
|    |    |-- name: string
|    |    |-- lang: string
|-- contact_details: array
|    |-- element: struct
|    |    |-- type: string
|    |    |-- value: string
|-- name: string
|-- birth_date: string
|-- organization_id: string
|-- gender: string
|-- classification: string
|-- death_date: string
|-- legislative_period_id: string
|-- identifiers: array
|    |-- element: struct
|    |    |-- scheme: string
|    |    |-- identifier: string
|-- image: string
|-- given_name: string
|-- family_name: string
|-- id: string
|-- start_date: string
|-- end_date: string
```

您现在有了可用于分析的最终表。您可以采用紧凑且高效的格式写入该表以供分析（即 Parquet），该格式可让您在 AWS Glue、Amazon Athena 或 Amazon Redshift Spectrum 中运行 SQL。

以下调用将跨多个文件写入该表以在日后执行分析时支持快速并行读取：

```
glueContext.write_dynamic_frame.from_options(frame = l_history,
          connection_type = "s3",
          connection_options = {"path": "s3://glue-sample-target/output-dir/legislator_history"},
          format = "parquet")
```

要将所有历史记录数据放入单个文件，您必须将其转换为数据帧，为其重新分区然后写入它：

```
s_history = l_history.toDF().repartition(1)
s_history.write.parquet('s3://glue-sample-target/output-dir/legislator_single')
```

或者，如果您希望按参议院和众议院分隔该数据：

```
l_history.toDF().write.parquet('s3://glue-sample-target/output-dir/legislator_part',
                               partitionBy=['org_name'])
```

## 步骤 6：转换关系数据库的数据
<a name="aws-glue-programming-python-samples-legislators-writing"></a>

利用 AWS Glue，您可以轻松将数据写入到关系数据库（如 Amazon Redshift），甚至对半结构化数据也是如此。它提供了一种转换 `relationalize`，这将展平 `DynamicFrames`，无论帧中的对象的复杂度可能如何。

使用本示例中的 `l_history` `DynamicFrame`，传入根表的名称 (`hist_root`) 和 `relationalize` 的临时工作路径。这将返回 `DynamicFrameCollection`。您随后可以列出该集合中 `DynamicFrames` 的名称：

```
dfc = l_history.relationalize("hist_root", "s3://glue-sample-target/temp-dir/")
dfc.keys()
```

以下是 `keys` 调用的输出：

```
[u'hist_root', u'hist_root_contact_details', u'hist_root_links',
 u'hist_root_other_names', u'hist_root_images', u'hist_root_identifiers']
```

`Relationalize` 将该历史记录表拆分为 6 个新表：1 个包含 `DynamicFrame` 中每个对象的记录的根表以及 5 个用于数组的辅助表。关系数据库中的数组处理通常不够理想，尤其是在这些数组变大时。将这些数组分成不同的表会使查询进展得快得多。

接下来，通过检查 `contact_details` 来查看分隔：

```
l_history.select_fields('contact_details').printSchema()
dfc.select('hist_root_contact_details').toDF().where("id = 10 or id = 75").orderBy(['id','index']).show()
```

以下是 `show` 调用的输出：

```
root
|-- contact_details: array
|    |-- element: struct
|    |    |-- type: string
|    |    |-- value: string
+---+-----+------------------------+-------------------------+
| id|index|contact_details.val.type|contact_details.val.value|
+---+-----+------------------------+-------------------------+
| 10|    0|                     fax|                         |
| 10|    1|                        |             202-225-1314|
| 10|    2|                   phone|                         |
| 10|    3|                        |             202-225-3772|
| 10|    4|                 twitter|                         |
| 10|    5|                        |          MikeRossUpdates|
| 75|    0|                     fax|                         |
| 75|    1|                        |             202-225-7856|
| 75|    2|                   phone|                         |
| 75|    3|                        |             202-225-2711|
| 75|    4|                 twitter|                         |
| 75|    5|                        |                SenCapito|
+---+-----+------------------------+-------------------------+
```

`contact_details` 字段是原始 `DynamicFrame` 中的一个结构数组。这些数组中的每个元素都是辅助表中的单独的行，通过 `index` 编制索引。此处的 `id` 是具有键 `contact_details` 的 `hist_root` 表的外键：

```
dfc.select('hist_root').toDF().where(
    "contact_details = 10 or contact_details = 75").select(
       ['id', 'given_name', 'family_name', 'contact_details']).show()
```

下面是输出：

```
+--------------------+----------+-----------+---------------+
|                  id|given_name|family_name|contact_details|
+--------------------+----------+-----------+---------------+
|f4fc30ee-7b42-432...|      Mike|       Ross|             10|
|e3c60f34-7d1b-4c0...|   Shelley|     Capito|             75|
+--------------------+----------+-----------+---------------+
```

请注意，这些命令中先后使用了 `toDF()` 和 `where` 表示式来筛选您要查看的行。

因此，将 `hist_root` 表与辅助表联接可让您执行以下操作：
+ 将数据加载到不支持数组的数据库中。
+ 使用 SQL 查询数组中的每个单独的项目。

使用 AWS Glue 连接安全地存储和访问您的 Amazon Redshift 凭证。有关如何创建您自己的连接的信息，请参阅 [连接到数据](glue-connections.md)。

现在，您已准备就绪，可以通过一次遍历一个 `DynamicFrames` 来将数据写入到连接：

```
for df_name in dfc.keys():
  m_df = dfc.select(df_name)
  print "Writing to table: ", df_name
  glueContext.write_dynamic_frame.from_jdbc_conf(frame = m_df, connection settings here)
```

根据您的关系数据库类型，连接设置将有所不同：
+ 有关写入到 Amazon Redshift 的说明，请参阅 [Redshift 连接](aws-glue-programming-etl-connect-redshift-home.md)。
+ 有关其他数据库，请参阅 [AWS Glue for Spark 中适用于 ETL 的连接类型和选项](aws-glue-programming-etl-connect.md)。

## 结论
<a name="aws-glue-programming-python-samples-legislators-conclusion"></a>

总的来说，AWS Glue 非常灵活。它让您只需几行代码即可完成通常需要编写几天才能实现的功能。您可以在 GitHub 上 [`join_and_relationalize.py` 示例](https://github.com/awslabs/aws-glue-samples)的 Python 文件 AWS Glue 中找到完整的源到目标 ETL 脚本。