本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
GlueContext 類別
包裝 Apache Spark SparkContext
__init__
__init__(sparkContext)
sparkContext– 欲使用的 Apache Spark 細節。
正在建立
getSource
getSource(connection_type, transformation_ctx = "", **options)
建立可用於從外部來源讀取 DynamicFrames 的 DataSource 物件。
connection_type– 要使用的連線類型,例如 Amazon Simple Storage Service (Amazon S3)、Amazon Redshift 及 JDBC。有效值包括s3、mysql、postgresql、redshift、sqlserver、oracle和dynamodb。transformation_ctx– 欲使用的轉換細節 (選用)。options– 選擇性的名稱/值對的集合。如需更多詳細資訊,請參閱 AWS Glue for Spark 中 ETL 的連線類型和選項。
以下是 getSource 的使用範例:
>>> data_source = context.getSource("file", paths=["/in/path"]) >>> data_source.setFormat("json") >>> myFrame = data_source.getFrame()
create_dynamic_frame_from_rdd
create_dynamic_frame_from_rdd(data, name, schema=None, sample_ratio=None, transformation_ctx="")
傳回從 Apache Spark 彈性分散式資料集 (RDD) 建立的 DynamicFrame。
data– 欲使用的資料來源。name– 欲使用的資料名稱。schema– 欲使用的結構描述 (選用)。sample_ratio– 欲使用的取樣率 (選用)。transformation_ctx– 欲使用的轉換細節 (選用)。
create_dynamic_frame_from_catalog
create_dynamic_frame_from_catalog(database, table_name, redshift_tmp_dir, transformation_ctx = "", push_down_predicate= "", additional_options = {}, catalog_id = None)
傳回使用 Data Catalog 資料庫和資料表名稱建立的 DynamicFrame。使用此方法時,您可以透過指定的 AWS Glue Data Catalog 資料表上的資料表屬性提供 format_options,以及透過 additional_options 引數提供其他選項。
Database– 欲讀取的資料庫。table_name– 欲讀取的資料表的名稱。redshift_tmp_dir– 所要使用的 Amazon Redshift 暫時目錄 (選用)。transformation_ctx– 欲使用的轉換細節 (選用)。push_down_predicate– 篩選分割區,而無需列出和讀取資料集中的所有檔案。如需支援的來源和限制,請參閱在 使用 AWS Glue ETL 中的下推來最佳化讀取。如需更多詳細資訊,請參閱 使用 pushdown 述詞預先篩選。additional_options– 選擇性的名稱/值對的集合。可能的選項包括 AWS Glue for Spark 中 ETL 的連線類型和選項 中列出的項目,除了endpointUrl、streamName、bootstrap.servers、security.protocol、topicName、classification以及delimiter。另一個支援的選項是catalogPartitionPredicate:catalogPartitionPredicate— 您可以傳遞目錄表達式以根據索引欄進行篩選。這會將篩選下推至伺服器端。如需詳細資訊,請參閱 AWS Glue 分割區索引。注意push_down_predicate和catalogPartitionPredicate使用不同的語法。前者使用 Spark SQL 標準語法,後者使用 JSQL 剖析器。catalog_id— 要存取之 Data Catalog 的目錄 ID (帳戶 ID)。若無,會使用發起人的預設帳戶 ID。
create_dynamic_frame_from_options
create_dynamic_frame_from_options(connection_type, connection_options={},
format=None, format_options={}, transformation_ctx = "")
傳回使用指定的連線和格式建立的 DynamicFrame。
connection_type– 連線類型,例如 Amazon S3、Amazon Redshift 及 JDBC。有效值包括s3、mysql、postgresql、redshift、sqlserver、oracle和dynamodb。connection_options- 連線選項,例如路徑和資料庫資料表 (選用)。如果是connection_type的s3,會定義 Amazon S3 路徑清單。connection_options = {"paths": ["s3://aws-glue-target/temp"]}如果是 JDBC 連線,必須定義幾項屬性。請注意,資料庫名稱必須是 URL 的一部分。它可以選擇性包含在連線選項中。
警告
不建議在指令碼中存放密碼。考慮使用
boto3從 AWS Secrets Manager 或 AWS Glue Data Catalog 擷取它們。connection_options = {"url": "jdbc-url/database", "user": "username", "password":passwordVariable,"dbtable": "table-name", "redshiftTmpDir": "s3-tempdir-path"}dbtable屬性為 JDBC 資料表的名稱。若是支援資料庫內結構描述的 JDBC 資料存放區,請指定schema.table-name。如果未提供結構描述,則會使用預設的 "public" 結構描述。如需更多詳細資訊,請參閱 AWS Glue for Spark 中 ETL 的連線類型和選項。
format– 格式規格。這是用於 Amazon S3 或支援多種格式的 AWS Glue 連線。請參閱 AWS Glue for Spark 中的輸入與輸出的資料格式選項 以了解受支援的格式。format_options– 指定格式的格式選項。請參閱 AWS Glue for Spark 中的輸入與輸出的資料格式選項 以了解受支援的格式。transformation_ctx– 欲使用的轉換細節 (選用)。push_down_predicate– 篩選分割區,而無需列出和讀取資料集中的所有檔案。如需支援的來源和限制,請參閱在 使用 AWS Glue ETL 中的下推來最佳化讀取。如需詳細資訊,請參閱使用 Pushdown 述詞預先篩選。
create_sample_dynamic_frame_from_catalog
create_sample_dynamic_frame_from_catalog(database, table_name, num, redshift_tmp_dir, transformation_ctx = "", push_down_predicate= "", additional_options = {}, sample_options = {}, catalog_id = None)
傳回使用 Data Catalog 資料庫和資料表名稱建立的範例 DynamicFrame。DynamicFrame 僅包含來自資料來源的第一個 num 記錄。
-
database– 欲讀取的資料庫。 -
table_name– 欲讀取的資料表的名稱。 -
num– 傳回的範例動態框架中記錄的最大數目。 redshift_tmp_dir:所要使用的 Amazon Redshift 臨時目錄 (選用)。-
transformation_ctx– 欲使用的轉換細節 (選用)。 push_down_predicate– 篩選分割區,而無需列出和讀取資料集中的所有檔案。如需更多詳細資訊,請參閱 使用 pushdown 述詞預先篩選。-
additional_options– 選擇性的名稱/值對的集合。可能的選項包括 AWS Glue for Spark 中 ETL 的連線類型和選項 中列出的項目,除了endpointUrl、streamName、bootstrap.servers、security.protocol、topicName、classification以及delimiter。 -
sample_options– 用於控制取樣行為的參數 (選用)。Amazon S3 來源的目前可用參數:maxSamplePartitions– 取樣將讀取的分割區數目上限。預設值為 10maxSampleFilesPerPartition– 取樣將在一個分割區中讀取的檔案數目上限。預設值為 10。這些參數有助於減少檔案清單所耗用的時間。例如,假設資料集有 1000 個分割區,並且每個分割區都有 10 個檔案。如果您設定
maxSamplePartitions= 10 和maxSampleFilesPerPartition= 10,而不是列出所有 10,000 個檔案,而是僅列出和讀取前 10 個分割區及每個分割區的前 10 個檔案 (總計為 10*10 = 100 個檔案)。
-
catalog_id– 要存取之 Data Catalog 的目錄 ID ( Data Catalog 的帳戶 ID)。依預設設定為None。None預設為服務中呼叫帳戶的目錄 ID。
create_sample_dynamic_frame_from_options
create_sample_dynamic_frame_from_options(connection_type, connection_options={}, num, sample_options={}, format=None, format_options={}, transformation_ctx = "")
傳回使用指定的連線和格式建立的範例 DynamicFrame。DynamicFrame 僅包含來自資料來源的第一個 num 記錄。
connection_type– 連線類型,例如 Amazon S3、Amazon Redshift 及 JDBC。有效值包括s3、mysql、postgresql、redshift、sqlserver、oracle和dynamodb。connection_options- 連線選項,例如路徑和資料庫資料表 (選用)。如需更多詳細資訊,請參閱 AWS Glue for Spark 中 ETL 的連線類型和選項。-
num– 傳回的範例動態框架中記錄的最大數目。 -
sample_options– 用於控制取樣行為的參數 (選用)。Amazon S3 來源的目前可用參數:maxSamplePartitions– 取樣將讀取的分割區數目上限。預設值為 10maxSampleFilesPerPartition– 取樣將在一個分割區中讀取的檔案數目上限。預設值為 10。這些參數有助於減少檔案清單所耗用的時間。例如,假設資料集有 1000 個分割區,並且每個分割區都有 10 個檔案。如果您設定
maxSamplePartitions= 10 和maxSampleFilesPerPartition= 10,而不是列出所有 10,000 個檔案,而是僅列出和讀取前 10 個分割區及每個分割區的前 10 個檔案 (總計為 10*10 = 100 個檔案)。
format– 格式規格。這是用於 Amazon S3 或支援多種格式的 AWS Glue 連線。請參閱 AWS Glue for Spark 中的輸入與輸出的資料格式選項 以了解受支援的格式。format_options– 指定格式的格式選項。請參閱 AWS Glue for Spark 中的輸入與輸出的資料格式選項 以了解受支援的格式。-
transformation_ctx– 欲使用的轉換細節 (選用)。 push_down_predicate– 篩選分割區,而無需列出和讀取資料集中的所有檔案。如需更多詳細資訊,請參閱 使用 pushdown 述詞預先篩選。
add_ingestion_time_columns
add_ingestion_time_columns(dataFrame, timeGranularity = "")
附加擷取時間欄 (如 ingest_year、ingest_month、ingest_day、ingest_hour、ingest_minute) 到輸入 DataFrame。當您指定以 Amazon S3 為目標的 Data Catalog 資料表時,此函數會在 AWS Glue 產生的指令碼中自動產生。此函數會自動使用輸出資料表上的擷取時間欄來更新分割區。這可讓輸出資料在擷取時間自動分割,而不需要輸入資料中的明確擷取時間欄。
-
dataFrame– 要將擷取時間欄附加到的dataFrame。 -
timeGranularity– 時間欄的精密程度。有效值為 "day"、"hour" 和 "minute"。例如:如果 "hour" 被傳遞給函數,原始dataFrame會附加上 "ingest_year"、"ingest_month"、"ingest_day" 和 "ingest_hour" 時間欄。
傳回附加時間粒度欄後的資料框架。
範例:
dynamic_frame = DynamicFrame.fromDF(glueContext.add_ingestion_time_columns(dataFrame, "hour"))
create_data_frame_from_catalog
create_data_frame_from_catalog(database, table_name, transformation_ctx = "",
additional_options = {})
傳回使用 Data Catalog 資料表的資訊建立的 DataFrame。
-
database– 要從中讀取的 Data Catalog 資料庫。 -
table_name– 要從中讀取的 Data Catalog 資料表的名稱。 -
transformation_ctx– 欲使用的轉換細節 (選用)。 -
additional_options– 選擇性的名稱/值對的集合。可能的選項包括 AWS Glue for Spark 中 ETL 的連線類型和選項 中列出用於串流來源的項目,例如startingPosition、maxFetchTimeInMs以及startingOffsets。-
useSparkDataSource– 當設定為 true 時,強制 AWS Glue 使用原生 Spark Data Source API 來讀取資料表。Spark Data Source API 支援下列格式:AVRO、二進位、CSV、JSON、ORC、Parquet 和文字。在 Data Catalog 資料表中,您可以使用classification屬性指定格式。若要進一步了解 Spark Data Source API,請參閱官方 Apache Spark 文件。 將
create_data_frame_from_catalog與useSparkDataSource一起使用具有以下好處:-
直接傳回
DataFrame並提供create_dynamic_frame.from_catalog().toDF()的替代方案。 -
支援原生格式的 AWS Lake Formation 資料表層級許可控制。
-
支援讀取資料湖格式,無需 AWS Lake Formation 資料表層級許可控制。如需更多詳細資訊,請參閱 將資料湖架構與 AWS Glue ETL 任務搭配使用。
啟用
useSparkDataSource時,您也可以視需要在additional_options中新增任何 Spark Data Source 選項。AWSGlue 將這些選項直接傳遞給 Spark 閱讀器。 -
-
useCatalogSchema– 設定為 true 時,AWS Glue 會將 Data Catalog 結構描述套用至產生的DataFrame。否則,讀取器會從資料推斷結構描述。啟用useCatalogSchema時,也必須將useSparkDataSource設定為 true。
-
限制
使用 useSparkDataSource 選項時請考慮以下限制:
-
當您使用
useSparkDataSource時,AWS Glue 會在不同於原始 Spark 工作階段的個別 Spark 工作階段中建立新的DataFrame。 -
Spark DataFrame 分割區篩選不適用於以下 AWS Glue 功能。
若要搭配這些功能使用分割區篩選,您可以使用 AWS Glue 下推述詞。如需更多詳細資訊,請參閱 使用 pushdown 述詞預先篩選。篩選未分割資料欄不會受到影響。
下列範例指令碼示範使用
excludeStorageClasses選項執行分割區篩選的不正確方法。// Incorrect partition filtering using Spark filter with excludeStorageClasses read_df = glueContext.create_data_frame.from_catalog( database=database_name, table_name=table_name, additional_options = { "useSparkDataSource": True, "excludeStorageClasses" : ["GLACIER", "DEEP_ARCHIVE"] } ) // Suppose year and month are partition keys. // Filtering on year and month won't work, the filtered_df will still // contain data with other year/month values. filtered_df = read_df.filter("year == '2017 and month == '04' and 'state == 'CA'")下列範例指令碼示範使用
excludeStorageClasses選項,利用下推述詞來執行分割區篩選的正確方法。// Correct partition filtering using the AWS Glue pushdown predicate // with excludeStorageClasses read_df = glueContext.create_data_frame.from_catalog( database=database_name, table_name=table_name, // Use AWS Glue pushdown predicate to perform partition filtering push_down_predicate = "(year=='2017' and month=='04')" additional_options = { "useSparkDataSource": True, "excludeStorageClasses" : ["GLACIER", "DEEP_ARCHIVE"] } ) // Use Spark filter only on non-partitioned columns filtered_df = read_df.filter("state == 'CA'")
範例:使用 Spark Data Source 讀取器來建立 CSV 資料表
// Read a CSV table with '\t' as separator read_df = glueContext.create_data_frame.from_catalog( database=<database_name>, table_name=<table_name>, additional_options = {"useSparkDataSource": True, "sep": '\t'} )
create_data_frame_from_options
create_data_frame_from_options(connection_type, connection_options={},
format=None, format_options={}, transformation_ctx = "")
此 API 現已棄用。請改用 getSource() API。傳回使用指定的連線和格式建立的 DataFrame。這個函數只能用於 AWS Glue 串流來源。
-
connection_type- 串流連線類型。有效值包括kinesis與kafka。 -
connection_options— 連線選項,這些選項對於 Kinesis 和 Kafka 而言是不同的。您可以在 AWS Glue for Spark 中 ETL 的連線類型和選項 中找到每個串流資料來源的所有連線選項清單。請注意串流連線選項的下列不同處:-
Kinesis 串流來源需要
streamARN、startingPosition、inferSchema以及classification。 -
Kafka 串流來源需要
connectionName、topicName、startingOffsets、inferSchema以及classification。
-
-
format– 格式規格。這是用於 Amazon S3 或支援多種格式的 AWS Glue 連線。如需有關支援格式的資訊,請參閱 AWS Glue for Spark 中的輸入與輸出的資料格式選項。 -
format_options– 指定格式的格式選項。如需支援格式選項的詳細資訊,請參閱 AWS Glue for Spark 中的輸入與輸出的資料格式選項。 -
transformation_ctx– 欲使用的轉換細節 (選用)。
Amazon Kinesis 串流來源範例:
kinesis_options = { "streamARN": "arn:aws:kinesis:us-east-2:777788889999:stream/fromOptionsStream", "startingPosition": "TRIM_HORIZON", "inferSchema": "true", "classification": "json" } data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kinesis", connection_options=kinesis_options)
Kafka 串流來源範例:
kafka_options = { "connectionName": "ConfluentKafka", "topicName": "kafka-auth-topic", "startingOffsets": "earliest", "inferSchema": "true", "classification": "json" } data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kafka", connection_options=kafka_options)
forEachBatch
forEachBatch(frame, batch_function, options)
將傳入的 batch_function 套用至從串流來源讀取的每個微批次。
-
frame– 包含目前微批次的 DataFrame。 -
batch_function– 將套用至每個微批次的函數。 -
options– 索引鍵/值配對的集合,其中包含如何處理微批次的相關資訊。下列選項是必要的:-
windowSize– 處理每個批次的時間量。 -
checkpointLocation- 串流 ETL 任務的檢查點儲存位置。 -
batchMaxRetries– 如果失敗,可重試批次的次數上限。預設值為 3。此選項僅在 Glue 2.0 及更高版本上才可設定。
-
範例:
glueContext.forEachBatch( frame = data_frame_datasource0, batch_function = processBatch, options = { "windowSize": "100 seconds", "checkpointLocation": "s3://kafka-auth-dataplane/confluent-test/output/checkpoint/" } ) def processBatch(data_frame, batchId): if (data_frame.count() > 0): datasource0 = DynamicFrame.fromDF( glueContext.add_ingestion_time_columns(data_frame, "hour"), glueContext, "from_data_frame" ) additionalOptions_datasink1 = {"enableUpdateCatalog": True} additionalOptions_datasink1["partitionKeys"] = ["ingest_yr", "ingest_mo", "ingest_day"] datasink1 = glueContext.write_dynamic_frame.from_catalog( frame = datasource0, database = "tempdb", table_name = "kafka-auth-table-output", transformation_ctx = "datasink1", additional_options = additionalOptions_datasink1 )
在 Amazon S3 中使用資料集
purge_table
purge_table(catalog_id=None, database="", table_name="", options={},
transformation_ctx="")
從 Amazon S3 中刪除指定目錄資料庫和資料表的檔案。如果刪除分割區中的所有檔案,該分割區也會從目錄中刪除。對於向 Lake Formation 註冊的資料表,我們不支援 purge_table 動作。
如果您希望能夠復原已刪除的物件,您可以在 Amazon S3 儲存貯體上開啟物件版本控制。從未啟用物件版本控制的儲存貯體中刪除物件時,無法復原物件。如需如何復原已啟用版本控制之儲存貯體中已刪除物件的詳細資訊,請參閱 AWS Support 知識中心的如何擷取已刪除的 Amazon S3 物件?
-
catalog_id– 要存取之 Data Catalog 的目錄 ID ( Data Catalog 的帳戶 ID)。依預設設定為None。None預設為服務中呼叫帳戶的目錄 ID。 database– 所要使用的資料庫。table_name- 要使用的資料表名稱。options- 篩選要刪除之檔案和用於產生資訊清單檔案的選項。retentionPeriod- 指定保留檔案的期間 (以小時為單位)。比保留期間新的檔案都會予以保留。依預設設定為 168 小時 (7 天)。partitionPredicate- 滿足此述詞的分割區會被刪除。這些分割區中仍在保留期間內的檔案不會被刪除。設定為""– 預設為空值。excludeStorageClasses- 不會刪除excludeStorageClasses集合中具有儲存體方案的檔案。預設為Set()– 空集合。manifestFilePath- 產生資訊清單檔案的選用路徑。所有已成功清除的檔案都會記錄在Success.csv中,失敗的則記錄在Failed.csv中
transformation_ctx– 欲使用的轉換細節 (選用)。用於資訊清單檔案的路徑。
glueContext.purge_table("database", "table", {"partitionPredicate": "(month=='march')", "retentionPeriod": 1, "excludeStorageClasses": ["STANDARD_IA"], "manifestFilePath": "s3://bucketmanifest/"})purge_s3_path
purge_s3_path(s3_path, options={}, transformation_ctx="")
以遞迴方式刪除指定 Amazon S3 路徑中的檔案。
如果您希望能夠復原已刪除的物件,您可以在 Amazon S3 儲存貯體上開啟物件版本控制。從未開啟物件版本控制的儲存貯體中刪除物件時,無法復原物件。如需如何復原已啟用版本控制之儲存貯體中已刪除物件的詳細資訊,請參閱 Support 知識中心中的如何擷取已刪除的 Amazon S3 物件?
s3_path- 要刪除之檔案的 Amazon S3 路徑,格式為s3://<bucket>/<prefix>/options- 篩選要刪除之檔案和用於產生資訊清單檔案的選項。retentionPeriod- 指定保留檔案的期間 (以小時為單位)。比保留期間新的檔案都會予以保留。依預設設定為 168 小時 (7 天)。excludeStorageClasses- 不會刪除excludeStorageClasses集合中具有儲存體方案的檔案。預設為Set()– 空集合。manifestFilePath- 產生資訊清單檔案的選用路徑。所有已成功清除的檔案都會記錄在Success.csv中,失敗的則記錄在Failed.csv中
transformation_ctx– 欲使用的轉換細節 (選用)。用於資訊清單檔案的路徑。
glueContext.purge_s3_path("s3://bucket/path/", {"retentionPeriod": 1, "excludeStorageClasses": ["STANDARD_IA"], "manifestFilePath": "s3://bucketmanifest/"})transition_table
transition_table(database, table_name, transition_to, options={}, transformation_ctx="", catalog_id=None)
針對指定之目錄的資料庫和資料表,轉換儲存在 Amazon S3 上之檔案的儲存體方案。
您可以在任意兩個儲存體方案之間轉換。對於 GLACIER 和 DEEP_ARCHIVE 儲存體方案,您可以轉換到這些方案。但是,您可以使用 S3 RESTORE 從 GLACIER 和 DEEP_ARCHIVE 儲存體方案轉換。
如果您執行的 AWS Glue ETL 任務會從 Amazon S3 讀取檔案或分割區,則您可排除部分 Amazon S3 儲存類別類型。如需詳細資訊,請參閱排除 Amazon S3 儲存體方案。
database– 所要使用的資料庫。table_name- 要使用的資料表名稱。transition_to– 要轉移的 Amazon S3 儲存方案。options- 篩選要刪除之檔案和用於產生資訊清單檔案的選項。retentionPeriod- 指定保留檔案的期間 (以小時為單位)。比保留期間新的檔案都會予以保留。依預設設定為 168 小時 (7 天)。partitionPredicate- 滿足此述詞的分割區會被轉換。這些分割區中仍在保留期間內的檔案不會被轉換。設定為""– 預設為空值。excludeStorageClasses- 不會轉換excludeStorageClasses集合中具有儲存體方案的檔案。預設為Set()– 空集合。manifestFilePath- 產生資訊清單檔案的選用路徑。所有已成功轉換的檔案都會記錄在Success.csv中,失敗的則記錄在Failed.csv中accountId– 要執行轉移轉換的 Amazon Web Services 帳戶 ID。對於這種轉換是強制性的。roleArn- 執行轉移轉換的 AWS 角色。對於這種轉換是強制性的。
transformation_ctx– 欲使用的轉換細節 (選用)。用於資訊清單檔案的路徑。catalog_id– 要存取之 Data Catalog 的目錄 ID ( Data Catalog 的帳戶 ID)。依預設設定為None。None預設為服務中呼叫帳戶的目錄 ID。
glueContext.transition_table("database", "table", "STANDARD_IA", {"retentionPeriod": 1, "excludeStorageClasses": ["STANDARD_IA"], "manifestFilePath": "s3://bucketmanifest/", "accountId": "12345678901", "roleArn": "arn:aws:iam::123456789012:user/example-username"})transition_s3_path
transition_s3_path(s3_path, transition_to, options={}, transformation_ctx="")
以遞迴方式轉換指定 Amazon S3 路徑中檔案的儲存體方案。
您可以在任意兩個儲存體方案之間轉換。對於 GLACIER 和 DEEP_ARCHIVE 儲存體方案,您可以轉換到這些方案。但是,您可以使用 S3 RESTORE 從 GLACIER 和 DEEP_ARCHIVE 儲存體方案轉換。
如果您執行的 AWS Glue ETL 任務會從 Amazon S3 讀取檔案或分割區,則您可排除部分 Amazon S3 儲存類別類型。如需詳細資訊,請參閱排除 Amazon S3 儲存體方案。
s3_path- 要以格式s3://<轉換之檔案的 Amazon S3 路徑。bucket>/<prefix>/transition_to– 要轉移的 Amazon S3 儲存方案。options- 篩選要刪除之檔案和用於產生資訊清單檔案的選項。retentionPeriod- 指定保留檔案的期間 (以小時為單位)。比保留期間新的檔案都會予以保留。依預設設定為 168 小時 (7 天)。partitionPredicate- 滿足此述詞的分割區會被轉換。這些分割區中仍在保留期間內的檔案不會被轉換。設定為""– 預設為空值。excludeStorageClasses- 不會轉換excludeStorageClasses集合中具有儲存體方案的檔案。預設為Set()– 空集合。manifestFilePath- 產生資訊清單檔案的選用路徑。所有已成功轉換的檔案都會記錄在Success.csv中,失敗的則記錄在Failed.csv中accountId– 要執行轉移轉換的 Amazon Web Services 帳戶 ID。對於這種轉換是強制性的。roleArn- 執行轉移轉換的 AWS 角色。對於這種轉換是強制性的。
transformation_ctx– 欲使用的轉換細節 (選用)。用於資訊清單檔案的路徑。
glueContext.transition_s3_path("s3://bucket/prefix/", "STANDARD_IA", {"retentionPeriod": 1, "excludeStorageClasses": ["STANDARD_IA"], "manifestFilePath": "s3://bucketmanifest/", "accountId": "12345678901", "roleArn": "arn:aws:iam::123456789012:user/example-username"})擷取
extract_jdbc_conf
extract_jdbc_conf(connection_name, catalog_id = None)
從 Data Catalog 中的 AWS Glue 連線物件傳回含索引鍵 (具有組態屬性) 的 dict。
user:資料庫使用者名稱。password:資料庫密碼。vendor:指定廠商 (mysql、postgresql、oracle、sqlserver等)。enforceSSL:布林字串,指示是否需要安全連線。customJDBCCert:使用指定 Amazon S3 路徑中的特定用戶端憑證。skipCustomJDBCCertValidation:布林字串,指示customJDBCCert必須由 CA 驗證。customJDBCCertString:有關自訂憑證的其他資訊,因驅動程式類型而異。url:(已棄用) 僅包含通訊協定、伺服器和連接埠的 JDBC URL。fullUrl:建立連線時輸入的 JDBC URL (適用於 AWS Glue 3.0 版或更新版本)。
擷取 JDBC 組態的範例:
jdbc_conf = glueContext.extract_jdbc_conf(connection_name="your_glue_connection_name") print(jdbc_conf) >>> {'enforceSSL': 'false', 'skipCustomJDBCCertValidation': 'false', 'url': 'jdbc:mysql://myserver:3306', 'fullUrl': 'jdbc:mysql://myserver:3306/mydb', 'customJDBCCertString': '', 'user': 'admin', 'customJDBCCert': '', 'password': '1234', 'vendor': 'mysql'}
交易
start_transaction
start_transaction(read_only)
開始新交易。內部呼叫 Lake Formation startTransaction API。
read_only– (布林值) 指出此交易應該是唯讀,還是讀取和寫入。使用唯讀交易 ID 進行的寫入將被拒絕。唯讀交易不需要遞交。
傳回交易 ID。
commit_transaction
commit_transaction(transaction_id, wait_for_commit = True)
嘗試遞交指定的交易。commit_transaction 可能會在交易完成遞交之前返回。內部呼叫 Lake Formation commitTransaction API。
transaction_id– (字串) 要遞交的交易。wait_for_commit– (布林值) 決定commit_transaction是否立即傳回。預設值為 true。如為 False,commit_transaction輪詢並等待,直到交易完成遞交。使用指數退避時,等待時間長度限制為 1 分鐘,最多可嘗試 6 次重試。
傳回一個布林值,指示遞交是否完成。
cancel_transaction
cancel_transaction(transaction_id)
嘗試取消指定的交易。如果交易先前已遞交,傳回 TransactionCommittedException 例外狀況。內部呼叫 Lake Formation CancelTransaction API。
-
transaction_id– (字串) 要取消的交易。
寫入
getSink
getSink(connection_type, format = None, transformation_ctx = "", **options)
取得可用於將 DynamicFrames 寫入外部來源的 DataSink 物件。請先檢查 SparkSQL format 以確保取得預期的目的地。
connection_type– 要使用的連線類型,例如 Amazon S3、Amazon Redshift 及 JDBC。有效值包括s3、mysql、postgresql、redshift、sqlserver、oracle、kinesis和kafka。format– 要使用的 SparkSQL 格式 (選用)。transformation_ctx– 欲使用的轉換細節 (選用)。options– 名稱/值對的集合,用來指定連線選項。一些可能的值為:-
user和password:適用於授權 -
url:資料存放區的端點 -
dbtable:目標資料表的名稱。 -
bulkSize:插入操作的平行程度
-
您可以指定的選項取決於連線類型。如需其他值和範例,請參閱 AWS Glue for Spark 中 ETL 的連線類型和選項。
範例:
>>> data_sink = context.getSink("s3") >>> data_sink.setFormat("json"), >>> data_sink.writeFrame(myFrame)
write_dynamic_frame_from_options
write_dynamic_frame_from_options(frame, connection_type, connection_options={}, format=None,
format_options={}, transformation_ctx = "")
使用指定的連線和格式來撰寫並傳回 DynamicFrame。
frame– 所要撰寫的DynamicFrame。connection_type– 連線類型,例如 Amazon S3、Amazon Redshift 及 JDBC。有效值包括s3、mysql、postgresql、redshift、sqlserver、oracle、kinesis和kafka。connection_options– 連線選項,例如路徑和資料庫資料表 (選用)。如果是connection_type的s3,會定義 Amazon S3 路徑。connection_options = {"path": "s3://aws-glue-target/temp"}如果是 JDBC 連線,必須定義幾項屬性。請注意,資料庫名稱必須是 URL 的一部分。它可以選擇性包含在連線選項中。
警告
不建議在指令碼中存放密碼。考慮使用
boto3從 AWS Secrets Manager 或 AWS Glue Data Catalog 擷取它們。connection_options = {"url": "jdbc-url/database", "user": "username", "password":passwordVariable,"dbtable": "table-name", "redshiftTmpDir": "s3-tempdir-path"}dbtable屬性為 JDBC 資料表的名稱。若是支援資料庫內結構描述的 JDBC 資料存放區,請指定schema.table-name。如果未提供結構描述,則會使用預設的 "public" 結構描述。如需更多詳細資訊,請參閱 AWS Glue for Spark 中 ETL 的連線類型和選項。
format– 格式規格。這是用於 Amazon S3 或支援多種格式的 AWS Glue 連線。請參閱 AWS Glue for Spark 中的輸入與輸出的資料格式選項 以了解受支援的格式。format_options– 指定格式的格式選項。請參閱 AWS Glue for Spark 中的輸入與輸出的資料格式選項 以了解受支援的格式。transformation_ctx– 所要使用的轉換細節 (選用)。
write_from_options
write_from_options(frame_or_dfc, connection_type,
connection_options={}, format={}, format_options={}, transformation_ctx = "")
寫入和傳回以指定的連線和格式資訊建立的 DynamicFrame 或 DynamicFrameCollection。
frame_or_dfc– 所要撰寫的DynamicFrame或DynamicFrameCollection。connection_type– 連線類型,例如 Amazon S3、Amazon Redshift 及 JDBC。有效值包括s3、mysql、postgresql、redshift、sqlserver及oracle。connection_options– 連線選項,例如路徑和資料庫資料表 (選用)。如果是connection_type的s3,會定義 Amazon S3 路徑。connection_options = {"path": "s3://aws-glue-target/temp"}如果是 JDBC 連線,必須定義幾項屬性。請注意,資料庫名稱必須是 URL 的一部分。它可以選擇性包含在連線選項中。
警告
不建議在指令碼中存放密碼。考慮使用
boto3從 AWS Secrets Manager 或 AWS Glue Data Catalog 擷取它們。connection_options = {"url": "jdbc-url/database", "user": "username", "password":passwordVariable,"dbtable": "table-name", "redshiftTmpDir": "s3-tempdir-path"}dbtable屬性為 JDBC 資料表的名稱。若是支援資料庫內結構描述的 JDBC 資料存放區,請指定schema.table-name。如果未提供結構描述,則會使用預設的 "public" 結構描述。如需更多詳細資訊,請參閱 AWS Glue for Spark 中 ETL 的連線類型和選項。
format– 格式規格。這是用於 Amazon S3 或支援多種格式的 AWS Glue 連線。請參閱 AWS Glue for Spark 中的輸入與輸出的資料格式選項 以了解受支援的格式。format_options– 指定格式的格式選項。請參閱 AWS Glue for Spark 中的輸入與輸出的資料格式選項 以了解受支援的格式。transformation_ctx– 所要使用的轉換細節 (選用)。
write_dynamic_frame_from_catalog
write_dynamic_frame_from_catalog(frame, database, table_name, redshift_tmp_dir, transformation_ctx = "", additional_options = {}, catalog_id = None)
使用來自 Data Catalog 資料庫和資料表的資訊寫入並傳回 DynamicFrame。
frame– 所要撰寫的DynamicFrame。Database– 包含資料表的 Data Catalog 資料庫。table_name– 與目標關聯的 Data Catalog 資料表名稱。redshift_tmp_dir– 所要使用的 Amazon Redshift 暫時目錄 (選用)。transformation_ctx– 欲使用的轉換細節 (選用)。-
additional_options– 選擇性的名稱/值對的集合。 catalog_id— 要存取之 Data Catalog 的目錄 ID (帳戶 ID)。若無,會使用發起人的預設帳戶 ID。
write_data_frame_from_catalog
write_data_frame_from_catalog(frame, database, table_name, redshift_tmp_dir,
transformation_ctx = "", additional_options = {}, catalog_id = None)
使用來自 Data Catalog 資料庫和資料表的資訊寫入並傳回 DataFrame。此方法支援寫入資料湖格式 (Hudi、Iceberg 和 Delta Lake)。如需更多詳細資訊,請參閱 將資料湖架構與 AWS Glue ETL 任務搭配使用。
frame– 所要撰寫的DataFrame。Database– 包含資料表的 Data Catalog 資料庫。table_name– 與目標關聯的 Data Catalog 資料表名稱。redshift_tmp_dir:所要使用的 Amazon Redshift 臨時目錄 (選用)。transformation_ctx– 欲使用的轉換細節 (選用)。-
additional_options– 選擇性的名稱/值對的集合。-
useSparkDataSink– 當設定為 true 時,強制 AWS Glue 使用原生 Spark Data Sink API 來寫入資料表。啟用此選項時,您可以視需要將任何 Spark Data Source 選項新增至 additional_options。AWSGlue 將這些選項直接傳遞給 Spark 寫入器。
-
catalog_id– 要存取之 Data Catalog 的目錄 ID (帳戶 ID)。如果您未指定值,則會使用發起人的預設帳戶 ID。
限制
使用 useSparkDataSink 選項時請考慮以下限制:
-
使用
useSparkDataSink選項時,不支援 enableUpdateCatalog 選項。
範例:使用 Spark Data Source 寫入器寫入 Hudi 資料表
hudi_options = { 'useSparkDataSink': True, 'hoodie.table.name':<table_name>, 'hoodie.datasource.write.storage.type': 'COPY_ON_WRITE', 'hoodie.datasource.write.recordkey.field': 'product_id', 'hoodie.datasource.write.table.name':<table_name>, 'hoodie.datasource.write.operation': 'upsert', 'hoodie.datasource.write.precombine.field': 'updated_at', 'hoodie.datasource.write.hive_style_partitioning': 'true', 'hoodie.upsert.shuffle.parallelism': 2, 'hoodie.insert.shuffle.parallelism': 2, 'hoodie.datasource.hive_sync.enable': 'true', 'hoodie.datasource.hive_sync.database':<database_name>, 'hoodie.datasource.hive_sync.table':<table_name>, 'hoodie.datasource.hive_sync.use_jdbc': 'false', 'hoodie.datasource.hive_sync.mode': 'hms'} glueContext.write_data_frame.from_catalog( frame =<df_product_inserts>, database =<database_name>, table_name =<table_name>, additional_options = hudi_options )
write_dynamic_frame_from_jdbc_conf
write_dynamic_frame_from_jdbc_conf(frame, catalog_connection, connection_options={},
redshift_tmp_dir = "", transformation_ctx = "", catalog_id = None)
使用指定的 JDBC 連線資訊撰寫並傳回 DynamicFrame。
frame– 所要撰寫的DynamicFrame。catalog_connection– 所要使用的目錄連線。connection_options– 連線選項,例如路徑和資料庫資料表 (選用)。如需更多詳細資訊,請參閱 AWS Glue for Spark 中 ETL 的連線類型和選項。redshift_tmp_dir– 所要使用的 Amazon Redshift 暫時目錄 (選用)。transformation_ctx– 所要使用的轉換細節 (選用)。catalog_id— 要存取之 Data Catalog 的目錄 ID (帳戶 ID)。若無,會使用發起人的預設帳戶 ID。
write_from_jdbc_conf
write_from_jdbc_conf(frame_or_dfc, catalog_connection, connection_options={}, redshift_tmp_dir = "", transformation_ctx = "", catalog_id = None)
使用指定的 JDBC 連線資訊撰寫並傳回 DynamicFrame 或 DynamicFrameCollection。
frame_or_dfc– 所要撰寫的DynamicFrame或DynamicFrameCollection。catalog_connection– 所要使用的目錄連線。connection_options– 連線選項,例如路徑和資料庫資料表 (選用)。如需更多詳細資訊,請參閱 AWS Glue for Spark 中 ETL 的連線類型和選項。redshift_tmp_dir– 所要使用的 Amazon Redshift 暫時目錄 (選用)。transformation_ctx– 所要使用的轉換細節 (選用)。catalog_id— 要存取之 Data Catalog 的目錄 ID (帳戶 ID)。若無,會使用發起人的預設帳戶 ID。