AWS Glue Scala DynamicFrame 類別 - AWS Glue

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

AWS Glue Scala DynamicFrame 類別

Package: com.amazonaws.services.glue

class DynamicFrame extends Serializable with Logging ( val glueContext : GlueContext, _records : RDD[DynamicRecord], val name : String = s"", val transformationContext : String = DynamicFrame.UNDEFINED, callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0, prevErrors : => Long = 0, errorExpr : => Unit = {} )

DynamicFrame 是自我描述 DynamicRecord 物件的分散式集合。

DynamicFrame 旨在為 ETL (擷取、轉換和載入) 操作提供靈活的資料模型。它們不需要結構描述即可建立,並可用於讀取和轉換內含雜亂或不一致值和類型的資料。您可以為需要結構描述的操作隨需運算結構描述。

DynamicFrame 提供各種轉換以進行資料洗滌和 ETL。它們還支援與 SparkSQL DataFrames 的相互轉換,以整合現有程式碼以及 DataFrames 提供許多分析操作。

以下參數在許多 AWS Glue 轉換之間共用以建構 DynamicFrame

  • transformationContext – 此 DynamicFrame 的識別碼。transformationContext 做為在執行之間持續存在之任務書籤狀態的金鑰使用。

  • callSite – 提供錯誤報告的內容資訊。從 Python 呼叫時,會自動設定這些值。

  • stageThreshold – 此 DynamicFrame 運算在擲回例外狀況之前允許的最大錯誤記錄數,不包含於先前 DynamicFrame 中存在的記錄。

  • totalThreshold – 在擲回例外狀況之前,最大的錯誤記錄總計 (包括之前框架的數量)。

Val errorsCount

val errorsCount

DynamicFrame 中的錯誤記錄數量。這包括之前操作的錯誤。

Def applyMapping

def applyMapping( mappings : Seq[Product4[String, String, String, String]], caseSensitive : Boolean = true, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame
  • mappings – 用來建構新 DynamicFrame 的映射序列。

  • caseSensitive – 是否將來源欄位視為區分大小寫。將此設定設為 false 可能有助於整合區分大小寫的存放區,例如 AWS Glue Data Catalog。

依據映射序列選取、投影及投射欄位。

每個映射皆由來源欄位和類型以及目標欄位和類型所組成。映射可能會指定為四元組 (source_pathsource_type target_pathtarget_type) 或包含相同資訊的 MappingSpec 物件。

映射除了可用來進行簡單的投影與投射,還可以用來將欄位巢狀化或解除巢狀化 (藉由使用「.」(句點) 分隔路徑元件來達成)。

例如,假設您有內含結構描述如下的 DynamicFrame

{{{ root |-- name: string |-- age: int |-- address: struct | |-- state: string | |-- zip: int }}}

您可以進行以下呼叫來將 statezip 欄位解除巢狀化。

{{{ df.applyMapping( Seq(("name", "string", "name", "string"), ("age", "int", "age", "int"), ("address.state", "string", "state", "string"), ("address.zip", "int", "zip", "int"))) }}}

產生的結構描述如下。

{{{ root |-- name: string |-- age: int |-- state: string |-- zip: int }}}

您也可以使用 applyMapping 來將欄位重新巢狀化。例如,以下會反轉之前的轉換並在目標中建立名為 address 的結構。

{{{ df.applyMapping( Seq(("name", "string", "name", "string"), ("age", "int", "age", "int"), ("state", "string", "address.state", "string"), ("zip", "int", "address.zip", "int"))) }}}

可使用反引號 (``) 來括住包含「.」(句點) 字元的欄位名稱。

注意

您目前無法使用 applyMapping 方法來映射於陣列下的巢狀欄位。

Def assertErrorThreshold

def assertErrorThreshold : Unit

強制運算與驗證錯誤記錄數低於 stageThresholdtotalThreshold 的動作。如果任一條件失敗,將會擲出例外狀況。

Def count

lazy def count

傳回此 DynamicFrame 中的元素數量。

Def dropField

def dropField( path : String, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

傳回已移除指定欄位的新 DynamicFrame

Def dropFields

def dropFields( fieldNames : Seq[String], // The column names to drop. transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

傳回已移除指定欄位的新 DynamicFrame

您可以使用這個方法來刪除巢狀欄位 (包括陣列中的巢狀欄位),但不能丟棄特定陣列元素。

Def dropNulls

def dropNulls( transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 )

傳回新的 DynamicFrame 並移除所有 null 欄位。

注意

這只會移除 NullType 類型的欄位。其他欄位中的個別 null 值不會被移除或修改。

Def errorsAsDynamicFrame

def errorsAsDynamicFrame

傳回包含此 DynamicFrame 錯誤記錄的新 DynamicFrame

Def filter

def filter( f : DynamicRecord => Boolean, errorMsg : String = "", transformationContext : String = "", callSite : CallSite = CallSite("Not provided"), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

建構新的 DynamicFrame,其中僅包含函數「f」傳回 true 的那些記錄。篩選條件函數「f」不應使輸入記錄產生變化。

Def getName

def getName : String

傳回此 DynamicFrame 的名稱。

Def getNumPartitions

def getNumPartitions

傳回此 DynamicFrame 中的分割區數量。

Def getSchemaIfComputed

def getSchemaIfComputed : Option[Schema]

如果結構描述已經計算,即傳回結構描述。如果結構描述尚未計算,則不掃描資料。

Def isSchemaComputed

def isSchemaComputed : Boolean

如果此 DynamicFrame 的結構描述已經計算,即傳回 true,否則傳回 false。如果此方法傳回 false,則呼叫 schema 方法需要另一個結構描述來在此 DynamicFrame 中傳遞記錄。

Def javaToPython

def javaToPython : JavaRDD[Array[Byte]]

Def join

def join( keys1 : Seq[String], keys2 : Seq[String], frame2 : DynamicFrame, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame
  • keys1 – 此 DynamicFrame 中要用於聯結的欄位。

  • keys2 – frame2 中要用於聯結的欄位。長度必須與 keys1 相同。

  • frame2 – 要據以加入的 DynamicFrame

傳回使用指定金鑰以 frame2 執行對等聯結的結果。

Def map

def map( f : DynamicRecord => DynamicRecord, errorMsg : String = "", transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

傳回藉由將指定函數「f」套用至此 DynamicFrame 中各個記錄而建構的新 DynamicFrame

此方法會在套用指定的函數之前複製每個記錄,因此可以安全地改變記錄。如果映射函數在指定的記錄擲出例外狀況,會將該記錄標示為錯誤,而會將堆疊追蹤儲存為錯誤記錄中的欄位。

Def mergeDynamicFrames

def mergeDynamicFrames( stageDynamicFrame: DynamicFrame, primaryKeys: Seq[String], transformationContext: String = "", options: JsonOptions = JsonOptions.empty, callSite: CallSite = CallSite("Not provided"), stageThreshold: Long = 0, totalThreshold: Long = 0): DynamicFrame
  • stageDynamicFrame – 要合併的暫存 DynamicFrame

  • primaryKeys – 要從來源和暫存 DynamicFrame 比對記錄的主索引鍵欄位清單。

  • transformationContext – 用來擷取目前轉換之中繼資料的唯一字串 (選用)。

  • options – JSON 名稱值組的字串,可提供此轉換的額外資料。

  • callSite – 用於提供錯誤報告的內容資訊。

  • stageThreshold – A Long。在給定轉換中的錯誤數量,其處理需要輸出錯誤。

  • totalThreshold – A Long。在此轉換之前 (包括在此轉換中) 的錯誤總數,其處理需要輸出錯誤。

根據指定的主索引鍵來合併此 DynamicFrame 與暫存 DynamicFrame 以識別記錄。重複的記錄 (具有相同主索引鍵的記錄) 不會被刪除重複資料。如果暫存影格中沒有相符的記錄,則會保留來源中的所有記錄 (包括重複項)。如果暫存影格具有相符的記錄,則暫存影格中的記錄會覆寫 AWS Glue 中來源的記錄。

在下列情況下,傳回的 DynamicFrame 包含記錄 A:

  1. 如果 A 同時存在於來源影格和暫存影格,則會傳回暫存影格中的 A

  2. 如果 A 位於來源資料表中而 A.primaryKeys 不在 stagingDynamicFrame 中 (這表示 A 未在暫存資料表中更新)。

來源影格和暫存影格不需要具有相同的結構描述。

val mergedFrame: DynamicFrame = srcFrame.mergeDynamicFrames(stageFrame, Seq("id1", "id2"))

Def printSchema

def printSchema : Unit

以人類可讀取的格式,將此 DynamicFrame 的結構描述列印至 stdout

Def recomputeSchema

def recomputeSchema : Schema

強制結構描述重新計算。這需要掃描資料,但如果目前的結構描述中有一些欄位不存在於資料中,則可能會「限鎖」結構描述。

傳回重新計算的結構描述。

Def relationalize

def relationalize( rootTableName : String, stagingPath : String, options : JsonOptions = JsonOptions.empty, transformationContext : String = "", callSite : CallSite = CallSite("Not provided"), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : Seq[DynamicFrame]
  • rootTableName – 在輸出中用於基本 DynamicFrame 的名稱。藉由旋轉陣列所建立的 DynamicFrame 會以此做為字首。

  • stagingPath – Amazon Simple Storage Service (Amazon S3) 路徑,用來寫入中繼資料。

  • options – 關聯化選項和組態。目前未使用。

將所有巢狀結構平面化並將陣列旋轉為單獨的資料表。

您可以使用此操作來準備深度巢狀資料,以將該資料擷取至關聯式資料庫。巢狀結構以相同於 Unnest 轉換的方式平面化。此外,系統會將陣列旋轉為單獨的資料表,每個陣列元素都將成為資料列。例如,假設您有含以下資料的 DynamicFrame

{"name": "Nancy", "age": 47, "friends": ["Fred", "Lakshmi"]} {"name": "Stephanie", "age": 28, "friends": ["Yao", "Phil", "Alvin"]} {"name": "Nathan", "age": 54, "friends": ["Nicolai", "Karen"]}

執行下列程式碼。

{{{ df.relationalize("people", "s3:/my_bucket/my_path", JsonOptions.empty) }}}

這會產生兩個資料表。第一個資料表名為「people」,並包含下列項目。

{{{ {"name": "Nancy", "age": 47, "friends": 1} {"name": "Stephanie", "age": 28, "friends": 2} {"name": "Nathan", "age": 54, "friends": 3) }}}

在此,friends 陣列已替換為自動產生的聯結索引鍵。建立名為 people.friends 的個別資料表,內含以下內容。

{{{ {"id": 1, "index": 0, "val": "Fred"} {"id": 1, "index": 1, "val": "Lakshmi"} {"id": 2, "index": 0, "val": "Yao"} {"id": 2, "index": 1, "val": "Phil"} {"id": 2, "index": 2, "val": "Alvin"} {"id": 3, "index": 0, "val": "Nicolai"} {"id": 3, "index": 1, "val": "Karen"} }}}

在此資料表中,「id」是一種聯結索引鍵,可識別陣列元素來自哪些記錄,「index」會參照原始陣列中的位置,而「val」則是實際的陣列項目。

relationalize 方法會傳回藉由將此程序遞迴套用至所有陣列而建立的一系列 DynamicFrame

注意

AWS Glue 程式庫會為新表格自動產生聯結索引鍵。為了確保聯結索引鍵在任務執行中是唯一的,您必須啟用任務書籤。

Def renameField

def renameField( oldName : String, newName : String, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame
  • oldName – 欄位的原始名稱。

  • newName – 欄位的新名稱。

傳回已重新命名指定欄位的新 DynamicFrame

您可以使用這個方法來重新命名巢狀欄位。例如,以下程式碼會將地址結構中的 state 重新命名為 state_code

{{{ df.renameField("address.state", "address.state_code") }}}

Def repartition

def repartition( numPartitions : Int, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

傳回包含 numPartitions 分割區的新 DynamicFrame

Def resolveChoice

def resolveChoice( specs : Seq[Product2[String, String]] = Seq.empty[ResolveSpec], choiceOption : Option[ChoiceOption] = None, database : Option[String] = None, tableName : Option[String] = None, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame
  • choiceOption – 套用到所有未在規格序列中列出之 ChoiceType 欄位的動作。

  • database – 搭配 match_catalog 動作使用的 Data Catalog 資料庫。

  • tableName – 搭配 match_catalog 動作使用的 Data Catalog 資料表。

使用更為特定的類型取代一或多個 ChoiceType 以傳回新 DynamicFrame

有兩種方式可以使用 resolveChoice。第一種是指定一系列的特定的欄以及解析它們的方式。這些是指定為由 (欄位、動作) 配對所組成的元組。

可行的動作如下:

  • cast:type – 嘗試將所有值投射至指定類型。

  • make_cols – 將每個不同的類型轉換為具有 columnName_type 名稱的欄位。

  • make_struct – 將欄位轉換為每個不同類型皆有金鑰的結構。

  • project:type – 僅保留指定類型的值。

resolveChoice 的其他模式可為所有 ChoiceType 指定單一解析度。您可以在 ChoiceType 的完整清單在執行之前是未知的情況下使用此模式。除了以上列出的動作,此模式也支援下列動作:

  • match_catalogChoiceType – 嘗試將每個 投射至指定目錄資料表中的對應類型。

範例:

藉由投射至 int 以解析 user.id 欄位,並且讓 address 欄位僅保留結構。

{{{ df.resolveChoice(specs = Seq(("user.id", "cast:int"), ("address", "project:struct"))) }}}

藉由將每個選擇轉換單獨的欄位以解析所有 ChoiceType

{{{ df.resolveChoice(choiceOption = Some(ChoiceOption("make_cols"))) }}}

藉由投射至指定目錄資料表中的類型以解析所有 ChoiceType

{{{ df.resolveChoice(choiceOption = Some(ChoiceOption("match_catalog")), database = Some("my_database"), tableName = Some("my_table")) }}}

Def schema

def schema : Schema

傳回此 DynamicFrame 的結構描述。

傳回的結構描述會保證包含於此 DynamicFrame 中之記錄存在的每個欄位。但在少數情況下,它也可能包含額外的欄位。您可以使用 Unnest 方法,依據此 DynamicFrame 中的記錄來「限縮」結構描述。

Def selectField

def selectField( fieldName : String, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

DynamicFrame 傳回單一欄位。

Def selectFields

def selectFields( paths : Seq[String], transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame
  • paths – 要選取的欄位名稱序列。

傳回包含指定欄位的新 DynamicFrame

注意

您只能使用 selectFields 方法來選取最上層欄位。您可以使用 applyMapping 方法來選取巢狀欄位。

Def show

def show( numRows : Int = 20 ) : Unit
  • numRows – 要列印的資料列數。

以 JSON 格式列印此 DynamicFrame 的資料列。

Def simplifyDDBJson

DynamoDB 會使用 AWS Glue DynamoDB 匯出連接器進行匯出,這會產生具有特定巢套結構的 JSON 檔案。如需詳細資訊,請參閱資料物件simplifyDDBJson簡化此類資料的 DynamicFrame 中的巢狀資料欄,並傳回新的簡化 DynamicFrame。如果 List 類型中包含多種類型或 Map 類型,則 List 中的元素不會進行簡化。此方法僅支援 DynamoDB 匯出 JSON 格式的資料。考慮 unnest 對其他類型的資料執行類似的變更。

def simplifyDDBJson() : DynamicFrame

此方法不接受任何參數。

範例輸入

請考慮由 DynamoDB 匯出產生的下列結構描述:

root |-- Item: struct | |-- parentMap: struct | | |-- M: struct | | | |-- childMap: struct | | | | |-- M: struct | | | | | |-- appName: struct | | | | | | |-- S: string | | | | | |-- packageName: struct | | | | | | |-- S: string | | | | | |-- updatedAt: struct | | | | | | |-- N: string | |-- strings: struct | | |-- SS: array | | | |-- element: string | |-- numbers: struct | | |-- NS: array | | | |-- element: string | |-- binaries: struct | | |-- BS: array | | | |-- element: string | |-- isDDBJson: struct | | |-- BOOL: boolean | |-- nullValue: struct | | |-- NULL: boolean

範例程式碼

import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContextimport scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getSourceWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.export" -> "ddb", "dynamodb.tableArn" -> "ddbTableARN", "dynamodb.s3.bucket" -> "exportBucketLocation", "dynamodb.s3.prefix" -> "exportBucketPrefix", "dynamodb.s3.bucketOwner" -> "exportBucketAccountID", )) ).getDynamicFrame() val simplified = dynamicFrame.simplifyDDBJson() simplified.printSchema() Job.commit() } }

simplifyDDBJson 轉換將此簡化為:

root |-- parentMap: struct | |-- childMap: struct | | |-- appName: string | | |-- packageName: string | | |-- updatedAt: string |-- strings: array | |-- element: string |-- numbers: array | |-- element: string |-- binaries: array | |-- element: string |-- isDDBJson: boolean |-- nullValue: null

Def spigot

def spigot( path : String, options : JsonOptions = new JsonOptions("{}"), transformationContext : String = "", callSite : CallSite = CallSite("Not provided"), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

傳遞轉換以傳回相同的記錄,但副作用是寫出部分記錄。

  • path – 以 s3://bucket//path 格式將輸出寫入至 Amazon S3 中的路徑。

  • options – 描述取樣行為的選用 JsonOptions 映射。

傳回包含與此相同記錄的 DynamicFrame

在預設情況下,寫入 100 任意記錄到 path 指定的位置。您可以使用 options 對應來自訂此行為。有效索引鍵包括下列:

  • topk – 指定寫出的記錄總數。預設為 100。

  • prob – 指定包含個別記錄的機率 (以小數表示)。預設值為 1。

例如,以下呼叫取樣資料集的方式是以 20% 的可能性選取每個記錄,並在已寫入 200 個記錄之後停止。

{{{ df.spigot("s3://my_bucket/my_path", JsonOptions(Map("topk" -> 200, "prob" -> 0.2))) }}}

Def splitFields

def splitFields( paths : Seq[String], transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : Seq[DynamicFrame]
  • paths – 要包含在第一個 DynamicFrame 中的路徑。

傳回兩個 DynamicFrame 的序列。第一個 DynamicFrame 包含指定的路徑,第二個包含所有其他欄。

範例

在 AWS Glue Data Catalog 的 legislators 資料庫中,此範例取得根據 persons 資料表建立的 DynamicFrame,將此 DynamicFrame 分割成兩個,其中指定的欄位進入第一個 DynamicFrame,其餘欄位進入第二個 DynamicFrame。然後,該範例從結果中選擇第一個 DynamicFrame。

val InputFrame = glueContext.getCatalogSource(database="legislators", tableName="persons", transformationContext="InputFrame").getDynamicFrame() val SplitField_collection = InputFrame.splitFields(paths=Seq("family_name", "name", "links.note", "links.url", "gender", "image", "identifiers.scheme", "identifiers.identifier", "other_names.lang", "other_names.note", "other_names.name"), transformationContext="SplitField_collection") val ResultFrame = SplitField_collection(0)

Def splitRows

def splitRows( paths : Seq[String], values : Seq[Any], operators : Seq[String], transformationContext : String, callSite : CallSite, stageThreshold : Long, totalThreshold : Long ) : Seq[DynamicFrame]

根據比較欄位與常數的述詞來分割列。

  • paths – 用於比較的欄位。

  • values – 用於比較的常數值。

  • operators – 用於比較的運算子。

傳回兩個 DynamicFrame 的序列。第一個包含述詞為 true 的列,第二個包含述詞為 false 的列。

使用三個序列指定述詞:「paths」包含 (可能為巢狀) 欄位名稱、「values」包含要比較的常數值,以及「operators」包含用於比較的運算子。這三個序列的長度必須相同:第 n 個運算子會用於比較第 n 個欄位與第 n 個值。

每個運算子都必須是「!=」、「=」、「<=」、「<」、「>=」或「>」其中之一。

舉例來說,以下呼叫會分割 DynamicFrame,因此第一個輸出框架會包含來自美國超過 65 人的記錄,第二個會包含所有其他記錄。

{{{ df.splitRows(Seq("age", "address.country"), Seq(65, "USA"), Seq("&gt;=", "=")) }}}

Def stageErrorsCount

def stageErrorsCount

傳回運算此 DynamicFrame 時建立的錯誤記錄的數量。這會排除之前傳遞至此 DynamicFrame 做為輸入之操作的錯誤。

Def toDF

def toDF( specs : Seq[ResolveSpec] = Seq.empty[ResolveSpec] ) : DataFrame

以相同的結構描述和記錄,將此 DynamicFrame 轉換為 Apache Spark SQL DataFrame

注意

由於 DataFrame 不支援 ChoiceType,因此這個方法會自動將 ChoiceType 欄轉換成 StructType。如需有關解析選擇的詳細資訊和選項,請參閱resolveChoice

Def unbox

def unbox( path : String, format : String, optionString : String = "{}", transformationContext : String = "", callSite : CallSite = CallSite("Not provided"), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame
  • path – 要剖析的欄。必須為字串或二進位。

  • format – 用於剖析的格式。

  • optionString – 傳送格式的選項,例如 CSV 分隔符號。

根據指定的格式,剖析嵌入字串或二進位欄位。剖析的欄位是具有原始資料欄名稱結構的巢狀欄位。

例如,假設您有 CSV 檔案與內嵌 JSON 欄位。

name, age, address Sally, 36, {"state": "NE", "city": "Omaha"} ...

完成初始剖析後,您會取得具有下列結構描述的 DynamicFrame

{{{ root |-- name: string |-- age: int |-- address: string }}}

您可以呼叫地址欄位上的 unbox 以剖析特定元件。

{{{ df.unbox("address", "json") }}}

如此將提供我們具有下列結構描述的 DynamicFrame

{{{ root |-- name: string |-- age: int |-- address: struct | |-- state: string | |-- city: string }}}

Def unnest

def unnest( transformationContext : String = "", callSite : CallSite = CallSite("Not Provided"), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

傳回其所有巢狀結構皆已平面化的新 DynamicFrame。使用「.」(句點) 字元建構名稱。

例如,假設您有內含結構描述如下的 DynamicFrame

{{{ root |-- name: string |-- age: int |-- address: struct | |-- state: string | |-- city: string }}}

以下呼叫將會解巢狀地址結構。

{{{ df.unnest() }}}

產生的結構描述如下。

{{{ root |-- name: string |-- age: int |-- address.state: string |-- address.city: string }}}

此方法也會解巢狀陣列中的巢狀結構。但因為歷史因素,這類欄位的名稱會附加封閉陣列和「.val」的名稱。

Def unnestDDBJson

unnestDDBJson(transformationContext : String = "", callSite : CallSite = CallSite("Not Provided"), stageThreshold : Long = 0, totalThreshold : Long = 0): DynamicFrame

解除專屬於 DynamoDB JSON 結構中 DynamicFrame 內的巢狀欄的巢狀化,並傳回新的解巢狀 DynamicFrame。結構類型陣列的欄將不是解巢狀狀態。請注意,這是一種特定類型的解除巢狀化轉換,其行為與常規 unnest 轉換不同,且資料必須已經位於 DynamoDB JSON 結構中。如需詳細資訊,請參閱 DynamoDB JSON

例如,讀取 DynamoDB JSON 結構的匯出結構描述與以下類似:

root |-- Item: struct | |-- ColA: struct | | |-- S: string | |-- ColB: struct | | |-- S: string | |-- ColC: struct | | |-- N: string | |-- ColD: struct | | |-- L: array | | | |-- element: null

unnestDDBJson() 轉換會將此轉換為:

root |-- ColA: string |-- ColB: string |-- ColC: string |-- ColD: array | |-- element: null

下列程式碼範例演示如何使用 AWS Glue DynamoDB 匯出連接器,呼叫 DynamoDB JSON 解巢狀,並列印分割區數量:

import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getSourceWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.export" -> "ddb", "dynamodb.tableArn" -> "<test_source>", "dynamodb.s3.bucket" -> "<bucket name>", "dynamodb.s3.prefix" -> "<bucket prefix>", "dynamodb.s3.bucketOwner" -> "<account_id of bucket>", )) ).getDynamicFrame() val unnested = dynamicFrame.unnestDDBJson() print(unnested.getNumPartitions()) Job.commit() } }

Def withFrameSchema

def withFrameSchema( getSchema : () => Schema ) : DynamicFrame
  • getSchema – 傳回結構描述以供使用的函數。指定為零參數函數以延遲可能昂貴的運算。

將此 DynamicFrame 的結構描述設定為指定的值。這主要用於內部以避免昂貴的結構描述重新計算。傳入的結構描述必須包含存在於資料中的所有資料欄位。

Def withName

def withName( name : String ) : DynamicFrame
  • name – 要使用的新名稱。

傳回此具有新名稱的 DynamicFrame 的副本。

Def withTransformationContext

def withTransformationContext( ctx : String ) : DynamicFrame

傳回此具有指定轉換內容的 DynamicFrame 的副本。