Amazon Managed Service para Apache Flink Amazon (Amazon MSF) se denominaba anteriormente Amazon Kinesis Data Analytics para Apache Flink.
Crear y ejecutar una aplicación de Managed Service para Apache Flink
En este paso, se deberá crear una aplicación de Managed Service para Apache Flink con flujos de datos de Kinesis como origen y receptor.
Esta sección contiene los siguientes pasos:
Creación de recursos dependientes
Antes de crear una aplicación de Managed Service para Apache Flink para este ejercicio, debe crear los siguientes recursos dependientes:
-
Dos flujos de datos de Kinesis para entrada y salida.
-
Un bucket de Amazon S3 para almacenar el código de la aplicación
nota
En este tutorial se supone que está implementando su aplicación en la región us-east-1 Este de EE. UU. (Norte de Virginia). Si se utiliza otra región, adapte todos los pasos en consecuencia.
Crear dos Amazon Kinesis Data Streams
Antes de crear una aplicación de Managed Service para Apache Flink para este ejercicio, cree dos flujos de datos de Kinesis (ExampleInputStream y ExampleOutputStream). Su aplicación utiliza estos flujos para los flujos de origen y destino de la aplicación.
Se puede crear estos flujos mediante la consola de Amazon Kinesis o el siguiente comando de la AWS CLI. Para obtener instrucciones sobre la consola, consulte Creating and Updating Data Streams en la Guía para desarrolladores de Amazon Kinesis Data Streams. Para crear las transmisiones mediante la AWS CLI, utilice los siguientes comandos, ajustándolos a la región que utilice para su aplicación.
Cómo crear flujos de datos (AWS CLI)
-
Para crear el primer flujo (
ExampleInputStream), utilice el siguiente comando de la AWS CLIcreate-streamde Amazon Kinesis.$ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-east-1 \ -
Para crear la segunda secuencia que la aplicación utilizará para escribir la salida, ejecute el mismo comando, cambiando el nombre a
ExampleOutputStream:$ aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-east-1 \
Creación de un bucket de Amazon S3 para el código de la aplicación
Se puede crear el bucket de Amazon S3 usando la consola. Para aprender a crear un bucket de Amazon S3, consulte Creación de un bucket en la Guía de usuario Amazon S3. Asigne al bucket de Amazon S3 un nombre único globalmente, por ejemplo, mediante el agregado de su nombre de inicio de sesión.
nota
Asegúrese de crear el bucket en la región que utilice para este tutorial (us-east-1).
Otros recursos
Al crear la aplicación, Managed Service para Apache Flink de manera automática crea los siguientes recursos de Amazon CloudWatch si aún no existen:
-
Un grupo de registro llamado
/AWS/KinesisAnalytics-java/<my-application> -
Un flujo de registro llamado
kinesis-analytics-log-stream
Configuración de su entorno de desarrollo local
Para el desarrollo y la depuración, se puede ejecutar la aplicación Apache Flink en su máquina directamente desde el IDE que prefiera. Todas las dependencias de Apache Flink se administran como las dependencias normales de Java con Apache Maven.
nota
En su máquina de desarrollo, debe tener instalados Java JDK 11, Maven y Git. Se recomienda utilizar un entorno de desarrollo como Eclipse Java Neon
Autenticación de la sesión de AWS
La aplicación utiliza los flujos de datos de Kinesis para publicar datos. Si se ejecuta de forma local, se debe tener una sesión de AWS autenticada válida con permisos para escribir en el flujo de datos de Kinesis. Siga los pasos siguientes para autenticar su sesión:
-
Si no tiene configurado el perfil con un nombre de AWS CLI y una credencial válida, consulte Cómo configurar la AWS Command Line Interface (AWS CLI).
-
Compruebe que AWS CLI está correctamente configurado y que sus usuarios tienen permisos para escribir en el flujo de datos de Kinesis publicando el siguiente registro de prueba:
$ aws kinesis put-record --stream-name ExampleOutputStream --data TEST --partition-key TEST -
Si su IDE tiene un complemento con el que integrarse con AWS, puede usarlo para pasar las credenciales a la aplicación que se ejecuta en el IDE. Para obtener más información, consulte AWS Toolkit for IntelliJ IDEA
y AWS Toolkit for Eclipse.
Descargar y consultar el código de Java de streaming de Apache Flink
El código de la aplicación de Java para este ejemplo está disponible en GitHub. Para descargar el código de la aplicación, haga lo siguiente:
-
Clone el repositorio remoto con el siguiente comando:
git clone https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples.git -
Vaya al directorio
amazon-managed-service-for-apache-flink-examples/tree/main/java/GettingStarted.
Revisión de los componentes de la aplicación
La aplicación está completamente implementada en la clase com.amazonaws.services.msf.BasicStreamingJob. El método main() define el flujo de datos para procesar los datos de flujo y ejecutarlos.
nota
Para una experiencia de desarrollador optimizada, la aplicación está diseñada para ejecutarse sin cambios de código tanto en Amazon Managed Service para Apache Flink como de forma local, para el desarrollo en su IDE.
-
Para leer la configuración del tiempo de ejecución para que funcione cuando se ejecute en Amazon Managed Service para Apache Flink y en su IDE, la aplicación detecta de manera automática si se ejecuta de forma independiente de forma local en el IDE. En ese caso, la aplicación carga la configuración del tiempo de ejecución de forma diferente:
-
Cuando la aplicación detecte que se está ejecutando en modo independiente en tu IDE, crea el archivo
application_properties.jsonincluido en la carpeta de recursos del proyecto. El contenido del archivo es el siguiente. -
Cuando la aplicación se ejecuta en Amazon Managed Service para Apache Flink, el comportamiento predeterminado carga la configuración de la aplicación desde las propiedades de tiempo de ejecución que se defina en la aplicación Amazon Managed Service para Apache Flink. Consulte Creación y ejecución de la aplicación de Managed Service para Apache Flink.
private static Map<String, Properties> loadApplicationProperties(StreamExecutionEnvironment env) throws IOException { if (env instanceof LocalStreamEnvironment) { LOGGER.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE); return KinesisAnalyticsRuntime.getApplicationProperties( BasicStreamingJob.class.getClassLoader() .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE).getPath()); } else { LOGGER.info("Loading application properties from Amazon Managed Service for Apache Flink"); return KinesisAnalyticsRuntime.getApplicationProperties(); } }
-
-
El método
main()define el flujo de datos de la aplicación y lo ejecuta.-
Inicializa los entornos de streaming predeterminados. En este ejemplo, se muestra cómo crear tanto el
StreamExecutionEnvironmentque se utilizará con la API de DataSteam como elStreamTableEnvironmentque se utilizará con SQL y la API de tabla. Los dos objetos de entorno son dos referencias independientes al mismo entorno de tiempo de ejecución, para utilizar diferentes API.StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); -
Cargue los parámetros de configuración de la aplicación. Esto los cargará de manera automática desde el lugar correcto, según el lugar en el que se ejecute la aplicación:
Map<String, Properties> applicationParameters = loadApplicationProperties(env); -
La aplicación define una fuente mediante el conector Kinesis Consumer
para leer los datos del flujo de entrada. La configuración del flujo de entrada está definida en PropertyGroupId=InputStream0. El nombre y la región del flujo se encuentran en las propiedades denominadasstream.nameyaws.region, respectivamente. Para simplificar, esta fuente lee los registros como una cadena.private static FlinkKinesisConsumer<String> createSource(Properties inputProperties) { String inputStreamName = inputProperties.getProperty("stream.name"); return new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties); } ... public static void main(String[] args) throws Exception { ... SourceFunction<String> source = createSource(applicationParameters.get("InputStream0")); DataStream<String> input = env.addSource(source, "Kinesis Source"); ... } -
A continuación, la aplicación define un receptor mediante el conector del receptor de Kinesis Streams
para enviar datos al flujo de salida. El nombre y la región del flujo de salida se definen en PropertyGroupId=OutputStream0, de forma semejante al flujo de entrada. El receptor está conectado directamente alDataStreaminterno que recibe los datos de la fuente. En una aplicación real, hay transformación entre la fuente y el receptor.private static KinesisStreamsSink<String> createSink(Properties outputProperties) { String outputStreamName = outputProperties.getProperty("stream.name"); return KinesisStreamsSink.<String>builder() .setKinesisClientProperties(outputProperties) .setSerializationSchema(new SimpleStringSchema()) .setStreamName(outputStreamName) .setPartitionKeyGenerator(element -> String.valueOf(element.hashCode())) .build(); } ... public static void main(String[] args) throws Exception { ... Sink<String> sink = createSink(applicationParameters.get("OutputStream0")); input.sinkTo(sink); ... } -
Por último, se ejecuta el flujo de datos que se acaba de definir. Esta debe ser la última instrucción del método
main(), después de definir todos los operadores que requiere el flujo de datos:env.execute("Flink streaming Java API skeleton");
-
Uso del archivo pom.xml
El archivo pom.xml define todas las dependencias requeridas por la aplicación y configura el complemento Maven Shade para crear el fat-jar que contiene todas las dependencias exigidas por Flink.
-
Algunas dependencias tienen alcance
provided. Estas dependencias están disponibles de manera automática cuando la aplicación se ejecuta en Amazon Managed Service para Apache Flink. Son necesarias para compilar la aplicación o para ejecutarla de manera local en el IDE. Para obtener más información, consulte Ejecución de la aplicación a nivel local. Asegúrese de utilizar la misma versión de Flink que el tiempo de ejecución que usará en Amazon Managed Service para Apache Flink.<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency>
-
Se deben añadir dependencias adicionales de Apache Flink al pom con el ámbito predeterminado, como el conector Kinesis
que utiliza esta aplicación. Para obtener más información, consulte Uso de conectores de Apache Flink. También se puede añadir cualquier dependencia de Java adicional que necesite su aplicación. <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kinesis</artifactId> <version>${aws.connector.version}</version> </dependency>
-
El complemento Maven Java Compiler garantiza que el código esté compilado en Java 11, la versión de JDK actualmente compatible con Apache Flink.
-
El complemento Maven Shade empaqueta el fat-jar, y excluye algunas bibliotecas que proporciona el tiempo de ejecución. También especifica dos transformadores:
ServicesResourceTransformeryManifestResourceTransformer. Este último configura la clase que contiene el métodomainpara iniciar la aplicación. Si se cambia el nombre de la clase principal, no olvide actualizar este transformador. -
<plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> ... <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass>com.amazonaws.services.msf.BasicStreamingJob</mainClass> </transformer> ... </plugin>
Escritura de registros de muestra en el flujo de entrada
En esta sección, se envían registros de muestra al flujo para que los procese la aplicación. Tiene dos opciones para generar datos de muestra: mediante un script de Python o el generador de datos de Kinesis
Generación de datos de muestra mediante un script de Python
Se puede utilizar un script de Python para enviar registros de muestra al flujo.
nota
Para ejecutar este script de Python, debe usar Python 3.x y tener instalada la biblioteca AWS SDK para Python (Boto)
Para empezar a enviar datos de prueba al flujo de entrada de Kinesis:
-
Se descarga el script de Python
stock.pydel generador de datos desde el repositorio de GitHub del generador de datos. -
Ejecute el script
stock.py:$ python stock.py
Mantenga el script en ejecución mientras completa el resto del tutorial. Ahora se puede ejecutar la aplicación Apache Flink.
Genere datos de muestra con Kinesis Data Generator
Como alternativa a la secuencia de comandos de Python, se puede utilizar el Generador de datos de Kinesis
Para configurar y ejecutar Kinesis Data Generator:
-
Siga las instrucciones de la documentación de Kinesis Data Generator
para configurar el acceso a la herramienta. Se ejecutará una plantilla CloudFormation que configurará un usuario y una contraseña. -
Acceda a Kinesis Data Generator a través de la URL generada por la plantilla de CloudFormation. Encontrará la URL en la pestaña Resultados una vez que haya completado la plantilla de CloudFormation.
-
Configure el generador de datos:
-
Región: seleccione la región que está utilizando para este tutorial: us-east-1
-
Flujo de flujo/entrega: seleccione el flujo de entrada que utilizará la aplicación:
ExampleInputStream -
Registros por segundo: 100
-
Plantilla de registro: copie y pegue la siguiente plantilla:
{ "event_time" : "{{date.now("YYYY-MM-DDTkk:mm:ss.SSSSS")}}, "ticker" : "{{random.arrayElement( ["AAPL", "AMZN", "MSFT", "INTC", "TBV"] )}}", "price" : {{random.number(100)}} }
-
-
Pruebe la plantilla: elija la plantilla de prueba y compruebe que el registro generado es semejante al siguiente:
{ "event_time" : "2024-06-12T15:08:32.04800, "ticker" : "INTC", "price" : 7 } -
Inicie el generador de datos: elija Seleccionar enviar datos.
Kinesis Data Generator ahora envía datos al ExampleInputStream.
Ejecución de la aplicación a nivel local
Se puede ejecutar y depurar la aplicación Flink de forma local en su IDE.
nota
Antes de continuar, compruebe que las secuencias de entrada y salida estén disponibles. Consulte Crear dos Amazon Kinesis Data Streams. Además, compruebe que tiene permiso para leer y escribir en ambas secuencias. Consulte Autenticación de la sesión de AWS.
La configuración del entorno de desarrollo local requiere el JDK de Java 11, Apache Maven y un IDE para el desarrollo de Java. Verifique que cumple los requisitos previos requeridos. Consulte Cumplimiento de los requisitos previos para realizar los ejercicios.
Importación del proyecto Java a su IDE
Para empezar a trabajar en la aplicación en su IDE, debe importarla como un proyecto Java.
El repositorio que ha clonado contiene varios ejemplos. Cada ejemplo es un proyecto independiente. Para este tutorial, se importa el contenido del subdirectorio ./java/GettingStarted a su IDE.
Inserte el código como un proyecto Java existente con Maven.
nota
El proceso exacto para importar un nuevo proyecto de Java varía según el IDE que se utilice.
Compruebe la configuración de la aplicación local
Cuando se ejecuta localmente, la aplicación utiliza la configuración del archivo application_properties.json de la carpeta de recursos del proyecto en ./src/main/resources. Se puede editar este archivo para usar diferentes regiones o nombres de flujo de Kinesis.
[ { "PropertyGroupId": "InputStream0", "PropertyMap": { "stream.name": "ExampleInputStream", "flink.stream.initpos": "LATEST", "aws.region": "us-east-1" } }, { "PropertyGroupId": "OutputStream0", "PropertyMap": { "stream.name": "ExampleOutputStream", "aws.region": "us-east-1" } } ]
Establecimiento de la configuración de ejecución del IDE
Se puede ejecutar y depurar la aplicación Flink desde su IDE directamente al ejecutar la clase principal com.amazonaws.services.msf.BasicStreamingJob, como lo haría con cualquier aplicación Java. Antes de ejecutar la aplicación, se debe configurar la configuración de ejecución. La configuración depende del IDE que se utilice. Por ejemplo, consulte Ejecutar/depurar configuraciones
-
Añada las dependencias
provideda la ruta de clases. Esto es necesario para garantizar que las dependencias con alcanceprovidedse transfieran a la aplicación cuando se ejecuta localmente. Sin esta configuración, la aplicación muestra un errorclass not foundde inmediato. -
Transmita las credenciales AWS para acceder a las transmisiones de Kinesis a la aplicación. La forma más rápida es utilizar el Kit de herramientas de AWS para IntelliJ IDEA
. Con este complemento IDE en la configuración de ejecución, puede seleccionar un perfil AWS específico. La autenticación de AWS se realiza con este perfil. No es necesario transmitir las credenciales AWS directamente. -
Compruebe que el IDE ejecute la aplicación mediante el JDK 11.
Ejecución la aplicación en su IDE
Tras establecer la configuración de ejecución para el BasicStreamingJob, se lo puede ejecutar o depurar como una aplicación Java normal.
nota
No se puede ejecutar el fat-jar generado por Maven directamente con java -jar
... desde la línea de comandos. Este contenedor no contiene las dependencias principales de Flink necesarias para ejecutar la aplicación de forma independiente.
Cuando la aplicación se inicia correctamente, registra cierta información sobre el minicluster independiente y la inicialización de los conectores. A esto le siguen varios registros INFO y WARN que Flink por lo común emite cuando se inicia la aplicación.
13:43:31,405 INFO com.amazonaws.services.msf.BasicStreamingJob [] - Loading application properties from 'flink-application-properties-dev.json' 13:43:31,549 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Flink Kinesis Consumer is going to read the following streams: ExampleInputStream, 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.cpu.cores required for local execution is not set, setting it to the maximal possible value. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.task.heap.size required for local execution is not set, setting it to the maximal possible value. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.task.off-heap.size required for local execution is not set, setting it to the maximal possible value. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.network.min required for local execution is not set, setting it to its default value 64 mb. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.network.max required for local execution is not set, setting it to its default value 64 mb. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.managed.size required for local execution is not set, setting it to its default value 128 mb. 13:43:31,677 INFO org.apache.flink.runtime.minicluster.MiniCluster [] - Starting Flink Mini Cluster ....
Una vez completada la inicialización, la aplicación no emite más entradas de registro. Mientras los datos fluyen, no se emite ningún registro.
Para verificar si la aplicación procesa los datos de manera correcta, puede inspeccionar las transmisiones de Kinesis de entrada y salida, tal y como se describe en la siguiente sección.
nota
No emitir registros sobre el flujo de datos es lo normal en una aplicación de Flink. Emitir registros en cada registro puede ser conveniente para la depuración, pero puede suponer una sobrecarga considerable cuando se ejecuta en producción.
Observación de los datos de entrada y salida en las transmisiones de Kinesis
Se pueden observar los registros enviados al flujo de entrada por el (que genera un ejemplo de Python) o el generador de datos de Kinesis (enlace) mediante el visor de datos de la consola de Amazon Kinesis.
Observación de los registros
Abra la consola de Kinesis en https://console.aws.amazon.com/kinesis
. -
Compruebe que la región es la misma en la que está ejecutando este tutorial, que es us-east-1 Este de EE. UU. (Norte de Virginia) de forma predeterminada. Cambie la región si no coincide.
-
Elija Flujos de datos.
-
Selección del flujo que desee observar,
ExampleInputStreamoExampleOutputStream. -
Seleccione la pestaña Visor de datos.
-
Elija cualquier partición, mantenga el valor Último como posición inicial y, a continuación, seleccione Obtener registros. Es posible que aparezca el error “No se ha encontrado ningún registro para esta solicitud”. Si es así, seleccione Volver a intentar obtener los registros. Se muestran los registros más recientes publicados en el flujo.
-
Elija el valor de la columna Datos para inspeccionar el contenido del registro en formato JSON.
Detención de la ejecución de la aplicación de forma local
Detenga la aplicación que se está ejecutando en el IDE. El IDE normalmente ofrece una opción de “parada”. La ubicación y el método exactos dependen del IDE que se utilice.
Compilación y empaquetado del código de la aplicación
En esta sección, se utilizará Apache Maven para compilar el código Java y empaquetarlo en un archivo JAR. Se puede compilar y empaquetar su código con la herramienta de línea de comandos de Maven o su IDE.
Compilación y empaquetado con la línea de comandos de Maven:
Vaya al directorio que contiene el proyecto Java GettingStarted y ejecute el siguiente comando:
$ mvn package
Compilación y empaquetado mediante su IDE:
Ejecute mvn package desde su integración de IDE con Maven.
En ambos casos, se crea el siguiente archivo JAR: target/amazon-msf-java-stream-app-1.0.jar.
nota
Al ejecutar un “proyecto de compilación” desde su IDE, es posible que no se cree el archivo JAR.
Carga del archivo JAR del código de la aplicación
En esta sección, cargará el archivo JAR que creó en la sección anterior en el bucket de Amazon Simple Storage Service (Amazon S3) que creó al principio de este tutorial. Si no se ha completado este paso, consulte (enlace).
Cómo cargar el archivo JAR del código de la aplicación
Abra la consola de Amazon S3 en https://console.aws.amazon.com/s3
. -
Elija el bucket que creó anteriormente para el código de la aplicación.
-
Seleccione Cargar.
-
Elija Add files.
-
Navegue hasta el archivo JAR generado en el paso anterior:
target/amazon-msf-java-stream-app-1.0.jar. -
Elija Cargar sin cambiar ninguna otra configuración.
aviso
Asegúrese de seleccionar el archivo JAR correcto en <repo-dir>/java/GettingStarted/target/amazon-msf-java-stream-app-1.0.jar.
El directorio target también contiene otros archivos JAR que no necesita cargar.
Creación y ejecución de la aplicación de Managed Service para Apache Flink
Se puede crear y ejecutar una aplicación de Managed Service para Apache Flink mediante la consola o la AWS CLI. En este tutorial, se utiliza la consola.
nota
Al crear la aplicación mediante la consola, se crean sus recursos de AWS Identity and Access Management (IAM) y Registros de Amazon CloudWatch en su nombre. Si crea la aplicación mediante la AWS CLI, debe crear estos recursos por separado.
Temas
Creación de la aplicación
Para crear la aplicación
Inicie sesión en la Consola de administración de AWS y abra la consola de Amazon MSF en https://console.aws.amazon.com/flink.
-
Compruebe que ha seleccionado la región correcta: us-east-1 Este de EE. UU. (Norte de Virginia)
-
Abra el menú de la derecha y seleccione Aplicaciones de Apache Flink y luego Crear aplicación de streaming. También puede elegir Crear aplicación de streaming en el contenedor Introducción de la página inicial.
-
En la página Crear aplicación de streaming:
-
Elija un método para configurar la aplicación de procesamiento de transmisiones: elija Crear desde cero.
-
Configuración de Apache Flink, versión de Application Flink: elija Apache Flink 1.20.
-
-
Configuración de la aplicación
-
Nombre de aplicación: escriba
MyApplication. -
Descripción: escriba
My java test app -
Acceso a los recursos de la aplicación: elija Crear o actualizar el rol de IAM
kinesis-analytics-MyApplication-us-east-1con las políticas requeridas.
-
-
Configuración de la plantilla para los ajustes de la aplicación
-
Plantillas: elija Desarrollo.
-
-
Elija Crear aplicación de streaming en la parte inferior de la página.
nota
Al crear una aplicación de Managed Service para Apache Flink mediante la consola, tiene la opción de tener un rol de IAM y una política creada para su aplicación. La aplicación utiliza este rol y la política para acceder a los recursos dependientes. Estos recursos de IAM reciben un nombre usando el nombre de la aplicación y la región tal y como se indica a continuación:
-
Política:
kinesis-analytics-service-MyApplication-us-east-1 -
Rol: :
kinesisanalytics-MyApplication-us-east-1
Amazon Managed Service para Apache Flink Amazon se denominaba anteriormente Kinesis Data Analytics. El nombre de los recursos que se crean de manera automática lleva un prefijo kinesis-analytics- por motivos de compatibilidad con versiones anteriores.
Modificar la política de IAM
Edite la política de IAM para agregar permisos de acceso a los flujos de datos de Kinesis.
Edición de la política
Abra la consola de IAM en https://console.aws.amazon.com/iam/
. -
Elija Políticas. Elija la política
kinesis-analytics-service-MyApplication-us-east-1que la consola creó en su nombre en la sección anterior. -
Elija Editar política y, a continuación, elija la pestaña JSON.
-
Añada la sección subrayada de la siguiente política de ejemplo a la política. Reemplace el ID de la cuenta de muestra (
012345678901) por el ID de su cuenta. -
Seleccione Siguiente en la parte inferior de la página y, a continuación, seleccione Guardar cambios.
Configurar la aplicación
Edite la configuración de la aplicación para establecer el artefacto del código de la aplicación.
Edición de la configuración
-
En la página MyApplication, elija Configurar.
-
En la sección Ubicación del código de la aplicación:
-
En Bucket de Amazon S3, seleccione el bucket que creó anteriormente para el código de la aplicación. Elija Examinar y seleccione el bucket correcto y, a continuación, seleccione Elegir. No haga clic en el nombre del bucket.
-
En Ruta al objeto de Amazon S3, introduzca
amazon-msf-java-stream-app-1.0.jar.
-
-
Para los permisos de acceso, seleccione Crear o actualizar el rol de IAM
kinesis-analytics-MyApplication-us-east-1con las políticas requeridas. -
En la sección Propiedades del tiempo de ejecución, añada las siguientes propiedades.
-
Seleccione Añadir nuevo elemento y añada cada uno de los siguientes parámetros:
ID de grupo Clave Valor InputStream0stream.nameExampleInputStreamInputStream0aws.regionus-east-1OutputStream0stream.nameExampleOutputStreamOutputStream0aws.regionus-east-1 -
No modifique ninguna de las demás secciones.
-
Seleccione Save changes (Guardar cambios).
nota
Al activar el registro de Amazon CloudWatch, Managed Service para Apache Flink crea un grupo de registro y un flujo de registro. Los nombres de estos recursos son los siguientes:
-
Grupo de registro:
/aws/kinesis-analytics/MyApplication -
Flujo de registro:
kinesis-analytics-log-stream
Ejecución de la aplicación
La aplicación ya está configurada y lista para ejecutarse.
Cómo ejecutar la aplicación
-
En la consola de Amazon Managed Service para Apache Flink, seleccione Mi aplicación y, a continuación, Ejecutar.
-
En la página siguiente, la página de configuración de restauración de la aplicación, seleccione Ejecutar con la última instantánea y, a continuación, seleccione Ejecutar.
El estado en Detalles de la aplicación cambia de
ReadyaStartingy luego aRunningcuando se ha iniciado la aplicación.
Cuando la aplicación esté en el estado Running, se puede abrir el panel de control de Flink.
Para abrir el panel de
-
Seleccione Abrir el panel de control de Apache Flink. El panel se abre en una nueva página.
-
En la lista Trabajos en ejecución, elija el único trabajo que pueda ver.
nota
Si se configuran las propiedades de Runtime o se editan las políticas de IAM de forma incorrecta, el estado de la solicitud podría cambiar a
Running, pero el panel de control de Flink muestra que el trabajo se reinicia continuamente. Este es un escenario de error común si la aplicación está mal configurada o carece de permisos para acceder a los recursos externos.Cuando esto suceda, consulte la pestaña Excepciones en el panel de control de Flink para ver la causa del problema.
Observación de las métricas de la aplicación en ejecución
En la página MyApplication, en la sección de métricas de Amazon CloudWatch, se pueden ver algunas de las métricas fundamentales de la aplicación en ejecución.
Visualización de las métricas
-
Junto al botón Actualizar, seleccione 10 segundos en la lista desplegable.
-
Cuando la aplicación está en ejecución y en buen estado, se puede ver que la métrica de tiempo de actividad aumenta continuamente.
-
La métrica fullrestarts debe ser cero. Si aumenta, es posible que la configuración tenga problemas. Para investigar el problema, consulte la pestaña Excepciones del panel de control de Flink.
-
La métrica Número de puntos de control fallidos debe ser cero en una aplicación en buen estado.
nota
En este panel se muestra un conjunto fijo de métricas con una granularidad de 5 minutos. Se puede crear un panel de aplicaciones personalizado con cualquier métrica del panel de CloudWatch.
Observación de los datos de salida en los flujos de Kinesis
Asegúrese de seguir publicando datos en la entrada, ya sea mediante el script de Python o el generador de datos de Kinesis.
Ahora se puede observar el resultado de la aplicación que se ejecuta en Managed Service para Apache Flink mediante el visor de datos de https://console.aws.amazon.com/kinesis/
Visualización del resultado
Abra la consola de Kinesis en https://console.aws.amazon.com/kinesis
. -
Compruebe que la región es la misma que la que está utilizando para ejecutar este tutorial. Por defecto, es us-east-1 Este de EE. UU. (Norte de Virginia). De ser necesario, cambie la región.
-
Elija Flujos de datos.
-
Seleccione el flujo que desea observar. Para este tutorial, escriba
ExampleOutputStream. -
Seleccione la pestaña Visor de datos.
-
Seleccione cualquier partición, mantenga el valor Último como Posición inicial y, a continuación, elija Obtener registros. Es posible que aparezca el error “no se ha encontrado ningún registro para esta solicitud”. Si es así, seleccione Volver a intentar obtener los registros. Se muestran los registros más recientes publicados en el flujo.
-
Seleccione el valor en la columna Datos para inspeccionar el contenido del registro en formato JSON.
Detener la aplicación
Para detener la aplicación, vaya a la página de la consola de la aplicación de Managed Service para Apache Flink denominada MyApplication.
Cómo detener la aplicación
-
En la lista desplegable Acciones, seleccione Detener.
-
El estado en los detalles de la aplicación cambia de
RunningaStopping, y después aReadycuando se ha detenido totalmente la aplicación.nota
No olvide que también debe dejar de enviar datos al flujo de entrada desde el script de Python o el generador de datos de Kinesis.