Uso de OSS de Delta Lake con EMR sin servidor - Amazon EMR

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Uso de OSS de Delta Lake con EMR sin servidor

Versiones 6.9.0 y posteriores de Amazon EMR

nota

Las versiones 7.0.0 y posteriores de Amazon EMR utilizan Delta Lake 3.0.0, que cambia el nombre del archivo delta-core.jar a delta-spark.jar. Si utiliza Amazon EMR 7.0.0 o posterior, asegúrese de especificar delta-spark.jar en la configuración.

Las versiones Amazon EMR 6.9.0 y versiones posteriores incluyen Delta Lake, por lo que ya no tiene que empaquetar Delta Lake usted mismo ni proporcionar la marca --packages con sus trabajos de EMR sin servidor.

  1. Cuando envíe trabajos EMR sin servidor, asegúrese de tener las siguientes propiedades de configuración e incluir los siguientes parámetros en el campo sparkSubmitParameters.

    --conf spark.jars=/usr/share/aws/delta/lib/delta-core.jar,/usr/share/aws/delta/lib/delta-storage.jar --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog
  2. Cree un delta_sample.py local para probar la creación y lectura de una tabla Delta.

    # delta_sample.py from pyspark.sql import SparkSession import uuid url = "s3://amzn-s3-demo-bucket/delta-lake/output/%s/" % str(uuid.uuid4()) spark = SparkSession.builder.appName("DeltaSample").getOrCreate() ## creates a Delta table and outputs to target S3 bucket spark.range(5).write.format("delta").save(url) ## reads a Delta table and outputs to target S3 bucket spark.read.format("delta").load(url).show
  3. Con el AWS CLI, sube el delta_sample.py archivo a tu bucket de Amazon S3. A continuación, utilice el comando start-job-run para enviar un trabajo a una aplicación EMR sin servidor existente.

    aws s3 cp delta_sample.py s3://amzn-s3-demo-bucket/code/ aws emr-serverless start-job-run \ --application-id application-id \ --execution-role-arn job-role-arn \ --name emr-delta \ --job-driver '{ "sparkSubmit": { "entryPoint": "s3://amzn-s3-demo-bucket/code/delta_sample.py", "sparkSubmitParameters": "--conf spark.jars=/usr/share/aws/delta/lib/delta-core.jar,/usr/share/aws/delta/lib/delta-storage.jar --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" } }'

Para usar bibliotecas de Python con Delta Lake, puede agregar la biblioteca delta-core empaquetándola como una dependencia o usándola como una imagen personalizada.

Alternativamente, puede usar SparkContext.addPyFile para agregar las bibliotecas de Python desde el archivo JAR delta-core:

import glob from pyspark.sql import SparkSession spark = SparkSession.builder.getOrCreate() spark.sparkContext.addPyFile(glob.glob("/usr/share/aws/delta/lib/delta-core_*.jar")[0])

Versiones 6.8.0 y posteriores de Amazon EMR

Si utiliza Amazon EMR 6.8.0 o una versión anterior, siga estos pasos para usar Delta Lake OSS con sus aplicaciones EMR sin servidor.

  1. Para crear una versión de código abierto de Delta Lake que sea compatible con la versión de Spark de su aplicación Amazon EMR Serverless, vaya a Delta GitHub y siga las instrucciones.

  2. Cargue las bibliotecas de Delta Lake en un bucket de Amazon S3 de su Cuenta de AWS.

  3. Cuando envíe trabajos EMR sin servidor en la configuración de la aplicación, incluya los archivos JAR de Delta Lake que se encuentran ahora en el bucket.

    --conf spark.jars=s3://amzn-s3-demo-bucket/jars/delta-core_2.12-1.1.0.jar
  4. Para asegurarse de que puede leer y escribir en una tabla de Delta, realice una PySpark prueba de muestra.

    from pyspark import SparkConf, SparkContext from pyspark.sql import HiveContext, SparkSession import uuid conf = SparkConf() sc = SparkContext(conf=conf) sqlContext = HiveContext(sc) url = "s3://amzn-s3-demo-bucket/delta-lake/output/1.0.1/%s/" % str(uuid.uuid4()) ## creates a Delta table and outputs to target S3 bucket session.range(5).write.format("delta").save(url) ## reads a Delta table and outputs to target S3 bucket session.read.format("delta").load(url).show