使用 AWS Glue 在 Amazon S3 Tables 上執行 ETL 任務
AWS Glue 是無伺服器資料整合服務,讓分析使用者可從多個來源輕鬆探索、準備、移動和整合資料。您可以使用 AWS Glue 任務執行擷取、轉換和載入 (ETL) 管道,以將資料載入資料湖。如需 AWS Glue 的相關資訊,請參閱《AWS Glue 開發人員指南》中的什麼是 AWS Glue?。
AWS Glue 任務封裝了一個指令碼,會連線至您的來源資料、處理資料,然後將它寫出至您的資料目標。一般而言,任務會執行擷取、轉換和載入 (ETL) 指令碼。任務可以執行專為 Apache Spark 執行時期環境設計的指令碼。您可以監控任務執行以了解執行時間指標,例如完成狀態、持續時間和開始時間。
您可以透過與 AWS 分析服務的整合來連線至您的資料表,或直接使用 Amazon S3 Tables Iceberg REST 端點或 Amazon S3 Tables Catalog for Apache Iceberg 進行連線,以使用 AWS Glue 任務來處理資料。本指南涵蓋開始使用 AWS Glue 與 S3 Tables 的基本步驟,包括:
根據您的特定 AWS Glue ETL 任務要求來選擇您的存取方法:
-
AWS 分析服務整合 (建議) – 當您需要跨多個 AWS 分析服務進行集中式中繼資料管理、需要藉由 Lake Formation 來利用現有的 AWS Glue Data Catalog 許可權和精細存取控制權,或正在建置與 Athena 或 Amazon EMR 等其他 AWS 服務整合的生產 ETL 管道時,建議使用此項。
-
Amazon S3 Tables Iceberg REST 端點 – 當您需要從支援 Apache Iceberg 的第三方查詢引擎連線至 S3 資料表、建置需要直接 REST API 存取權的自訂 ETL 應用程式,或當您需要控制目錄操作而不依賴 AWS Glue Data Catalog 時,建議使用此項。
-
Amazon S3 Tables Catalog for Apache Iceberg – 僅用於需要 Java 用戶端程式庫的舊版應用程式,或特定程式設計案例。由於額外的 JAR 相依性管理和複雜性,因此不建議將此方法用於新的 AWS Glue ETL 任務實作。
步驟 1 – 事前準備
您必須先將 IAM 角色設定為 AWS Glue 可用來執行AWS Glue任務,才能從任務查詢資料表。選擇您的存取方法,以查看該方法的特定先決條件。
- AWS analytics services integration (Recommended)
-
使用 S3 Tables AWS 分析整合以執行 AWS Glue 任務時所需的先決條件。
- Amazon S3 Tables Iceberg REST endpoint
-
使用 Amazon S3 Tables Iceberg REST 端點執行 AWS Glue ETL 任務的先決條件。
- Amazon S3 Tables Catalog for Apache Iceberg
-
先決條件使用 Amazon S3 Tables Catalog for Apache Iceberg 執行 AWS Glue ETL 任務。
-
為 AWS Glue 建立 IAM 角色。
-
將 AmazonS3TablesFullAccess 受管政策連接至角色。
-
將 AmazonS3FullAccess 受管政策連接至角色。
-
若要使用 Amazon S3 Tables Catalog for Apache Iceberg,您需要下載用戶端目錄 JAR,並將其上傳至 S3 儲存貯體。
下載目錄 JAR
-
檢查 Maven Central 上的最新版本。您可以使用瀏覽器或使用下列命令,從 Maven 中央下載 JAR。請務必將 version number 取代為最新版本。
wget https://repo1.maven.org/maven2/software/amazon/s3tables/s3-tables-catalog-for-iceberg-runtime/0.1.5/s3-tables-catalog-for-iceberg-runtime-0.1.5.jar
-
將下載的 JAR 上傳至您的 AWS Glue IAM 角色可存取的 S3 儲存貯體。您可以使用下列 AWS CLI 命令上傳 JAR。請務必將 version number 取代為最新版本,並使用您自己的 bucket name 和 path。
aws s3 cp s3-tables-catalog-for-iceberg-runtime-0.1.5.jar s3://amzn-s3-demo-bucket/jars/
步驟 2 – 建立指令碼以連線至資料表儲存貯體
若要在執行 AWS Glue ETL 任務時存取資料表資料,請為連線至 S3 資料表儲存貯體的 Apache Iceberg 設定 Spark 工作階段。您可以修改現有的指令碼以連線到資料表儲存貯體,或建立新的指令碼。如需建立 AWS Glue 指令碼的詳細資訊,請參閱《AWS Glue 開發人員指南》中的教學課程:為 Spark 指令碼撰寫 AWS Glue。
您可以透過下列任何 S3 Tables 存取方法,將工作階段設定為連線至資料表儲存貯體:
-
S3 Tables AWS 分析服務整合 (建議)
-
Amazon S3 Tables Iceberg REST 端點
-
Amazon S3 Tables Catalog for Apache Iceberg
從下列存取方法中選擇,以檢視設定指示和組態範例。
- AWS analytics services integration (Recommended)
-
利用 Spark 在 AWS Glue 上使用 AWS 分析服務整合查詢資料表的先決條件是,您必須將資料表儲存貯體與 AWS 分析服務整合
您可以透過任務中的 Spark 工作階段,或在互動式工作階段中使用 AWS Glue Studio 魔術命令,設定與資料表儲存貯體的連線。若要使用下列範例,請將預留位置值取代為您自己的資料表儲存貯體資訊。
- 使用 PySpark 指令碼
-
在 PySpark 指令碼中使用下列程式碼片段,設定 AWS Glue 任務以使用整合來連線至資料表儲存貯體。
spark = SparkSession.builder.appName("SparkIcebergSQL") \
.config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.4.2") \
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
.config("spark.sql.defaultCatalog","s3tables") \
.config("spark.sql.catalog.s3tables", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.s3tables.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \
.config("spark.sql.catalog.s3tables.glue.id", "111122223333:s3tablescatalog/amzn-s3-demo-table-bucket") \
.config("spark.sql.catalog.s3tables.warehouse", "s3://amzn-s3-demo-table-bucket/warehouse/") \
.getOrCreate()
- 使用互動式 AWS Glue 工作階段
-
如果您與 AWS Glue 5.0 搭配使用互動式筆記本工作階段,請在程式碼執行之前使用儲存格中的 %%configure 魔術命令來指定相同的組態。
%%configure
{"conf": "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf spark.sql.defaultCatalog=s3tables --conf spark.sql.catalog.s3tables=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.s3tables.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog --conf spark.sql.catalog.s3tables.glue.id=111122223333:s3tablescatalog/amzn-s3-demo-table-bucket --conf spark.sql.catalog.s3tables.warehouse=s3://amzn-s3-demo-table-bucket/warehouse/"}
- Amazon S3 Tables Iceberg REST endpoint
-
您可以透過任務中的 Spark 工作階段,或在互動式工作階段中使用 AWS Glue Studio 魔術命令,設定與資料表儲存貯體的連線。若要使用下列範例,請將預留位置值取代為您自己的資料表儲存貯體資訊。
- 使用 PySpark 指令碼
在 PySpark 指令碼中使用下列程式碼片段,設定 AWS Glue 任務以使用端點來連線至資料表儲存貯體。
spark = SparkSession.builder.appName("glue-s3-tables-rest") \
.config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.4.2") \
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
.config("spark.sql.defaultCatalog", "s3_rest_catalog") \
.config("spark.sql.catalog.s3_rest_catalog", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.s3_rest_catalog.type", "rest") \
.config("spark.sql.catalog.s3_rest_catalog.uri", "https://s3tables.Region.amazonaws.com/iceberg") \
.config("spark.sql.catalog.s3_rest_catalog.warehouse", "arn:aws:s3tables:Region:111122223333:bucket/amzn-s3-demo-table-bucket") \
.config("spark.sql.catalog.s3_rest_catalog.rest.sigv4-enabled", "true") \
.config("spark.sql.catalog.s3_rest_catalog.rest.signing-name", "s3tables") \
.config("spark.sql.catalog.s3_rest_catalog.rest.signing-region", "Region") \
.config('spark.sql.catalog.s3_rest_catalog.io-impl','org.apache.iceberg.aws.s3.S3FileIO') \
.config('spark.sql.catalog.s3_rest_catalog.rest-metrics-reporting-enabled','false') \
.getOrCreate()
- 使用互動式 AWS Glue 工作階段
如果您與 AWS Glue 5.0 搭配使用互動式筆記本工作階段,請在程式碼執行之前使用儲存格中的 %%configure 魔術命令來指定相同的組態。將預留位置值取代為您自己的資料表儲存貯體資訊。
%%configure
{"conf": "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf spark.sql.defaultCatalog=s3_rest_catalog --conf spark.sql.catalog.s3_rest_catalog=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.s3_rest_catalog.type=rest --conf spark.sql.catalog.s3_rest_catalog.uri=https://s3tables.Region.amazonaws.com/iceberg --conf spark.sql.catalog.s3_rest_catalog.warehouse=arn:aws:s3tables:Region:111122223333:bucket/amzn-s3-demo-table-bucket --conf spark.sql.catalog.s3_rest_catalog.rest.sigv4-enabled=true --conf spark.sql.catalog.s3_rest_catalog.rest.signing-name=s3tables --conf spark.sql.catalog.s3_rest_catalog.rest.signing-region=Region --conf spark.sql.catalog.s3_rest_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO --conf spark.sql.catalog.s3_rest_catalog.rest-metrics-reporting-enabled=false"}
- Amazon S3 Tables Catalog for Apache Iceberg
-
您必須先下載最新的目錄 jar 並將其上傳至 S3 儲存貯體,才能使用 Amazon S3 Tables Catalog for Apache Iceberg 連線到資料表。然後,當您建立任務時,可以將用戶端目錄 JAR 的路徑新增為特殊參數。如需 AWS Glue 中任務參數的詳細資訊,請參閱《AWS Glue 開發人員指南》中的在 AWS Glue 任務中使用的特殊參數。
您可以透過任務中的 Spark 工作階段,或在互動式工作階段中使用 AWS Glue Studio 魔術命令,設定與資料表儲存貯體的連線。若要使用下列範例,請將預留位置值取代為您自己的資料表儲存貯體資訊。
- 使用 PySpark 指令碼
-
在 PySpark 指令碼中使用下列程式碼片段,設定 AWS Glue 任務以使用 JAR 來連線至資料表儲存貯體。將預留位置值取代為您自己的資料表儲存貯體資訊。
spark = SparkSession.builder.appName("glue-s3-tables") \
.config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.4.2") \
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
.config("spark.sql.defaultCatalog", "s3tablesbucket") \
.config("spark.sql.catalog.s3tablesbucket", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.s3tablesbucket.catalog-impl", "software.amazon.s3tables.iceberg.S3TablesCatalog") \
.config("spark.sql.catalog.s3tablesbucket.warehouse", "arn:aws:s3tables:Region:111122223333:bucket/amzn-s3-demo-table-bucket") \
.getOrCreate()
- 使用互動式 AWS Glue 工作階段
-
如果您與 AWS Glue 5.0 搭配使用互動式筆記本工作階段,請在程式碼執行之前使用儲存格中的 %%configure 魔術命令來指定相同的組態。將預留位置值取代為您自己的資料表儲存貯體資訊。
%%configure
{"conf": "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf spark.sql.defaultCatalog=s3tablesbucket --conf spark.sql.catalog.s3tablesbucket=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.s3tablesbucket.catalog-impl=software.amazon.s3tables.iceberg.S3TablesCatalog --conf spark.sql.catalog.s3tablesbucket.warehouse=arn:aws:s3tables:Region:111122223333:bucket/amzn-s3-demo-table-bucket", "extra-jars": "s3://amzn-s3-demo-bucket/jars/s3-tables-catalog-for-iceberg-runtime-0.1.5.jar"}
範例指令碼
下列範例 PySpark 指令碼可用來測試使用 AWS Glue 任務查詢 S3 資料表。這些指令碼會連線至您的資料表儲存貯體並執行查詢:建立新的命名空間、建立範例資料表、將資料插入資料表,以及傳回資料表資料。若要使用指令碼,請將預留位置值取代為您擁有資料表儲存貯體的資訊。
根據您的 S3 Tables 存取方法,從下列指令碼中選擇。
- S3 Tables integration with AWS analytics services
-
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("SparkIcebergSQL") \
.config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.4.2") \
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
.config("spark.sql.defaultCatalog","s3tables")
.config("spark.sql.catalog.s3tables", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.s3tables.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \
.config("spark.sql.catalog.s3tables.glue.id", "111122223333:s3tablescatalog/amzn-s3-demo-table-bucket") \
.config("spark.sql.catalog.s3tables.warehouse", "s3://amzn-s3-demo-table-bucket/bucket/amzn-s3-demo-table-bucket") \
.getOrCreate()
namespace = "new_namespace"
table = "new_table"
spark.sql("SHOW DATABASES").show()
spark.sql(f"DESCRIBE NAMESPACE {namespace}").show()
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {namespace}.{table} (
id INT,
name STRING,
value INT
)
""")
spark.sql(f"""
INSERT INTO {namespace}.{table}
VALUES
(1, 'ABC', 100),
(2, 'XYZ', 200)
""")
spark.sql(f"SELECT * FROM {namespace}.{table} LIMIT 10").show()
- Amazon S3 Tables Iceberg REST endpoint
-
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("glue-s3-tables-rest") \
.config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.4.2") \
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
.config("spark.sql.defaultCatalog", "s3_rest_catalog") \
.config("spark.sql.catalog.s3_rest_catalog", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.s3_rest_catalog.type", "rest") \
.config("spark.sql.catalog.s3_rest_catalog.uri", "https://s3tables.Region.amazonaws.com/iceberg") \
.config("spark.sql.catalog.s3_rest_catalog.warehouse", "arn:aws:s3tables:Region:111122223333:bucket/amzn-s3-demo-table-bucket") \
.config("spark.sql.catalog.s3_rest_catalog.rest.sigv4-enabled", "true") \
.config("spark.sql.catalog.s3_rest_catalog.rest.signing-name", "s3tables") \
.config("spark.sql.catalog.s3_rest_catalog.rest.signing-region", "Region") \
.config('spark.sql.catalog.s3_rest_catalog.io-impl','org.apache.iceberg.aws.s3.S3FileIO') \
.config('spark.sql.catalog.s3_rest_catalog.rest-metrics-reporting-enabled','false') \
.getOrCreate()
namespace = "s3_tables_rest_namespace"
table = "new_table_s3_rest"
spark.sql("SHOW DATABASES").show()
spark.sql(f"DESCRIBE NAMESPACE {namespace}").show()
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {namespace}.{table} (
id INT,
name STRING,
value INT
)
""")
spark.sql(f"""
INSERT INTO {namespace}.{table}
VALUES
(1, 'ABC', 100),
(2, 'XYZ', 200)
""")
spark.sql(f"SELECT * FROM {namespace}.{table} LIMIT 10").show()
- Amazon S3 Tables Catalog for Apache Iceberg
-
from pyspark.sql import SparkSession
#Spark session configurations
spark = SparkSession.builder.appName("glue-s3-tables") \
.config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.4.2") \
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
.config("spark.sql.defaultCatalog", "s3tablesbucket") \
.config("spark.sql.catalog.s3tablesbucket", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.s3tablesbucket.catalog-impl", "software.amazon.s3tables.iceberg.S3TablesCatalog") \
.config("spark.sql.catalog.s3tablesbucket.warehouse", "arn:aws:s3tables:Region:111122223333:bucket/amzn-s3-demo-table-bucket") \
.getOrCreate()
#Script
namespace = "new_namespace"
table = "new_table"
spark.sql(f"CREATE NAMESPACE IF NOT EXISTS s3tablesbucket.{namespace}")
spark.sql(f"DESCRIBE NAMESPACE {namespace}").show()
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {namespace}.{table} (
id INT,
name STRING,
value INT
)
""")
spark.sql(f"""
INSERT INTO {namespace}.{table}
VALUES
(1, 'ABC', 100),
(2, 'XYZ', 200)
""")
spark.sql(f"SELECT * FROM {namespace}.{table} LIMIT 10").show()
步驟 3 – 建立查詢資料表的 AWS Glue 任務
下列程序示範如何設定連線至 S3 資料表儲存貯體的 AWS Glue 任務。您可以使用 AWS CLI,或搭配 AWS Glue Studio 指令碼編輯器使用主控台來執行此操作。如需詳細資訊,請參閱《AWS Glue 使用者指南》中的在 AWS Glue 中授權任務。
下列程序示範如何使用 AWS Glue Studio 指令碼編輯器來建立查詢 S3 資料表的 ETL 任務。
前往 https://console.aws.amazon.com/glue/ 開啟 AWS Glue 主控台。
-
從導覽窗格中選擇 ETL 任務。
-
選擇指令碼編輯器,然後選擇上傳指令碼,然後上傳您建立來查詢 S3 資料表的 PySpark 指令碼。
-
選取任務詳細資訊索引標籤,然後針對基本屬性輸入下列內容。
-
(選用) 如果您使用 Amazon S3 Tables Catalog for Apache Iceberg 存取方法,請展開進階屬性,並針對相依 JAR 路徑,輸入您上傳到 S3 儲存貯體的用戶端目錄 jar 的 S3 URI 做為先決條件。例如 s3://amzn-s3-demo-bucket1/jars/s3-tables-catalog-for-iceberg-runtime-0.1.5.jar
-
選擇儲存以建立任務。
-
選擇執行開始執行任務,然後在執行索引標籤下檢閱任務狀態。
下列程序示範如何使用 AWS CLI 來建立負責查詢 S3 資料表的 ETL 任務。若要使用命令,請將預留位置值取代為您自己的值。
-
建立 AWS Glue 任務。
aws glue create-job \
--name etl-tables-job \
--role arn:aws:iam::111122223333:role/AWSGlueServiceRole \
--command '{
"Name": "glueetl",
"ScriptLocation": "s3://amzn-s3-demo-bucket1/scripts/glue-etl-query.py",
"PythonVersion": "3"
}' \
--default-arguments '{
"--job-language": "python",
"--class": "GlueApp"
}' \
--glue-version "5.0"
(選用) 如果您使用 Amazon S3 Tables Catalog for Apache Iceberg 存取方法,請使用 --extra-jars 參數將用戶端目錄 JAR 新增至 --default-arguments。當您新增參數時,將 input placeholders 取代為您自己的預留位置。
"--extra-jars": "s3://amzn-s3-demo-bucket/jar-path/s3-tables-catalog-for-iceberg-runtime-0.1.5.jar"
-
開始執行您的任務。
aws glue start-job-run \
--job-name etl-tables-job
-
若要檢閱您的任務狀態,請從上一個命令複製執行 ID,並將其輸入至下列命令。
aws glue get-job-run --job-name etl-tables-job \
--run-id jr_ec9a8a302e71f8483060f87b6c309601ea9ee9c1ffc2db56706dfcceb3d0e1ad