Crear y ejecutar una aplicación de Managed Service para Apache Flink - Managed Service para Apache Flink

Amazon Managed Service para Apache Flink Amazon (Amazon MSF) se denominaba anteriormente Amazon Kinesis Data Analytics para Apache Flink.

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Crear y ejecutar una aplicación de Managed Service para Apache Flink

En este ejercicio, se crea una aplicación de Managed Service para Apache Flink con flujos de datos como origen y receptor.

Creación de recursos dependientes

Antes de crear un Managed Service para Apache Flink para este ejercicio, debe crear los siguientes recursos dependientes:

  • Un bucket de Amazon S3 para almacenar el código de la aplicación y escribir los resultados de la aplicación.

    nota

    En este tutorial se supone que está implementando su aplicación en la región us-east-1. Si se utiliza otra región, se deben adaptar todos los pasos en consecuencia.

Crear un bucket de Amazon S3

Se puede crear el bucket de Amazon S3 usando la consola. Si desea obtener instrucciones para crear este recurso, consulte los siguientes temas:

  • ¿Cómo se puede crear un bucket de S3? en la Guía de usuario de Amazon Simple Storage Service. Asigne al bucket de Amazon S3 un nombre único globalmente mediante el agregado de su nombre de inicio de sesión.

    nota

    Asegúrese de crear el bucket en la región que utilice para este tutorial. El valor predeterminado del tutorial es us-east-1.

Otros recursos de

Al crear la aplicación, Managed Service for Apache Flink crea los siguientes CloudWatch recursos de Amazon si aún no existen:

  • Un grupo de registro llamado /AWS/KinesisAnalytics-java/<my-application>.

  • Un flujo de registro llamado kinesis-analytics-log-stream.

Configuración de su entorno de desarrollo local

Para el desarrollo y la depuración, se puede ejecutar la aplicación Apache Flink en su máquina directamente desde el IDE que prefiera. Todas las dependencias de Apache Flink se administran como las dependencias normales de Java con Apache Maven.

nota

En su máquina de desarrollo, debe tener instalados Java JDK 11, Maven y Git. Se recomienda utilizar un entorno de desarrollo como Eclipse Java Neon o IntelliJ IDEA. Para verificar que cumple con todos los requisitos previos, consulte. Cumplimiento de los requisitos previos para realizar los ejercicios No se necesita instalar un clúster de Apache Flink en su máquina.

Autenticación de la sesión de AWS

La aplicación utiliza los flujos de datos de Kinesis para publicar datos. Si se ejecuta de forma local, debe tener una sesión AWS autenticada válida con permisos para escribir en la transmisión de datos de Kinesis. Siga los pasos siguientes para autenticar su sesión:

  1. Si no tiene configurado el AWS CLI perfil con un nombre específico con una credencial válida, consulte. Configure el AWS Command Line Interface ()AWS CLI

  2. Si su IDE tiene un complemento con el que integrarse AWS, puede usarlo para pasar las credenciales a la aplicación que se ejecuta en el IDE. Para obtener más información, consulte AWS Toolkit for IntelliJ IDEA y AWS Toolkit for compiling the application or running Eclipse.

Descargar y consultar el código de Java de streaming de Apache Flink

El código de aplicación de este ejemplo está disponible en GitHub.

Cómo descargar el código de la aplicación de Java
  1. Clone el repositorio remoto con el siguiente comando:

    git clone https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples.git
  2. Vaya al directorio ./java/GettingStartedTable.

Revisión de los componentes de la aplicación

La aplicación está completamente implementada en la clase com.amazonaws.services.msf.BasicTableJob. El método main() define las fuentes, las transformaciones y los receptores. La ejecución se inicia mediante una sentencia de ejecución al final de este método.

nota

Para una experiencia de desarrollador óptima, la aplicación está diseñada para ejecutarse sin cambios de código tanto en Amazon Managed Service para Apache Flink como de forma local, para el desarrollo en su IDE.

  • Para leer la configuración del tiempo de ejecución para que funcione cuando se ejecute en Amazon Managed Service para Apache Flink y en su IDE, la aplicación detecta de manera automática si se ejecuta de forma independiente de forma local en el IDE. En ese caso, la aplicación carga la configuración del tiempo de ejecución de forma diferente:

    1. Cuando la aplicación detecte que se está ejecutando en modo independiente en tu IDE, crea el archivo application_properties.json incluido en la carpeta de recursos del proyecto. El contenido del archivo es el siguiente.

    2. Cuando la aplicación se ejecuta en Amazon Managed Service para Apache Flink, el comportamiento predeterminado carga la configuración de la aplicación desde las propiedades de tiempo de ejecución que se defina en la aplicación Amazon Managed Service para Apache Flink. Consulte Creación y ejecución de la aplicación de Managed Service para Apache Flink.

      private static Map<String, Properties> loadApplicationProperties(StreamExecutionEnvironment env) throws IOException { if (env instanceof LocalStreamEnvironment) { LOGGER.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE); return KinesisAnalyticsRuntime.getApplicationProperties( BasicStreamingJob.class.getClassLoader() .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE).getPath()); } else { LOGGER.info("Loading application properties from Amazon Managed Service for Apache Flink"); return KinesisAnalyticsRuntime.getApplicationProperties(); } }
  • El método main() define el flujo de datos de la aplicación y lo ejecuta.

    • Inicializa los entornos de streaming predeterminados. En este ejemplo, mostramos cómo crear tanto la API StreamExecutionEnvironment para usarla con la DataStream API como la que se va StreamTableEnvironment a usar con SQL y la API de tablas. Los dos objetos de entorno son dos referencias independientes al mismo entorno de ejecución, para utilizarlos de forma diferente APIs.

      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, EnvironmentSettings.newInstance().build());
    • Cargue los parámetros de configuración de la aplicación. Esto los cargará de manera automática desde el lugar correcto, según el lugar en el que se ejecute la aplicación:

      Map<String, Properties> applicationParameters = loadApplicationProperties(env);
    • El conector FileSystem receptor que la aplicación utiliza para escribir los resultados en los archivos de salida de Amazon S3 cuando Flink completa un punto de control. Se deben habilitar los puntos de control para escribir archivos en el destino. Cuando la aplicación se ejecuta en Amazon Managed Service para Apache Flink, la configuración de la aplicación controla el punto de control y lo habilita de forma predeterminada. Por el contrario, cuando se ejecuta de forma local, los puntos de control están deshabilitados de forma predeterminada. La aplicación detecta que se ejecuta localmente y configura los puntos de control cada 5000 ms.

      if (env instanceof LocalStreamEnvironment) { env.enableCheckpointing(5000); }
    • Esta aplicación no recibe datos de una fuente externa real. Genera datos aleatorios para procesarlos a través del DataGen conector. Este conector está disponible para DataStream API, SQL y Table API. Para demostrar la integración entre APIs ellas, la aplicación utiliza la versión DataStram API porque proporciona más flexibilidad. Cada registro se genera mediante una función generadora denominada StockPriceGeneratorFunction en este caso, en la que se puede poner una lógica personalizada.

      DataGeneratorSource<StockPrice> source = new DataGeneratorSource<>( new StockPriceGeneratorFunction(), Long.MAX_VALUE, RateLimiterStrategy.perSecond(recordPerSecond), TypeInformation.of(StockPrice.class));
    • En la DataStream API, los registros pueden tener clases personalizadas. Las clases deben seguir reglas específicas para que Flink pueda usarlas como registro. Para obtener más información, consulte Tipos de datos admitidos. En este ejemplo, la clase StockPrice es un POJO.

    • Luego, la fuente se conecta al entorno de ejecución, lo que genera un DataStream de StockPrice. Esta aplicación no utiliza la semántica del momento del evento y no genera una marca de agua. Ejecute la DataGenerator fuente con un paralelismo de 1, independiente del paralelismo del resto de la aplicación.

      DataStream<StockPrice> stockPrices = env.fromSource( source, WatermarkStrategy.noWatermarks(), "data-generator" ).setParallelism(1);
    • Lo que sigue en el flujo de procesamiento de datos se define mediante la API de tabla y SQL. Para ello, convertimos el de en una tabla. DataStream StockPrices El esquema de la tabla se deduce automáticamente de la clase StockPrice.

      Table stockPricesTable = tableEnv.fromDataStream(stockPrices);
    • El siguiente fragmento de código muestra cómo definir una vista y una consulta mediante la API de tabla de programación:

      Table filteredStockPricesTable = stockPricesTable. select( $("eventTime").as("event_time"), $("ticker"), $("price"), dateFormat($("eventTime"), "yyyy-MM-dd").as("dt"), dateFormat($("eventTime"), "HH").as("hr") ).where($("price").isGreater(50)); tableEnv.createTemporaryView("filtered_stock_prices", filteredStockPricesTable);
    • Se define una tabla de receptor para escribir los resultados en un bucket de Amazon S3 como archivos JSON. Para ilustrar la diferencia con la definición de una vista mediante programación, con la API de tablas, la tabla de destino se define mediante SQL.

      tableEnv.executeSql("CREATE TABLE s3_sink (" + "eventTime TIMESTAMP(3)," + "ticker STRING," + "price DOUBLE," + "dt STRING," + "hr STRING" + ") PARTITIONED BY ( dt, hr ) WITH (" + "'connector' = 'filesystem'," + "'fmat' = 'json'," + "'path' = 's3a://" + s3Path + "'" + ")");
    • El último paso consiste en un executeInsert() que inserta los precios de las acciones filtradas en la tabla de receptor. Este método inicia la ejecución del flujo de datos que hemos definido hasta ahora.

      filteredStockPricesTable.executeInsert("s3_sink");

Uso del archivo pom.xml

El archivo pom.xml define todas las dependencias requeridas por la aplicación y configura el complemento Maven Shade para crear el fat-jar que contiene todas las dependencias exigidas por Flink.

  • Algunas dependencias tienen alcance provided. Estas dependencias están disponibles de manera automática cuando la aplicación se ejecuta en Amazon Managed Service para Apache Flink. Se requieren para la aplicación o para la aplicación local en su IDE. Para obtener más información, consulte (actualice a TableAPI) Ejecución de la aplicación a nivel local. Asegúrese de utilizar la misma versión de Flink que el tiempo de ejecución que usará en Amazon Managed Service para Apache Flink. Para usar TableAPI y SQL, debe incluir los caracteres flink-table-planner-loader y flink-table-runtime-dependencies, ambos con el alcance provided.

    <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-loader</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-runtime</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency>
  • Se deben añadir dependencias adicionales de Apache Flink al pom con el ámbito predeterminado. Por ejemplo, el DataGen conector, el conector FileSystem SQL y el formato JSON.

    <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-datagen</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-files</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-json</artifactId> <version>${flink.version}</version> </dependency>
  • Para escribir en Amazon S3 cuando se ejecuta localmente, el sistema de archivos Hadoop S3 también se incluye con alcance provided.

    <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-s3-fs-hadoop</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency>
  • El complemento Maven Java Compiler garantiza que el código esté compilado en Java 11, la versión de JDK actualmente compatible con Apache Flink.

  • El complemento Maven Shade empaqueta el fat-jar, y excluye algunas bibliotecas que proporciona el tiempo de ejecución. También especifica dos transformadores: ServicesResourceTransformer y ManifestResourceTransformer. Este último configura la clase que contiene el método main para iniciar la aplicación. Si se cambia el nombre de la clase principal, no olvide actualizar este transformador.

  • <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> ... <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass>com.amazonaws.services.msf.BasicStreamingJob</mainClass> </transformer> ... </plugin>

Ejecución de la aplicación a nivel local

Se puede ejecutar y depurar la aplicación Flink de forma local en su IDE.

nota

Antes de continuar, compruebe que las secuencias de entrada y salida estén disponibles. Consulte Crear dos Amazon Kinesis Data Streams. Además, compruebe que tiene permiso para leer y escribir en ambas secuencias. Consulte Autenticación de la sesión de AWS.

La configuración del entorno de desarrollo local requiere el JDK de Java 11, Apache Maven y un IDE para el desarrollo de Java. Verifique que cumple los requisitos previos requeridos. Consulte Cumplimiento de los requisitos previos para realizar los ejercicios.

Importación del proyecto Java a su IDE

Para empezar a trabajar en la aplicación en su IDE, debe importarla como un proyecto Java.

El repositorio que ha clonado contiene varios ejemplos. Cada ejemplo es un proyecto independiente. Para este tutorial, importe el contenido del subdirectorio./jave/GettingStartedTable a su IDE.

Inserte el código como un proyecto Java existente con Maven.

nota

El proceso exacto para importar un nuevo proyecto de Java varía según el IDE que se utilice.

Modificación de la configuración de la aplicación local

Cuando se ejecuta localmente, la aplicación utiliza la configuración del archivo application_properties.json de la carpeta de recursos del proyecto en ./src/main/resources. Para esta aplicación de tutorial, los parámetros de configuración son el nombre del bucket y la ruta en la que se escribirán los datos.

Edite la configuración y modifique el nombre del bucket de Amazon S3 para que coincida con el bucket que creó al principio de este tutorial.

[ { "PropertyGroupId": "bucket", "PropertyMap": { "name": "<bucket-name>", "path": "output" } } ]
nota

La propiedad de configuración name debe contener solo el nombre del bucket, por ejemplo my-bucket-name. No incluya ningún prefijo, como s3:// o una barra al final.

Si se modifica la ruta, omita las barras diagonales iniciales o finales.

Establecimiento de la configuración de ejecución del IDE

Se puede ejecutar y depurar la aplicación Flink desde su IDE directamente al ejecutar la clase principal com.amazonaws.services.msf.BasicTableJob, como lo haría con cualquier aplicación Java. Antes de ejecutar la aplicación, se debe configurar la configuración de ejecución. La configuración depende del IDE que se utilice. Por ejemplo, consulte Ejecutar/depurar configuraciones en la documentación de IntelliJ IDEA. Es especialmente importante que se configure lo siguiente:

  1. Añada las dependencias provided a la ruta de clases. Esto es necesario para garantizar que las dependencias con alcance provided se transfieran a la aplicación cuando se ejecuta localmente. Sin esta configuración, la aplicación muestra un error class not found de inmediato.

  2. Pase las AWS credenciales para acceder a las transmisiones de Kinesis a la aplicación. La forma más rápida es utilizar el Kit de herramientas de AWS para IntelliJ IDEA. Con este complemento IDE en la configuración de ejecución, puede seleccionar un AWS perfil específico. AWS la autenticación se realiza con este perfil. No es necesario transmitir las credenciales AWS directamente.

  3. Compruebe que el IDE ejecute la aplicación mediante el JDK 11.

Ejecución la aplicación en su IDE

Tras establecer la configuración de ejecución para el BasicTableJob, se lo puede ejecutar o depurar como una aplicación Java normal.

nota

No se puede ejecutar el fat-jar generado por Maven directamente con java -jar ... desde la línea de comandos. Este contenedor no contiene las dependencias principales de Flink necesarias para ejecutar la aplicación de forma independiente.

Cuando la aplicación se inicia correctamente, registra cierta información sobre el minicluster independiente y la inicialización de los conectores. A esto le siguen varios registros INFO y WARN que Flink por lo común emite cuando se inicia la aplicación.

21:28:34,982 INFO com.amazonaws.services.msf.BasicTableJob [] - Loading application properties from 'flink-application-properties-dev.json' 21:28:35,149 INFO com.amazonaws.services.msf.BasicTableJob [] - s3Path is ExampleBucket/my-output-bucket ...

Una vez completada la inicialización, la aplicación no emite más entradas de registro. Mientras los datos fluyen, no se emite ningún registro.

Para verificar si la aplicación procesa los datos correctamente, se puede inspeccionar el contenido del bucket de salida, tal y como se describe en la siguiente sección.

nota

No emitir registros sobre el flujo de datos es lo normal en una aplicación de Flink. Emitir registros en cada registro puede ser conveniente para la depuración, pero puede suponer una sobrecarga considerable cuando se ejecuta en producción.

Observe cómo la aplicación escribe datos en un bucket de S3

Esta aplicación de ejemplo genera datos aleatorios internamente y los escribe en el bucket S3 de destino que se configuró. A menos que se modifique la ruta de configuración predeterminada, los datos se escribirán en la ruta de output seguida de la partición de datos y horas, en ese formato ./output/<yyyy-MM-dd>/<HH>.

El conector FileSystem colector crea nuevos archivos en el punto de control de Flink. Cuando se ejecuta localmente, la aplicación ejecuta un punto de control cada 5 segundos (5000 milisegundos), tal y como se especifica en el código.

if (env instanceof LocalStreamEnvironment) { env.enableCheckpointing(5000); }
Navegación por el bucket de S3 y observación del archivo escrito por la aplicación
  1. Elija el bucket que creó anteriormente.

  2. Navegue hasta la ruta output y, a continuación, hasta las carpetas de fecha y hora que corresponden a la hora actual en la zona horaria UTC.

  3. Actualice periódicamente para observar la aparición de nuevos archivos cada 5 segundos.

  4. Seleccione y descargue un archivo para observar su contenido.

    nota

    De forma predeterminada, los archivos no tienen extensiones. El contenido tiene formato JSON. Se pueden abrir los archivos con cualquier editor de texto para inspeccionar el contenido.

Detención de la ejecución de la aplicación de forma local

Detenga la aplicación que se está ejecutando en el IDE. El IDE normalmente ofrece una opción de “parada”. La ubicación y el método exactos dependen del IDE.

Compilación y empaquetado del código de la aplicación

En esta sección, se utilizará Apache Maven para compilar el código Java y empaquetarlo en un archivo JAR. Se puede compilar y empaquetar su código con la herramienta de línea de comandos de Maven o su IDE.

Compilación y empaquetado con la línea de comandos de Maven

Diríjase al directorio que contiene el GettingStarted proyecto Jave y ejecute el siguiente comando:

$ mvn package

Compilación y empaquetado mediante su IDE

Ejecute mvn package desde su integración de IDE con Maven.

En ambos casos, se crea el archivo JAR target/amazon-msf-java-table-app-1.0.jar.

nota

Al ejecutar un proyecto de compilación desde el IDE, es posible que no se cree el archivo JAR.

Carga del archivo JAR del código de la aplicación

En esta sección, se cargará el archivo JAR que creó en la sección anterior en el bucket de Amazon S3 que creó al principio de este tutorial. Si ya lo ha hecho, complete Crear un bucket de Amazon S3.

Cómo cargar el código de la aplicación
  1. Abra la consola de Amazon S3 en https://console.aws.amazon.com/s3/.

  2. Elija el bucket que creó anteriormente para el código de la aplicación.

  3. Elija el campo Cargar.

  4. Elija Add files.

  5. Navegue hasta el archivo JAR generado en la sección anterior:target/amazon-msf-java-table-app-1.0.jar.

  6. Elija Cargar sin cambiar ninguna otra configuración.

    aviso

    Asegúrese de seleccionar el archivo JAR correcto en <repo-dir>/java/GettingStarted/target/amazon/msf-java-table-app-1.0.jar.

    El directorio de destino también contiene otros archivos JAR que no necesita cargar.

Creación y ejecución de la aplicación de Managed Service para Apache Flink

Se puede crear y ejecutar una aplicación de Managed Service para Apache Flink mediante la consola o la AWS CLI. En este tutorial, se utiliza la consola.

nota

Cuando crea la aplicación mediante la consola, sus recursos AWS Identity and Access Management (de IAM) y de Amazon CloudWatch Logs se crean automáticamente. Si crea la aplicación mediante la AWS CLI, debe crear estos recursos por separado.

Creación de la aplicación

  1. Inicie sesión en y abra la Consola de administración de AWS consola de Amazon MSF en https://console.aws.amazon.com /flink.

  2. Compruebe que ha seleccionado la región correcta: Este de EE. UU. (Norte de Virginia)) us-east-1.

  3. En el menú de la derecha, seleccione Aplicaciones de Apache Flink y, a continuación, seleccione Crear aplicación de flujo. Como alternativa, seleccione Crear aplicación de flujo en la sección Introducción de la página inicial.

  4. En la página Crear aplicación de flujo, complete lo siguiente:

    • En Elija un método para configurar la aplicación de procesamiento de transmisiones, elija Crear desde cero.

    • En Configuración de Apache Flink, versión Application Flink, elija Apache Flink 1.19.

    • En la sección Configuración de la aplicación, siga los pasos que se describen a continuación:

      • En Nombre de la aplicación, escriba MyApplication.

      • En Descripción, escriba My Java Table API test app.

      • Para acceder a los recursos de la aplicación, seleccione Crear o actualizar el rol kinesis-analytics-MyApplication-us de IAM -east-1 con las políticas requeridas.

    • En Plantilla para la configuración de la aplicación, complete lo siguiente:

      • En Plantillas, elija Desarrollo.

  5. Elija Crear aplicación de flujo.

nota

Al crear una aplicación de Managed Service para Apache Flink mediante la consola, tiene la opción de tener un rol de IAM y una política creada para su aplicación. La aplicación utiliza este rol y la política para acceder a los recursos dependientes. Estos recursos de IAM reciben un nombre usando el nombre de la aplicación y la región tal y como se indica a continuación:

  • Política: kinesis-analytics-service-MyApplication-us-east-1

  • Rol: kinesisanalytics-MyApplication-us-east-1

Modificar la política de IAM

Edite la política de IAM para añadir los permisos para acceder al bucket de Amazon S3.

Cómo editar la política de IAM para añadir los permisos para el bucket de S3
  1. Abra la consola de IAM en https://console.aws.amazon.com/iam/.

  2. Elija Políticas. Elija la política kinesis-analytics-service-MyApplication-us-east-1 que la consola creó en su nombre en la sección anterior.

  3. Elija Editar política y, a continuación, elija la pestaña JSON.

  4. Añada la sección subrayada de la siguiente política de ejemplo a la política. Sustituya el ejemplo de ID de cuenta (012345678901) por su ID de cuenta y <bucket-name> por el nombre del bucket de S3 que creó.

    JSON
    { "Version":"2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::amzn-s3-demo-bucket/kinesis-analytics-placeholder-s3-object" ] }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-east-1:123456789012:*" ] }, { "Sid": "ListCloudwatchLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-east-1:123456789012:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutCloudwatchLogs", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-east-1:123456789012:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "WriteOutputBucket", "Effect": "Allow", "Action": "s3:*", "Resource": [ "arn:aws:s3:::amzn-s3-demo-bucket2" ] } ] }
  5. Elija Guardar cambios y después Probar.

Configurar la aplicación

Edita la aplicación para configurar el artefacto de código de la aplicación.

Cómo configurar la aplicación
  1. En la MyApplicationpágina, selecciona Configurar.

  2. En la sección Ubicación del código de aplicación, elija Configurar.

    • En Bucket de Amazon S3, seleccione el bucket que creó anteriormente para el código de la aplicación. Elija Navegar y seleccione el bucket correcto y, a continuación, elija Elegir. No haga clic en el nombre del bucket.

    • En Ruta al objeto de Amazon S3, introduzca amazon-msf-java-table-app-1.0.jar.

  3. Para los permisos de acceso, seleccione Crear o actualizar el rol de IAM. kinesis-analytics-MyApplication-us-east-1

  4. En la sección Propiedades del tiempo de ejecución, añada las siguientes propiedades.

  5. Seleccione Añadir nuevo elemento y añada cada uno de los siguientes parámetros:

    ID de grupo Clave Valor
    bucket name your-bucket-name
    bucket path output
  6. No modifique ningún otra configuración.

  7. Seleccione Save changes (Guardar cambios).

nota

Cuando eliges habilitar el CloudWatch registro de Amazon, Managed Service for Apache Flink crea un grupo de registros y un flujo de registros para ti. Los nombres de estos recursos son los siguientes:

  • Grupo de registro: /aws/kinesis-analytics/MyApplication

  • Flujo de registro: kinesis-analytics-log-stream

Ejecución de la aplicación

La aplicación ya está configurada y lista para ejecutarse.

Cómo ejecutar la aplicación
  1. Vuelva a la página de la consola en Amazon Managed Service for Apache Flink y elija MyApplication.

  2. Seleccione Ejecutar para iniciar la aplicación.

  3. En Configuración de restauración de la aplicación, elija Ejecutar con la última instantánea.

  4. Seleccione Ejecutar.

  5. El estado en Detalles de la aplicación cambia de Ready a Starting y luego a Running después que se ha iniciado la aplicación.

Cuando la aplicación esté en el estado Running, puede abrir el panel de control de Flink.

Apertura del panel de control y visualización del trabajo
  1. Seleccione Abrir el panel de Apache Flink. El panel se abre en una nueva página.

  2. En la lista Trabajos en ejecución, elija el único trabajo que puede ver.

    nota

    Si se configuran las propiedades del tiempo de ejecución o se editan las políticas de IAM de forma incorrecta, es posible que el estado de la aplicación cambie a Running, pero el panel de Flink muestra que el trabajo se reinicia continuamente. Este es un escenario de error común cuando la aplicación está mal configurada o carece de los permisos para acceder a recursos externos.

    Cuando esto suceda, consulte la pestaña Excepciones del panel de Flink para investigar la causa del problema.

Observación de las métricas de la aplicación en ejecución

En la MyApplicationpágina, en la sección de CloudWatch métricas de Amazon, puedes ver algunas de las métricas fundamentales de la aplicación en ejecución.

Visualización de las métricas
  1. Junto al botón Actualizar, seleccione 10 segundos en la lista desplegable.

  2. Cuando la aplicación está en ejecución y en buen estado, se puede ver que la métrica de tiempo de actividad aumenta continuamente.

  3. La métrica fullrestarts debe ser cero. Si aumenta, es posible que la configuración tenga problemas. Consulte la pestaña Excepciones del panel de Flink para investigar el problema.

  4. La métrica Número de puntos de control fallidos debe ser cero en una aplicación en buen estado.

    nota

    En este panel se muestra un conjunto fijo de métricas con una granularidad de 5 minutos. Puedes crear un panel de aplicaciones personalizado con cualquier métrica del CloudWatch panel.

Observe cómo la aplicación escribe datos en el bucket de destino

Ahora se puede observar cómo se ejecuta en Amazon Managed Service para Apache Flink al escribir archivos en Amazon S3.

Para observar los archivos, siga el mismo proceso que utilizó para comprobar los archivos que se escribían cuando la aplicación se ejecutaba de manera local. Consulte Observe cómo la aplicación escribe datos en un bucket de S3.

Recuerde que la aplicación escribe los nuevos archivos en el punto de control de Flink. Cuando se ejecutan en Amazon Managed Service para Apache Flink, los puntos de control están habilitados de forma predeterminada y se ejecutan cada 60 segundos. La aplicación crea nuevos archivos aproximadamente cada 1 minuto.

Detener la aplicación

Para detener la aplicación, vaya a la página de la consola de la aplicación de Managed Service para Apache Flink denominada MyApplication.

Cómo detener la aplicación
  1. En la lista desplegable Acciones, seleccione Detener.

  2. El estado en Detalles de la aplicación cambia de Running a Stopping y después a Ready cuando se ha detenido totalmente la aplicación.

    nota

    No olvide que también debe dejar de enviar datos al flujo de entrada desde el script de Python o el generador de datos de Kinesis.