Crear y ejecutar una aplicación de Managed Service para Apache Flink - Managed Service para Apache Flink

Amazon Managed Service para Apache Flink Amazon (Amazon MSF) se denominaba anteriormente Amazon Kinesis Data Analytics para Apache Flink.

Crear y ejecutar una aplicación de Managed Service para Apache Flink

En este paso, se deberá crear una aplicación de Managed Service para Apache Flink con flujos de datos de Kinesis como origen y receptor.

Creación de recursos dependientes

Antes de crear una aplicación de Managed Service para Apache Flink para este ejercicio, debe crear los siguientes recursos dependientes:

  • Dos flujos de datos de Kinesis para entrada y salida.

  • Un bucket de Amazon S3 para almacenar el código de la aplicación

    nota

    En este tutorial se supone que está implementando su aplicación en la región us-east-1 Este de EE. UU. (Norte de Virginia). Si se utiliza otra región, adapte todos los pasos en consecuencia.

Crear dos Amazon Kinesis Data Streams

Antes de crear una aplicación de Managed Service para Apache Flink para este ejercicio, cree dos flujos de datos de Kinesis (ExampleInputStream y ExampleOutputStream). Su aplicación utiliza estos flujos para los flujos de origen y destino de la aplicación.

Se puede crear estos flujos mediante la consola de Amazon Kinesis o el siguiente comando de la AWS CLI. Para obtener instrucciones sobre la consola, consulte Creating and Updating Data Streams en la Guía para desarrolladores de Amazon Kinesis Data Streams. Para crear las transmisiones mediante la AWS CLI, utilice los siguientes comandos, ajustándolos a la región que utilice para su aplicación.

Cómo crear flujos de datos (AWS CLI)
  1. Para crear el primer flujo (ExampleInputStream), utilice el siguiente comando de la AWS CLI create-stream de Amazon Kinesis.

    $ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-east-1 \
  2. Para crear la segunda secuencia que la aplicación utilizará para escribir la salida, ejecute el mismo comando, cambiando el nombre a ExampleOutputStream:

    $ aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-east-1 \

Creación de un bucket de Amazon S3 para el código de la aplicación

Se puede crear el bucket de Amazon S3 usando la consola. Para aprender a crear un bucket de Amazon S3, consulte Creación de un bucket en la Guía de usuario Amazon S3. Asigne al bucket de Amazon S3 un nombre único globalmente, por ejemplo, mediante el agregado de su nombre de inicio de sesión.

nota

Asegúrese de crear el bucket en la región que utilice para este tutorial (us-east-1).

Otros recursos

Al crear la aplicación, Managed Service para Apache Flink de manera automática crea los siguientes recursos de Amazon CloudWatch si aún no existen:

  • Un grupo de registro llamado /AWS/KinesisAnalytics-java/<my-application>

  • Un flujo de registro llamado kinesis-analytics-log-stream

Configuración de su entorno de desarrollo local

Para el desarrollo y la depuración, se puede ejecutar la aplicación Apache Flink en su máquina directamente desde el IDE que prefiera. Todas las dependencias de Apache Flink se administran como las dependencias normales de Java con Apache Maven.

nota

En su máquina de desarrollo, debe tener instalados Java JDK 11, Maven y Git. Se recomienda utilizar un entorno de desarrollo como Eclipse Java Neon o IntelliJ IDEA. Para verificar que cumple con todos los requisitos previos, consulte. Cumplimiento de los requisitos previos para realizar los ejercicios No se necesita instalar un clúster de Apache Flink en su máquina.

Autenticación de la sesión de AWS

La aplicación utiliza los flujos de datos de Kinesis para publicar datos. Si se ejecuta de forma local, se debe tener una sesión de AWS autenticada válida con permisos para escribir en el flujo de datos de Kinesis. Siga los pasos siguientes para autenticar su sesión:

  1. Si no tiene configurado el perfil con un nombre de AWS CLI y una credencial válida, consulte Cómo configurar la AWS Command Line Interface (AWS CLI).

  2. Compruebe que AWS CLI está correctamente configurado y que sus usuarios tienen permisos para escribir en el flujo de datos de Kinesis publicando el siguiente registro de prueba:

    $ aws kinesis put-record --stream-name ExampleOutputStream --data TEST --partition-key TEST
  3. Si su IDE tiene un complemento con el que integrarse con AWS, puede usarlo para pasar las credenciales a la aplicación que se ejecuta en el IDE. Para obtener más información, consulte AWS Toolkit for IntelliJ IDEA y AWS Toolkit for Eclipse.

Descargar y consultar el código de Java de streaming de Apache Flink

El código de la aplicación de Java para este ejemplo está disponible en GitHub. Para descargar el código de la aplicación, haga lo siguiente:

  1. Clone el repositorio remoto con el siguiente comando:

    git clone https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples.git
  2. Vaya al directorio amazon-managed-service-for-apache-flink-examples/tree/main/java/GettingStarted.

Revisión de los componentes de la aplicación

La aplicación está completamente implementada en la clase com.amazonaws.services.msf.BasicStreamingJob. El método main() define el flujo de datos para procesar los datos de flujo y ejecutarlos.

nota

Para una experiencia de desarrollador optimizada, la aplicación está diseñada para ejecutarse sin cambios de código tanto en Amazon Managed Service para Apache Flink como de forma local, para el desarrollo en su IDE.

  • Para leer la configuración del tiempo de ejecución para que funcione cuando se ejecute en Amazon Managed Service para Apache Flink y en su IDE, la aplicación detecta de manera automática si se ejecuta de forma independiente de forma local en el IDE. En ese caso, la aplicación carga la configuración del tiempo de ejecución de forma diferente:

    1. Cuando la aplicación detecte que se está ejecutando en modo independiente en tu IDE, crea el archivo application_properties.json incluido en la carpeta de recursos del proyecto. El contenido del archivo es el siguiente.

    2. Cuando la aplicación se ejecuta en Amazon Managed Service para Apache Flink, el comportamiento predeterminado carga la configuración de la aplicación desde las propiedades de tiempo de ejecución que se defina en la aplicación Amazon Managed Service para Apache Flink. Consulte Creación y ejecución de la aplicación de Managed Service para Apache Flink.

      private static Map<String, Properties> loadApplicationProperties(StreamExecutionEnvironment env) throws IOException { if (env instanceof LocalStreamEnvironment) { LOGGER.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE); return KinesisAnalyticsRuntime.getApplicationProperties( BasicStreamingJob.class.getClassLoader() .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE).getPath()); } else { LOGGER.info("Loading application properties from Amazon Managed Service for Apache Flink"); return KinesisAnalyticsRuntime.getApplicationProperties(); } }
  • El método main() define el flujo de datos de la aplicación y lo ejecuta.

    • Inicializa los entornos de streaming predeterminados. En este ejemplo, se muestra cómo crear tanto el StreamExecutionEnvironment que se utilizará con la API de DataSteam como el StreamTableEnvironment que se utilizará con SQL y la API de tabla. Los dos objetos de entorno son dos referencias independientes al mismo entorno de tiempo de ejecución, para utilizar diferentes API.

      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    • Cargue los parámetros de configuración de la aplicación. Esto los cargará de manera automática desde el lugar correcto, según el lugar en el que se ejecute la aplicación:

      Map<String, Properties> applicationParameters = loadApplicationProperties(env);
    • La aplicación define una fuente mediante el conector Kinesis Consumer para leer los datos del flujo de entrada. La configuración del flujo de entrada está definida en PropertyGroupId=InputStream0. El nombre y la región del flujo se encuentran en las propiedades denominadas stream.name y aws.region, respectivamente. Para simplificar, esta fuente lee los registros como una cadena.

      private static FlinkKinesisConsumer<String> createSource(Properties inputProperties) { String inputStreamName = inputProperties.getProperty("stream.name"); return new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties); } ... public static void main(String[] args) throws Exception { ... SourceFunction<String> source = createSource(applicationParameters.get("InputStream0")); DataStream<String> input = env.addSource(source, "Kinesis Source"); ... }
    • A continuación, la aplicación define un receptor mediante el conector del receptor de Kinesis Streams para enviar datos al flujo de salida. El nombre y la región del flujo de salida se definen en PropertyGroupId =OutputStream0, de forma semejante al flujo de entrada. El receptor está conectado directamente al DataStream interno que recibe los datos de la fuente. En una aplicación real, hay transformación entre la fuente y el receptor.

      private static KinesisStreamsSink<String> createSink(Properties outputProperties) { String outputStreamName = outputProperties.getProperty("stream.name"); return KinesisStreamsSink.<String>builder() .setKinesisClientProperties(outputProperties) .setSerializationSchema(new SimpleStringSchema()) .setStreamName(outputStreamName) .setPartitionKeyGenerator(element -> String.valueOf(element.hashCode())) .build(); } ... public static void main(String[] args) throws Exception { ... Sink<String> sink = createSink(applicationParameters.get("OutputStream0")); input.sinkTo(sink); ... }
    • Por último, se ejecuta el flujo de datos que se acaba de definir. Esta debe ser la última instrucción del método main(), después de definir todos los operadores que requiere el flujo de datos:

      env.execute("Flink streaming Java API skeleton");

Uso del archivo pom.xml

El archivo pom.xml define todas las dependencias requeridas por la aplicación y configura el complemento Maven Shade para crear el fat-jar que contiene todas las dependencias exigidas por Flink.

  • Algunas dependencias tienen alcance provided. Estas dependencias están disponibles de manera automática cuando la aplicación se ejecuta en Amazon Managed Service para Apache Flink. Son necesarias para compilar la aplicación o para ejecutarla de manera local en el IDE. Para obtener más información, consulte Ejecución de la aplicación a nivel local. Asegúrese de utilizar la misma versión de Flink que el tiempo de ejecución que usará en Amazon Managed Service para Apache Flink.

    <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency>
  • Se deben añadir dependencias adicionales de Apache Flink al pom con el ámbito predeterminado, como el conector Kinesis que utiliza esta aplicación. Para obtener más información, consulte Uso de conectores de Apache Flink. También se puede añadir cualquier dependencia de Java adicional que necesite su aplicación.

    <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kinesis</artifactId> <version>${aws.connector.version}</version> </dependency>
  • El complemento Maven Java Compiler garantiza que el código esté compilado en Java 11, la versión de JDK actualmente compatible con Apache Flink.

  • El complemento Maven Shade empaqueta el fat-jar, y excluye algunas bibliotecas que proporciona el tiempo de ejecución. También especifica dos transformadores: ServicesResourceTransformer y ManifestResourceTransformer. Este último configura la clase que contiene el método main para iniciar la aplicación. Si se cambia el nombre de la clase principal, no olvide actualizar este transformador.

  • <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> ... <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass>com.amazonaws.services.msf.BasicStreamingJob</mainClass> </transformer> ... </plugin>

Escritura de registros de muestra en el flujo de entrada

En esta sección, se envían registros de muestra al flujo para que los procese la aplicación. Tiene dos opciones para generar datos de muestra: mediante un script de Python o el generador de datos de Kinesis.

Generación de datos de muestra mediante un script de Python

Se puede utilizar un script de Python para enviar registros de muestra al flujo.

nota

Para ejecutar este script de Python, debe usar Python 3.x y tener instalada la biblioteca AWS SDK para Python (Boto).

Para empezar a enviar datos de prueba al flujo de entrada de Kinesis:

  1. Se descarga el script de Python stock.py del generador de datos desde el repositorio de GitHub del generador de datos.

  2. Ejecute el script stock.py:

    $ python stock.py

Mantenga el script en ejecución mientras completa el resto del tutorial. Ahora se puede ejecutar la aplicación Apache Flink.

Genere datos de muestra con Kinesis Data Generator

Como alternativa a la secuencia de comandos de Python, se puede utilizar el Generador de datos de Kinesis, también disponible en una versión alojada, para enviar datos de muestra al azar al flujo. Kinesis Data Generator se ejecuta en su navegador y no se necesita instalar nada en su máquina.

Para configurar y ejecutar Kinesis Data Generator:

  1. Siga las instrucciones de la documentación de Kinesis Data Generator para configurar el acceso a la herramienta. Se ejecutará una plantilla CloudFormation que configurará un usuario y una contraseña.

  2. Acceda a Kinesis Data Generator a través de la URL generada por la plantilla de CloudFormation. Encontrará la URL en la pestaña Resultados una vez que haya completado la plantilla de CloudFormation.

  3. Configure el generador de datos:

    • Región: seleccione la región que está utilizando para este tutorial: us-east-1

    • Flujo de flujo/entrega: seleccione el flujo de entrada que utilizará la aplicación: ExampleInputStream

    • Registros por segundo: 100

    • Plantilla de registro: copie y pegue la siguiente plantilla:

      { "event_time" : "{{date.now("YYYY-MM-DDTkk:mm:ss.SSSSS")}}, "ticker" : "{{random.arrayElement( ["AAPL", "AMZN", "MSFT", "INTC", "TBV"] )}}", "price" : {{random.number(100)}} }
  4. Pruebe la plantilla: elija la plantilla de prueba y compruebe que el registro generado es semejante al siguiente:

    { "event_time" : "2024-06-12T15:08:32.04800, "ticker" : "INTC", "price" : 7 }
  5. Inicie el generador de datos: elija Seleccionar enviar datos.

Kinesis Data Generator ahora envía datos al ExampleInputStream.

Ejecución de la aplicación a nivel local

Se puede ejecutar y depurar la aplicación Flink de forma local en su IDE.

nota

Antes de continuar, compruebe que las secuencias de entrada y salida estén disponibles. Consulte Crear dos Amazon Kinesis Data Streams. Además, compruebe que tiene permiso para leer y escribir en ambas secuencias. Consulte Autenticación de la sesión de AWS.

La configuración del entorno de desarrollo local requiere el JDK de Java 11, Apache Maven y un IDE para el desarrollo de Java. Verifique que cumple los requisitos previos requeridos. Consulte Cumplimiento de los requisitos previos para realizar los ejercicios.

Importación del proyecto Java a su IDE

Para empezar a trabajar en la aplicación en su IDE, debe importarla como un proyecto Java.

El repositorio que ha clonado contiene varios ejemplos. Cada ejemplo es un proyecto independiente. Para este tutorial, se importa el contenido del subdirectorio ./java/GettingStarted a su IDE.

Inserte el código como un proyecto Java existente con Maven.

nota

El proceso exacto para importar un nuevo proyecto de Java varía según el IDE que se utilice.

Compruebe la configuración de la aplicación local

Cuando se ejecuta localmente, la aplicación utiliza la configuración del archivo application_properties.json de la carpeta de recursos del proyecto en ./src/main/resources. Se puede editar este archivo para usar diferentes regiones o nombres de flujo de Kinesis.

[ { "PropertyGroupId": "InputStream0", "PropertyMap": { "stream.name": "ExampleInputStream", "flink.stream.initpos": "LATEST", "aws.region": "us-east-1" } }, { "PropertyGroupId": "OutputStream0", "PropertyMap": { "stream.name": "ExampleOutputStream", "aws.region": "us-east-1" } } ]

Establecimiento de la configuración de ejecución del IDE

Se puede ejecutar y depurar la aplicación Flink desde su IDE directamente al ejecutar la clase principal com.amazonaws.services.msf.BasicStreamingJob, como lo haría con cualquier aplicación Java. Antes de ejecutar la aplicación, se debe configurar la configuración de ejecución. La configuración depende del IDE que se utilice. Por ejemplo, consulte Ejecutar/depurar configuraciones en la documentación de IntelliJ IDEA. Es especialmente importante que se configure lo siguiente:

  1. Añada las dependencias provided a la ruta de clases. Esto es necesario para garantizar que las dependencias con alcance provided se transfieran a la aplicación cuando se ejecuta localmente. Sin esta configuración, la aplicación muestra un error class not found de inmediato.

  2. Transmita las credenciales AWS para acceder a las transmisiones de Kinesis a la aplicación. La forma más rápida es utilizar el Kit de herramientas de AWS para IntelliJ IDEA. Con este complemento IDE en la configuración de ejecución, puede seleccionar un perfil AWS específico. La autenticación de AWS se realiza con este perfil. No es necesario transmitir las credenciales AWS directamente.

  3. Compruebe que el IDE ejecute la aplicación mediante el JDK 11.

Ejecución la aplicación en su IDE

Tras establecer la configuración de ejecución para el BasicStreamingJob, se lo puede ejecutar o depurar como una aplicación Java normal.

nota

No se puede ejecutar el fat-jar generado por Maven directamente con java -jar ... desde la línea de comandos. Este contenedor no contiene las dependencias principales de Flink necesarias para ejecutar la aplicación de forma independiente.

Cuando la aplicación se inicia correctamente, registra cierta información sobre el minicluster independiente y la inicialización de los conectores. A esto le siguen varios registros INFO y WARN que Flink por lo común emite cuando se inicia la aplicación.

13:43:31,405 INFO com.amazonaws.services.msf.BasicStreamingJob [] - Loading application properties from 'flink-application-properties-dev.json' 13:43:31,549 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Flink Kinesis Consumer is going to read the following streams: ExampleInputStream, 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.cpu.cores required for local execution is not set, setting it to the maximal possible value. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.task.heap.size required for local execution is not set, setting it to the maximal possible value. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.task.off-heap.size required for local execution is not set, setting it to the maximal possible value. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.network.min required for local execution is not set, setting it to its default value 64 mb. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.network.max required for local execution is not set, setting it to its default value 64 mb. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.managed.size required for local execution is not set, setting it to its default value 128 mb. 13:43:31,677 INFO org.apache.flink.runtime.minicluster.MiniCluster [] - Starting Flink Mini Cluster ....

Una vez completada la inicialización, la aplicación no emite más entradas de registro. Mientras los datos fluyen, no se emite ningún registro.

Para verificar si la aplicación procesa los datos de manera correcta, puede inspeccionar las transmisiones de Kinesis de entrada y salida, tal y como se describe en la siguiente sección.

nota

No emitir registros sobre el flujo de datos es lo normal en una aplicación de Flink. Emitir registros en cada registro puede ser conveniente para la depuración, pero puede suponer una sobrecarga considerable cuando se ejecuta en producción.

Observación de los datos de entrada y salida en las transmisiones de Kinesis

Se pueden observar los registros enviados al flujo de entrada por el (que genera un ejemplo de Python) o el generador de datos de Kinesis (enlace) mediante el visor de datos de la consola de Amazon Kinesis.

Observación de los registros
  1. Abra la consola de Kinesis en https://console.aws.amazon.com/kinesis.

  2. Compruebe que la región es la misma en la que está ejecutando este tutorial, que es us-east-1 Este de EE. UU. (Norte de Virginia) de forma predeterminada. Cambie la región si no coincide.

  3. Elija Flujos de datos.

  4. Selección del flujo que desee observar, ExampleInputStream o ExampleOutputStream.

  5. Seleccione la pestaña Visor de datos.

  6. Elija cualquier partición, mantenga el valor Último como posición inicial y, a continuación, seleccione Obtener registros. Es posible que aparezca el error “No se ha encontrado ningún registro para esta solicitud”. Si es así, seleccione Volver a intentar obtener los registros. Se muestran los registros más recientes publicados en el flujo.

  7. Elija el valor de la columna Datos para inspeccionar el contenido del registro en formato JSON.

Detención de la ejecución de la aplicación de forma local

Detenga la aplicación que se está ejecutando en el IDE. El IDE normalmente ofrece una opción de “parada”. La ubicación y el método exactos dependen del IDE que se utilice.

Compilación y empaquetado del código de la aplicación

En esta sección, se utilizará Apache Maven para compilar el código Java y empaquetarlo en un archivo JAR. Se puede compilar y empaquetar su código con la herramienta de línea de comandos de Maven o su IDE.

Compilación y empaquetado con la línea de comandos de Maven:

Vaya al directorio que contiene el proyecto Java GettingStarted y ejecute el siguiente comando:

$ mvn package

Compilación y empaquetado mediante su IDE:

Ejecute mvn package desde su integración de IDE con Maven.

En ambos casos, se crea el siguiente archivo JAR: target/amazon-msf-java-stream-app-1.0.jar.

nota

Al ejecutar un “proyecto de compilación” desde su IDE, es posible que no se cree el archivo JAR.

Carga del archivo JAR del código de la aplicación

En esta sección, cargará el archivo JAR que creó en la sección anterior en el bucket de Amazon Simple Storage Service (Amazon S3) que creó al principio de este tutorial. Si no se ha completado este paso, consulte (enlace).

Cómo cargar el archivo JAR del código de la aplicación
  1. Abra la consola de Amazon S3 en https://console.aws.amazon.com/s3.

  2. Elija el bucket que creó anteriormente para el código de la aplicación.

  3. Seleccione Cargar.

  4. Elija Add files.

  5. Navegue hasta el archivo JAR generado en el paso anterior: target/amazon-msf-java-stream-app-1.0.jar.

  6. Elija Cargar sin cambiar ninguna otra configuración.

aviso

Asegúrese de seleccionar el archivo JAR correcto en <repo-dir>/java/GettingStarted/target/amazon-msf-java-stream-app-1.0.jar.

El directorio target también contiene otros archivos JAR que no necesita cargar.

Creación y ejecución de la aplicación de Managed Service para Apache Flink

Se puede crear y ejecutar una aplicación de Managed Service para Apache Flink mediante la consola o la AWS CLI. En este tutorial, se utiliza la consola.

nota

Al crear la aplicación mediante la consola, se crean sus recursos de AWS Identity and Access Management (IAM) y Registros de Amazon CloudWatch en su nombre. Si crea la aplicación mediante la AWS CLI, debe crear estos recursos por separado.

Creación de la aplicación

Para crear la aplicación
  1. Inicie sesión en la Consola de administración de AWS y abra la consola de Amazon MSF en https://console.aws.amazon.com/flink.

  2. Compruebe que ha seleccionado la región correcta: us-east-1 Este de EE. UU. (Norte de Virginia)

  3. Abra el menú de la derecha y seleccione Aplicaciones de Apache Flink y luego Crear aplicación de streaming. También puede elegir Crear aplicación de streaming en el contenedor Introducción de la página inicial.

  4. En la página Crear aplicación de streaming:

    • Elija un método para configurar la aplicación de procesamiento de transmisiones: elija Crear desde cero.

    • Configuración de Apache Flink, versión de Application Flink: elija Apache Flink 1.20.

  5. Configuración de la aplicación

    • Nombre de aplicación: escriba MyApplication.

    • Descripción: escriba My java test app

    • Acceso a los recursos de la aplicación: elija Crear o actualizar el rol de IAM kinesis-analytics-MyApplication-us-east-1 con las políticas requeridas.

  6. Configuración de la plantilla para los ajustes de la aplicación

    • Plantillas: elija Desarrollo.

  7. Elija Crear aplicación de streaming en la parte inferior de la página.

nota

Al crear una aplicación de Managed Service para Apache Flink mediante la consola, tiene la opción de tener un rol de IAM y una política creada para su aplicación. La aplicación utiliza este rol y la política para acceder a los recursos dependientes. Estos recursos de IAM reciben un nombre usando el nombre de la aplicación y la región tal y como se indica a continuación:

  • Política: kinesis-analytics-service-MyApplication-us-east-1

  • Rol: : kinesisanalytics-MyApplication-us-east-1

Amazon Managed Service para Apache Flink Amazon se denominaba anteriormente Kinesis Data Analytics. El nombre de los recursos que se crean de manera automática lleva un prefijo kinesis-analytics- por motivos de compatibilidad con versiones anteriores.

Modificar la política de IAM

Edite la política de IAM para agregar permisos de acceso a los flujos de datos de Kinesis.

Edición de la política
  1. Abra la consola de IAM en https://console.aws.amazon.com/iam/.

  2. Elija Políticas. Elija la política kinesis-analytics-service-MyApplication-us-east-1 que la consola creó en su nombre en la sección anterior.

  3. Elija Editar política y, a continuación, elija la pestaña JSON.

  4. Añada la sección subrayada de la siguiente política de ejemplo a la política. Reemplace el ID de la cuenta de muestra (012345678901) por el ID de su cuenta.

    JSON
    { "Version":"2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::my-bucket/kinesis-analytics-placeholder-s3-object" ] }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901:log-group:*" ] }, { "Sid": "ListCloudwatchLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutCloudwatchLogs", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleOutputStream" } ] }
  5. Seleccione Siguiente en la parte inferior de la página y, a continuación, seleccione Guardar cambios.

Configurar la aplicación

Edite la configuración de la aplicación para establecer el artefacto del código de la aplicación.

Edición de la configuración
  1. En la página MyApplication, elija Configurar.

  2. En la sección Ubicación del código de la aplicación:

    • En Bucket de Amazon S3, seleccione el bucket que creó anteriormente para el código de la aplicación. Elija Examinar y seleccione el bucket correcto y, a continuación, seleccione Elegir. No haga clic en el nombre del bucket.

    • En Ruta al objeto de Amazon S3, introduzca amazon-msf-java-stream-app-1.0.jar.

  3. Para los permisos de acceso, seleccione Crear o actualizar el rol de IAM kinesis-analytics-MyApplication-us-east-1 con las políticas requeridas.

  4. En la sección Propiedades del tiempo de ejecución, añada las siguientes propiedades.

  5. Seleccione Añadir nuevo elemento y añada cada uno de los siguientes parámetros:

    ID de grupo Clave Valor
    InputStream0 stream.name ExampleInputStream
    InputStream0 aws.region us-east-1
    OutputStream0 stream.name ExampleOutputStream
    OutputStream0 aws.region us-east-1
  6. No modifique ninguna de las demás secciones.

  7. Seleccione Save changes (Guardar cambios).

nota

Al activar el registro de Amazon CloudWatch, Managed Service para Apache Flink crea un grupo de registro y un flujo de registro. Los nombres de estos recursos son los siguientes:

  • Grupo de registro: /aws/kinesis-analytics/MyApplication

  • Flujo de registro: kinesis-analytics-log-stream

Ejecución de la aplicación

La aplicación ya está configurada y lista para ejecutarse.

Cómo ejecutar la aplicación
  1. En la consola de Amazon Managed Service para Apache Flink, seleccione Mi aplicación y, a continuación, Ejecutar.

  2. En la página siguiente, la página de configuración de restauración de la aplicación, seleccione Ejecutar con la última instantánea y, a continuación, seleccione Ejecutar.

    El estado en Detalles de la aplicación cambia de Ready a Starting y luego a Running cuando se ha iniciado la aplicación.

Cuando la aplicación esté en el estado Running, se puede abrir el panel de control de Flink.

Para abrir el panel de
  1. Seleccione Abrir el panel de control de Apache Flink. El panel se abre en una nueva página.

  2. En la lista Trabajos en ejecución, elija el único trabajo que pueda ver.

    nota

    Si se configuran las propiedades de Runtime o se editan las políticas de IAM de forma incorrecta, el estado de la solicitud podría cambiar a Running, pero el panel de control de Flink muestra que el trabajo se reinicia continuamente. Este es un escenario de error común si la aplicación está mal configurada o carece de permisos para acceder a los recursos externos.

    Cuando esto suceda, consulte la pestaña Excepciones en el panel de control de Flink para ver la causa del problema.

Observación de las métricas de la aplicación en ejecución

En la página MyApplication, en la sección de métricas de Amazon CloudWatch, se pueden ver algunas de las métricas fundamentales de la aplicación en ejecución.

Visualización de las métricas
  1. Junto al botón Actualizar, seleccione 10 segundos en la lista desplegable.

  2. Cuando la aplicación está en ejecución y en buen estado, se puede ver que la métrica de tiempo de actividad aumenta continuamente.

  3. La métrica fullrestarts debe ser cero. Si aumenta, es posible que la configuración tenga problemas. Para investigar el problema, consulte la pestaña Excepciones del panel de control de Flink.

  4. La métrica Número de puntos de control fallidos debe ser cero en una aplicación en buen estado.

    nota

    En este panel se muestra un conjunto fijo de métricas con una granularidad de 5 minutos. Se puede crear un panel de aplicaciones personalizado con cualquier métrica del panel de CloudWatch.

Observación de los datos de salida en los flujos de Kinesis

Asegúrese de seguir publicando datos en la entrada, ya sea mediante el script de Python o el generador de datos de Kinesis.

Ahora se puede observar el resultado de la aplicación que se ejecuta en Managed Service para Apache Flink mediante el visor de datos de https://console.aws.amazon.com/kinesis/, de forma similar a lo que se hacía anteriormente.

Visualización del resultado
  1. Abra la consola de Kinesis en https://console.aws.amazon.com/kinesis.

  2. Compruebe que la región es la misma que la que está utilizando para ejecutar este tutorial. Por defecto, es us-east-1 Este de EE. UU. (Norte de Virginia). De ser necesario, cambie la región.

  3. Elija Flujos de datos.

  4. Seleccione el flujo que desea observar. Para este tutorial, escriba ExampleOutputStream.

  5. Seleccione la pestaña Visor de datos.

  6. Seleccione cualquier partición, mantenga el valor Último como Posición inicial y, a continuación, elija Obtener registros. Es posible que aparezca el error “no se ha encontrado ningún registro para esta solicitud”. Si es así, seleccione Volver a intentar obtener los registros. Se muestran los registros más recientes publicados en el flujo.

  7. Seleccione el valor en la columna Datos para inspeccionar el contenido del registro en formato JSON.

Detener la aplicación

Para detener la aplicación, vaya a la página de la consola de la aplicación de Managed Service para Apache Flink denominada MyApplication.

Cómo detener la aplicación
  1. En la lista desplegable Acciones, seleccione Detener.

  2. El estado en los detalles de la aplicación cambia de Running a Stopping, y después a Ready cuando se ha detenido totalmente la aplicación.

    nota

    No olvide que también debe dejar de enviar datos al flujo de entrada desde el script de Python o el generador de datos de Kinesis.

Siguiente paso

Limpieza de recursos de AWS