

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

# Creación de aplicaciones de Managed Service para Apache Flink con Apache Beam
<a name="how-creating-apps-beam"></a>

**nota**  
**No existe un Apache Flink Runner compatible con Flink 1.20. Para obtener más información, consulte la [compatibilidad de las versiones de Flink](https://beam.apache.org/documentation/runners/flink/#flink-version-compatibility) en la documentación de Apache Beam.** >

Se puede usar el marco [Apache Beam](https://beam.apache.org/) con su aplicación Managed Service para Apache Flink para procesar datos de streaming. Las aplicaciones de Managed Service para Apache Flink que utilizan Apache Beam, utilizan el [ejecutor Apache Flink](https://beam.apache.org/documentation/runners/flink/) para ejecutar las canalizaciones de Beam.

Para obtener un tutorial sobre cómo usar Apache Beam en una aplicación de Managed Service para Apache Flink, consulte [Utilice CloudFormationCreación de una aplicación con Apache Beam](examples-beam.md).

**Topics**
+ [Limitaciones del ejecutor de Apache Flink con Managed Service para Apache Flink](#how-creating-apps-beam-using)
+ [Uso de Apache Beam con Managed Service para Apache Flink](#how-creating-apps-beam-capabilities)
+ [Creación de una aplicación con Apache Beam](examples-beam.md)

## Limitaciones del ejecutor de Apache Flink con Managed Service para Apache Flink
<a name="how-creating-apps-beam-using"></a>

Note lo siguiente sobre el uso del ejecutor de Apache Flink con Managed Service para Apache Flink:
+ Las métricas de Apache Beam no se pueden ver en la consola de Managed Service para Apache Flink.
+ **Apache Beam solo es compatible con el servicio gestionado para las aplicaciones de Apache Flink que utilizan la versión 1.8 o superior de Apache Flink. Apache Beam no es compatible con las aplicaciones de Managed Service para Apache Flink que utilizan la versión 1.6 de Apache Flink.**

## Uso de Apache Beam con Managed Service para Apache Flink
<a name="how-creating-apps-beam-capabilities"></a>

Managed Service para Apache Flink es compatible con las mismas capacidades de Apache Beam que el ejecutor Apache Flink. Para obtener información sobre las características compatibles con el ejecutor Apache Flink, consulte [Beam Compatibility Matrix](https://beam.apache.org/documentation/runners/capability-matrix/). 

Se recomienda que pruebe su aplicación Apache Flink en el servicio Managed Service para Apache Flink para comprobar que todas las características que su aplicación necesita sean compatibles.

# Creación de una aplicación con Apache Beam
<a name="examples-beam"></a>

En este ejercicio, creará una aplicación de Managed Service para Apache Flink que transforma datos usando [Apache Beam](https://beam.apache.org/). Apache Beam es un modelo de programación para procesar datos de streaming. Para obtener más información sobre el uso de Apache Beam con Managed Service para Apache Flink, consulte [Creación de aplicaciones de Managed Service para Apache Flink con Apache Beam](how-creating-apps-beam.md).

**nota**  
Para configurar los requisitos previos necesarios para este ejercicio, primero complete el ejercicio [Tutorial: Comience a utilizar la DataStream API en Managed Service for Apache Flink](getting-started.md).

**Topics**
+ [Creación de recursos dependientes](#examples-beam-resources)
+ [Escritura de registros de muestra en el flujo de entrada](#examples-beam-write)
+ [Descarga y examen del código de la aplicación](#examples-beam-download)
+ [Compilar el código de la aplicación](#examples-beam-compile)
+ [Cargar el código de Java de streaming de Apache Flink](#examples-beam-upload)
+ [Crear y ejecutar la aplicación de Managed Service para Apache Flink](#examples-beam-create-run)
+ [Limpie los recursos AWS](#examples-beam-cleanup)
+ [Siguientes pasos](#examples-beam-nextsteps)

## Creación de recursos dependientes
<a name="examples-beam-resources"></a>

Antes de crear una aplicación de Managed Service para Apache Flink para este ejercicio, debe crear los siguientes recursos dependientes: 
+ Dos flujos de datos de Kinesis (`ExampleInputStream` y `ExampleOutputStream`)
+ Un bucket de Amazon S3 para almacenar el código de la aplicación (`ka-app-code-<username>`) 

Se puede crear los flujos de Kinesis y el bucket de Amazon S3 usando la consola. Si desea obtener instrucciones para crear estos recursos, consulte los siguientes temas:
+ [Creación y actualización de flujos de datos](https://docs.aws.amazon.com/kinesis/latest/dev/amazon-kinesis-streams.html) en la *Guía para desarrolladores de Amazon Kinesis Data Streams*. Asigne un nombre a sus flujos de datos **ExampleInputStream** y **ExampleOutputStream**.
+ [¿Cómo se puede crear un bucket de S3?](https://docs.aws.amazon.com/AmazonS3/latest/userguide/create-bucket.html) en la *guía de usuario de Amazon Simple Storage Service*. Asigne al bucket de Amazon S3 un nombre único globalmente añadiendo su nombre de inicio de sesión, por ejemplo, **ka-app-code-*<username>***.

## Escritura de registros de muestra en el flujo de entrada
<a name="examples-beam-write"></a>

En esta sección, se utiliza un script de Python para escribir cadenas asignadas al azar al flujo para que la aplicación realice el procesamiento.

**nota**  
Esta sección requiere [AWS SDK para Python (Boto)](https://aws.amazon.com/developers/getting-started/python/).

1. Cree un archivo denominado `ping.py` con el siguiente contenido:

   ```
   import json
   import boto3
   import random
   
   kinesis = boto3.client('kinesis')
   
   while True:
           data = random.choice(['ping', 'telnet', 'ftp', 'tracert', 'netstat'])
           print(data)
           kinesis.put_record(
                   StreamName="ExampleInputStream",
                   Data=data,
                   PartitionKey="partitionkey")
   ```

1. Ejecute el script `ping.py`: 

   ```
   $ python ping.py
   ```

   Mantenga el script en ejecución mientras completa el resto del tutorial.

## Descarga y examen del código de la aplicación
<a name="examples-beam-download"></a>

El código de la aplicación Java de este ejemplo está disponible en GitHub. Para descargar el código de la aplicación, haga lo siguiente:

1. Si aún no lo ha hecho, instale el cliente Git. Para obtener más información, consulte [Installing Git](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git). 

1. Clone el repositorio remoto con el siguiente comando:

   ```
   git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-examples.git
   ```

1. Vaya al directorio `amazon-kinesis-data-analytics-java-examples/Beam`.

El código de la aplicación se encuentra en el archivo `BasicBeamStreamingJob.java`. Tenga en cuenta lo siguiente en relación con el código de la aplicación:
+ La aplicación utiliza Apache Beam [ParDo](https://beam.apache.org/releases/javadoc/2.0.0/org/apache/beam/sdk/transforms/ParDo.html)para procesar los registros entrantes mediante la invocación de una función de transformación personalizada llamada`PingPongFn`.

  El código para invocar la función `PingPongFn` es el siguiente:

  ```
  .apply("Pong transform",
      ParDo.of(new PingPongFn())
  ```
+ Las aplicaciones de Managed Service para Apache Flink que usan Apache Beam requieren los siguientes componentes. Si no incluye estos componentes y versiones en su `pom.xml`, la aplicación carga las versiones incorrectas desde las dependencias del entorno y, dado que las versiones no coinciden, la aplicación se bloquea durante el tiempo de ejecución.

  ```
  <jackson.version>2.10.2</jackson.version>
  ...
  <dependency>
      <groupId>com.fasterxml.jackson.module</groupId>
      <artifactId>jackson-module-jaxb-annotations</artifactId>
      <version>2.10.2</version>
  </dependency>
  ```
+ La función de `PingPongFn` transformación pasa los datos de entrada al flujo de salida, a menos que los datos de entrada sean **ping**, en cuyo caso emite la cadena **pong\$1n** al flujo de salida. 

  El código de la función de transformación es el siguiente:

  ```
      private static class PingPongFn extends DoFn<KinesisRecord, byte[]> {
      private static final Logger LOG = LoggerFactory.getLogger(PingPongFn.class);
      
      @ProcessElement
      public void processElement(ProcessContext c) {
          String content = new String(c.element().getDataAsBytes(), StandardCharsets.UTF_8);
          if (content.trim().equalsIgnoreCase("ping")) {
              LOG.info("Ponged!");
              c.output("pong\n".getBytes(StandardCharsets.UTF_8));
          } else {
              LOG.info("No action for: " + content);
              c.output(c.element().getDataAsBytes());
          }
      }
  }
  ```

## Compilar el código de la aplicación
<a name="examples-beam-compile"></a>

Para compilar la aplicación, haga lo siguiente:

1. Si aún no lo ha hecho, instale Java y Maven. Para obtener más información, consulte [Cumplimiento de los requisitos previos obligatorios](getting-started.md#setting-up-prerequisites) en el tutorial de [Tutorial: Comience a utilizar la DataStream API en Managed Service for Apache Flink](getting-started.md).

1. Compile la aplicación con el siguiente comando: 

   ```
   mvn package -Dflink.version=1.15.2 -Dflink.version.minor=1.8
   ```
**nota**  
El código fuente proporcionado se basa en bibliotecas de Java 11. 

Al compilar la aplicación, se crea el archivo JAR de la aplicación (`target/basic-beam-app-1.0.jar`).

## Cargar el código de Java de streaming de Apache Flink
<a name="examples-beam-upload"></a>

En esta sección, cargará su código de aplicación en el bucket de Amazon S3 que creó en la sección [Creación de recursos dependientes](#examples-beam-resources).

1. En la consola de Amazon S3, elija el *<username>* bucket **ka-app-code-** y elija **Upload**.

1. En el paso **Seleccionar archivos**, elija **Añadir archivos**. Vaya al archivo `basic-beam-app-1.0.jar` que creó en el paso anterior. 

1. No es necesario cambiar ninguno de los ajustes del objeto, por lo tanto, elija **Cargar**.

El código de la aplicación ya está almacenado en un bucket de Amazon S3 al que la aplicación puede acceder.

## Crear y ejecutar la aplicación de Managed Service para Apache Flink
<a name="examples-beam-create-run"></a>

Siga estos pasos para crear, configurar, actualizar y ejecutar la aplicación mediante la consola.

### Creación de la aplicación
<a name="examples-beam-create"></a>

1. Inicie sesión en y abra la Consola de administración de AWS consola de Amazon MSF en https://console.aws.amazon.com /flink.

1. En el panel de Managed Service para Apache Flink, seleccione **Crear aplicación de análisis**.

1. En la página **Managed Service para Apache Flink: crear aplicación**, proporcione los detalles de la aplicación de la siguiente manera:
   + En **Nombre de la aplicación**, escriba **MyApplication**.
   + En **Tiempo de ejecución**, escriba **Apache Flink**.
**nota**  
Apache Beam no es compatible actualmente con la versión 1.19 o posterior de Apache Flink.
   + Deje el menú desplegable de versión como **Apache Flink versión 1.15**.

1. Para los **permisos de acceso**, seleccione **Crear o actualizar el rol de IAM**. `kinesis-analytics-MyApplication-us-west-2`

1. Elija **Crear aplicación**.

**nota**  
Al crear una aplicación de Managed Service para Apache Flink mediante la consola, tiene la opción de tener un rol de IAM y una política creada para su aplicación. La aplicación utiliza este rol y la política para acceder a los recursos dependientes. Estos recursos de IAM reciben un nombre usando el nombre de la aplicación y la región tal y como se indica a continuación:  
Política: `kinesis-analytics-service-MyApplication-us-west-2`
Rol: `kinesis-analytics-MyApplication-us-west-2`

### Modificar la política de IAM
<a name="get-started-exercise-7-console-iam"></a>

Edite la política de IAM para agregar permisos de acceso a los flujos de datos de Kinesis.

1. Abra la consola de IAM en [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/).

1. Elija **Políticas**. Elija la política **`kinesis-analytics-service-MyApplication-us-west-2`** que la consola creó en su nombre en la sección anterior. 

1. En la página **Resumen**, elija **Editar política**. Seleccione la pestaña **JSON**.

1. Añada la sección subrayada de la siguiente política de ejemplo a la política. Sustituya la cuenta de muestra IDs (*012345678901*) por su ID de cuenta.

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Sid": "ReadCode",
               "Effect": "Allow",
               "Action": [
                   "s3:GetObject",
                   "logs:DescribeLogGroups",
                   "s3:GetObjectVersion"
               ],
               "Resource": [
                   "arn:aws:logs:us-west-2:012345678901:log-group:*",
                   "arn:aws:s3:::ka-app-code-<username>/basic-beam-app-1.0.jar"
               ]
           },
           {
               "Sid": "DescribeLogStreams",
               "Effect": "Allow",
               "Action": "logs:DescribeLogStreams",
               "Resource": "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*"
           },
           {
               "Sid": "PutLogEvents",
               "Effect": "Allow",
               "Action": "logs:PutLogEvents",
               "Resource": "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream"
           },
           {
               "Sid": "ListCloudwatchLogGroups",
               "Effect": "Allow",
               "Action": [
                   "logs:DescribeLogGroups"
               ],
               "Resource": [
                   "arn:aws:logs:us-west-2:012345678901:log-group:*"
               ]
           },
           {
               "Sid": "ReadInputStream",
               "Effect": "Allow",
               "Action": "kinesis:*",
               "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream"
           },
           {
               "Sid": "WriteOutputStream",
               "Effect": "Allow",
               "Action": "kinesis:*",
               "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream"
           }
       ]
   }
   ```

------

### Configurar la aplicación
<a name="examples-beam-configure"></a>

1. En la **MyApplication**página, selecciona **Configurar**.

1. En la página **Configurar aplicación**, proporcione la **Ubicación del código**:
   + Para el **bucket de Amazon S3**, introduzca **ka-app-code-*<username>***.
   + En **Ruta al objeto de Amazon S3**, introduzca **basic-beam-app-1.0.jar**.

1. En **Acceso a los recursos de la aplicación**, en **Permisos de acceso**, seleccione **Crear o actualizar el rol de IAM `kinesis-analytics-MyApplication-us-west-2`**.

1. Introduzca lo siguiente:    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/es_es/managed-flink/latest/java/examples-beam.html)

1. En **Monitorización**, asegúrese de que el **Nivel de métricas de monitorización** se ha establecido en **Aplicación**.

1. Para el **CloudWatch registro**, active la casilla **Activar**.

1. Elija **Actualizar**.

**nota**  
Si decide habilitar el CloudWatch registro, Managed Service for Apache Flink crea un grupo de registros y un flujo de registros automáticamente. Los nombres de estos recursos son los siguientes:   
Grupo de registro: `/aws/kinesis-analytics/MyApplication`
Flujo de registro: `kinesis-analytics-log-stream`
Este flujo de registro se utiliza para supervisar la aplicación. No es el mismo flujo de registro que utiliza la aplicación para enviar los resultados.

### Ejecución de la aplicación
<a name="examples-beam-run"></a>

Para ver el gráfico de trabajos de Flink, ejecute la aplicación, abra el panel de Apache Flink y elija el trabajo de Flink que desee.

Puede comprobar las métricas del servicio gestionado para Apache Flink en la CloudWatch consola para comprobar que la aplicación funciona. 

## Limpie los recursos AWS
<a name="examples-beam-cleanup"></a>

Esta sección incluye procedimientos para limpiar los AWS recursos creados en el tutorial Tumbling Window.

**Topics**
+ [Eliminación de su aplicación de Managed Service para Apache Flink](#examples-beam-cleanup-app)
+ [Eliminación de sus flujos de datos de Kinesis](#examples-beam-cleanup-stream)
+ [Eliminación del objeto y el bucket de Amazon S3](#examples-beam-cleanup-s3)
+ [Eliminación de sus recursos de IAM](#examples-beam-cleanup-iam)
+ [CloudWatch Elimine sus recursos](#examples-beam-cleanup-cw)

### Eliminación de su aplicación de Managed Service para Apache Flink
<a name="examples-beam-cleanup-app"></a>

1. Inicie sesión en y abra la Consola de administración de AWS consola de Amazon MSF en https://console.aws.amazon.com /flink.

1. en el panel Servicio gestionado para Apache Flink, elija. **MyApplication**

1. En la página de la aplicación, seleccione **Eliminar** y, a continuación, confirme la eliminación.

### Eliminación de sus flujos de datos de Kinesis
<a name="examples-beam-cleanup-stream"></a>

1. [Abra la consola de Kinesis en https://console.aws.amazon.com /kinesis.](https://console.aws.amazon.com/kinesis)

1. En el panel Kinesis Data Streams, **ExampleInputStream**elija.

1. En la **ExampleInputStream**página, elija **Eliminar Kinesis Stream** y, a continuación, confirme la eliminación.

1. En la página de **transmisiones de Kinesis**, elija, elija **Acciones **ExampleOutputStream****, elija **Eliminar** y, a continuación, confirme la eliminación.

### Eliminación del objeto y el bucket de Amazon S3
<a name="examples-beam-cleanup-s3"></a>

1. Abra la consola de Amazon S3 en [https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/).

1. Elija el ***<username>*cubo ka-app-code -.**

1. Elija **Eliminar** y luego ingrese el nombre del bucket para confirmar la eliminación.

### Eliminación de sus recursos de IAM
<a name="examples-beam-cleanup-iam"></a>

1. Abra la consola de IAM en [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/).

1. En la barra de navegación, seleccione **Políticas**.

1. En el control de filtros, introduzca **kinesis**.

1. Elija la política **kinesis-analytics-service- MyApplication -us-west-2**.

1. Seleccione **Acciones de política** y, a continuación, **Eliminar**.

1. En la barra de navegación, seleccione **Roles**.

1. Elija el rol **kinesis-analytics- MyApplication** -us-west-2.

1. Elija **Eliminar rol** y, a continuación, confirme la eliminación.

### CloudWatch Elimine sus recursos
<a name="examples-beam-cleanup-cw"></a>

1. Abre la CloudWatch consola en [https://console.aws.amazon.com/cloudwatch/](https://console.aws.amazon.com/cloudwatch/).

1. En la barra de navegación, elija **Registros**.

1. Elija el grupo de aws/kinesis-analytics/MyApplication registros**/**.

1. Elija **Eliminar grupo de registro** y, a continuación, confirme la eliminación.

## Siguientes pasos
<a name="examples-beam-nextsteps"></a>

Ya que ha creado y ejecutado una aplicación básica de Managed Service para Apache Flink que transforma los datos usando Apache Beam, consulte la siguiente aplicación para encontrar un ejemplo de una solución más avanzada de Managed Service para Apache Flink.
+ **[Taller sobre streaming de Managed Service para Apache Flink](https://streaming-analytics.workshop.aws/beam-on-kda/)**: en este taller, analizamos un ejemplo integral que combina aspectos de transmisión por lotes y streaming en una canalización uniforme de Apache Beam. 