

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

# Crear y ejecutar una aplicación de Managed Service para Apache Flink
<a name="gs-table-create"></a>

En este ejercicio, se crea una aplicación de Managed Service para Apache Flink con flujos de datos como origen y receptor.

**Topics**
+ [Creación de recursos dependientes](#gs-table-resources)
+ [Configuración de su entorno de desarrollo local](#gs-table-2)
+ [Descargar y consultar el código de Java de streaming de Apache Flink](#gs-table-5)
+ [Ejecución de la aplicación a nivel local](#gs-table-run-locally)
+ [Observe cómo la aplicación escribe datos en un bucket de S3](#gs-table-input-output)
+ [Detención de la ejecución de la aplicación de forma local](#gs-table-stop)
+ [Compilación y empaquetado del código de la aplicación](#gs-table-5.5)
+ [Carga del archivo JAR del código de la aplicación](#gs-table-6)
+ [Creación y ejecución de la aplicación de Managed Service para Apache Flink](#gs-table-7)

## Creación de recursos dependientes
<a name="gs-table-resources"></a>

Antes de crear un Managed Service para Apache Flink para este ejercicio, debe crear los siguientes recursos dependientes: 
+ Un bucket de Amazon S3 para almacenar el código de la aplicación y escribir los resultados de la aplicación.
**nota**  
En este tutorial se supone que está implementando su aplicación en la región us-east-1. Si se utiliza otra región, se deben adaptar todos los pasos en consecuencia.

### Crear un bucket de Amazon S3
<a name="gs-table-resources-s3"></a>

Se puede crear el bucket de Amazon S3 usando la consola. Si desea obtener instrucciones para crear este recurso, consulte los siguientes temas:
+ [¿Cómo se puede crear un bucket de S3?](https://docs.aws.amazon.com/AmazonS3/latest/userguide/create-bucket.html) en la *Guía de usuario de Amazon Simple Storage Service*. Asigne al bucket de Amazon S3 un nombre único globalmente mediante el agregado de su nombre de inicio de sesión.
**nota**  
Asegúrese de crear el bucket en la región que utilice para este tutorial. El valor predeterminado del tutorial es us-east-1.

### Otros recursos de
<a name="gs-table-resources-cw"></a>

Al crear la aplicación, Managed Service for Apache Flink crea los siguientes CloudWatch recursos de Amazon si aún no existen:
+ Un grupo de registro llamado `/AWS/KinesisAnalytics-java/<my-application>`.
+ Un flujo de registro llamado `kinesis-analytics-log-stream`.

## Configuración de su entorno de desarrollo local
<a name="gs-table-2"></a>

Para el desarrollo y la depuración, se puede ejecutar la aplicación Apache Flink en su máquina directamente desde el IDE que prefiera. Todas las dependencias de Apache Flink se administran como las dependencias normales de Java con Apache Maven.

**nota**  
En su máquina de desarrollo, debe tener instalados Java JDK 11, Maven y Git. Se recomienda utilizar un entorno de desarrollo como [Eclipse Java Neon](https://www.eclipse.org/downloads/packages/release/neon/3) o [IntelliJ IDEA](https://www.jetbrains.com/idea/). Para verificar que cumple con todos los requisitos previos, consulte. [Cumplimiento de los requisitos previos para realizar los ejercicios](getting-started.md#setting-up-prerequisites) **No** se necesita instalar un clúster de Apache Flink en su máquina. 

### Autentica tu sesión AWS
<a name="get-started-exercise-table-authenticate"></a>

La aplicación utiliza los flujos de datos de Kinesis para publicar datos. Si se ejecuta de forma local, debe tener una sesión AWS autenticada válida con permisos para escribir en la transmisión de datos de Kinesis. Siga los pasos siguientes para autenticar su sesión:

1. Si no tiene configurado el AWS CLI perfil con un nombre específico con una credencial válida, consulte. [Configura el () AWS Command Line Interface AWS CLI](setup-awscli.md)

1. Si su IDE tiene un complemento con el que integrarse AWS, puede usarlo para pasar las credenciales a la aplicación que se ejecuta en el IDE. Para obtener más información, consulte [AWS Toolkit for IntelliJ IDEA](https://aws.amazon.com/intellij/) y [AWS Toolkit for compiling the application or running Eclipse.](https://docs.aws.amazon.com/toolkit-for-eclipse/v1/user-guide/welcome.html)

## Descargar y consultar el código de Java de streaming de Apache Flink
<a name="gs-table-5"></a>

El código de aplicación de este ejemplo está disponible en GitHub. 

**Cómo descargar el código de la aplicación de Java**

1. Clone el repositorio remoto con el siguiente comando:

   ```
   git clone https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples.git
   ```

1. Vaya al directorio `./java/GettingStartedTable`.

### Revisión de los componentes de la aplicación
<a name="get-started-exercise-table-components"></a>

La aplicación está completamente implementada en la clase `com.amazonaws.services.msf.BasicTableJob`. El método `main()` define las fuentes, las transformaciones y los receptores. La ejecución se inicia mediante una sentencia de ejecución al final de este método.

**nota**  
Para una experiencia de desarrollador óptima, la aplicación está diseñada para ejecutarse sin cambios de código tanto en Amazon Managed Service para Apache Flink como de forma local, para el desarrollo en su IDE.
+ Para leer la configuración del tiempo de ejecución para que funcione cuando se ejecute en Amazon Managed Service para Apache Flink y en su IDE, la aplicación detecta de manera automática si se ejecuta de forma independiente de forma local en el IDE. En ese caso, la aplicación carga la configuración del tiempo de ejecución de forma diferente:

  1. Cuando la aplicación detecte que se está ejecutando en modo independiente en tu IDE, crea el archivo `application_properties.json` incluido en la carpeta de **recursos** del proyecto. El contenido del archivo es el siguiente.

  1. Cuando la aplicación se ejecuta en Amazon Managed Service para Apache Flink, el comportamiento predeterminado carga la configuración de la aplicación desde las propiedades de tiempo de ejecución que se defina en la aplicación Amazon Managed Service para Apache Flink. Consulte [Creación y ejecución de la aplicación de Managed Service para Apache Flink](get-started-exercise.md#get-started-exercise-7).

     ```
     private static Map<String, Properties> loadApplicationProperties(StreamExecutionEnvironment env) throws IOException {
         if (env instanceof LocalStreamEnvironment) {
             LOGGER.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE);
             return KinesisAnalyticsRuntime.getApplicationProperties(
                     BasicStreamingJob.class.getClassLoader()
                             .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE).getPath());
         } else {
             LOGGER.info("Loading application properties from Amazon Managed Service for Apache Flink");
             return KinesisAnalyticsRuntime.getApplicationProperties();
         }
     }
     ```
+ El método `main()` define el flujo de datos de la aplicación y lo ejecuta. 
  + Inicializa los entornos de streaming predeterminados. En este ejemplo, mostramos cómo crear tanto la API `StreamExecutionEnvironment` para usarla con la DataStream API como la que se va `StreamTableEnvironment` a usar con SQL y la API de tablas. Los dos objetos de entorno son dos referencias independientes al mismo entorno de tiempo de ejecución, para utilizar diferentes API.

    ```
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, EnvironmentSettings.newInstance().build());
    ```
  + Cargue los parámetros de configuración de la aplicación. Esto los cargará de manera automática desde el lugar correcto, según el lugar en el que se ejecute la aplicación:

    ```
    Map<String, Properties> applicationParameters = loadApplicationProperties(env);
    ```
  + El [conector FileSystem receptor](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/filesystem/#streaming-sink) que la aplicación utiliza para escribir los resultados en los archivos de salida de Amazon S3 cuando Flink completa un punto de [control](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/concepts/stateful-stream-processing/#checkpointing). Se deben habilitar los puntos de control para escribir archivos en el destino. Cuando la aplicación se ejecuta en Amazon Managed Service para Apache Flink, la configuración de la aplicación controla el punto de control y lo habilita de forma predeterminada. Por el contrario, cuando se ejecuta de forma local, los puntos de control están deshabilitados de forma predeterminada. La aplicación detecta que se ejecuta localmente y configura los puntos de control cada 5000 ms. 

    ```
     if (env instanceof LocalStreamEnvironment) {
        env.enableCheckpointing(5000);
     }
    ```
  + Esta aplicación no recibe datos de una fuente externa real. [Genera datos aleatorios para procesarlos a través del DataGen conector.](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/datagen/) Este conector está disponible para DataStream API, SQL y Table API. Para demostrar la integración entre las API, la aplicación utiliza la versión de la DataStram API porque proporciona más flexibilidad. Cada registro se genera mediante una función generadora denominada `StockPriceGeneratorFunction` en este caso, en la que se puede poner una lógica personalizada. 

    ```
    DataGeneratorSource<StockPrice> source = new DataGeneratorSource<>(
            new StockPriceGeneratorFunction(),
            Long.MAX_VALUE,
            RateLimiterStrategy.perSecond(recordPerSecond),
            TypeInformation.of(StockPrice.class));
    ```
  + En la DataStream API, los registros pueden tener clases personalizadas. Las clases deben seguir reglas específicas para que Flink pueda usarlas como registro. Para obtener más información, consulte [Tipos de datos admitidos](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#supported-data-types). En este ejemplo, la clase `StockPrice` es un [POJO](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#pojos). 
  + Luego, la fuente se conecta al entorno de ejecución, lo que genera un `DataStream` de `StockPrice`. Esta aplicación no utiliza la [semántica del momento del evento](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/concepts/time/#notions-of-time-event-time-and-processing-time) y no genera una marca de agua. Ejecute la DataGenerator fuente con un paralelismo de 1, independiente del paralelismo del resto de la aplicación.

    ```
    DataStream<StockPrice> stockPrices = env.fromSource(
            source, 
            WatermarkStrategy.noWatermarks(),
            "data-generator"
        ).setParallelism(1);
    ```
  + Lo que sigue en el flujo de procesamiento de datos se define mediante la API de tabla y SQL. Para ello, convertimos el de en una tabla. DataStream StockPrices El esquema de la tabla se deduce automáticamente de la clase `StockPrice`. 

    ```
    Table stockPricesTable = tableEnv.fromDataStream(stockPrices);
    ```
  + El siguiente fragmento de código muestra cómo definir una vista y una consulta mediante la API de tabla de programación:

    ```
    Table filteredStockPricesTable = stockPricesTable.
            select(
                    $("eventTime").as("event_time"),
                    $("ticker"),
                    $("price"),
                    dateFormat($("eventTime"), "yyyy-MM-dd").as("dt"),
                    dateFormat($("eventTime"), "HH").as("hr")
            ).where($("price").isGreater(50));
            
    tableEnv.createTemporaryView("filtered_stock_prices", filteredStockPricesTable);
    ```
  + Se define una tabla de receptor para escribir los resultados en un bucket de Amazon S3 como archivos JSON. Para ilustrar la diferencia con la definición de una vista mediante programación, con la API de tablas, la tabla de destino se define mediante SQL.

    ```
    tableEnv.executeSql("CREATE TABLE s3_sink (" +
                "eventTime TIMESTAMP(3)," +
                "ticker STRING," +
                "price DOUBLE," +
                "dt STRING," +
                "hr STRING" +
            ") PARTITIONED BY ( dt, hr ) WITH (" +
            "'connector' = 'filesystem'," +
            "'fmat' = 'json'," +
            "'path' = 's3a://"  + s3Path + "'" +
            ")");
    ```
  + El último paso consiste en un `executeInsert()` que inserta los precios de las acciones filtradas en la tabla de receptor. Este método inicia la ejecución del flujo de datos que hemos definido hasta ahora. 

    ```
    filteredStockPricesTable.executeInsert("s3_sink");
    ```

### Uso del archivo pom.xml
<a name="get-started-exercise-5-2"></a>

El archivo pom.xml define todas las dependencias requeridas por la aplicación y configura el complemento Maven Shade para crear el fat-jar que contiene todas las dependencias exigidas por Flink. 
+ Algunas dependencias tienen alcance `provided`. Estas dependencias están disponibles de manera automática cuando la aplicación se ejecuta en Amazon Managed Service para Apache Flink. Se requieren para la aplicación o para la aplicación local en su IDE. Para obtener más información, consulte (actualice a TableAPI) [Ejecución de la aplicación a nivel local](get-started-exercise.md#get-started-exercise-5-run). Asegúrese de utilizar la misma versión de Flink que el tiempo de ejecución que usará en Amazon Managed Service para Apache Flink. Para usar TableAPI y SQL, debe incluir los caracteres `flink-table-planner-loader` y `flink-table-runtime-dependencies`, ambos con el alcance `provided`. 

  ```
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
  </dependency>
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
  </dependency>
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner-loader</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
  </dependency>
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-runtime</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
  </dependency>
  ```
+ Se deben añadir dependencias adicionales de Apache Flink al pom con el ámbito predeterminado. Por ejemplo, el [DataGen conector](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/datagen/), el [conector FileSystem SQL](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/filesystem/) y el [formato JSON](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/formats/json/). 

  ```
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-datagen</artifactId>
      <version>${flink.version}</version>
  </dependency>
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-files</artifactId>
      <version>${flink.version}</version>
  </dependency>
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-json</artifactId>
      <version>${flink.version}</version>
  </dependency>
  ```
+ Para escribir en Amazon S3 cuando se ejecuta localmente, el sistema de archivos Hadoop S3 también se incluye con alcance `provided`.

  ```
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-s3-fs-hadoop</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
  </dependency>
  ```
+ El complemento Maven Java Compiler garantiza que el código esté compilado en Java 11, la versión de JDK actualmente compatible con Apache Flink. 
+ El complemento Maven Shade empaqueta el fat-jar, y excluye algunas bibliotecas que proporciona el tiempo de ejecución. También especifica dos transformadores: `ServicesResourceTransformer` y `ManifestResourceTransformer`. Este último configura la clase que contiene el método `main` para iniciar la aplicación. Si se cambia el nombre de la clase principal, no olvide actualizar este transformador.
+ 

  ```
  <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      ...
          <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
              <mainClass>com.amazonaws.services.msf.BasicStreamingJob</mainClass>
          </transformer>
      ...
  </plugin>
  ```

## Ejecución de la aplicación a nivel local
<a name="gs-table-run-locally"></a>

Se puede ejecutar y depurar la aplicación Flink de forma local en su IDE.

**nota**  
Antes de continuar, compruebe que las secuencias de entrada y salida estén disponibles. Consulte [Crear dos Amazon Kinesis Data Streams](get-started-exercise.md#get-started-exercise-1). Además, compruebe que tiene permiso para leer y escribir en ambas secuencias. Consulte [Autentica tu sesión AWS](get-started-exercise.md#get-started-exercise-2-5).   
La configuración del entorno de desarrollo local requiere el JDK de Java 11, Apache Maven y un IDE para el desarrollo de Java. Verifique que cumple los requisitos previos requeridos. Consulte [Cumplimiento de los requisitos previos para realizar los ejercicios](getting-started.md#setting-up-prerequisites).

### Importación del proyecto Java a su IDE
<a name="gs-table-import"></a>

Para empezar a trabajar en la aplicación en su IDE, debe importarla como un proyecto Java. 

El repositorio que ha clonado contiene varios ejemplos. Cada ejemplo es un proyecto independiente. Para este tutorial, importe el contenido del subdirectorio`./jave/GettingStartedTable` a su IDE. 

Inserte el código como un proyecto Java existente con Maven.

**nota**  
El proceso exacto para importar un nuevo proyecto de Java varía según el IDE que se utilice.

### Modificación de la configuración de la aplicación local
<a name="gs-table-modify-2"></a>

Cuando se ejecuta localmente, la aplicación utiliza la configuración del archivo `application_properties.json` de la carpeta de recursos del proyecto en `./src/main/resources`. Para esta aplicación de tutorial, los parámetros de configuración son el nombre del bucket y la ruta en la que se escribirán los datos.

Edite la configuración y modifique el nombre del bucket de Amazon S3 para que coincida con el bucket que creó al principio de este tutorial.

```
[
 {
 "PropertyGroupId": "bucket",
 "PropertyMap": {
 "name": "{{<bucket-name>}}",
 "path": "output"
 }
 }
]
```

**nota**  
La propiedad de configuración `name` debe contener solo el nombre del bucket, por ejemplo `my-bucket-name`. No incluya ningún prefijo, como `s3://` o una barra al final.  
Si se modifica la ruta, omita las barras diagonales iniciales o finales.

### Establecimiento de la configuración de ejecución del IDE
<a name="gs-table-setupIDE"></a>

Se puede ejecutar y depurar la aplicación Flink desde su IDE directamente al ejecutar la clase principal `com.amazonaws.services.msf.BasicTableJob`, como lo haría con cualquier aplicación Java. Antes de ejecutar la aplicación, se debe configurar la configuración de ejecución. La configuración depende del IDE que se utilice. Por ejemplo, consulte [Run/debug las configuraciones](https://www.jetbrains.com/help/idea/run-debug-configuration.html) en la documentación de IntelliJ IDEA. Es especialmente importante que se configure lo siguiente:

1. **Añada las dependencias `provided` a la ruta de clases**. Esto es necesario para garantizar que las dependencias con alcance `provided` se transfieran a la aplicación cuando se ejecuta localmente. Sin esta configuración, la aplicación muestra un error `class not found` de inmediato. 

1. **Pase las AWS credenciales para acceder a las transmisiones de Kinesis a la aplicación**. La forma más rápida es utilizar el [Kit de herramientas de AWS para IntelliJ IDEA](https://aws.amazon.com/intellij/). Con este complemento IDE en la configuración de ejecución, puede seleccionar un AWS perfil específico. AWS la autenticación se realiza con este perfil. No es necesario transmitir las credenciales AWS directamente. 

1. Compruebe que el IDE ejecute la aplicación mediante el **JDK 11**.

### Ejecución la aplicación en su IDE
<a name="gs-table-runIDE"></a>

Tras establecer la configuración de ejecución para el `BasicTableJob`, se lo puede ejecutar o depurar como una aplicación Java normal. 

**nota**  
No se puede ejecutar el fat-jar generado por Maven directamente con `java -jar ...` desde la línea de comandos. Este contenedor no contiene las dependencias principales de Flink necesarias para ejecutar la aplicación de forma independiente.

Cuando la aplicación se inicia correctamente, registra cierta información sobre el minicluster independiente y la inicialización de los conectores. A esto le siguen varios registros INFO y WARN que Flink por lo común emite cuando se inicia la aplicación.

```
21:28:34,982 INFO  com.amazonaws.services.msf.BasicTableJob                     
                [] - Loading application properties from 'flink-application-properties-dev.json'
21:28:35,149 INFO  com.amazonaws.services.msf.BasicTableJob                     
[] - s3Path is ExampleBucket/my-output-bucket
...
```

Una vez completada la inicialización, la aplicación no emite más entradas de registro. **Mientras los datos fluyen, no se emite ningún registro.**

Para verificar si la aplicación procesa los datos correctamente, se puede inspeccionar el contenido del bucket de salida, tal y como se describe en la siguiente sección.

**nota**  
 No emitir registros sobre el flujo de datos es lo normal en una aplicación de Flink. Emitir registros en cada registro puede ser conveniente para la depuración, pero puede suponer una sobrecarga considerable cuando se ejecuta en producción. 

## Observe cómo la aplicación escribe datos en un bucket de S3
<a name="gs-table-input-output"></a>

Esta aplicación de ejemplo genera datos aleatorios internamente y los escribe en el bucket S3 de destino que se configuró. A menos que se modifique la ruta de configuración predeterminada, los datos se escribirán en la ruta de `output` seguida de la partición de datos y horas, en ese formato `./output/<yyyy-MM-dd>/<HH>`.

El [conector FileSystem colector](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/filesystem/#streaming-sink) crea nuevos archivos en el punto de control de Flink. Cuando se ejecuta localmente, la aplicación ejecuta un punto de control cada 5 segundos (5000 milisegundos), tal y como se especifica en el código. 

```
 if (env instanceof LocalStreamEnvironment) {
    env.enableCheckpointing(5000);
 }
```

**Navegación por el bucket de S3 y observación del archivo escrito por la aplicación**

1. 

   1. Abra la consola de Amazon S3 en [https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/).

1. Elija el bucket que creó anteriormente. 

1. Navegue hasta la ruta `output` y, a continuación, hasta las carpetas de fecha y hora que corresponden a la hora actual en la zona horaria UTC. 

1. Actualice periódicamente para observar la aparición de nuevos archivos cada 5 segundos.

1. Seleccione y descargue un archivo para observar su contenido.
**nota**  
De forma predeterminada, los archivos no tienen extensiones. El contenido tiene formato JSON. Se pueden abrir los archivos con cualquier editor de texto para inspeccionar el contenido.

## Detención de la ejecución de la aplicación de forma local
<a name="gs-table-stop"></a>

Detenga la aplicación que se está ejecutando en el IDE. El IDE normalmente ofrece una opción de “parada”. La ubicación y el método exactos dependen del IDE. 

## Compilación y empaquetado del código de la aplicación
<a name="gs-table-5.5"></a>

En esta sección, se utilizará Apache Maven para compilar el código Java y empaquetarlo en un archivo JAR. Se puede compilar y empaquetar su código con la herramienta de línea de comandos de Maven o su IDE.

**Compilación y empaquetado con la línea de comandos de Maven**

Diríjase al directorio que contiene el GettingStarted proyecto Jave y ejecute el siguiente comando:

```
$ mvn package
```

**Compilación y empaquetado mediante su IDE**

Ejecute `mvn package` desde su integración de IDE con Maven.

En ambos casos, se crea el archivo JAR `target/amazon-msf-java-table-app-1.0.jar`.

**nota**  
Al ejecutar un *proyecto de compilación* desde el IDE, es posible que no se cree el archivo JAR.

## Carga del archivo JAR del código de la aplicación
<a name="gs-table-6"></a>

En esta sección, se cargará el archivo JAR que creó en la sección anterior en el bucket de Amazon S3 que creó al principio de este tutorial. Si ya lo ha hecho, complete [Crear un bucket de Amazon S3](#gs-table-resources-s3).

**Cómo cargar el código de la aplicación**

1. Abra la consola de Amazon S3 en [https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/).

1. Elija el bucket que creó anteriormente para el código de la aplicación.

1. Elija el campo **Cargar**. 

1. Elija **Add files**.

1. Navegue hasta el archivo JAR generado en la sección anterior:`target/amazon-msf-java-table-app-1.0.jar`.

1. Elija **Cargar** sin cambiar ninguna otra configuración.
**aviso**  
 Asegúrese de seleccionar el archivo JAR correcto en `<repo-dir>/java/GettingStarted/target/amazon/msf-java-table-app-1.0.jar`.   
El directorio de destino también contiene otros archivos JAR que no necesita cargar. 

## Creación y ejecución de la aplicación de Managed Service para Apache Flink
<a name="gs-table-7"></a>

Se puede crear y ejecutar una aplicación de Managed Service para Apache Flink mediante la consola o la AWS CLI. En este tutorial, se utiliza la consola.

**nota**  
Cuando crea la aplicación mediante la consola, sus recursos AWS Identity and Access Management (de IAM) y de Amazon CloudWatch Logs se crean automáticamente. Si crea la aplicación mediante la AWS CLI, debe crear estos recursos por separado.

### Creación de la aplicación
<a name="gs-table-7-console-create"></a>

1. Inicie sesión en la consola Consola de administración de AWS de Amazon MSF y ábrala en https://console.aws.amazon.com/flink.

1. Compruebe que ha seleccionado la región correcta: Este de EE. UU. (Norte de Virginia)) us-east-1.

1. En el menú de la derecha, seleccione **Aplicaciones de Apache Flink** y, a continuación, seleccione **Crear aplicación de flujo**. Como alternativa, seleccione **Crear aplicación de flujo** en la sección **Introducción** de la página inicial. 

1. En la página **Crear aplicación de flujo**, complete lo siguiente:
   + En **Elija un método para configurar la aplicación de procesamiento de transmisiones**, elija **Crear desde cero**.
   + En **Configuración de Apache Flink, versión Application Flink**, elija **Apache Flink 1.19**.
   + En la sección **Configuración de la aplicación**, siga los pasos que se describen a continuación:
     + En **Nombre de la aplicación**, escriba **MyApplication**.
     + En **Descripción**, escriba **My Java Table API test app**.
     + Para **acceder a los recursos de la aplicación**, seleccione **Crear o actualizar el rol de IAM kinesis-analytics-** con las políticas requeridas. MyApplication-us-east-1 
   + En **Plantilla para la configuración de la aplicación**, complete lo siguiente:
     + En **Plantillas**, elija **Desarrollo.**

1. Elija **Crear aplicación de flujo**.

**nota**  
Al crear una aplicación de Managed Service para Apache Flink mediante la consola, tiene la opción de tener un rol de IAM y una política creada para su aplicación. La aplicación utiliza este rol y la política para acceder a los recursos dependientes. Estos recursos de IAM reciben un nombre usando el nombre de la aplicación y la región tal y como se indica a continuación:  
Política: `kinesis-analytics-service-{{MyApplication}}-{{us-east-1}}`
Rol: `kinesisanalytics-{{MyApplication}}-{{us-east-1}}`

### Modificar la política de IAM
<a name="gs-table-7-console-iam"></a>

Edite la política de IAM para añadir los permisos para acceder al bucket de Amazon S3.

**Cómo editar la política de IAM para añadir los permisos para el bucket de S3**

1. Abra la consola de IAM en [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/).

1. Elija **Políticas**. Elija la política **`kinesis-analytics-service-MyApplication-us-east-1`** que la consola creó en su nombre en la sección anterior. 

1. Elija **Editar política** y, a continuación, elija la pestaña **JSON**.

1. Añada la sección subrayada de la siguiente política de ejemplo a la política. Sustituya el ejemplo de ID de cuenta ({{012345678901}}) por su ID de cuenta y {{<bucket-name>}} por el nombre del bucket de S3 que creó.

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Sid": "ReadCode",
               "Effect": "Allow",
               "Action": [
                   "s3:GetObject",
                   "s3:GetObjectVersion"
               ],
               "Resource": [
                   "arn:aws:s3:::{{amzn-s3-demo-bucket}}/kinesis-analytics-placeholder-s3-object"
               ]
           },
           {
               "Sid": "ListCloudwatchLogGroups",
               "Effect": "Allow",
               "Action": [
                   "logs:DescribeLogGroups"
               ],
               "Resource": [
                   "arn:aws:logs:{{us-east-1}}:{{123456789012}}:*"
               ]
           },
           {
               "Sid": "ListCloudwatchLogStreams",
               "Effect": "Allow",
               "Action": [
                   "logs:DescribeLogStreams"
               ],
               "Resource": [
                   "arn:aws:logs:{{us-east-1}}:{{123456789012}}:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*"
               ]
           },
           {
               "Sid": "PutCloudwatchLogs",
               "Effect": "Allow",
               "Action": [
                   "logs:PutLogEvents"
               ],
               "Resource": [
                   "arn:aws:logs:{{us-east-1}}:{{123456789012}}:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream"
               ]
           }{{, 
           {
               "Sid": "WriteOutputBucket", 
               "Effect": "Allow", 
               "Action": "s3:*", 
               "Resource": [
                   "arn:aws:s3:::{{amzn-s3-demo-bucket2}}"
               ]
          }}}
       ]
   }
   ```

------

1.  Elija **Guardar cambios** y después **Probar**.

### Configurar la aplicación
<a name="gs-table-7-console-configure"></a>

Edita la aplicación para configurar el artefacto de código de la aplicación.

**Cómo configurar la aplicación**

1. En la **MyApplication**página, selecciona **Configurar**.

1. En la sección **Ubicación del código de aplicación**, elija **Configurar**.
   + En **Bucket de Amazon S3**, seleccione el bucket que creó anteriormente para el código de la aplicación. Elija **Navegar** y seleccione el bucket correcto y, a continuación, elija **Elegir**. No haga clic en el nombre del bucket. 
   + En **Ruta al objeto de Amazon S3**, introduzca **amazon-msf-java-table-app-1.0.jar**.

1. Para los **permisos de acceso**, seleccione **Crear o actualizar el rol de IAM**. `kinesis-analytics-MyApplication-us-east-1`

1. En la sección **Propiedades del tiempo de ejecución**, añada las siguientes propiedades. 

1. Seleccione **Añadir nuevo elemento** y añada cada uno de los siguientes parámetros:    
[See the AWS documentation website for more details](http://docs.aws.amazon.com/es_es/managed-flink/latest/java/gs-table-create.html)

1. No modifique ningún otra configuración.

1. Seleccione **Save changes (Guardar cambios)**.

**nota**  
Cuando eliges habilitar el CloudWatch registro de Amazon, Managed Service for Apache Flink crea un grupo de registros y un flujo de registros para ti. Los nombres de estos recursos son los siguientes:   
Grupo de registro: `/aws/kinesis-analytics/MyApplication`
Flujo de registro: `kinesis-analytics-log-stream`

### Ejecución de la aplicación
<a name="gs-table-7-console-run"></a>

La aplicación ya está configurada y lista para ejecutarse.

**Cómo ejecutar la aplicación**

1. Vuelva a la página de la consola en Amazon Managed Service for Apache Flink y elija **MyApplication**.

1. Seleccione **Ejecutar** para iniciar la aplicación.

1. En **Configuración de restauración de la aplicación**, elija **Ejecutar con la última instantánea**.

1. Seleccione **Ejecutar**.

1. El **estado** en **Detalles de la aplicación** cambia de `Ready` a `Starting` y luego a `Running` después que se ha iniciado la aplicación. 

Cuando la aplicación esté en el estado `Running`, puede abrir el panel de control de Flink. 

**Apertura del panel de control y visualización del trabajo**

1. Seleccione **Abrir el panel de Apache Flink**. El panel se abre en una nueva página.

1. En la lista **Trabajos en ejecución**, elija el único trabajo que puede ver.
**nota**  
Si se configuran las propiedades del tiempo de ejecución o se editan las políticas de IAM de forma incorrecta, es posible que el estado de la aplicación cambie a `Running`, pero el panel de Flink muestra que el trabajo se reinicia continuamente. Este es un escenario de error común cuando la aplicación está mal configurada o carece de los permisos para acceder a recursos externos.   
Cuando esto suceda, consulte la pestaña **Excepciones** del panel de Flink para investigar la causa del problema.

### Observación de las métricas de la aplicación en ejecución
<a name="gs-observe-metrics"></a>

En la **MyApplication**página, en la sección de ** CloudWatch métricas de Amazon**, puedes ver algunas de las métricas fundamentales de la aplicación en ejecución. 

**Visualización de las métricas**

1. Junto al botón **Actualizar**, seleccione **10 segundos** en la lista desplegable.

1. Cuando la aplicación está en ejecución y en buen estado, se puede ver que la métrica de **tiempo de actividad** aumenta continuamente.

1. La métrica **fullrestarts** debe ser cero. Si aumenta, es posible que la configuración tenga problemas. Consulte la pestaña **Excepciones** del panel de Flink para investigar el problema.

1. La métrica **Número de puntos de control fallidos** debe ser cero en una aplicación en buen estado. 
**nota**  
En este panel se muestra un conjunto fijo de métricas con una granularidad de 5 minutos. Puedes crear un panel de aplicaciones personalizado con cualquier métrica del CloudWatch panel.

### Observe cómo la aplicación escribe datos en el bucket de destino
<a name="gs-observe-output"></a>

Ahora se puede observar cómo se ejecuta en Amazon Managed Service para Apache Flink al escribir archivos en Amazon S3.

Para observar los archivos, siga el mismo proceso que utilizó para comprobar los archivos que se escribían cuando la aplicación se ejecutaba de manera local. Consulte [Observe cómo la aplicación escribe datos en un bucket de S3](#gs-table-input-output). 

Recuerde que la aplicación escribe los nuevos archivos en el punto de control de Flink. Cuando se ejecutan en Amazon Managed Service para Apache Flink, los puntos de control están habilitados de forma predeterminada y se ejecutan cada 60 segundos. La aplicación crea nuevos archivos aproximadamente cada 1 minuto.

### Detener la aplicación
<a name="gs-table-7-console-stop"></a>

Para detener la aplicación, vaya a la página de la consola de la aplicación de Managed Service para Apache Flink denominada `MyApplication`.

**Cómo detener la aplicación**

1. En la lista desplegable **Acciones**, seleccione **Detener**.

1. El **estado** en **Detalles de la aplicación** cambia de `Running` a `Stopping` y después a `Ready` cuando se ha detenido totalmente la aplicación. 
**nota**  
No olvide que también debe dejar de enviar datos al flujo de entrada desde el script de Python o el generador de datos de Kinesis.