

# AWS Glue Python 코드 샘플
<a name="aws-glue-programming-python-samples"></a>
+ [코드 예: 데이터 조인 및 관계화](aws-glue-programming-python-samples-legislators.md)
+ [코드 예: ResolveChoice, Lambda, 및 ApplyMapping을 사용한 데이터 준비](aws-glue-programming-python-samples-medicaid.md)

# 코드 예: 데이터 조인 및 관계화
<a name="aws-glue-programming-python-samples-legislators"></a>

이 예제에서는 [http://everypolitician.org/](http://everypolitician.org/)에서 Amazon Simple Storage Service(Amazon S3)(`s3://awsglue-datasets/examples/us-legislators/all`)의 `sample-dataset` 버킷으로 다운로드한 데이터 집합을 사용합니다. 데이터 세트에는 미국 입법 기관과 미 상원 및 하원에서 확보한 의석에 대한 JSON 형식의 데이터가 포함되어 있으며, 이 자습서의 용도에 맞게 퍼블릭 Amazon S3 버킷에서 사용할 수 있도록 데이터 세트가 약간 수정되었습니다.

이 예제의 소스 코드는 GitHub 웹 사이트의 [AWS Glue 샘플 리포지토리](https://github.com/awslabs/aws-glue-samples)에 있는 `join_and_relationalize.py` 파일에서 찾을 수 있습니다.

이 데이터를 사용하여 이 자습서에서는 다음을 수행하는 방법을 보여줍니다.
+ AWS Glue 크롤러를 사용하여 퍼블릭 Amazon S3 버킷에 저장된 객체를 분류하고 AWS Glue Data Catalog에 객체의 스키마를 저장합니다.
+ 크롤의 결과인 테이블 메타데이터 및 스키마를 검토합니다.
+ Data Catalog의 메타데이터를 사용하여 Python 추출, 변환, 로드(ETL) 스크립트를 작성하고 다음을 실행합니다.
  + 하나의 데이터 테이블로 다른 원본 파일의 데이터를 모읍니다 (즉, 데이터를 비정규화합니다).
  + 제정자 유형에 따라 모아진 테이블을 개별 테이블로 필터링합니다.
  + 다음 분석을 위해 결과 데이터를 Apache Parquet 파일을 작성합니다.

AWS에서 실행하는 중에 Python 또는 PySpark 스크립트를 디버깅하는 기본적인 방법은 [AWS Glue Studio에서 노트북](https://docs.aws.amazon.com/glue/latest/ug/notebooks-chapter.html)을 사용하는 것입니다.

## 1단계: Amazon S3 버킷에서 데이터 크롤
<a name="aws-glue-programming-python-samples-legislators-crawling"></a>

1. AWS Management Console에 로그인하여 [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/)에서 AWS Glue 콘솔을 엽니다.

1. [크롤러 구성](define-crawler.md)의 단계에 따라 AWS Glue Data Catalog에서 `legislators`라는 데이터베이스로 `s3://awsglue-datasets/examples/us-legislators/all` 데이터 집합을 크롤링할 수 있는 새 크롤러를 만듭니다. 이 예제 데이터는 이 퍼블릭 Amazon S3 버킷에 이미 존재합니다.

1. 새로운 크롤러를 실행한 다음 `legislators` 데이터베이스를 확인합니다.

   크롤러는 다음 메타데이터 테이블을 생성합니다.
   + `persons_json`
   + `memberships_json`
   + `organizations_json`
   + `events_json`
   + `areas_json`
   + `countries_r_json`

   제정자와 제정자 기록을 포함한 테이블 반정규화된 컬렉션입니다.

## 2단계: 개발 엔드포인트 노트북에 표준 문안 스크립트 추가
<a name="aws-glue-programming-python-samples-legislators-boilerplate"></a>

다음 표준 문안 스크립트를 개발 엔드포인트 노트북에 복사하고 필요한 AWS Glue 라이브러리로 들여와 단일 `GlueContext`를 설정합니다.

```
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

glueContext = GlueContext(SparkContext.getOrCreate())
```

## 3단계: Data Catalog의 데이터에서 스키마 검토
<a name="aws-glue-programming-python-samples-legislators-schemas"></a>

다음으로 AWS Glue Data Catalog에서 검사 DynamicFrame을 쉽게 생성하고 데이터의 스키마를 검사할 수 있습니다. 예를 들어, `persons_json` 테이블의 스키마를 보고 노트북에 다음을 추가합니다.

```
persons = glueContext.create_dynamic_frame.from_catalog(
             database="legislators",
             table_name="persons_json")
print "Count: ", persons.count()
persons.printSchema()
```

프린트 호출에 따른 출력값입니다.

```
Count:  1961
root
|-- family_name: string
|-- name: string
|-- links: array
|    |-- element: struct
|    |    |-- note: string
|    |    |-- url: string
|-- gender: string
|-- image: string
|-- identifiers: array
|    |-- element: struct
|    |    |-- scheme: string
|    |    |-- identifier: string
|-- other_names: array
|    |-- element: struct
|    |    |-- note: string
|    |    |-- name: string
|    |    |-- lang: string
|-- sort_name: string
|-- images: array
|    |-- element: struct
|    |    |-- url: string
|-- given_name: string
|-- birth_date: string
|-- id: string
|-- contact_details: array
|    |-- element: struct
|    |    |-- type: string
|    |    |-- value: string
|-- death_date: string
```

테이블에 있는 각자는 미국 의회 기관의 멤버입니다.

`memberships_json` 테이블의 스키마를 보고 다음을 입력합니다.

```
memberships = glueContext.create_dynamic_frame.from_catalog(
                 database="legislators",
                 table_name="memberships_json")
print "Count: ", memberships.count()
memberships.printSchema()
```

출력값은 다음과 같습니다.

```
Count:  10439
root
|-- area_id: string
|-- on_behalf_of_id: string
|-- organization_id: string
|-- role: string
|-- person_id: string
|-- legislative_period_id: string
|-- start_date: string
|-- end_date: string
```

`organizations`는 정당이고 미국의 상원과 하원인 의회 양원입니다. `organizations_json` 테이블의 스키마를 보고 다음을 입력합니다.

```
orgs = glueContext.create_dynamic_frame.from_catalog(
           database="legislators",
           table_name="organizations_json")
print "Count: ", orgs.count()
orgs.printSchema()
```

출력값은 다음과 같습니다.

```
Count:  13
root
|-- classification: string
|-- links: array
|    |-- element: struct
|    |    |-- note: string
|    |    |-- url: string
|-- image: string
|-- identifiers: array
|    |-- element: struct
|    |    |-- scheme: string
|    |    |-- identifier: string
|-- other_names: array
|    |-- element: struct
|    |    |-- lang: string
|    |    |-- note: string
|    |    |-- name: string
|-- id: string
|-- name: string
|-- seats: int
|-- type: string
```

## 4단계: 데이터 필터링
<a name="aws-glue-programming-python-samples-legislators-filtering"></a>

원하는 필드만 남기고 `id`를 `org_id`로 이름을 바꿉니다. 데이터셋은 전체를 볼 수 있을만큼 작습니다.

`toDF()`는 `DynamicFrame`를 Apache Spark `DataFrame`로 변환하기 때문에 Apache Spark SQL에 이미 존재한 변환을 적용할 수 있습니다.

```
orgs = orgs.drop_fields(['other_names',
                        'identifiers']).rename_field(
                            'id', 'org_id').rename_field(
                               'name', 'org_name')
orgs.toDF().show()
```

다음은 출력값을 보여줍니다.

```
+--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+
|classification|              org_id|            org_name|               links|seats|       type|               image|
+--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+
|         party|            party/al|                  AL|                null| null|       null|                null|
|         party|      party/democrat|            Democrat|[[website,http://...| null|       null|https://upload.wi...|
|         party|party/democrat-li...|    Democrat-Liberal|[[website,http://...| null|       null|                null|
|   legislature|d56acebe-8fdc-47b...|House of Represen...|                null|  435|lower house|                null|
|         party|   party/independent|         Independent|                null| null|       null|                null|
|         party|party/new_progres...|     New Progressive|[[website,http://...| null|       null|https://upload.wi...|
|         party|party/popular_dem...|    Popular Democrat|[[website,http://...| null|       null|                null|
|         party|    party/republican|          Republican|[[website,http://...| null|       null|https://upload.wi...|
|         party|party/republican-...|Republican-Conser...|[[website,http://...| null|       null|                null|
|         party|      party/democrat|            Democrat|[[website,http://...| null|       null|https://upload.wi...|
|         party|   party/independent|         Independent|                null| null|       null|                null|
|         party|    party/republican|          Republican|[[website,http://...| null|       null|https://upload.wi...|
|   legislature|8fa6c3d2-71dc-478...|              Senate|                null|  100|upper house|                null|
+--------------+--------------------+--------------------+--------------------+-----+-----------+--------------------+
```

`memberships`에 나타나는 `organizations`를 보려면 다음을 입력합니다.

```
memberships.select_fields(['organization_id']).toDF().distinct().show()
```

다음은 출력값을 보여줍니다.

```
+--------------------+
|     organization_id|
+--------------------+
|d56acebe-8fdc-47b...|
|8fa6c3d2-71dc-478...|
+--------------------+
```

## 5단계: 데이터 조인
<a name="aws-glue-programming-python-samples-legislators-joining"></a>

이제 AWS Glue를 사용하여 이런 관계형 테이블을 조인하고 제정자 `memberships`의 전체 기록 테이블과 제정자에 대응하는 `organizations`을 생성합니다.

1. 우선, `persons`와 `memberships`를 `id`와 `person_id`로 조인합니다.

1. 다음으로 `org_id` 및 `organization_id`에서 `orgs`와 결과를 조인합니다.

1. 그 후, 중복 필드, `person_id` 및 `org_id`을 드롭합니다.

이 모든 작업을 하나 줄의 (확장된) 코드로 실행할 수 있습니다.

```
l_history = Join.apply(orgs,
                       Join.apply(persons, memberships, 'id', 'person_id'),
                       'org_id', 'organization_id').drop_fields(['person_id', 'org_id'])
print "Count: ", l_history.count()
l_history.printSchema()
```

출력값은 다음과 같습니다.

```
Count:  10439
root
|-- role: string
|-- seats: int
|-- org_name: string
|-- links: array
|    |-- element: struct
|    |    |-- note: string
|    |    |-- url: string
|-- type: string
|-- sort_name: string
|-- area_id: string
|-- images: array
|    |-- element: struct
|    |    |-- url: string
|-- on_behalf_of_id: string
|-- other_names: array
|    |-- element: struct
|    |    |-- note: string
|    |    |-- name: string
|    |    |-- lang: string
|-- contact_details: array
|    |-- element: struct
|    |    |-- type: string
|    |    |-- value: string
|-- name: string
|-- birth_date: string
|-- organization_id: string
|-- gender: string
|-- classification: string
|-- death_date: string
|-- legislative_period_id: string
|-- identifiers: array
|    |-- element: struct
|    |    |-- scheme: string
|    |    |-- identifier: string
|-- image: string
|-- given_name: string
|-- family_name: string
|-- id: string
|-- start_date: string
|-- end_date: string
```

이제, 분석을 위한 최종 테이블이 있습니다. AWS Glue, Amazon Athena 또는 Amazon Redshift Spectrum에서 SQL을 실행할 수 있는 압축되고 효율적인 분석 형식(즉, Parquet)으로 작성할 수 있습니다.

다음 호출은 여러 파일의 테이블을 작성하여 나중에 분석을 할 때 빠른 병렬 판독을 지원합니다.

```
glueContext.write_dynamic_frame.from_options(frame = l_history,
          connection_type = "s3",
          connection_options = {"path": "s3://glue-sample-target/output-dir/legislator_history"},
          format = "parquet")
```

하나의 파일에 기록 데이터 모두를 넣으면 데이터 프레임으로 변환하고 다시 분할하며 작성할 수 있습니다.

```
s_history = l_history.toDF().repartition(1)
s_history.write.parquet('s3://glue-sample-target/output-dir/legislator_single')
```

상원과 하원으로 나누고자 한다면

```
l_history.toDF().write.parquet('s3://glue-sample-target/output-dir/legislator_part',
                               partitionBy=['org_name'])
```

## 6단계: 관계형 데이터베이스를 위한 데이터 변환
<a name="aws-glue-programming-python-samples-legislators-writing"></a>

AWS Glue를 사용하면 반정형 데이터가 포함된, Amazon Redshift와 같은 관계형 데이터베이스에도 데이터를 쉽게 쓸 수 있습니다. 프레임의 객체가 아무리 복잡하더라도 `DynamicFrames`를 평면화하는 변환 `relationalize`를 제공합니다.

이 예제에 `l_history` `DynamicFrame`를 사용하여 루트 테이블의 이름(`hist_root`)과 임시 작업 경로를 `relationalize`로 넘깁니다. 이렇게 하면 `DynamicFrameCollection`가 반환됩니다. 컬렉션의 `DynamicFrames`의 이름을 열거할 수 있습니다.

```
dfc = l_history.relationalize("hist_root", "s3://glue-sample-target/temp-dir/")
dfc.keys()
```

다음은 이 `keys` 호출의 출력입니다.

```
[u'hist_root', u'hist_root_contact_details', u'hist_root_links',
 u'hist_root_other_names', u'hist_root_images', u'hist_root_identifiers']
```

`Relationalize`는 `DynamicFrame`의 각 객체의 기록을 포함한 루트 테이블과 배열의 보조 테이블을 포함한 6 개의 새로운 테이블로 기록 테이블을 나눕니다. 관계형 데이터베이스에서 처리한 배열은 보통 차선책일 수 있습니다. 특히 배열이 커지면 그렇습니다. 배열을 다른 테이블로 나누면 쿼리 과정이 빨라집니다.

이제, `contact_details`를 보고 나누는 과정에 대해 알아봅니다.

```
l_history.select_fields('contact_details').printSchema()
dfc.select('hist_root_contact_details').toDF().where("id = 10 or id = 75").orderBy(['id','index']).show()
```

다음은 이 `show` 호출의 출력입니다.

```
root
|-- contact_details: array
|    |-- element: struct
|    |    |-- type: string
|    |    |-- value: string
+---+-----+------------------------+-------------------------+
| id|index|contact_details.val.type|contact_details.val.value|
+---+-----+------------------------+-------------------------+
| 10|    0|                     fax|                         |
| 10|    1|                        |             202-225-1314|
| 10|    2|                   phone|                         |
| 10|    3|                        |             202-225-3772|
| 10|    4|                 twitter|                         |
| 10|    5|                        |          MikeRossUpdates|
| 75|    0|                     fax|                         |
| 75|    1|                        |             202-225-7856|
| 75|    2|                   phone|                         |
| 75|    3|                        |             202-225-2711|
| 75|    4|                 twitter|                         |
| 75|    5|                        |                SenCapito|
+---+-----+------------------------+-------------------------+
```

`contact_details` 필드는 기존 `DynamicFrame`의 구조 배열이었습니다. 이 배열에 있는 각 요소가 `index`에 따라 보조 테이블의 별도 행입니다. 여기서 `id`는 `contact_details` 키가 있는 `hist_root` 테이블의 외래 키입니다.

```
dfc.select('hist_root').toDF().where(
    "contact_details = 10 or contact_details = 75").select(
       ['id', 'given_name', 'family_name', 'contact_details']).show()
```

출력값은 다음과 같습니다.

```
+--------------------+----------+-----------+---------------+
|                  id|given_name|family_name|contact_details|
+--------------------+----------+-----------+---------------+
|f4fc30ee-7b42-432...|      Mike|       Ross|             10|
|e3c60f34-7d1b-4c0...|   Shelley|     Capito|             75|
+--------------------+----------+-----------+---------------+
```

이 명령어에는 `toDF()`와 `where` 순으로 선택하여 보고자 하는 열을 필터링합니다.

따라서 보조 테이블과 함께 `hist_root`을 조인하여 다음과 같은 작업을 수행할 수 있습니다.
+ 배열 지원 없이 데이터베이스로 데이터를 로드합니다.
+ SQL을 사용하여 배열에서 개별 항목을 쿼리합니다.

AWS Glue 연결을 사용하여 Amazon Redshift 자격 증명을 안전하게 저장하고 액세스합니다. 자체 연결을 만드는 방법에 대한 자세한 내용은 [데이터에 연결](glue-connections.md)을 참조하십시오.

이제 `DynamicFrames`를 차례로 순환하여 데이터를 연결에 쓸 준비가 되었습니다.

```
for df_name in dfc.keys():
  m_df = dfc.select(df_name)
  print "Writing to table: ", df_name
  glueContext.write_dynamic_frame.from_jdbc_conf(frame = m_df, connection settings here)
```

연결 설정은 관계형 데이터베이스 유형에 따라 달라집니다.
+ Amazon Redshift에 쓰는 방법에 대한 지침은 [Redshift 연결](aws-glue-programming-etl-connect-redshift-home.md) 섹션을 참조하세요.
+ 다른 데이터베이스의 경우 [AWS Glue for Spark에서 ETL에 대한 연결 유형 및 옵션](aws-glue-programming-etl-connect.md) 섹션을 참조하세요.

## 결론
<a name="aws-glue-programming-python-samples-legislators-conclusion"></a>

전체적으로 AWS Glue는 유연성이 뛰어납니다. 보통 며칠을 걸려 작성해야 가능한 이 작업을 단 몇 줄의 코드로 얻을 수 있습니다. GitHub의 [AWS Glue 샘플](https://github.com/awslabs/aws-glue-samples)에 있는 Python 파일 `join_and_relationalize.py`에서 전체 소스-대상 ETL 스크립트를 찾을 수 있습니다.

# 코드 예: ResolveChoice, Lambda, 및 ApplyMapping을 사용한 데이터 준비
<a name="aws-glue-programming-python-samples-medicaid"></a>

이 예에서 사용된 데이터 집합은 두 [Data.CMS.gov](https://data.cms.gov) 데이터 집합("상위 100개 진단 관련 그룹에 대한 입원 환자 예상 지불 시스템 제공자 요약 - FY2011" 및 "FY2011 입원 환자 비용 데이터")에서 다운로드한 Medicare Provider 지불 데이터로 구성됩니다. 데이터 다운로드 후 데이터 집합을 수정하여 파일 끝에 몇 가지 잘못된 기록을 소개합니다. 이 수정된 파일은 `s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv`의 퍼블릭 Amazon S3 버킷에 있습니다.

이 예제에 대한 소스 코드는 [AWS Glue 예제](https://github.com/awslabs/aws-glue-samples) GitHub 리포지토리의 `data_cleaning_and_lambda.py` 파일에서 찾을 수 있습니다.

AWS에서 실행하는 중에 Python 또는 PySpark 스크립트를 디버깅하는 기본적인 방법은 [AWS Glue Studio에서 노트북](https://docs.aws.amazon.com/glue/latest/ug/notebooks-chapter.html)을 사용하는 것입니다.

## 1단계: Amazon S3 버킷에서 데이터 크롤
<a name="aws-glue-programming-python-samples-medicaid-crawling"></a>

1. AWS Management Console에 로그인하여 [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/)에서 AWS Glue 콘솔을 엽니다.

1. [크롤러 구성](define-crawler.md)의 절차를 밟고 `s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv` 파일을 크롤할 수 있는 새로운 크롤러를 생성하며 그 결과 생성된 메타데이터를 AWS Glue Data Catalog의 `payments`라는 데이터베이스에 둡니다.

1. 새로운 크롤러를 실행한 다음 `payments` 데이터베이스를 확인합니다. 파일의 시작 부분을 읽어 형식과 구분 기호를 확인한 후 크롤러가 데이터베이스에 `medicare`라는 이름의 메타데이터 테이블을 생성했을 것입니다.

   새로운 `medicare`의 스키마는 다음과 같습니다.

   ```
   Column  name                            Data type
   ==================================================
   drg definition                             string
   provider id                                bigint
   provider name                              string
   provider street address                    string
   provider city                              string
   provider state                             string
   provider zip code                          bigint
   hospital referral region description       string
   total discharges                           bigint
   average covered charges                    string
   average total payments                     string
   average medicare payments                  string
   ```

## 2단계: 개발 엔드포인트 노트북에 표준 문안 스크립트 추가
<a name="aws-glue-programming-python-samples-medicaid-boilerplate"></a>

다음 표준 문안 스크립트를 개발 엔드포인트 노트북에 복사하고 필요한 AWS Glue 라이브러리로 들여와 단일 `GlueContext`를 설정합니다.

```
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

glueContext = GlueContext(SparkContext.getOrCreate())
```

## 3단계: 다른 스키마 파싱과 비교
<a name="aws-glue-programming-python-samples-medicaid-schemas"></a>

그 다음, Apache Spark `DataFrame`이 인지한 스키마가 AWS Glue 크롤러가 기록한 것과 동일한지 알아봅니다. 이 코드를 실행합니다.

```
medicare = spark.read.format(
   "com.databricks.spark.csv").option(
   "header", "true").option(
   "inferSchema", "true").load(
   's3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv')
medicare.printSchema()
```

`printSchema` 호출에 따른 출력값입니다.

```
root
 |-- DRG Definition: string (nullable = true)
 |-- Provider Id: string (nullable = true)
 |-- Provider Name: string (nullable = true)
 |-- Provider Street Address: string (nullable = true)
 |-- Provider City: string (nullable = true)
 |-- Provider State: string (nullable = true)
 |-- Provider Zip Code: integer (nullable = true)
 |-- Hospital Referral Region Description: string (nullable = true)
 |--  Total Discharges : integer (nullable = true)
 |--  Average Covered Charges : string (nullable = true)
 |--  Average Total Payments : string (nullable = true)
 |-- Average Medicare Payments: string (nullable = true)
```

다음, AWS Glue `DynamicFrame`가 생성한 스키마를 알아봅니다.

```
medicare_dynamicframe = glueContext.create_dynamic_frame.from_catalog(
       database = "payments",
       table_name = "medicare")
medicare_dynamicframe.printSchema()
```

`printSchema`의 출력값은 다음과 같습니다.

```
root
 |-- drg definition: string
 |-- provider id: choice
 |    |-- long
 |    |-- string
 |-- provider name: string
 |-- provider street address: string
 |-- provider city: string
 |-- provider state: string
 |-- provider zip code: long
 |-- hospital referral region description: string
 |-- total discharges: long
 |-- average covered charges: string
 |-- average total payments: string
 |-- average medicare payments: string
```

`DynamicFrame`는 `provider id`가 `long` 혹은 `string` 유형일 수 있는 스키마를 생성합니다. `DataFrame` 스키마는 `Provider Id`를 `string` 유형으로 목록에 기록하고 Data Catalog는 `provider id`를 `bigint` 유형으로 목록에 기록합니다.

무엇이 정답입니까? 파일의 끝에는 이 열에 있는 `string` 값과 함께 (160,000 기록 중) 두 기록이 있습니다. 이 두 기록은 문제를 보여주기 위한 잘못된 기록입니다.

이런 문제를 설명하기 위해서 AWS Glue `DynamicFrame`는 *선택* 유형의 개념을 도입합니다. 이 경우, `DynamicFrame`는 이 열에 나타나는 `long` 및 `string` 모두를 보여줍니다. AWS Glue 크롤러는 `string` 값을 누락했는데 그 이유는 데이터 2MB 접두사만 고려했기 때문입니다. Apache Spark `DataFrame`는 전체 데이터 세트를 고려했지만 `string`이라는 열에 가장 일반적인 유형을 지정하도록 강요되었습니다. 사실, Spark는 익숙하지 않은 복잡한 유형이나 변수가 있으면 가장 일반적인 케이스를 적용합니다.

`provider id` 열을 쿼리하려면 먼저 선택 유형을 선택합니다. `DynamicFrame`의 `resolveChoice` 변환 방법을 사용하여 `cast:long` 옵션으로 `string` 값을 `long` 값으로 변환할 수 있습니다.

```
medicare_res = medicare_dynamicframe.resolveChoice(specs = [('provider id','cast:long')])
medicare_res.printSchema()
```

이제 `printSchema` 출력값은 다음과 같습니다.

```
root
 |-- drg definition: string
 |-- provider id: long
 |-- provider name: string
 |-- provider street address: string
 |-- provider city: string
 |-- provider state: string
 |-- provider zip code: long
 |-- hospital referral region description: string
 |-- total discharges: long
 |-- average covered charges: string
 |-- average total payments: string
 |-- average medicare payments: string
```

값이 보낼 수 없는 `string`이면 AWS Glue는 `null`을 삽입합니다.

다른 방법으로 선택 유형을 `struct`으로 변환하는 것인데 두 유형의 값은 유지됩니다.

다음, 이례적인 열을 알아봅니다.

```
medicare_res.toDF().where("'provider id' is NULL").show()
```

다음을 알아봅니다.

```
+--------------------+-----------+---------------+-----------------------+-------------+--------------+-----------------+------------------------------------+----------------+-----------------------+----------------------+-------------------------+
|      drg definition|provider id|  provider name|provider street address|provider city|provider state|provider zip code|hospital referral region description|total discharges|average covered charges|average total payments|average medicare payments|
+--------------------+-----------+---------------+-----------------------+-------------+--------------+-----------------+------------------------------------+----------------+-----------------------+----------------------+-------------------------+
|948 - SIGNS & SYM...|       null|            INC|       1050 DIVISION ST|      MAUSTON|            WI|            53948|                        WI - Madison|              12|              $11961.41|              $4619.00|                 $3775.33|
|948 - SIGNS & SYM...|       null| INC- ST JOSEPH|     5000 W CHAMBERS ST|    MILWAUKEE|            WI|            53210|                      WI - Milwaukee|              14|              $10514.28|              $5562.50|                 $4522.78|
+--------------------+-----------+---------------+-----------------------+-------------+--------------+-----------------+------------------------------------+----------------+-----------------------+----------------------+-------------------------+
```

이제, 오류 기록을 다음과 같이 제거합니다.

```
medicare_dataframe = medicare_res.toDF()
medicare_dataframe = medicare_dataframe.where("'provider id' is NOT NULL")
```

## 4단계: 데이터 매핑 및 Apache Spark Lambda 함수 사용
<a name="aws-glue-programming-python-samples-medicaid-lambda-mapping"></a>

AWS Glue는 사용자 정의 함수인 Lambda 함수를 직접 지원하지 않습니다. 하지만 항상 `DynamicFrame`와 Apache Spark `DataFrame` 간에 변환하여 `DynamicFrames`의 특별한 기능 외에도 Spark 기능도 활용할 수 있습니다.

그런 다음 결제 정보를 숫자로 변환하여 Amazon Redshift 또는 Amazon Athena와 같은 분석 엔진이 빠르게 수 처리를 할 수 있게 만듭니다.

```
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

chop_f = udf(lambda x: x[1:], StringType())
medicare_dataframe = medicare_dataframe.withColumn(
        "ACC", chop_f(
            medicare_dataframe["average covered charges"])).withColumn(
                "ATP", chop_f(
                    medicare_dataframe["average total payments"])).withColumn(
                        "AMP", chop_f(
                            medicare_dataframe["average medicare payments"]))
medicare_dataframe.select(['ACC', 'ATP', 'AMP']).show()
```

`show` 호출의 출력값은 다음과 같습니다.

```
+--------+-------+-------+
|     ACC|    ATP|    AMP|
+--------+-------+-------+
|32963.07|5777.24|4763.73|
|15131.85|5787.57|4976.71|
|37560.37|5434.95|4453.79|
|13998.28|5417.56|4129.16|
|31633.27|5658.33|4851.44|
|16920.79|6653.80|5374.14|
|11977.13|5834.74|4761.41|
|35841.09|8031.12|5858.50|
|28523.39|6113.38|5228.40|
|75233.38|5541.05|4386.94|
|67327.92|5461.57|4493.57|
|39607.28|5356.28|4408.20|
|22862.23|5374.65|4186.02|
|31110.85|5366.23|4376.23|
|25411.33|5282.93|4383.73|
| 9234.51|5676.55|4509.11|
|15895.85|5930.11|3972.85|
|19721.16|6192.54|5179.38|
|10710.88|4968.00|3898.88|
|51343.75|5996.00|4962.45|
+--------+-------+-------+
only showing top 20 rows
```

이것은 아직도 모두 데이터에서 문자열입니다. 강력한 `apply_mapping` 변환 방법을 사용하여 데이터를 드롭, 이름 바꾸기, 중첩할 수 있어 다른 데이터 프로그램 언어 및 시스템이 쉽게 접근할 수 있도록 합니다.

```
from awsglue.dynamicframe import DynamicFrame
medicare_tmp_dyf = DynamicFrame.fromDF(medicare_dataframe, glueContext, "nested")
medicare_nest_dyf = medicare_tmp_dyf.apply_mapping([('drg definition', 'string', 'drg', 'string'),
                 ('provider id', 'long', 'provider.id', 'long'),
                 ('provider name', 'string', 'provider.name', 'string'),
                 ('provider city', 'string', 'provider.city', 'string'),
                 ('provider state', 'string', 'provider.state', 'string'),
                 ('provider zip code', 'long', 'provider.zip', 'long'),
                 ('hospital referral region description', 'string','rr', 'string'),
                 ('ACC', 'string', 'charges.covered', 'double'),
                 ('ATP', 'string', 'charges.total_pay', 'double'),
                 ('AMP', 'string', 'charges.medicare_pay', 'double')])
medicare_nest_dyf.printSchema()
```

`printSchema` 출력값은 다음과 같습니다.

```
root
 |-- drg: string
 |-- provider: struct
 |    |-- id: long
 |    |-- name: string
 |    |-- city: string
 |    |-- state: string
 |    |-- zip: long
 |-- rr: string
 |-- charges: struct
 |    |-- covered: double
 |    |-- total_pay: double
 |    |-- medicare_pay: double
```

데이터를 Spark `DataFrame`으로 되돌리면 현재는 어떻게 생겼는지 볼 수 있습니다.

```
medicare_nest_dyf.toDF().show()
```

출력값은 다음과 같습니다.

```
+--------------------+--------------------+---------------+--------------------+
|                 drg|            provider|             rr|             charges|
+--------------------+--------------------+---------------+--------------------+
|039 - EXTRACRANIA...|[10001,SOUTHEAST ...|    AL - Dothan|[32963.07,5777.24...|
|039 - EXTRACRANIA...|[10005,MARSHALL M...|AL - Birmingham|[15131.85,5787.57...|
|039 - EXTRACRANIA...|[10006,ELIZA COFF...|AL - Birmingham|[37560.37,5434.95...|
|039 - EXTRACRANIA...|[10011,ST VINCENT...|AL - Birmingham|[13998.28,5417.56...|
|039 - EXTRACRANIA...|[10016,SHELBY BAP...|AL - Birmingham|[31633.27,5658.33...|
|039 - EXTRACRANIA...|[10023,BAPTIST ME...|AL - Montgomery|[16920.79,6653.8,...|
|039 - EXTRACRANIA...|[10029,EAST ALABA...|AL - Birmingham|[11977.13,5834.74...|
|039 - EXTRACRANIA...|[10033,UNIVERSITY...|AL - Birmingham|[35841.09,8031.12...|
|039 - EXTRACRANIA...|[10039,HUNTSVILLE...|AL - Huntsville|[28523.39,6113.38...|
|039 - EXTRACRANIA...|[10040,GADSDEN RE...|AL - Birmingham|[75233.38,5541.05...|
|039 - EXTRACRANIA...|[10046,RIVERVIEW ...|AL - Birmingham|[67327.92,5461.57...|
|039 - EXTRACRANIA...|[10055,FLOWERS HO...|    AL - Dothan|[39607.28,5356.28...|
|039 - EXTRACRANIA...|[10056,ST VINCENT...|AL - Birmingham|[22862.23,5374.65...|
|039 - EXTRACRANIA...|[10078,NORTHEAST ...|AL - Birmingham|[31110.85,5366.23...|
|039 - EXTRACRANIA...|[10083,SOUTH BALD...|    AL - Mobile|[25411.33,5282.93...|
|039 - EXTRACRANIA...|[10085,DECATUR GE...|AL - Huntsville|[9234.51,5676.55,...|
|039 - EXTRACRANIA...|[10090,PROVIDENCE...|    AL - Mobile|[15895.85,5930.11...|
|039 - EXTRACRANIA...|[10092,D C H REGI...|AL - Tuscaloosa|[19721.16,6192.54...|
|039 - EXTRACRANIA...|[10100,THOMAS HOS...|    AL - Mobile|[10710.88,4968.0,...|
|039 - EXTRACRANIA...|[10103,BAPTIST ME...|AL - Birmingham|[51343.75,5996.0,...|
+--------------------+--------------------+---------------+--------------------+
only showing top 20 rows
```

## 5단계: 데이터를 Apache Parquet에 쓰기
<a name="aws-glue-programming-python-samples-medicaid-writing"></a>

AWS Glue는 Apache Parquet과 같은 형식으로 데이터를 쉽게 작성할 수 있어 관계형 데이터베이스가 효과적으로 소비될 수 있습니다.

```
glueContext.write_dynamic_frame.from_options(
       frame = medicare_nest_dyf,
       connection_type = "s3",
       connection_options = {"path": "s3://glue-sample-target/output-dir/medicare_parquet"},
       format = "parquet")
```