本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
AWS Glue Scala GlueContext API
Package: com.amazonaws.services.glue
class GlueContext extends SQLContext(sc) (
@transient val sc : SparkContext,
val defaultSourcePartitioner : PartitioningStrategy )GlueContext 是讀取和寫入 DynamicFrame 至 Amazon Simple Storage Service (Amazon S3)、 AWS Glue Data Catalog 、JDBC 等的進入點。此類別提供公用程式函數來建立 DataSource 特徵 和 DataSink 物件,從而用於讀取和寫入 DynamicFrame。
如果從來源建立的分割區數低於分割區的閾值下限 (預設 10),您也可以使用 GlueContext 來設定在 DynamicFrame 中的分割區目標數 (預設 20)。
def addIngestionTimeColumns
def addIngestionTimeColumns(
df : DataFrame,
timeGranularity : String = "") : dataFrame
附加擷取時間欄 (如 ingest_year、ingest_month、ingest_day、ingest_hour、ingest_minute) 到輸入 DataFrame。當您指定以 Amazon S3 為目標的 Data Catalog 資料表時,此函數會在 AWS Glue 產生的指令碼中自動產生。此函數會自動使用輸出資料表上的擷取時間欄來更新分割區。這可讓輸出資料在擷取時間自動分割,而不需要輸入資料中的明確擷取時間欄。
-
dataFrame– 要將擷取時間欄附加到的dataFrame。 -
timeGranularity– 時間欄的精密程度。有效值為 "day"、"hour" 和 "minute"。例如:如果 "hour" 被傳遞給函數,原始dataFrame會附加上 "ingest_year"、"ingest_month"、"ingest_day" 和 "ingest_hour" 時間欄。
傳回附加時間粒度欄後的資料框架。
範例:
glueContext.addIngestionTimeColumns(dataFrame, "hour")
def createDataFrameFromOptions
def createDataFrameFromOptions( connectionType : String,
connectionOptions : JsonOptions,
transformationContext : String = "",
format : String = null,
formatOptions : JsonOptions = JsonOptions.empty
) : DataSource
傳回使用指定的連線和格式建立的 DataFrame。此函數僅適用於 Glue AWS 串流來源。
connectionType– 串流連線類型。有效值包括kinesis與kafka。-
connectionOptions– 連線選項,這些選項對於 Kinesis 和 Kafka 而言是不同的。您可以在 AWS Glue for Spark 中 ETL 的連線類型和選項 中找到每個串流資料來源的所有連線選項清單。請注意串流連線選項的下列不同處:-
Kinesis 串流來源需要
streamARN、startingPosition、inferSchema以及classification。 -
Kafka 串流來源需要
connectionName、topicName、startingOffsets、inferSchema以及classification。
-
transformationContext– 要使用的轉換細節 (選用)。format– 格式化規格 (選用)。這是用於 Amazon S3 或支援多種格式的 AWS Glue 連線。如需有關支援格式的資訊,請參閱 AWS Glue for Spark 中的輸入與輸出的資料格式選項formatOptions– 指定格式的格式選項。如需支援格式選項的詳細資訊,請參閱 資料格式選項。
Amazon Kinesis 串流來源範例:
val data_frame_datasource0 = glueContext.createDataFrameFromOptions(transformationContext = "datasource0", connectionType = "kinesis", connectionOptions = JsonOptions("""{"streamName": "example_stream", "startingPosition": "TRIM_HORIZON", "inferSchema": "true", "classification": "json"}}"""))
Kafka 串流來源範例:
val data_frame_datasource0 = glueContext.createDataFrameFromOptions(transformationContext = "datasource0", connectionType = "kafka", connectionOptions = JsonOptions("""{"connectionName": "example_connection", "topicName": "example_topic", "startingPosition": "earliest", "inferSchema": "false", "classification": "json", "schema":"`column1` STRING, `column2` STRING"}"""))
forEachBatch
forEachBatch(frame, batch_function, options)
將傳入的 batch_function 套用至從串流來源讀取的每個微批次。
-
frame– 包含目前微批次的 DataFrame。 -
batch_function– 將套用至每個微批次的函數。 -
options– 索引鍵/值配對的集合,其中包含如何處理微批次的相關資訊。下列選項是必要的:-
windowSize– 處理每個批次的時間量。 -
checkpointLocation- 串流 ETL 任務的檢查點儲存位置。 -
batchMaxRetries– 如果失敗,可重試批次的次數上限。預設值為 3。此選項僅在 Glue 2.0 及以上版本上才可設定。
-
範例:
glueContext.forEachBatch(data_frame_datasource0, (dataFrame: Dataset[Row], batchId: Long) => { if (dataFrame.count() > 0) { val datasource0 = DynamicFrame(glueContext.addIngestionTimeColumns(dataFrame, "hour"), glueContext) // @type: DataSink // @args: [database = "tempdb", table_name = "fromoptionsoutput", stream_batch_time = "100 seconds", // stream_checkpoint_location = "s3://from-options-testing-eu-central-1/fromOptionsOutput/checkpoint/", // transformation_ctx = "datasink1"] // @return: datasink1 // @inputs: [frame = datasource0] val options_datasink1 = JsonOptions( Map("partitionKeys" -> Seq("ingest_year", "ingest_month","ingest_day", "ingest_hour"), "enableUpdateCatalog" -> true)) val datasink1 = glueContext.getCatalogSink( database = "tempdb", tableName = "fromoptionsoutput", redshiftTmpDir = "", transformationContext = "datasink1", additionalOptions = options_datasink1).writeDynamicFrame(datasource0) } }, JsonOptions("""{"windowSize" : "100 seconds", "checkpointLocation" : "s3://from-options-testing-eu-central-1/fromOptionsOutput/checkpoint/"}"""))
def getCatalogSink
def getCatalogSink( database : String,
tableName : String,
redshiftTmpDir : String = "",
transformationContext : String = ""
additionalOptions: JsonOptions = JsonOptions.empty,
catalogId: String = null
) : DataSink
建立 DataSink,以便寫入 Data Catalog 中定義之資料表中指定的位置。
database— Data Catalog 中的資料庫名稱。tableName— Data Catalog 中的資料表名稱。redshiftTmpDir— 要與特定資料目的地搭配使用的臨時暫存目錄。設定為 預設為空值。transformationContext— 與由任務書籤使用之目的地關聯的轉換內容。設定為 預設為空值。additionalOptions– 提供給 AWS Glue 的額外選項。catalogId— 要存取之 Data Catalog 的目錄 ID (帳戶 ID)。為 null 時,會使用發起人的預設帳戶 ID。
傳回 DataSink。
def getCatalogSource
def getCatalogSource( database : String,
tableName : String,
redshiftTmpDir : String = "",
transformationContext : String = ""
pushDownPredicate : String = " "
additionalOptions: JsonOptions = JsonOptions.empty,
catalogId: String = null
) : DataSource
建立 DataSource 特徵,以便從 Data Catalog 中的資料表定義中讀取資料。
database— Data Catalog 中的資料庫名稱。tableName— Data Catalog 中的資料表名稱。redshiftTmpDir— 要與特定資料目的地搭配使用的臨時暫存目錄。設定為 預設為空值。transformationContext— 與由任務書籤使用之目的地關聯的轉換內容。設定為 預設為空值。pushDownPredicate– 篩選分割區,而無需列出和讀取資料集中的所有檔案。如需詳細資訊,請參閱使用 pushdown 述詞預先篩選。additionalOptions– 選擇性的名稱/值對的集合。可能的選項包括 AWS Glue for Spark 中 ETL 的連線類型和選項 中列出的項目,除了endpointUrl、streamName、bootstrap.servers、security.protocol、topicName、classification以及delimiter。另一個支援的選項是catalogPartitionPredicate:catalogPartitionPredicate— 您可以傳遞目錄表達式以根據索引欄進行篩選。這會將篩選下推至伺服器端。如需詳細資訊,請參閱 AWS Glue 分割區索引。注意push_down_predicate和catalogPartitionPredicate使用不同的語法。前者使用 Spark SQL 標準語法,後者使用 JSQL 剖析器。catalogId— 要存取之 Data Catalog 的目錄 ID (帳戶 ID)。為 null 時,會使用發起人的預設帳戶 ID。
傳回 DataSource。
串流來源範例
val data_frame_datasource0 = glueContext.getCatalogSource( database = "tempdb", tableName = "test-stream-input", redshiftTmpDir = "", transformationContext = "datasource0", additionalOptions = JsonOptions("""{ "startingPosition": "TRIM_HORIZON", "inferSchema": "false"}""") ).getDataFrame()
def getJDBCSink
def getJDBCSink( catalogConnection : String,
options : JsonOptions,
redshiftTmpDir : String = "",
transformationContext : String = "",
catalogId: String = null
) : DataSink
建立 DataSink,以便寫入 Data Catalog 中 Connection 物件所指定的 JDBC 資料庫。此 Connection 物件擁有用來對 JDBC 目的地連線的資訊 (包括 URL、使用者名稱、密碼、VPC、子網路和安全群組)。
catalogConnection— Data Catalog 中的連線名稱,其中包含要做為寫入目的地之 JDBC URL。options— JSON 名稱值組的字串,可提供寫入 JDBC 資料存放區所需的其他資訊。其中包含:dbtable (必要) — JDBC 資料表的名稱。若是支援資料庫內結構描述的 JDBC 資料存放區,請指定
schema.table-name。如果未提供結構描述,則會使用預設的 "public" 結構描述。以下範例說明 options 參數,它會指向資料庫test_db中名為test的結構描述和名為test_table的資料表。options = JsonOptions("""{"dbtable": "test.test_table", "database": "test_db"}""")database (必要) — JDBC 資料庫的名稱。
任何其他選項都會直接傳遞至 SparkSQL JDBC 寫入器。如需詳細資訊,請參閱 Spark 的 Redshift 資料來源
。
redshiftTmpDir— 要與特定資料目的地搭配使用的臨時暫存目錄。設定為 預設為空值。transformationContext— 與由任務書籤使用之目的地關聯的轉換內容。設定為 預設為空值。catalogId— 要存取之 Data Catalog 的目錄 ID (帳戶 ID)。為 null 時,會使用發起人的預設帳戶 ID。
範例程式碼:
getJDBCSink(catalogConnection = "my-connection-name", options = JsonOptions("""{"dbtable": "my-jdbc-table", "database": "my-jdbc-db"}"""), redshiftTmpDir = "", transformationContext = "datasink4")
傳回 DataSink。
def getSink
def getSink( connectionType : String,
connectionOptions : JsonOptions,
transformationContext : String = ""
) : DataSink
建立 DataSink,將資料寫入 Amazon Simple Storage Service (Amazon S3)、JDBC 或 AWS Glue Data Catalog 等目的地,或 Apache Kafka 或 Amazon Kinesis 資料串流。
-
connectionType— 連線的類型。請參閱 AWS Glue for Spark 中 ETL 的連線類型和選項。 -
connectionOptions— JSON 名稱值組的字串,可提供與資料目的地建立連線的額外資料。請參閱 AWS Glue for Spark 中 ETL 的連線類型和選項。 -
transformationContext— 與由任務書籤使用之目的地關聯的轉換內容。設定為 預設為空值。
傳回 DataSink。
def getSinkWithFormat
def getSinkWithFormat( connectionType : String,
options : JsonOptions,
transformationContext : String = "",
format : String = null,
formatOptions : JsonOptions = JsonOptions.empty
) : DataSink
建立 DataSink,將資料寫入 Amazon S3、JDBC 或 Data Catalog 等目的地,或 Apache Kafka 或 Amazon Kinesis 資料串流。也會設定要寫入目的地的資料格式。
connectionType— 連線的類型。請參閱 AWS Glue for Spark 中 ETL 的連線類型和選項。-
options— JSON 名稱值組的字串,可提供與資料目的地建立連線的額外資料。請參閱 AWS Glue for Spark 中 ETL 的連線類型和選項。 transformationContext— 與由任務書籤使用之目的地關聯的轉換內容。設定為 預設為空值。format— 要從目的地寫出的資料格式。formatOptions— JSON 名稱值組的字串,會提供在目的地格式化資料的其他選項。請參閱 資料格式選項。
傳回 DataSink。
def getSource
def getSource( connectionType : String,
connectionOptions : JsonOptions,
transformationContext : String = ""
pushDownPredicate
) : DataSource
建立從 Amazon S3、JDBC 或 Glue Data Catalog AWS 等來源DataSource 特徵讀取資料的 。也支援 Kafka 和 Kinesis 串流資料來源。
connectionType— 資料來源的類型。請參閱 AWS Glue for Spark 中 ETL 的連線類型和選項。-
connectionOptions— JSON 名稱值組的字串,可提供與資料來源建立連線的額外資料。如需詳細資訊,請參閱AWS Glue for Spark 中 ETL 的連線類型和選項。Kinesis 串流來源需要下列連線選項:
streamARN、startingPosition、inferSchema及classification。Kafka 串流來源需要以下連線選項:
connectionName、topicName、startingOffsets、inferSchema及classification。 transformationContext— 與由任務書籤使用之目的地關聯的轉換內容。設定為 預設為空值。pushDownPredicate— 分割區欄上的述詞。
傳回 DataSource。
Amazon Kinesis 串流來源範例:
val kinesisOptions = jsonOptions() data_frame_datasource0 = glueContext.getSource("kinesis", kinesisOptions).getDataFrame() private def jsonOptions(): JsonOptions = { new JsonOptions( s"""{"streamARN": "arn:aws:kinesis:eu-central-1:123456789012:stream/fromOptionsStream", |"startingPosition": "TRIM_HORIZON", |"inferSchema": "true", |"classification": "json"}""".stripMargin) }
Kafka 串流來源範例:
val kafkaOptions = jsonOptions() val data_frame_datasource0 = glueContext.getSource("kafka", kafkaOptions).getDataFrame() private def jsonOptions(): JsonOptions = { new JsonOptions( s"""{"connectionName": "ConfluentKafka", |"topicName": "kafka-auth-topic", |"startingOffsets": "earliest", |"inferSchema": "true", |"classification": "json"}""".stripMargin) }
def getSourceWithFormat
def getSourceWithFormat( connectionType : String,
options : JsonOptions,
transformationContext : String = "",
format : String = null,
formatOptions : JsonOptions = JsonOptions.empty
) : DataSource
建立從 Amazon S3、JDBC 或 AWS Glue Data Catalog 等來源DataSource 特徵讀取資料的 ,並設定存放在來源中的資料格式。
connectionType– 資料來源的類型。請參閱 AWS Glue for Spark 中 ETL 的連線類型和選項。-
options– JSON 名稱值組的字串,可提供與資料來源建立連線的額外資料。請參閱 AWS Glue for Spark 中 ETL 的連線類型和選項。 transformationContext– 與由任務書籤使用之目的地關聯的轉換內容。設定為 預設為空值。format– 來源中所存放資料的格式。當connectionType為「s3」時,您也可以指定format。可以是「avro」、「csv」、「grokLog」、「ion」、「json」、「xml」、「parquet」或「orc」其中之一。formatOptions– JSON 名稱值組的字串,會提供在來源剖析資料的其他選項。請參閱 資料格式選項。
傳回 DataSource。
範例
從 Amazon S3 上逗號分隔值 (CSV) 檔案的資料來源建立 DynamicFrame:
val datasource0 = glueContext.getSourceWithFormat( connectionType="s3", options =JsonOptions(s"""{"paths": [ "s3://csv/nycflights.csv"]}"""), transformationContext = "datasource0", format = "csv", formatOptions=JsonOptions(s"""{"withHeader":"true","separator": ","}""") ).getDynamicFrame()
從使用 JDBC 連線的 PostgreSQL 資料來源建立 DynamicFrame:
val datasource0 = glueContext.getSourceWithFormat( connectionType="postgresql", options =JsonOptions(s"""{ "url":"jdbc:postgresql://databasePostgres-1.rds.amazonaws.com:5432/testdb", "dbtable": "public.company", "redshiftTmpDir":"", "user":"username", "password":"password123" }"""), transformationContext = "datasource0").getDynamicFrame()
從使用 JDBC 連線的 MySQL 資料來源建立 DynamicFrame:
val datasource0 = glueContext.getSourceWithFormat( connectionType="mysql", options =JsonOptions(s"""{ "url":"jdbc:mysql://databaseMysql-1.rds.amazonaws.com:3306/testdb", "dbtable": "athenatest_nycflights13_csv", "redshiftTmpDir":"", "user":"username", "password":"password123" }"""), transformationContext = "datasource0").getDynamicFrame()
def getSparkSession
def getSparkSession : SparkSession
取得與此 GlueContext 相關聯的 SparkSession 物件。使用此 SparkSession 物件以將資料表與 UDF 註冊為與從 DynamicFrames 建立的 DataFrame 搭配使用。
傳回 SparkSession。
def startTransaction
def startTransaction(readOnly: Boolean):String
開始新交易。內部呼叫 Lake Formation startTransaction API。
readOnly– (布林值) 指出此交易應該是唯讀,還是讀取和寫入。使用唯讀交易 ID 進行的寫入將被拒絕。唯讀交易不需要遞交。
傳回交易 ID。
def commitTransaction
def commitTransaction(transactionId: String, waitForCommit: Boolean): Boolean
嘗試遞交指定的交易。commitTransaction 可能會在交易完成遞交之前返回。內部呼叫 Lake Formation commitTransaction API。
transactionId– (字串) 要遞交的交易。waitForCommit– (布林值) 決定commitTransaction是否立即傳回。預設值為 true。如為 False,commitTransaction輪詢並等待,直到交易完成遞交。使用指數退避時,等待時間長度限制為 1 分鐘,最多可嘗試 6 次重試。
傳回一個布林值,指示遞交是否完成。
def cancelTransaction
def cancelTransaction(transactionId: String): Unit
嘗試取消指定的交易。內部呼叫 Lake Formation CancelTransaction API。
transactionId– (字串) 要取消的交易。
如果交易先前已遞交,傳回 TransactionCommittedException 例外狀況。
def 此
def this( sc : SparkContext,
minPartitions : Int,
targetPartitions : Int )
使用指定的 SparkContext、最小分割區和分割區目標來建立 GlueContext 物件。
sc—SparkContext。minPartitions— 分割區最小數。targetPartitions— 分割區目標數。
傳回 GlueContext。
def 此
def this( sc : SparkContext )
透過提供的 SparkContext 建立 GlueContext 物件。將分割區的最小值設為 10,目標分割區設為 20。
sc—SparkContext。
傳回 GlueContext。
def 此
def this( sparkContext : JavaSparkContext )
透過提供的 JavaSparkContext 建立 GlueContext 物件。將分割區的最小值設為 10,目標分割區設為 20。
sparkContext—JavaSparkContext。
傳回 GlueContext。