

# AWS Glue Python 代码示例
<a name="aws-glue-programming-python-samples"></a>
+ [代码示例：对数据进行联接和关系化](aws-glue-programming-python-samples-legislators.md)
+ [代码示例：使用 ResolveChoice、Lambda 和 ApplyMapping 进行数据准备](aws-glue-programming-python-samples-medicaid.md)

# 代码示例：对数据进行联接和关系化
<a name="aws-glue-programming-python-samples-legislators"></a>

本示例使用从 [http://everypolitician.org/](http://everypolitician.org/) 下载到 Amazon Simple Storage Service（Amazon S3）中的 `sample-dataset` 存储桶的数据集：`s3://awsglue-datasets/examples/us-legislators/all`。该数据集包含有关美国议员及其在美国众议院和参议院中占有的席位的数据（JSON 格式），并且已针对本教程进行了轻微修改且在公共 Amazon S3 存储桶中提供。

您可以在 GitHub 网站上 [AWS Glue 示例存储库](https://github.com/awslabs/aws-glue-samples)的 `join_and_relationalize.py` 文件中找到本示例的源代码。

利用此数据，本教程将介绍如何执行以下操作：
+ 使用 AWS Glue 爬网程序对存储在公有 Amazon S3 存储桶中的对象进行分类并将其架构保存到 AWS Glue Data Catalog。
+ 检查生成自爬网的表元数据和架构。
+ 编写 Python 提取、转移和加载（ETL）脚本，该脚本使用数据目录中的元数据执行以下操作：
  + 将不同源文件中的数据加入到单个数据表中 (即，使数据非规范化)。
  + 按议员类型筛选已加入到单独的表中的表。
  + 将生成的数据写入单独的 Apache Parquet 文件以供日后分析。

在 AWS 上运行时，调试 Python 或 PySpark 脚本的首选方法是[在 AWS Glue Studio 上使用笔记本](https://docs.aws.amazon.com/glue/latest/ug/notebooks-chapter.html)。

## 步骤 1：爬取 Amazon S3 存储桶中的数据
<a name="aws-glue-programming-python-samples-legislators-crawling"></a>

1. 登录 AWS 管理控制台 并打开位于 [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/) 的 AWS Glue 控制台。

1. 按照 [配置爬网程序](define-crawler.md) 中的步骤操作，创建可将 `s3://awsglue-datasets/examples/us-legislators/all` 数据集网络爬取到 AWS Glue Data Catalog 中名为 `legislators` 的数据库的新爬网程序。示例数据已位于此公共 Amazon S3 存储桶中。

1. 运行新爬网程序，然后检查 `legislators` 数据库。

   该爬网程序将创建以下元数据表：
   + `persons_json`
   + `memberships_json`
   + `organizations_json`
   + `events_json`
   + `areas_json`
   + `countries_r_json`

   这是一个包含议员及其历史记录的半规范化表集合。

## 步骤 2：向开发终端节点笔记本中添加样板文件脚本
<a name="aws-glue-programming-python-samples-legislators-boilerplate"></a>

将以下样板文件脚本粘贴到开发终端节点笔记本中以导入所需的 AWS Glue 库，然后设置单个 `GlueContext`：

```
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

glueContext = GlueContext(SparkContext.getOrCreate())
```

## 步骤 3：检查数据目录中数据的架构
<a name="aws-glue-programming-python-samples-legislators-schemas"></a>

接下来，您可以轻松地从 AWS Glue Data Catalog 检查 DynamicFrame，并检查数据的架构。例如，要查看 `persons_json` 表的架构，请在您的笔记本中添加以下内容：

```
persons = glueContext.create_dynamic_frame.from_catalog(
             database="legislators",
             table_name="persons_json")
print "Count: ", persons.count()
persons.printSchema()
```

下面是来自打印调用的输出：

```
Count:  1961
root
|-- family_name: string
|-- name: string
|-- links: array
|    |-- element: struct
|    |    |-- note: string
|    |    |-- url: string
|-- gender: string
|-- image: string
|-- identifiers: array
|    |-- element: struct
|    |    |-- scheme: string
|    |    |-- identifier: string
|-- other_names: array
|    |-- element: struct
|    |    |-- note: string
|    |    |-- name: string
|    |    |-- lang: string
|-- sort_name: string
|-- images: array
|    |-- element: struct
|    |    |-- url: string
|-- given_name: string
|-- birth_date: string
|-- id: string
|-- contact_details: array
|    |-- element: struct
|    |    |-- type: string
|    |    |-- value: string
|-- death_date: string
```

表中的每个人都是某个美国国会机构的成员。

要查看 `memberships_json` 表的架构，请键入以下内容：

```
memberships = glueContext.create_dynamic_frame.from_catalog(
                 database="legislators",
                 table_name="memberships_json")
print "Count: ", memberships.count()
memberships.printSchema()
```

您可以在一个 (扩展) 代码行中执行所有这些操作：

```
Count:  10439
root
|-- area_id: string
|-- on_behalf_of_id: string
|-- organization_id: string
|-- role: string
|-- person_id: string
|-- legislative_period_id: string
|-- start_date: string
|-- end_date: string
```

`organizations` 是美国国会的党派和两大议院，即参议院和众议院。要查看 `organizations_json` 表的架构，请键入以下内容：

```
orgs = glueContext.create_dynamic_frame.from_catalog(
           database="legislators",
           table_name="organizations_json")
print "Count: ", orgs.count()
orgs.printSchema()
```

您可以在一个 (扩展) 代码行中执行所有这些操作：

```
Count:  13
root
|-- classification: string
|-- links: array
|    |-- element: struct
|    |    |-- note: string
|    |    |-- url: string
|-- image: string
|-- identifiers: array
|    |-- element: struct
|    |    |-- scheme: string
|    |    |-- identifier: string
|-- other_names: array
|    |-- element: struct
|    |    |-- lang: string
|    |    |-- note: string
|    |    |-- name: string
|-- id: string
|-- name: string
|-- seats: int
|-- type: string
```

## 步骤 4：筛选数据
<a name="aws-glue-programming-python-samples-legislators-filtering"></a>

接下来，仅保留您需要的字段，然后将 `id` 重命名为 `org_id`。该数据集足够小，方便您完整查看。

`toDF()` 将 `DynamicFrame` 转换为 Apache Spark `DataFrame`，因此您可以应用已存在于 Apache Spark SQL 中的转换：

```
orgs = orgs.drop_fields(['other_names',
                        'identifiers']).rename_field(
                            'id', 'org_id').rename_field(
                               'name', 'org_name')
orgs.toDF().show()
```

下面显示了输出：

```
+--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+
|classification|              org_id|            org_name|               links|seats|       type|               image|
+--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+
|         party|            party/al|                  AL|                null| null|       null|                null|
|         party|      party/democrat|            Democrat|[[website,http://...| null|       null|https://upload.wi...|
|         party|party/democrat-li...|    Democrat-Liberal|[[website,http://...| null|       null|                null|
|   legislature|d56acebe-8fdc-47b...|House of Represen...|                null|  435|lower house|                null|
|         party|   party/independent|         Independent|                null| null|       null|                null|
|         party|party/new_progres...|     New Progressive|[[website,http://...| null|       null|https://upload.wi...|
|         party|party/popular_dem...|    Popular Democrat|[[website,http://...| null|       null|                null|
|         party|    party/republican|          Republican|[[website,http://...| null|       null|https://upload.wi...|
|         party|party/republican-...|Republican-Conser...|[[website,http://...| null|       null|                null|
|         party|      party/democrat|            Democrat|[[website,http://...| null|       null|https://upload.wi...|
|         party|   party/independent|         Independent|                null| null|       null|                null|
|         party|    party/republican|          Republican|[[website,http://...| null|       null|https://upload.wi...|
|   legislature|8fa6c3d2-71dc-478...|              Senate|                null|  100|upper house|                null|
+--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+
```

键入以下内容以查看显示在 `memberships` 中的 `organizations`：

```
memberships.select_fields(['organization_id']).toDF().distinct().show()
```

下面显示了输出：

```
+--------------------+
|     organization_id|
+--------------------+
|d56acebe-8fdc-47b...|
|8fa6c3d2-71dc-478...|
+--------------------+
```

## 步骤 5：整合内容
<a name="aws-glue-programming-python-samples-legislators-joining"></a>

现在，使用 AWS Glue 联接这些关系表并创建一个包含议员 `memberships` 及其对应的 `organizations` 的完整历史记录表。

1. 首先，联接 `id` 和 `person_id` 上的 `persons` 和 `memberships`。

1. 接下来，将结果与 `org_id` 和 `organization_id` 上的 `orgs` 联接。

1. 然后，删除多余的字段 `person_id` 和 `org_id`。

您可以在一个 (扩展) 代码行中执行所有这些操作：

```
l_history = Join.apply(orgs,
                       Join.apply(persons, memberships, 'id', 'person_id'),
                       'org_id', 'organization_id').drop_fields(['person_id', 'org_id'])
print "Count: ", l_history.count()
l_history.printSchema()
```

您可以在一个 (扩展) 代码行中执行所有这些操作：

```
Count:  10439
root
|-- role: string
|-- seats: int
|-- org_name: string
|-- links: array
|    |-- element: struct
|    |    |-- note: string
|    |    |-- url: string
|-- type: string
|-- sort_name: string
|-- area_id: string
|-- images: array
|    |-- element: struct
|    |    |-- url: string
|-- on_behalf_of_id: string
|-- other_names: array
|    |-- element: struct
|    |    |-- note: string
|    |    |-- name: string
|    |    |-- lang: string
|-- contact_details: array
|    |-- element: struct
|    |    |-- type: string
|    |    |-- value: string
|-- name: string
|-- birth_date: string
|-- organization_id: string
|-- gender: string
|-- classification: string
|-- death_date: string
|-- legislative_period_id: string
|-- identifiers: array
|    |-- element: struct
|    |    |-- scheme: string
|    |    |-- identifier: string
|-- image: string
|-- given_name: string
|-- family_name: string
|-- id: string
|-- start_date: string
|-- end_date: string
```

您现在有了可用于分析的最终表。您可以采用紧凑且高效的格式写入该表以供分析（即 Parquet），该格式可让您在 AWS Glue、Amazon Athena 或 Amazon Redshift Spectrum 中运行 SQL。

以下调用将跨多个文件写入该表以在日后执行分析时支持快速并行读取：

```
glueContext.write_dynamic_frame.from_options(frame = l_history,
          connection_type = "s3",
          connection_options = {"path": "s3://glue-sample-target/output-dir/legislator_history"},
          format = "parquet")
```

要将所有历史记录数据放入单个文件，您必须将其转换为数据帧，为其重新分区然后写入它：

```
s_history = l_history.toDF().repartition(1)
s_history.write.parquet('s3://glue-sample-target/output-dir/legislator_single')
```

或者，如果您希望按参议院和众议院分隔该数据：

```
l_history.toDF().write.parquet('s3://glue-sample-target/output-dir/legislator_part',
                               partitionBy=['org_name'])
```

## 步骤 6：转换关系数据库的数据
<a name="aws-glue-programming-python-samples-legislators-writing"></a>

利用 AWS Glue，您可以轻松将数据写入到关系数据库（如 Amazon Redshift），甚至对半结构化数据也是如此。它提供了一种转换 `relationalize`，这将展平 `DynamicFrames`，无论帧中的对象的复杂度可能如何。

使用本示例中的 `l_history` `DynamicFrame`，传入根表的名称 (`hist_root`) 和 `relationalize` 的临时工作路径。这将返回 `DynamicFrameCollection`。您随后可以列出该集合中 `DynamicFrames` 的名称：

```
dfc = l_history.relationalize("hist_root", "s3://glue-sample-target/temp-dir/")
dfc.keys()
```

以下是 `keys` 调用的输出：

```
[u'hist_root', u'hist_root_contact_details', u'hist_root_links',
 u'hist_root_other_names', u'hist_root_images', u'hist_root_identifiers']
```

`Relationalize` 将该历史记录表拆分为 6 个新表：1 个包含 `DynamicFrame` 中每个对象的记录的根表以及 5 个用于数组的辅助表。关系数据库中的数组处理通常不够理想，尤其是在这些数组变大时。将这些数组分成不同的表会使查询进展得快得多。

接下来，通过检查 `contact_details` 来查看分隔：

```
l_history.select_fields('contact_details').printSchema()
dfc.select('hist_root_contact_details').toDF().where("id = 10 or id = 75").orderBy(['id','index']).show()
```

以下是 `show` 调用的输出：

```
root
|-- contact_details: array
|    |-- element: struct
|    |    |-- type: string
|    |    |-- value: string
+---+-----+------------------------+-------------------------+
| id|index|contact_details.val.type|contact_details.val.value|
+---+-----+------------------------+-------------------------+
| 10|    0|                     fax|                         |
| 10|    1|                        |             202-225-1314|
| 10|    2|                   phone|                         |
| 10|    3|                        |             202-225-3772|
| 10|    4|                 twitter|                         |
| 10|    5|                        |          MikeRossUpdates|
| 75|    0|                     fax|                         |
| 75|    1|                        |             202-225-7856|
| 75|    2|                   phone|                         |
| 75|    3|                        |             202-225-2711|
| 75|    4|                 twitter|                         |
| 75|    5|                        |                SenCapito|
+---+-----+------------------------+-------------------------+
```

`contact_details` 字段是原始 `DynamicFrame` 中的一个结构数组。这些数组中的每个元素都是辅助表中的单独的行，通过 `index` 编制索引。此处的 `id` 是具有键 `contact_details` 的 `hist_root` 表的外键：

```
dfc.select('hist_root').toDF().where(
    "contact_details = 10 or contact_details = 75").select(
       ['id', 'given_name', 'family_name', 'contact_details']).show()
```

下面是输出：

```
+--------------------+----------+-----------+---------------+
|                  id|given_name|family_name|contact_details|
+--------------------+----------+-----------+---------------+
|f4fc30ee-7b42-432...|      Mike|       Ross|             10|
|e3c60f34-7d1b-4c0...|   Shelley|     Capito|             75|
+--------------------+----------+-----------+---------------+
```

请注意，这些命令中先后使用了 `toDF()` 和 `where` 表示式来筛选您要查看的行。

因此，将 `hist_root` 表与辅助表联接可让您执行以下操作：
+ 将数据加载到不支持数组的数据库中。
+ 使用 SQL 查询数组中的每个单独的项目。

使用 AWS Glue 连接安全地存储和访问您的 Amazon Redshift 凭证。有关如何创建您自己的连接的信息，请参阅 [连接到数据](glue-connections.md)。

现在，您已准备就绪，可以通过一次遍历一个 `DynamicFrames` 来将数据写入到连接：

```
for df_name in dfc.keys():
  m_df = dfc.select(df_name)
  print "Writing to table: ", df_name
  glueContext.write_dynamic_frame.from_jdbc_conf(frame = m_df, connection settings here)
```

根据您的关系数据库类型，连接设置将有所不同：
+ 有关写入到 Amazon Redshift 的说明，请参阅 [Redshift 连接](aws-glue-programming-etl-connect-redshift-home.md)。
+ 有关其他数据库，请参阅 [AWS Glue for Spark 中适用于 ETL 的连接类型和选项](aws-glue-programming-etl-connect.md)。

## 结论
<a name="aws-glue-programming-python-samples-legislators-conclusion"></a>

总的来说，AWS Glue 非常灵活。它让您只需几行代码即可完成通常需要编写几天才能实现的功能。您可以在 GitHub 上 [`join_and_relationalize.py` 示例](https://github.com/awslabs/aws-glue-samples)的 Python 文件 AWS Glue 中找到完整的源到目标 ETL 脚本。

# 代码示例：使用 ResolveChoice、Lambda 和 ApplyMapping 进行数据准备
<a name="aws-glue-programming-python-samples-medicaid"></a>

此示例使用的数据集包括从两个 [Data.CMS.gov](https://data.cms.gov) 数据集下载的医疗保健提供商付款数据：“Inpatient Prospective Payment System Provider Summary for the Top 100 Diagnosis-Related Groups - FY2011”和“Inpatient Charge Data FY 2011”。下载该数据后，我们修改了数据集，以在文件末尾引入了几个错误的记录。这个修改过的文件位于 `s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv` 上的公有 Amazon S3 存储桶。

您可以在 [AWS Glue 示例 GitHub](https://github.com/awslabs/aws-glue-samples) 存储库中的 `data_cleaning_and_lambda.py`文件中找到本示例的源代码。

在 AWS 上运行时，调试 Python 或 PySpark 脚本的首选方法是[在 AWS Glue Studio 上使用笔记本](https://docs.aws.amazon.com/glue/latest/ug/notebooks-chapter.html)。

## 步骤 1：爬取 Amazon S3 存储桶中的数据
<a name="aws-glue-programming-python-samples-medicaid-crawling"></a>

1. 登录 AWS 管理控制台，然后打开 AWS Glue 控制台，网址为：[https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/)。

1. 按照 [配置爬网程序](define-crawler.md) 中描述的过程进行操作，创建新的爬网程序，它可以网络爬取 `s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv` 文件，而且可以将生成的元数据放入 AWS Glue 数据目录中一个名为 `payments` 的数据库。

1. 运行新爬网程序，然后检查 `payments` 数据库。在读取该文件的开头以确定其格式和分隔符之后，您应该发现爬网程序已经在数据库中创建了一个名为 `medicare` 的元数据表。

   新 `medicare` 表的架构如下所示：

   ```
   Column  name                            Data type
   ==================================================
   drg definition                             string
   provider id                                bigint
   provider name                              string
   provider street address                    string
   provider city                              string
   provider state                             string
   provider zip code                          bigint
   hospital referral region description       string
   total discharges                           bigint
   average covered charges                    string
   average total payments                     string
   average medicare payments                  string
   ```

## 步骤 2：向开发终端节点笔记本中添加样板文件脚本
<a name="aws-glue-programming-python-samples-medicaid-boilerplate"></a>

将以下样板文件脚本粘贴到开发终端节点笔记本中以导入所需的 AWS Glue 库，然后设置单个 `GlueContext`：

```
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

glueContext = GlueContext(SparkContext.getOrCreate())
```

## 步骤 3：比较不同的架构解析
<a name="aws-glue-programming-python-samples-medicaid-schemas"></a>

接下来，您可以查看由 Apache Spark `DataFrame` 识别的架构是否与您的 AWS Glue 爬网程序记录的架构相同。运行此代码：

```
medicare = spark.read.format(
   "com.databricks.spark.csv").option(
   "header", "true").option(
   "inferSchema", "true").load(
   's3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv')
medicare.printSchema()
```

下面是来自 `printSchema` 调用的输出：

```
root
 |-- DRG Definition: string (nullable = true)
 |-- Provider Id: string (nullable = true)
 |-- Provider Name: string (nullable = true)
 |-- Provider Street Address: string (nullable = true)
 |-- Provider City: string (nullable = true)
 |-- Provider State: string (nullable = true)
 |-- Provider Zip Code: integer (nullable = true)
 |-- Hospital Referral Region Description: string (nullable = true)
 |--  Total Discharges : integer (nullable = true)
 |--  Average Covered Charges : string (nullable = true)
 |--  Average Total Payments : string (nullable = true)
 |-- Average Medicare Payments: string (nullable = true)
```

接下来，查看 AWS Glue `DynamicFrame` 生成的架构：

```
medicare_dynamicframe = glueContext.create_dynamic_frame.from_catalog(
       database = "payments",
       table_name = "medicare")
medicare_dynamicframe.printSchema()
```

`printSchema` 中的输出如下所示：

```
root
 |-- drg definition: string
 |-- provider id: choice
 |    |-- long
 |    |-- string
 |-- provider name: string
 |-- provider street address: string
 |-- provider city: string
 |-- provider state: string
 |-- provider zip code: long
 |-- hospital referral region description: string
 |-- total discharges: long
 |-- average covered charges: string
 |-- average total payments: string
 |-- average medicare payments: string
```

`DynamicFrame` 生成一个架构，在其中 `provider id` 可以是 `long` 或 `string` 类型。`DataFrame` 架构将 `Provider Id` 列为 `string` 类型，数据目录将 `provider id` 列为 `bigint` 类型。

哪一个是正确的？ 文件末尾有两条记录 (共计 16 万条记录)，该列中有 `string` 值。这些是为说明问题而引入的错误记录。

为解决此类问题，AWS Glue `DynamicFrame` 引入了 *choice* 类型的概念。在这种情况下，`DynamicFrame` 显示 `long` 和 `string` 值都出现在该列中。AWS Glue 爬网程序错过了 `string` 值，因为它仅被视为数据的一个 2 MB 前缀。Apache Spark `DataFrame` 考虑了整个数据集，但它被迫将最一般的类型分配给该列，即 `string`。事实上，当存在复杂类型或不熟悉的变体时，Spark 通常会采用最一般的情况。

要查询 `provider id` 列，请先解析选择类型。您可以在 `DynamicFrame` 中使用 `resolveChoice` 转换方法，通过 `cast:long` 选项将这些 `string` 值转换为 `long` 值：

```
medicare_res = medicare_dynamicframe.resolveChoice(specs = [('provider id','cast:long')])
medicare_res.printSchema()
```

`printSchema` 输出现在是：

```
root
 |-- drg definition: string
 |-- provider id: long
 |-- provider name: string
 |-- provider street address: string
 |-- provider city: string
 |-- provider state: string
 |-- provider zip code: long
 |-- hospital referral region description: string
 |-- total discharges: long
 |-- average covered charges: string
 |-- average total payments: string
 |-- average medicare payments: string
```

其中，该值是无法强制转换的 `string`，AWS Glue 插入了一个 `null`。

另一个选项是将选择类型转换为一个 `struct`，以保持两种类型的值。

接下来，查看异常的行：

```
medicare_res.toDF().where("'provider id' is NULL").show()
```

您看到以下内容：

```
+--------------------+-----------+---------------+-----------------------+-------------+--------------+-----------------+------------------------------------+----------------+-----------------------+----------------------+-------------------------+
|      drg definition|provider id|  provider name|provider street address|provider city|provider state|provider zip code|hospital referral region description|total discharges|average covered charges|average total payments|average medicare payments|
+--------------------+-----------+---------------+-----------------------+-------------+--------------+-----------------+------------------------------------+----------------+-----------------------+----------------------+-------------------------+
|948 - SIGNS & SYM...|       null|            INC|       1050 DIVISION ST|      MAUSTON|            WI|            53948|                        WI - Madison|              12|              $11961.41|              $4619.00|                 $3775.33|
|948 - SIGNS & SYM...|       null| INC- ST JOSEPH|     5000 W CHAMBERS ST|    MILWAUKEE|            WI|            53210|                      WI - Milwaukee|              14|              $10514.28|              $5562.50|                 $4522.78|
+--------------------+-----------+---------------+-----------------------+-------------+--------------+-----------------+------------------------------------+----------------+-----------------------+----------------------+-------------------------+
```

现在，删除两个格式错误的记录，如下所示：

```
medicare_dataframe = medicare_res.toDF()
medicare_dataframe = medicare_dataframe.where("'provider id' is NOT NULL")
```

## 步骤 4：映射数据和使用 Apache Spark Lambda 函数
<a name="aws-glue-programming-python-samples-medicaid-lambda-mapping"></a>

AWS Glue 尚未直接支持 Lambda 函数，也称为用户定义函数。但是，您始终可以将 `DynamicFrame` 和 Apache Spark `DataFrame` 相互转换，以便除 `DynamicFrames` 的特殊功能外，还能利用 Spark 功能。

接下来，将付款信息转化为数字，以便 Amazon Redshift 或 Amazon Athena 这样的分析引擎可以更快地进行数字处理：

```
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

chop_f = udf(lambda x: x[1:], StringType())
medicare_dataframe = medicare_dataframe.withColumn(
        "ACC", chop_f(
            medicare_dataframe["average covered charges"])).withColumn(
                "ATP", chop_f(
                    medicare_dataframe["average total payments"])).withColumn(
                        "AMP", chop_f(
                            medicare_dataframe["average medicare payments"]))
medicare_dataframe.select(['ACC', 'ATP', 'AMP']).show()
```

`show` 调用中的输出如下所示：

```
+--------+-------+-------+
|     ACC|    ATP|    AMP|
+--------+-------+-------+
|32963.07|5777.24|4763.73|
|15131.85|5787.57|4976.71|
|37560.37|5434.95|4453.79|
|13998.28|5417.56|4129.16|
|31633.27|5658.33|4851.44|
|16920.79|6653.80|5374.14|
|11977.13|5834.74|4761.41|
|35841.09|8031.12|5858.50|
|28523.39|6113.38|5228.40|
|75233.38|5541.05|4386.94|
|67327.92|5461.57|4493.57|
|39607.28|5356.28|4408.20|
|22862.23|5374.65|4186.02|
|31110.85|5366.23|4376.23|
|25411.33|5282.93|4383.73|
| 9234.51|5676.55|4509.11|
|15895.85|5930.11|3972.85|
|19721.16|6192.54|5179.38|
|10710.88|4968.00|3898.88|
|51343.75|5996.00|4962.45|
+--------+-------+-------+
only showing top 20 rows
```

这些仍然是数据中的字符串。我们可以使用强大的 `apply_mapping` 转换方法来删除、重命名、转换和嵌套数据，以便其他数据编程语言和系统可以轻松地访问它：

```
from awsglue.dynamicframe import DynamicFrame
medicare_tmp_dyf = DynamicFrame.fromDF(medicare_dataframe, glueContext, "nested")
medicare_nest_dyf = medicare_tmp_dyf.apply_mapping([('drg definition', 'string', 'drg', 'string'),
                 ('provider id', 'long', 'provider.id', 'long'),
                 ('provider name', 'string', 'provider.name', 'string'),
                 ('provider city', 'string', 'provider.city', 'string'),
                 ('provider state', 'string', 'provider.state', 'string'),
                 ('provider zip code', 'long', 'provider.zip', 'long'),
                 ('hospital referral region description', 'string','rr', 'string'),
                 ('ACC', 'string', 'charges.covered', 'double'),
                 ('ATP', 'string', 'charges.total_pay', 'double'),
                 ('AMP', 'string', 'charges.medicare_pay', 'double')])
medicare_nest_dyf.printSchema()
```

`printSchema` 输出如下所示：

```
root
 |-- drg: string
 |-- provider: struct
 |    |-- id: long
 |    |-- name: string
 |    |-- city: string
 |    |-- state: string
 |    |-- zip: long
 |-- rr: string
 |-- charges: struct
 |    |-- covered: double
 |    |-- total_pay: double
 |    |-- medicare_pay: double
```

将数据重新变成 Spark `DataFrame` 后，您可以显示它现在的外观：

```
medicare_nest_dyf.toDF().show()
```

您可以在一个 (扩展) 代码行中执行所有这些操作：

```
+--------------------+--------------------+---------------+--------------------+
|                 drg|            provider|             rr|             charges|
+--------------------+--------------------+---------------+--------------------+
|039 - EXTRACRANIA...|[10001,SOUTHEAST ...|    AL - Dothan|[32963.07,5777.24...|
|039 - EXTRACRANIA...|[10005,MARSHALL M...|AL - Birmingham|[15131.85,5787.57...|
|039 - EXTRACRANIA...|[10006,ELIZA COFF...|AL - Birmingham|[37560.37,5434.95...|
|039 - EXTRACRANIA...|[10011,ST VINCENT...|AL - Birmingham|[13998.28,5417.56...|
|039 - EXTRACRANIA...|[10016,SHELBY BAP...|AL - Birmingham|[31633.27,5658.33...|
|039 - EXTRACRANIA...|[10023,BAPTIST ME...|AL - Montgomery|[16920.79,6653.8,...|
|039 - EXTRACRANIA...|[10029,EAST ALABA...|AL - Birmingham|[11977.13,5834.74...|
|039 - EXTRACRANIA...|[10033,UNIVERSITY...|AL - Birmingham|[35841.09,8031.12...|
|039 - EXTRACRANIA...|[10039,HUNTSVILLE...|AL - Huntsville|[28523.39,6113.38...|
|039 - EXTRACRANIA...|[10040,GADSDEN RE...|AL - Birmingham|[75233.38,5541.05...|
|039 - EXTRACRANIA...|[10046,RIVERVIEW ...|AL - Birmingham|[67327.92,5461.57...|
|039 - EXTRACRANIA...|[10055,FLOWERS HO...|    AL - Dothan|[39607.28,5356.28...|
|039 - EXTRACRANIA...|[10056,ST VINCENT...|AL - Birmingham|[22862.23,5374.65...|
|039 - EXTRACRANIA...|[10078,NORTHEAST ...|AL - Birmingham|[31110.85,5366.23...|
|039 - EXTRACRANIA...|[10083,SOUTH BALD...|    AL - Mobile|[25411.33,5282.93...|
|039 - EXTRACRANIA...|[10085,DECATUR GE...|AL - Huntsville|[9234.51,5676.55,...|
|039 - EXTRACRANIA...|[10090,PROVIDENCE...|    AL - Mobile|[15895.85,5930.11...|
|039 - EXTRACRANIA...|[10092,D C H REGI...|AL - Tuscaloosa|[19721.16,6192.54...|
|039 - EXTRACRANIA...|[10100,THOMAS HOS...|    AL - Mobile|[10710.88,4968.0,...|
|039 - EXTRACRANIA...|[10103,BAPTIST ME...|AL - Birmingham|[51343.75,5996.0,...|
+--------------------+--------------------+---------------+--------------------+
only showing top 20 rows
```

## 步骤 5：将数据写入到 Apache Parquet
<a name="aws-glue-programming-python-samples-medicaid-writing"></a>

有了 AWS Glue，可以很容易地以诸如 Apache Parquet 这样的格式编写数据，以便关系数据库可以有效地使用它：

```
glueContext.write_dynamic_frame.from_options(
       frame = medicare_nest_dyf,
       connection_type = "s3",
       connection_options = {"path": "s3://glue-sample-target/output-dir/medicare_parquet"},
       format = "parquet")
```