本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
使用 在 Amazon S3 資料表上執行 ETL 任務 AWS Glue
AWS Glue 是一種無伺服器資料整合服務,可讓分析使用者輕鬆探索、準備、移動和整合來自多個來源的資料。您可以使用 AWS Glue 任務來執行擷取、轉換和載入 (ETL) 管道,將資料載入資料湖。如需詳細資訊 AWS Glue,請參閱《 AWS Glue 開發人員指南》中的什麼是 AWS Glue?。
AWS Glue 任務會封裝連線至來源資料的指令碼、處理該指令碼,然後將其寫入資料目標。一般而言,任務會執行擷取、轉換和載入 (ETL) 指令碼。任務可以執行專為Apache Spark執行時間環境設計的指令碼。您可以監控任務執行以了解執行時間指標,例如完成狀態、持續時間和開始時間。
您可以使用 AWS Glue 任務來處理 S3 資料表中的資料,方法是透過與 AWS 分析服務的整合連線至資料表,或直接使用 Amazon S3 TablesIceberg REST 端點或適用於 的 Amazon S3 Tables Catalog 進行連線Apache Iceberg。本指南涵蓋開始使用 AWS Glue 與 S3 Tables 的基本步驟,包括:
先決條件
您必須先設定 AWS Glue 可用於執行 AWS Glue 任務的 IAM 角色,並將 的 Amazon S3 Tables Catalog 上傳至可在執行任務時 AWS Glue 存取的 S3 Apache IcebergJAR儲存貯體,才能從任務查詢資料表。
建立指令碼以連線至資料表儲存貯體
若要在執行 AWS Glue ETL 任務時存取資料表資料,請為 設定Apache Iceberg連線至 S3 資料表儲存貯體的Spark工作階段。您可以修改現有的指令碼以連線到資料表儲存貯體,或建立新的指令碼。如需建立 AWS Glue 指令碼的詳細資訊,請參閱《 AWS Glue 開發人員指南》中的教學課程:撰寫 AWS Glue for Spark 指令碼。
您可以透過下列任一 S3 Tables 存取方法,將工作階段設定為連線至資料表儲存貯體:
從下列存取方法中選擇,以檢視設定指示和組態範例。
- AWS analytics services integration
-
使用 AWS 分析服務整合在 Spark上使用 AWS Glue 查詢資料表的先決條件是,您必須將資料表儲存貯體與 AWS 分析服務整合
您可以透過任務中的Spark工作階段或在互動式工作階段中使用 AWS Glue Studio 魔術設定與資料表儲存貯體的連線。若要使用下列範例,請將預留位置值
取代為您自己的資料表儲存貯體資訊。
- 使用 PySpark 指令碼
-
在PySpark指令碼中使用下列程式碼片段,設定 AWS Glue 任務以使用 整合連線到資料表儲存貯體。
spark = SparkSession.builder.appName("SparkIcebergSQL") \
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
.config("spark.sql.defaultCatalog","s3tables") \
.config("spark.sql.catalog.s3tables", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.s3tables.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \
.config("spark.sql.catalog.s3tables.glue.id", "111122223333
:s3tablescatalog/amzn-s3-demo-table-bucket
") \
.config("spark.sql.catalog.s3tables.warehouse", "s3://amzn-s3-demo-table-bucket
/warehouse/") \
.getOrCreate()
- 使用互動式 AWS Glue 工作階段
-
如果您將互動式筆記本工作階段與 AWS Glue 5.0 搭配使用,請在程式碼執行之前,使用儲存格中的%%configure
魔術來指定相同的組態。
%%configure
{"conf": "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf spark.sql.defaultCatalog=s3tables --conf spark.sql.catalog.s3tables=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.s3tables.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog --conf spark.sql.catalog.s3tables.glue.id=111122223333
:s3tablescatalog/amzn-s3-demo-table-bucket
--conf spark.sql.catalog.s3tables.warehouse=s3://amzn-s3-demo-table-bucket
/warehouse/"}
- Amazon S3 Tables Iceberg REST endpoint
-
您可以透過任務中的Spark工作階段或在互動式工作階段中使用 AWS Glue Studio 魔術設定與資料表儲存貯體的連線。若要使用下列範例,請將預留位置值
取代為您自己的資料表儲存貯體資訊。
- 使用 PySpark 指令碼
在PySpark指令碼中使用下列程式碼片段,設定 AWS Glue 任務以使用端點連線至資料表儲存貯體。
spark = SparkSession.builder.appName("glue-s3-tables-rest") \
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
.config("spark.sql.defaultCatalog", "s3_rest_catalog") \
.config("spark.sql.catalog.s3_rest_catalog", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.s3_rest_catalog.type", "rest") \
.config("spark.sql.catalog.s3_rest_catalog.uri", "https://s3tables.Region
.amazonaws.com/iceberg") \
.config("spark.sql.catalog.s3_rest_catalog.warehouse", "arn:aws:s3tables:Region
:111122223333
:s3tablescatalog/amzn-s3-demo-table-bucket
") \
.config("spark.sql.catalog.s3_rest_catalog.rest.sigv4-enabled", "true") \
.config("spark.sql.catalog.s3_rest_catalog.rest.signing-name", "s3tables") \
.config("spark.sql.catalog.s3_rest_catalog.rest.signing-region", "Region
") \
.config('spark.sql.catalog.s3_rest_catalog.io-impl','org.apache.iceberg.aws.s3.S3FileIO') \
.config('spark.sql.catalog.s3_rest_catalog.rest-metrics-reporting-enabled','false') \
.getOrCreate()
- 使用互動式 AWS Glue 工作階段
如果您使用互動式筆記本工作階段搭配 AWS Glue 5.0,請在程式碼執行之前,使用儲存格中的%%configure
魔術來指定相同的組態。將預留位置值取代為您自己的資料表儲存貯體資訊。
%%configure
{"conf": "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf spark.sql.defaultCatalog=s3_rest_catalog --conf spark.sql.catalog.s3_rest_catalog=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.s3_rest_catalog.type=rest --conf spark.sql.catalog.s3_rest_catalog.uri=https://s3tables.Region
.amazonaws.com/iceberg --conf spark.sql.catalog.s3_rest_catalog.warehouse=arn:aws:s3tables:Region
:111122223333
:s3tablescatalog/amzn-s3-demo-table-bucket
--conf spark.sql.catalog.s3_rest_catalog.rest.sigv4-enabled=true --conf spark.sql.catalog.s3_rest_catalog.rest.signing-name=s3tables --conf spark.sql.catalog.s3_rest_catalog.rest.signing-region=Region
--conf spark.sql.catalog.s3_rest_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO --conf spark.sql.catalog.s3_rest_catalog.rest-metrics-reporting-enabled=false"}
- Amazon S3 Tables Catalog for Apache Iceberg
-
您必須先Apache Iceberg下載最新的目錄 jar 並將其上傳至 S3 儲存貯體,才能使用 Amazon S3 Tables Catalog 連線到資料表。 S3 然後,當您建立任務時,您可以將用戶端目錄的路徑新增JAR為特殊參數。如需 中任務參數的詳細資訊 AWS Glue,請參閱《 AWS Glue 開發人員指南AWS Glue 》中的任務中使用的特殊參數。
您可以透過任務中的Spark工作階段或在互動式工作階段中使用 AWS Glue Studio 魔術設定與資料表儲存貯體的連線。若要使用下列範例,請將預留位置值
取代為您自己的資料表儲存貯體資訊。
- 使用PySpark指令碼
-
在PySpark指令碼中使用下列程式碼片段,設定 AWS Glue 任務以使用 連線到資料表儲存貯體JAR。將預留位置值取代為您自己的資料表儲存貯體資訊。
spark = SparkSession.builder.appName("glue-s3-tables") \
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
.config("spark.sql.defaultCatalog", "s3tablesbucket") \
.config("spark.sql.catalog.s3tablesbucket", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.s3tablesbucket.catalog-impl", "software.amazon.s3tables.iceberg.S3TablesCatalog") \
.config("spark.sql.catalog.s3tablesbucket.warehouse", "arn:aws:s3tables:Region
:111122223333:bucket/amzn-s3-demo-table-bucket
") \
.getOrCreate()
- 使用互動式 AWS Glue 工作階段
-
如果您使用互動式筆記本工作階段搭配 AWS Glue 5.0,請在程式碼執行之前,使用儲存格中的%%configure
魔術來指定相同的組態。將預留位置值取代為您自己的資料表儲存貯體資訊。
%%configure
{"conf": "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf spark.sql.defaultCatalog=s3tablesbucket --conf spark.sql.catalog.s3tablesbucket=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.s3tablesbucket.catalog-impl=software.amazon.s3tables.iceberg.S3TablesCatalog --conf spark.sql.catalog.s3tablesbucket.warehouse=arn:aws:s3tables:Region
:111122223333:bucket/amzn-s3-demo-table-bucket
", "extra-jars": "s3://amzn-s3-demo-bucket
/jars/s3-tables-catalog-for-iceberg-runtime-0.1.5
.jar"}
範例指令碼
下列範例PySpark指令碼可用來測試使用 AWS Glue 任務查詢 S3 資料表。這些指令碼會連線至您的資料表儲存貯體並執行查詢:建立新的命名空間、建立範例資料表、將資料插入資料表,以及傳回資料表資料。若要使用指令碼,請將預留位置值
取代為您擁有資料表儲存貯體的資訊。
根據您的 S3 Tables 存取方法,從下列指令碼中選擇。
- S3 Tables integration with AWS analytics services
-
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("SparkIcebergSQL") \
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
.config("spark.sql.defaultCatalog","s3tables")
.config("spark.sql.catalog.s3tables", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.s3tables.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \
.config("spark.sql.catalog.s3tables.glue.id", "111122223333
:s3tablescatalog/amzn-s3-demo-table-bucket
") \
.config("spark.sql.catalog.s3tables.warehouse", "s3://amzn-s3-demo-table-bucket
/bucket/amzn-s3-demo-table-bucket
") \
.getOrCreate()
namespace = "new_namespace
"
table = "new_table
"
spark.sql("SHOW DATABASES").show()
spark.sql(f"DESCRIBE NAMESPACE {namespace}").show()
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {namespace}.{table} (
id INT,
name STRING,
value INT
)
""")
spark.sql(f"""
INSERT INTO {namespace}.{table}
VALUES
(1, 'ABC', 100),
(2, 'XYZ', 200)
""")
spark.sql(f"SELECT * FROM {namespace}.{table} LIMIT 10").show()
- Amazon S3 Tables Iceberg REST endpoint
-
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("glue-s3-tables-rest") \
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
.config("spark.sql.defaultCatalog", "s3_rest_catalog") \
.config("spark.sql.catalog.s3_rest_catalog", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.s3_rest_catalog.type", "rest") \
.config("spark.sql.catalog.s3_rest_catalog.uri", "https://s3tables.Region
.amazonaws.com/iceberg") \
.config("spark.sql.catalog.s3_rest_catalog.warehouse", "arn:aws:s3tables:Region
:111122223333
:bucket/amzn-s3-demo-table-bucket
") \
.config("spark.sql.catalog.s3_rest_catalog.rest.sigv4-enabled", "true") \
.config("spark.sql.catalog.s3_rest_catalog.rest.signing-name", "s3tables") \
.config("spark.sql.catalog.s3_rest_catalog.rest.signing-region", "Region
") \
.config('spark.sql.catalog.s3_rest_catalog.io-impl','org.apache.iceberg.aws.s3.S3FileIO') \
.config('spark.sql.catalog.s3_rest_catalog.rest-metrics-reporting-enabled','false') \
.getOrCreate()
namespace = "s3_tables_rest_namespace
"
table = "new_table_s3_rest
"
spark.sql("SHOW DATABASES").show()
spark.sql(f"DESCRIBE NAMESPACE {namespace}").show()
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {namespace}.{table} (
id INT,
name STRING,
value INT
)
""")
spark.sql(f"""
INSERT INTO {namespace}.{table}
VALUES
(1, 'ABC', 100),
(2, 'XYZ', 200)
""")
spark.sql(f"SELECT * FROM {namespace}.{table} LIMIT 10").show()
- Amazon S3 Tables Catalog for Apache Iceberg
-
from pyspark.sql import SparkSession
#Spark session configurations
spark = SparkSession.builder.appName("glue-s3-tables") \
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
.config("spark.sql.defaultCatalog", "s3tablesbucket") \
.config("spark.sql.catalog.s3tablesbucket", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.s3tablesbucket.catalog-impl", "software.amazon.s3tables.iceberg.S3TablesCatalog") \
.config("spark.sql.catalog.s3tablesbucket.warehouse", "arn:aws:s3tables:Region
:111122223333
:bucket/amzn-s3-demo-table-bucket
") \
.getOrCreate()
#Script
namespace = "new_namespace
"
table = "new_table
"
spark.sql(f"CREATE NAMESPACE IF NOT EXISTS s3tablesbucket.{namespace}")
spark.sql(f"DESCRIBE NAMESPACE {namespace}").show()
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {namespace}.{table} (
id INT,
name STRING,
value INT
)
""")
spark.sql(f"""
INSERT INTO {namespace}.{table}
VALUES
(1, 'ABC', 100),
(2, 'XYZ', 200)
""")
spark.sql(f"SELECT * FROM {namespace}.{table} LIMIT 10").show()
建立查詢資料表 AWS Glue 的任務
下列程序示範如何設定連線至 S3 資料表儲存貯體 AWS Glue 的任務。您可以使用 AWS CLI 或使用 主控台搭配 AWS Glue Studio 指令碼編輯器來執行此操作。如需詳細資訊,請參閱AWS Glue 《 使用者指南》中的在 中編寫任務 AWS Glue。
下列程序說明如何使用 AWS Glue Studio 指令碼編輯器來建立查詢 S3 資料表的 ETL 任務。
在 https://https://console.aws.amazon.com/glue/ 開啟 AWS Glue 主控台。
-
從導覽窗格中,選擇 ETL 任務。
-
選擇指令碼編輯器,然後選擇上傳指令碼,然後上傳您建立來查詢 S3 資料表的PySpark指令碼。
-
選取任務詳細資訊索引標籤,然後針對基本屬性輸入下列內容。
-
(選用) 如果您使用 Amazon S3 Tables Catalog 進行Apache Iceberg存取方法,請展開進階屬性,並針對相依 JARs輸入上傳到 S3 儲存貯體的用戶端目錄 jar 的 S3 URI 做為先決條件。例如,s3:/amzn-s3-demo-bucket1
//jars
/s3-tables-catalog-for-iceberg-runtime-0.1.5
.jar
-
選擇儲存以建立任務。
-
選擇執行 開始任務,然後在執行索引標籤下檢閱任務狀態。
下列程序說明如何使用 AWS CLI 來建立查詢 S3 資料表的 ETL 任務。若要使用命令,請將預留位置值
取代為您自己的值。
-
建立 AWS Glue 任務。
aws glue create-job \
--name etl-tables-job
\
--role arn:aws:iam::111122223333
:role/AWSGlueServiceRole
\
--command '{
"Name": "glueetl",
"ScriptLocation": "s3://amzn-s3-demo-bucket1
/scripts/glue-etl-query.py
",
"PythonVersion": "3"
}' \
--default-arguments '{
"--job-language": "python",
"--class": "GlueApp"
}' \
--glue-version "5.0"
(選用) 如果您使用 Amazon S3 Tables Catalog 進行Apache Iceberg存取方法,--default-arguments
請使用 --extra-jars
參數將用戶端目錄新增至 JAR 。當您新增 參數時,將輸入預留位置
取代為您自己的預留位置。
"--extra-jars": "s3://amzn-s3-demo-bucket
/jar-path
/s3-tables-catalog-for-iceberg-runtime-0.1.5
.jar"
-
啟動您的任務。
aws glue start-job-run \
--job-name etl-tables-job
-
若要檢閱您的任務狀態,請從上一個命令複製執行 ID,並將其輸入至下列命令。
aws glue get-job-run --job-name etl-tables-job
\
--run-id jr_ec9a8a302e71f8483060f87b6c309601ea9ee9c1ffc2db56706dfcceb3d0e1ad