AWS Glue を使用した Amazon S3 テーブルでの ETL ジョブの実行
AWS Glueは、分析を行うユーザーが複数のソースからのデータを簡単に検出、準備、移動、統合できるようにするサーバーレスのデータ統合サービスです。AWS Glue ジョブを使用して抽出、変換、ロード (ETL) パイプラインを実行し、データをデータレイクにロードできます。AWS Glue の詳細については、AWS Glue デベロッパーガイド の「What is AWS Glue?」を参照してください。
AWS Glue ジョブには、ソースデータに接続して処理し、データターゲットに書き出すスクリプトがカプセル化されています。通常、ジョブは、抽出、変換、ロード (ETL) スクリプトを実行します。ジョブは、Apache Spark ランタイム環境向けに設計されたスクリプトを実行できます。ジョブ実行をモニタリングすると、完了ステータス、継続時間、開始時間などのランタイムメトリクスを知ることができます。
AWS Glue のジョブを使用して S3 テーブルのデータを処理するには、AWS 分析サービスとの統合を通じてテーブルに接続するか、Amazon S3 Tables の Iceberg REST エンドポイントまたは Apache Iceberg の Amazon S3 Tables カタログを使用して直接接続します。このガイドでは、S3 Tables で AWS Glue の使用を開始するための基本的な手順を示します。
前提条件
AWS Glue ジョブからテーブルに対してクエリを実行する前に、AWS Glue がジョブの実行に使用できる IAM ロールを設定し、ジョブの実行時に AWS Glue がアクセスできる Amazon S3 バケットに Amazon S3 Tables Catalog for Apache Iceberg JAR をアップロードする必要があります。
-
テーブルバケットを AWS 分析サービスと統合します。
-
AWS Glue 用の IAM ロールを作成します。
-
(オプション) Apache Iceberg で Amazon S3 Tables カタログを使用する場合は、クライアントカタログ JAR をダウンロードして S3 バケットにアップロードする必要があります。
カタログ JAR のダウンロード
-
Maven Central で最新バージョンを検索します。ブラウザまたは次のコマンドを使用して、Maven central から JAR をダウンロードできます。必ずバージョン番号
を最新バージョンに置き換えてください。
wget https://repo1.maven.org/maven2/software/amazon/s3tables/s3-tables-catalog-for-iceberg-runtime/0.1.5
/s3-tables-catalog-for-iceberg-runtime-0.1.5.jar
-
ダウンロードした JAR を、AWS Glue IAM ロールがアクセスできる S3 バケットにアップロードします。JAR をアップロードするには、次の AWS CLI コマンドを実行します。バージョン番号
を最新バージョンに置き換え、バケット名
とパス
を独自のものに置き換えます。
aws s3 cp s3-tables-catalog-for-iceberg-runtime-0.1.5
.jar s3://amzn-s3-demo-bucket
/jars/
テーブルバケットに接続するスクリプトを作成する
AWS Glue ETL ジョブの実行時にテーブルデータにアクセスするには、S3 テーブルバケットに接続する Apache Iceberg の Spark セッションを設定します。既存のスクリプトを変更してテーブルバケットに接続するか、新しいスクリプトを作成できます。AWS Glue スクリプトの作成の詳細については、「AWS Glue デベロッパーガイド」の「チュートリアル: AWS Glue for Spark スクリプトの記述」を参照してください。
次の S3 Tables アクセス方法のいずれかを使用して、テーブルバケットに接続するようにセッションを設定できます。
-
S3 Tables と AWS 分析サービスの統合
-
Amazon S3 Tables の Iceberg REST エンドポイント
-
Apache Iceberg の Amazon S3 Tables カタログ
セットアップ手順と設定例を表示するには、次のアクセス方法から選択します。
- AWS analytics services integration
-
AWS 分析サービス統合を使用して AWS Glue の Spark でテーブルをクエリするための前提条件として、テーブルバケットを AWS 分析サービスと統合する必要があります。
テーブルバケットへの接続は、ジョブの Spark セッションを使用するか、インタラクティブセッションの AWS Glue Studio マジックを使用して設定できます。以下の例を使用するには、プレースホルダー値
を独自のテーブルバケットの情報に置き換えます。
- PySpark スクリプトの使用
-
PySpark スクリプトで次のコードスニペットを使用して、統合を通じてテーブルバケットに接続するように AWS Glue ジョブを設定します。
spark = SparkSession.builder.appName("SparkIcebergSQL") \
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
.config("spark.sql.defaultCatalog","s3tables") \
.config("spark.sql.catalog.s3tables", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.s3tables.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \
.config("spark.sql.catalog.s3tables.glue.id", "111122223333
:s3tablescatalog/amzn-s3-demo-table-bucket
") \
.config("spark.sql.catalog.s3tables.warehouse", "s3://amzn-s3-demo-table-bucket
/warehouse/") \
.getOrCreate()
- インタラクティブな AWS Glue セッションの使用
-
AWS Glue 5.0 でインタラクティブなノートブックセッションを使用する場合は、コードの実行前にセルで %%configure
マジックを使用して同じ設定を指定します。
%%configure
{"conf": "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf spark.sql.defaultCatalog=s3tables --conf spark.sql.catalog.s3tables=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.s3tables.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog --conf spark.sql.catalog.s3tables.glue.id=111122223333
:s3tablescatalog/amzn-s3-demo-table-bucket
--conf spark.sql.catalog.s3tables.warehouse=s3://amzn-s3-demo-table-bucket
/warehouse/"}
- Amazon S3 Tables Iceberg REST endpoint
-
テーブルバケットへの接続を設定するには、ジョブの Spark セッションを使用するか、インタラクティブセッションの AWS Glue Studio マジックを使用できます。以下の例を使用するには、プレースホルダー値
を独自のテーブルバケットの情報に置き換えます。
- PySpark スクリプトの使用
次のコードスニペットを PySpark スクリプトで使用して、エンドポイントを通じてテーブルバケットに接続するように AWS Glue ジョブを設定します。
spark = SparkSession.builder.appName("glue-s3-tables-rest") \
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
.config("spark.sql.defaultCatalog", "s3_rest_catalog") \
.config("spark.sql.catalog.s3_rest_catalog", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.s3_rest_catalog.type", "rest") \
.config("spark.sql.catalog.s3_rest_catalog.uri", "https://s3tables.Region
.amazonaws.com/iceberg") \
.config("spark.sql.catalog.s3_rest_catalog.warehouse", "arn:aws:s3tables:Region
:111122223333
:s3tablescatalog/amzn-s3-demo-table-bucket
") \
.config("spark.sql.catalog.s3_rest_catalog.rest.sigv4-enabled", "true") \
.config("spark.sql.catalog.s3_rest_catalog.rest.signing-name", "s3tables") \
.config("spark.sql.catalog.s3_rest_catalog.rest.signing-region", "Region
") \
.config('spark.sql.catalog.s3_rest_catalog.io-impl','org.apache.iceberg.aws.s3.S3FileIO') \
.config('spark.sql.catalog.s3_rest_catalog.rest-metrics-reporting-enabled','false') \
.getOrCreate()
- インタラクティブな AWS Glue セッションの使用
AWS Glue 5.0 でインタラクティブなノートブックセッションを使用する場合は、コードの実行前にセルで %%configure
マジックを使用して同じ設定を指定します。プレースホルダー値を独自のテーブルバケットの情報に置き換えます。
%%configure
{"conf": "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf spark.sql.defaultCatalog=s3_rest_catalog --conf spark.sql.catalog.s3_rest_catalog=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.s3_rest_catalog.type=rest --conf spark.sql.catalog.s3_rest_catalog.uri=https://s3tables.Region
.amazonaws.com/iceberg --conf spark.sql.catalog.s3_rest_catalog.warehouse=arn:aws:s3tables:Region
:111122223333
:s3tablescatalog/amzn-s3-demo-table-bucket
--conf spark.sql.catalog.s3_rest_catalog.rest.sigv4-enabled=true --conf spark.sql.catalog.s3_rest_catalog.rest.signing-name=s3tables --conf spark.sql.catalog.s3_rest_catalog.rest.signing-region=Region
--conf spark.sql.catalog.s3_rest_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO --conf spark.sql.catalog.s3_rest_catalog.rest-metrics-reporting-enabled=false"}
- Amazon S3 Tables Catalog for Apache Iceberg
-
Apache Iceberg で Amazon S3 Tables カタログを使用してテーブルに接続するための前提条件として、まず最新のカタログ jar をダウンロードして S3 バケットにアップロードする必要があります。次に、ジョブの作成時に、クライアントカタログ JAR へのパスを特別なパラメータとして追加します。AWS Glue のジョブパラメータの詳細については、「AWS Glue Developer Guide」の「Special parameters used in AWS Glue jobs」を参照してください。
テーブルバケットへの接続を設定するには、ジョブの Spark セッションを使用するか、インタラクティブセッションの AWS Glue Studio マジックを使用できます。以下の例を使用するには、プレースホルダー値
を独自のテーブルバケットの情報に置き換えます。
- PySpark スクリプトの使用
-
次のコードスニペットを PySpark スクリプトで使用して、JAR を通じてテーブルバケットに接続するように AWS Glue ジョブを設定します。プレースホルダー値を独自のテーブルバケットの情報に置き換えます。
spark = SparkSession.builder.appName("glue-s3-tables") \
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
.config("spark.sql.defaultCatalog", "s3tablesbucket") \
.config("spark.sql.catalog.s3tablesbucket", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.s3tablesbucket.catalog-impl", "software.amazon.s3tables.iceberg.S3TablesCatalog") \
.config("spark.sql.catalog.s3tablesbucket.warehouse", "arn:aws:s3tables:Region
:111122223333:bucket/amzn-s3-demo-table-bucket
") \
.getOrCreate()
- インタラクティブな AWS Glue セッションの使用
-
AWS Glue 5.0 でインタラクティブなノートブックセッションを使用する場合は、コードの実行前にセルで %%configure
マジックを使用して同じ設定を指定します。プレースホルダー値を独自のテーブルバケットの情報に置き換えます。
%%configure
{"conf": "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf spark.sql.defaultCatalog=s3tablesbucket --conf spark.sql.catalog.s3tablesbucket=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.s3tablesbucket.catalog-impl=software.amazon.s3tables.iceberg.S3TablesCatalog --conf spark.sql.catalog.s3tablesbucket.warehouse=arn:aws:s3tables:Region
:111122223333:bucket/amzn-s3-demo-table-bucket
", "extra-jars": "s3://amzn-s3-demo-bucket
/jars/s3-tables-catalog-for-iceberg-runtime-0.1.5
.jar"}
サンプルスクリプト
以下のサンプル PySpark スクリプトは、AWS Glue ジョブで S3 テーブルへのクエリをテストするために使用できます。各スクリプトはテーブルバケットに接続し、新しい名前空間の作成、サンプルテーブルの作成、テーブルへのデータの挿入、テーブルデータの取得を行うためのクエリを実行します。スクリプトを使用するには、プレースホルダー値
を独自のテーブルバケットの情報に置き換えます。
S3 Tables のアクセス方法に応じて、以下のスクリプトから選択します。
- S3 Tables integration with AWS analytics services
-
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("SparkIcebergSQL") \
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
.config("spark.sql.defaultCatalog","s3tables")
.config("spark.sql.catalog.s3tables", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.s3tables.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \
.config("spark.sql.catalog.s3tables.glue.id", "111122223333
:s3tablescatalog/amzn-s3-demo-table-bucket
") \
.config("spark.sql.catalog.s3tables.warehouse", "s3://amzn-s3-demo-table-bucket
/bucket/amzn-s3-demo-table-bucket
") \
.getOrCreate()
namespace = "new_namespace
"
table = "new_table
"
spark.sql("SHOW DATABASES").show()
spark.sql(f"DESCRIBE NAMESPACE {namespace}").show()
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {namespace}.{table} (
id INT,
name STRING,
value INT
)
""")
spark.sql(f"""
INSERT INTO {namespace}.{table}
VALUES
(1, 'ABC', 100),
(2, 'XYZ', 200)
""")
spark.sql(f"SELECT * FROM {namespace}.{table} LIMIT 10").show()
- Amazon S3 Tables Iceberg REST endpoint
-
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("glue-s3-tables-rest") \
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
.config("spark.sql.defaultCatalog", "s3_rest_catalog") \
.config("spark.sql.catalog.s3_rest_catalog", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.s3_rest_catalog.type", "rest") \
.config("spark.sql.catalog.s3_rest_catalog.uri", "https://s3tables.Region
.amazonaws.com/iceberg") \
.config("spark.sql.catalog.s3_rest_catalog.warehouse", "arn:aws:s3tables:Region
:111122223333
:bucket/amzn-s3-demo-table-bucket
") \
.config("spark.sql.catalog.s3_rest_catalog.rest.sigv4-enabled", "true") \
.config("spark.sql.catalog.s3_rest_catalog.rest.signing-name", "s3tables") \
.config("spark.sql.catalog.s3_rest_catalog.rest.signing-region", "Region
") \
.config('spark.sql.catalog.s3_rest_catalog.io-impl','org.apache.iceberg.aws.s3.S3FileIO') \
.config('spark.sql.catalog.s3_rest_catalog.rest-metrics-reporting-enabled','false') \
.getOrCreate()
namespace = "s3_tables_rest_namespace
"
table = "new_table_s3_rest
"
spark.sql("SHOW DATABASES").show()
spark.sql(f"DESCRIBE NAMESPACE {namespace}").show()
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {namespace}.{table} (
id INT,
name STRING,
value INT
)
""")
spark.sql(f"""
INSERT INTO {namespace}.{table}
VALUES
(1, 'ABC', 100),
(2, 'XYZ', 200)
""")
spark.sql(f"SELECT * FROM {namespace}.{table} LIMIT 10").show()
- Amazon S3 Tables Catalog for Apache Iceberg
-
from pyspark.sql import SparkSession
#Spark session configurations
spark = SparkSession.builder.appName("glue-s3-tables") \
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
.config("spark.sql.defaultCatalog", "s3tablesbucket") \
.config("spark.sql.catalog.s3tablesbucket", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.s3tablesbucket.catalog-impl", "software.amazon.s3tables.iceberg.S3TablesCatalog") \
.config("spark.sql.catalog.s3tablesbucket.warehouse", "arn:aws:s3tables:Region
:111122223333
:bucket/amzn-s3-demo-table-bucket
") \
.getOrCreate()
#Script
namespace = "new_namespace
"
table = "new_table
"
spark.sql(f"CREATE NAMESPACE IF NOT EXISTS s3tablesbucket.{namespace}")
spark.sql(f"DESCRIBE NAMESPACE {namespace}").show()
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {namespace}.{table} (
id INT,
name STRING,
value INT
)
""")
spark.sql(f"""
INSERT INTO {namespace}.{table}
VALUES
(1, 'ABC', 100),
(2, 'XYZ', 200)
""")
spark.sql(f"SELECT * FROM {namespace}.{table} LIMIT 10").show()
テーブルをクエリする AWS Glue ジョブを作成する
次の手順は、S3 テーブルバケットに接続する AWS Glue ジョブの設定方法を示しています。これを行うには、AWS CLI を使用するか、コンソールで AWS Glue Studio スクリプトエディタを使用します。詳細については、「AWS Glue ユーザーガイド」の「AWS Glue でジョブを作成する」を参照してください。
次の手順は、AWS Glue Studio スクリプトエディタを使用して S3 テーブルをクエリする ETL ジョブを作成する方法を示しています。
https://console.aws.amazon.com/glue/ で AWS Glue コンソール を開きます。
-
ナビゲーションペインで、[ETL ジョブ] を選択します。
-
[スクリプトエディタ] を選択し、[スクリプトのアップロード] を選択し、S3 テーブルをクエリするために作成した PySpark スクリプトをアップロードします。
-
[ジョブの詳細] タブを選択し、[基本プロパティ] に次のように入力します。
-
(オプション) Amazon S3 Tables カタログを Apache Iceberg のアクセス方法として使用する場合は、[詳細プロパティ] を展開し、[依存 JARS パス] に、S3 バケットにアップロードしたクライアントカタログ jar の S3 URI を前提条件として入力します。例: s3://amzn-s3-demo-bucket1
/jars
/s3-tables-catalog-for-iceberg-runtime-0.1.5
.jar
-
[保存] を選択してジョブを作成します。
-
[実行] を選択してジョブを開始し、[実行] タブでジョブのステータスを確認します。
次の手順は、AWS CLI を使用して S3 テーブルをクエリする ETL ジョブを作成する方法を示しています。コマンドを使用するには、プレースホルダー値
を独自の値に置き換えます。
-
AWS Glue ジョブを作成します。
aws glue create-job \
--name etl-tables-job
\
--role arn:aws:iam::111122223333
:role/AWSGlueServiceRole
\
--command '{
"Name": "glueetl",
"ScriptLocation": "s3://amzn-s3-demo-bucket1
/scripts/glue-etl-query.py
",
"PythonVersion": "3"
}' \
--default-arguments '{
"--job-language": "python",
"--class": "GlueApp"
}' \
--glue-version "5.0"
(オプション) Amazon S3 Tables カタログを Apache Iceberg のアクセス方法として使用する場合は、--extra-jars
パラメータを使用してクライアントカタログ JAR を --default-arguments
に追加します。パラメータを追加するときに、入力プレースホルダー
は独自のものに置き換えます。
"--extra-jars": "s3://amzn-s3-demo-bucket
/jar-path
/s3-tables-catalog-for-iceberg-runtime-0.1.5
.jar"
-
ジョブを開始します。
aws glue start-job-run \
--job-name etl-tables-job
-
ジョブのステータスを確認するには、前のコマンドの実行 ID をコピーして、次のコマンドに入力します。
aws glue get-job-run --job-name etl-tables-job
\
--run-id jr_ec9a8a302e71f8483060f87b6c309601ea9ee9c1ffc2db56706dfcceb3d0e1ad