Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
AWS Glue – Classe DynamicFrame Scala
Package : com.amazonaws.services.glue
class DynamicFrame extends Serializable with Logging (
val glueContext : GlueContext,
_records : RDD[DynamicRecord],
val name : String = s"",
val transformationContext : String = DynamicFrame.UNDEFINED,
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0,
prevErrors : => Long = 0,
errorExpr : => Unit = {} )Un DynamicFrame est une collection distribuée d'objets DynamicRecord à description automatique.
Le DynamicFrame est conçu pour fournir un modèle de données flexible pour les opérations ETL (extraction, transformation et chargement). Il ne nécessite pas de créer un schéma et peut être utilisé pour lire et transformer les données contenant des valeurs et des types incohérents ou complexes. Un schéma peut être calculé à la demande pour les opérations qui en nécessitent un.
Les DynamicFrame fournissent une plage de transformations pour le nettoyage des données et ETL. Ils prennent également en charge la conversion vers et depuis les DataFrames SparkSQL afin d'intégrer le code existant et les nombreuses opérations d'analyse que fournissent les DataFrames.
Les paramètres suivants sont partagés entre plusieurs transformations AWS Glue qui construisent les DynamicFrames :
transformationContext– Identificateur pour ceDynamicFrame. LetransformationContextest utilisé en tant que clé pour l'état de marque-page de tâche conservé d'une exécution à l'autre.callSite— fournit les informations de contexte pour le signalement d'erreurs. Ces valeurs sont automatiquement définies lors de l'appel à partir de Python.stageThreshold— Nombre maximal d'enregistrements d'erreurs autorisés depuis le calcul duDynamicFrameavant de lever une exception, à l'exclusion des enregistrements présents dans leDynamicFrameprécédent.totalThreshold— Nombre maximal d'enregistrements d'erreur avant qu'une exception ne soit levée, y compris ceux des images précédentes.
val errorsCount
val errorsCount
Nombre d'enregistrements d'erreur dans le DynamicFrame. Les erreurs des opérations précédentes sont incluses dans le nombre.
def applyMapping
def applyMapping( mappings : Seq[Product4[String, String, String, String]],
caseSensitive : Boolean = true,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
mappings– Séquence des mappages pour construire un nouveauDynamicFrame.caseSensitive— indique si les colonnes sources sont considérées comme sensibles à la casse. L'attribution de la valeur false à ce paramètre peut aider lors de l'intégration de magasins insensibles à la casse comme AWS Glue Data Catalog.
Sélectionne, projette et convertit les colonnes en fonction d'une séquence de mappages.
Chaque mappage se compose d'une colonne source et d'un type, ainsi que d'une colonne cible et d'un type. Les mappages peuvent être spécifiés sous forme de quatre tuples (source_path, source_type, target_path, target_type) ou d'un objet MappingSpec contenant les mêmes informations.
En plus des projections simples et de la conversion, les mappages peuvent être utilisés pour imbriquer ou désimbriquer les champs en séparant les composants du chemin d'accès par « . » (point).
Par exemple, supposons que vous ayez une trame DynamicFrame avec le schéma suivant.
{{{
root
|-- name: string
|-- age: int
|-- address: struct
| |-- state: string
| |-- zip: int
}}}
Vous pouvez effectuer l'appel suivant pour désimbriquer les champs state et zip.
{{{
df.applyMapping(
Seq(("name", "string", "name", "string"),
("age", "int", "age", "int"),
("address.state", "string", "state", "string"),
("address.zip", "int", "zip", "int")))
}}}
Le schéma obtenu est le suivant.
{{{
root
|-- name: string
|-- age: int
|-- state: string
|-- zip: int
}}}
Vous pouvez également utiliser applyMapping pour réimbriquer les colonnes. Par exemple, ce qui suit inverse la transformation précédente et crée une structure nommée address dans la cible.
{{{
df.applyMapping(
Seq(("name", "string", "name", "string"),
("age", "int", "age", "int"),
("state", "string", "address.state", "string"),
("zip", "int", "address.zip", "int")))
}}}
Les noms de champ qui contiennent des caractères « . » (point) peuvent être placés entre guillemets (``).
Note
La méthode applyMapping ne peut pas être utilisée actuellement pour mapper les colonnes imbriquées sous des tableaux.
def assertErrorThreshold
def assertErrorThreshold : Unit
Action qui oblige le calcul et vérifie que le nombre d'enregistrements d'erreur est inférieur à stageThreshold et totalThreshold. Lève une exception si l'une ou l'autre condition échoue.
def count
lazy
def count
Retourne le nombre d'éléments dans le DynamicFrame.
def dropField
def dropField( path : String,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
Renvoie un nouveau DynamicFrame avec la colonne spécifiée supprimée.
def dropFields
def dropFields( fieldNames : Seq[String], // The column names to drop.
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
Renvoie un nouveau DynamicFrame avec les colonnes spécifiées supprimées.
Vous pouvez utiliser cette méthode pour supprimer les colonnes imbriquées, y compris celles à l'intérieur de tableaux, mais pas pour supprimer des éléments de tableau spécifiques.
def dropNulls
def dropNulls( transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0 )
Renvoie un nouveau DynamicFrame avec toutes les colonnes null supprimées.
Note
Seules les colonnes de type NullType sont supprimées. Les valeurs null des autres colonnes ne sont pas supprimées ou modifiées.
def errorsAsDynamicFrame
def errorsAsDynamicFrame
Renvoie un nouveau DynamicFramecontenant les enregistrements d'erreur de ce DynamicFrame.
def filter
def filter( f : DynamicRecord => Boolean,
errorMsg : String = "",
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
Crée un nouveau DynamicFrame contenant uniquement les enregistrements pour lesquels la fonction 'f' renvoie la valeur true. La fonction de filtre 'f' ne doit pas muter l'enregistrement d'entrée.
def getName
def getName : String
Renvoie le nom du DynamicFrame.
def getNumPartitions
def getNumPartitions
Retourne le nombre de partitions du DynamicFrame.
def getSchemaIfComputed
def getSchemaIfComputed : Option[Schema]
Renvoie le schéma s'il a déjà été calculé. N'analyse pas les données si le schéma n'a pas déjà été calculé.
def isSchemaComputed
def isSchemaComputed : Boolean
Renvoie la valeur true si le schéma a été calculé pour ce DynamicFrame, ou false dans le cas contraire. Si la méthode renvoie la valeur false, l'appel de la méthode schema nécessite un autre passage sur les enregistrements du DynamicFrame.
def javaToPython
def javaToPython : JavaRDD[Array[Byte]]
def join
def join( keys1 : Seq[String],
keys2 : Seq[String],
frame2 : DynamicFrame,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
keys1— Colonnes de ceDynamicFrameà utiliser pour la jointure.keys2— colonnes deframe2à utiliser pour la jointure. Doit être de la même longueur quekeys1.frame2— AutreDynamicFrameà joindre.
Renvoie le résultat de l'exécution d'une équijointure avec frame2 à l'aide des clés spécifiées.
def map
def map( f : DynamicRecord => DynamicRecord,
errorMsg : String = "",
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
Renvoie un nouveau DynamicFrame construit en appliquant la fonction spécifiée 'f' à chaque enregistrement du DynamicFrame.
Comme cette méthode copie chaque enregistrement avant d'appliquer la fonction spécifiée, elle est sécurisée pour muter les enregistrements. Si la fonction de mappage lève une exception sur un enregistrement donné, celui-ci est marqué comme erreur, et le suivi de la pile est enregistré en tant que colonne dans l'enregistrement d'erreur.
def mergeDynamicFrames
def mergeDynamicFrames( stageDynamicFrame: DynamicFrame, primaryKeys: Seq[String], transformationContext: String = "",
options: JsonOptions = JsonOptions.empty, callSite: CallSite = CallSite("Not provided"),
stageThreshold: Long = 0, totalThreshold: Long = 0): DynamicFrame
stageDynamicFrame— trameDynamicFrameintermédiaire à fusionner.primaryKeys— liste des champs de clé primaire permettant de faire correspondre les enregistrements des tramesDynamicFramesource et intermédiaire.transformationContext— chaîne unique utilisée pour récupérer les métadonnées relatives à la transformation en cours (facultatif).options— chaîne de paires nom-valeur JSON qui fournissent des informations supplémentaires pour cette transformation.callSite— permet de fournir des informations contextuelles pour le signalement d'erreurs.stageThreshold– UneLong. Nombre d'erreurs identifiées dans la transformation donnée et à corriger lors du traitement.totalThreshold– UneLong. Nombre total d'erreurs identifiées dans la transformation donnée et qui doivent être corrigées lors du traitement.
Fusionne cette trame DynamicFrame avec une trame DynamicFrame intermédiaire basée sur les clés primaires spécifiées pour identifier les enregistrements. Les registres en double (registres avec les mêmes clés primaires) ne sont pas dédupliqués. Si aucun enregistrement ne correspond dans la trame intermédiaire, tous les enregistrements (y compris les doublons) sont conservés dans la source. Si la trame intermédiaire contient des enregistrements correspondants, les enregistrements de la trame intermédiaire remplacent ceux de la source dans AWS Glue.
La trame DynamicFrame renvoyée contient l'enregistrement A dans les cas suivants :
Si
Ase trouve à la fois dans la trame source et la trame intermédiaire, c'est la valeurAde la trame intermédiaire qui est renvoyée.Si
Ase trouve dans la table source et siA.primaryKeysne se trouve pas dans la tramestagingDynamicFrame(en d'autres termes,An'est pas mis à jour dans la table intermédiaire).
La trame source et la trame intermédiaire n'ont pas besoin d'avoir le même schéma.
val mergedFrame: DynamicFrame = srcFrame.mergeDynamicFrames(stageFrame, Seq("id1", "id2"))def printSchema
def printSchema : Unit
Imprime le schéma du DynamicFrame sur stdout dans un format compréhensible par les utilisateurs.
def recomputeSchema
def recomputeSchema : Schema
Force le recalcul d'un schéma. Ceci nécessite une analyse des données, mais peut « resserrer » le schéma si certains champs du schéma actuel ne sont pas présents dans les données.
Renvoie le schéma recalculé.
def relationalize
def relationalize( rootTableName : String,
stagingPath : String,
options : JsonOptions = JsonOptions.empty,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : Seq[DynamicFrame]
rootTableName— nom à utiliser pour leDynamicFramede base dans la sortie. Les tramesDynamicFramequi sont créés par les tableaux pivotants commencent avec ce préfixe.stagingPath— chemin Amazon Simple Storage Service (Amazon S3) pour écrire des données intermédiaires.options— crée les relations entre les options et la configuration. Non utilisé actuellement.
Aplanit toutes les structures imbriquées et pivote les tableaux en tables distinctes.
Vous pouvez utiliser cette opération pour préparer les données profondément imbriquées en vue de leur ingestion dans une base de données relationnelle. Les structs imbriquées sont mises à plat de la même manière que la transformation unnest. De plus, les tableaux sont pivotés en tables distinctes et chaque élément du tableau devient une ligne. Par exemple, supposons que vous ayez une trame DynamicFrame avec les données suivantes.
{"name": "Nancy", "age": 47, "friends": ["Fred", "Lakshmi"]}
{"name": "Stephanie", "age": 28, "friends": ["Yao", "Phil", "Alvin"]}
{"name": "Nathan", "age": 54, "friends": ["Nicolai", "Karen"]}
Exécutez le code suivant.
{{{ df.relationalize("people", "s3:/my_bucket/my_path", JsonOptions.empty) }}}
Il génère deux tables. La première table est nommée « people » et contient les éléments suivants.
{{{ {"name": "Nancy", "age": 47, "friends": 1} {"name": "Stephanie", "age": 28, "friends": 2} {"name": "Nathan", "age": 54, "friends": 3) }}}
Ici, le tableau « friends » a été remplacé par une clé de jointure générée automatiquement. Une table séparée nommée people.friends est créée avec le contenu suivant.
{{{ {"id": 1, "index": 0, "val": "Fred"} {"id": 1, "index": 1, "val": "Lakshmi"} {"id": 2, "index": 0, "val": "Yao"} {"id": 2, "index": 1, "val": "Phil"} {"id": 2, "index": 2, "val": "Alvin"} {"id": 3, "index": 0, "val": "Nicolai"} {"id": 3, "index": 1, "val": "Karen"} }}}
Dans ce tableau, « id » est une clé de jointure qui identifie de quel enregistrement provenait l'élément de tableau, « index » fait référence à la position dans le tableau d'origine et « val » est l'entrée réelle du tableau.
La méthode relationalize renvoie la séquence de DynamicFrame créée en appliquant ce processus de façon récursive à tous les tableaux.
Note
La bibliothèque AWS Glue génère automatiquement les clés de jointure des nouvelles tables. Pour vous assurer que les clés de jointure sont uniques au travers des exécutions de tâche, vous devez activer les marque-pages de tâche.
def renameField
def renameField( oldName : String,
newName : String,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
oldName— Nom d'origine de la colonne.newName— Nouveau nom de la colonne.
Renvoie un nouveau DynamicFrame contenant le champ spécifié renommé.
Cette méthode peut être utilisée pour renommer les champs imbriqués. Par exemple, le code suivant remplace le nom state par state_code dans la structure de l'adresse.
{{{ df.renameField("address.state", "address.state_code") }}}
def repartition
def repartition( numPartitions : Int,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
Renvoie un nouveau DynamicFrame avec les partitions numPartitions.
def resolveChoice
def resolveChoice( specs : Seq[Product2[String, String]] = Seq.empty[ResolveSpec],
choiceOption : Option[ChoiceOption] = None,
database : Option[String] = None,
tableName : Option[String] = None,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
choiceOption— Action à appliquer aux colonnesChoiceTypenon listées dans la séquence des spécifications.database— base de données Data Catalog à utiliser avec l'actionmatch_catalog.tableName— table Data Catalog à utiliser avec l'actionmatch_catalog.
Renvoie un nouveau DynamicFrame en remplaçant un ou plusieurs ChoiceType avec un type plus spécifique.
Il existe deux façons d'utiliser resolveChoice. La première consiste à spécifier une séquence de colonnes spécifiques et la façon de les résoudre. Celles-ci sont spécifiés en tant que tuples composés de paires (colonne, action).
Les actions possibles sont les suivantes :
cast:type— Tente de convertir toutes les valeurs dans le type spécifié.make_cols– Convertit chaque type distinct en une colonne portant le nomcolumnName_type.make_struct— Convertit une colonne en une structure avec des clés pour chaque type distinct.project:type— Retient uniquement les valeurs du type spécifié.
L'autre mode pour resolveChoice consiste à spécifier une seule résolution pour tous les ChoiceTypes. Vous pouvez utiliser cela lorsque la liste complète des ChoiceTypes est inconnue avant l'exécution. En plus des actions répertoriées précédemment, ce mode prend également en charge l'action suivante :
match_catalogChoiceType— Tente de convertir chaque dans le type correspondant de la table de catalogue spécifiée.
Exemples :
Résolvez la colonne user.id en la convertissant en int et faites que le champ address conserve uniquement les structs.
{{{ df.resolveChoice(specs = Seq(("user.id", "cast:int"), ("address", "project:struct"))) }}}
Résolvez tous les objets ChoiceType en convertissant chaque choix en colonne séparée.
{{{ df.resolveChoice(choiceOption = Some(ChoiceOption("make_cols"))) }}}
Résolvez tous les objets ChoiceType en les convertissant dans les types de la table de catalogue spécifiée.
{{{ df.resolveChoice(choiceOption = Some(ChoiceOption("match_catalog")), database = Some("my_database"), tableName = Some("my_table")) }}}
def schema
def schema : Schema
Renvoie le schéma du DynamicFrame.
Le schéma renvoyé est assuré de contenir chaque champ présent dans un enregistrement de ce DynamicFrame. Mais dans un petit nombre de cas, il peut aussi contenir des champs supplémentaires. La méthode unnest peut être utilisée pour « resserrer » le schéma basé sur les enregistrements du DynamicFrame.
def selectField
def selectField( fieldName : String,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
Renvoie un champ unique comme DynamicFrame.
def selectFields
def selectFields( paths : Seq[String],
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
paths— Séquence de noms de colonnes à sélectionner.
Renvoie un nouveau DynamicFrame contenant les colonnes spécifiées.
Note
La méthode selectFields peut uniquement être utilisée pour sélectionner les colonnes de niveau supérieur. La méthode applyMapping peut être utilisée pour sélectionner les colonnes imbriquées.
def show
def show( numRows : Int = 20 ) : Unit
numRows— Nombre de lignes à imprimer.
Imprime les lignes du DynamicFrame au format JSON.
Def simplifyDDBJson
Les exportations DynamoDB avec le connecteur d’exportation AWS Glue DynamoDB donnent lieu à des fichiers JSON de structures imbriquées spécifiques. Pour en savoir plus, consultez Data objects. simplifyDDBJson Simplifie les colonnes imbriquées dans un DynamicFrame contenant ce type de données et renvoie un nouveau DynamicFrame simplifié. S’il existe plusieurs types ou un type Carte contenu dans un type Liste, les éléments de la liste ne seront pas simplifiés. Cette méthode prend uniquement en charge les données au format JSON d’exportation DynamoDB. Envisagez unnest pour effectuer des modifications similaires sur d’autres types de données.
def simplifyDDBJson() : DynamicFrame
Cette méthode n’utilise aucun paramètre.
Exemple d'entrée
Tenez compte du schéma suivant généré par une exportation DynamoDB :
root |-- Item: struct | |-- parentMap: struct | | |-- M: struct | | | |-- childMap: struct | | | | |-- M: struct | | | | | |-- appName: struct | | | | | | |-- S: string | | | | | |-- packageName: struct | | | | | | |-- S: string | | | | | |-- updatedAt: struct | | | | | | |-- N: string | |-- strings: struct | | |-- SS: array | | | |-- element: string | |-- numbers: struct | | |-- NS: array | | | |-- element: string | |-- binaries: struct | | |-- BS: array | | | |-- element: string | |-- isDDBJson: struct | | |-- BOOL: boolean | |-- nullValue: struct | | |-- NULL: boolean
Exemple de code
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContextimport scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getSourceWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.export" -> "ddb", "dynamodb.tableArn" -> "ddbTableARN", "dynamodb.s3.bucket" -> "exportBucketLocation", "dynamodb.s3.prefix" -> "exportBucketPrefix", "dynamodb.s3.bucketOwner" -> "exportBucketAccountID", )) ).getDynamicFrame() val simplified = dynamicFrame.simplifyDDBJson() simplified.printSchema() Job.commit() } }
La transformation simplifyDDBJson simplifie cette exportation ainsi :
root |-- parentMap: struct | |-- childMap: struct | | |-- appName: string | | |-- packageName: string | | |-- updatedAt: string |-- strings: array | |-- element: string |-- numbers: array | |-- element: string |-- binaries: array | |-- element: string |-- isDDBJson: boolean |-- nullValue: null
def spigot
def spigot( path : String,
options : JsonOptions = new JsonOptions("{}"),
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
Transmettez la transformation qui renvoie les mêmes enregistrements, mais écrit un sous-ensemble d'enregistrements en tant qu'effet secondaire.
path— chemin d'accès dans Amazon S3 dans lequel écrire la sortie, sous la formes3://bucket//path.options— CarteJsonOptionsfacultative décrivant le comportement d'échantillonnage.
Renvoie un DynamicFrame contenant les mêmes enregistrements que celui-ci.
Par défaut, écrit 100 enregistrements arbitraires à l'emplacement spécifié par path. Ce comportement peut être personnalisé à l'aide de la carte options. Les clés valides incluent les suivantes :
topk— spécifie le nombre total d'enregistrements écrits. La valeur par défaut est 100.prob— indique la probabilité (sous forme de décimale) qu'un enregistrement individuel soit inclus. La valeur par défaut est 1.
Par exemple, l'appel suivant échantillonne l'ensemble de données en sélectionnant chaque enregistrement avec une probabilité de 20 % et en s'arrêtant après l'écriture de 200 enregistrements.
{{{ df.spigot("s3://my_bucket/my_path", JsonOptions(Map("topk" -> 200, "prob" -> 0.2))) }}}
def splitFields
def splitFields( paths : Seq[String],
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : Seq[DynamicFrame]
paths— Chemins à inclure dans le premierDynamicFrame.
Renvoie une séquence de deux DynamicFrames. Le premier DynamicFramecontient les chemins d'accès spécifiés et le deuxième contient toutes les autres colonnes.
Exemple
Cet exemple montre comment utiliser un DynamicFrame créé à partir de la persons table dans la legislators base de données du AWS Catalogue de données Glue et divise le DynamicFrame en deux, les champs spécifiés entrant dans le premier DynamicFrame et les autres champs entrant dans un deuxième DynamicFrame. L'exemple choisit ensuite le premier DynamicFrame dans le résultat.
val InputFrame = glueContext.getCatalogSource(database="legislators", tableName="persons", transformationContext="InputFrame").getDynamicFrame() val SplitField_collection = InputFrame.splitFields(paths=Seq("family_name", "name", "links.note", "links.url", "gender", "image", "identifiers.scheme", "identifiers.identifier", "other_names.lang", "other_names.note", "other_names.name"), transformationContext="SplitField_collection") val ResultFrame = SplitField_collection(0)
def splitRows
def splitRows( paths : Seq[String],
values : Seq[Any],
operators : Seq[String],
transformationContext : String,
callSite : CallSite,
stageThreshold : Long,
totalThreshold : Long
) : Seq[DynamicFrame]
Fractionne les lignes en fonction des prédicats qui comparent les colonnes aux constantes.
paths— Colonnes à utiliser pour la comparaison.values— Valeurs constantes à utiliser pour la comparaison.operators— Opérateurs à utiliser pour la comparaison.
Renvoie une séquence de deux DynamicFrames. Le premier contient les lignes pour lesquelles le prédicat a la valeur true et le deuxième contient celles pour lesquelles la valeur est false.
Les prédicats sont spécifiés en utilisant trois séquences : « paths » contient les noms de colonne (possiblement imbriqués), « values » contient les valeurs constantes de comparaison et « operators » contient les opérateurs à utiliser pour la comparaison. Les trois séquences doivent être de la même longueur : le nème opérateur est utilisé pour comparer la nème colonne à la nème valeur.
Chaque opérateur doit être « != », « = », « <= », « < », « >= » ou « > ».
Par exemple, l'appel suivant scinde une trame DynamicFrame afin que la première trame de sortie contienne les enregistrements des personnes de plus de 65 ans des États-Unis et que la deuxième contienne tous les autres enregistrements.
{{{ df.splitRows(Seq("age", "address.country"), Seq(65, "USA"), Seq(">=", "=")) }}}
def stageErrorsCount
def stageErrorsCount
Renvoie le nombre d'enregistrements d'erreur créés pendant le calcul du DynamicFrame. Cela exclut les erreurs des opérations précédentes qui ont été transmises au DynamicFrame comme entrée.
def toDF
def toDF( specs : Seq[ResolveSpec] = Seq.empty[ResolveSpec] ) : DataFrame
Convertit le DynamicFrame en un DataFrame Apache Spark SQL avec le même schéma et les mêmes enregistrements.
Note
Étant donné que les DataFrames ne prennent pas en charge les ChoiceTypes, cette méthode convertit automatiquement les colonnes ChoiceType en StructTypes. Pour plus d'informations et pour connaître les options de résolution des choix, consultez resolveChoice.
def unbox
def unbox( path : String,
format : String,
optionString : String = "{}",
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
path— colonne à analyser. Doit être de type string (chaîne) ou binary (binaire).format— Le format à utiliser pour l'analyse.optionString— Options à transmettre au format, telles que le séparateur CSV.
Analyse une chaîne ou une colonne binaire intégrée selon le format spécifié. Les colonnes analysées sont imbriquées sous une structure avec le nom de colonne d'origine.
Par exemple, supposons que vous ayez un fichier CSV avec une colonne JSON imbriquée.
name, age, address Sally, 36, {"state": "NE", "city": "Omaha"} ...
Après une analyse initiale, vous obtenez une trame DynamicFrame avec le schéma suivant.
{{{ root |-- name: string |-- age: int |-- address: string }}}
Vous pouvez appeler unbox au niveau de la colonne « address » pour analyser les composants spécifiques.
{{{ df.unbox("address", "json") }}}
Vous obtenez ainsi une trame DynamicFrame avec le schéma suivant.
{{{ root |-- name: string |-- age: int |-- address: struct | |-- state: string | |-- city: string }}}
def unnest
def unnest( transformationContext : String = "",
callSite : CallSite = CallSite("Not Provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
Renvoie un nouveau DynamicFrame avec tous les structures imbriquées mises à plat. Les noms sont construits à l'aide du caractère « . » (point).
Par exemple, supposons que vous ayez une trame DynamicFrame avec le schéma suivant.
{{{ root |-- name: string |-- age: int |-- address: struct | |-- state: string | |-- city: string }}}
L'appel suivant désimbrique la struct « address ».
{{{ df.unnest() }}}
Le schéma obtenu est le suivant.
{{{ root |-- name: string |-- age: int |-- address.state: string |-- address.city: string }}}
Cette méthode désimbrique également les structs imbriquées à l'intérieur des tableaux. Mais pour des raisons historiques, le nom de ces champs est précédé par le nom du tableau englobant et par « .val ».
def unnestDDBJson
unnestDDBJson(transformationContext : String = "",
callSite : CallSite = CallSite("Not Provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0): DynamicFrame
Supprime l'imbrication des colonnes imbriquées dans un DynamicFrame qui se trouvent spécifiquement dans la structure JSON DynamoDB, et renvoie une nouvelle version non imbriquée DynamicFrame. Les colonnes d'un tableau de types de structure ne seront pas non-imbriquées. Notez qu'il s'agit d'un type spécifique de transformation non imbriquée qui se comporte différemment de la transformation unnest normale et nécessite que les données soient déjà dans la structure JSON DynamoDB. Pour plus d'informations, consultez JSON DynamoDB.
Par exemple, le schéma d'une lecture d'exportation avec la structure JSON DynamoDB pourrait ressembler à ce qui suit :
root |-- Item: struct | |-- ColA: struct | | |-- S: string | |-- ColB: struct | | |-- S: string | |-- ColC: struct | | |-- N: string | |-- ColD: struct | | |-- L: array | | | |-- element: null
La transformation unnestDDBJson() convertirait ceci en :
root |-- ColA: string |-- ColB: string |-- ColC: string |-- ColD: array | |-- element: null
L'exemple de code suivant montre comment utiliser le connecteur d'exportation DynamoDB AWS Glue, comment utiliser la suppression de l'imbrication de JSON DynamoDB, puis imprimer le nombre de partitions :
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getSourceWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.export" -> "ddb", "dynamodb.tableArn" -> "<test_source>", "dynamodb.s3.bucket" -> "<bucket name>", "dynamodb.s3.prefix" -> "<bucket prefix>", "dynamodb.s3.bucketOwner" -> "<account_id of bucket>", )) ).getDynamicFrame() val unnested = dynamicFrame.unnestDDBJson() print(unnested.getNumPartitions()) Job.commit() } }
def withFrameSchema
def withFrameSchema( getSchema : () => Schema ) : DynamicFrame
getSchema— fonction qui renvoie le schéma à utiliser. Spécifiée en tant que fonction de paramètre zéro pour reporter un calcul potentiellement onéreux.
Définit le schéma du DynamicFrame sur la valeur spécifiée. La fonction est principalement utilisée en interne pour éviter un recalcul du schéma coûteux. Le schéma transmis doit contenir toutes les colonnes présentes dans les données.
def withName
def withName( name : String ) : DynamicFrame
name— Nouveau nom à utiliser.
Renvoie une copie du DynamicFrame avec un nouveau nom.
def withTransformationContext
def withTransformationContext( ctx : String ) : DynamicFrame
Renvoie une copie du DynamicFrame avec le contexte de transformation spécifié.