Connettore DynamoDB con supporto Spark DataFrame - AWS Glue

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Connettore DynamoDB con supporto Spark DataFrame

Il connettore DynamoDB con supporto DataFrame Spark consente di leggere e scrivere su tabelle in DynamoDB utilizzando Spark. DataFrame APIs I passaggi di configurazione dei connettori sono gli stessi del connettore DynamicFrame basato e sono disponibili qui.

Per caricare la libreria di connettori DataFrame basata, assicurati di collegare una connessione DynamoDB al job Glue.

Nota

L'interfaccia utente della console Glue attualmente non supporta la creazione di una connessione DynamoDB. Puoi usare Glue CLI (CreateConnection) per creare una connessione DynamoDB:

aws glue create-connection \ --connection-input '{ "Name": "my-dynamodb-connection", "ConnectionType": "DYNAMODB", "ConnectionProperties": {}, "ValidateCredentials": false, "ValidateForComputeEnvironments": ["SPARK"] }'

Dopo aver creato la connessione DynamoDB, puoi collegarla al tuo lavoro Glue tramite CLI CreateJob(UpdateJob,) o direttamente nella pagina «Dettagli del lavoro»:

Dopo esserti assicurato che una connessione con DYNAMODB Type sia collegata al tuo lavoro Glue, puoi utilizzare le seguenti operazioni di lettura, scrittura ed esportazione dal DataFrame connettore basato.

Lettura e scrittura su DynamoDB con il connettore basato DataFrame

I seguenti esempi di codice mostrano come leggere e scrivere su tabelle DynamoDB tramite DataFrame il connettore basato. Mostrano la lettura da una tabella e la scrittura su un'altra tabella.

Python
import sys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["JOB_NAME"]) glue_context= GlueContext(SparkContext.getOrCreate()) spark = glueContext.spark_session job = Job(glue_context) job.init(args["JOB_NAME"], args) # Read from DynamoDB df = spark.read.format("dynamodb") \ .option("dynamodb.input.tableName", "test-source") \ .option("dynamodb.throughput.read.ratio", "0.5") \ .option("dynamodb.consistentRead", "false") \ .load() print(df.rdd.getNumPartitions()) # Write to DynamoDB df.write \ .format("dynamodb") \ .option("dynamodb.output.tableName", "test-sink") \ .option("dynamodb.throughput.write.ratio", "0.5") \ .option("dynamodb.item.size.check.enabled", "true") \ .save() job.commit()
Scala
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val spark = glueContext.getSparkSession val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val df = spark.read .format("dynamodb") .option("dynamodb.input.tableName", "test-source") .option("dynamodb.throughput.read.ratio", "0.5") .option("dynamodb.consistentRead", "false") .load() print(df.rdd.getNumPartitions) df.write .format("dynamodb") .option("dynamodb.output.tableName", "test-sink") .option("dynamodb.throughput.write.ratio", "0.5") .option("dynamodb.item.size.check.enabled", "true") .save() job.commit() } }

Utilizzo dell'esportazione DynamoDB tramite il connettore basato DataFrame

L'operazione di esportazione è preferita all'operazione di lettura per tabelle DynamoDB di dimensioni superiori a 80 GB. I seguenti esempi di codice mostrano come leggere da una tabella, esportare in S3 e stampare il numero di partizioni tramite il connettore basato. DataFrame

Nota

La funzionalità di esportazione DynamoDB è disponibile tramite l'oggetto Scala. DynamoDBExport Gli utenti Python possono accedervi tramite l'interoperabilità JVM di Spark o utilizzare l'SDK AWS per Python (boto3) con l'API DynamoDB. ExportTableToPointInTime

Scala
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.{GlueArgParser, Job} import org.apache.spark.SparkContext import glue.spark.dynamodb.DynamoDBExport import scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val spark = glueContext.getSparkSession val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val options = Map( "dynamodb.export" -> "ddb", "dynamodb.tableArn" -> "arn:aws:dynamodb:us-east-1:123456789012:table/my-table", "dynamodb.s3.bucket" -> "my-s3-bucket", "dynamodb.s3.prefix" -> "my-s3-prefix", "dynamodb.simplifyDDBJson" -> "true" ) val df = DynamoDBExport.fullExport(spark, options) print(df.rdd.getNumPartitions) df.count() Job.commit() } }

Opzioni di configurazione

Opzioni di lettura

Opzione Description Predefinito
dynamodb.input.tableName Nome della tabella DynamoDB (obbligatorio) -
dynamodb.throughput.read Le unità di capacità di lettura (RCU) da utilizzare. Se non specificato, dynamodb.throughput.read.ratio viene utilizzato per il calcolo. -
dynamodb.throughput.read.ratio Il rapporto tra unità di capacità di lettura (RCU) da utilizzare 0,5
dynamodb.table.read.capacity La capacità di lettura della tabella su richiesta utilizzata per il calcolo della velocità effettiva. Questo parametro è valido solo nelle tabelle di capacità su richiesta. L'impostazione predefinita sono le unità di lettura a throughput caldo. -
dynamodb.splits Definisce il numero di segmenti utilizzati nelle operazioni di scansione parallela. Se non viene fornito, il connettore calcolerà un valore predefinito ragionevole. -
dynamodb.consistentRead Se utilizzare letture fortemente coerenti FALSE
dynamodb.input.retry Definisce quanti tentativi eseguiamo quando esiste un'eccezione riprovabile. 10

Opzioni di scrittura

Opzione Description Predefinito
dynamodb.output.tableName Nome della tabella DynamoDB (obbligatorio) -
dynamodb.throughput.write Le unità di capacità di scrittura (WCU) da utilizzare. Se non specificato, dynamodb.throughput.write.ratio viene utilizzato per il calcolo. -
dynamodb.throughput.write.ratio Il rapporto tra unità di capacità di scrittura (WCU) da utilizzare 0,5
dynamodb.table.write.capacity La capacità di scrittura della tabella su richiesta utilizzata per il calcolo della velocità effettiva. Questo parametro è valido solo nelle tabelle di capacità su richiesta. L'impostazione predefinita sono le unità di scrittura con throughput caldo. -
dynamodb.item.size.check.enabled Se vero, il connettore calcola la dimensione dell'elemento e interrompe l'operazione se la dimensione supera la dimensione massima, prima di scrivere nella tabella DynamoDB. TRUE
dynamodb.output.retry Definisce il numero di tentativi da eseguire in presenza di un'eccezione riutilizzabile. 10

Opzioni di esportazione

Opzione Description Predefinito
dynamodb.export Se impostato su, ddb abilita il connettore di esportazione AWS Glue DynamoDB dove verrà richiamato un ExportTableToPointInTimeRequet nuovo connettore durante il AWS processo Glue. Verrà generata una nuova esportazione con la posizione passata da dynamodb.s3.bucket e dynamodb.s3.prefix. Se impostato su, s3 abilita il connettore di esportazione AWS Glue, DynamoDB, ma salta la creazione di una nuova esportazione DynamoDB e utilizza invece l'and dynamodb.s3.bucket come posizione Amazon S3 dynamodb.s3.prefix del passato esportato da quella tabella. ddb
dynamodb.tableArn La tabella DynamoDB da cui leggere. Obbligatorio se dynamodb.export è impostato su ddb.
dynamodb.simplifyDDBJson Se impostato sutrue, esegue una trasformazione per semplificare lo schema della struttura JSON di DynamoDB presente nelle esportazioni. FALSE
dynamodb.s3.bucket Il bucket S3 per archiviare i dati temporanei durante l'esportazione in DynamoDB (obbligatorio)
dynamodb.s3.prefix Il prefisso S3 per archiviare dati temporanei durante l'esportazione in DynamoDB
dynamodb.s3.bucketOwner Indicare il proprietario del bucket necessario per l'accesso ad Amazon S3 su più account
dynamodb.s3.sse.algorithm Tipo di crittografia utilizzato nel bucket in cui verranno archiviati i dati temporanei. I valori validi sono AES256 e KMS.
dynamodb.s3.sse.kmsKeyId L'ID della chiave AWS KMS gestita utilizzata per crittografare il bucket S3 in cui verranno archiviati i dati temporanei (se applicabile).
dynamodb.exportTime point-in-timeA in cui deve essere effettuata l'esportazione. Valori validi: stringhe che rappresentano istanti ISO-8601.

Opzioni generali

Opzione Description Predefinito
dynamodb.sts.roleArn Il ruolo IAM (ARN) da assumere per l'accesso tra account diversi -
dynamodb.sts.roleSessionName Nome della sessione STS glue-dynamodb-sts-session
dynamodb.sts.region Regione per il client STS (per l'assunzione di ruoli interregionali) Uguale all'opzione region