

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

# Trabajar con trabajos de Flink de Zeppelin en Amazon EMR
<a name="flink-zeppelin"></a>

## Introducción
<a name="flink-zeppelin-intro"></a>

Las versiones 6.10.0 y posteriores de Amazon EMR admiten la integración de [Apache Zeppelin](emr-zeppelin.md) con Apache Flink. Puede enviar trabajos de Flink de forma interactiva a través de los cuadernos de Zeppelin. Con el intérprete de Flink, puede ejecutar consultas de Flink, definir los trabajos de streaming y lotes de Flink y visualizar el resultado en los cuadernos de Zeppelin. El intérprete de Flink se basa en la API de REST de Flink. Esto le permite acceder a los trabajos de Flink y manipularlos desde el entorno de Zeppelin para procesar y analizar los datos en tiempo real.

Hay cuatro subintérpretes en el intérprete de Flink. Tienen diferentes propósitos, pero todos están en la JVM y comparten los mismos puntos de entrada a Flink preconfigurados (`ExecutionEnviroment`, `StreamExecutionEnvironment`, `BatchTableEnvironment`, `StreamTableEnvironment`). Los intérpretes son los siguientes:
+ `%flink`: crea `ExecutionEnvironment`, `StreamExecutionEnvironment`, `BatchTableEnvironment`, `StreamTableEnvironment` y proporciona un entorno de Scala.
+ `%flink.pyflink`: proporciona un entorno de Python.
+ `%flink.ssql`: proporciona un entorno de SQL de streaming.
+ `%flink.bsql`: proporciona un entorno de SQL de lotes.

## Requisitos previos
<a name="flink-zeppelin-prerequisites"></a>
+ La integración de Zeppelin con Flink es compatible con los clústeres creados con la versión 6.10.0 y posteriores de Amazon EMR.
+ Para ver las interfaces web que están alojadas en los clústeres de EMR como se requiere en estos pasos, debe configurar un túnel de SSH para permitir el acceso entrante. Para obtener más información, consulte [Configurar ajustes de proxy para ver sitios web alojados en el nodo principal](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-connect-master-node-proxy.html).

## Configuración de Zeppelin-Flink en un clúster de EMR
<a name="flink-zeppelin-configure"></a>

Siga los siguientes pasos para configurar Apache Flink en Apache Zeppelin para que se ejecute en un clúster de EMR:

1. Cree un clúster nuevo desde la consola de Amazon EMR. Seleccione emr-6.10.0 o una versión posterior para la versión de Amazon EMR. A continuación, elija personalizar el paquete de aplicaciones con la opción Personalizado. Incluya al menos Flink, Hadoop y Zeppelin en su paquete.  
![\[En la consola de Amazon EMR, personalice el paquete de aplicaciones con la opción Personalizado. Incluye al menos Flink, Hadoop y Zeppelin en su paquete\]](http://docs.aws.amazon.com/es_es/emr/latest/ReleaseGuide/images/emr-flink-zeppelin-console.png)

1. Cree el resto del clúster con la configuración que prefiera.

1. Una vez que el clúster esté en ejecución, selecciónelo en la consola para ver sus detalles y abrir la pestaña Aplicaciones. Seleccione Zeppelin en la sección Interfaces de usuario de aplicaciones para abrir la interfaz web de Zeppelin. Asegúrese de haber configurado el acceso a la interfaz web de Zeppelin con un túnel de SSH al nodo principal y una conexión de proxy, tal y como se describe en [Requisitos previos](#flink-zeppelin-prerequisites).  
![\[En la interfaz web de Zeppelin, puede importar y crear nuevos cuadernos.\]](http://docs.aws.amazon.com/es_es/emr/latest/ReleaseGuide/images/welcome-to-zeppelin.png)

1. Ahora puede crear una nota nueva en un cuaderno de Zeppelin con Flink como intérprete predeterminado.  
![\[Puede crear una nota nueva en un cuaderno de Zeppelin con Flink como intérprete predeterminado.\]](http://docs.aws.amazon.com/es_es/emr/latest/ReleaseGuide/images/emr-flink-zeppelin-create-notebook.png)

1. Consulte los siguientes ejemplos de código que muestran cómo ejecutar trabajos de Flink desde un cuaderno de Zeppelin.

## Ejecución de trabajos de Flink con Zeppelin-Flink en un clúster de EMR
<a name="flink-zeppelin-run-jobs"></a>
+ Ejemplo 1: Flink Scala

  a) WordCount Ejemplo de lote (SCALA)

  ```
  %flink
  
  val data = benv.fromElements("hello world", "hello flink", "hello hadoop")
  data.flatMap(line => line.split("\\s"))
               .map(w => (w, 1))
               .groupBy(0)
               .sum(1)
               .print()
  ```

  b) WordCount Ejemplo de transmisión (SCALA)

  ```
  %flink
  
  val data = senv.fromElements("hello world", "hello flink", "hello hadoop")
  data.flatMap(line => line.split("\\s"))
    .map(w => (w, 1))
    .keyBy(0)
    .sum(1)
    .print
  
  senv.execute()
  ```  
![\[Por ejemplo, puede ejecutar WordCount trabajos por lotes WordCount y en streaming desde un portátil Zeppelin.\]](http://docs.aws.amazon.com/es_es/emr/latest/ReleaseGuide/images/streaming-wordcount-example.png)
+ Ejemplo 2: SQL en streaming de Flink

  ```
  %flink.ssql
  SET 'sql-client.execution.result-mode' = 'tableau';
  SET 'table.dml-sync' = 'true';
  SET 'execution.runtime-mode' = 'streaming';
  
  create table dummy_table (
    id int,
    data string
  ) with (
    'connector' = 'filesystem',
    'path' = 's3://s3-bucket/dummy_table',
    'format' = 'csv'
  );
  
  INSERT INTO dummy_table SELECT * FROM (VALUES (1, 'Hello World'), (2, 'Hi'), (2, 'Hi'), (3, 'Hello'), (3, 'World'), (4, 'ADD'), (5, 'LINE'));
  
  SELECT * FROM dummy_table;
  ```  
![\[En este ejemplo se muestra cómo ejecutar un trabajo de SQL en streaming de Flink.\]](http://docs.aws.amazon.com/es_es/emr/latest/ReleaseGuide/images/flink-streaming-sql.png)
+ Ejemplo 3: Pyflink. Tenga en cuenta que debe cargar su propio archivo de texto de muestra `word.txt` con un nombre en su bucket de S3.

  ```
  %flink.pyflink
  
  import argparse
  import logging
  import sys
  
  from pyflink.common import Row
  from pyflink.table import (EnvironmentSettings, TableEnvironment, TableDescriptor, Schema,
                             DataTypes, FormatDescriptor)
  from pyflink.table.expressions import lit, col
  from pyflink.table.udf import udtf
  
  def word_count(input_path, output_path):
      t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
      # write all the data to one file
      t_env.get_config().set("parallelism.default", "1")
  
      # define the source
      if input_path is not None:
          t_env.create_temporary_table(
              'source',
              TableDescriptor.for_connector('filesystem')
                             .schema(Schema.new_builder()
                                     .column('word', DataTypes.STRING())
                                     .build())
                             .option('path', input_path)
                             .format('csv')
                             .build())
          tab = t_env.from_path('source')
      else:
          print("Executing word_count example with default input data set.")
          print("Use --input to specify file input.")
          tab = t_env.from_elements(map(lambda i: (i,), word_count_data),
                                    DataTypes.ROW([DataTypes.FIELD('line', DataTypes.STRING())]))
  
      # define the sink
      if output_path is not None:
          t_env.create_temporary_table(
              'sink',
              TableDescriptor.for_connector('filesystem')
                             .schema(Schema.new_builder()
                                     .column('word', DataTypes.STRING())
                                     .column('count', DataTypes.BIGINT())
                                     .build())
                             .option('path', output_path)
                             .format(FormatDescriptor.for_format('canal-json')
                                     .build())
                             .build())
      else:
          print("Printing result to stdout. Use --output to specify output path.")
          t_env.create_temporary_table(
              'sink',
              TableDescriptor.for_connector('print')
                             .schema(Schema.new_builder()
                                     .column('word', DataTypes.STRING())
                                     .column('count', DataTypes.BIGINT())
                                     .build())
                             .build())
  
      @udtf(result_types=[DataTypes.STRING()])
      def split(line: Row):
          for s in line[0].split():
              yield Row(s)
  
      # compute word count
      tab.flat_map(split).alias('word') \
         .group_by(col('word')) \
         .select(col('word'), lit(1).count) \
         .execute_insert('sink') \
         .wait()
  
  
  logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
  
  
  word_count("s3://s3_bucket/word.txt", "s3://s3_bucket/demo_output.txt")
  ```

1. Elija **FLINK JOB** en la interfaz de usuario de Zeppelin para acceder a la interfaz de usuario web de Flink y verla.  
![\[Flink code snippet for word count with output showing counts for "hello", "flink", "hadoop", and "world".\]](http://docs.aws.amazon.com/es_es/emr/latest/ReleaseGuide/images/batch-wordcount-example.png)

1. Al elegir **FLINK JOB**, se accede a la consola web de Flink en otra pestaña del navegador.  
![\[Al elegir FLINK JOB (TRABAJO DE FLINK) abre la consola web de Flink en otra pestaña del navegador.\]](http://docs.aws.amazon.com/es_es/emr/latest/ReleaseGuide/images/flink-web-console.png)