

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

# Utilizzo di un cluster con Delta Lake installato
<a name="Deltausing-cluster"></a>

**Topics**
+ [Utilizzo di un cluster Delta Lake con Flink](Deltacluster-flink.md)
+ [Utilizzo di un cluster Delta Lake con Trino](Deltacluster-trino.md)
+ [Utilizzo di un cluster Delta Lake con Spark](Deltausing-cluster-spark.md)
+ [Usa un cluster Delta Lake con Spark and Glue AWS](Deltacluster-spark-glue.md)

# Utilizzo di un cluster Delta Lake con Flink
<a name="Deltacluster-flink"></a>

A partire da Amazon EMR rilascio 6.11, puoi utilizzare Delta Lake con il cluster Flink. Gli esempi seguenti lo utilizzano AWS CLI per lavorare con Delta Lake su un cluster Amazon EMR Flink.

**Nota**  
Amazon EMR supporta l' DataStream API Flink quando usi Delta Lake con un cluster Flink.

## Creazione di un cluster Delta Lake
<a name="Deltacreate-a-delta-cluster"></a>

1. Creare un file, `delta_configurations.json`, con i seguenti contenuti:

   ```
   [{"Classification":"delta-defaults",  
       "Properties":{"delta.enabled":"true"}}]
   ```

1. Crea un cluster con la seguente configurazione. Sostituisci il `example Amazon S3 bucket path` e l'`subnet ID` con i valori effettivi.

   ```
   aws emr create-cluster 
   --release-label emr-6.11.0   
   --applications Name=Flink  
   --configurations file://delta_configurations.json   
   --region us-east-1  --name My_Spark_Delta_Cluster  
   --log-uri  s3://amzn-s3-demo-bucket/  
   --instance-type m5.xlarge  
   --instance-count 3   
   --service-role EMR_DefaultRole_V2  
   --ec2-attributes  InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef0
   ```

## Inizializzazione di una sessione Flink yarn
<a name="Deltainit-flink-yarn"></a>

Per inizializzare una sessione Flink yarn, esegui il comando riportato di seguito:

```
flink-yarn-session -d
```

## Compilazione di un processo Flink con Delta Lake
<a name="Deltabuild-flink-with-delta-lake"></a>

Gli esempi seguenti mostrano come usare sbt o Maven per compilare il processo Flink con Delta Lake.

------
#### [ sbt ]

[sbt](https://www.scala-sbt.org/1.x/docs/index.html) è uno strumento di compilazione per Scala che puoi usare con poca o nessuna configurazione quando disponi di piccoli progetti. 

```
libraryDependencies ++= Seq(
  "io.delta" %% "delta-flink" % deltaConnectorsVersion % "provided",
  "io.delta" %% "delta-standalone" % deltaConnectorsVersion % "provided",
  "org.apache.flink" %% "flink-clients" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-parquet" % flinkVersion % "provided",
  "org.apache.hadoop" % "hadoop-client" % hadoopVersion % "provided",
  "org.apache.flink" % "flink-table-common" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-table-runtime" % flinkVersion % "provided")
```

------
#### [ Maven ]

[Maven](https://maven.apache.org) è uno strumento di automazione della compilazione open-source della Apache Software Foundation. Con Maven, puoi compilare, pubblicare e distribuire un processo Flink con Delta Lake su Amazon EMR.

```
<project>
<properties>
    <scala.main.version>2.12</scala.main.version>
    <delta-connectors-version>0.6.0</delta-connectors-version>
    <flink-version>1.16.1</flink-version>
    <hadoop-version>3.1.0</hadoop-version>
</properties>

<dependencies>
    <dependency>
        <groupId>io.delta</groupId>
        <artifactId>delta-flink</artifactId>
        <version>$delta-connectors-version</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>io.delta</groupId>
        <artifactId>delta-standalone_$scala-main-version</artifactId>
        <version>$delta-connectors-version</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>$flink-version</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-parquet</artifactId>
        <version>$flink-version</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>$hadoop-version</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>$flink-version</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-runtime</artifactId>
        <version>$flink-version</version>
        <scope>provided</scope>
    </dependency>
</dependencies>
```

------

## Scrittura in una tabella Delta con l'API Flink Datastream
<a name="Deltawrite-delta-table-with-flink-datastream-api"></a>

Usa l'esempio seguente per creare un oggetto DeltaSink da scrivere sulla tabella con un `deltaTablePath:`

```
public static DataStream<RowData> createDeltaSink(
        DataStream<RowData> stream,
        String deltaTablePath,
        RowType rowType) {
    Configuration configuration = new Configuration();
    DeltaSink<RowData> deltaSink = DeltaSink
            .forRowData(
                    new org.apache.flink.core.fs.Path(deltaTablePath),
                    configuration,
                    rowType)
            .build();
    stream.sinkTo(deltaSink);
    return stream;
}
```

## Lettura da una tabella Delta con l'API Flink Datastream
<a name="Deltaread-delta-table-with-flink-datastream-api"></a>

Utilizzate l'esempio seguente per creare un valore limitato DeltaSource da leggere dalla tabella con un `deltaTablePath:`

```
public static DataStream<RowData> createBoundedDeltaSourceAllColumns(
        StreamExecutionEnvironment env,
        String deltaTablePath) {
    Configuration configuration = new Configuration();
    DeltaSource<RowData> deltaSource = DeltaSource
            .forBoundedRowData(
                    new org.apache.flink.core.fs.Path(deltaTablePath),
                    configuration)
            .build();

    return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
}
```

## Creazione di sink con supporto multi-cluster per Delta Lake standalone
<a name="Deltasink-creation-with-multi-cluster"></a>

Usa l'esempio seguente per DeltaSink creare una tabella su cui scrivere con [supporto a `deltaTablePath` e multicluster](https://docs.delta.io/latest/delta-standalone.html#multi-cluster-setup):

```
public DataStream<RowData> createDeltaSink(
        DataStream<RowData> stream,
        String deltaTablePath) {
    Configuration configuration = new Configuration();
    configuration.set("spark.delta.logStore.s3.impl", "io.delta.storage.S3DynamoDBLogStore");
    configuration.set("spark.io.delta.storage.S3DynamoDBLogStore.ddb.tableName", "delta_log");
    configuration.set("spark.io.delta.storage.S3DynamoDBLogStore.ddb.region", "us-east-1");
        
    DeltaSink<RowData> deltaSink = DeltaSink
        .forRowData(
            new Path(deltaTablePath),
            configuration,
            rowType)
        .build();
    stream.sinkTo(deltaSink);
    return stream;
}
```

## Esecuzione del processo Flink
<a name="Deltarun-flink-job"></a>

Per eseguire il processo, utilizza il comando seguente:

```
flink run FlinkJob.jar
```

# Utilizzo di un cluster Delta Lake con Trino
<a name="Deltacluster-trino"></a>

Con Amazon EMR rilascio 6.9.0 e successivi, puoi utilizzare Delta Lake con il cluster Trino.

In questo tutorial, useremo il AWS CLI per lavorare con Delta Lake sul cluster Amazon EMR Trino.

## 
<a name="Deltacluster-trino-create"></a>

**Creazione di un cluster Delta Lake**

1. Crea un file, `delta_configurations.json`, e imposta i valori per il catalogo scelto. Ad esempio, se desideri utilizzare il metastore Hive come catalogo, il file deve avere il seguente contenuto:

   ```
   [{"Classification":"delta-defaults",  
       "Properties":{"delta.enabled":"true"}},  
       {"Classification":"trino-connector-delta",  
       "Properties":{"hive.metastore.uri":"thrift://localhost:9083"}}]
   ```

   Se desideri utilizzare il AWS Glue Catalog come negozio, il file deve avere il seguente contenuto:

   ```
   [{"Classification":"delta-defaults",  
       "Properties":{"delta.enabled":"true"}},  
       {"Classification":"trino-connector-delta",  
       "Properties":{"hive.metastore":"glue"}}]
   ```

1. Crea un cluster con la seguente configurazione, sostituendo **example Amazon S3 bucket path** e **subnet ID** con i tuoi valori.

   ```
   aws emr create-cluster 
       --release-label emr-6.9.0   
       --applications Name=Trino  
       --configurations file://delta_configurations.json   
       --region us-east-1  --name My_Spark_Delta_Cluster  
       --log-uri  s3://amzn-s3-demo-bucket/  
       --instance-type m5.xlarge  
       --instance-count 2   
       --service-role EMR_DefaultRole_V2  
       --ec2-attributes  InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef0
   ```

## Inizializzazione della sessione Trino per Delta Lake
<a name="Deltainitialize-trino"></a>

Per inizializzare una sessione Trino, esegui il comando seguente

```
trino-cli --catalog delta
```

## Scrittura in una tabella Delta Lake
<a name="Deltatrino-write-table"></a>

Crea e scrivi nella tabella utilizzando i comandi SQL seguenti:

```
SHOW SCHEMAS;

CREATE TABLE default.delta_table (id  int, data varchar, category varchar) WITH 
( location =  's3://amzn-s3-demo-bucket/<prefix>');

INSERT INTO default.delta_table VALUES  (1,'a','c1'), (2,'b','c2'), (3,'c','c3');
```

## Lettura da una tabella Delta Lake
<a name="Deltatrino-read-table"></a>

Leggi dalla tabella con il comando SQL seguente:

```
SELECT * from default.delta_table;
```

# Utilizzo di un cluster Delta Lake con Spark
<a name="Deltausing-cluster-spark"></a>

A partire da Amazon EMR versione 6.9.0, puoi utilizzare Delta Lake con il cluster Spark senza dover eseguire operazioni di bootstrap. Per Amazon EMR rilascio 6.8.0 e precedenti, puoi utilizzare le operazioni di bootstrap per preinstallare le dipendenze necessarie. 

Gli esempi seguenti lo utilizzano AWS CLI per lavorare con Delta Lake su un cluster Amazon EMR Spark. 

Per utilizzare Delta Lake su Amazon EMR con AWS Command Line Interface, crea innanzitutto un cluster. Per informazioni su come specificare la classificazione Delta Lake con AWS Command Line Interface, consulta [Fornire una configurazione utilizzando AWS Command Line Interface quando si crea un cluster](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-configure-apps-create-cluster.html#emr-configure-apps-create-cluster-cli) o [Fornire una configurazione con Java SDK quando si crea un](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-configure-apps-create-cluster.html#emr-configure-apps-create-cluster-sdk) cluster.

1. Creare un file, `configurations.json`, con i seguenti contenuti:

   ```
   [{"Classification":"delta-defaults",  "Properties":{"delta.enabled":"true"} }]
   ```

1. Crea un cluster con la seguente configurazione, sostituendo **bucket path** e **subnet ID** di Amazon S3 di esempio con i tuoi valori.

   ```
   aws emr create-cluster 
        --release-label  emr-6.9.0  
        --applications Name=Spark  
        --configurations file://delta_configurations.json   
        --region us-east-1  
        --name My_Spark_Delta_Cluster  
        --log-uri  s3://amzn-s3-demo-bucket/  
        --instance-type m5.xlarge  
        --instance-count 2   
        --service-role EMR_DefaultRole_V2  
        --ec2-attributes  InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef0
   ```

   In alternativa, puoi creare un cluster Amazon EMR e l'applicazione Spark con i file seguenti come dipendenze JAR in un processo Spark:

   ```
   /usr/share/aws/delta/lib/delta-core.jar,
   /usr/share/aws/delta/lib/delta-storage.jar,    
   /usr/share/aws/delta/lib/delta-storage-s3-dynamodb.jar
   ```
**Nota**  
Se utilizzi Amazon EMR versione 7.0.0 o successive, usa invece di. `/usr/share/aws/delta/lib/delta-spark.jar` `/usr/share/aws/delta/lib/delta-core.jar`

   Per ulteriori informazioni, consulta [Invio di applicazioni](https://spark.apache.org/docs/latest/submitting-applications.html#submitting-applications).

   Per includere una dipendenza jar in un processo Spark, è possibile aggiungere le seguenti proprietà di configurazione all'applicazione Spark:

   ```
   --conf “spark.jars=/usr/share/aws/delta/lib/delta-core.jar,
        /usr/share/aws/delta/lib/delta-storage.jar,
        /usr/share/aws/delta/lib/delta-storage-s3-dynamodb.jar"
   ```

   Per ulteriori informazioni sulle dipendenze dei processi Spark, consulta la sezione [Dependency Management](https://spark.apache.org/docs/latest/running-on-kubernetes.html#dependency-management) (Gestione delle dipendenze).

   Se utilizzi Amazon EMR versione 7.0.0 o successive, aggiungi invece la configurazione. `/usr/share/aws/delta/lib/delta-spark.jar`

   ```
   --conf “spark.jars=/usr/share/aws/delta/lib/delta-spark.jar,
        /usr/share/aws/delta/lib/delta-storage.jar,
        /usr/share/aws/delta/lib/delta-storage-s3-dynamodb.jar"
   ```

## Inizializzazione della sessione Spark per Delta Lake
<a name="Deltainitialize-spark-session"></a>

Negli esempi seguenti viene illustrato come avviare la shell (interprete di comandi) interattiva Spark, utilizzare Spark-submit o utilizzare i notebook Amazon EMR per lavorare con Delta Lake su Amazon EMR.

------
#### [ spark-shell ]

1. Effettua la connessione al nodo primario tramite SSH. Per ulteriori informazioni, consulta la sezione [Connect to the primary node using SSH](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-connect-master-node-ssh.html) (Connessione al nodo primario tramite SSH) nella *Guida alla gestione di Amazon EMR*.

1. Immettere il seguente comando per avviare la shell Spark. Per usare la PySpark shell, sostituiscila con. `spark-shell` `pyspark`

   ```
   spark-shell \
      --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
      --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
   ```

   Se esegui le versioni 6.15.0 o successive di Amazon EMR, devi anche utilizzare le seguenti configurazioni per utilizzare un controllo granulare degli accessi basato su Lake Formation with Delta Lake.

   ```
   spark-shell \  
     --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension,com.amazonaws.emr.recordserver.connector.spark.sql.RecordServerSQLExtension \  
     --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog \  
     --conf spark.sql.catalog.spark_catalog.lf.managed=true
   ```

------
#### [ spark-submit ]

1. Effettua la connessione al nodo primario tramite SSH. Per ulteriori informazioni, consulta la sezione [Connect to the primary node using SSH](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-connect-master-node-ssh.html) (Connessione al nodo primario tramite SSH) nella *Guida alla gestione di Amazon EMR*.

1. Inserisci il seguente comando per avviare la sessione Spark per Delta Lake.

   ```
   spark-submit  
   —conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" 
   —conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
   ```

   Se esegui le versioni 6.15.0 o successive di Amazon EMR, devi anche utilizzare le seguenti configurazioni per utilizzare un controllo granulare degli accessi basato su Lake Formation with Delta Lake.

   ```
   spark-submit \  `
   --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension,com.amazonaws.emr.recordserver.connector.spark.sql.RecordServerSQLExtension 
   --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog \  
   --conf spark.sql.catalog.spark_catalog.lf.managed=true
   ```

------
#### [ EMR Studio notebooks ]

Per inizializzare una sessione Spark utilizzando i notebook Amazon EMR Studio, configura la sessione Spark utilizzando il comando **%%configure** magic nel notebook Amazon EMR, come nell'esempio seguente. Per ulteriori informazioni, consulta [Utilizzo di EMR Notebooks magics](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-studio-magics.html#emr-magics) nella *Guida alla gestione di Amazon EMR*.

```
%%configure -f
{
  "conf": {
    "spark.sql.extensions":  "io.delta.sql.DeltaSparkSessionExtension",
     "spark.sql.catalog.spark_catalog":  "org.apache.spark.sql.delta.catalog.DeltaCatalog"
  }
}
```

Se esegui le versioni 6.15.0 o successive di Amazon EMR, devi anche utilizzare le seguenti configurazioni per utilizzare un controllo granulare degli accessi basato su Lake Formation with Delta Lake.

```
%%configure -f   
{
  "conf": {
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension,com.amazonaws.emr.recordserver.connector.spark.sql.RecordServerSQLExtension",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    "spark.sql.catalog.spark_catalog.lf.managed": "true"
  }
}
```

------

## Scrittura in una tabella Delta Lake
<a name="Deltawrite-to-table"></a>

L'esempio seguente mostra come creare DataFrame e scrivere un set di dati Delta Lake. In questa sezione, gli esempi illustrano l'utilizzo di set di dati con la shell (interprete di comandi) Spark connessi al nodo primario utilizzando SSH come utente hadoop predefinito. 

**Nota**  
Per incollare gli esempi di codice nella shell Spark, digitare :paste al prompt, incollare l'esempio e premere CTRL \$1 D.

------
#### [ PySpark ]

Spark include anche una shell (interprete di comandi) basata su Python, `pyspark`, che puoi utilizzare per realizzare prototipi di programmi Spark scritti in Python. Proprio come con `spark-shell`, invoca `pyspark` sul nodo primario.

```
## Create a DataFrame
data =  spark.createDataFrame([("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
("101",  "2015-01-01", "2015-01-01T12:14:58.597216Z"),
("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
("103",  "2015-01-01",  "2015-01-01T13:51:40.519832Z")],
["id", "creation_date",  "last_update_time"])

## Write a DataFrame as a Delta Lake dataset to the S3  location
spark.sql("""CREATE  TABLE IF NOT EXISTS delta_table (id string, creation_date string, 
last_update_time string)
USING delta location
's3://amzn-s3-demo-bucket/example-prefix/db/delta_table'""");

data.writeTo("delta_table").append()
```

------
#### [ Scala ]

```
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions._
 
// Create a DataFrame
val data = Seq(("100",  "2015-01-01",  "2015-01-01T13:51:39.340396Z"),
("101",  "2015-01-01",  "2015-01-01T12:14:58.597216Z"),
("102",  "2015-01-01", "2015-01-01T13:51:40.417052Z"),
("103",  "2015-01-01",  "2015-01-01T13:51:40.519832Z")).toDF("id", "creation_date",  "last_update_time")

// Write a DataFrame as a Delta Lake dataset to the S3  location
spark.sql("""CREATE  TABLE IF NOT EXISTS delta_table (id string,
creation_date string,
last_update_time string)
USING delta location
's3://amzn-s3-demo-bucket/example-prefix/db/delta_table'""");

data.write.format("delta").mode("append").saveAsTable("delta_table")
```

------
#### [ SQL ]

```
-- Create a Delta  Lake table with the S3 location
CREATE TABLE delta_table(id string,
creation_date string, 
last_update_time string)
USING delta LOCATION
's3://amzn-s3-demo-bucket/example-prefix/db/delta_table';

-- insert data into the table
INSERT INTO delta_table VALUES  ("100", "2015-01-01",  "2015-01-01T13:51:39.340396Z"),
("101",  "2015-01-01",  "2015-01-01T12:14:58.597216Z"),
("102",  "2015-01-01", "2015-01-01T13:51:40.417052Z"),
("103",  "2015-01-01", "2015-01-01T13:51:40.519832Z");
```

------

## Lettura da una tabella Delta Lake
<a name="Deltaread-from-table"></a>

------
#### [ PySpark ]

```
ddf = spark.table("delta_table")
ddf.show()
```

------
#### [ Scala ]

```
val ddf =  spark.table("delta_table")
ddf.show()
```

------
#### [ SQL ]

```
SELECT * FROM delta_table;
```

------

# Usa un cluster Delta Lake con Spark and Glue AWS
<a name="Deltacluster-spark-glue"></a>

Per utilizzare AWS Glue Catalog come Metastore per le tabelle Delta Lake, crea un cluster con i seguenti passaggi. Per informazioni su come specificare la classificazione Delta Lake utilizzando AWS Command Line Interface, consulta [Fornire una configurazione utilizzando AWS Command Line Interface quando si crea un cluster](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-configure-apps-create-cluster.html#emr-configure-apps-create-cluster-cli) o [Fornire una configurazione utilizzando Java SDK quando si](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-configure-apps-create-cluster.html#emr-configure-apps-create-cluster-sdk) crea un cluster.

**Creazione di un cluster Delta Lake**

1. Creare un file, `configurations.json`, con i seguenti contenuti:

   ```
   [{"Classification":"delta-defaults",  
   "Properties":{"delta.enabled":"true"}},
   {"Classification":"spark-hive-site",
   "Properties":{"hive.metastore.client.factory.class":"com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"}}]
   ```

1. Crea un cluster con la seguente configurazione, sostituendo **example Amazon S3 bucket path** e **subnet ID** con i tuoi valori.

   ```
   aws emr create-cluster 
       --release-label  emr-6.9.0  
       --applications Name=Spark  
       --configurations file://delta_configurations.json 
       --region us-east-1  
       --name My_Spark_Delta_Cluster  
       --log-uri  s3://amzn-s3-demo-bucket/  
       --instance-type m5.xlarge  
       --instance-count 2   
       --service-role EMR_DefaultRole_V2  
       --ec2-attributes  InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef0
   ```