

# 코드 예: 데이터 조인 및 관계화
<a name="aws-glue-programming-python-samples-legislators"></a>

이 예제에서는 [http://everypolitician.org/](http://everypolitician.org/)에서 Amazon Simple Storage Service(Amazon S3)(`s3://awsglue-datasets/examples/us-legislators/all`)의 `sample-dataset` 버킷으로 다운로드한 데이터 집합을 사용합니다. 데이터 세트에는 미국 입법 기관과 미 상원 및 하원에서 확보한 의석에 대한 JSON 형식의 데이터가 포함되어 있으며, 이 자습서의 용도에 맞게 퍼블릭 Amazon S3 버킷에서 사용할 수 있도록 데이터 세트가 약간 수정되었습니다.

이 예제의 소스 코드는 GitHub 웹 사이트의 [AWS Glue 샘플 리포지토리](https://github.com/awslabs/aws-glue-samples)에 있는 `join_and_relationalize.py` 파일에서 찾을 수 있습니다.

이 데이터를 사용하여 이 자습서에서는 다음을 수행하는 방법을 보여줍니다.
+ AWS Glue 크롤러를 사용하여 퍼블릭 Amazon S3 버킷에 저장된 객체를 분류하고 AWS Glue Data Catalog에 객체의 스키마를 저장합니다.
+ 크롤의 결과인 테이블 메타데이터 및 스키마를 검토합니다.
+ Data Catalog의 메타데이터를 사용하여 Python 추출, 변환, 로드(ETL) 스크립트를 작성하고 다음을 실행합니다.
  + 하나의 데이터 테이블로 다른 원본 파일의 데이터를 모읍니다 (즉, 데이터를 비정규화합니다).
  + 제정자 유형에 따라 모아진 테이블을 개별 테이블로 필터링합니다.
  + 다음 분석을 위해 결과 데이터를 Apache Parquet 파일을 작성합니다.

AWS에서 실행하는 중에 Python 또는 PySpark 스크립트를 디버깅하는 기본적인 방법은 [AWS Glue Studio에서 노트북](https://docs.aws.amazon.com/glue/latest/ug/notebooks-chapter.html)을 사용하는 것입니다.

## 1단계: Amazon S3 버킷에서 데이터 크롤
<a name="aws-glue-programming-python-samples-legislators-crawling"></a>

1. AWS Management Console에 로그인하여 [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/)에서 AWS Glue 콘솔을 엽니다.

1. [크롤러 구성](define-crawler.md)의 단계에 따라 AWS Glue Data Catalog에서 `legislators`라는 데이터베이스로 `s3://awsglue-datasets/examples/us-legislators/all` 데이터 집합을 크롤링할 수 있는 새 크롤러를 만듭니다. 이 예제 데이터는 이 퍼블릭 Amazon S3 버킷에 이미 존재합니다.

1. 새로운 크롤러를 실행한 다음 `legislators` 데이터베이스를 확인합니다.

   크롤러는 다음 메타데이터 테이블을 생성합니다.
   + `persons_json`
   + `memberships_json`
   + `organizations_json`
   + `events_json`
   + `areas_json`
   + `countries_r_json`

   제정자와 제정자 기록을 포함한 테이블 반정규화된 컬렉션입니다.

## 2단계: 개발 엔드포인트 노트북에 표준 문안 스크립트 추가
<a name="aws-glue-programming-python-samples-legislators-boilerplate"></a>

다음 표준 문안 스크립트를 개발 엔드포인트 노트북에 복사하고 필요한 AWS Glue 라이브러리로 들여와 단일 `GlueContext`를 설정합니다.

```
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

glueContext = GlueContext(SparkContext.getOrCreate())
```

## 3단계: Data Catalog의 데이터에서 스키마 검토
<a name="aws-glue-programming-python-samples-legislators-schemas"></a>

다음으로 AWS Glue Data Catalog에서 검사 DynamicFrame을 쉽게 생성하고 데이터의 스키마를 검사할 수 있습니다. 예를 들어, `persons_json` 테이블의 스키마를 보고 노트북에 다음을 추가합니다.

```
persons = glueContext.create_dynamic_frame.from_catalog(
             database="legislators",
             table_name="persons_json")
print "Count: ", persons.count()
persons.printSchema()
```

프린트 호출에 따른 출력값입니다.

```
Count:  1961
root
|-- family_name: string
|-- name: string
|-- links: array
|    |-- element: struct
|    |    |-- note: string
|    |    |-- url: string
|-- gender: string
|-- image: string
|-- identifiers: array
|    |-- element: struct
|    |    |-- scheme: string
|    |    |-- identifier: string
|-- other_names: array
|    |-- element: struct
|    |    |-- note: string
|    |    |-- name: string
|    |    |-- lang: string
|-- sort_name: string
|-- images: array
|    |-- element: struct
|    |    |-- url: string
|-- given_name: string
|-- birth_date: string
|-- id: string
|-- contact_details: array
|    |-- element: struct
|    |    |-- type: string
|    |    |-- value: string
|-- death_date: string
```

테이블에 있는 각자는 미국 의회 기관의 멤버입니다.

`memberships_json` 테이블의 스키마를 보고 다음을 입력합니다.

```
memberships = glueContext.create_dynamic_frame.from_catalog(
                 database="legislators",
                 table_name="memberships_json")
print "Count: ", memberships.count()
memberships.printSchema()
```

출력값은 다음과 같습니다.

```
Count:  10439
root
|-- area_id: string
|-- on_behalf_of_id: string
|-- organization_id: string
|-- role: string
|-- person_id: string
|-- legislative_period_id: string
|-- start_date: string
|-- end_date: string
```

`organizations`는 정당이고 미국의 상원과 하원인 의회 양원입니다. `organizations_json` 테이블의 스키마를 보고 다음을 입력합니다.

```
orgs = glueContext.create_dynamic_frame.from_catalog(
           database="legislators",
           table_name="organizations_json")
print "Count: ", orgs.count()
orgs.printSchema()
```

출력값은 다음과 같습니다.

```
Count:  13
root
|-- classification: string
|-- links: array
|    |-- element: struct
|    |    |-- note: string
|    |    |-- url: string
|-- image: string
|-- identifiers: array
|    |-- element: struct
|    |    |-- scheme: string
|    |    |-- identifier: string
|-- other_names: array
|    |-- element: struct
|    |    |-- lang: string
|    |    |-- note: string
|    |    |-- name: string
|-- id: string
|-- name: string
|-- seats: int
|-- type: string
```

## 4단계: 데이터 필터링
<a name="aws-glue-programming-python-samples-legislators-filtering"></a>

원하는 필드만 남기고 `id`를 `org_id`로 이름을 바꿉니다. 데이터셋은 전체를 볼 수 있을만큼 작습니다.

`toDF()`는 `DynamicFrame`를 Apache Spark `DataFrame`로 변환하기 때문에 Apache Spark SQL에 이미 존재한 변환을 적용할 수 있습니다.

```
orgs = orgs.drop_fields(['other_names',
                        'identifiers']).rename_field(
                            'id', 'org_id').rename_field(
                               'name', 'org_name')
orgs.toDF().show()
```

다음은 출력값을 보여줍니다.

```
+--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+
|classification|              org_id|            org_name|               links|seats|       type|               image|
+--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+
|         party|            party/al|                  AL|                null| null|       null|                null|
|         party|      party/democrat|            Democrat|[[website,http://...| null|       null|https://upload.wi...|
|         party|party/democrat-li...|    Democrat-Liberal|[[website,http://...| null|       null|                null|
|   legislature|d56acebe-8fdc-47b...|House of Represen...|                null|  435|lower house|                null|
|         party|   party/independent|         Independent|                null| null|       null|                null|
|         party|party/new_progres...|     New Progressive|[[website,http://...| null|       null|https://upload.wi...|
|         party|party/popular_dem...|    Popular Democrat|[[website,http://...| null|       null|                null|
|         party|    party/republican|          Republican|[[website,http://...| null|       null|https://upload.wi...|
|         party|party/republican-...|Republican-Conser...|[[website,http://...| null|       null|                null|
|         party|      party/democrat|            Democrat|[[website,http://...| null|       null|https://upload.wi...|
|         party|   party/independent|         Independent|                null| null|       null|                null|
|         party|    party/republican|          Republican|[[website,http://...| null|       null|https://upload.wi...|
|   legislature|8fa6c3d2-71dc-478...|              Senate|                null|  100|upper house|                null|
+--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+
```

`memberships`에 나타나는 `organizations`를 보려면 다음을 입력합니다.

```
memberships.select_fields(['organization_id']).toDF().distinct().show()
```

다음은 출력값을 보여줍니다.

```
+--------------------+
|     organization_id|
+--------------------+
|d56acebe-8fdc-47b...|
|8fa6c3d2-71dc-478...|
+--------------------+
```

## 5단계: 데이터 조인
<a name="aws-glue-programming-python-samples-legislators-joining"></a>

이제 AWS Glue를 사용하여 이런 관계형 테이블을 조인하고 제정자 `memberships`의 전체 기록 테이블과 제정자에 대응하는 `organizations`을 생성합니다.

1. 우선, `persons`와 `memberships`를 `id`와 `person_id`로 조인합니다.

1. 다음으로 `org_id` 및 `organization_id`에서 `orgs`와 결과를 조인합니다.

1. 그 후, 중복 필드, `person_id` 및 `org_id`을 드롭합니다.

이 모든 작업을 하나 줄의 (확장된) 코드로 실행할 수 있습니다.

```
l_history = Join.apply(orgs,
                       Join.apply(persons, memberships, 'id', 'person_id'),
                       'org_id', 'organization_id').drop_fields(['person_id', 'org_id'])
print "Count: ", l_history.count()
l_history.printSchema()
```

출력값은 다음과 같습니다.

```
Count:  10439
root
|-- role: string
|-- seats: int
|-- org_name: string
|-- links: array
|    |-- element: struct
|    |    |-- note: string
|    |    |-- url: string
|-- type: string
|-- sort_name: string
|-- area_id: string
|-- images: array
|    |-- element: struct
|    |    |-- url: string
|-- on_behalf_of_id: string
|-- other_names: array
|    |-- element: struct
|    |    |-- note: string
|    |    |-- name: string
|    |    |-- lang: string
|-- contact_details: array
|    |-- element: struct
|    |    |-- type: string
|    |    |-- value: string
|-- name: string
|-- birth_date: string
|-- organization_id: string
|-- gender: string
|-- classification: string
|-- death_date: string
|-- legislative_period_id: string
|-- identifiers: array
|    |-- element: struct
|    |    |-- scheme: string
|    |    |-- identifier: string
|-- image: string
|-- given_name: string
|-- family_name: string
|-- id: string
|-- start_date: string
|-- end_date: string
```

이제, 분석을 위한 최종 테이블이 있습니다. AWS Glue, Amazon Athena 또는 Amazon Redshift Spectrum에서 SQL을 실행할 수 있는 압축되고 효율적인 분석 형식(즉, Parquet)으로 작성할 수 있습니다.

다음 호출은 여러 파일의 테이블을 작성하여 나중에 분석을 할 때 빠른 병렬 판독을 지원합니다.

```
glueContext.write_dynamic_frame.from_options(frame = l_history,
          connection_type = "s3",
          connection_options = {"path": "s3://glue-sample-target/output-dir/legislator_history"},
          format = "parquet")
```

하나의 파일에 기록 데이터 모두를 넣으면 데이터 프레임으로 변환하고 다시 분할하며 작성할 수 있습니다.

```
s_history = l_history.toDF().repartition(1)
s_history.write.parquet('s3://glue-sample-target/output-dir/legislator_single')
```

상원과 하원으로 나누고자 한다면

```
l_history.toDF().write.parquet('s3://glue-sample-target/output-dir/legislator_part',
                               partitionBy=['org_name'])
```

## 6단계: 관계형 데이터베이스를 위한 데이터 변환
<a name="aws-glue-programming-python-samples-legislators-writing"></a>

AWS Glue를 사용하면 반정형 데이터가 포함된, Amazon Redshift와 같은 관계형 데이터베이스에도 데이터를 쉽게 쓸 수 있습니다. 프레임의 객체가 아무리 복잡하더라도 `DynamicFrames`를 평면화하는 변환 `relationalize`를 제공합니다.

이 예제에 `l_history` `DynamicFrame`를 사용하여 루트 테이블의 이름(`hist_root`)과 임시 작업 경로를 `relationalize`로 넘깁니다. 이렇게 하면 `DynamicFrameCollection`가 반환됩니다. 컬렉션의 `DynamicFrames`의 이름을 열거할 수 있습니다.

```
dfc = l_history.relationalize("hist_root", "s3://glue-sample-target/temp-dir/")
dfc.keys()
```

다음은 이 `keys` 호출의 출력입니다.

```
[u'hist_root', u'hist_root_contact_details', u'hist_root_links',
 u'hist_root_other_names', u'hist_root_images', u'hist_root_identifiers']
```

`Relationalize`는 `DynamicFrame`의 각 객체의 기록을 포함한 루트 테이블과 배열의 보조 테이블을 포함한 6 개의 새로운 테이블로 기록 테이블을 나눕니다. 관계형 데이터베이스에서 처리한 배열은 보통 차선책일 수 있습니다. 특히 배열이 커지면 그렇습니다. 배열을 다른 테이블로 나누면 쿼리 과정이 빨라집니다.

이제, `contact_details`를 보고 나누는 과정에 대해 알아봅니다.

```
l_history.select_fields('contact_details').printSchema()
dfc.select('hist_root_contact_details').toDF().where("id = 10 or id = 75").orderBy(['id','index']).show()
```

다음은 이 `show` 호출의 출력입니다.

```
root
|-- contact_details: array
|    |-- element: struct
|    |    |-- type: string
|    |    |-- value: string
+---+-----+------------------------+-------------------------+
| id|index|contact_details.val.type|contact_details.val.value|
+---+-----+------------------------+-------------------------+
| 10|    0|                     fax|                         |
| 10|    1|                        |             202-225-1314|
| 10|    2|                   phone|                         |
| 10|    3|                        |             202-225-3772|
| 10|    4|                 twitter|                         |
| 10|    5|                        |          MikeRossUpdates|
| 75|    0|                     fax|                         |
| 75|    1|                        |             202-225-7856|
| 75|    2|                   phone|                         |
| 75|    3|                        |             202-225-2711|
| 75|    4|                 twitter|                         |
| 75|    5|                        |                SenCapito|
+---+-----+------------------------+-------------------------+
```

`contact_details` 필드는 기존 `DynamicFrame`의 구조 배열이었습니다. 이 배열에 있는 각 요소가 `index`에 따라 보조 테이블의 별도 행입니다. 여기서 `id`는 `contact_details` 키가 있는 `hist_root` 테이블의 외래 키입니다.

```
dfc.select('hist_root').toDF().where(
    "contact_details = 10 or contact_details = 75").select(
       ['id', 'given_name', 'family_name', 'contact_details']).show()
```

출력값은 다음과 같습니다.

```
+--------------------+----------+-----------+---------------+
|                  id|given_name|family_name|contact_details|
+--------------------+----------+-----------+---------------+
|f4fc30ee-7b42-432...|      Mike|       Ross|             10|
|e3c60f34-7d1b-4c0...|   Shelley|     Capito|             75|
+--------------------+----------+-----------+---------------+
```

이 명령어에는 `toDF()`와 `where` 순으로 선택하여 보고자 하는 열을 필터링합니다.

따라서 보조 테이블과 함께 `hist_root`을 조인하여 다음과 같은 작업을 수행할 수 있습니다.
+ 배열 지원 없이 데이터베이스로 데이터를 로드합니다.
+ SQL을 사용하여 배열에서 개별 항목을 쿼리합니다.

AWS Glue 연결을 사용하여 Amazon Redshift 자격 증명을 안전하게 저장하고 액세스합니다. 자체 연결을 만드는 방법에 대한 자세한 내용은 [데이터에 연결](glue-connections.md)을 참조하십시오.

이제 `DynamicFrames`를 차례로 순환하여 데이터를 연결에 쓸 준비가 되었습니다.

```
for df_name in dfc.keys():
  m_df = dfc.select(df_name)
  print "Writing to table: ", df_name
  glueContext.write_dynamic_frame.from_jdbc_conf(frame = m_df, connection settings here)
```

연결 설정은 관계형 데이터베이스 유형에 따라 달라집니다.
+ Amazon Redshift에 쓰는 방법에 대한 지침은 [Redshift 연결](aws-glue-programming-etl-connect-redshift-home.md) 섹션을 참조하세요.
+ 다른 데이터베이스의 경우 [AWS Glue for Spark에서 ETL에 대한 연결 유형 및 옵션](aws-glue-programming-etl-connect.md) 섹션을 참조하세요.

## 결론
<a name="aws-glue-programming-python-samples-legislators-conclusion"></a>

전체적으로 AWS Glue는 유연성이 뛰어납니다. 보통 며칠을 걸려 작성해야 가능한 이 작업을 단 몇 줄의 코드로 얻을 수 있습니다. GitHub의 [AWS Glue 샘플](https://github.com/awslabs/aws-glue-samples)에 있는 Python 파일 `join_and_relationalize.py`에서 전체 소스-대상 ETL 스크립트를 찾을 수 있습니다.