AWS Glue を使用した Amazon S3 テーブルでの ETL ジョブの実行 - Amazon Simple Storage Service

AWS Glue を使用した Amazon S3 テーブルでの ETL ジョブの実行

AWS Glueは、分析を行うユーザーが複数のソースからのデータを簡単に検出、準備、移動、統合できるようにするサーバーレスのデータ統合サービスです。AWS Glue ジョブを使用して抽出、変換、ロード (ETL) パイプラインを実行し、データをデータレイクにロードできます。AWS Glue の詳細については、AWS Glue デベロッパーガイド の「What is AWS Glue?」を参照してください。

AWS Glue ジョブには、ソースデータに接続して処理し、データターゲットに書き出すスクリプトがカプセル化されています。通常、ジョブは、抽出、変換、ロード (ETL) スクリプトを実行します。ジョブは、Apache Spark ランタイム環境向けに設計されたスクリプトを実行できます。ジョブ実行をモニタリングすると、完了ステータス、継続時間、開始時間などのランタイムメトリクスを知ることができます。

AWS Glue のジョブを使用して S3 テーブルのデータを処理するには、AWS 分析サービスとの統合を通じてテーブルに接続するか、Amazon S3 Tables の Iceberg REST エンドポイントまたは Apache Iceberg の Amazon S3 Tables カタログを使用して直接接続します。このガイドでは、S3 Tables で AWS Glue の使用を開始するための基本的な手順を示します。

注記

S3 Tables は、AWS Glue バージョン 5.0 以降でサポートされています。

前提条件

AWS Glue ジョブからテーブルに対してクエリを実行する前に、AWS Glue がジョブの実行に使用できる IAM ロールを設定し、ジョブの実行時に AWS Glue がアクセスできる Amazon S3 バケットに Amazon S3 Tables Catalog for Apache Iceberg JAR をアップロードする必要があります。

  • テーブルバケットを AWS 分析サービスと統合します

  • AWS Glue 用の IAM ロールを作成します

    • AmazonS3TablesFullAccess 管理ポリシーをロールにアタッチします。

    • AmazonS3FullAccess 管理ポリシーをロールにアタッチします。

  • (オプション) Apache Iceberg で Amazon S3 Tables カタログを使用する場合は、クライアントカタログ JAR をダウンロードして S3 バケットにアップロードする必要があります。

    カタログ JAR のダウンロード
    1. Maven Central で最新バージョンを検索します。ブラウザまたは次のコマンドを使用して、Maven central から JAR をダウンロードできます。必ずバージョン番号を最新バージョンに置き換えてください。

      wget https://repo1.maven.org/maven2/software/amazon/s3tables/s3-tables-catalog-for-iceberg-runtime/0.1.5/s3-tables-catalog-for-iceberg-runtime-0.1.5.jar
    2. ダウンロードした JAR を、AWS Glue IAM ロールがアクセスできる S3 バケットにアップロードします。JAR をアップロードするには、次の AWS CLI コマンドを実行します。バージョン番号を最新バージョンに置き換え、バケット名パスを独自のものに置き換えます。

      aws s3 cp s3-tables-catalog-for-iceberg-runtime-0.1.5.jar s3://amzn-s3-demo-bucket/jars/

テーブルバケットに接続するスクリプトを作成する

AWS Glue ETL ジョブの実行時にテーブルデータにアクセスするには、S3 テーブルバケットに接続する Apache Iceberg の Spark セッションを設定します。既存のスクリプトを変更してテーブルバケットに接続するか、新しいスクリプトを作成できます。AWS Glue スクリプトの作成の詳細については、「AWS Glue デベロッパーガイド」の「チュートリアル: AWS Glue for Spark スクリプトの記述」を参照してください。

次の S3 Tables アクセス方法のいずれかを使用して、テーブルバケットに接続するようにセッションを設定できます。

  • S3 Tables と AWS 分析サービスの統合

  • Amazon S3 Tables の Iceberg REST エンドポイント

  • Apache Iceberg の Amazon S3 Tables カタログ

セットアップ手順と設定例を表示するには、次のアクセス方法から選択します。

AWS analytics services integration

AWS 分析サービス統合を使用して AWS Glue の Spark でテーブルをクエリするための前提条件として、テーブルバケットを AWS 分析サービスと統合する必要があります。

テーブルバケットへの接続は、ジョブの Spark セッションを使用するか、インタラクティブセッションの AWS Glue Studio マジックを使用して設定できます。以下の例を使用するには、プレースホルダー値を独自のテーブルバケットの情報に置き換えます。

PySpark スクリプトの使用

PySpark スクリプトで次のコードスニペットを使用して、統合を通じてテーブルバケットに接続するように AWS Glue ジョブを設定します。

spark = SparkSession.builder.appName("SparkIcebergSQL") \ .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \ .config("spark.sql.defaultCatalog","s3tables") \ .config("spark.sql.catalog.s3tables", "org.apache.iceberg.spark.SparkCatalog") \ .config("spark.sql.catalog.s3tables.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \ .config("spark.sql.catalog.s3tables.glue.id", "111122223333:s3tablescatalog/amzn-s3-demo-table-bucket") \ .config("spark.sql.catalog.s3tables.warehouse", "s3://amzn-s3-demo-table-bucket/warehouse/") \ .getOrCreate()
インタラクティブな AWS Glue セッションの使用

AWS Glue 5.0 でインタラクティブなノートブックセッションを使用する場合は、コードの実行前にセルで %%configure マジックを使用して同じ設定を指定します。

%%configure {"conf": "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf spark.sql.defaultCatalog=s3tables --conf spark.sql.catalog.s3tables=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.s3tables.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog --conf spark.sql.catalog.s3tables.glue.id=111122223333:s3tablescatalog/amzn-s3-demo-table-bucket --conf spark.sql.catalog.s3tables.warehouse=s3://amzn-s3-demo-table-bucket/warehouse/"}
Amazon S3 Tables Iceberg REST endpoint

テーブルバケットへの接続を設定するには、ジョブの Spark セッションを使用するか、インタラクティブセッションの AWS Glue Studio マジックを使用できます。以下の例を使用するには、プレースホルダー値を独自のテーブルバケットの情報に置き換えます。

PySpark スクリプトの使用

次のコードスニペットを PySpark スクリプトで使用して、エンドポイントを通じてテーブルバケットに接続するように AWS Glue ジョブを設定します。

spark = SparkSession.builder.appName("glue-s3-tables-rest") \ .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \ .config("spark.sql.defaultCatalog", "s3_rest_catalog") \ .config("spark.sql.catalog.s3_rest_catalog", "org.apache.iceberg.spark.SparkCatalog") \ .config("spark.sql.catalog.s3_rest_catalog.type", "rest") \ .config("spark.sql.catalog.s3_rest_catalog.uri", "https://s3tables.Region.amazonaws.com/iceberg") \ .config("spark.sql.catalog.s3_rest_catalog.warehouse", "arn:aws:s3tables:Region:111122223333:s3tablescatalog/amzn-s3-demo-table-bucket") \ .config("spark.sql.catalog.s3_rest_catalog.rest.sigv4-enabled", "true") \ .config("spark.sql.catalog.s3_rest_catalog.rest.signing-name", "s3tables") \ .config("spark.sql.catalog.s3_rest_catalog.rest.signing-region", "Region") \ .config('spark.sql.catalog.s3_rest_catalog.io-impl','org.apache.iceberg.aws.s3.S3FileIO') \ .config('spark.sql.catalog.s3_rest_catalog.rest-metrics-reporting-enabled','false') \ .getOrCreate()
インタラクティブな AWS Glue セッションの使用

AWS Glue 5.0 でインタラクティブなノートブックセッションを使用する場合は、コードの実行前にセルで %%configure マジックを使用して同じ設定を指定します。プレースホルダー値を独自のテーブルバケットの情報に置き換えます。

%%configure {"conf": "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf spark.sql.defaultCatalog=s3_rest_catalog --conf spark.sql.catalog.s3_rest_catalog=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.s3_rest_catalog.type=rest --conf spark.sql.catalog.s3_rest_catalog.uri=https://s3tables.Region.amazonaws.com/iceberg --conf spark.sql.catalog.s3_rest_catalog.warehouse=arn:aws:s3tables:Region:111122223333:s3tablescatalog/amzn-s3-demo-table-bucket --conf spark.sql.catalog.s3_rest_catalog.rest.sigv4-enabled=true --conf spark.sql.catalog.s3_rest_catalog.rest.signing-name=s3tables --conf spark.sql.catalog.s3_rest_catalog.rest.signing-region=Region --conf spark.sql.catalog.s3_rest_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO --conf spark.sql.catalog.s3_rest_catalog.rest-metrics-reporting-enabled=false"}
Amazon S3 Tables Catalog for Apache Iceberg

Apache Iceberg で Amazon S3 Tables カタログを使用してテーブルに接続するための前提条件として、まず最新のカタログ jar をダウンロードして S3 バケットにアップロードする必要があります。次に、ジョブの作成時に、クライアントカタログ JAR へのパスを特別なパラメータとして追加します。AWS Glue のジョブパラメータの詳細については、「AWS Glue Developer Guide」の「Special parameters used in AWS Glue jobs」を参照してください。

テーブルバケットへの接続を設定するには、ジョブの Spark セッションを使用するか、インタラクティブセッションの AWS Glue Studio マジックを使用できます。以下の例を使用するには、プレースホルダー値を独自のテーブルバケットの情報に置き換えます。

PySpark スクリプトの使用

次のコードスニペットを PySpark スクリプトで使用して、JAR を通じてテーブルバケットに接続するように AWS Glue ジョブを設定します。プレースホルダー値を独自のテーブルバケットの情報に置き換えます。

spark = SparkSession.builder.appName("glue-s3-tables") \ .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \ .config("spark.sql.defaultCatalog", "s3tablesbucket") \ .config("spark.sql.catalog.s3tablesbucket", "org.apache.iceberg.spark.SparkCatalog") \ .config("spark.sql.catalog.s3tablesbucket.catalog-impl", "software.amazon.s3tables.iceberg.S3TablesCatalog") \ .config("spark.sql.catalog.s3tablesbucket.warehouse", "arn:aws:s3tables:Region:111122223333:bucket/amzn-s3-demo-table-bucket") \ .getOrCreate()
インタラクティブな AWS Glue セッションの使用

AWS Glue 5.0 でインタラクティブなノートブックセッションを使用する場合は、コードの実行前にセルで %%configure マジックを使用して同じ設定を指定します。プレースホルダー値を独自のテーブルバケットの情報に置き換えます。

%%configure {"conf": "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf spark.sql.defaultCatalog=s3tablesbucket --conf spark.sql.catalog.s3tablesbucket=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.s3tablesbucket.catalog-impl=software.amazon.s3tables.iceberg.S3TablesCatalog --conf spark.sql.catalog.s3tablesbucket.warehouse=arn:aws:s3tables:Region:111122223333:bucket/amzn-s3-demo-table-bucket", "extra-jars": "s3://amzn-s3-demo-bucket/jars/s3-tables-catalog-for-iceberg-runtime-0.1.5.jar"}

サンプルスクリプト

以下のサンプル PySpark スクリプトは、AWS Glue ジョブで S3 テーブルへのクエリをテストするために使用できます。各スクリプトはテーブルバケットに接続し、新しい名前空間の作成、サンプルテーブルの作成、テーブルへのデータの挿入、テーブルデータの取得を行うためのクエリを実行します。スクリプトを使用するには、プレースホルダー値を独自のテーブルバケットの情報に置き換えます。

S3 Tables のアクセス方法に応じて、以下のスクリプトから選択します。

S3 Tables integration with AWS analytics services
from pyspark.sql import SparkSession spark = SparkSession.builder.appName("SparkIcebergSQL") \ .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \ .config("spark.sql.defaultCatalog","s3tables") .config("spark.sql.catalog.s3tables", "org.apache.iceberg.spark.SparkCatalog") \ .config("spark.sql.catalog.s3tables.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \ .config("spark.sql.catalog.s3tables.glue.id", "111122223333:s3tablescatalog/amzn-s3-demo-table-bucket") \ .config("spark.sql.catalog.s3tables.warehouse", "s3://amzn-s3-demo-table-bucket/bucket/amzn-s3-demo-table-bucket") \ .getOrCreate() namespace = "new_namespace" table = "new_table" spark.sql("SHOW DATABASES").show() spark.sql(f"DESCRIBE NAMESPACE {namespace}").show() spark.sql(f""" CREATE TABLE IF NOT EXISTS {namespace}.{table} ( id INT, name STRING, value INT ) """) spark.sql(f""" INSERT INTO {namespace}.{table} VALUES (1, 'ABC', 100), (2, 'XYZ', 200) """) spark.sql(f"SELECT * FROM {namespace}.{table} LIMIT 10").show()
Amazon S3 Tables Iceberg REST endpoint
from pyspark.sql import SparkSession spark = SparkSession.builder.appName("glue-s3-tables-rest") \ .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \ .config("spark.sql.defaultCatalog", "s3_rest_catalog") \ .config("spark.sql.catalog.s3_rest_catalog", "org.apache.iceberg.spark.SparkCatalog") \ .config("spark.sql.catalog.s3_rest_catalog.type", "rest") \ .config("spark.sql.catalog.s3_rest_catalog.uri", "https://s3tables.Region.amazonaws.com/iceberg") \ .config("spark.sql.catalog.s3_rest_catalog.warehouse", "arn:aws:s3tables:Region:111122223333:bucket/amzn-s3-demo-table-bucket") \ .config("spark.sql.catalog.s3_rest_catalog.rest.sigv4-enabled", "true") \ .config("spark.sql.catalog.s3_rest_catalog.rest.signing-name", "s3tables") \ .config("spark.sql.catalog.s3_rest_catalog.rest.signing-region", "Region") \ .config('spark.sql.catalog.s3_rest_catalog.io-impl','org.apache.iceberg.aws.s3.S3FileIO') \ .config('spark.sql.catalog.s3_rest_catalog.rest-metrics-reporting-enabled','false') \ .getOrCreate() namespace = "s3_tables_rest_namespace" table = "new_table_s3_rest" spark.sql("SHOW DATABASES").show() spark.sql(f"DESCRIBE NAMESPACE {namespace}").show() spark.sql(f""" CREATE TABLE IF NOT EXISTS {namespace}.{table} ( id INT, name STRING, value INT ) """) spark.sql(f""" INSERT INTO {namespace}.{table} VALUES (1, 'ABC', 100), (2, 'XYZ', 200) """) spark.sql(f"SELECT * FROM {namespace}.{table} LIMIT 10").show()
Amazon S3 Tables Catalog for Apache Iceberg
from pyspark.sql import SparkSession #Spark session configurations spark = SparkSession.builder.appName("glue-s3-tables") \ .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \ .config("spark.sql.defaultCatalog", "s3tablesbucket") \ .config("spark.sql.catalog.s3tablesbucket", "org.apache.iceberg.spark.SparkCatalog") \ .config("spark.sql.catalog.s3tablesbucket.catalog-impl", "software.amazon.s3tables.iceberg.S3TablesCatalog") \ .config("spark.sql.catalog.s3tablesbucket.warehouse", "arn:aws:s3tables:Region:111122223333:bucket/amzn-s3-demo-table-bucket") \ .getOrCreate() #Script namespace = "new_namespace" table = "new_table" spark.sql(f"CREATE NAMESPACE IF NOT EXISTS s3tablesbucket.{namespace}") spark.sql(f"DESCRIBE NAMESPACE {namespace}").show() spark.sql(f""" CREATE TABLE IF NOT EXISTS {namespace}.{table} ( id INT, name STRING, value INT ) """) spark.sql(f""" INSERT INTO {namespace}.{table} VALUES (1, 'ABC', 100), (2, 'XYZ', 200) """) spark.sql(f"SELECT * FROM {namespace}.{table} LIMIT 10").show()

テーブルをクエリする AWS Glue ジョブを作成する

次の手順は、S3 テーブルバケットに接続する AWS Glue ジョブの設定方法を示しています。これを行うには、AWS CLI を使用するか、コンソールで AWS Glue Studio スクリプトエディタを使用します。詳細については、「AWS Glue ユーザーガイド」の「AWS Glue でジョブを作成する」を参照してください。

次の手順は、AWS Glue Studio スクリプトエディタを使用して S3 テーブルをクエリする ETL ジョブを作成する方法を示しています。

  1. https://console.aws.amazon.com/glue/ で AWS Glue コンソール を開きます。

  2. ナビゲーションペインで、[ETL ジョブ] を選択します。

  3. [スクリプトエディタ] を選択し、[スクリプトのアップロード] を選択し、S3 テーブルをクエリするために作成した PySpark スクリプトをアップロードします。

  4. [ジョブの詳細] タブを選択し、[基本プロパティ] に次のように入力します。

    • [名前] で、ジョブの名前を入力します。

    • [IAM ロール] で、作成した AWS Glue ロールを選択します。

  5. (オプション) Amazon S3 Tables カタログを Apache Iceberg のアクセス方法として使用する場合は、[詳細プロパティ] を展開し、[依存 JARS パス] に、S3 バケットにアップロードしたクライアントカタログ jar の S3 URI を前提条件として入力します。例: s3://amzn-s3-demo-bucket1/jars/s3-tables-catalog-for-iceberg-runtime-0.1.5.jar

  6. [保存] を選択してジョブを作成します。

  7. [実行] を選択してジョブを開始し、[実行] タブでジョブのステータスを確認します。

次の手順は、AWS CLI を使用して S3 テーブルをクエリする ETL ジョブを作成する方法を示しています。コマンドを使用するには、プレースホルダー値を独自の値に置き換えます。

前提条件
  1. AWS Glue ジョブを作成します。

    aws glue create-job \ --name etl-tables-job \ --role arn:aws:iam::111122223333:role/AWSGlueServiceRole \ --command '{ "Name": "glueetl", "ScriptLocation": "s3://amzn-s3-demo-bucket1/scripts/glue-etl-query.py", "PythonVersion": "3" }' \ --default-arguments '{ "--job-language": "python", "--class": "GlueApp" }' \ --glue-version "5.0"
    注記

    (オプション) Amazon S3 Tables カタログを Apache Iceberg のアクセス方法として使用する場合は、--extra-jars パラメータを使用してクライアントカタログ JAR を --default-arguments に追加します。パラメータを追加するときに、入力プレースホルダーは独自のものに置き換えます。

    "--extra-jars": "s3://amzn-s3-demo-bucket/jar-path/s3-tables-catalog-for-iceberg-runtime-0.1.5.jar"
  2. ジョブを開始します。

    aws glue start-job-run \ --job-name etl-tables-job
  3. ジョブのステータスを確認するには、前のコマンドの実行 ID をコピーして、次のコマンドに入力します。

    aws glue get-job-run --job-name etl-tables-job \ --run-id jr_ec9a8a302e71f8483060f87b6c309601ea9ee9c1ffc2db56706dfcceb3d0e1ad