Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.
AWS Glue Scala DynamicFrame-Klasse
Paket: com.amazonaws.services.glue
class DynamicFrame extends Serializable with Logging (
val glueContext : GlueContext,
_records : RDD[DynamicRecord],
val name : String = s"",
val transformationContext : String = DynamicFrame.UNDEFINED,
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0,
prevErrors : => Long = 0,
errorExpr : => Unit = {} )Ein DynamicFrame ist eine verteilte Sammlung von selbstbeschreibenden DynamicRecord Objekten.
DynamicFrames wurden entwickelt, um ein flexibles Datenmodell für ETL-Operationen (Extrahieren, Transformieren und Laden) bereitzustellen. Sie benötigen kein Schema zum Erstellen, und Sie können damit Daten lesen und transformieren, die unstrukturierte oder inkonsistente Werte und Typen enthalten. Ein Schema kann bei Bedarf für solche Operationen berechnet werden, die eines benötigen.
DynamicFrames bieten eine Reihe von Transformationen für die Datenreinigung und ETL. Sie unterstützen zudem die Konvertierung von und aus SparkSQL DataFrames für die Integration in vorhandenen Code und die vielen Analysevorgänge, die DataFrames bereitstellen.
Die folgenden Parameter werden über viele AWS Glue-Transformationen hinweg geteilt, die DynamicFrames erstellen:
transformationContext– Der Bezeichner für diesenDynamicFrame. DertransformationContextwird als Schlüssel für den Auftrags-Lesezeichenstatus verwendet, der während der Ausführungen persistent ist.callSite– Liefert Kontextinformationen für die Fehlerberichterstattung. Diese Werte werden automatisch beim Aufruf von Python festgelegt.stageThreshold– Die maximale Anzahl der Fehlerdatensätze, die aufgrund der Berechnung diesesDynamicFramezulässig sind, ehe eine Ausnahme ausgelöst wird. Ausgenommen sind Datensätze aus dem vorherigenDynamicFrame.totalThreshold– Die maximale Anzahl der Gesamtfehlersätze, bevor eine Ausnahme ausgelöst wird, einschließlich derjenigen aus früheren Frames.
Val errorsCount
val errorsCount
Die Anzahl der Fehlerdatensätze in diesem DynamicFrame. Dazu zählen Fehler aus früheren Operationen.
Def applyMapping
def applyMapping( mappings : Seq[Product4[String, String, String, String]],
caseSensitive : Boolean = true,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
mappings– Eine Folge von Zuweisungen für die Erstellung eines neuenDynamicFrame.caseSensitive– Legt fest, ob bei Quellspalten die Groß-/Kleinschreibung beachtet wird. Die Festlegung auf „false“ kann bei der Integration in Datenspeicher wie dem AWS Glue Data Catalog helfen, bei dem die Groß-/Kleinschreibung nicht berücksichtigt wird.
Selektiert, projiziert und wandelt Spalten basierend auf Mappingreihenfolgen um.
Jede Zuweisung besteht aus einer Quellspalte und einem Typ und einer Zielspalte und einem Typ. Zuweisungen können entweder als „vierstelliger“ Tupel (source_path, source_type, target_path, target_type) oder als MappingSpec-Objekt, das dieselben Informationen enthält, angegeben werden.
Neben der Verwendung von Zuweisungen für einfache Projektionen und Umwandlungen können Sie diese auch zum Verschachteln oder Aufheben der Verschachtelung von Feldern verwenden, indem Sie Komponenten des Pfades mit „.“ (Punkt) trennen.
Angenommen, Sie haben einen DynamicFrame mit folgendem Schema:
{{{
root
|-- name: string
|-- age: int
|-- address: struct
| |-- state: string
| |-- zip: int
}}}
Mit dem folgenden Aufruf können Sie die Verschachtelung der Felder state und zip aufheben:
{{{
df.applyMapping(
Seq(("name", "string", "name", "string"),
("age", "int", "age", "int"),
("address.state", "string", "state", "string"),
("address.zip", "int", "zip", "int")))
}}}
Das resultierende Schema lautet wie folgt:
{{{
root
|-- name: string
|-- age: int
|-- state: string
|-- zip: int
}}}
Sie können auch applyMapping verwenden, um Spalten neu zu verschachteln. Durch folgende Operation wird beispielsweise die vorherige Transformation umgekehrt und eine neue Struktur namens address am Ziel erstellt:
{{{
df.applyMapping(
Seq(("name", "string", "name", "string"),
("age", "int", "age", "int"),
("state", "string", "address.state", "string"),
("zip", "int", "address.zip", "int")))
}}}
Feldnamen, die „.“ (Punkt-)Zeichen enthalten, können mit Hilfe von Backticks (``) zitiert werden.
Anmerkung
Zurzeit können Sie die applyMapping-Methode nicht für die Zuweisung von Spalten verwenden, die unter Arrays verschachtelt sind.
Def assertErrorThreshold
def assertErrorThreshold : Unit
Eine Aktion, die eine Berechnung erzwingt und sicherstellt, dass die Anzahl der Fehlerdatensätze stageThreshold und totalThreshold nicht überschreitet. Löst eine Ausnahme aus, wenn eine der Bedingungen fehlschlägt.
Def count
lazy
def count
Gibt die Anzahl der Elemente in diesem DynamicFrame zurück.
Def dropField
def dropField( path : String,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
Gibt einen neuen DynamicFrame zurück, bei dem die angegebene Spalte entfernt wurde.
Def dropFields
def dropFields( fieldNames : Seq[String], // The column names to drop.
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
Gibt einen neuen DynamicFrame zurück, bei dem die angegebenen Spalten entfernt wurden.
Sie können diese Methode verwenden, um verschachtelte Spalten zu löschen, einschließlich derjenigen in Arrays. Sie kann aber nicht eingesetzt werden, um bestimmte Array-Elemente zu verwerfen.
Def dropNulls
def dropNulls( transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0 )
Gibt einen neuen DynamicFrame zurück, bei dem alle Nullspalten entfernt sind.
Anmerkung
Dies entfernt nur Spalten des Typs NullType. Einzelne Nullwerte in anderen Spalten werden weder entfernt noch geändert.
Def errorsAsDynamicFrame
def errorsAsDynamicFrame
Gibt einen neuen DynamicFrame mit den Fehlersätzen aus diesem DynamicFrame zurück.
Def filter
def filter( f : DynamicRecord => Boolean,
errorMsg : String = "",
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
Erstellt einen neuen DynamicFrame, der nur die Datensätze enthält, für die die Funktion „f“ true zurückgibt. Die Filterfunktion „f“ sollte den Eingabe-Datensatz nicht verändern.
Def getName
def getName : String
Gibt den Namen dieses DynamicFrame zurück.
Def getNumPartitions
def getNumPartitions
Gibt die Anzahl der Partitionen in diesem DynamicFrame zurück.
Def getSchemaIfComputed
def getSchemaIfComputed : Option[Schema]
Gibt das Schema zurück, wenn es bereits berechnet wurde. Scannt die Daten nicht, wenn das Schema noch nicht berechnet wurde.
Def isSchemaComputed
def isSchemaComputed : Boolean
Gibt "true" zurück, wenn das Schema für diesen DynamicFrame bereits berechnet wurde, andernfalls "false". Wenn diese Methode "false" zurückgibt, erfordert der Aufruf der schema-Methode eine erneute Übergabe dieser Datensätze im DynamicFrame.
Def javaToPython
def javaToPython : JavaRDD[Array[Byte]]
Def join
def join( keys1 : Seq[String],
keys2 : Seq[String],
frame2 : DynamicFrame,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
keys1– Die Spalten in diesemDynamicFrame, die für den Join verwendet werden sollen.keys2– Die Spalten inframe2, die für den Join verwendet werden sollen. Muss die gleiche Länge haben wiekeys1.frame2– Der andereDynamicFramefür einen Join.
Gibt das Ergebnis der Durchführung eines equijoin mit frame2 über die angegebenen Schlüssel zurück.
Def map
def map( f : DynamicRecord => DynamicRecord,
errorMsg : String = "",
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
Gibt einen neuen DynamicFrame zurück, der erstellt wird, indem die angegebene Funktion „f“ auf jeden Datensatz in diesem DynamicFrame angewendet wird.
Diese Methode kopiert jeden Datensatz, ehe die angegebene Funktion angewendet wird, sodass das Verändern der Datensätze sicher ist. Wenn die Zuweisungsfunktion eine Ausnahme für einen bestimmten Datensatz auslöst, wird der Datensatz als fehlerhaft gekennzeichnet und der Stack-Trace als Spalte im Fehlerdatensatz gespeichert.
Def mergeDynamicFrames
def mergeDynamicFrames( stageDynamicFrame: DynamicFrame, primaryKeys: Seq[String], transformationContext: String = "",
options: JsonOptions = JsonOptions.empty, callSite: CallSite = CallSite("Not provided"),
stageThreshold: Long = 0, totalThreshold: Long = 0): DynamicFrame
stageDynamicFrame– Der Staging-DynamicFrame, der zusammengeführt werden soll.primaryKeys– Die Liste der Primärschlüsselfelder, mit denen Datensätze aus der Quelle und Staging-DynamicFrames übereinstimmen.transformationContext– Eine eindeutige Zeichenfolge, die zum Abrufen von Metadaten über die aktuelle Transformation verwendet wird (optional).options– Eine Zeichenfolge von JSON-Name-Wert-Paaren, die zusätzliche Informationen für diese Transformation bereitstellen.callSite– Wird verwendet, um Kontextinformationen für Fehlerberichte bereitzustellen.stageThreshold– EinLong. Die Anzahl der Fehler in der angegebenen Transformation, für die die Verarbeitung fehlerhaft sein muss.totalThreshold– EinLong. Die Gesamtzahl der Fehler bis einschließlich dieser Transformation, bei denen die Verarbeitung fehlerhaft sein muss.
Führt dieses DynamicFrame mit einem Staging-DynamicFrame basierend auf den angegebenen Primärschlüsseln zusammen, um Datensätze zu identifizieren. Doppelte Datensätze (Datensätze mit denselben Primärschlüsseln) werden nicht dedupliziert. Wenn kein übereinstimmender Datensatz im Staging-Frame vorhanden ist, werden alle Datensätze (einschließlich Duplikate) von der Quelle beibehalten. Wenn der Staging-Frame übereinstimmende Datensätze enthält, überschreiben die Datensätze aus dem Staging-Frame die Datensätze in der Quelle in AWS Glue.
Der zurückgegebene DynamicFrame enthält Datensatz A in folgenden Fällen:
Wenn sowohl im Quell- als auch im Staging-Frame
Avorhanden ist, wirdAim Staging-Frame zurückgegeben.Wenn sich
Ain der Quelltabelle undA.primaryKeysnicht instagingDynamicFramebefindet (d. h.Awird nicht in der Staging-Tabelle aktualisiert).
Der Quell- und der Staging-Frame müssen nicht dasselbe Schema haben.
val mergedFrame: DynamicFrame = srcFrame.mergeDynamicFrames(stageFrame, Seq("id1", "id2"))Def printSchema
def printSchema : Unit
Druckt das Schema dieses DynamicFrame in einem lesbaren Format in stdout.
Def recomputeSchema
def recomputeSchema : Schema
Erzwingt eine Neuberechnung des Schemas. Dies erfordert einen Scan über die Daten, aber es kann das Schema „verschärfen“, wenn es einige Felder im aktuellen Schema gibt, die nicht in den Daten vorhanden sind.
Gibt das neu berechnete Schema zurück.
Def relationalize
def relationalize( rootTableName : String,
stagingPath : String,
options : JsonOptions = JsonOptions.empty,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : Seq[DynamicFrame]
rootTableName– Der Name für die BasisDynamicFramein der Ausgabe.DynamicFrames, die durch das Zusammenfassen von Arrays erstellt werden, beginnen mit diesem als Präfix.stagingPath– Der Amazon-S3-Pfad (Amazon Simple Storage Service) zum Schreiben von Zwischendaten.options– Relationalisierung von Optionen und Konfiguration. Derzeit nicht verwendet.
Gleicht alle verschachtelten Strukturen an und pivotiert Arrays in separate Tabellen.
Mit dieser Operation können Sie tief verschachtelte Daten für die Aufnahme in eine relationale Datenbank vorbereiten. Verschachtelte Strukturen werden genauso wie die Unnest-Transformation auf eine Ebene gebracht. Außerdem werden Arrays in separate Tabellen pivotiert. Dabei wird jedes Array-Element zu einer Zeile. Angenommen, Sie haben einen DynamicFrame mit den folgenden Daten:
{"name": "Nancy", "age": 47, "friends": ["Fred", "Lakshmi"]}
{"name": "Stephanie", "age": 28, "friends": ["Yao", "Phil", "Alvin"]}
{"name": "Nathan", "age": 54, "friends": ["Nicolai", "Karen"]}
Führen Sie folgenden Code aus.
{{{ df.relationalize("people", "s3:/my_bucket/my_path", JsonOptions.empty) }}}
Es werden zwei Tabellen erstellt. Die erste Tabelle trägt den Namen „people“ und enthält Folgendes:
{{{ {"name": "Nancy", "age": 47, "friends": 1} {"name": "Stephanie", "age": 28, "friends": 2} {"name": "Nathan", "age": 54, "friends": 3) }}}
Das Freunde-Array wurde durch einen automatisch generierten Join-Schlüssel ersetzt. Eine separate Tabelle namens people.friends mit folgendem Inhalt wird erstellt:
{{{ {"id": 1, "index": 0, "val": "Fred"} {"id": 1, "index": 1, "val": "Lakshmi"} {"id": 2, "index": 0, "val": "Yao"} {"id": 2, "index": 1, "val": "Phil"} {"id": 2, "index": 2, "val": "Alvin"} {"id": 3, "index": 0, "val": "Nicolai"} {"id": 3, "index": 1, "val": "Karen"} }}}
In dieser Tabelle ist „id“ ein Join-Schlüssel, der identifiziert, aus welchem Datensatz das Array-Element stammt, „index“ bezieht sich auf die Position im ursprünglichen Array und „val“ steht für den tatsächlichen Array-Eintrag.
Die relationalize-Methode gibt die Sequenz von DynamicFrames zurück, die durch rekursives Anwenden dieses Prozesses auf alle Arrays erzeugt werden.
Anmerkung
Die AWS Glue-Bibliothek generiert automatisch Join-Schlüssel für neue Tabellen. Um sicherzustellen, dass Join-Schlüssel über alle Auftragsausführungen hinweg eindeutig sind, müssen Sie Auftrags-Lesezeichen aktivieren.
Def renameField
def renameField( oldName : String,
newName : String,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
oldName– Der ursprüngliche Name der Spalte.newName– Der neue Name der Spalte.
Gibt einen neuen DynamicFrame zurück, wobei das angegebene Feld umbenannt ist.
Mit dieser Methode können Sie verschachtelte Felder umbenennen. Der folgende Code benennt beispielsweise innerhalb der Adressenstruktur state zu state_code um:
{{{ df.renameField("address.state", "address.state_code") }}}
Def repartition
def repartition( numPartitions : Int,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
Gibt einen neuen DynamicFrame mit numPartitions Partitionen zurück.
Def resolveChoice
def resolveChoice( specs : Seq[Product2[String, String]] = Seq.empty[ResolveSpec],
choiceOption : Option[ChoiceOption] = None,
database : Option[String] = None,
tableName : Option[String] = None,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
choiceOption– Eine Aktion, die auf alleChoiceType-Spalten anzuwenden ist, die nicht in der Spezifikationsreihenfolge aufgeführt sind.database– Die Data-Catalog-Datenbank zur Verwendung mit dermatch_catalog-Aktion.tableName– Die Data-Catalog-Tabelle zur Verwendung mit dermatch_catalog-Aktion.
Gibt einen neuen DynamicFrame zurück, indem eine oder mehrere ChoiceTypes durch einen spezifischeren Typ ersetzt werden.
Es gibt zwei Möglichkeiten für die Verwendung von resolveChoice. Die erste besteht in der Angabe bestimmter Spalten und der Art, wie diese aufgelöst werden. Sie werden als Tupels, bestehend aus (Spalte, Aktion)-Paaren, angegeben.
Im Folgenden sind die möglichen Aktionen aufgeführt:
cast:type– Versucht, alle Werte in den angegebenen Typ umzuwandeln.make_cols– Konvertiert die einzelnen verschiedenen Typen in eine Spalte namenscolumnName_type.make_struct– Konvertiert eine Spalte in eine Struktur mit Schlüssel für die individuellen Typen.project:type– Behält nur Wert des angegebenen Typs bei.
Der andere Modus für resolveChoice dient zum Angeben einer einzigen Auflösung für alle ChoiceTypes. Sie können diesen verwenden, wenn die vollständige Liste der ChoiceTypes vor der Ausführung unbekannt ist. Zusätzlich zu den soeben aufgeführten Aktionen unterstützt dieser Modus noch die folgende Aktion:
match_catalogChoiceType– Versucht jeden in einen entsprechenden Typ in der angegebenen Katalogtabelle umzuwandeln.
Beispiele:
Lösen Sie die user.id-Spalte auf, indem Sie eine Umwandlung in ein „int“ durchführen, und sorgen Sie dafür, dass das address-Feld nur Strukturen beibehält:
{{{ df.resolveChoice(specs = Seq(("user.id", "cast:int"), ("address", "project:struct"))) }}}
Lösen Sie alle ChoiceTypes auf, indem Sie jede Auswahl in eine eigene Spalte umwandeln:
{{{ df.resolveChoice(choiceOption = Some(ChoiceOption("make_cols"))) }}}
Lösen Sie alle ChoiceTypes auf, indem Sie diese in die Typen in der angegebenen Katalogtabelle umwandeln.
{{{ df.resolveChoice(choiceOption = Some(ChoiceOption("match_catalog")), database = Some("my_database"), tableName = Some("my_table")) }}}
Def schema
def schema : Schema
Gibt das Schema dieses DynamicFrame zurück.
Das zurückgegebene Schema enthält garantiert alle Felder in einem Datensatz in diesem DynamicFrame. In einigen wenigen Fällen kann es aber auch zusätzliche Felder enthalten. Die Unnest-Methode kann verwendet werden, um das Schema basierend auf den Datensätzen in diesem DynamicFramezu „straffen“.
Def selectField
def selectField( fieldName : String,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
Gibt ein einzelnes Feld als DynamicFrame zurück.
Def selectFields
def selectFields( paths : Seq[String],
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
paths– Die Reihenfolge der zu wählenden Spaltennamen.
Gibt einen neuen DynamicFrame mit den angegebenen Spalten zurück.
Anmerkung
Sie können die selectFields-Methode nur verwenden, um Spalten der obersten Ebene auszuwählen. Sie können die applyMapping-Methode zum Auswählen verschachtelter Spalten einsetzen.
Def show
def show( numRows : Int = 20 ) : Unit
numRows– Die Anzahl der zu druckenden Zeilen.
Druckt Zeilen aus diesem DynamicFrame im JSON-Format.
Def simplifyDDBJson
DynamoDB-Exporte mit dem AWS Glue-DynamoDB-Export-Connector ergeben JSON-Dateien mit speziellen geschachtelten Strukturen. Weitere Informationen finden Sie unter Datenobjekte. simplifyDDBJson Vereinfacht geschachtelte Spalten in einem DynamicFrame dieses Datentyps und gibt einen neuen vereinfachten DynamicFrame zurück. Wenn in einem Listentyp mehrere Typen oder ein Zuordnungstyp enthalten sind, werden die Elemente in der Liste nicht vereinfacht. Diese Methode unterstützt nur Daten im DynamoDB-Export-JSON-Format. Erwägen Sie unnest, um ähnliche Änderungen an anderen Datenarten vorzunehmen.
def simplifyDDBJson() : DynamicFrame
Diese Methode verwendet keine Parameter.
Beispieleingabe
Betrachten Sie das folgende Schema, das durch einen DynamoDB-Export generiert wurde:
root |-- Item: struct | |-- parentMap: struct | | |-- M: struct | | | |-- childMap: struct | | | | |-- M: struct | | | | | |-- appName: struct | | | | | | |-- S: string | | | | | |-- packageName: struct | | | | | | |-- S: string | | | | | |-- updatedAt: struct | | | | | | |-- N: string | |-- strings: struct | | |-- SS: array | | | |-- element: string | |-- numbers: struct | | |-- NS: array | | | |-- element: string | |-- binaries: struct | | |-- BS: array | | | |-- element: string | |-- isDDBJson: struct | | |-- BOOL: boolean | |-- nullValue: struct | | |-- NULL: boolean
Beispiel-Code
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContextimport scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getSourceWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.export" -> "ddb", "dynamodb.tableArn" -> "ddbTableARN", "dynamodb.s3.bucket" -> "exportBucketLocation", "dynamodb.s3.prefix" -> "exportBucketPrefix", "dynamodb.s3.bucketOwner" -> "exportBucketAccountID", )) ).getDynamicFrame() val simplified = dynamicFrame.simplifyDDBJson() simplified.printSchema() Job.commit() } }
Die Transformation simplifyDDBJson vereinfacht dies zu:
root |-- parentMap: struct | |-- childMap: struct | | |-- appName: string | | |-- packageName: string | | |-- updatedAt: string |-- strings: array | |-- element: string |-- numbers: array | |-- element: string |-- binaries: array | |-- element: string |-- isDDBJson: boolean |-- nullValue: null
Def spigot
def spigot( path : String,
options : JsonOptions = new JsonOptions("{}"),
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
Pass-Through-Transformation, die die gleichen Datensätze zurückgibt, darüber hinaus aber noch ein Subset an Datensätzen schreibt.
path– Der Pfad in Amazon S3 zum Schreiben der Ausgabe in der Forms3://bucket//path.options– Eine optionaleJsonOptions-Zuweisung, die das Sampling-Verhalten beschreibt.
Gibt einen DynamicFrame zurück, der die gleichen Datensätze wie dieser enthält.
Standardmäßig werden 100 willkürliche Datensätze in den durch path angegebenen Speicherort geschrieben. Sie können dieses Verhalten anpassen, indem Sie die options-Zuweisung verwenden. Gültige Schlüssel enthalten Folgendes.
topk– Gibt die Gesamtzahl der geschriebenen Datensätze an. Der Standardwert ist 100.prob– Gibt an, wie wahrscheinlich es ist (in Form einer Dezimalzahl), dass ein einzelner Datensatz enthalten ist. Standard = 1.
Beispielsweise würde der folgende Aufruf den Datensatz sampeln, indem er jeden Datensatz mit einer Wahrscheinlichkeit von 20 % auswählt und nach dem Schreiben von 200 Datensätzen stoppt:
{{{ df.spigot("s3://my_bucket/my_path", JsonOptions(Map("topk" -> 200, "prob" -> 0.2))) }}}
Def splitFields
def splitFields( paths : Seq[String],
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : Seq[DynamicFrame]
paths– Die Pfade, die in den erstenDynamicFrameaufzunehmen sind.
Gibt eine Abfolge zweier DynamicFrames zurück. Die erste DynamicFrame enthält die angegebenen Pfade und die zweite alle anderen Spalten.
Beispiel
In diesem Beispiel wird ein DynamicFrame aus der persons-Tabelle in der legislators-Datenbank im AWS-Glue-Data Catalog erstellt und der DynamicFrame in zwei Teile aufgeteilt, wobei die angegebenen Felder in den ersten DynamicFrame und die übrigen Felder in einen zweiten DynamicFrame gehen. Das Beispiel wählt dann den ersten DynamicFrame aus dem Ergebnis aus.
val InputFrame = glueContext.getCatalogSource(database="legislators", tableName="persons", transformationContext="InputFrame").getDynamicFrame() val SplitField_collection = InputFrame.splitFields(paths=Seq("family_name", "name", "links.note", "links.url", "gender", "image", "identifiers.scheme", "identifiers.identifier", "other_names.lang", "other_names.note", "other_names.name"), transformationContext="SplitField_collection") val ResultFrame = SplitField_collection(0)
Def splitRows
def splitRows( paths : Seq[String],
values : Seq[Any],
operators : Seq[String],
transformationContext : String,
callSite : CallSite,
stageThreshold : Long,
totalThreshold : Long
) : Seq[DynamicFrame]
Teilt Zeilen basierend auf Prädikaten, die Konstanten mit Spalten vergleichen, auf.
paths– Die Spalten, die zum Vergleich verwendet werden sollen.values– Die konstanten Werte, die zum Vergleich verwendet werden sollen.operators– Die zum Vergleich zu verwendenden Operatoren.
Gibt eine Abfolge zweier DynamicFrames zurück. Die erste enthält Zeilen, für die das Prädikat "true" ist, und die zweite enthält solche, bei denen es "false" ist.
Prädikate werden über drei Sequenzen spezifiziert: „paths“ enthält die (evtl. geschachtelten) Spaltennamen, „values“ enthält die zu vergleichenden konstanten Werte und „operators“ enthält die Operatoren, die zum Vergleich verwendet werden sollen. Alle drei Sequenzen müssen gleich lang sein: Der n. Operator wird verwendet, um die n. Spalte mit dem n. Wert zu vergleichen.
Jeder Operator muss "!=", "=", "<=", "<", ">=" oder ">" sein.
Beispielsweise teilt der folgende Aufruf einen DynamicFrame so, dass der erste Ausgabe-Frame die Datensätze von Personen über 65 aus den USA enthält und der zweite alle anderen:
{{{ df.splitRows(Seq("age", "address.country"), Seq(65, "USA"), Seq(">=", "=")) }}}
Def stageErrorsCount
def stageErrorsCount
Gibt die Anzahl der Fehlerdatensätze zurück, die bei der Berechnung dieses DynamicFrame generiert wurden. Ausgenommen sind Fehler aus vorherigen Operationen, die diesem DynamicFrame als Eingabe übergeben wurden.
Def toDF
def toDF( specs : Seq[ResolveSpec] = Seq.empty[ResolveSpec] ) : DataFrame
Konvertiert DynamicFrame in einen Apache Spark SQL DataFrame mit demselben Schema und denselben Datensätzen.
Anmerkung
Da DataFrames ChoiceTypes nicht unterstützen, konvertiert diese Methode ChoiceType-Spalten automatisch in StructTypes. Weitere Informationen und Optionen zur Auflösung der Auswahl finden Sie unter resolveChoice.
Def unbox
def unbox( path : String,
format : String,
optionString : String = "{}",
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
path– Die zu analysierende Spalte. Muss eine Zeichenfolge oder ein Binärwert sein.format– Das für die Analyse zu verwendende Format.optionString– An das Format zu übergebende Optionen, beispielsweise das CSV-Trennzeichen.
Analysiert eine eingebettete Zeichenfolge oder eine binäre Spalte entsprechend des angegebenen Formats. Analysierte Spalten werden unterhalb einer Struktur mit dem ursprünglichen Spaltennamen verschachtelt.
Angenommen, Sie haben eine CSV-Datei mit einer eingebetteten JSON-Spalte.
name, age, address Sally, 36, {"state": "NE", "city": "Omaha"} ...
Nach einer ersten Analyse erhalten Sie einen DynamicFrame mit folgendem Schema:
{{{ root |-- name: string |-- age: int |-- address: string }}}
Sie können unbox für die Adressspalte aufrufen, um die einzelnen Komponenten zu analysieren.
{{{ df.unbox("address", "json") }}}
Dadurch erhalten wir einen DynamicFrame mit folgendem Schema:
{{{ root |-- name: string |-- age: int |-- address: struct | |-- state: string | |-- city: string }}}
Def unnest
def unnest( transformationContext : String = "",
callSite : CallSite = CallSite("Not Provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
Gibt einen neuen DynamicFrame zurück, bei dem allen verschachtelten Strukturen auf eine Ebene gebracht wurden. Namen werden mit Hilfe des „.“ (Punkt-)Zeichens erstellt.
Angenommen, Sie haben einen DynamicFrame mit folgendem Schema:
{{{ root |-- name: string |-- age: int |-- address: struct | |-- state: string | |-- city: string }}}
Der folgende Aufruf hebt die Einbettung der Adressenstruktur auf:
{{{ df.unnest() }}}
Das resultierende Schema lautet wie folgt:
{{{ root |-- name: string |-- age: int |-- address.state: string |-- address.city: string }}}
Diese Methode verhindert auch verschachtelte Strukturen innerhalb von Arrays. Aus historischen Gründen werden den Namen solcher Felder jedoch der Name des umschließenden Arrays und „.val“ vorangestellt.
Def unnestDDBJson
unnestDDBJson(transformationContext : String = "",
callSite : CallSite = CallSite("Not Provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0): DynamicFrame
Hebt die Verschachtelung der Spalten in einem DynamicFrame auf, die sich speziell in der DynamoDB-JSON-Struktur befinden, und gibt einen neuen, nicht verschachtelten DynamicFrame zurück. Bei Spalten, die aus einem Array von Strukturtypen bestehen, wird die Verschachtelung nicht aufgehoben. Dies ist ein spezieller Typ der Transformation zum Aufheben der Verschachtelung, der sich anders verhält als die reguläre unnest-Transformation und erfordert, dass sich die Daten bereits in der DynamoDB-JSON-Struktur befinden. Weitere Informationen finden Sie unter DYNAMODB JSON.
Das Schema eines Vorgangs zum Lesen eines Exports mit der DynamoDB-JSON-Struktur könnte beispielsweise wie folgt aussehen:
root |-- Item: struct | |-- ColA: struct | | |-- S: string | |-- ColB: struct | | |-- S: string | |-- ColC: struct | | |-- N: string | |-- ColD: struct | | |-- L: array | | | |-- element: null
Die unnestDDBJson()-Transformation würde dies folgendermaßen umwandeln:
root |-- ColA: string |-- ColB: string |-- ColC: string |-- ColD: array | |-- element: null
Das folgende Codebeispiel veranschaulicht, wie Sie den AWS-Glue-DynamoDB-Export-Connector verwenden, die Aufhebung einer DynamoDB-JSON-Verschachtelung aufrufen und die Anzahl der Partitionen ausdrucken:
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getSourceWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.export" -> "ddb", "dynamodb.tableArn" -> "<test_source>", "dynamodb.s3.bucket" -> "<bucket name>", "dynamodb.s3.prefix" -> "<bucket prefix>", "dynamodb.s3.bucketOwner" -> "<account_id of bucket>", )) ).getDynamicFrame() val unnested = dynamicFrame.unnestDDBJson() print(unnested.getNumPartitions()) Job.commit() } }
Def withFrameSchema
def withFrameSchema( getSchema : () => Schema ) : DynamicFrame
getSchema– Eine Funktion, die das zu verwendende Schema zurückgibt. Wird als Null-Parameter-Funktion angegeben, um eine möglicherweise kostenintensive Berechnung zu verhindern.
Legt das Schema dieses DynamicFrame auf den angegebenen Wert fest. Dies wird in erster Linie intern verwendet, um eine kostspielige Neuberechnung des Schemas zu vermeiden. Das übergebene Schema muss alle Spalten enthalten, die in den Daten vorhanden sind.
Def withName
def withName( name : String ) : DynamicFrame
name– Der zu verwendende neue Name.
Gibt eine Kopie dieser DynamicFrame mit einem neuen Namen zurück.
Def withTransformationContext
def withTransformationContext( ctx : String ) : DynamicFrame
Gibt eine Kopie dieser DynamicFrame mit dem angegebenen Transformationskontext zurück.