

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

# Connessione a DynamoDB con Amazon EMR Serverless
<a name="using-ddb-connector"></a>

In questo tutorial, carichi un sottoinsieme di dati dal [Board on Geographic Names degli Stati Uniti su](https://www.usgs.gov/us-board-on-geographic-names) un bucket Amazon S3 e poi usi Hive o Spark su Amazon EMR Serverless per copiare i dati in una tabella Amazon DynamoDB per l'esecuzione di query. 

## Fase 1: caricare i dati in un bucket Amazon S3
<a name="using-ddb-connector-s3"></a>

Per creare un bucket Amazon S3, segui le istruzioni in [Creazione di un bucket nella Guida per l'utente della console](https://docs.aws.amazon.com/AmazonS3/latest/user-guide/create-bucket.html) di *Amazon Simple Storage Service*. Sostituisci i riferimenti a `amzn-s3-demo-bucket` con il nome del bucket appena creato. Ora la tua applicazione EMR Serverless è pronta per eseguire i lavori.

1. Scarica l'archivio di dati di esempio `features.zip` con il seguente comando.

   ```
   wget https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/samples/features.zip
   ```

1. Estrai il `features.txt` file dall'archivio e accedi alle prime righe del file:

   ```
   unzip features.zip
   head features.txt
   ```

   Il risultato dovrebbe apparire simile al seguente.

   ```
   1535908|Big Run|Stream|WV|38.6370428|-80.8595469|794
   875609|Constable Hook|Cape|NJ|40.657881|-74.0990309|7
   1217998|Gooseberry Island|Island|RI|41.4534361|-71.3253284|10
   26603|Boone Moore Spring|Spring|AZ|34.0895692|-111.410065|3681
   1506738|Missouri Flat|Flat|WA|46.7634987|-117.0346113|2605
   1181348|Minnow Run|Stream|PA|40.0820178|-79.3800349|1558
   1288759|Hunting Creek|Stream|TN|36.343969|-83.8029682|1024
   533060|Big Charles Bayou|Bay|LA|29.6046517|-91.9828654|0
   829689|Greenwood Creek|Stream|NE|41.596086|-103.0499296|3671
   541692|Button Willow Island|Island|LA|31.9579389|-93.0648847|98
   ```

   I campi in ogni riga qui indicano un identificatore univoco, un nome, un tipo di elemento naturale, lo stato, la latitudine in gradi, la longitudine in gradi e l'altezza in piedi.

1. Carica i tuoi dati su Amazon S3

   ```
   aws s3 cp features.txt s3://amzn-s3-demo-bucket/features/
   ```

## Fase 2: Creare una tabella Hive
<a name="using-ddb-connector-create-table"></a>

Usa Apache Spark o Hive per creare una nuova tabella Hive che contenga i dati caricati in Amazon S3.

------
#### [ Spark ]

Per creare una tabella Hive con Spark, esegui il seguente comando.

```
import org.apache.spark.sql.SparkSession

val sparkSession = SparkSession.builder().enableHiveSupport().getOrCreate()

sparkSession.sql("CREATE TABLE hive_features \
    (feature_id BIGINT, \
    feature_name STRING, \
    feature_class STRING, \
    state_alpha STRING, \
    prim_lat_dec DOUBLE, \
    prim_long_dec DOUBLE, \
    elev_in_ft BIGINT) \
    ROW FORMAT DELIMITED \
    FIELDS TERMINATED BY '|' \
    LINES TERMINATED BY '\n' \
    LOCATION 's3://amzn-s3-demo-bucket/features';")
```

Ora hai una tabella Hive popolata con i dati del file. `features.txt` Per verificare che i dati siano presenti nella tabella, esegui una query SQL Spark come mostrato nell'esempio seguente.

```
sparkSession.sql(
    "SELECT state_alpha, COUNT(*) FROM hive_features GROUP BY state_alpha;")
```

------
#### [ Hive ]

Per creare una tabella Hive con Hive, esegui il comando seguente.

```
CREATE TABLE hive_features
    (feature_id             BIGINT,
    feature_name            STRING ,
    feature_class           STRING ,
    state_alpha             STRING,
    prim_lat_dec            DOUBLE ,
    prim_long_dec           DOUBLE ,
    elev_in_ft              BIGINT)
    ROW FORMAT DELIMITED
    FIELDS TERMINATED BY '|'
    LINES TERMINATED BY '\n'
    LOCATION 's3://amzn-s3-demo-bucket/features';
```

Ora hai una tabella Hive che contiene i dati del file. `features.txt` Per verificare che i dati siano presenti nella tabella, esegui una query HiveQL, come mostrato nell'esempio seguente.

```
SELECT state_alpha, COUNT(*) FROM hive_features GROUP BY state_alpha;
```

------

## Fase 3: Copiare i dati su DynamoDB
<a name="using-ddb-connector-copy"></a>

Usa Spark o Hive per copiare i dati in una nuova tabella DynamoDB.

------
#### [ Spark ]

[Per copiare i dati dalla tabella Hive creata nel passaggio precedente in DynamoDB, **segui i passaggi** 1-3 in Copiare i dati su DynamoDB.](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/EMRforDynamoDB.Tutorial.CopyDataToDDB.html) Questo crea una nuova tabella DynamoDB chiamata. `Features` È quindi possibile leggere i dati direttamente dal file di testo e copiarli nella tabella DynamoDB, come illustrato nell'esempio seguente.

```
import com.amazonaws.services.dynamodbv2.model.AttributeValue
import org.apache.hadoop.dynamodb.DynamoDBItemWritable
import org.apache.hadoop.dynamodb.read.DynamoDBInputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.SparkContext

import scala.collection.JavaConverters._

object EmrServerlessDynamoDbTest {

    def main(args: Array[String]): Unit = {
    
        jobConf.set("dynamodb.input.tableName", "Features")
        jobConf.set("dynamodb.output.tableName", "Features")
        jobConf.set("dynamodb.region", "region")

        jobConf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat")
        jobConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat")
    
        val rdd = sc.textFile("s3://amzn-s3-demo-bucket/ddb-connector/")
            .map(row => {
                val line = row.split("\\|")
                val item = new DynamoDBItemWritable()
                
                val elevInFt = if (line.length > 6) {
                    new AttributeValue().withN(line(6))
                } else {
                    new AttributeValue().withNULL(true)
                }
                
                item.setItem(Map(
                    "feature_id" -> new AttributeValue().withN(line(0)), 
                    "feature_name" -> new AttributeValue(line(1)), 
                    "feature_class" -> new AttributeValue(line(2)), 
                    "state_alpha" -> new AttributeValue(line(3)), 
                    "prim_lat_dec" -> new AttributeValue().withN(line(4)), 
                    "prim_long_dec" -> new AttributeValue().withN(line(5)),
                    "elev_in_ft" -> elevInFt)
                    .asJava)
                (new Text(""), item)
        })
        rdd.saveAsHadoopDataset(jobConf)
    }
}
```

------
#### [ Hive ]

Per copiare i dati dalla tabella Hive creata nel passaggio precedente in DynamoDB, segui le istruzioni in [Copiare](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/EMRforDynamoDB.Tutorial.CopyDataToDDB.html) i dati su DynamoDB.

------

## Fase 4: Interrogare i dati da DynamoDB
<a name="using-ddb-connector-query"></a>

Usa Spark o Hive per interrogare la tua tabella DynamoDB.

------
#### [ Spark ]

Per interrogare i dati dalla tabella DynamoDB creata nel passaggio precedente, usa Spark SQL o l'API Spark. MapReduce 

**Example — Interroga la tua tabella DynamoDB con Spark SQL**  
La seguente query SQL Spark restituisce un elenco di tutti i tipi di funzionalità in ordine alfabetico.  

```
val dataFrame = sparkSession.sql("SELECT DISTINCT feature_class \
    FROM ddb_features \
    ORDER BY feature_class;")
```
*La seguente query SQL Spark restituisce un elenco di tutti i lake che iniziano con la lettera M.*  

```
val dataFrame = sparkSession.sql("SELECT feature_name, state_alpha \
    FROM ddb_features \
    WHERE feature_class = 'Lake' \
    AND feature_name LIKE 'M%' \
    ORDER BY feature_name;")
```
La seguente query SQL Spark restituisce un elenco di tutti gli stati con almeno tre funzionalità superiori a un miglio.  

```
val dataFrame = sparkSession.dql("SELECT state_alpha, feature_class, COUNT(*) \
    FROM ddb_features \
    WHERE elev_in_ft > 5280 \
    GROUP by state_alpha, feature_class \
    HAVING COUNT(*) >= 3 \
    ORDER BY state_alpha, feature_class;")
```

**Example — Interroga la tua tabella DynamoDB con l'API Spark MapReduce**  
La seguente MapReduce query restituisce un elenco di tutti i tipi di feature in ordine alfabetico.  

```
val df = sc.hadoopRDD(jobConf, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable])
    .map(pair => (pair._1, pair._2.getItem))
    .map(pair => pair._2.get("feature_class").getS)
    .distinct()
    .sortBy(value => value)
    .toDF("feature_class")
```
La seguente MapReduce query restituisce un elenco di tutti i laghi che iniziano con la lettera *M.*  

```
val df = sc.hadoopRDD(jobConf, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable])
    .map(pair => (pair._1, pair._2.getItem))
    .filter(pair => "Lake".equals(pair._2.get("feature_class").getS))
    .filter(pair => pair._2.get("feature_name").getS.startsWith("M"))
    .map(pair => (pair._2.get("feature_name").getS, pair._2.get("state_alpha").getS))
    .sortBy(_._1)
    .toDF("feature_name", "state_alpha")
```
La seguente MapReduce query restituisce un elenco di tutti gli stati con almeno tre caratteristiche superiori a un miglio.  

```
val df = sc.hadoopRDD(jobConf, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable])
    .map(pair => pair._2.getItem)
    .filter(pair => pair.get("elev_in_ft").getN != null)
    .filter(pair => Integer.parseInt(pair.get("elev_in_ft").getN) > 5280)
    .groupBy(pair => (pair.get("state_alpha").getS, pair.get("feature_class").getS))
    .filter(pair => pair._2.size >= 3)
    .map(pair => (pair._1._1, pair._1._2, pair._2.size))
    .sortBy(pair => (pair._1, pair._2))
    .toDF("state_alpha", "feature_class", "count")
```

------
#### [ Hive ]

Per interrogare i dati dalla tabella DynamoDB creata nel passaggio precedente, segui le istruzioni in [Interrogare i dati nella](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/EMRforDynamoDB.Tutorial.QueryDataInDynamoDB.html) tabella DynamoDB.

------

# Configurazione dell'accesso multi-account
<a name="using-ddb-connector-xaccount"></a>

Per configurare l'accesso tra più account per EMR Serverless, completare i seguenti passaggi. Nell'esempio, `AccountA` è l'account in cui hai creato l'applicazione Amazon EMR Serverless ed `AccountB` è l'account in cui si trova Amazon DynamoDB.

1. Crea una tabella DynamoDB in. `AccountB` Per ulteriori informazioni, consulta [Fase 1: Creare una tabella](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/getting-started-step-1.html).

1. Crea un ruolo `Cross-Account-Role-B` IAM in `AccountB` grado di accedere alla tabella DynamoDB.

   1. Accedi Console di gestione AWS e apri la console IAM all'indirizzo. [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/)

   1. Scegli **Ruoli** e crea un nuovo ruolo chiamato`Cross-Account-Role-B`. Per ulteriori informazioni su come creare ruoli IAM, consulta [Creazione di ruoli IAM](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create.html) nella *guida per l'utente*.

   1. Crea una policy IAM che conceda le autorizzazioni per accedere alla tabella DynamoDB tra account. Quindi, allega la policy IAM a `Cross-Account-Role-B`.

      Di seguito è riportata una politica che concede l'accesso a una tabella DynamoDB. `CrossAccountTable`

   1. Modifica la relazione di fiducia per il ruolo `Cross-Account-Role-B`.

      Per configurare la relazione di trust per il ruolo, scegli la scheda **Trust Relationships** nella console IAM per il ruolo che hai creato nel *passaggio* 2:. Cross-Account-Role-B

      Seleziona **Modifica relazione di fiducia**, quindi aggiungi il seguente documento politico. Questo documento consente `Job-Execution-Role-A` `AccountA` di assumere questo `Cross-Account-Role-B` ruolo.

------
#### [ JSON ]

****  

      ```
      {
        "Version":"2012-10-17",		 	 	 
        "Statement": [
          {
            "Effect": "Allow",
            "Action": [
              "sts:AssumeRole"
            ],
            "Resource": "arn:aws:iam::123456789012:role/Job-Execution-Role-A",
            "Sid": "AllowSTSAssumerole"
          }
        ]
      }
      ```

------

   1. Concedi `Job-Execution-Role-A` `AccountA` con `- STS Assume role` le autorizzazioni da assumere`Cross-Account-Role-B`.

      Nella console IAM per Account AWS `AccountA`, seleziona`Job-Execution-Role-A`. Aggiungi la seguente istruzione di policy a `Job-Execution-Role-A` per autorizzare l'operazione `AssumeRole` nel ruolo `Cross-Account-Role-B`.

------
#### [ JSON ]

****  

      ```
      {
        "Version":"2012-10-17",		 	 	 
        "Statement": [
          {
            "Effect": "Allow",
            "Action": [
              "sts:AssumeRole"
            ],
            "Resource": [
              "arn:aws:iam::123456789012:role/Cross-Account-Role-B"
            ],
            "Sid": "AllowSTSAssumerole"
          }
        ]
      }
      ```

------

   1. Imposta la `dynamodb.customAWSCredentialsProvider` proprietà con il valore come `com.amazonaws.emr.AssumeRoleAWSCredentialsProvider` nella classificazione del sito principale. Imposta la variabile di ambiente `ASSUME_ROLE_CREDENTIALS_ROLE_ARN` con il valore ARN di. `Cross-Account-Role-B`

1. Esegui il job Spark o Hive utilizzando. `Job-Execution-Role-A`

# Considerazioni
<a name="using-ddb-connector-considerations"></a>

Nota questi comportamenti e limitazioni quando utilizzi il connettore DynamoDB con Apache Spark o Apache Hive.

## Considerazioni sull'utilizzo del connettore DynamoDB con Apache Spark
<a name="ddb-spark-considerations"></a>
+ Spark SQL non supporta la creazione di una tabella Hive con l'opzione storage-handler. Per ulteriori informazioni, consulta [Specificare il formato di archiviazione per le tabelle Hive](https://spark.apache.org/docs/latest/sql-data-sources-hive-tables.html#specifying-storage-format-for-hive-tables) nella documentazione di Apache Spark.
+ Spark SQL non supporta l'operazione con lo storage handler. `STORED BY` Se desideri interagire con una tabella DynamoDB tramite una tabella Hive esterna, usa Hive per creare prima la tabella.
+ *Per tradurre una query in una query DynamoDB, il connettore DynamoDB utilizza il pushdown dei predicati.* Predicate pushdown filtra i dati in base a una colonna mappata alla chiave di partizione di una tabella DynamoDB. Il predicate pushdown funziona solo quando si utilizza il connettore con Spark SQL e non con l'API. MapReduce 

## Considerazioni sull'utilizzo del connettore DynamoDB con Apache Hive
<a name="ddb-hive-considerations"></a>

**Ottimizzazione del numero massimo di mappatori**
+ Se si utilizza la `SELECT` query per leggere i dati da una tabella Hive esterna mappata a DynamoDB, il numero di attività di mappa su EMR Serverless viene calcolato come il throughput di lettura totale configurato per la tabella DynamoDB, diviso per il throughput per task di mappa. La velocità effettiva predefinita per attività di mappa è 100. 
+ Il job Hive può utilizzare il numero di attività di mappatura oltre al numero massimo di contenitori configurati per applicazione EMR Serverless, a seconda del throughput di lettura configurato per DynamoDB. Inoltre, una query Hive a esecuzione prolungata può consumare tutta la capacità di lettura assegnata alla tabella DynamoDB. Ciò ha un impatto negativo sugli altri utenti.
+ È possibile utilizzare la `dynamodb.max.map.tasks` proprietà per impostare un limite superiore per le attività di mappatura. È inoltre possibile utilizzare questa proprietà per ottimizzare la quantità di dati letti da ogni attività della mappa in base alla dimensione del contenitore dell'attività.
+ È possibile impostare la `dynamodb.max.map.tasks` proprietà a livello di query Hive o nella `hive-site` classificazione del **start-job-run** comando. Questo valore deve essere maggiore o uguale a 1. Quando Hive elabora la tua query, il job Hive risultante utilizza solo i valori di `dynamodb.max.map.tasks` quando legge dalla tabella DynamoDB. 

**Ottimizzazione della velocità di scrittura per attività**
+ Il throughput di scrittura per attività su EMR Serverless viene calcolato come il throughput di scrittura totale configurato per una tabella DynamoDB, diviso per il valore della proprietà. `mapreduce.job.maps` Per Hive, il valore predefinito di questa proprietà è 2. Pertanto, le prime due attività nella fase finale del processo Hive possono consumare tutta la velocità di scrittura. Ciò comporta una riduzione delle scritture di altre attività nello stesso lavoro o in altri lavori.
+ Per evitare la limitazione della scrittura, impostate il valore della `mapreduce.job.maps` proprietà in base al numero di attività nella fase finale o alla velocità di scrittura che desiderate allocare per attività. Imposta questa proprietà nella `mapred-site` classificazione del **start-job-run** comando su EMR Serverless.