

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

# Ejemplos y tutoriales para cuadernos de Studio en Managed Service para Apache Flink
<a name="how-zeppelin-examples"></a>

**Topics**
+ [Tutorial: creación de un cuaderno de Studio en Managed Service para Apache Flink](example-notebook.md)
+ [Tutorial: implementación de un cuaderno de Studio como un servicio administrado para una aplicación Apache Flink con estado duradero.](example-notebook-deploy.md)
+ [Visualización de ejemplos de consultas para analizar datos en un cuaderno de Studio](how-zeppelin-sql-examples.md)

# Tutorial: creación de un cuaderno de Studio en Managed Service para Apache Flink
<a name="example-notebook"></a>

El siguiente tutorial muestra cómo crear un cuaderno de Studio que lea datos de un clúster de Flujo de Datos de Kinesis o Amazon MSK.

**Topics**
+ [Cumplimiento de los requisitos previos de](#example-notebook-setup)
+ [Cree una base de datos AWS Glue](#example-notebook-glue)
+ [Próximos pasos: creación de un cuaderno de Studio con Kinesis Data Streams o Amazon MSK](#examples-notebook-nextsteps)
+ [Creación de un cuaderno de Studio con Kinesis Data Streams](example-notebook-streams.md)
+ [Creación de un cuaderno de Studio con Amazon MSK](example-notebook-msk.md)
+ [Limpieza de su aplicación y recursos dependientes](example-notebook-cleanup.md)

## Cumplimiento de los requisitos previos de
<a name="example-notebook-setup"></a>

Asegúrese de que la suya AWS CLI es la versión 2 o posterior. Para instalar la versión más reciente AWS CLI, consulte [Instalación, actualización y desinstalación de la AWS CLI versión 2](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html).

## Cree una base de datos AWS Glue
<a name="example-notebook-glue"></a>

Su cuaderno de Studio utiliza una base de datos [AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/what-is-glue.html) para los metadatos sobre su origen de datos de Amazon MSK.

**Crear una AWS Glue base de datos**

1. Abra la AWS Glue consola en [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/).

1. Elija **Agregar una base de datos**. En la ventana **Añadir base de datos**, introduzca **default** en el **nombre de la base de datos**. Seleccione **Crear**. 

## Próximos pasos: creación de un cuaderno de Studio con Kinesis Data Streams o Amazon MSK
<a name="examples-notebook-nextsteps"></a>

Con este tutorial, puede crear un cuaderno de Studio que utilice flujos de datos de Kinesis o Amazon MSK:
+ [Creación de un cuaderno de Studio con Kinesis Data Streams](example-notebook-streams.md): con flujos de datos de Kinesis, puede crear rápidamente una aplicación que utilice un flujo de datos de Kinesis como origen. Solo necesita crear un flujo de datos de Kinesis como recurso dependiente.
+ [Creación de un cuaderno de Studio con Amazon MSK](example-notebook-msk.md): con Amazon MSK, cree una aplicación que utiliza un clúster de Amazon MSK como origen. Debe crear una Amazon VPC, una instancia de cliente de Amazon EC2 y un clúster de Amazon MSK como recursos dependientes.

# Creación de un cuaderno de Studio con Kinesis Data Streams
<a name="example-notebook-streams"></a>

En este tutorial se describe cómo crear un cuaderno de Studio que utilice un flujo de datos de Kinesis como fuente.

**Topics**
+ [Cumplimiento de los requisitos previos de](#example-notebook-streams-setup)
+ [Cree una tabla AWS Glue](#example-notebook-streams-glue)
+ [Creación de un cuaderno de Studio con Kinesis Data Streams](#example-notebook-streams-create)
+ [Envío de datos a su flujo de datos de Kinesis](#example-notebook-streams-send)
+ [Prueba de su cuaderno de Studio](#example-notebook-streams-test)

## Cumplimiento de los requisitos previos de
<a name="example-notebook-streams-setup"></a>

Antes de crear un cuaderno de Studio, cree un flujo de datos de Kinesis (`ExampleInputStream`). Su aplicación utiliza estos flujos para el origen de la aplicación.

Puede crear este flujo mediante la consola de Amazon Kinesis o el siguiente comando de la AWS CLI . Para obtener instrucciones sobre la consola, consulte [Creating and Updating Data Streams](https://docs.aws.amazon.com/kinesis/latest/dev/amazon-kinesis-streams.html) en la *Guía para desarrolladores de Amazon Kinesis Data Streams*. Asigne el nombre **ExampleInputStream** al flujo y establezca el **número de particiones abiertas** en **1**.

Para crear la transmisión (`ExampleInputStream`) mediante el AWS CLI, utilice el siguiente comando de Amazon Kinesis `create-stream` AWS CLI .

```
$ aws kinesis create-stream \
--stream-name ExampleInputStream \
--shard-count 1 \
--region us-east-1 \
--profile adminuser
```

## Cree una tabla AWS Glue
<a name="example-notebook-streams-glue"></a>

Su cuaderno de Studio utiliza una [AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/what-is-glue.html) base de datos para los metadatos sobre su origen de datos de Kinesis Data Streams.

**nota**  
Puede crear primero la base de datos manualmente o dejar que Managed Service para Apache Flink la cree por usted al crear el cuaderno. De forma similar, puede crear la tabla manualmente tal y como se describe en esta sección o puede usar el código conector de creación de tablas para Managed Service para Apache Flink en su cuaderno en Apache Zeppelin para crear la tabla mediante una instrucción DDL. A continuación, puede registrarse AWS Glue para asegurarse de que la tabla se ha creado correctamente.

**Crear una tabla**

1. Inicie sesión en Consola de administración de AWS y abra la AWS Glue consola en [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/).

1. Si aún no tiene una AWS Glue base de datos, elija **Bases de datos** en la barra de navegación izquierda. Elija **Agregar una base de datos**. En la ventana **Añadir base de datos**, introduzca **default** en el **nombre de la base de datos**. Seleccione **Crear**.

1. En la barra de navegación izquierda, seleccione **Tablas**. En la página **Tablas**, seleccione **Añadir tablas** y **Añadir tabla manualmente**.

1. En la página **Configurar las propiedades de la tabla**, introduzca **stock** como **Nombre de la tabla**. Asegúrese de seleccionar la base de datos que creó anteriormente. Elija **Next** (Siguiente).

1. En la página **Añadir un almacén de datos**, elija **Kinesis**. Para el **Nombre del flujo**, introduzca**ExampleInputStream**. Para la **URL de origen de Kinesis**, pulse Intro **https://kinesis.us-east-1.amazonaws.com**. Si copia y pega la **URL de origen de Kinesis**, asegúrese de eliminar los espacios iniciales o finales. Elija **Next** (Siguiente).

1. En la página de **Clasificación**, seleccione **JSON.** Elija **Siguiente**.

1. En la página **Definir un esquema**, elija Añadir columna para añadir una. Añada columnas con las siguientes propiedades:    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/es_es/managed-flink/latest/java/example-notebook-streams.html)

   Elija **Next** (Siguiente).

1. En la página siguiente, verifique su configuración y seleccione **Finalizar.**

1. Elija la tabla recién creada de la lista de tablas.

1. Elija **Editar tabla** y añada una propiedad con la clave `managed-flink.proctime` y el valor `proctime`.

1. Seleccione **Aplicar**.

## Creación de un cuaderno de Studio con Kinesis Data Streams
<a name="example-notebook-streams-create"></a>

Ahora que ha creado los recursos que utiliza su aplicación, cree su cuaderno de Studio. 

**Topics**
+ [Cree un bloc de notas de Studio con Consola de administración de AWS](#example-notebook-create-streams-console)
+ [Cree un bloc de notas de Studio con AWS CLI](#example-notebook-msk-create-api)

### Cree un bloc de notas de Studio con Consola de administración de AWS
<a name="example-notebook-create-streams-console"></a>

1. ¿Abrir la consola Managed Service for Apache Flink en [ https://console.aws.amazon.com/managed-flink/casa? region=us-east-1\$1/applications/panel](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard) de control. 

1. En la página de **Aplicaciones de Managed Service para Apache Flink**, seleccione la pestaña **Studio**. Seleccione **Crear cuaderno de Studio**.
**nota**  
También puede crear un cuaderno de Studio desde las consolas de Amazon MSK o Kinesis Data Streams seleccionando el clúster de Amazon MSK o la el flujo de datos de Kinesis de entrada y eligiendo **Procesar datos en tiempo real**.

1. En la página **Crear cuaderno de Studio**, proporcione la siguiente información:
   + Introduzca **MyNotebook** como nombre del cuaderno.
   + Elija el **valor predeterminado** para la **Base de datos de Glue de AWS **.

   Seleccione **Crear cuaderno de Studio**.

1. **En la página, selecciona Ejecutar. **MyNotebook**** Espere a que el **Estado** muestre **En ejecución**. Se aplican cargos cuando el cuaderno se está ejecutando.

### Cree un bloc de notas de Studio con AWS CLI
<a name="example-notebook-msk-create-api"></a>

Para crear tu bloc de notas de Studio con el AWS CLI, haz lo siguiente:

1. Verifique su ID de cuenta. Necesitará este valor para crear su aplicación.

1. Cree el rol `arn:aws:iam::AccountID:role/ZeppelinRole` y añada los siguientes permisos al rol creado automáticamente por consola.

   `"kinesis:GetShardIterator",`

   `"kinesis:GetRecords",`

   `"kinesis:ListShards"`

1. Cree un archivo denominado `create.json` con el siguiente contenido. Reemplace los valores de marcador de posición con su información.

   ```
   {
       "ApplicationName": "MyNotebook",
       "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
       "ApplicationMode": "INTERACTIVE",
       "ServiceExecutionRole": "arn:aws:iam::AccountID:role/ZeppelinRole",
       "ApplicationConfiguration": {
           "ApplicationSnapshotConfiguration": {
               "SnapshotsEnabled": false
           },
           "ZeppelinApplicationConfiguration": {
               "CatalogConfiguration": {
                   "GlueDataCatalogConfiguration": {
                       "DatabaseARN": "arn:aws:glue:us-east-1:AccountID:database/default"
                   }
               }
           }
       }
   }
   ```

1. Para crear su aplicación, ejecute el siguiente comando:

   ```
   aws kinesisanalyticsv2 create-application --cli-input-json file://create.json 
   ```

1. Una vez completado el comando, verá un resultado que muestra los detalles de su nuevo cuaderno de Studio. A continuación se muestra un ejemplo de la salida.

   ```
   {
       "ApplicationDetail": {
           "ApplicationARN": "arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook",
           "ApplicationName": "MyNotebook",
           "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
           "ApplicationMode": "INTERACTIVE",
           "ServiceExecutionRole": "arn:aws:iam::012345678901:role/ZeppelinRole",
   ...
   ```

1. Para iniciar su aplicación, ejecute el siguiente comando. Sustituya los valores de muestra por su ID de la cuenta.

   ```
   aws kinesisanalyticsv2 start-application --application-arn arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook\
   ```

## Envío de datos a su flujo de datos de Kinesis
<a name="example-notebook-streams-send"></a>

Para enviar los datos de prueba al flujo de datos de Kinesis, haga lo siguiente:

1. Abra el [Kinesis Data Generator](https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html). 

1. Elija **Crear un usuario de Cognito con**. CloudFormation

1. La CloudFormation consola se abre con la plantilla de Kinesis Data Generator. Elija **Next** (Siguiente).

1. En la página **Especificar detalles de pila**, ingrese el nombre y la contraseña de su usuario de Cognito. Elija **Next** (Siguiente).

1. En la página **Configurar opciones de pila**, elija **Siguiente**.

1. En la página **Revisar un Kinesis-Data-Generator-Cognito usuario**, seleccione la opción **Acepto que AWS CloudFormation podría crear recursos de IAM**. casilla de verificación. Elija **Crear pila**.

1. Espera a que la CloudFormation pila termine de crearse. **Una vez completada la pila, abra la pila **Kinesis-Data-Generator-Cognito-User** en la consola y seleccione la pestaña Salidas. CloudFormation ** **KinesisDataGeneratorUrl**Abre la URL que aparece para el valor de salida.

1. En la página de **Amazon Kinesis Data Generator**, inicie sesión con las credenciales que creó en el paso 4.

1. En la siguiente página , especifique los valores siguientes:     
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/es_es/managed-flink/latest/java/example-notebook-streams.html)

   En **Plantilla de registro**, pegue el siguiente código:

   ```
   {
       "ticker": "{{random.arrayElement(
           ["AMZN","MSFT","GOOG"]
       )}}",
       "price": {{random.number(
           {
               "min":10,
               "max":150
           }
       )}}
   }
   ```

1. Elija **Enviar datos**.

1. El generador enviará los datos a su flujo de datos de Kinesis. 

   Deje el generador ejecutándose mientras completa la siguiente sección.

## Prueba de su cuaderno de Studio
<a name="example-notebook-streams-test"></a>

En esta sección, utilizará su cuaderno de Studio para consultar datos de su flujo de datos de Kinesis.

1. ¿Abrir la consola Managed Service for Apache Flink en [ https://console.aws.amazon.com/managed-flink/casa? region=us-east-1\$1/applications/panel](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard) de control.

1. En la página de **Aplicaciones de Managed Service para Apache Flink**, seleccione la pestaña **Cuaderno de Studio**. Elija **MyNotebook**.

1. **En la página, selecciona Abrir en Apache Zeppelin. **MyNotebook****

   La interfaz de Apache Zeppelin se abre en una pestaña nueva.

1. En la página **¡Bienvenido a Zeppelin\$1**, elija la **Zeppelin Note**.

1. En la página **Zeppelin Note**, introduzca la siguiente consulta en una nota nueva:

   ```
   %flink.ssql(type=update)
   select * from stock
   ```

   Seleccione el icono de reproducción.

   Al cabo de poco tiempo, el cuaderno muestra los datos de la flujo de datos de Kinesis.

Para abrir el Panel de control de Apache Flink de su aplicación y ver los aspectos operativos, elija **TRABAJO DE FLINK**. Para obtener más información sobre el Panel de control de Flink, consulte [Apache Flink Dashboard](https://docs.aws.amazon.com/managed-flink/latest/java/how-dashboard.html) en la Guía para desarrolladores de [Managed Service para Apache Flink](https://docs.aws.amazon.com/).

Para ver más ejemplos de consultas SQL de Flink Streaming, consulte [Queries](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html) en la [documentación de Apache Flink.](https://nightlies.apache.org/flink/flink-docs-release-1.15/)

# Creación de un cuaderno de Studio con Amazon MSK
<a name="example-notebook-msk"></a>

En este tutorial se describe cómo crear un cuaderno de Studio que utilice un clúster de Amazon MSK como fuente.

**Topics**
+ [Configurar un clúster de Amazon MSK](#example-notebook-msk-setup)
+ [Añada una puerta de enlace NAT a su VPC](#example-notebook-msk-nat)
+ [Cree una AWS Glue conexión y una tabla](#example-notebook-msk-glue)
+ [Creación de un cuaderno de Studio con Amazon MSK](#example-notebook-msk-create)
+ [Envío de datos a su clúster de Amazon MSK](#example-notebook-msk-send)
+ [Prueba de su cuaderno de Studio](#example-notebook-msk-test)

## Configurar un clúster de Amazon MSK
<a name="example-notebook-msk-setup"></a>

Para este tutorial, necesita un clúster de Amazon MSK que permita el acceso a texto sin formato. Si aún no ha configurado un clúster de Amazon MSK, siga el tutorial [Cómo empezar a utilizar Amazon MSK](https://docs.aws.amazon.com/msk/latest/developerguide/getting-started.html) para crear una VPC de Amazon, un clúster de Amazon MSK, un tema y una instancia de cliente de Amazon. EC2 

Al seguir el tutorial, haga lo siguiente:
+ En el [paso 3: cree un clúster de Amazon MSK](https://docs.aws.amazon.com/msk/latest/developerguide/create-cluster.html), en el paso 4, cambie el `ClientBroker` valor de `TLS` a**PLAINTEXT**.

## Añada una puerta de enlace NAT a su VPC
<a name="example-notebook-msk-nat"></a>

Si ha creado un clúster de Amazon MSK siguiendo el tutorial [Cómo empezar a utilizar Amazon MSK](https://docs.aws.amazon.com/msk/latest/developerguide/getting-started.html), o si su Amazon VPC existente aún no tiene una puerta de enlace NAT para sus subredes privadas, debe añadir una puerta de enlace NAT a su Amazon VPC. En el siguiente diagrama se muestra la arquitectura. 

![\[AWS VPC architecture with public and private subnets, NAT gateway, and Glue Data Catalog integration.\]](http://docs.aws.amazon.com/es_es/managed-flink/latest/java/images/vpc_05.png)


Para crear una puerta de enlace NAT para su Amazon VPC, haga lo siguiente:

1. Abra la consola de Amazon VPC en [https://console.aws.amazon.com/vpc/](https://console.aws.amazon.com/vpc/).

1. En la barra de navegación izquierda, elija **puertas de enlace NAT**.

1. En la página **puertas de enlace NAT**, seleccione **Crear puerta de enlace NAT**.

1. En la página **Crear puerta de enlace NAT**, especifique los valores siguientes:    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/es_es/managed-flink/latest/java/example-notebook-msk.html)

   Elija **Create a NAT Gateway** (Crear una puerta de enlace NAT).

1. En la de navegación izquierda, elija **Tablas de ruteo**.

1. Elija **Create Route Table (Crear tabla de ruteo)**.

1. En la página **Crear tabla de enrutamiento**, proporcione la siguiente información:
   + **Name tag:** **ZeppelinRouteTable**
   + **VPC****: elija su VPC (p. ej., VPC).AWS KafkaTutorial**

   Seleccione **Crear**.

1. En la lista de tablas de rutas, elija. **ZeppelinRouteTable** Elija la pestaña **Rutas** y, a continuación, **Editar rutas**.

1. En la pestaña **Editar rutas**, elija **Añadir rutas**.

1. En ****Para **Destino**, escriba **0.0.0.0/0**. Para **Target**, elija **NAT Gateway**, **ZeppelinGateway**. Elija **Guardar rutas**. Seleccione **Cerrar**.

1. En la página de tablas de rutas, con la **ZeppelinRouteTable**opción seleccionada, elija la pestaña **Asociaciones de subredes**. Elija **Editar asociaciones de subredes.**

1. **En la página **Editar asociaciones de subredes**, elija **AWS KafkaTutorialSubnet2 y AWS KafkaTutorialSubnet 3**.** Seleccione **Guardar**.

## Cree una AWS Glue conexión y una tabla
<a name="example-notebook-msk-glue"></a>

Su cuaderno de Studio utiliza una base de datos [AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/what-is-glue.html) para los metadatos sobre su origen de datos de Amazon MSK. En esta sección, crea una AWS Glue conexión que describe cómo acceder a su clúster de Amazon MSK y una AWS Glue tabla que describe cómo presentar los datos de su fuente de datos a clientes como su bloc de notas Studio. 

**Creación de una conexión**

1. Inicie sesión en Consola de administración de AWS y abra la AWS Glue consola en [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/).

1. Si aún no tiene una AWS Glue base de datos, elija **Bases de datos** en la barra de navegación izquierda. Elija **Agregar una base de datos**. En la ventana **Añadir base de datos**, introduzca **default** en el **nombre de la base de datos**. Seleccione **Crear**.

1. En la barra de navegación de la izquierda, seleccione **Conexiones**. Elija **Añadir conexión**.

1. En la ventana **Añadir conexión**, introduzca los siguientes valores:
   + En **Nombre de conexión**, ingrese **ZeppelinConnection**.
   + En **Tipo de conexión**, elija **Kafka**.
   + En el **caso del servidor bootstrap de Kafka URLs**, proporcione la cadena del agente de arranque de su clúster. Puede obtener los agentes de arranque desde la consola MSK o ingresando el siguiente comando de la CLI:

     ```
     aws kafka get-bootstrap-brokers --region us-east-1 --cluster-arn ClusterArn
     ```
   + Desactive la casilla de verificación **Exigir conexión SSL**.

   Elija **Next** (Siguiente).

1. En la página **VPC**, especifique los valores siguientes:
   + **Para **VPC**, elija el nombre de su VPC (por ejemplo, VPC). AWS KafkaTutorial**
   + **Para **Subnet**, elija 2.AWS KafkaTutorialSubnet**
   + Para los **grupos de seguridad**, elija todos los grupos disponibles.

   Elija **Next** (Siguiente).

1. En la página **Propiedades de la conexión** o **Acceso a la conexión**, seleccione **Finalizar**.

**Crear una tabla**
**nota**  
Puede crear la tabla manualmente tal y como se describe en los pasos siguientes, o puede usar el código conector de creación de tablas para Managed Service para Apache Flink en su cuaderno en Apache Zeppelin para crear la tabla mediante una instrucción DDL. A continuación, puede comprobar AWS Glue que la tabla se ha creado correctamente.

1. En la barra de navegación izquierda, seleccione **Tablas**. En la página **Tablas**, seleccione **Añadir tablas** y **Añadir tabla manualmente**.

1. En la página **Configurar las propiedades de la tabla**, introduzca **stock** como **Nombre de la tabla**. Asegúrese de seleccionar la base de datos que creó anteriormente. Elija **Next** (Siguiente).

1. En la página **Añadir almacén de datos**, elija **Kafka**. Para el **nombre del tema**, introduce el nombre del tema (por ejemplo **AWS KafkaTutorialTopic**). En **Conexión**, elija **ZeppelinConnection**.

1. En la página de **Clasificación**, seleccione **JSON.** Elija **Siguiente**.

1. En la página **Definir un esquema**, elija Añadir columna para añadir una. Añada columnas con las siguientes propiedades:    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/es_es/managed-flink/latest/java/example-notebook-msk.html)

   Elija **Next** (Siguiente).

1. En la página siguiente, verifique su configuración y seleccione **Finalizar.**

1. Elija la tabla recién creada de la lista de tablas.

1. Elija **Editar tabla** y añada las siguientes propiedades:
   + clave: `managed-flink.proctime`, valor: `proctime`
   + clave: `flink.properties.group.id`, valor: `test-consumer-group`
   + clave: `flink.properties.auto.offset.reset`, valor: `latest`
   + clave: `classification`, valor: `json`

   Sin estos pares clave/valor, se produce un error en el cuaderno Flink. 

1. Seleccione **Aplicar**.

## Creación de un cuaderno de Studio con Amazon MSK
<a name="example-notebook-msk-create"></a>

Ahora que ha creado los recursos que utiliza su aplicación, cree su cuaderno de Studio. 

**Topics**
+ [Cree un cuaderno de Studio con el Consola de administración de AWS](#example-notebook-create-msk-console)
+ [Cree un bloc de notas de Studio con AWS CLI](#example-notebook-msk-create-api)

**nota**  
También puede crear un cuaderno de Studio desde la consola Amazon MSK seleccionando un clúster existente y, a continuación, seleccionando **Procesar datos en tiempo real**.

### Cree un cuaderno de Studio con el Consola de administración de AWS
<a name="example-notebook-create-msk-console"></a>

1. ¿Abrir la consola Managed Service for Apache Flink en [ https://console.aws.amazon.com/managed-flink/casa? region=us-east-1\$1/applications/panel](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard) de control.

1. En la página de **Aplicaciones de Managed Service para Apache Flink**, seleccione la pestaña **Studio**. Seleccione **Crear cuaderno de Studio**.
**nota**  
Para crear un cuaderno de Studio desde las consolas Amazon MSK o Kinesis Data Streams, seleccione el clúster Amazon MSK o el flujo de datos de Kinesis de entrada y, a continuación, elija **Procesar datos en tiempo real**.

1. En la página **Crear cuaderno de Studio**, proporcione la siguiente información:
   + Introduzca **MyNotebook** como **Nombre del cuaderno de Studio.**
   + Elija el **valor predeterminado** para la **Base de datos de Glue de AWS **.

   Seleccione **Crear cuaderno de Studio**.

1. **En la página, seleccione la pestaña Configuración. **MyNotebook**** En la sección **Redes**, elija **Editar**.

1. En la MyNotebook página **Editar red para**, selecciona la **configuración de VPC basada en el clúster de Amazon MSK**. Elija su clúster de Amazon MSK para el **Clúster de Amazon MSK.** Elija **Guardar cambios**.

1. **En la **MyNotebook**página, selecciona Ejecutar.** Espere a que el **Estado** muestre **En ejecución**.

### Cree un bloc de notas de Studio con AWS CLI
<a name="example-notebook-msk-create-api"></a>

Para crear tu bloc de notas de Studio mediante el AWS CLI, haz lo siguiente:

1. Verifique que disponga de la siguiente información. Necesita estos valores para crear su aplicación.
   + Su ID de cuenta de .
   + El ID de subred IDs y grupo de seguridad de la VPC de Amazon que contiene el clúster de Amazon MSK.

1. Cree un archivo denominado `create.json` con el siguiente contenido. Reemplace los valores de marcador de posición con su información.

   ```
   {
       "ApplicationName": "MyNotebook",
       "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
       "ApplicationMode": "INTERACTIVE",
       "ServiceExecutionRole": "arn:aws:iam::AccountID:role/ZeppelinRole",
       "ApplicationConfiguration": {
           "ApplicationSnapshotConfiguration": {
               "SnapshotsEnabled": false
           },
           "VpcConfigurations": [
               {
                   "SubnetIds": [
                       "SubnetID 1",
                       "SubnetID 2",
                       "SubnetID 3"
                   ],
                   "SecurityGroupIds": [
                       "VPC Security Group ID"
                   ]
               }
           ],
           "ZeppelinApplicationConfiguration": {
               "CatalogConfiguration": {
                   "GlueDataCatalogConfiguration": {
                       "DatabaseARN": "arn:aws:glue:us-east-1:AccountID:database/default"
                   }
               }
           }
       }
   }
   ```

1. Para crear su aplicación, ejecute el siguiente comando:

   ```
   aws kinesisanalyticsv2 create-application --cli-input-json file://create.json 
   ```

1. Una vez completado el comando, debería ver un resultado similar al siguiente, con los detalles de su nuevo cuaderno de Studio:

   ```
   {
       "ApplicationDetail": {
           "ApplicationARN": "arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook",
           "ApplicationName": "MyNotebook",
           "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
           "ApplicationMode": "INTERACTIVE",
           "ServiceExecutionRole": "arn:aws:iam::012345678901:role/ZeppelinRole",
   ...
   ```

1. Para iniciar su aplicación, ejecute el siguiente comando. Sustituya los valores de muestra por su ID de la cuenta.

   ```
   aws kinesisanalyticsv2 start-application --application-arn arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook\
   ```

## Envío de datos a su clúster de Amazon MSK
<a name="example-notebook-msk-send"></a>

En esta sección, ejecuta un script de Python en su EC2 cliente de Amazon para enviar datos a su fuente de datos de Amazon MSK.

1. Conéctate con tu EC2 cliente de Amazon.

1. Ejecute los siguientes comandos para instalar la versión 3 de Python, Pip y el paquete Kafka para Python, y confirme las acciones:

   ```
   sudo yum install python37
   curl -O https://bootstrap.pypa.io/get-pip.py
   python3 get-pip.py --user
   pip install kafka-python
   ```

1. Configure AWS CLI en su máquina cliente introduciendo el siguiente comando:

   ```
   aws configure
   ```

   Proporcione las credenciales de su cuenta y **us-east-1** para `region`.

1. Cree un archivo denominado `stock.py` con el siguiente contenido. Sustituya el valor de muestra por la cadena Bootstrap Brokers de su clúster de Amazon MSK y actualice el nombre del tema si su tema no es: **AWS KafkaTutorialTopic**

   ```
   from kafka import KafkaProducer
   import json
   import random
   from datetime import datetime
   
   BROKERS = "<<Bootstrap Broker List>>"
   producer = KafkaProducer(
       bootstrap_servers=BROKERS,
       value_serializer=lambda v: json.dumps(v).encode('utf-8'),
       retry_backoff_ms=500,
       request_timeout_ms=20000,
       security_protocol='PLAINTEXT')
   
   
   def getStock():
       data = {}
       now = datetime.now()
       str_now = now.strftime("%Y-%m-%d %H:%M:%S")
       data['event_time'] = str_now
       data['ticker'] = random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV'])
       price = random.random() * 100
       data['price'] = round(price, 2)
       return data
   
   
   while True:
       data =getStock()
       # print(data)
       try:
           future = producer.send("AWSKafkaTutorialTopic", value=data)
           producer.flush()
           record_metadata = future.get(timeout=10)
           print("sent event to Kafka! topic {} partition {} offset {}".format(record_metadata.topic, record_metadata.partition, record_metadata.offset))
       except Exception as e:
           print(e.with_traceback())
   ```

1. Ejecute el script con el siguiente comando:

   ```
   $ python3 stock.py
   ```

1. Deje el script en ejecución mientras completa la siguiente sección.

## Prueba de su cuaderno de Studio
<a name="example-notebook-msk-test"></a>

En esta sección, utilizará su cuaderno de Studio para consultar datos de su clúster de Amazon MSK.

1. [¿Abrir la consola Managed Service for Apache Flink en casa? https://console.aws.amazon.com/managed-flink/ region=us-east-1\$1/applications/panel](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard) de control.

1. En la página de **Aplicaciones de Managed Service para Apache Flink**, seleccione la pestaña **Cuaderno de Studio**. Elija **MyNotebook**.

1. **En la página, selecciona Abrir en Apache Zeppelin. **MyNotebook****

   La interfaz de Apache Zeppelin se abre en una pestaña nueva.

1. En la página **¡Bienvenido a Zeppelin\$1**, elija la **nueva nota de Zeppelin**.

1. En la página **Zeppelin Note**, introduzca la siguiente consulta en una nota nueva:

   ```
   %flink.ssql(type=update)
   select * from stock
   ```

   Seleccione el icono de reproducción.

   La aplicación muestra los datos del clúster de Amazon MSK.

Para abrir el Panel de control de Apache Flink de su aplicación y ver los aspectos operativos, elija **TRABAJO DE FLINK**. Para obtener más información sobre el Panel de control de Flink, consulte [Apache Flink Dashboard](https://docs.aws.amazon.com/managed-flink/latest/java/how-dashboard.html) en la Guía para desarrolladores de [Managed Service para Apache Flink](https://docs.aws.amazon.com/).

Para ver más ejemplos de consultas SQL de Flink Streaming, consulte [Queries](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html) en la [documentación de Apache Flink.](https://nightlies.apache.org/flink/flink-docs-release-1.15/)

# Limpieza de su aplicación y recursos dependientes
<a name="example-notebook-cleanup"></a>

## Eliminación de su cuaderno de Studio
<a name="example-notebook-cleanup-app"></a>

1. Abra la consola de Managed Service para Apache Flink.

1. Elija **MyNotebook**.

1. Elija **Acciones** y, a continuación, elija **Eliminar**.

## Elimine la AWS Glue base de datos y la conexión
<a name="example-notebook-cleanup-glue"></a>

1. Abra la AWS Glue consola en [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/).

1. En la barra de navegación izquierda, elija **Bases de datos**. Marque la casilla de verificación situada junto a **Predeterminado** para seleccionarla. Seleccione **Acción**, **Eliminar base de datos**. Confirme la opción elegida.

1. En la barra de navegación de la izquierda, seleccione **Conexiones**. Marque la casilla de verificación situada junto a ella **ZeppelinConnection**para seleccionarla. Seleccione **Acción**, **Eliminar conexión**. Confirme la opción elegida.

## Eliminación de la política y el rol de IAM
<a name="example-notebook-msk-cleanup-iam"></a>

1. Abra la consola de IAM en [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/).

1. En el menú de navegación izquierdo, elija **Roles**.

1. Usa la barra de búsqueda para buscar el **ZeppelinRole**rol.

1. Elige el **ZeppelinRole**rol. Selecciones **Eliminar rol**. Confirme la eliminación.

## Elimine su grupo de CloudWatch registros
<a name="example-notebook-cleanup-cw"></a>

La consola crea un grupo de CloudWatch registros y un flujo de registros para usted cuando crea la aplicación con la consola. No tiene un grupo de registro ni un flujo si creó la aplicación con la AWS CLI.

1. Abra la CloudWatch consola en [https://console.aws.amazon.com/cloudwatch/](https://console.aws.amazon.com/cloudwatch/).

1. En la barra de navegación izquierda, elija **Grupos de registro**.

1. Elija el grupo de AWS/KinesisAnalytics/MyNotebook registros**/**.

1. Elija **Acciones**, **Eliminar grupo(s) de registro(s)**. Confirme la eliminación.

## Limpieza de los recursos de flujo de datos de Kinesis
<a name="example-notebook-cleanup-streams"></a>

Para eliminar la transmisión de Kinesis, abra la consola de flujo de datos de Kinesis, seleccione la transmisión de Kinesis y seleccione **Acciones**, **Eliminar**.

## Limpieza de recursos MSK
<a name="example-notebook-cleanup-msk"></a>

Siga los pasos de esta sección si ha creado un clúster de Amazon MSK para este tutorial. Esta sección contiene instrucciones para limpiar la instancia de cliente de Amazon EC2, Amazon VPC y el clúster de Amazon MSK.

### Eliminación de su clúster de Amazon MSK
<a name="example-notebook-msk-cleanup-msk"></a>

Siga los pasos de esta sección si ha creado un clúster de Amazon MSK para este tutorial.

1. ¿Abrir la consola Amazon MSK en [https://console.aws.amazon.com/msk/casa? region=us-east-1\$1/home/](https://console.aws.amazon.com/msk/home?region=us-east-1#/home/).

1. Elija **AWS KafkaTutorialCluster**. Elija **Eliminar**. Ingrese **delete** en la ventana que aparece y confirme su selección.

### Terminación de su instancia de cliente
<a name="example-notebook-msk-cleanup-client"></a>

Siga los pasos de esta sección si ha creado una instancia de cliente de Amazon EC2 para este tutorial.

1. Abra la consola de Amazon EC2 en [https://console.aws.amazon.com/ec2/](https://console.aws.amazon.com/ec2/).

1. En el panel de navegación izquierdo, seleccione **Instancias**.

1. Selecciona la casilla de verificación situada junto a ella para seleccionarla. **ZeppelinClient**

1. Seleccione **Estado de la instancia** y **Terminar instancia**.

### Eliminación de su VPC de Amazon
<a name="example-notebook-msk-cleanup-vpc"></a>

Siga los pasos de esta sección si ha creado un clúster de Amazon VPC para este tutorial.

1. Abra la consola de Amazon EC2 en [https://console.aws.amazon.com/ec2/](https://console.aws.amazon.com/ec2/).

1. Seleccione **Interfaces de red** en la barra de navegación izquierda.

1. Escriba los términos de búsqueda en el cuadro de búsqueda y presione “Ingresar”.

1. Seleccione la casilla de verificación del encabezado de la tabla para seleccionar todas las interfaces de red mostradas.

1. Elija **Acciones**, **Desasociar**. En la ventana que aparece, seleccione **Activar** en **Desprendimiento forzado**. Seleccione **Separar** y espere a que todas las interfaces de red alcancen el estado **Disponible**.

1. Seleccione la casilla de verificación del encabezado de la tabla para seleccionar todas las interfaces de red mostradas.

1. Elija **Acciones**, **Eliminar**. Confirme la acción.

1. Abra la consola de Amazon VPC en [https://console.aws.amazon.com/vpc/](https://console.aws.amazon.com/vpc/).

1. Selecciona **AWS KafkaTutorialVPC**. Elija **Acciones**, **Eliminar VPC**. Ingrese **delete** y confirme la eliminación.

# Tutorial: implementación de un cuaderno de Studio como un servicio administrado para una aplicación Apache Flink con estado duradero.
<a name="example-notebook-deploy"></a>

El siguiente tutorial muestra cómo implementar un cuaderno de Studio como un servicio gestionado para una aplicación Apache Flink con estado duradero.

**Topics**
+ [Cómo completar los requisitos previos de](#example-notebook-durable-setup)
+ [Implemente una aplicación con un estado duradero mediante el Consola de administración de AWS](#example-notebook-deploy-console)
+ [Implemente una aplicación con un estado duradero mediante el AWS CLI](#example-notebook-deploy-cli)

## Cómo completar los requisitos previos de
<a name="example-notebook-durable-setup"></a>

Cree un nuevo cuaderno de Studio siguiendo las [Tutorial: creación de un cuaderno de Studio en Managed Service para Apache Flink](example-notebook.md), utilizando flujo de datos Kinesis o Amazon MSK. Asigne un nombre al cuaderno de Studio `ExampleTestDeploy`.

## Implemente una aplicación con un estado duradero mediante el Consola de administración de AWS
<a name="example-notebook-deploy-console"></a>

1. Agregue una ubicación de bucket S3 en la que desee almacenar el código empaquetado en la **ubicación del código de la aplicación (*opcional***) en la consola. Esto permite seguir los pasos necesarios para implementar y ejecutar la aplicación directamente desde el cuaderno.

1. Añada los permisos necesarios al rol de la aplicación para habilitar el rol que está utilizando para leer y escribir en un bucket de Amazon S3 y para lanzar una aplicación Managed Service para Apache Flink:
   + Amazon S3 FullAccess
   + Amazon gestionó - flinkFullAccess
   + Acceso a sus fuentes, destinos y, según VPCs corresponda. Para obtener más información, consulte [Revisión de los permisos de IAM para cuadernos de Studio](how-zeppelin-iam.md).

1. Consulte el siguiente código de muestra:

   ```
   %flink.ssql(type=update) 
   CREATE TABLE exampleoutput (
     'ticket' VARCHAR,
     'price' DOUBLE
   )
   WITH (
     'connector' = 'kinesis',
     'stream' = 'ExampleOutputStream',
     'aws.region' = 'us-east-1',
     'scan.stream.initpos' = 'LATEST',
     'format' = 'json'
   );
   
   INSERT INTO exampleoutput SELECT ticker, price FROM exampleinputstream
   ```

1. Al lanzar esta característica, verá un nuevo menú desplegable en la esquina superior derecha de cada nota de su cuaderno con el nombre del cuaderno. Se puede hacer lo siguiente:
   + Consulte la configuración del cuaderno de Studio en Consola de administración de AWS.
   + Cree su Zeppelin Note y expórtelo a Amazon S3. En ese momento, proporcione un nombre para la aplicación y seleccione **Crear y exportar**. Recibirá una notificación cuando se complete la exportación.
   + Si lo necesita, puede ver y ejecutar cualquier prueba adicional en el ejecutable en Amazon S3.
   + Una vez completada la compilación, podrá implementar su código como una aplicación de streaming de Kinesis con estado duradero y escalado automático.
   + Utilice el menú desplegable y elija **Deploy Zeppelin Note as Kinesis streaming application (Implementar Zeppelin Note como aplicación de streaming de Kinesis)**. Revise el nombre de la aplicación y elija **Implementar mediante AWS consola**.
   + Esto lo llevará a la Consola de administración de AWS página para crear una aplicación de servicio gestionado para Apache Flink. Tenga en cuenta que el nombre de la aplicación, el paralelismo, la ubicación del código, las funciones predeterminadas de Glue DB, VPC (si corresponde) y roles de IAM se han rellenado previamente. Compruebe que los roles de IAM tengan los permisos necesarios para acceder a sus orígenes y destinos. Las instantáneas están habilitadas de forma predeterminada para una administración duradera del estado de las aplicaciones.
   + Elija **Crear aplicación**.
   + Se puede elegir **configurar** y modificar cualquier configuración y seleccionar **Ejecutar** para iniciar la aplicación de streaming.

## Implemente una aplicación con un estado duradero mediante el AWS CLI
<a name="example-notebook-deploy-cli"></a>

Para implementar una aplicación mediante el AWS CLI, debe actualizar su modelo de servicio AWS CLI para usar el que se proporciona con la información de la versión beta 2. Para obtener información acerca de cómo utilizar el modelo de servicio actualizado, consulte [Cumplimiento de los requisitos previos deCómo completar los requisitos previos de](example-notebook.md#example-notebook-setup).

El siguiente ejemplo de código inicia un cuaderno de Studio:

```
aws kinesisanalyticsv2 create-application \
     --application-name <app-name> \
     --runtime-environment ZEPPELIN-FLINK-3_0 \
     --application-mode INTERACTIVE \
     --service-execution-role <iam-role>
     --application-configuration '{ 
       "ZeppelinApplicationConfiguration": { 
         "CatalogConfiguration": { 
           "GlueDataCatalogConfiguration": { 
             "DatabaseARN": "arn:aws:glue:us-east-1:<account>:database/<glue-database-name>" 
           } 
         } 
       },
       "FlinkApplicationConfiguration": {
         "ParallelismConfiguration": {
           "ConfigurationType": "CUSTOM",
           "Parallelism": 4,
           "ParallelismPerKPU": 4
         }
       },
       "DeployAsApplicationConfiguration": {
            "S3ContentLocation": { 
               "BucketARN": "arn:aws:s3:::<s3bucket>",
               "BasePath": "/something/"
            }
        },
       "VpcConfigurations": [
         {
           "SecurityGroupIds": [
             "<security-group>"
           ],
           "SubnetIds": [
             "<subnet-1>",
             "<subnet-2>"
           ]
         }
       ]
     }' \
     --region us-east-1
```

El siguiente ejemplo de código inicia un cuaderno de Studio:

```
aws kinesisanalyticsv2 start-application \
    --application-name <app-name> \
    --region us-east-1 \
    --no-verify-ssl
```

El código siguiente devuelve la URL de la página del cuaderno de Apache Zeppelin de una aplicación:

```
aws kinesisanalyticsv2 create-application-presigned-url \
    --application-name <app-name> \
    --url-type ZEPPELIN_UI_URL \

    --region us-east-1 \
    --no-verify-ssl
```

# Visualización de ejemplos de consultas para analizar datos en un cuaderno de Studio
<a name="how-zeppelin-sql-examples"></a>

**Topics**
+ [Cree tablas con Amazon MSK/Apache Kafka](#how-zeppelin-examples-creating-tables)
+ [Creación de tablas con Kinesis](#how-zeppelin-examples-creating-tables-with-kinesis)
+ [Consulta en ventana de saltos de tamaño constante](#how-zeppelin-examples-tumbling)
+ [Consulta en ventana deslizante](#how-zeppelin-examples-sliding)
+ [Uso de SQL interactivo](#how-zeppelin-examples-interactive-sql)
+ [Utilice el conector BlackHole SQL](#how-zeppelin-examples-blackhole-connector-sql)
+ [Uso de Scala para generar datos de muestra](#notebook-example-data-generator)
+ [Uso de Scala interactiva](#notebook-example-interactive-scala)
+ [Uso de Python interactivo](#notebook-example-interactive-python)
+ [Uso de una combinación de Python, SQL y Scala interactivos](#notebook-example-interactive-pythonsqlscala)
+ [Uso de flujo de datos de Kinesis entre cuentas](#notebook-example-crossaccount-kds)

Para obtener información sobre la configuración de las consultas SQL de Apache Flink, consulte [Flink on Zeppelin Notebooks for Interactive Data Analysis](https://flink.apache.org/ecosystem/2020/06/23/flink-on-zeppelin-part2.html).

**Para ver su aplicación en el panel de control de Apache Flink, elija **TRABAJO FLINK** en la página Bloc de notas de Zeppelin de su aplicación.**

Para obtener más información sobre las consultas de ventanas, consulte [Windows](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/stream/operators/windows.html) en la [Documentación de Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.15/).

Para ver más ejemplos de consultas SQL de Apache Flink, consulte [Queries](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html) en la [Documentación de Apache Flink.](https://nightlies.apache.org/flink/flink-docs-release-1.15/)

## Cree tablas con Amazon MSK/Apache Kafka
<a name="how-zeppelin-examples-creating-tables"></a>

Se puede utilizar el conector Amazon MSK Flink con Managed Service para Apache Flink Studio para autenticar su conexión con la autenticación Plaintext, SSL o IAM. Cree sus tablas con las propiedades específicas según sus requisitos.

```
-- Plaintext connection

CREATE TABLE your_table (
  `column1` STRING,
  `column2` BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'your_topic',
  'properties.bootstrap.servers' = '<bootstrap servers>',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

-- SSL connection

CREATE TABLE your_table (
  `column1` STRING,
  `column2` BIGINT
) WITH (
  'connector' = 'kafka',
   'topic' = 'your_topic',
  'properties.bootstrap.servers' = '<bootstrap servers>',
  'properties.security.protocol' = 'SSL',
  'properties.ssl.truststore.location' = '/usr/lib/jvm/java-11-amazon-corretto/lib/security/cacerts',
  'properties.ssl.truststore.password' = 'changeit',
  'properties.group.id' = 'myGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

-- IAM connection (or for MSK Serverless)

CREATE TABLE your_table (
  `column1` STRING,
  `column2` BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'your_topic',
  'properties.bootstrap.servers' = '<bootstrap servers>',
  'properties.security.protocol' = 'SASL_SSL',
  'properties.sasl.mechanism' = 'AWS_MSK_IAM',
  'properties.sasl.jaas.config' = 'software.amazon.msk.auth.iam.IAMLoginModule required;',
  'properties.sasl.client.callback.handler.class' = 'software.amazon.msk.auth.iam.IAMClientCallbackHandler',
  'properties.group.id' = 'myGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);
```

Se puede combinarlas con otras propiedades en [Apache Kafka SQL Connector](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/kafka/).

## Creación de tablas con Kinesis
<a name="how-zeppelin-examples-creating-tables-with-kinesis"></a>

En el siguiente ejemplo, se crea una tabla con Kinesis:

```
CREATE TABLE KinesisTable (
  `column1` BIGINT,
  `column2` BIGINT,
  `column3` BIGINT,
  `column4` STRING,
  `ts` TIMESTAMP(3)
)
PARTITIONED BY (column1, column2)
WITH (
  'connector' = 'kinesis',
  'stream' = 'test_stream',
  'aws.region' = '<region>',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'csv'
);
```

Para obtener más información acerca de otras propiedades que puede usar, consulte [Amazon Kinesis Data Streams SQL Connector](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/kinesis/).

## Consulta en ventana de saltos de tamaño constante
<a name="how-zeppelin-examples-tumbling"></a>

La siguiente consulta SQL de Flink Streaming selecciona de la tabla `ZeppelinTopic` el precio más alto de cada intervalo de cinco segundos:

```
%flink.ssql(type=update)
SELECT TUMBLE_END(event_time, INTERVAL '5' SECOND) as winend, MAX(price) as five_second_high, ticker
FROM ZeppelinTopic
GROUP BY ticker, TUMBLE(event_time, INTERVAL '5' SECOND)
```

## Consulta en ventana deslizante
<a name="how-zeppelin-examples-sliding"></a>

La siguiente consulta SQL de Apache Flink Streaming selecciona de la tabla `ZeppelinTopic` el precio más alto de cada ventana deslizante de cinco segundos:

```
%flink.ssql(type=update)
SELECT HOP_END(event_time, INTERVAL '3' SECOND, INTERVAL '5' SECOND) AS winend, MAX(price) AS sliding_five_second_max
FROM ZeppelinTopic//or your table name in AWS Glue
GROUP BY HOP(event_time, INTERVAL '3' SECOND, INTERVAL '5' SECOND)
```

## Uso de SQL interactivo
<a name="how-zeppelin-examples-interactive-sql"></a>

En este ejemplo, se imprime el tiempo máximo del evento y el tiempo de procesamiento y la suma de los valores de la tabla de valores clave. Asegúrese de tener el ejemplo del script de generación de datos de [Uso de Scala para generar datos de muestra](#notebook-example-data-generator) en ejecución. Para probar otras consultas SQL, como el filtrado y las uniones, en su cuaderno de Studio, consulte la documentación de Apache Flink: [Queries](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html) en la documentación de Apache Flink.

```
%flink.ssql(type=single, parallelism=4, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>)

-- An interactive query prints how many records from the `key-value-stream` we have seen so far, along with the current processing and event time.
SELECT
  MAX(`et`) as `et`,
  MAX(`pt`) as `pt`,
  SUM(`value`) as `sum`
FROM
  `key-values`
```

```
%flink.ssql(type=update, parallelism=4, refreshInterval=1000)

-- An interactive tumbling window query that displays the number of records observed per (event time) second.
-- Browse through the chart views to see different visualizations of the streaming result.
SELECT
  TUMBLE_START(`et`, INTERVAL '1' SECONDS) as `window`,
  `key`,
  SUM(`value`) as `sum`
FROM
  `key-values`
GROUP BY
  TUMBLE(`et`, INTERVAL '1' SECONDS),
  `key`;
```

## Utilice el conector BlackHole SQL
<a name="how-zeppelin-examples-blackhole-connector-sql"></a>

El conector BlackHole SQL no requiere que cree una transmisión de datos de Kinesis o un clúster de Amazon MSK para probar sus consultas. Para obtener información sobre el conector BlackHole SQL, consulte el [conector BlackHole SQL](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/connectors/blackhole.html) en la documentación de Apache Flink. En este ejemplo, el catálogo predeterminado es un catálogo en memoria.

```
%flink.ssql

CREATE TABLE default_catalog.default_database.blackhole_table (
 `key` BIGINT,
 `value` BIGINT,
 `et` TIMESTAMP(3)
) WITH (
 'connector' = 'blackhole'
)
```

```
%flink.ssql(parallelism=1)

INSERT INTO `test-target`
SELECT
  `key`,
  `value`,
  `et`
FROM
  `test-source`
WHERE
  `key` > 3
```

```
%flink.ssql(parallelism=2)

INSERT INTO `default_catalog`.`default_database`.`blackhole_table`
SELECT
  `key`,
  `value`,
  `et`
FROM
  `test-target`
WHERE
  `key` > 7
```

## Uso de Scala para generar datos de muestra
<a name="notebook-example-data-generator"></a>

En este ejemplo, se utiliza Scala para generar datos de muestra. Se puede utilizar estos datos de ejemplo para probar varias consultas. Utilice la instrucción crear tabla para crear la tabla de valores clave.

```
import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource
import org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator
import org.apache.flink.streaming.api.scala.DataStream

import java.sql.Timestamp

// ad-hoc convenience methods to be defined on Table 
implicit class TableOps[T](table: DataStream[T]) {
    def asView(name: String): DataStream[T] = {
      if (stenv.listTemporaryViews.contains(name)) {
        stenv.dropTemporaryView("`" + name + "`")
      }
      stenv.createTemporaryView("`" + name + "`", table)
      return table;
    }
}
```

```
%flink(parallelism=4)
val stream = senv
 .addSource(new DataGeneratorSource(RandomGenerator.intGenerator(1, 10), 1000))
 .map(key => (key, 1, new Timestamp(System.currentTimeMillis)))
 .asView("key-values-data-generator")
```

```
%flink.ssql(parallelism=4)
-- no need to define the paragraph type with explicit parallelism (such as "%flink.ssql(parallelism=2)")
-- in this case the INSERT query will inherit the parallelism of the of the above paragraph
INSERT INTO `key-values`
SELECT
 `_1` as `key`,
 `_2` as `value`,
 `_3` as `et`
FROM
 `key-values-data-generator`
```

## Uso de Scala interactiva
<a name="notebook-example-interactive-scala"></a>

Esta es la traducción en Scala del [Uso de SQL interactivo](#how-zeppelin-examples-interactive-sql). Para ver más ejemplos de Scala, consulte la [Table API](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/tableApi.html) en la documentación de Apache Flink.

```
%flink
import org.apache.flink.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._

// ad-hoc convenience methods to be defined on Table
implicit class TableOps(table: Table) {
    def asView(name: String): Table = {
      if (stenv.listTemporaryViews.contains(name)) {
        stenv.dropTemporaryView(name)
      }
      stenv.createTemporaryView(name, table)
      return table;
    }
}
```

```
%flink(parallelism=4)

// A view that computes many records from the `key-values` we have seen so far, along with the current processing and event time.
val query01 = stenv
  .from("`key-values`")
  .select(
    $"et".max().as("et"),
    $"pt".max().as("pt"),
    $"value".sum().as("sum")
  ).asView("query01")
```

```
%flink.ssql(type=single, parallelism=16, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>)

-- An interactive query prints the query01 output.
SELECT * FROM query01
```

```
%flink(parallelism=4)

// An tumbling window view that displays the number of records observed per (event time) second.
val query02 = stenv
  .from("`key-values`")
  .window(Tumble over 1.seconds on $"et" as $"w")
  .groupBy($"w", $"key")
  .select(
    $"w".start.as("window"),
    $"key",
    $"value".sum().as("sum")
  ).asView("query02")
```

```
%flink.ssql(type=update, parallelism=4, refreshInterval=1000)

-- An interactive query prints the query02 output.
-- Browse through the chart views to see different visualizations of the streaming result.
SELECT * FROM `query02`
```

## Uso de Python interactivo
<a name="notebook-example-interactive-python"></a>

Esta es la traducción en Python del [Uso de SQL interactivo](#how-zeppelin-examples-interactive-sql). Para ver más ejemplos de Python, consulte [Table API](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/tableApi.html) en la documentación de Apache Flink. 

```
%flink.pyflink
from pyflink.table.table import Table

def as_view(table, name):
  if (name in st_env.list_temporary_views()):
    st_env.drop_temporary_view(name)
  st_env.create_temporary_view(name, table)
  return table

Table.as_view = as_view
```

```
%flink.pyflink(parallelism=16)

# A view that computes many records from the `key-values` we have seen so far, along with the current processing and event time
st_env \
  .from_path("`keyvalues`") \
  .select(", ".join([
    "max(et) as et",
    "max(pt) as pt",
    "sum(value) as sum"
  ])) \
  .as_view("query01")
```

```
%flink.ssql(type=single, parallelism=16, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>)

-- An interactive query prints the query01 output.
SELECT * FROM query01
```

```
%flink.pyflink(parallelism=16)

# A view that computes many records from the `key-values` we have seen so far, along with the current processing and event time
st_env \
  .from_path("`key-values`") \
  .window(Tumble.over("1.seconds").on("et").alias("w")) \
  .group_by("w, key") \
  .select(", ".join([
    "w.start as window",
    "key",
    "sum(value) as sum"
  ])) \
  .as_view("query02")
```

```
%flink.ssql(type=update, parallelism=16, refreshInterval=1000)

-- An interactive query prints the query02 output.
-- Browse through the chart views to see different visualizations of the streaming result.
SELECT * FROM `query02`
```

## Uso de una combinación de Python, SQL y Scala interactivos
<a name="notebook-example-interactive-pythonsqlscala"></a>

Se puede utilizar cualquier combinación de SQL, Python y Scala en su cuaderno para el análisis interactivo. En un cuaderno de Studio que vaya a implementar como una aplicación con un estado duradero, puede usar una combinación de SQL y Scala. En este ejemplo, se muestran las secciones que se ignoran y las que se implementan en la aplicación con un estado duradero.

```
%flink.ssql
CREATE TABLE `default_catalog`.`default_database`.`my-test-source` (
  `key` BIGINT NOT NULL,
  `value` BIGINT NOT NULL,
  `et` TIMESTAMP(3) NOT NULL,
  `pt` AS PROCTIME(),
  WATERMARK FOR `et` AS `et` - INTERVAL '5' SECOND
)
WITH (
  'connector' = 'kinesis',
  'stream' = 'kda-notebook-example-test-source-stream',
  'aws.region' = 'eu-west-1',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'json',
  'json.timestamp-format.standard' = 'ISO-8601'
)
```

```
%flink.ssql
CREATE TABLE `default_catalog`.`default_database`.`my-test-target` (
  `key` BIGINT NOT NULL,
  `value` BIGINT NOT NULL,
  `et` TIMESTAMP(3) NOT NULL,
  `pt` AS PROCTIME(),
  WATERMARK FOR `et` AS `et` - INTERVAL '5' SECOND
)
WITH (
  'connector' = 'kinesis',
  'stream' = 'kda-notebook-example-test-target-stream',
  'aws.region' = 'eu-west-1',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'json',
  'json.timestamp-format.standard' = 'ISO-8601'
)
```

```
%flink()

// ad-hoc convenience methods to be defined on Table
implicit class TableOps(table: Table) {
  def asView(name: String): Table = {
    if (stenv.listTemporaryViews.contains(name)) {
      stenv.dropTemporaryView(name)
    }
    stenv.createTemporaryView(name, table)
    return table;
  }
}
```

```
%flink(parallelism=1)
val table = stenv
  .from("`default_catalog`.`default_database`.`my-test-source`")
  .select($"key", $"value", $"et")
  .filter($"key" > 10)
  .asView("query01")
```

```
%flink.ssql(parallelism=1)

-- forward data
INSERT INTO `default_catalog`.`default_database`.`my-test-target`
SELECT * FROM `query01`
```

```
%flink.ssql(type=update, parallelism=1, refreshInterval=1000)

-- forward data to local stream (ignored when deployed as application)
SELECT * FROM `query01`
```

```
%flink

// tell me the meaning of life (ignored when deployed as application!)
print("42!")
```

## Uso de flujo de datos de Kinesis entre cuentas
<a name="notebook-example-crossaccount-kds"></a>

Para usar un flujo de datos de Kinesis que esté en una cuenta distinta de la cuenta que tiene su cuaderno de Studio, cree un rol de ejecución de servicios en la cuenta en la que se ejecuta el cuaderno de Studio y una política de confianza de roles en la cuenta que tiene el flujo de datos. Utilice `aws.credentials.provider`, `aws.credentials.role.arn`, y `aws.credentials.role.sessionName` en el conector de Kinesis de la instrucción DDL de creación de tabla para crear una tabla con el flujo de datos.

Utilice el siguiente rol de ejecución de servicios para la cuenta de cuadernos de Studio.

```
{
 "Sid": "AllowNotebookToAssumeRole",
 "Effect": "Allow",
 "Action": "sts:AssumeRole"
 "Resource": "*"
}
```

Utilice la política `AmazonKinesisFullAccess` y la siguiente política de confianza de roles para la cuenta de flujo de datos.

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::123456789012:root"
            },
            "Action": "sts:AssumeRole",
            "Condition": {}
        }
    ]
}
```

------

Utilice el siguiente párrafo para la declaración de creación de la tabla.

```
%flink.ssql
CREATE TABLE test1 (
name VARCHAR,
age BIGINT
) WITH (
'connector' = 'kinesis',
'stream' = 'stream-assume-role-test',
'aws.region' = 'us-east-1',
'aws.credentials.provider' = 'ASSUME_ROLE',
'aws.credentials.role.arn' = 'arn:aws:iam::<accountID>:role/stream-assume-role-test-role',
'aws.credentials.role.sessionName' = 'stream-assume-role-test-session',
'scan.stream.initpos' = 'TRIM_HORIZON',
'format' = 'json'
)
```