

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

# Trabajar con trabajos de Flink en Amazon EMR
<a name="flink-jobs"></a>

Hay varias formas de interactuar con Flink en Amazon EMR: a través de la consola, la interfaz de Flink que se encuentra en la interfaz de usuario de seguimiento y en ResourceManager la línea de comandos. Puede enviar un archivo JAR a una aplicación de Flink con cualquiera de estas opciones. Una vez enviado un archivo JAR, se convierte en un trabajo gestionado por Flink. JobManager JobManager Se encuentra en el nodo YARN que aloja el daemon Application Master de la sesión de Flink.

Puede ejecutar una aplicación de Flink como trabajo de YARN en un clúster de ejecución prolongada o como clúster transitorio. En un clúster de ejecución prolongada, puede enviar varios trabajos de Flink a un clúster de Flink que se ejecuta en Amazon EMR. Si ejecuta un trabajo de Flink como clúster transitorio, su clúster de Amazon EMR solo existe durante el tiempo que se tarda en ejecutar la aplicación de Flink, por lo que solo se le cobrará por los recursos y el tiempo utilizado. Puede enviar un trabajo de Flink con la operación de la API Amazon `AddSteps` EMR, como argumento de paso de `RunJobFlow` la operación y mediante AWS CLI `add-steps` los `create-cluster` comandos o.

## Inicio de una aplicación de YARN de Flink como un paso en un clúster de ejecución prolongada
<a name="flink-add-step"></a>

Para iniciar una aplicación de Flink a la que varios clientes puedan enviar trabajos mediante las operaciones de la API de YARN, debe crear un clúster o agregar una aplicación de Flink a un clúster existente. Para obtener instrucciones sobre cómo crear un clúster nuevo, consulte [Creación de un clúster con Flink](flink-create-cluster.md). Para iniciar una sesión de YARN en un clúster existente, utilice los siguientes pasos desde la consola, la AWS CLI o el SDK de Java.

**nota**  
El comando `flink-yarn-session` se agregó en Amazon EMR versión 5.5.0 como envoltorio para que el script `yarn-session.sh` simplifique la ejecución. Si utiliza una versión anterior de Amazon EMR, sustituya `bash -c "/usr/lib/flink/bin/yarn-session.sh -d"` por **Argumentos** en la consola o `Args` en el comando AWS CLI .

**Para enviar un trabajo de Flink en un clúster existente desde la consola**

Envíe la sesión de Flink de con el comando `flink-yarn-session` en un clúster existente.

1. [Abra la consola Amazon EMR en https://console.aws.amazon.com /emr.](https://console.aws.amazon.com/emr/)

1. En la lista de clústeres, seleccione el clúster que lanzó con anterioridad.

1. En la página de detalles del clúster, elija **Steps (Pasos)**, **Add Step (Añadir paso)**.

1. Utilice las directrices que siguen para introducir los parámetros y, a continuación, elija **Agregar**.  
****    
[See the AWS documentation website for more details](http://docs.aws.amazon.com/es_es/emr/latest/ReleaseGuide/flink-jobs.html)

**Para enviar un trabajo de Flink en un clúster existente con el AWS CLI**
+ Utilice el comando `add-steps` para agregar un trabajo de Flink a un clúster de ejecución prolongada. El siguiente comando de ejemplo especifica `Args="flink-yarn-session", "-d"` para iniciar una sesión de Flink dentro del clúster de YARN en un estado desconectado (`-d`). Consulte [Configuración de YARN](https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/resource-providers/yarn/) en la documentación de Flink más reciente para conocer los detalles de argumentos.

  ```
  aws emr add-steps --cluster-id {{<j-XXXXXXXX>}} --steps Type=CUSTOM_JAR,Name={{<example-flink-step-name>}},Jar=command-runner.jar,Args="flink-yarn-session","-d"
  ```

## Envío del trabajo a una aplicación de Flink existente en un clúster de ejecución prolongada
<a name="flink-submit-work"></a>

Si ya tiene una aplicación de Flink existente en un clúster de ejecución prolongada, puede especificar el ID de la aplicación de Flink del clúster para enviar el trabajo. Para obtener el ID de la aplicación, ejecute `yarn application -list` la operación de [YarnClient](https://hadoop.apache.org/docs/current/api/org/apache/hadoop/yarn/client/api/YarnClient.html)API AWS CLI o a través de ella:

```
$ yarn application -list
16/09/07 19:32:13 INFO client.RMProxy: Connecting to ResourceManager at ip-10-181-83-19.ec2.internal/10.181.83.19:8032
Total number of applications (application-types: [] and states: [SUBMITTED, ACCEPTED, RUNNING]):1
Application-Id    Application-Name    Application-Type    User    Queue    State    Final-State    Progress    Tracking-URL
application_1473169569237_0002    Flink session with 14 TaskManagers (detached)	        Apache Flink	    hadoop	   default	           RUNNING	         UNDEFINED	           100%	http://ip-10-136-154-194.ec2.internal:33089
```

El ID de la aplicación para esta sesión de Flink es`application_1473169569237_0002`, que puedes usar para enviar trabajos a la aplicación desde el SDK AWS CLI o desde un SDK.

**Example SDK para Java**  

```
List<StepConfig> stepConfigs = new ArrayList<StepConfig>();
  
HadoopJarStepConfig flinkWordCountConf = new HadoopJarStepConfig()
    .withJar("command-runner.jar")
    .withArgs("flink", "run", "-m", "yarn-cluster", "-yid", "application_1473169569237_0002", "-yn", "2", "/usr/lib/flink/examples/streaming/WordCount.jar", 
      "--input", "s3://amzn-s3-demo-bucket/pg11.txt", "--output", "s3://amzn-s3-demo-bucket/alice2/");
  
StepConfig flinkRunWordCount = new StepConfig()
  .withName("Flink add a wordcount step")
  .withActionOnFailure("CONTINUE")
  .withHadoopJarStep(flinkWordCountConf);
  
stepConfigs.add(flinkRunWordCount); 
  
AddJobFlowStepsResult res = emr.addJobFlowSteps(new AddJobFlowStepsRequest()
   .withJobFlowId("{{myClusterId}}")
   .withSteps(stepConfigs));
```

**Example AWS CLI**  

```
aws emr add-steps --cluster-id {{<j-XXXXXXXX>}} \
--steps Type=CUSTOM_JAR,Name=Flink_Submit_To_Long_Running,Jar=command-runner.jar,\
Args="flink","run","-m","yarn-cluster","-yid","application_1473169569237_0002",\
"/usr/lib/flink/examples/streaming/WordCount.jar",\
"--input","s3://amzn-s3-demo-bucket/pg11.txt","--output","s3://amzn-s3-demo-bucket/alice2/" \
--region {{<region-code>}}
```

## Enviar un trabajo de Flink transitorio
<a name="flink-transient-job"></a>

El segundo ejemplo lanza un clúster transitorio que ejecuta un trabajo de Flink y, a continuación, termina al completarse.

**Example SDK para Java**  

```
import java.util.ArrayList;
import java.util.List;
import com.amazonaws.AmazonClientException;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClientBuilder;
import com.amazonaws.services.elasticmapreduce.model.*;

public class Main_test {

	public static void main(String[] args) {
		AWSCredentials credentials_profile = null;
		try {
			credentials_profile = new ProfileCredentialsProvider("default").getCredentials();
		} catch (Exception e) {
			throw new AmazonClientException(
					"Cannot load credentials from .aws/credentials file. " +
							"Make sure that the credentials file exists and the profile name is specified within it.",
					e);
		}

		AmazonElasticMapReduce emr = AmazonElasticMapReduceClientBuilder.standard()
				.withCredentials(new AWSStaticCredentialsProvider(credentials_profile))
				.withRegion(Regions.US_WEST_1)
				.build();

		List<StepConfig> stepConfigs = new ArrayList<StepConfig>();
		HadoopJarStepConfig flinkWordCountConf = new HadoopJarStepConfig()
				.withJar("command-runner.jar")
				.withArgs("bash", "-c", "flink", "run", "-m", "yarn-cluster", "-yn", "2",
						"/usr/lib/flink/examples/streaming/WordCount.jar", "--input", "s3://path/to/input-file.txt", "--output",
						"s3://path/to/output/");

		StepConfig flinkRunWordCountStep = new StepConfig()
				.withName("Flink add a wordcount step and terminate")
				.withActionOnFailure("CONTINUE")
				.withHadoopJarStep(flinkWordCountConf);

		stepConfigs.add(flinkRunWordCountStep);

		Application flink = new Application().withName("Flink");

		RunJobFlowRequest request = new RunJobFlowRequest()
				.withName("flink-transient")
				.withReleaseLabel("emr-5.20.0")
				.withApplications(flink)
				.withServiceRole("EMR_DefaultRole")
				.withJobFlowRole("EMR_EC2_DefaultRole")
				.withLogUri("s3://path/to/my/logfiles")
				.withInstances(new JobFlowInstancesConfig()
						.withEc2KeyName("myEc2Key")
						.withEc2SubnetId("subnet-12ab3c45")
						.withInstanceCount(3)
						.withKeepJobFlowAliveWhenNoSteps(false)
						.withMasterInstanceType("m4.large")
						.withSlaveInstanceType("m4.large"))
				.withSteps(stepConfigs);

		RunJobFlowResult result = emr.runJobFlow(request);
		System.out.println("The cluster ID is " + result.toString());

	}

}
```

**Example AWS CLI**  
Utilice el subcomando `create-cluster` para crear un clúster transitorio que termina cuando se completa el trabajo de Flink:  

```
aws emr create-cluster --release-label emr-5.2.1 \
--name "Flink_Transient" \
--applications Name=Flink \
--configurations file://./configurations.json \
--region us-east-1 \
--log-uri s3://myLogUri \
--auto-terminate
--instance-type m5.xlarge \
--instance-count 2 \
--service-role EMR_DefaultRole_V2 \ 
--ec2-attributes KeyName={{<YourKeyName>}},InstanceProfile=EMR_EC2_DefaultRole \
--steps Type=CUSTOM_JAR,Jar=command-runner.jar,Name=Flink_Long_Running_Session,\
Args="bash","-c","\"flink run -m yarn-cluster /usr/lib/flink/examples/streaming/WordCount.jar
--input s3://amzn-s3-demo-bucket/pg11.txt --output s3://amzn-s3-demo-bucket/alice/""
```