

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

# Crea ed esegui un'applicazione Managed Service for Apache Flink
<a name="gs-table-create"></a>

In questo esercizio, creerai un servizio gestito per l'applicazione Apache Flink con flussi di dati Kinesis come origine e sink.

**Topics**
+ [Crea risorse dipendenti](#gs-table-resources)
+ [Configurazione dell'ambiente di sviluppo locale](#gs-table-2)
+ [Scarica ed esamina il codice Java per lo streaming di Apache Flink](#gs-table-5)
+ [Esegui l'applicazione localmente](#gs-table-run-locally)
+ [Osserva l'applicazione che scrive dati in un bucket S3](#gs-table-input-output)
+ [Interrompi l'esecuzione locale dell'applicazione](#gs-table-stop)
+ [Compila e impacchetta il codice dell'applicazione](#gs-table-5.5)
+ [Caricate il file JAR del codice dell'applicazione](#gs-table-6)
+ [Crea e configura l'applicazione Managed Service for Apache Flink](#gs-table-7)

## Crea risorse dipendenti
<a name="gs-table-resources"></a>

Prima di creare un servizio gestito per Apache Flink per questo esercizio, devi creare le risorse dipendenti seguenti: 
+ Un bucket Amazon S3 per archiviare il codice dell'applicazione e scrivere l'output dell'applicazione.
**Nota**  
Questo tutorial presuppone che l'applicazione venga distribuita nella regione us-east-1. Se si utilizza un'altra regione, è necessario adattare tutti i passaggi di conseguenza.

### Crea un bucket Amazon S3
<a name="gs-table-resources-s3"></a>

Puoi creare un bucket Amazon S3 utilizzando la relativa console. Per istruzioni per la creazione di questa risorsa, consulta gli argomenti riportati di seguito:
+ [Come si crea un bucket S3?](https://docs.aws.amazon.com/AmazonS3/latest/userguide/create-bucket.html) nella *Guida per l'utente di Amazon Simple Storage Service*. Assegna al bucket Amazon S3 un nome univoco globale aggiungendo il tuo nome di accesso.
**Nota**  
Assicurati di creare il bucket nella regione che utilizzi per questo tutorial. L'impostazione predefinita per il tutorial è us-east-1.

### Altre risorse
<a name="gs-table-resources-cw"></a>

Quando crei la tua applicazione, Managed Service for Apache Flink crea le seguenti CloudWatch risorse Amazon se non esistono già:
+ Un gruppo di log chiamato `/AWS/KinesisAnalytics-java/<my-application>`.
+ Un flusso di log denominato `kinesis-analytics-log-stream`.

## Configurazione dell'ambiente di sviluppo locale
<a name="gs-table-2"></a>

Per lo sviluppo e il debug, puoi eseguire l'applicazione Apache Flink sulla tua macchina, direttamente dal tuo IDE preferito. Tutte le dipendenze di Apache Flink vengono gestite come normali dipendenze Java utilizzando Maven.

**Nota**  
Sulla tua macchina di sviluppo, devi avere Java JDK 11, Maven e Git installati. [Ti consigliamo di utilizzare un ambiente di sviluppo come [Eclipse Java Neon](https://www.eclipse.org/downloads/packages/release/neon/3) o IntelliJ IDEA.](https://www.jetbrains.com/idea/) Per verificare di soddisfare tutti i prerequisiti, vedere. [Soddisfa i prerequisiti per completare gli esercizi](getting-started.md#setting-up-prerequisites) **Non** è necessario installare un cluster Apache Flink sul computer. 

### Autentica la tua sessione AWS
<a name="get-started-exercise-table-authenticate"></a>

L'applicazione utilizza i flussi di dati Kinesis per pubblicare i dati. Quando si esegue localmente, è necessario disporre di una sessione AWS autenticata valida con autorizzazioni di scrittura nel flusso di dati Kinesis. Utilizza i seguenti passaggi per autenticare la sessione:

1. Se non hai configurato il profilo AWS CLI e un profilo denominato con credenziali valide, consulta. [Configura () AWS Command Line Interface AWS CLI](setup-awscli.md)

1. Se l'IDE dispone di un plug-in con cui integrarsi AWS, è possibile utilizzarlo per passare le credenziali all'applicazione in esecuzione nell'IDE. Per ulteriori informazioni, vedere [AWS Toolkit for IntelliJ](https://aws.amazon.com/intellij/) IDEA [AWS e Toolkit per compilare l'](https://docs.aws.amazon.com/toolkit-for-eclipse/v1/user-guide/welcome.html)applicazione o eseguire Eclipse.

## Scarica ed esamina il codice Java per lo streaming di Apache Flink
<a name="gs-table-5"></a>

Il codice dell'applicazione per questo esempio è disponibile da. GitHub 

**Per scaricare il codice dell'applicazione Java**

1. Clona il repository remoto con il comando seguente:

   ```
   git clone https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples.git
   ```

1. Passa alla directory `./java/GettingStartedTable`.

### Esamina i componenti dell'applicazione
<a name="get-started-exercise-table-components"></a>

L'applicazione è interamente implementata nella `com.amazonaws.services.msf.BasicTableJob` classe. Il `main()` metodo definisce sorgenti, trasformazioni e pozzi. L'esecuzione viene avviata da un'istruzione di esecuzione alla fine di questo metodo.

**Nota**  
Per un'esperienza di sviluppo ottimale, l'applicazione è progettata per funzionare senza modifiche al codice sia su Amazon Managed Service per Apache Flink che localmente, per lo sviluppo nel tuo IDE.
+ Per leggere la configurazione di runtime in modo che funzioni durante l'esecuzione in Amazon Managed Service for Apache Flink e nel tuo IDE, l'applicazione rileva automaticamente se è in esecuzione autonoma localmente nell'IDE. In tal caso, l'applicazione carica la configurazione di runtime in modo diverso:

  1. Quando l'applicazione rileva che è in esecuzione in modalità autonoma nell'IDE, forma il `application_properties.json` file incluso nella cartella delle **risorse** del progetto. Segue il contenuto del file.

  1. Quando l'applicazione viene eseguita in Amazon Managed Service for Apache Flink, il comportamento predefinito carica la configurazione dell'applicazione dalle proprietà di runtime che definirai nell'applicazione Amazon Managed Service for Apache Flink. Per informazioni, consulta [Crea e configura l'applicazione Managed Service for Apache Flink](get-started-exercise.md#get-started-exercise-7).

     ```
     private static Map<String, Properties> loadApplicationProperties(StreamExecutionEnvironment env) throws IOException {
         if (env instanceof LocalStreamEnvironment) {
             LOGGER.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE);
             return KinesisAnalyticsRuntime.getApplicationProperties(
                     BasicStreamingJob.class.getClassLoader()
                             .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE).getPath());
         } else {
             LOGGER.info("Loading application properties from Amazon Managed Service for Apache Flink");
             return KinesisAnalyticsRuntime.getApplicationProperties();
         }
     }
     ```
+ Il `main()` metodo definisce il flusso di dati dell'applicazione e lo esegue. 
  + Inizializza gli ambienti di streaming predefiniti. In questo esempio, mostriamo come creare sia l'API `StreamExecutionEnvironment` da utilizzare con l' DataStream API, sia `StreamTableEnvironment` quello da utilizzare con SQL e l'API Table. I due oggetti di ambiente sono due riferimenti separati allo stesso ambiente di runtime, per utilizzare API diverse.

    ```
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, EnvironmentSettings.newInstance().build());
    ```
  + Caricate i parametri di configurazione dell'applicazione. Questo li caricherà automaticamente dalla posizione corretta, a seconda di dove è in esecuzione l'applicazione:

    ```
    Map<String, Properties> applicationParameters = loadApplicationProperties(env);
    ```
  + [Il [connettore FileSystem sink](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/filesystem/#streaming-sink) utilizzato dall'applicazione per scrivere risultati nei file di output di Amazon S3 quando Flink completa un checkpoint.](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/concepts/stateful-stream-processing/#checkpointing) È necessario abilitare i checkpoint per scrivere file nella destinazione. Quando l'applicazione è in esecuzione in Amazon Managed Service for Apache Flink, la configurazione dell'applicazione controlla il checkpoint e lo abilita per impostazione predefinita. Al contrario, quando vengono eseguiti localmente, i checkpoint sono disabilitati per impostazione predefinita. L'applicazione rileva che viene eseguita localmente e configura il checkpoint ogni 5.000 ms. 

    ```
     if (env instanceof LocalStreamEnvironment) {
        env.enableCheckpointing(5000);
     }
    ```
  + Questa applicazione non riceve dati da una fonte esterna effettiva. Genera dati casuali da elaborare tramite il [DataGen connettore](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/datagen/). Questo connettore è disponibile per DataStream API, SQL e Table API. Per dimostrare l'integrazione tra le API, l'applicazione utilizza la versione DataStram API perché offre maggiore flessibilità. Ogni record viene generato da una funzione generatrice chiamata `StockPriceGeneratorFunction` in questo caso, in cui è possibile inserire una logica personalizzata. 

    ```
    DataGeneratorSource<StockPrice> source = new DataGeneratorSource<>(
            new StockPriceGeneratorFunction(),
            Long.MAX_VALUE,
            RateLimiterStrategy.perSecond(recordPerSecond),
            TypeInformation.of(StockPrice.class));
    ```
  + Nell' DataStream API, i record possono avere classi personalizzate. Le classi devono seguire regole specifiche in modo che Flink possa usarle come record. Per ulteriori informazioni, consulta [Tipi di dati supportati](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#supported-data-types). In questo esempio, la `StockPrice` classe è un [POJO](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#pojos). 
  + La sorgente viene quindi collegata all'ambiente di esecuzione, generando un file `DataStream` di`StockPrice`. Questa applicazione non utilizza la [semantica degli eventi](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/concepts/time/#notions-of-time-event-time-and-processing-time) e non genera una filigrana. Esegui il DataGenerator codice sorgente con un parallelismo pari a 1, indipendentemente dal parallelismo del resto dell'applicazione.

    ```
    DataStream<StockPrice> stockPrices = env.fromSource(
            source, 
            WatermarkStrategy.noWatermarks(),
            "data-generator"
        ).setParallelism(1);
    ```
  + Quanto segue nel flusso di elaborazione dei dati viene definito utilizzando l'API Table e SQL. Per fare ciò, convertiamo l' DataStream of StockPrices in una tabella. Lo schema della tabella viene automaticamente dedotto dalla `StockPrice` classe. 

    ```
    Table stockPricesTable = tableEnv.fromDataStream(stockPrices);
    ```
  + Il seguente frammento di codice mostra come definire una vista e una query utilizzando l'API programmatica Table:

    ```
    Table filteredStockPricesTable = stockPricesTable.
            select(
                    $("eventTime").as("event_time"),
                    $("ticker"),
                    $("price"),
                    dateFormat($("eventTime"), "yyyy-MM-dd").as("dt"),
                    dateFormat($("eventTime"), "HH").as("hr")
            ).where($("price").isGreater(50));
            
    tableEnv.createTemporaryView("filtered_stock_prices", filteredStockPricesTable);
    ```
  + Viene definita una tabella sink per scrivere i risultati in un bucket Amazon S3 come file JSON. Per illustrare la differenza rispetto alla definizione di una vista a livello di codice, con l'API Table la tabella sink viene definita utilizzando SQL.

    ```
    tableEnv.executeSql("CREATE TABLE s3_sink (" +
                "eventTime TIMESTAMP(3)," +
                "ticker STRING," +
                "price DOUBLE," +
                "dt STRING," +
                "hr STRING" +
            ") PARTITIONED BY ( dt, hr ) WITH (" +
            "'connector' = 'filesystem'," +
            "'fmat' = 'json'," +
            "'path' = 's3a://"  + s3Path + "'" +
            ")");
    ```
  + L'ultimo passaggio consiste nell'inserire la visualizzazione filtrata dei prezzi delle azioni nella tabella sink. `executeInsert()` Questo metodo avvia l'esecuzione del flusso di dati che abbiamo definito finora. 

    ```
    filteredStockPricesTable.executeInsert("s3_sink");
    ```

### Usa il file pom.xml
<a name="get-started-exercise-5-2"></a>

Il file pom.xml definisce tutte le dipendenze richieste dall'applicazione e configura il plugin Maven Shade per creare il fat-jar che contiene tutte le dipendenze richieste da Flink. 
+ `provided`Alcune dipendenze hanno un ambito. Queste dipendenze sono disponibili automaticamente quando l'applicazione viene eseguita in Amazon Managed Service for Apache Flink. Sono necessarie per l'applicazione o per l'applicazione localmente nel tuo IDE. Per ulteriori informazioni, consulta (aggiornamento a TableAPI). [Esegui l'applicazione localmente](get-started-exercise.md#get-started-exercise-5-run) Assicurati di utilizzare la stessa versione di Flink del runtime che utilizzerai in Amazon Managed Service for Apache Flink. Per utilizzare TableAPI e SQL, devi includere `flink-table-planner-loader` e`flink-table-runtime-dependencies`, entrambi con ambito. `provided` 

  ```
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
  </dependency>
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
  </dependency>
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner-loader</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
  </dependency>
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-runtime</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
  </dependency>
  ```
+ È necessario aggiungere ulteriori dipendenze Apache Flink al pom con l'ambito predefinito. [Ad esempio, il [DataGen connettore, il connettore FileSystem ](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/datagen/)[SQL](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/filesystem/) e il formato JSON.](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/formats/json/) 

  ```
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-datagen</artifactId>
      <version>${flink.version}</version>
  </dependency>
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-files</artifactId>
      <version>${flink.version}</version>
  </dependency>
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-json</artifactId>
      <version>${flink.version}</version>
  </dependency>
  ```
+ Per scrivere su Amazon S3 durante l'esecuzione locale, nell'ambito è incluso anche il file system S3 Hadoop. `provided`

  ```
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-s3-fs-hadoop</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
  </dependency>
  ```
+ Il plugin Maven Java Compiler assicura che il codice sia compilato con Java 11, la versione JDK attualmente supportata da Apache Flink. 
+ Il plugin Maven Shade impacchetta il fat-jar, escludendo alcune librerie fornite dal runtime. Inoltre specifica due trasformatori: e. `ServicesResourceTransformer` `ManifestResourceTransformer` Quest'ultimo configura la classe contenente il `main` metodo per avviare l'applicazione. Se rinomini la classe principale, non dimenticare di aggiornare questo trasformatore.
+ 

  ```
  <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      ...
          <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
              <mainClass>com.amazonaws.services.msf.BasicStreamingJob</mainClass>
          </transformer>
      ...
  </plugin>
  ```

## Esegui l'applicazione localmente
<a name="gs-table-run-locally"></a>

Puoi eseguire ed eseguire il debug dell'applicazione Flink localmente nel tuo IDE.

**Nota**  
Prima di continuare, verificate che i flussi di input e output siano disponibili. Per informazioni, consulta [Crea due flussi di dati Amazon Kinesis](get-started-exercise.md#get-started-exercise-1). Inoltre, verifica di disporre dell'autorizzazione per leggere e scrivere da entrambi gli stream. Per informazioni, consulta [Autentica la tua sessione AWS](get-started-exercise.md#get-started-exercise-2-5).   
La configurazione dell'ambiente di sviluppo locale richiede Java 11 JDK, Apache Maven e un IDE per lo sviluppo in Java. Verifica di soddisfare i prerequisiti richiesti. Per informazioni, consulta [Soddisfa i prerequisiti per completare gli esercizi](getting-started.md#setting-up-prerequisites).

### Importa il progetto Java nel tuo IDE
<a name="gs-table-import"></a>

Per iniziare a lavorare sull'applicazione nel tuo IDE, devi importarla come progetto Java. 

Il repository che avete clonato contiene diversi esempi. Ogni esempio è un progetto separato. Per questo tutorial, importa il contenuto della `./jave/GettingStartedTable` sottodirectory nel tuo IDE. 

Inserisci il codice come progetto Java esistente usando Maven.

**Nota**  
Il processo esatto per importare un nuovo progetto Java varia a seconda dell'IDE in uso.

### Modificate la configurazione locale dell'applicazione
<a name="gs-table-modify-2"></a>

Quando viene eseguita localmente, l'applicazione utilizza la configurazione contenuta nel `application_properties.json` file nella cartella delle risorse del progetto sotto`./src/main/resources`. Per questa applicazione tutorial, i parametri di configurazione sono il nome del bucket e il percorso in cui verranno scritti i dati.

Modifica la configurazione e modifica il nome del bucket Amazon S3 in modo che corrisponda al bucket creato all'inizio di questo tutorial.

```
[
 {
 "PropertyGroupId": "bucket",
 "PropertyMap": {
 "name": "{{<bucket-name>}}",
 "path": "output"
 }
 }
]
```

**Nota**  
La proprietà di configurazione `name` deve contenere solo il nome del bucket, ad esempio. `my-bucket-name` Non includere alcun prefisso, ad esempio una `s3://` barra finale.  
Se modificate il percorso, omettete le barre iniziali o finali.

### Imposta la configurazione di esecuzione dell'IDE
<a name="gs-table-setupIDE"></a>

Puoi eseguire ed eseguire il debug dell'applicazione Flink dal tuo IDE direttamente eseguendo la classe principale`com.amazonaws.services.msf.BasicTableJob`, come faresti con qualsiasi applicazione Java. Prima di eseguire l'applicazione, è necessario configurare la configurazione Run. La configurazione dipende dall'IDE in uso. Ad esempio, vedere [Run/debug le configurazioni nella documentazione](https://www.jetbrains.com/help/idea/run-debug-configuration.html) di IntelliJ IDEA. In particolare, è necessario configurare quanto segue:

1. **Aggiungi le `provided` dipendenze al classpath**. Ciò è necessario per assicurarsi che le dipendenze con `provided` scope vengano passate all'applicazione durante l'esecuzione locale. Senza questa configurazione, l'applicazione visualizza immediatamente un `class not found` errore. 

1. **Passa AWS le credenziali per accedere agli stream Kinesis all'**applicazione. Il modo più veloce è usare [AWS Toolkit for IntelliJ IDEA](https://aws.amazon.com/intellij/). Utilizzando questo plugin IDE nella configurazione Run, è possibile selezionare un profilo specifico AWS . AWS l'autenticazione avviene utilizzando questo profilo. Non è necessario passare direttamente AWS le credenziali. 

1. Verifica che l'IDE esegua l'applicazione utilizzando **JDK 11**.

### Esegui l'applicazione nel tuo IDE
<a name="gs-table-runIDE"></a>

Dopo aver impostato la configurazione Run per`BasicTableJob`, potete eseguirla o eseguire il debug come una normale applicazione Java. 

**Nota**  
Non è possibile eseguire il fat-jar generato da Maven direttamente dalla riga di `java -jar ...` comando. Questo jar non contiene le dipendenze principali di Flink necessarie per eseguire l'applicazione in modo autonomo.

Quando l'applicazione viene avviata correttamente, registra alcune informazioni sul minicluster autonomo e sull'inizializzazione dei connettori. Seguono una serie di registri INFO e alcuni registri WARN che Flink normalmente emette all'avvio dell'applicazione.

```
21:28:34,982 INFO  com.amazonaws.services.msf.BasicTableJob                     
                [] - Loading application properties from 'flink-application-properties-dev.json'
21:28:35,149 INFO  com.amazonaws.services.msf.BasicTableJob                     
[] - s3Path is ExampleBucket/my-output-bucket
...
```

Una volta completata l'inizializzazione, l'applicazione non emette ulteriori voci di registro. **Durante il flusso di dati, non viene emesso alcun registro.**

Per verificare se l'applicazione elabora correttamente i dati, potete controllare il contenuto del bucket di output, come descritto nella sezione seguente.

**Nota**  
 Il comportamento normale di un'applicazione Flink è quello di non emettere registri relativi al flusso di dati. L'emissione di registri su ogni record può essere utile per il debug, ma può comportare un notevole sovraccarico durante l'esecuzione in produzione. 

## Osserva l'applicazione che scrive dati in un bucket S3
<a name="gs-table-input-output"></a>

Questa applicazione di esempio genera dati casuali internamente e li scrive nel bucket S3 di destinazione che hai configurato. A meno che non sia stato modificato il percorso di configurazione predefinito, i dati verranno scritti nel `output` percorso seguito dal partizionamento di dati e ore, nel formato. `./output/<yyyy-MM-dd>/<HH>`

Il [connettore FileSystem sink](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/filesystem/#streaming-sink) crea nuovi file sul checkpoint Flink. Quando viene eseguita localmente, l'applicazione esegue un checkpoint ogni 5 secondi (5.000 millisecondi), come specificato nel codice. 

```
 if (env instanceof LocalStreamEnvironment) {
    env.enableCheckpointing(5000);
 }
```

**Per sfogliare il bucket S3 e osservare il file scritto dall'applicazione**

1. 

   1. Apri la console Amazon S3 all'indirizzo. [https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/)

1. Scegli il bucket che hai creato in precedenza. 

1. Passa al `output` percorso, quindi alle cartelle di data e ora che corrispondono all'ora corrente nel fuso orario UTC. 

1. Aggiorna periodicamente per osservare la comparsa di nuovi file ogni 5 secondi.

1. Seleziona e scarica un file per osservarne il contenuto.
**Nota**  
Per impostazione predefinita, i file non hanno estensioni. Il contenuto è formattato come JSON. Puoi aprire i file con qualsiasi editor di testo per esaminarne il contenuto.

## Interrompi l'esecuzione locale dell'applicazione
<a name="gs-table-stop"></a>

Arresta l'esecuzione dell'applicazione nel tuo IDE. L'IDE di solito fornisce un'opzione di «stop». La posizione e il metodo esatti dipendono dall'IDE. 

## Compila e impacchetta il codice dell'applicazione
<a name="gs-table-5.5"></a>

In questa sezione, si utilizza Apache Maven per compilare il codice Java e impacchettarlo in un file JAR. Puoi compilare e impacchettare il codice usando lo strumento da riga di comando Maven o il tuo IDE.

**Per compilare e impacchettare utilizzando la riga di comando Maven**

Passa alla directory che contiene il GettingStarted progetto Jave ed esegui il seguente comando:

```
$ mvn package
```

**Per compilare e impacchettare usando il tuo IDE**

Esegui `mvn package` dalla tua integrazione IDE Maven.

In entrambi i casi, `target/amazon-msf-java-table-app-1.0.jar` viene creato il file JAR.

**Nota**  
L'esecuzione di un *progetto di compilazione* dal tuo IDE potrebbe non creare il file JAR.

## Caricate il file JAR del codice dell'applicazione
<a name="gs-table-6"></a>

In questa sezione, carichi il file JAR creato nella sezione precedente nel bucket Amazon S3 creato all'inizio di questo tutorial. Se l'hai già fatto, completa. [Crea un bucket Amazon S3](#gs-table-resources-s3)

**Per caricare il codice dell'applicazione**

1. Apri la console Amazon S3 all'indirizzo. [https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/)

1. Scegli il bucket che hai creato in precedenza per il codice dell'applicazione.

1. Scegli il campo **Carica**. 

1. Scegliere **Add files (Aggiungi file)**.

1. Vai al file JAR generato nella sezione precedente:`target/amazon-msf-java-table-app-1.0.jar`.

1. Scegli **Carica** senza modificare altre impostazioni.
**avvertimento**  
 Assicurati di selezionare il file JAR corretto in`<repo-dir>/java/GettingStarted/target/amazon/msf-java-table-app-1.0.jar`.   
La directory di destinazione contiene anche altri file JAR che non è necessario caricare. 

## Crea e configura l'applicazione Managed Service for Apache Flink
<a name="gs-table-7"></a>

È possibile creare e configurare un'applicazione Managed Service for Apache Flink utilizzando la console o il. AWS CLI Per questo tutorial, utilizzerai la console.

**Nota**  
Quando crei l'applicazione utilizzando la console, le tue risorse AWS Identity and Access Management (IAM) e Amazon CloudWatch Logs vengono create automaticamente. Quando crei l'applicazione utilizzando AWS CLI, devi creare queste risorse separatamente.

### Creazione dell’applicazione
<a name="gs-table-7-console-create"></a>

1. Accedi a e apri la console Amazon MSF all'indirizzo Console di gestione AWS https://console.aws.amazon.com/flink.

1. Verifica che sia selezionata la regione corretta: Stati Uniti orientali (Virginia settentrionale) us-east-1.

1. **Nel menu a destra, scegli Applicazioni **Apache Flink, quindi scegli Crea applicazione** di streaming.** In alternativa, scegli **Crea applicazione di streaming** nella sezione **Guida introduttiva** della pagina iniziale. 

1. Nella pagina **Crea applicazione di streaming**, completa quanto segue:
   + Per **Scegli un metodo per configurare l'applicazione di elaborazione dello stream**, scegli **Crea da zero**.
   + Per la **configurazione di Apache Flink, versione Application Flink**, scegli **Apache** Flink 1.19.
   + Nella sezione **Configurazione dell'applicazione, completa quanto segue**:
     + Per **Nome applicazione**, immetti **MyApplication**.
     + Per **Descrizione**, inserisci **My Java Table API test app**.
     + Per **accedere alle risorse dell'applicazione**, scegli **Crea/aggiorna il ruolo IAM kinesis-analytics- MyApplication-us-east-1 con** le politiche richieste.
   + In **Modello per le impostazioni dell'applicazione**, completa quanto segue:
     + Per **Modelli**, scegliete **Sviluppo**.

1. Scegli **Crea applicazione di streaming**.

**Nota**  
Quando crei un'applicazione del servizio gestito per Apache Flink tramite la console, hai la possibilità di avere un ruolo e una policy IAM creati per l'applicazione. L'applicazione utilizza questo ruolo e questa policy per accedere alle sue risorse dipendenti. Queste risorse IAM sono denominate utilizzando il nome dell'applicazione e la Regione come segue:  
Policy: `kinesis-analytics-service-{{MyApplication}}-{{us-east-1}}`
Ruolo: `kinesisanalytics-{{MyApplication}}-{{us-east-1}}`

### Modifica la policy IAM
<a name="gs-table-7-console-iam"></a>

Modifica la policy IAM per aggiungere le autorizzazioni per accedere al bucket Amazon S3.

**Modifica della policy IAM per aggiungere le autorizzazioni per i bucket S3**

1. Aprire la console IAM all'indirizzo [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/).

1. Seleziona **Policy**. Scegli la policy **`kinesis-analytics-service-MyApplication-us-east-1`** creata dalla console nella sezione precedente. 

1. Scegli **Modifica**, quindi scegli la scheda **JSON**.

1. Aggiungi alla policy la sezione evidenziata del seguente esempio di policy. Sostituisci l'ID account di esempio ({{012345678901}}) con l'ID del tuo account e {{<bucket-name>}} con il nome del bucket S3 che hai creato.

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Sid": "ReadCode",
               "Effect": "Allow",
               "Action": [
                   "s3:GetObject",
                   "s3:GetObjectVersion"
               ],
               "Resource": [
                   "arn:aws:s3:::{{amzn-s3-demo-bucket}}/kinesis-analytics-placeholder-s3-object"
               ]
           },
           {
               "Sid": "ListCloudwatchLogGroups",
               "Effect": "Allow",
               "Action": [
                   "logs:DescribeLogGroups"
               ],
               "Resource": [
                   "arn:aws:logs:{{us-east-1}}:{{123456789012}}:*"
               ]
           },
           {
               "Sid": "ListCloudwatchLogStreams",
               "Effect": "Allow",
               "Action": [
                   "logs:DescribeLogStreams"
               ],
               "Resource": [
                   "arn:aws:logs:{{us-east-1}}:{{123456789012}}:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*"
               ]
           },
           {
               "Sid": "PutCloudwatchLogs",
               "Effect": "Allow",
               "Action": [
                   "logs:PutLogEvents"
               ],
               "Resource": [
                   "arn:aws:logs:{{us-east-1}}:{{123456789012}}:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream"
               ]
           }{{, 
           {
               "Sid": "WriteOutputBucket", 
               "Effect": "Allow", 
               "Action": "s3:*", 
               "Resource": [
                   "arn:aws:s3:::{{amzn-s3-demo-bucket2}}"
               ]
          }}}
       ]
   }
   ```

------

1.  Scegli **Avanti** e quindi seleziona **Salva modifiche**.

### Configura l'applicazione
<a name="gs-table-7-console-configure"></a>

Modificate l'applicazione per impostare l'elemento del codice dell'applicazione.

**Per configurare l'applicazione**

1. **Nella **MyApplication**pagina, scegli Configura.**

1. **Nella sezione **Posizione del codice dell'applicazione**, scegli Configura.**
   + Per il bucket **Amazon S3, seleziona il bucket** creato in precedenza per il codice dell'applicazione. **Scegli **Sfoglia** e seleziona il bucket corretto, quindi scegli Scegli.** Non fare clic sul nome del bucket. 
   + Per **Percorso dell'oggetto Amazon S3**, inserisci **amazon-msf-java-table-app-1.0.jar**

1. Per **Autorizzazioni di accesso**, scegli **Crea/aggiorna `kinesis-analytics-MyApplication-us-east-1`** per il ruolo IAM.

1. Nella sezione **Proprietà di runtime**, aggiungi le seguenti proprietà. 

1. Scegli **Aggiungi nuovo elemento** e aggiungi ciascuno dei seguenti parametri:    
[See the AWS documentation website for more details](http://docs.aws.amazon.com/it_it/managed-flink/latest/java/gs-table-create.html)

1. Non modificare nessun'altra impostazione.

1. Scegli **Save changes** (Salva modifiche).

**Nota**  
Quando scegli di abilitare la CloudWatch registrazione di Amazon, Managed Service for Apache Flink crea un gruppo di log e un flusso di log per te. I nomi di tali risorse sono i seguenti:   
Gruppo di log: `/aws/kinesis-analytics/MyApplication`
Flusso di log: `kinesis-analytics-log-stream`

### Esecuzione dell'applicazione.
<a name="gs-table-7-console-run"></a>

L'applicazione è ora configurata e pronta per l'esecuzione.

**Per eseguire l'applicazione**

1. Torna alla pagina della console in Amazon Managed Service for Apache Flink e scegli. **MyApplication**

1. Scegli **Esegui** per avviare l'applicazione.

1. Nella **configurazione di ripristino dell'applicazione**, scegli **Esegui con l'ultima istantanea.**

1. Scegli **Esegui**.

1. Lo **stato** nell'**applicazione descrive in dettaglio** le transizioni da `Starting` e `Ready` a `Running` dopo l'avvio dell'applicazione. 

Quando l'applicazione è in `Running` stato, puoi aprire la dashboard di Flink. 

**Per aprire la dashboard e visualizzare il lavoro**

1. Scegli **Open Apache Flink dashboard**. La dashboard si apre in una nuova pagina.

1. Nell'elenco **Running Jobs**, scegli il singolo lavoro che puoi vedere.
**Nota**  
Se hai impostato le proprietà di runtime o hai modificato le policy IAM in modo errato, lo stato dell'applicazione potrebbe cambiare in`Running`, ma la dashboard di Flink mostra che il job viene riavviato continuamente. Si tratta di uno scenario di errore comune quando l'applicazione non è configurata correttamente o non dispone delle autorizzazioni per accedere alle risorse esterne.   
Quando ciò accade, controlla la scheda **Eccezioni** nella dashboard di Flink per indagare sulla causa del problema.

### Osserva le metriche dell'applicazione in esecuzione
<a name="gs-observe-metrics"></a>

Nella **MyApplication**pagina, nella sezione **Amazon CloudWatch metrics**, puoi vedere alcune delle metriche fondamentali dell'applicazione in esecuzione. 

**Per visualizzare le metriche**

1. Accanto al pulsante **Aggiorna**, seleziona **10 secondi** dall'elenco a discesa.

1. Quando l'applicazione è in esecuzione ed è integra, puoi vedere la metrica di **uptime aumentare continuamente**.

1. La metrica **fullrestarts deve** essere zero. Se è in aumento, la configurazione potrebbe presentare dei problemi. Consulta la scheda **Eccezioni** nella dashboard di Flink per esaminare il problema.

1. La metrica **del numero di checkpoint non riusciti** deve essere pari a zero in un'applicazione integra. 
**Nota**  
Questa dashboard mostra un set fisso di metriche con una granularità di 5 minuti. Puoi creare una dashboard applicativa personalizzata con qualsiasi metrica nella dashboard. CloudWatch 

### Osserva l'applicazione che scrive i dati nel bucket di destinazione
<a name="gs-observe-output"></a>

Ora puoi osservare l'applicazione in esecuzione in Amazon Managed Service for Apache Flink mentre scrive file su Amazon S3.

Per osservare i file, segui la stessa procedura utilizzata per verificare che i file fossero scritti quando l'applicazione era in esecuzione localmente. Per informazioni, consulta [Osserva l'applicazione che scrive dati in un bucket S3](#gs-table-input-output). 

Ricordate che l'applicazione scrive nuovi file sul checkpoint Flink. Quando è in esecuzione su Amazon Managed Service for Apache Flink, i checkpoint sono abilitati per impostazione predefinita e vengono eseguiti ogni 60 secondi. L'applicazione crea nuovi file all'incirca ogni minuto.

### Arresta l'applicazione
<a name="gs-table-7-console-stop"></a>

Per interrompere l'applicazione, vai alla pagina della console dell'applicazione Managed Service for Apache Flink denominata. `MyApplication`

**Per interrompere l'applicazione**

1. **Dall'elenco a discesa **Azione**, scegli Stop.**

1. Lo **stato** nell'**applicazione descrive in dettaglio** le transizioni da `Running` e quindi a `Ready` quando l'applicazione viene completamente interrotta. `Stopping` 
**Nota**  
Non dimenticare di interrompere anche l'invio di dati al flusso di input dallo script Python o dal Kinesis Data Generator.