

# コード例: データの結合と関係付け
<a name="aws-glue-programming-python-samples-legislators"></a>

この例では、[http://everypolitician.org/](http://everypolitician.org/) から、Amazon Simple Storage Service (Amazon S3) の `sample-dataset` バケットにデータセットをダウンロードして使用します。`s3://awsglue-datasets/examples/us-legislators/all`このデータセットには、米国議会議員や米国下院および上院議員の議席に関する JSON 形式のデータが含まれており、このチュートリアルの目的のため少し変更され、パブリック Amazon S3 バケットで利用可能になりました。

この例のソースコードは、GitHub ウェブサイトの [`join_and_relationalize.py` Glue サンプルリポジトリ](https://github.com/awslabs/aws-glue-samples)の AWS Glue ファイルにあります。

このデータを使用して、このチュートリアルでは以下のことを実行する方法を示します。
+ AWS Glue クローラを使用して、パブリックな Amazon S3 バケットに保存されているオブジェクトを分類し、それらのスキーマを AWS Glue Data Catalog に保存します。
+ クロールの結果のテーブルのメタデータとスキーマを調べます。
+ Data Catalog のメタデータを使用して Python の抽出、転送、およびロード (ETL) スクリプトを記述し、以下の操作を行います。
  + 異なるソースファイル内のデータをまとめて単一のデータテーブルに結合します (つまり、データを非正規化します)。
  + 議員のタイプ別に、結合テーブルを別のテーブルにフィルタリングします。
  + 生成されたデータを後で分析するために Apache Parquet ファイルに分割して書き出します。

AWS で実行中に Python または PySpark スクリプトをデバッグするための推奨方法は、[AWS Glue Studio のノートブック](https://docs.aws.amazon.com/glue/latest/ug/notebooks-chapter.html)を使用することです。

## ステップ 1: Amazon S3 バケット内のデータをクロールする
<a name="aws-glue-programming-python-samples-legislators-crawling"></a>

1. AWS マネジメントコンソール にサインインし、AWS Glue コンソール ([https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/)) を開きます。

1. [クローラーの設定](define-crawler.md) の手順に従って、`s3://awsglue-datasets/examples/us-legislators/all` データセットをクロールできる新しいクローラを、AWS Glue Data Catalog 内のデータベース `legislators` に作成します。サンプルデータは既に、このパブリックな Amazon S3 バケットに用意されています。

1. 新しいクローラを実行し、`legislators` データベースを確認します。

   クローラは、次のメタデータテーブルを作成します。
   + `persons_json`
   + `memberships_json`
   + `organizations_json`
   + `events_json`
   + `areas_json`
   + `countries_r_json`

   これは、議員とその履歴を含むテーブルの半正規化されたテーブルの集合です。

## ステップ 2: 開発エンドポイントノートブックに共通スクリプトを追加する
<a name="aws-glue-programming-python-samples-legislators-boilerplate"></a>

次の共通スクリプトを開発エンドポイントノートブックに貼り付けて、必要な AWS Glue ライブラリをインポートし、単一の `GlueContext` を設定します。

```
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

glueContext = GlueContext(SparkContext.getOrCreate())
```

## ステップ 3: Data Catalog 内のデータでスキーマを確認する
<a name="aws-glue-programming-python-samples-legislators-schemas"></a>

次に、簡単な手順で AWS Glue Data Catalog から DynamicFrame を作成し、そのデータのスキーマを調べます。例えば、`persons_json` テーブルのスキーマを表示するには、ノートブックに以下を追加します。

```
persons = glueContext.create_dynamic_frame.from_catalog(
             database="legislators",
             table_name="persons_json")
print "Count: ", persons.count()
persons.printSchema()
```

プリントコールの出力を以下に示します。

```
Count:  1961
root
|-- family_name: string
|-- name: string
|-- links: array
|    |-- element: struct
|    |    |-- note: string
|    |    |-- url: string
|-- gender: string
|-- image: string
|-- identifiers: array
|    |-- element: struct
|    |    |-- scheme: string
|    |    |-- identifier: string
|-- other_names: array
|    |-- element: struct
|    |    |-- note: string
|    |    |-- name: string
|    |    |-- lang: string
|-- sort_name: string
|-- images: array
|    |-- element: struct
|    |    |-- url: string
|-- given_name: string
|-- birth_date: string
|-- id: string
|-- contact_details: array
|    |-- element: struct
|    |    |-- type: string
|    |    |-- value: string
|-- death_date: string
```

テーブル内の各人は、米国議会のメンバーです。

`memberships_json` テーブルのスキーマを表示するには、次のように入力します。

```
memberships = glueContext.create_dynamic_frame.from_catalog(
                 database="legislators",
                 table_name="memberships_json")
print "Count: ", memberships.count()
memberships.printSchema()
```

出力は次のとおりです。

```
Count:  10439
root
|-- area_id: string
|-- on_behalf_of_id: string
|-- organization_id: string
|-- role: string
|-- person_id: string
|-- legislative_period_id: string
|-- start_date: string
|-- end_date: string
```

`organizations` は政党および上院と下院の 2 つの議会です。`organizations_json` テーブルのスキーマを表示するには、次のように入力します。

```
orgs = glueContext.create_dynamic_frame.from_catalog(
           database="legislators",
           table_name="organizations_json")
print "Count: ", orgs.count()
orgs.printSchema()
```

出力は次のとおりです。

```
Count:  13
root
|-- classification: string
|-- links: array
|    |-- element: struct
|    |    |-- note: string
|    |    |-- url: string
|-- image: string
|-- identifiers: array
|    |-- element: struct
|    |    |-- scheme: string
|    |    |-- identifier: string
|-- other_names: array
|    |-- element: struct
|    |    |-- lang: string
|    |    |-- note: string
|    |    |-- name: string
|-- id: string
|-- name: string
|-- seats: int
|-- type: string
```

## ステップ 4: データをフィルタリングする
<a name="aws-glue-programming-python-samples-legislators-filtering"></a>

次に、必要なフィールドのみを保持し、`id` の名前を `org_id` に変更します。データセットは、小さいため全体を表示することができます。

`toDF()` は `DynamicFrame` を Apache Spark に変換するので、Apache Spark SQL に既に存在する `DataFrame` 変換を適用できます。

```
orgs = orgs.drop_fields(['other_names',
                        'identifiers']).rename_field(
                            'id', 'org_id').rename_field(
                               'name', 'org_name')
orgs.toDF().show()
```

以下に出力を示します。

```
+--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+
|classification|              org_id|            org_name|               links|seats|       type|               image|
+--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+
|         party|            party/al|                  AL|                null| null|       null|                null|
|         party|      party/democrat|            Democrat|[[website,http://...| null|       null|https://upload.wi...|
|         party|party/democrat-li...|    Democrat-Liberal|[[website,http://...| null|       null|                null|
|   legislature|d56acebe-8fdc-47b...|House of Represen...|                null|  435|lower house|                null|
|         party|   party/independent|         Independent|                null| null|       null|                null|
|         party|party/new_progres...|     New Progressive|[[website,http://...| null|       null|https://upload.wi...|
|         party|party/popular_dem...|    Popular Democrat|[[website,http://...| null|       null|                null|
|         party|    party/republican|          Republican|[[website,http://...| null|       null|https://upload.wi...|
|         party|party/republican-...|Republican-Conser...|[[website,http://...| null|       null|                null|
|         party|      party/democrat|            Democrat|[[website,http://...| null|       null|https://upload.wi...|
|         party|   party/independent|         Independent|                null| null|       null|                null|
|         party|    party/republican|          Republican|[[website,http://...| null|       null|https://upload.wi...|
|   legislature|8fa6c3d2-71dc-478...|              Senate|                null|  100|upper house|                null|
+--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+
```

`memberships` に表示される `organizations` を表示するには、次のように入力します。

```
memberships.select_fields(['organization_id']).toDF().distinct().show()
```

以下に出力を示します。

```
+--------------------+
|     organization_id|
+--------------------+
|d56acebe-8fdc-47b...|
|8fa6c3d2-71dc-478...|
+--------------------+
```

## ステップ 5: すべてをまとめる
<a name="aws-glue-programming-python-samples-legislators-joining"></a>

ここで AWS Glue を使用して、これらのリレーショナルテーブルを結合し、議員の `memberships` とそれに対応する `organizations` の 1 つの完全な履歴テーブルを作成します。

1. まず、`persons` および `memberships` を `id` および `person_id` と結合します。

1. 次に、結果を `orgs` と `org_id` および `organization_id` と結合します。

1. 次に、冗長なフィールド `person_id` および `org_id` を削除します。

これらの操作はすべて、1 行の (拡張された) コードで行うことができます。

```
l_history = Join.apply(orgs,
                       Join.apply(persons, memberships, 'id', 'person_id'),
                       'org_id', 'organization_id').drop_fields(['person_id', 'org_id'])
print "Count: ", l_history.count()
l_history.printSchema()
```

出力は次のとおりです。

```
Count:  10439
root
|-- role: string
|-- seats: int
|-- org_name: string
|-- links: array
|    |-- element: struct
|    |    |-- note: string
|    |    |-- url: string
|-- type: string
|-- sort_name: string
|-- area_id: string
|-- images: array
|    |-- element: struct
|    |    |-- url: string
|-- on_behalf_of_id: string
|-- other_names: array
|    |-- element: struct
|    |    |-- note: string
|    |    |-- name: string
|    |    |-- lang: string
|-- contact_details: array
|    |-- element: struct
|    |    |-- type: string
|    |    |-- value: string
|-- name: string
|-- birth_date: string
|-- organization_id: string
|-- gender: string
|-- classification: string
|-- death_date: string
|-- legislative_period_id: string
|-- identifiers: array
|    |-- element: struct
|    |    |-- scheme: string
|    |    |-- identifier: string
|-- image: string
|-- given_name: string
|-- family_name: string
|-- id: string
|-- start_date: string
|-- end_date: string
```

これで、分析に使用できる最終テーブルが作成されました。これは、分析のためのコンパクトで効率的な形式 (つまり Parquet) で記述することができ、AWS Glue、Amazon Athena、または Amazon Redshift Spectrum で SQL を実行できます。

次の呼び出しは、複数のファイルにわたってテーブルを書き込んで、後で解析するときに高速な並列読み込みをサポートします。

```
glueContext.write_dynamic_frame.from_options(frame = l_history,
          connection_type = "s3",
          connection_options = {"path": "s3://glue-sample-target/output-dir/legislator_history"},
          format = "parquet")
```

すべての履歴データを単一のファイルにまとめるには、データフレームに変換し、再パーティション化して書き出す必要があります。

```
s_history = l_history.toDF().repartition(1)
s_history.write.parquet('s3://glue-sample-target/output-dir/legislator_single')
```

または、上院と下院でそれを分けたい場合。

```
l_history.toDF().write.parquet('s3://glue-sample-target/output-dir/legislator_part',
                               partitionBy=['org_name'])
```

## ステップ 6: リレーショナルデータベース向けにデータを変換する
<a name="aws-glue-programming-python-samples-legislators-writing"></a>

AWS Glue では半構造化データでも Amazon Redshift のようなリレーショナルデータベースに簡単に書き込むことができるのです。これにより、フレーム内のオブジェクトがどれほど複雑であっても、`DynamicFrames` をフラット化する変換 `relationalize` が提供されます。

この例の `l_history` `DynamicFrame` を使用して、ルートテーブル (`hist_root`) の名前と一時的な作業パスを `relationalize` に渡します。これにより、`DynamicFrameCollection` が返されます。その後、そのコレクション内の `DynamicFrames` の名前を一覧表示できます。

```
dfc = l_history.relationalize("hist_root", "s3://glue-sample-target/temp-dir/")
dfc.keys()
```

`keys` 呼び出しの出力は次のとおりです。

```
[u'hist_root', u'hist_root_contact_details', u'hist_root_links',
 u'hist_root_other_names', u'hist_root_images', u'hist_root_identifiers']
```

`Relationalize` は、履歴テーブルを 6 つの新しいテーブルに分割します。`DynamicFrame` の各オブジェクトのレコードを含むルートテーブル、および配列の補助テーブルです。リレーショナルデータベースでの配列の処理は、特に配列が大きくなる場合に、最適ではないことがあります。配列を別のテーブルに分けることで、クエリの実行速度が大幅に向上します。

次に、`contact_details` を調べて分離を確認します。

```
l_history.select_fields('contact_details').printSchema()
dfc.select('hist_root_contact_details').toDF().where("id = 10 or id = 75").orderBy(['id','index']).show()
```

`show` 呼び出しの出力は次のとおりです。

```
root
|-- contact_details: array
|    |-- element: struct
|    |    |-- type: string
|    |    |-- value: string
+---+-----+------------------------+-------------------------+
| id|index|contact_details.val.type|contact_details.val.value|
+---+-----+------------------------+-------------------------+
| 10|    0|                     fax|                         |
| 10|    1|                        |             202-225-1314|
| 10|    2|                   phone|                         |
| 10|    3|                        |             202-225-3772|
| 10|    4|                 twitter|                         |
| 10|    5|                        |          MikeRossUpdates|
| 75|    0|                     fax|                         |
| 75|    1|                        |             202-225-7856|
| 75|    2|                   phone|                         |
| 75|    3|                        |             202-225-2711|
| 75|    4|                 twitter|                         |
| 75|    5|                        |                SenCapito|
+---+-----+------------------------+-------------------------+
```

`contact_details` フィールドは、元の `DynamicFrame` の構造体の配列です。これらの配列の各要素は、`index` によってインデックス化された、補助テーブルの個別の行です。ここで `id` は、`contact_details` キーを使用する `hist_root` テーブルの外部キーです。

```
dfc.select('hist_root').toDF().where(
    "contact_details = 10 or contact_details = 75").select(
       ['id', 'given_name', 'family_name', 'contact_details']).show()
```

出力を次に示します。

```
+--------------------+----------+-----------+---------------+
|                  id|given_name|family_name|contact_details|
+--------------------+----------+-----------+---------------+
|f4fc30ee-7b42-432...|      Mike|       Ross|             10|
|e3c60f34-7d1b-4c0...|   Shelley|     Capito|             75|
+--------------------+----------+-----------+---------------+
```

これらのコマンドでは、`toDF()` および `where` 式を使用して、表示する行をフィルタリングすることに注意してください。

したがって、`hist_root` テーブルを補助テーブルと結合すると、次のことが可能になります。
+ 配列をサポートせずにデータベースにデータをロードします。
+ SQL を使用して配列内の各項目にクエリを実行します。

AWS Glue 接続を使用して、Amazon Redshift の認証情報を安全に保存してアクセスします。独自の接続の作成方法については、「[データへの接続](glue-connections.md)」を参照してください。

`DynamicFrames` を 1 つずつ切り替えて、接続にデータを書き込みできるようになりました。

```
for df_name in dfc.keys():
  m_df = dfc.select(df_name)
  print "Writing to table: ", df_name
  glueContext.write_dynamic_frame.from_jdbc_conf(frame = m_df, connection settings here)
```

接続設定は、リレーショナルデータベースのタイプによって異なります。
+ Amazon Redshift への書き込み手順については、「[Redshift 接続](aws-glue-programming-etl-connect-redshift-home.md)」を参照してください。
+ その他のデータベースについては、「[AWS Glue for Spark での ETL の接続タイプとオプション](aws-glue-programming-etl-connect.md)」を参照してください。

## 結論
<a name="aws-glue-programming-python-samples-legislators-conclusion"></a>

全体として、AWS Glue は非常に柔軟です。通常は書くのに数日かかるところを、数行のコードで達成できます。ソースからターゲットへの ETL スクリプトの全体は、GitHub の [AWS Glue サンプル](https://github.com/awslabs/aws-glue-samples)内の Python ファイル `join_and_relationalize.py` にあります。