使用 在 Amazon S3 資料表上執行 ETL 任務 AWS Glue - Amazon Simple Storage Service

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

使用 在 Amazon S3 資料表上執行 ETL 任務 AWS Glue

AWS Glue 是一種無伺服器資料整合服務,可讓分析使用者輕鬆探索、準備、移動和整合來自多個來源的資料。您可以使用 AWS Glue 任務來執行擷取、轉換和載入 (ETL) 管道,將資料載入資料湖。如需詳細資訊 AWS Glue,請參閱《 AWS Glue 開發人員指南》中的什麼是 AWS Glue?

AWS Glue 任務會封裝連線至來源資料的指令碼、處理該指令碼,然後將其寫入資料目標。一般而言,任務會執行擷取、轉換和載入 (ETL) 指令碼。任務可以執行專為Apache Spark執行時間環境設計的指令碼。您可以監控任務執行以了解執行時間指標,例如完成狀態、持續時間和開始時間。

您可以使用 AWS Glue 任務來處理 S3 資料表中的資料,方法是透過與 AWS 分析服務的整合連線至資料表,或直接使用 Amazon S3 TablesIceberg REST 端點或適用於 的 Amazon S3 Tables Catalog 進行連線Apache Iceberg。本指南涵蓋開始使用 AWS Glue 與 S3 Tables 的基本步驟,包括:

注意

先決條件

您必須先設定 AWS Glue 可用於執行 AWS Glue 任務的 IAM 角色,並將 的 Amazon S3 Tables Catalog 上傳至可在執行任務時 AWS Glue 存取的 S3 Apache IcebergJAR儲存貯體,才能從任務查詢資料表。

  • 整合資料表儲存貯體與 AWS 分析服務

  • 建立 的 IAM 角色 AWS Glue

    • AmazonS3TablesFullAccess受管政策連接至角色。

    • AmazonS3FullAccess受管政策連接至角色。

  • (選用) 如果您為 使用 Amazon S3 Tables CatalogApache Iceberg,則需要下載用戶端目錄並將其JAR上傳至 S3 儲存貯體。

    下載目錄 JAR
    1. 檢查 Maven Central 上的最新版本。您可以使用瀏覽器或使用下列命令JAR從 Maven central 下載 。請務必將版本編號取代為最新版本。

      wget https://repo1.maven.org/maven2/software/amazon/s3tables/s3-tables-catalog-for-iceberg-runtime/0.1.5/s3-tables-catalog-for-iceberg-runtime-0.1.5.jar
    2. 將下載JAR的 上傳至 AWS Glue IAM 角色可存取的 S3 儲存貯體。您可以使用下列 AWS CLI 命令來上傳 JAR。請務必將版本編號取代為最新版本,並將儲存貯體名稱路徑取代為您自己的版本。

      aws s3 cp s3-tables-catalog-for-iceberg-runtime-0.1.5.jar s3://amzn-s3-demo-bucket/jars/

建立指令碼以連線至資料表儲存貯體

若要在執行 AWS Glue ETL 任務時存取資料表資料,請為 設定Apache Iceberg連線至 S3 資料表儲存貯體的Spark工作階段。您可以修改現有的指令碼以連線到資料表儲存貯體,或建立新的指令碼。如需建立 AWS Glue 指令碼的詳細資訊,請參閱《 AWS Glue 開發人員指南》中的教學課程:撰寫 AWS Glue for Spark 指令碼

您可以透過下列任一 S3 Tables 存取方法,將工作階段設定為連線至資料表儲存貯體:

  • S3 Tables 與 AWS 分析服務的整合

  • Amazon S3 TablesIceberg REST 端點

  • 的 Amazon S3 Tables Catalog Apache Iceberg

從下列存取方法中選擇,以檢視設定指示和組態範例。

AWS analytics services integration

使用 AWS 分析服務整合在 Spark上使用 AWS Glue 查詢資料表的先決條件是,您必須將資料表儲存貯體與 AWS 分析服務整合

您可以透過任務中的Spark工作階段或在互動式工作階段中使用 AWS Glue Studio 魔術設定與資料表儲存貯體的連線。若要使用下列範例,請將預留位置值取代為您自己的資料表儲存貯體資訊。

使用 PySpark 指令碼

在PySpark指令碼中使用下列程式碼片段,設定 AWS Glue 任務以使用 整合連線到資料表儲存貯體。

spark = SparkSession.builder.appName("SparkIcebergSQL") \ .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \ .config("spark.sql.defaultCatalog","s3tables") \ .config("spark.sql.catalog.s3tables", "org.apache.iceberg.spark.SparkCatalog") \ .config("spark.sql.catalog.s3tables.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \ .config("spark.sql.catalog.s3tables.glue.id", "111122223333:s3tablescatalog/amzn-s3-demo-table-bucket") \ .config("spark.sql.catalog.s3tables.warehouse", "s3://amzn-s3-demo-table-bucket/warehouse/") \ .getOrCreate()
使用互動式 AWS Glue 工作階段

如果您將互動式筆記本工作階段與 AWS Glue 5.0 搭配使用,請在程式碼執行之前,使用儲存格中的%%configure魔術來指定相同的組態。

%%configure {"conf": "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf spark.sql.defaultCatalog=s3tables --conf spark.sql.catalog.s3tables=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.s3tables.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog --conf spark.sql.catalog.s3tables.glue.id=111122223333:s3tablescatalog/amzn-s3-demo-table-bucket --conf spark.sql.catalog.s3tables.warehouse=s3://amzn-s3-demo-table-bucket/warehouse/"}
Amazon S3 Tables Iceberg REST endpoint

您可以透過任務中的Spark工作階段或在互動式工作階段中使用 AWS Glue Studio 魔術設定與資料表儲存貯體的連線。若要使用下列範例,請將預留位置值取代為您自己的資料表儲存貯體資訊。

使用 PySpark 指令碼

在PySpark指令碼中使用下列程式碼片段,設定 AWS Glue 任務以使用端點連線至資料表儲存貯體。

spark = SparkSession.builder.appName("glue-s3-tables-rest") \ .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \ .config("spark.sql.defaultCatalog", "s3_rest_catalog") \ .config("spark.sql.catalog.s3_rest_catalog", "org.apache.iceberg.spark.SparkCatalog") \ .config("spark.sql.catalog.s3_rest_catalog.type", "rest") \ .config("spark.sql.catalog.s3_rest_catalog.uri", "https://s3tables.Region.amazonaws.com/iceberg") \ .config("spark.sql.catalog.s3_rest_catalog.warehouse", "arn:aws:s3tables:Region:111122223333:s3tablescatalog/amzn-s3-demo-table-bucket") \ .config("spark.sql.catalog.s3_rest_catalog.rest.sigv4-enabled", "true") \ .config("spark.sql.catalog.s3_rest_catalog.rest.signing-name", "s3tables") \ .config("spark.sql.catalog.s3_rest_catalog.rest.signing-region", "Region") \ .config('spark.sql.catalog.s3_rest_catalog.io-impl','org.apache.iceberg.aws.s3.S3FileIO') \ .config('spark.sql.catalog.s3_rest_catalog.rest-metrics-reporting-enabled','false') \ .getOrCreate()
使用互動式 AWS Glue 工作階段

如果您使用互動式筆記本工作階段搭配 AWS Glue 5.0,請在程式碼執行之前,使用儲存格中的%%configure魔術來指定相同的組態。將預留位置值取代為您自己的資料表儲存貯體資訊。

%%configure {"conf": "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf spark.sql.defaultCatalog=s3_rest_catalog --conf spark.sql.catalog.s3_rest_catalog=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.s3_rest_catalog.type=rest --conf spark.sql.catalog.s3_rest_catalog.uri=https://s3tables.Region.amazonaws.com/iceberg --conf spark.sql.catalog.s3_rest_catalog.warehouse=arn:aws:s3tables:Region:111122223333:s3tablescatalog/amzn-s3-demo-table-bucket --conf spark.sql.catalog.s3_rest_catalog.rest.sigv4-enabled=true --conf spark.sql.catalog.s3_rest_catalog.rest.signing-name=s3tables --conf spark.sql.catalog.s3_rest_catalog.rest.signing-region=Region --conf spark.sql.catalog.s3_rest_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO --conf spark.sql.catalog.s3_rest_catalog.rest-metrics-reporting-enabled=false"}
Amazon S3 Tables Catalog for Apache Iceberg

您必須先Apache Iceberg下載最新的目錄 jar 並將其上傳至 S3 儲存貯體,才能使用 Amazon S3 Tables Catalog 連線到資料表。 S3 然後,當您建立任務時,您可以將用戶端目錄的路徑新增JAR為特殊參數。如需 中任務參數的詳細資訊 AWS Glue,請參閱《 AWS Glue 開發人員指南AWS Glue 》中的任務中使用的特殊參數

您可以透過任務中的Spark工作階段或在互動式工作階段中使用 AWS Glue Studio 魔術設定與資料表儲存貯體的連線。若要使用下列範例,請將預留位置值取代為您自己的資料表儲存貯體資訊。

使用PySpark指令碼

在PySpark指令碼中使用下列程式碼片段,設定 AWS Glue 任務以使用 連線到資料表儲存貯體JAR。將預留位置值取代為您自己的資料表儲存貯體資訊。

spark = SparkSession.builder.appName("glue-s3-tables") \ .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \ .config("spark.sql.defaultCatalog", "s3tablesbucket") \ .config("spark.sql.catalog.s3tablesbucket", "org.apache.iceberg.spark.SparkCatalog") \ .config("spark.sql.catalog.s3tablesbucket.catalog-impl", "software.amazon.s3tables.iceberg.S3TablesCatalog") \ .config("spark.sql.catalog.s3tablesbucket.warehouse", "arn:aws:s3tables:Region:111122223333:bucket/amzn-s3-demo-table-bucket") \ .getOrCreate()
使用互動式 AWS Glue 工作階段

如果您使用互動式筆記本工作階段搭配 AWS Glue 5.0,請在程式碼執行之前,使用儲存格中的%%configure魔術來指定相同的組態。將預留位置值取代為您自己的資料表儲存貯體資訊。

%%configure {"conf": "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf spark.sql.defaultCatalog=s3tablesbucket --conf spark.sql.catalog.s3tablesbucket=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.s3tablesbucket.catalog-impl=software.amazon.s3tables.iceberg.S3TablesCatalog --conf spark.sql.catalog.s3tablesbucket.warehouse=arn:aws:s3tables:Region:111122223333:bucket/amzn-s3-demo-table-bucket", "extra-jars": "s3://amzn-s3-demo-bucket/jars/s3-tables-catalog-for-iceberg-runtime-0.1.5.jar"}

範例指令碼

下列範例PySpark指令碼可用來測試使用 AWS Glue 任務查詢 S3 資料表。這些指令碼會連線至您的資料表儲存貯體並執行查詢:建立新的命名空間、建立範例資料表、將資料插入資料表,以及傳回資料表資料。若要使用指令碼,請將預留位置值取代為您擁有資料表儲存貯體的資訊。

根據您的 S3 Tables 存取方法,從下列指令碼中選擇。

S3 Tables integration with AWS analytics services
from pyspark.sql import SparkSession spark = SparkSession.builder.appName("SparkIcebergSQL") \ .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \ .config("spark.sql.defaultCatalog","s3tables") .config("spark.sql.catalog.s3tables", "org.apache.iceberg.spark.SparkCatalog") \ .config("spark.sql.catalog.s3tables.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \ .config("spark.sql.catalog.s3tables.glue.id", "111122223333:s3tablescatalog/amzn-s3-demo-table-bucket") \ .config("spark.sql.catalog.s3tables.warehouse", "s3://amzn-s3-demo-table-bucket/bucket/amzn-s3-demo-table-bucket") \ .getOrCreate() namespace = "new_namespace" table = "new_table" spark.sql("SHOW DATABASES").show() spark.sql(f"DESCRIBE NAMESPACE {namespace}").show() spark.sql(f""" CREATE TABLE IF NOT EXISTS {namespace}.{table} ( id INT, name STRING, value INT ) """) spark.sql(f""" INSERT INTO {namespace}.{table} VALUES (1, 'ABC', 100), (2, 'XYZ', 200) """) spark.sql(f"SELECT * FROM {namespace}.{table} LIMIT 10").show()
Amazon S3 Tables Iceberg REST endpoint
from pyspark.sql import SparkSession spark = SparkSession.builder.appName("glue-s3-tables-rest") \ .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \ .config("spark.sql.defaultCatalog", "s3_rest_catalog") \ .config("spark.sql.catalog.s3_rest_catalog", "org.apache.iceberg.spark.SparkCatalog") \ .config("spark.sql.catalog.s3_rest_catalog.type", "rest") \ .config("spark.sql.catalog.s3_rest_catalog.uri", "https://s3tables.Region.amazonaws.com/iceberg") \ .config("spark.sql.catalog.s3_rest_catalog.warehouse", "arn:aws:s3tables:Region:111122223333:bucket/amzn-s3-demo-table-bucket") \ .config("spark.sql.catalog.s3_rest_catalog.rest.sigv4-enabled", "true") \ .config("spark.sql.catalog.s3_rest_catalog.rest.signing-name", "s3tables") \ .config("spark.sql.catalog.s3_rest_catalog.rest.signing-region", "Region") \ .config('spark.sql.catalog.s3_rest_catalog.io-impl','org.apache.iceberg.aws.s3.S3FileIO') \ .config('spark.sql.catalog.s3_rest_catalog.rest-metrics-reporting-enabled','false') \ .getOrCreate() namespace = "s3_tables_rest_namespace" table = "new_table_s3_rest" spark.sql("SHOW DATABASES").show() spark.sql(f"DESCRIBE NAMESPACE {namespace}").show() spark.sql(f""" CREATE TABLE IF NOT EXISTS {namespace}.{table} ( id INT, name STRING, value INT ) """) spark.sql(f""" INSERT INTO {namespace}.{table} VALUES (1, 'ABC', 100), (2, 'XYZ', 200) """) spark.sql(f"SELECT * FROM {namespace}.{table} LIMIT 10").show()
Amazon S3 Tables Catalog for Apache Iceberg
from pyspark.sql import SparkSession #Spark session configurations spark = SparkSession.builder.appName("glue-s3-tables") \ .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \ .config("spark.sql.defaultCatalog", "s3tablesbucket") \ .config("spark.sql.catalog.s3tablesbucket", "org.apache.iceberg.spark.SparkCatalog") \ .config("spark.sql.catalog.s3tablesbucket.catalog-impl", "software.amazon.s3tables.iceberg.S3TablesCatalog") \ .config("spark.sql.catalog.s3tablesbucket.warehouse", "arn:aws:s3tables:Region:111122223333:bucket/amzn-s3-demo-table-bucket") \ .getOrCreate() #Script namespace = "new_namespace" table = "new_table" spark.sql(f"CREATE NAMESPACE IF NOT EXISTS s3tablesbucket.{namespace}") spark.sql(f"DESCRIBE NAMESPACE {namespace}").show() spark.sql(f""" CREATE TABLE IF NOT EXISTS {namespace}.{table} ( id INT, name STRING, value INT ) """) spark.sql(f""" INSERT INTO {namespace}.{table} VALUES (1, 'ABC', 100), (2, 'XYZ', 200) """) spark.sql(f"SELECT * FROM {namespace}.{table} LIMIT 10").show()

建立查詢資料表 AWS Glue 的任務

下列程序示範如何設定連線至 S3 資料表儲存貯體 AWS Glue 的任務。您可以使用 AWS CLI 或使用 主控台搭配 AWS Glue Studio 指令碼編輯器來執行此操作。如需詳細資訊,請參閱AWS Glue 《 使用者指南》中的在 中編寫任務 AWS Glue

下列程序說明如何使用 AWS Glue Studio 指令碼編輯器來建立查詢 S3 資料表的 ETL 任務。

  1. 在 https://https://console.aws.amazon.com/glue/ 開啟 AWS Glue 主控台。

  2. 從導覽窗格中,選擇 ETL 任務

  3. 選擇指令碼編輯器,然後選擇上傳指令碼,然後上傳您建立來查詢 S3 資料表的PySpark指令碼。

  4. 選取任務詳細資訊索引標籤,然後針對基本屬性輸入下列內容。

    • 名稱中,輸入任務的名稱。

    • 針對 IAM 角色,選取您為其建立的角色 AWS Glue。

  5. (選用) 如果您使用 Amazon S3 Tables Catalog 進行Apache Iceberg存取方法,請展開進階屬性,並針對相依 JARs輸入上傳到 S3 儲存貯體的用戶端目錄 jar 的 S3 URI 做為先決條件。例如,s3:/amzn-s3-demo-bucket1//jars/s3-tables-catalog-for-iceberg-runtime-0.1.5.jar

  6. 選擇儲存以建立任務。

  7. 選擇執行 開始任務,然後在執行索引標籤下檢閱任務狀態。

下列程序說明如何使用 AWS CLI 來建立查詢 S3 資料表的 ETL 任務。若要使用命令,請將預留位置值取代為您自己的值。

先決條件
  1. 建立 AWS Glue 任務。

    aws glue create-job \ --name etl-tables-job \ --role arn:aws:iam::111122223333:role/AWSGlueServiceRole \ --command '{ "Name": "glueetl", "ScriptLocation": "s3://amzn-s3-demo-bucket1/scripts/glue-etl-query.py", "PythonVersion": "3" }' \ --default-arguments '{ "--job-language": "python", "--class": "GlueApp" }' \ --glue-version "5.0"
    注意

    (選用) 如果您使用 Amazon S3 Tables Catalog 進行Apache Iceberg存取方法,--default-arguments請使用 --extra-jars 參數將用戶端目錄新增至 JAR 。當您新增 參數時,將輸入預留位置取代為您自己的預留位置。

    "--extra-jars": "s3://amzn-s3-demo-bucket/jar-path/s3-tables-catalog-for-iceberg-runtime-0.1.5.jar"
  2. 啟動您的任務。

    aws glue start-job-run \ --job-name etl-tables-job
  3. 若要檢閱您的任務狀態,請從上一個命令複製執行 ID,並將其輸入至下列命令。

    aws glue get-job-run --job-name etl-tables-job \ --run-id jr_ec9a8a302e71f8483060f87b6c309601ea9ee9c1ffc2db56706dfcceb3d0e1ad