

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

# Utilice un clúster de Delta Lake con Spark
<a name="Deltausing-cluster-spark"></a>

A partir de la versión 6.9.0 de Amazon EMR, puede usar Delta Lake con su clúster de Spark sin necesidad de incluir acciones de arranque. Para los lanzamientos 6.8.0 y anteriores de Amazon EMR, puede utilizar acciones de arranque para preinstalar todas las dependencias necesarias. 

En los siguientes ejemplos, se utiliza AWS CLI para trabajar con Delta Lake en un clúster de Amazon EMR Spark. 

Para usar Delta Lake en Amazon EMR con AWS Command Line Interface, primero cree un clúster. Para obtener información sobre cómo especificar la clasificación de Delta Lake con AWS Command Line Interface, consulte [Suministrar una configuración mediante la AWS Command Line Interface al crear un clúster](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-configure-apps-create-cluster.html#emr-configure-apps-create-cluster-cli) o [Proporcionar una configuración con el SDK de Java al crear un clúster](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-configure-apps-create-cluster.html#emr-configure-apps-create-cluster-sdk).

1. Cree un archivo, `configurations.json`, con el siguiente contenido:

   ```
   [{"Classification":"delta-defaults",  "Properties":{"delta.enabled":"true"} }]
   ```

1. Cree un clúster con la siguiente configuración y sustituya el ejemplo de Amazon S3 **bucket path** y **subnet ID** por el suyo propio.

   ```
   aws emr create-cluster 
        --release-label  emr-6.9.0  
        --applications Name=Spark  
        --configurations file://delta_configurations.json   
        --region us-east-1  
        --name My_Spark_Delta_Cluster  
        --log-uri  s3://amzn-s3-demo-bucket/  
        --instance-type m5.xlarge  
        --instance-count 2   
        --service-role EMR_DefaultRole_V2  
        --ec2-attributes  InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef0
   ```

   Como alternativa, puede crear un clúster de Amazon EMR y una aplicación de Spark con los siguientes archivos como dependencias de JAR en un trabajo de Spark:

   ```
   /usr/share/aws/delta/lib/delta-core.jar,
   /usr/share/aws/delta/lib/delta-storage.jar,    
   /usr/share/aws/delta/lib/delta-storage-s3-dynamodb.jar
   ```
**nota**  
Si utiliza Amazon EMR versión 7.0.0 o superior, `/usr/share/aws/delta/lib/delta-spark.jar` utilícela en lugar de. `/usr/share/aws/delta/lib/delta-core.jar`

   Para obtener más información, consulte [Envío de aplicaciones](https://spark.apache.org/docs/latest/submitting-applications.html#submitting-applications).

   Para incluir una dependencia jar en un trabajo de Spark, agregue las siguientes propiedades de configuración a la aplicación Spark:

   ```
   --conf “spark.jars=/usr/share/aws/delta/lib/delta-core.jar,
        /usr/share/aws/delta/lib/delta-storage.jar,
        /usr/share/aws/delta/lib/delta-storage-s3-dynamodb.jar"
   ```

   Para obtener más información sobre las dependencias de trabajos de Spark, consulte [Dependency Management](https://spark.apache.org/docs/latest/running-on-kubernetes.html#dependency-management).

   Si utiliza Amazon EMR versión 7.0.0 o superior, añada la `/usr/share/aws/delta/lib/delta-spark.jar` configuración en su lugar.

   ```
   --conf “spark.jars=/usr/share/aws/delta/lib/delta-spark.jar,
        /usr/share/aws/delta/lib/delta-storage.jar,
        /usr/share/aws/delta/lib/delta-storage-s3-dynamodb.jar"
   ```

## Inicio de sesión de Spark para Delta Lake
<a name="Deltainitialize-spark-session"></a>

En los siguientes ejemplos se muestra cómo iniciar el intérprete de comandos interactivo de Spark, utilizar el envío de Spark o usar Cuadernos de Amazon EMR para trabajar con Delta Lake en Amazon EMR.

------
#### [ spark-shell ]

1. Conéctese al nodo principal mediante SSH. Para obtener más información, consulte [Conectarse al nodo principal mediante SSH](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-connect-master-node-ssh.html) en la *Guía de administración de Amazon EMR*.

1. Introduzca el siguiente comando para iniciar el shell de Spark. Para usar la PySpark carcasa, sustitúyala por`spark-shell`. `pyspark`

   ```
   spark-shell \
      --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
      --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
   ```

   Si utiliza Amazon EMR versión 6.15.0 o posteriores, también debe usar las siguientes configuraciones para usar un control de acceso detallado basado en Lake Formation con Delta Lake.

   ```
   spark-shell \  
     --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension,com.amazonaws.emr.recordserver.connector.spark.sql.RecordServerSQLExtension \  
     --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog \  
     --conf spark.sql.catalog.spark_catalog.lf.managed=true
   ```

------
#### [ spark-submit ]

1. Conéctese al nodo principal mediante SSH. Para obtener más información, consulte [Conectarse al nodo principal mediante SSH](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-connect-master-node-ssh.html) en la *Guía de administración de Amazon EMR*.

1. Introduzca el siguiente comando para iniciar la sesión de Spark para Delta Lake.

   ```
   spark-submit  
   —conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" 
   —conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
   ```

   Si utiliza Amazon EMR versión 6.15.0 o posteriores, también debe usar las siguientes configuraciones para usar un control de acceso detallado basado en Lake Formation con Delta Lake.

   ```
   spark-submit \  `
   --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension,com.amazonaws.emr.recordserver.connector.spark.sql.RecordServerSQLExtension 
   --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog \  
   --conf spark.sql.catalog.spark_catalog.lf.managed=true
   ```

------
#### [ EMR Studio notebooks ]

Para iniciar una sesión de Spark con Amazon EMR Studio configure su sesión de Spark con el comando mágico **%%configure** de su cuaderno de Amazon EMR, como en el siguiente ejemplo. Para obtener más información, consulte [Usar la magia de Cuadernos de Amazon EMR](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-studio-magics.html#emr-magics) en la *Guía de administración de Amazon EMR*.

```
%%configure -f
{
  "conf": {
    "spark.sql.extensions":  "io.delta.sql.DeltaSparkSessionExtension",
     "spark.sql.catalog.spark_catalog":  "org.apache.spark.sql.delta.catalog.DeltaCatalog"
  }
}
```

Si utiliza Amazon EMR versión 6.15.0 o posteriores, también debe usar las siguientes configuraciones para usar un control de acceso detallado basado en Lake Formation con Delta Lake.

```
%%configure -f   
{
  "conf": {
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension,com.amazonaws.emr.recordserver.connector.spark.sql.RecordServerSQLExtension",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    "spark.sql.catalog.spark_catalog.lf.managed": "true"
  }
}
```

------

## Escritura en una tabla de Delta Lake
<a name="Deltawrite-to-table"></a>

El siguiente ejemplo muestra cómo crear un conjunto de datos de Delta Lake DataFrame y cómo escribirlo como tal. El ejemplo muestra cómo trabajar con conjuntos de datos con el intérprete de comandos de Spark mientras está conectado al nodo principal usando SSH como usuario predeterminado de Hadoop. 

**nota**  
Para pegar muestras de código en el intérprete de comandos de Spark, escriba :paste en el símbolo del sistema, pegue el ejemplo y, a continuación, pulse CTRL \$1 D.

------
#### [ PySpark ]

Spark incluye un intérprete de comandos basado en Python, `pyspark`, que puede utilizar para crear prototipos de programas de Spark escritos en Python. Al igual que con `spark-shell`, invoque `pyspark` en el nodo principal.

```
## Create a DataFrame
data =  spark.createDataFrame([("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
("101",  "2015-01-01", "2015-01-01T12:14:58.597216Z"),
("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
("103",  "2015-01-01",  "2015-01-01T13:51:40.519832Z")],
["id", "creation_date",  "last_update_time"])

## Write a DataFrame as a Delta Lake dataset to the S3  location
spark.sql("""CREATE  TABLE IF NOT EXISTS delta_table (id string, creation_date string, 
last_update_time string)
USING delta location
's3://amzn-s3-demo-bucket/example-prefix/db/delta_table'""");

data.writeTo("delta_table").append()
```

------
#### [ Scala ]

```
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions._
 
// Create a DataFrame
val data = Seq(("100",  "2015-01-01",  "2015-01-01T13:51:39.340396Z"),
("101",  "2015-01-01",  "2015-01-01T12:14:58.597216Z"),
("102",  "2015-01-01", "2015-01-01T13:51:40.417052Z"),
("103",  "2015-01-01",  "2015-01-01T13:51:40.519832Z")).toDF("id", "creation_date",  "last_update_time")

// Write a DataFrame as a Delta Lake dataset to the S3  location
spark.sql("""CREATE  TABLE IF NOT EXISTS delta_table (id string,
creation_date string,
last_update_time string)
USING delta location
's3://amzn-s3-demo-bucket/example-prefix/db/delta_table'""");

data.write.format("delta").mode("append").saveAsTable("delta_table")
```

------
#### [ SQL ]

```
-- Create a Delta  Lake table with the S3 location
CREATE TABLE delta_table(id string,
creation_date string, 
last_update_time string)
USING delta LOCATION
's3://amzn-s3-demo-bucket/example-prefix/db/delta_table';

-- insert data into the table
INSERT INTO delta_table VALUES  ("100", "2015-01-01",  "2015-01-01T13:51:39.340396Z"),
("101",  "2015-01-01",  "2015-01-01T12:14:58.597216Z"),
("102",  "2015-01-01", "2015-01-01T13:51:40.417052Z"),
("103",  "2015-01-01", "2015-01-01T13:51:40.519832Z");
```

------

## Lectura de una tabla Delta Lake
<a name="Deltaread-from-table"></a>

------
#### [ PySpark ]

```
ddf = spark.table("delta_table")
ddf.show()
```

------
#### [ Scala ]

```
val ddf =  spark.table("delta_table")
ddf.show()
```

------
#### [ SQL ]

```
SELECT * FROM delta_table;
```

------