

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

# Creare ed eseguire un servizio gestito per l'applicazione Apache Flink
<a name="get-started-exercise"></a>

In questo esercizio, viene creata un'applicazione del servizio gestito per Apache Flink con flussi di dati come origine e come sink.

**Topics**
+ [Crea due flussi di dati Amazon Kinesis](#get-started-exercise-1)
+ [Scrittura di record di esempio nel flusso di input](#get-started-exercise-2)
+ [Scarica ed esamina il codice Java per lo streaming di Apache Flink](#get-started-exercise-5)
+ [Compilate il codice dell'applicazione](#get-started-exercise-5.5)
+ [Caricate il codice Java di streaming Apache Flink](#get-started-exercise-6)
+ [Crea ed esegui l'applicazione Managed Service for Apache Flink](#get-started-exercise-7)

## Crea due flussi di dati Amazon Kinesis
<a name="get-started-exercise-1"></a>

Prima di creare un Amazon Managed Service per Apache Flink per questo esercizio, crea due flussi di dati Kinesis (e). `ExampleInputStream` `ExampleOutputStream` L'applicazione utilizza questi flussi per i flussi di origine e di destinazione dell'applicazione.

Puoi creare questi flussi utilizzando la console Amazon Kinesis o il comando AWS CLI seguente. Per istruzioni sulla console, consulta [Creazione e aggiornamento dei flussi di dati](https://docs.aws.amazon.com/kinesis/latest/dev/amazon-kinesis-streams.html). 

**Per creare i flussi di dati (AWS CLI)**

1. Per creare il primo stream (`ExampleInputStream`), usa il seguente comando Amazon Kinesis `create-stream` AWS CLI .

   ```
   $ aws kinesis create-stream \
   --stream-name ExampleInputStream \
   --shard-count 1 \
   --region us-west-2 \
   --profile adminuser
   ```

1. Per creare il secondo flusso utilizzato dall'applicazione per scrivere l'output, esegui lo stesso comando, modificando il nome del flusso in `ExampleOutputStream`.

   ```
   $ aws kinesis create-stream \
   --stream-name ExampleOutputStream \
   --shard-count 1 \
   --region us-west-2 \
   --profile adminuser
   ```

## Scrittura di record di esempio nel flusso di input
<a name="get-started-exercise-2"></a>

In questa sezione, viene utilizzato uno script Python per scrivere record di esempio nel flusso per l'applicazione da elaborare.

**Nota**  
Questa sezione richiede [AWS SDK per Python (Boto)](https://aws.amazon.com/developers/getting-started/python/).

1. Crea un file denominato `stock.py` con i seguenti contenuti:

   ```
    
   import datetime
   import json
   import random
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_data():
       return {
           "EVENT_TIME": datetime.datetime.now().isoformat(),
           "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]),
           "PRICE": round(random.random() * 100, 2),
       }
   
   
   def generate(stream_name, kinesis_client):
       while True:
           data = get_data()
           print(data)
           kinesis_client.put_record(
               StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey"
           )
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```

1. Successivamente nel tutorial, esegui lo script `stock.py` per inviare dati all'applicazione. 

   ```
   $ python stock.py
   ```

## Scarica ed esamina il codice Java per lo streaming di Apache Flink
<a name="get-started-exercise-5"></a>

Il codice dell'applicazione Java per questi esempi è disponibile da. GitHub Per scaricare il codice dell'applicazione, esegui le operazioni descritte di seguito:

1. Clona il repository remoto con il comando seguente:

   ```
   git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-java-examples.git
   ```

1. Passa alla directory `GettingStarted`.

Il codice dell'applicazione si trova nei file `CustomSinkStreamingJob.java` e `CloudWatchLogSink.java`. Tieni presente quanto segue riguardo al codice dell'applicazione:
+ L'applicazione utilizza un'origine Kinesis per leggere dal flusso di origine. Il seguente snippet crea il sink Kinesis:

  ```
  return env.addSource(new FlinkKinesisConsumer<>(inputStreamName,
                  new SimpleStringSchema(), inputProperties));
  ```

## Compilate il codice dell'applicazione
<a name="get-started-exercise-5.5"></a>

In questa sezione, viene utilizzato il compilatore Apache Maven per creare il codice Java per l'applicazione. Per ulteriori informazioni sull'installazione di Apache Maven e Java Development Kit (JDK), consulta [Prerequisiti per il completamento degli esercizi](tutorial-stock-data.md#setting-up-prerequisites).

L'applicazione Java richiede i componenti seguenti:
+ Un file [Project Object Model (pom.xml)](https://maven.apache.org/guides/introduction/introduction-to-the-pom.html). Questo file contiene informazioni sulla configurazione e le dipendenze dell'applicazione, tra cui le librerie Amazon Managed Service per Apache Flink.
+ Un metodo `main` contenente la logica dell'applicazione.

**Nota**  
**Per utilizzare il connettore Kinesis per la seguente applicazione, è necessario scaricare il codice sorgente del connettore e crearlo come descritto nella documentazione di [Apache](https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/connectors/kinesis.html) Flink.**

**Per creare e compilare il codice dell'applicazione**

1. Crea un' Java/Maven applicazione nel tuo ambiente di sviluppo. Per ulteriori informazioni su come creare un'applicazione, consulta la documentazione per l'ambiente di sviluppo:
   + [ Creazione del primo progetto Java (Eclipse Java Neon)](https://help.eclipse.org/neon/index.jsp?topic=%2Forg.eclipse.jdt.doc.user%2FgettingStarted%2Fqs-3.htm)
   + [ Creazione, esecuzione e compressione della prima applicazione Java (IntelliJ Idea)](https://www.jetbrains.com/help/idea/creating-and-running-your-first-java-application.html)

1. Utilizzare il codice seguente per un file denominato `StreamingJob.java`. 

   ```
    
   package com.amazonaws.services.kinesisanalytics;
   
   import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
   import org.apache.flink.api.common.serialization.SimpleStringSchema;
   import org.apache.flink.streaming.api.datastream.DataStream;
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
   import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
   import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
   import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
   
   import java.io.IOException;
   import java.util.Map;
   import java.util.Properties;
   
   public class StreamingJob {
   
       private static final String region = "us-east-1";
       private static final String inputStreamName = "ExampleInputStream";
       private static final String outputStreamName = "ExampleOutputStream";
   
       private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) {
           Properties inputProperties = new Properties();
           inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
           inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
   
           return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
       }
   
       private static DataStream<String> createSourceFromApplicationProperties(StreamExecutionEnvironment env)
               throws IOException {
           Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
           return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(),
                   applicationProperties.get("ConsumerConfigProperties")));
       }
   
       private static FlinkKinesisProducer<String> createSinkFromStaticConfig() {
           Properties outputProperties = new Properties();
           outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
           outputProperties.setProperty("AggregationEnabled", "false");
   
           FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(new SimpleStringSchema(), outputProperties);
           sink.setDefaultStream(outputStreamName);
           sink.setDefaultPartition("0");
           return sink;
       }
   
       private static FlinkKinesisProducer<String> createSinkFromApplicationProperties() throws IOException {
           Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
           FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(new SimpleStringSchema(),
                   applicationProperties.get("ProducerConfigProperties"));
   
           sink.setDefaultStream(outputStreamName);
           sink.setDefaultPartition("0");
           return sink;
       }
   
       public static void main(String[] args) throws Exception {
           // set up the streaming execution environment
           final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
   
           /*
            * if you would like to use runtime configuration properties, uncomment the
            * lines below
            * DataStream<String> input = createSourceFromApplicationProperties(env);
            */
   
           DataStream<String> input = createSourceFromStaticConfig(env);
   
           /*
            * if you would like to use runtime configuration properties, uncomment the
            * lines below
            * input.addSink(createSinkFromApplicationProperties())
            */
   
           input.addSink(createSinkFromStaticConfig());
   
           env.execute("Flink Streaming Java API Skeleton");
       }
   }
   ```

   Notare quanto segue riguardo l'esempio di codice precedente:
   + Questo file contiene il metodo `main` che definisce la funzionalità dell'applicazione.
   + L'applicazione crea connettori di origine e sink per accedere alle risorse esterne utilizzando un oggetto `StreamExecutionEnvironment`. 
   + L'applicazione crea connettori di origine e sink utilizzando proprietà statiche. Per usare proprietà dell'applicazione dinamiche, utilizza i metodi `createSourceFromApplicationProperties` e `createSinkFromApplicationProperties` per creare i connettori. Questi metodi leggono le proprietà dell'applicazione per configurare il connettori.

1. Per usare il codice dell'applicazione, compila il codice e comprimilo in un file JAR. È possibile compilare e creare un pacchetto del codice in uno di due modi:
   + Utilizzare lo strumento Maven della riga di comando. Crea il file JAR eseguendo il comando seguente nella directory che contiene il file `pom.xml`:

     ```
     mvn package
     ```
   + Utilizza il tuo ambiente di sviluppo. Per informazioni dettagliate, consulta la documentazione relativa all'ambiente di sviluppo.

   È possibile caricare il pacchetto come un file JAR, oppure comprimere il pacchetto e caricarlo come un file ZIP. Se create l'applicazione utilizzando AWS CLI, specificate il tipo di contenuto del codice (JAR o ZIP).

1. Se si verificano errori durante la compilazione, verifica che la variabile di ambiente `JAVA_HOME` sia impostata correttamente.

Se l'applicazione viene compilata correttamente, viene creato il seguente file:

`target/java-getting-started-1.0.jar`

## Caricate il codice Java di streaming Apache Flink
<a name="get-started-exercise-6"></a>

In questa sezione, viene creato un bucket Amazon Simple Storage Service (Amazon S3) e caricato il codice dell'applicazione.

**Per caricare il codice dell'applicazione**

1. Apri la console Amazon S3 all'indirizzo. [https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/)

1. Seleziona **Crea bucket**.

1. Inserisci **ka-app-code-*<username>*** nel campo **Nome bucket**. Aggiungi un suffisso al nome del bucket, ad esempio il nome utente, per renderlo globalmente univoco. Scegli **Next (Successivo)**.

1. Nella fase **Configura opzioni**, non modificare le impostazioni e scegli **Successivo**.

1. Nella fase **Imposta autorizzazioni**, non modificare le impostazioni e scegli **Successivo**.

1. Seleziona **Crea bucket**.

1. **Nella console Amazon S3, scegli il *<username>* bucket **ka-app-code-** e scegli Carica.**

1. Nella fase **Seleziona file**, scegli **Aggiungi file**. Individua il file `java-getting-started-1.0.jar` creato nella fase precedente. Scegli **Next (Successivo)**.

1. Nella fase **Imposta autorizzazioni**, non modificare le impostazioni. Scegli **Next (Successivo)**.

1. Nella fase **Imposta proprietà**, non modificare le impostazioni. Scegli **Carica**.

Il codice dell'applicazione è ora archiviato in un bucket Amazon S3 accessibile dall'applicazione.

## Crea ed esegui l'applicazione Managed Service for Apache Flink
<a name="get-started-exercise-7"></a>

È possibile creare ed eseguire un'applicazione del servizio gestito per Apache Flink utilizzando la console o la AWS CLI.

**Nota**  
Quando crei l'applicazione utilizzando la console, le tue risorse AWS Identity and Access Management (IAM) e Amazon CloudWatch Logs vengono create automaticamente. Quando crei l'applicazione utilizzando AWS CLI, crei queste risorse separatamente.

**Topics**
+ [Crea ed esegui l'applicazione (Console)](#get-started-exercise-7-console)
+ [Crea ed esegui l'applicazione (AWS CLI)](#get-started-exercise-7-cli)

### Crea ed esegui l'applicazione (Console)
<a name="get-started-exercise-7-console"></a>

Segui questi passaggi per creare, configurare, aggiornare ed eseguire l'applicazione utilizzando la console.

#### Creazione dell’applicazione
<a name="get-started-exercise-7-console-create"></a>

1. [Apri la console Kinesis in /kinesis. https://console.aws.amazon.com](https://console.aws.amazon.com/kinesis)

1. Sul pannello di controllo di Amazon Kinesis, scegli **Crea applicazione di analisi**.

1. Nella pagina **Kinesis Analytics - Create application (Kinesis Analytics - Crea applicazione)**, fornire i dettagli dell'applicazione come segue:
   + Per **Nome applicazione**, immetti **MyApplication**.
   + Per **Descrizione**, inserisci **My java test app**.
   + Per **Runtime**, scegliere **Apache Flink 1.6**.

1. Per **Autorizzazioni di accesso**, scegli **Crea/aggiorna `kinesis-analytics-MyApplication-us-west-2` per il ruolo IAM**.

1. Scegli **Crea applicazione**.

**Nota**  
Quando crei un'applicazione Amazon Managed Service for Apache Flink utilizzando la console, hai la possibilità di creare un ruolo e una policy IAM per la tua applicazione. L'applicazione utilizza questo ruolo e questa policy per accedere alle sue risorse dipendenti. Queste risorse IAM sono denominate utilizzando il nome dell'applicazione e la Regione come segue:  
Policy: `kinesis-analytics-service-MyApplication-us-west-2`
Ruolo: `kinesis-analytics-MyApplication-us-west-2`

#### Modifica la policy IAM
<a name="get-started-exercise-7-console-iam"></a>

Modifica la policy IAM per aggiungere le autorizzazioni per accedere ai flussi di dati Kinesis.

1. Aprire la console IAM all'indirizzo [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/).

1. Seleziona **Policy**. Scegli la policy **`kinesis-analytics-service-MyApplication-us-west-2`** creata dalla console nella sezione precedente. 

1. Nella pagina **Riepilogo**, scegli **Modifica policy**. Scegli la scheda **JSON**.

1. Aggiungi alla policy la sezione evidenziata del seguente esempio di policy. Sostituisci l'account di esempio IDs (*012345678901*) con l'ID del tuo account.

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Sid": "ReadCode",
               "Effect": "Allow",
               "Action": [
                   "s3:GetObject",
                   "s3:GetObjectVersion"
               ],
               "Resource": [
                   "arn:aws:s3:::ka-app-code-username/java-getting-started-1.0.jar"
               ]
           },
           {
               "Sid": "ListCloudwatchLogGroups",
               "Effect": "Allow",
               "Action": [
                   "logs:DescribeLogGroups"
               ],
               "Resource": [
                   "arn:aws:logs:us-west-2:012345678901:log-group:*"
               ]
           },
           {
               "Sid": "ListCloudwatchLogStreams",
               "Effect": "Allow",
               "Action": [
                   "logs:DescribeLogStreams"
               ],
               "Resource": [
                   "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*"
               ]
           },
           {
               "Sid": "PutCloudwatchLogs",
               "Effect": "Allow",
               "Action": [
                   "logs:PutLogEvents"
               ],
               "Resource": [
                   "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream"
               ]
           },
           {
               "Sid": "ReadInputStream",
               "Effect": "Allow",
               "Action": "kinesis:*",
               "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream"
           },
           {
               "Sid": "WriteOutputStream",
               "Effect": "Allow",
               "Action": "kinesis:*",
               "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream"
           }
       ]
   }
   ```

------

#### Configura l'applicazione
<a name="get-started-exercise-7-console-configure"></a>

1. Nella **MyApplication**pagina, scegli **Configura**.

1. Nella pagina **Configura applicazione**, fornisci la **Posizione del codice**:
   + Per **Bucket Amazon S3**, inserisci **ka-app-code-*<username>***.
   + Per **Percorso dell'oggetto Amazon S3**, inserisci **java-getting-started-1.0.jar**

1. In **Accesso alle risorse dell'applicazione**, per **Autorizzazioni di accesso**, scegli **Crea/aggiorna `kinesis-analytics-MyApplication-us-west-2` per il ruolo IAM**.

1. In **Proprietà**, per **ID gruppo**, inserisci **ProducerConfigProperties**.

1. Immetti i valori e le proprietà dell'applicazione seguenti:    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/it_it/streams/latest/dev/get-started-exercise.html)

1. In **Monitoraggio**, accertati che il **Monitoraggio del livello dei parametri** sia impostato su **Applicazione**.

1. Per la **CloudWatch registrazione**, seleziona la casella di controllo **Abilita**.

1. Scegliere **Aggiorna**.

**Nota**  
Quando scegli di abilitare la CloudWatch registrazione, Managed Service for Apache Flink crea un gruppo di log e un flusso di log per te. I nomi di tali risorse sono i seguenti:   
Gruppo di log: `/aws/kinesis-analytics/MyApplication`
Flusso di log: `kinesis-analytics-log-stream`

#### Esecuzione dell'applicazione.
<a name="get-started-exercise-7-console-run"></a>

1. **Nella **MyApplication**pagina, scegli Esegui.** Conferma l'operazione.

1. Quando l'applicazione è in esecuzione, aggiorna la pagina. La console mostra il **Grafico dell'applicazione**.

#### Arresta l'applicazione
<a name="get-started-exercise-7-console-stop"></a>

Nella **MyApplication**pagina, scegli **Stop**. Conferma l'operazione.

#### Aggiornamento dell'applicazione
<a name="get-started-exercise-7-console-update"></a>

Tramite la console, puoi aggiornare le impostazioni dell'applicazione, ad esempio le proprietà dell'applicazione, le impostazioni di monitoraggio e la posizione o il nome di file del JAR dell'applicazione. Puoi anche ricaricare il JAR dell'applicazione dal bucket Amazon S3 se è necessario aggiornare il codice dell'applicazione.

Nella **MyApplication**pagina, scegli **Configura**. Aggiorna le impostazioni dell'applicazione e scegli **Aggiorna**.

### Crea ed esegui l'applicazione (AWS CLI)
<a name="get-started-exercise-7-cli"></a>

In questa sezione, si utilizza AWS CLI per creare ed eseguire l'applicazione Managed Service for Apache Flink. Managed Service for Apache Flink utilizza il `kinesisanalyticsv2` AWS CLI comando per creare e interagire con le applicazioni Managed Service for Apache Flink.

#### Creazione di una policy di autorizzazione
<a name="get-started-exercise-7-cli-policy"></a>

Innanzitutto, crea una policy di autorizzazione con due istruzioni: una che concede le autorizzazioni per l'operazione `read` sul flusso di origine e un'altra che concede le autorizzazioni per operazioni `write` sul flusso di sink. Collega quindi la policy a un ruolo IAM (che verrà creato nella sezione successiva). Pertanto, quando il servizio gestito per Apache Flink assume il ruolo, il servizio disporrà delle autorizzazioni necessarie per leggere dal flusso di origine e scrivere nel flusso di sink.

Utilizza il codice seguente per creare la policy di autorizzazione `KAReadSourceStreamWriteSinkStream`. Sostituisci `username` con il nome utente utilizzato per creare il bucket Amazon S3 per archiviare il codice dell'applicazione. Sostituisci l'ID dell'account in Amazon Resource Names (ARNs) (`012345678901`) con l'ID del tuo account.

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "S3",
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:GetObjectVersion"
            ],
            "Resource": ["arn:aws:s3:::ka-app-code-username",
                "arn:aws:s3:::ka-app-code-username/*"
            ]
        },
        {
            "Sid": "ReadInputStream",
            "Effect": "Allow",
            "Action": "kinesis:*",
            "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream"
        },
        {
            "Sid": "WriteOutputStream",
            "Effect": "Allow",
            "Action": "kinesis:*",
            "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream"
        }
    ]
}
```

------

Per step-by-step istruzioni su come creare una politica di autorizzazioni, consulta il [Tutorial: Create and Attach Your First Customer Managed Policy](https://docs.aws.amazon.com/IAM/latest/UserGuide/tutorial_managed-policies.html#part-two-create-policy) nella *IAM User Guide*.

**Nota**  
Per accedere ad altri AWS servizi, puoi utilizzare il AWS SDK per Java. Il servizio gestito per Apache Flink imposta automaticamente le credenziali richieste dall'SDK su quelle del ruolo IAM di esecuzione del servizio associato all'applicazione. Non sono richieste fasi aggiuntive.

#### Creazione di un ruolo IAM
<a name="get-started-exercise-7-cli-role"></a>

In questa sezione, crei un ruolo IAM che Managed Service for Apache Flink può assumere per leggere un flusso di origine e scrivere nel flusso sink.

Il servizio gestito per Apache Flink non può accedere al tuo flusso senza autorizzazioni. Queste autorizzazioni possono essere assegnate con un ruolo IAM. Ad ogni ruolo IAM sono collegate due policy. La policy di attendibilità concede al servizio gestito per Apache Flink l'autorizzazione per assumere il ruolo e la policy di autorizzazione determina cosa può fare il servizio assumendo questo ruolo.

Collega la policy di autorizzazione creata nella sezione precedente a questo ruolo.

**Per creare un ruolo IAM**

1. Aprire la console IAM all'indirizzo [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/).

1. Nel riquadro di navigazione, seleziona **Ruoli**, quindi **Crea nuovo ruolo**.

1. In **Seleziona tipo di identità attendibile**, scegli **Servizio AWS **. In **Scegli il servizio che utilizzerà questo ruolo**, scegli **Kinesis**. In **Seleziona il tuo caso d'uso**, scegli **Analisi dei dati Kinesis**.

   Scegli **Successivo: Autorizzazioni**.

1. Nella pagina **Allega policy di autorizzazione**, seleziona **Successivo: esamina**. Collega le policy di autorizzazione dopo aver creato il ruolo.

1. Nella pagina **Crea ruolo**, immetti **KA-stream-rw-role** per **Nome ruolo**. Scegli **Crea ruolo**.

   È stato creato un nuovo ruolo IAM denominato `KA-stream-rw-role`. Successivamente, aggiorna le policy di attendibilità e di autorizzazione per il ruolo.

1. Collega la policy di autorizzazione al ruolo.
**Nota**  
Per questo esercizio, il servizio gestito per Apache Flink assume questo ruolo per la lettura di dati da un flusso di dati Kinesis (origine) e la scrittura dell'output in un altro flusso di dati Kinesis. Pertanto, devi collegare la policy creata nella fase precedente, [Creazione di una policy di autorizzazione](#get-started-exercise-7-cli-policy).

   1. Nella pagina **Riepilogo**, scegli la scheda **Autorizzazioni**.

   1. Scegliere **Collega policy**.

   1. Nella casella di ricerca, immetti **KAReadSourceStreamWriteSinkStream** (la policy creata nella sezione precedente).

   1. Scegli la policy **KAReadInputStreamWriteOutputStream** e seleziona **Collega policy**.

È stato creato il ruolo di esecuzione del servizio che l'applicazione utilizzerà per accedere alle risorse. Prendi nota dell'ARN del nuovo ruolo.

Per step-by-step istruzioni sulla creazione di un ruolo, consulta [Creating an IAM Role (Console)](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-user.html#roles-creatingrole-user-console) nella *IAM User Guide*.

#### Creazione dell'applicazione del servizio gestito per Apache Flink
<a name="get-started-exercise-7-cli-create"></a>

1. Salvare il seguente codice JSON in un file denominato `create_request.json`. Sostituisci l'ARN del ruolo di esempio con l'ARN per il ruolo creato in precedenza. Sostituisci il suffisso dell'ARN del bucket (`username`) con il suffisso scelto nella sezione precedente. Sostituisci l'ID account di esempio (`012345678901`) nel ruolo di esecuzione del servizio con il tuo ID account.

   ```
   {
       "ApplicationName": "test",
       "ApplicationDescription": "my java test app",
       "RuntimeEnvironment": "FLINK-1_6",
       "ServiceExecutionRole": "arn:aws:iam::012345678901:role/KA-stream-rw-role",
       "ApplicationConfiguration": {
           "ApplicationCodeConfiguration": {
               "CodeContent": {
                   "S3ContentLocation": {
                       "BucketARN": "arn:aws:s3:::ka-app-code-username",
                       "FileKey": "java-getting-started-1.0.jar"
                   }
               },
               "CodeContentType": "ZIPFILE"
           },
           "EnvironmentProperties":  { 
            "PropertyGroups": [ 
               { 
                  "PropertyGroupId": "ProducerConfigProperties",
                  "PropertyMap" : {
                       "flink.stream.initpos" : "LATEST",
                       "aws.region" : "us-west-2",
                       "AggregationEnabled" : "false"
                  }
               },
               { 
                  "PropertyGroupId": "ConsumerConfigProperties",
                  "PropertyMap" : {
                       "aws.region" : "us-west-2"
                  }
               }
            ]
         }
       }
   }
   ```

1. Esegui l'operazione [https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_CreateApplication.html](https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_CreateApplication.html) con la richiesta precedente per creare l'applicazione: 

   ```
   aws kinesisanalyticsv2 create-application --cli-input-json file://create_request.json
   ```

L'applicazione è ora creata. Avvia l'applicazione nella fase successiva.

#### Avvio dell'applicazione
<a name="get-started-exercise-7-cli-start"></a>

In questa sezione, viene utilizzata l'operazione [https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StartApplication.html](https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StartApplication.html) per avviare l'applicazione.

**Per avviare l'applicazione**

1. Salvare il seguente codice JSON in un file denominato `start_request.json`.

   ```
   {
       "ApplicationName": "test",
       "RunConfiguration": {
           "ApplicationRestoreConfiguration": { 
            "ApplicationRestoreType": "RESTORE_FROM_LATEST_SNAPSHOT"
            }
       }
   }
   ```

1. Esegui l'operazione [https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StartApplication.html](https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StartApplication.html) con la richiesta precedente per avviare l'applicazione:

   ```
   aws kinesisanalyticsv2 start-application --cli-input-json file://start_request.json
   ```

L'applicazione è ora in esecuzione. Puoi controllare i parametri del servizio gestito per Apache Flink sulla CloudWatch console Amazon per verificare che l'applicazione funzioni.

#### Interruzione dell'applicazione
<a name="get-started-exercise-7-cli-stop"></a>

In questa sezione, viene utilizzata l'operazione [https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StopApplication.html](https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StopApplication.html) per interrompere l'applicazione.

**Per interrompere l'applicazione**

1. Salvare il seguente codice JSON in un file denominato `stop_request.json`.

   ```
   {"ApplicationName": "test"
   }
   ```

1. Esegui l'operazione [https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StopApplication.html](https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StopApplication.html) con la seguente richiesta di interrompere l'applicazione:

   ```
   aws kinesisanalyticsv2 stop-application --cli-input-json file://stop_request.json
   ```

L'applicazione è ora interrotta.