

# AWS Glue ETL ジョブを使用してデータカタログのスキーマを更新し、新規パーティションを追加する
<a name="update-from-job"></a>

抽出、変換、ロード (ETL) ジョブによって、ターゲットデータストアに新しいテーブルパーティションが作成される場合があります。データセットスキーマは、時間の経過とともに AWS Glue Data Catalog スキーマから進化し、拡散する可能性があります。AWS GlueETL ジョブには、ETL スクリプト内で使用できるいくつかの機能が用意され、Data Catalog でスキーマおよびパーティションを更新できるようになりました。これらの機能を使用すると、クローラを再実行することなく、Data Catalog での ETL 作業の結果を確認できます。

## 新しいパーティション
<a name="update-from-job-partitions"></a>

AWS Glue Data Catalog で新しいパーティションを表示するには、次のいずれかを実行します。
+ ジョブが終了したら、クローラを再実行し、クローラの完了時にコンソールで新しいパーティションを表示します。
+ ジョブが終了すると、クローラを再実行することなく、コンソールで新しいパーティションがすぐに表示されます。この機能は、次の例に示すように、ETL スクリプトに数行のコードを追加して有効にすることができます。このコードは `enableUpdateCatalog` 引数を使用して、新しいパーティションの作成時のジョブ実行中に Data Catalog を更新する必要があることを示します。

**方式 1**  
オプション引数に `enableUpdateCatalog` と `partitionKeys` を渡します。  

```
additionalOptions = {"enableUpdateCatalog": True}
additionalOptions["partitionKeys"] = ["region", "year", "month", "day"]


sink = glueContext.write_dynamic_frame_from_catalog(frame=last_transform, database={{<target_db_name>}},
                                                    table_name={{<target_table_name>}}, transformation_ctx="write_sink",
                                                    additional_options=additionalOptions)
```

```
val options = JsonOptions(Map(
    "path" -> {{<S3_output_path>}}, 
    "partitionKeys" -> Seq("region", "year", "month", "day"), 
    "enableUpdateCatalog" -> true))
val sink = glueContext.getCatalogSink(
    database = {{<target_db_name>}}, 
    tableName = {{<target_table_name>}}, 
    additionalOptions = options)sink.writeDynamicFrame(df)
```

**方式 2**  
`enableUpdateCatalog` に `partitionKeys` と `getSink()` を渡して、`DataSink` オブジェクトで `setCatalogInfo()` を呼び出します。  

```
sink = glueContext.getSink(
    connection_type="s3", 
    path="{{<S3_output_path>}}",
    enableUpdateCatalog=True,
    partitionKeys=["region", "year", "month", "day"])
sink.setFormat("json")
sink.setCatalogInfo(catalogDatabase={{<target_db_name>}}, catalogTableName={{<target_table_name>}})
sink.writeFrame(last_transform)
```

```
val options = JsonOptions(
   Map("path" -> {{<S3_output_path>}}, 
       "partitionKeys" -> Seq("region", "year", "month", "day"), 
       "enableUpdateCatalog" -> true))
val sink = glueContext.getSink("s3", options).withFormat("json")
sink.setCatalogInfo({{<target_db_name>}}, {{<target_table_name>}})
sink.writeDynamicFrame(df)
```

クローラを再実行することなく、AWS Glue ETLジョブ自体を使用して、Data Catalog での新しいカタログテーブルの作成、変更されたスキーマによる既存のテーブルの更新、および新しいテーブルパーティションの追加が可能になりました。

## テーブルスキーマの更新
<a name="update-from-job-updating-table-schema"></a>

Data Catalog テーブルのスキーマを上書きする場合は、次のいずれかを実行します。
+ ジョブが終了したら、クローラを再実行し、テーブル定義も更新するようにクローラが設定されていることを確認します。クローラが終了したら、新しいパーティションをスキーマの更新とともにコンソールに表示します。詳細については、「[API を使用したクローラの設定](https://docs.aws.amazon.com/glue/latest/dg/crawler-configuration.html#crawler-configure-changes-api)」を参照してください。
+ ジョブが終了すると、クローラを再実行することなく、コンソールで変更済みスキーマがすぐに表示されます。この機能は、次の例に示すように、ETL スクリプトに数行のコードを追加して有効にすることができます。このコードでは `enableUpdateCatalog` を true に設定し、`updateBehavior` を `UPDATE_IN_DATABASE` に設定しています。これは、ジョブ実行中にスキーマを上書きし、Data Catalog に新しいパーティションを追加することを示します。

------
#### [ Python ]

```
additionalOptions = {
    "enableUpdateCatalog": True, 
    "updateBehavior": "UPDATE_IN_DATABASE"}
additionalOptions["partitionKeys"] = ["partition_key0", "partition_key1"]

sink = glueContext.write_dynamic_frame_from_catalog(frame=last_transform, database={{<dst_db_name>}},
    table_name={{<dst_tbl_name>}}, transformation_ctx="write_sink",
    additional_options=additionalOptions)
job.commit()
```

------
#### [ Scala ]

```
val options = JsonOptions(Map(
    "path" -> outputPath, 
    "partitionKeys" -> Seq("partition_0", "partition_1"), 
    "enableUpdateCatalog" -> true))
val sink = glueContext.getCatalogSink(database = nameSpace, tableName = tableName, additionalOptions = options)
sink.writeDynamicFrame(df)
```

------

テーブルスキーマが上書きされないようにして、新しいパーティションを追加する場合は、`updateBehavior` 値を `LOG` に設定することもできます。`updateBehavior` のデフォルト値は `UPDATE_IN_DATABASE` です。そのため、明示的に定義しない場合、テーブルスキーマは上書きされます。

`enableUpdateCatalog` が true に設定されていない場合、`updateBehavior` で選択したオプションに関係なく、ETL ジョブは Data Catalog 内のテーブルを更新しません。

## 新しいテーブルの作成
<a name="update-from-job-creating-new-tables"></a>

同じオプションを使用して、Data Catalog で新しいテーブルを作成することもできます。`setCatalogInfo` を使用して、データベースと新しいテーブル名を指定できます。

------
#### [ Python ]

```
sink = glueContext.getSink(connection_type="s3", path="s3://path/to/data",
    enableUpdateCatalog=True, updateBehavior="UPDATE_IN_DATABASE",
    partitionKeys=["partition_key0", "partition_key1"])
sink.setFormat("<format>")
sink.setCatalogInfo(catalogDatabase={{<dst_db_name>}}, catalogTableName={{<dst_tbl_name>}})
sink.writeFrame(last_transform)
```

------
#### [ Scala ]

```
val options = JsonOptions(Map(
    "path" -> outputPath, 
    "partitionKeys" -> Seq("<partition_1>", "<partition_2>"), 
    "enableUpdateCatalog" -> true, 
    "updateBehavior" -> "UPDATE_IN_DATABASE"))
val sink = glueContext.getSink(connectionType = "s3", connectionOptions = options).withFormat("<format>")
sink.setCatalogInfo(catalogDatabase = “{{<dst_db_name>}}”, catalogTableName = “{{<dst_tbl_name>}}”)
sink.writeDynamicFrame(df)
```

------

## 制限事項
<a name="update-from-job-restrictions"></a>

次の制限事項に注意してください。
+ Amazon Simple Storage Service (Amazon S3) のターゲットのみがサポートされています。
+ `enableUpdateCatalog` 機能は、管理対象テーブルでは、サポートされていません。
+ `json`、`csv`、`avro`、および `parquet` の形式のみがサポートされます。
+ `parquet` 分類でテーブルを作成または更新するには、AWS Glue に最適化された DynamicFrames 用 parquet ライターを使用する必要があります。これは、次のいずれかの方法で実現できます。
  + `parquet` 分類を使用してカタログ内の既存のテーブルを更新する場合は、更新前にテーブルでは `"useGlueParquetWriter"` テーブルプロパティを `true` に設定する必要があります。このプロパティは、AWS Glue API/SDK、コンソール、または Athena DDL ステートメントから設定できます。  
![AWS Glue コンソールのカタログテーブルプロパティの編集フィールド。](http://docs.aws.amazon.com/ja_jp/glue/latest/dg/images/edit-table-property.png)

    カタログテーブルプロパティを設定したら、次のコードスニペットを使用して新しいデータでカタログテーブルを更新します。

    ```
    glueContext.write_dynamic_frame.from_catalog(
        frame={{frameToWrite}},
        database="{{dbName}}",
        table_name="{{tableName}}",
        additional_options={
            "enableUpdateCatalog": True,
            "updateBehavior": "UPDATE_IN_DATABASE"
        }
    )
    ```
  + テーブルがカタログ内にまだ存在しない場合は、`connection_type="s3"` を用いた `getSink()` メソッドをスクリプトで使用して、データを Amazon S3 に書き込むとともに、テーブルとそのパーティションをカタログに追加します。ワークフローに適切な `partitionKeys` と `compression` を指定します。

    ```
    s3sink = glueContext.getSink(
        path="s3://{{bucket/folder/}}",
        connection_type="s3",
        updateBehavior="UPDATE_IN_DATABASE",
        partitionKeys=[],
        compression="snappy",
        enableUpdateCatalog=True
    )
        
    s3sink.setCatalogInfo(
        catalogDatabase="{{dbName}}", catalogTableName="{{tableName}}"
    )
        
    s3sink.setFormat("parquet", useGlueParquetWriter=True)
    s3sink.writeFrame({{frameToWrite}})
    ```
  + `glueparquet` フォーマットの値は、AWS Glue parquet ライターを有効にする従来のメソッドです。
+ `updateBehavior` を `LOG` に設定した場合、`DynamicFrame` スキーマが Data Catalog テーブルのスキーマに定義されている列のサブセットと同等であるか、またはそのサブセットを含んでいる場合にのみ、新しいパーティションが追加されます。
+ スキーマの更新は、パーティション化されていないテーブル (「partitionKeys」オプションを使用していない) ではサポートされていません。
+ partitionKeys は、ETL スクリプトで渡されるパラメータと Data Catalog テーブルスキーマの partitionKeys で同等で、同じ順序でなければなりません。
+ この機能は、現在、更新スキーマがネストされているテーブルの更新/作成をサポートしていません (例えば、構造体の内部の配列)。

詳細については、[Spark スクリプトのプログラミング](aws-glue-programming.md) を参照してください。