Creación y administración de clústeres de Amazon EMR con Step Functions - AWS Step Functions

Creación y administración de clústeres de Amazon EMR con Step Functions

Aprenda a integrar AWS Step Functions con Amazon EMR utilizando las APIs de integración de servicios proporcionadas por Amazon EMR. Las API de integración de servicios son similares a las API de Amazon EMR correspondientes, con algunas diferencias en los campos que se pasan y en las respuestas que se devuelven.

Para obtener más información sobre la integración con los servicios de AWS en Step Functions, consulte Integración de los servicios de y Cómo pasar parámetros a una API de servicio en Step Functions.

Características principales de la integración optimizada de Amazon EMR
  • La integración del servicio Amazon EMR optimizado tiene un conjunto personalizado de API que agrupa las API de Amazon EMR subyacentes, tal como se describe a continuación. Por ello, difiere considerablemente de la integración del servicio del SDK de Amazon EMR AWS.

  • Se admite el patrón de integración Ejecutar un trabajo (.sync).

Step Functions no finaliza automáticamente un clúster de Amazon EMR si se detiene la ejecución. Si su máquina de estado se detiene antes de que su clúster de Amazon EMR finalice, es posible que el clúster siga ejecutándose indefinidamente y que se acumulen cargos adicionales. Para evitarlo, asegúrese de que cualquier clúster de Amazon EMR que cree finalice correctamente. Para obtener más información, consulte:

nota

A partir de emr-5.28.0, puede especificar el parámetro StepConcurrencyLevel al crear un clúster para permitir que diferentes pasos se ejecuten en paralelo en un único clúster. Puede utilizar los estados Map y Parallel de Step Functions para enviar trabajo en paralelo al clúster.

La disponibilidad de la integración de servicios de Amazon EMR depende de la disponibilidad de las API de Amazon EMR. Consulte la documentación de Amazon EMR para conocer las limitaciones en regiones especiales.

nota

Para su integración con Amazon EMR, Step Functions tiene una frecuencia de sondeo de trabajo codificada de 60 segundos durante los primeros 10 minutos y de 300 segundos después.

API de Amazon EMR optimizadas

La siguiente tabla describe las diferencias entre cada API de integración de servicios de Amazon EMR y las API correspondientes de Amazon EMR.

API de integración de servicios de Amazon EMR API de EMR correspondiente Diferencias
createCluster

Crea y comienza a ejecutar un clúster (flujo de trabajo).

Amazon EMR está vinculado directamente a un tipo de rol de IAM único conocido como rol vinculado a servicio. Para que createCluster y createCluster.sync funcionen, tiene que tener configurados los permisos necesarios para crear el rol vinculado a servicios AWSServiceRoleForEMRCleanup. Para obtener más información al respecto, incluida una instrucción que puede añadir a la política de permisos de IAM, consulte Uso del rol vinculado a servicios para Amazon EMR.

runJobFlow createCluster utiliza la misma sintaxis de solicitud que runJobFlow, a excepción de los siguientes elementos:
  • El campo Instances.KeepJobFlowAliveWhenNoSteps es obligatorio y debe tener el valor booleano TRUE.

  • El campo Steps no está permitido.

  • El campo Instances.InstanceFleets[index].Name se debe proporcionar y debe ser único si la API de conector modifyInstanceFleetByName opcional se utiliza.

  • El campo Instances.InstanceGroups[index].Name se debe proporcionar y debe ser único si la API modifyInstanceGroupByName opcional se utiliza.

La respuesta es la siguiente:
{ "ClusterId": "string" }
Amazon EMR utiliza lo siguiente:
{ "JobFlowId": "string" }
createCluster.sync

Crea y comienza a ejecutar un clúster (flujo de trabajo).

runJobFlow Lo mismo que createCluster, pero espera a que el clúster alcance el estado WAITING.
setClusterTerminationProtection

Bloquea un clúster (flujo de trabajo) de forma que las instancias de EC2 del clúster no se puedan terminar mediante la intervención del usuario, una llamada a la API o un error del flujo de trabajo.

setTerminationProtection La solicitud utiliza:
{ "ClusterId": "string" }
Amazon EMR utiliza lo siguiente:
{ "JobFlowIds": ["string"] }
terminateCluster

Cierra un clúster (flujo de trabajo).

terminateJobFlows La solicitud utiliza:
{ "ClusterId": "string" }
Amazon EMR utiliza lo siguiente:
{ "JobFlowIds": ["string"] }
terminateCluster.sync

Cierra un clúster (flujo de trabajo).

terminateJobFlows Lo mismo que terminateCluster, pero espera a que el clúster finalice.
addStep

Agrega un nuevo paso a un clúster en ejecución.

Si lo desea, también puede especificar el parámetro ExecutionRoleArn mientras se utiliza esta API.

addJobFlowSteps

La solicitud utiliza la clave "ClusterId". Amazon EMR utiliza "JobFlowId". La solicitud utiliza un solo paso.
{ "Step": <"StepConfig object"> }
Amazon EMR utiliza lo siguiente:
{ "Steps": [<StepConfig objects>] }
La respuesta es la siguiente:
{ "StepId": "string" }
Amazon EMR devuelve lo siguiente:
{ "StepIds": [<strings>] }
addStep.sync

Agrega un nuevo paso a un clúster en ejecución.

Si lo desea, también puede especificar el parámetro ExecutionRoleArn mientras se utiliza esta API.

addJobFlowSteps

Lo mismo que addStep, pero espera a que el paso se complete.
cancelStep

Cancela un paso pendiente en un clúster en ejecución.

cancelSteps La solicitud utiliza:
{ "StepId": "string" }
Amazon EMR utiliza lo siguiente:
{ "StepIds": [<strings>] }
La respuesta es la siguiente:
{ "CancelStepsInfo": <CancelStepsInfo object> }
Amazon EMR utiliza lo siguiente:
{ "CancelStepsInfoList": [<CancelStepsInfo objects>] }
modifyInstanceFleetByName

Modifica las capacidades de Spot de destino y de destino bajo demanda para la flota de instancias con el especificado InstanceFleetName.

modifyInstanceFleet La solicitud es la misma que para modifyInstanceFleet, excepto por lo siguiente:
  • El campo Instance.InstanceFleetId no está permitido.

  • En el tiempo de ejecución, el InstanceFleetId lo determina automáticamente la integración del servicio mediante una llamada a ListInstanceFleets y un análisis del resultado.

modifyInstanceGroupByName

Modifica el número de nodos y las opciones de configuración de un grupo de instancias.

modifyInstanceGroups La solicitud es la siguiente:
{ "ClusterId": "string", "InstanceGroup": <InstanceGroupModifyConfig object> }
Amazon EMR utiliza una lista:
{ "ClusterId": ["string"], "InstanceGroups": [<InstanceGroupModifyConfig objects>] }

En el objeto InstanceGroupModifyConfig, el campo InstanceGroupId no está permitido.

Se ha añadido un nuevo campo, InstanceGroupName. En el tiempo de ejecución, el InstanceGroupId lo determina automáticamente la integración del servicio mediante una llamada a ListInstanceGroups y un análisis del resultado.

Ejemplo de flujo de trabajo

El código siguiente incluye un estado Task que crea un clúster.

"Create_Cluster": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:createCluster.sync", "Arguments": { "Name": "MyWorkflowCluster", "VisibleToAllUsers": true, "ReleaseLabel": "emr-5.28.0", "Applications": [ { "Name": "Hive" } ], "ServiceRole": "EMR_DefaultRole", "JobFlowRole": "EMR_EC2_DefaultRole", "LogUri": "s3n://aws-logs-account-id-us-east-1/elasticmapreduce/", "Instances": { "KeepJobFlowAliveWhenNoSteps": true, "InstanceFleets": [ { "InstanceFleetType": "MASTER", "Name": "MASTER", "TargetOnDemandCapacity": 1, "InstanceTypeConfigs": [ { "InstanceType": "m4.xlarge" } ] }, { "InstanceFleetType": "CORE", "Name": "CORE", "TargetOnDemandCapacity": 1, "InstanceTypeConfigs": [ { "InstanceType": "m4.xlarge" } ] } ] } }, "End": true }

El código siguiente incluye un estado Task que habilita la protección de la terminación.

"Enable_Termination_Protection": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:setClusterTerminationProtection", "Arguments": { "ClusterId": "{% $ClusterId %}", "TerminationProtected": true }, "End": true }

El código siguiente incluye un estado Task que envía un paso a un clúster.

"Step_One": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:addStep.sync", "Arguments": { "ClusterId": "{% $ClusterId %}", "ExecutionRoleArn": "arn:aws:iam::account-id:role/myEMR-execution-role", "Step": { "Name": "The first step", "ActionOnFailure": "TERMINATE_CLUSTER", "HadoopJarStep": { "Jar": "command-runner.jar", "Args": [ "hive-script", "--run-hive-script", "--args", "-f", "s3://region.elasticmapreduce.samples/cloudfront/code/Hive_CloudFront.q", "-d", "INPUT=s3://region.elasticmapreduce.samples", "-d", "OUTPUT=s3://<amzn-s3-demo-bucket>/MyHiveQueryResults/" ] } } }, "End": true }

El código siguiente incluye un estado Task que cancela un paso.

"Cancel_Step_One": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:cancelStep", "Arguments": { "ClusterId": "{% $ClusterId %}", "StepId": "{% $AddStepsResult.StepId %}" }, "End": true }

El código siguiente incluye un estado Task que termina un clúster.

"Terminate_Cluster": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:terminateCluster.sync", "Arguments": { "ClusterId": "{% $ClusterId %}", }, "End": true }

El código siguiente incluye un estado Task que escala un clúster hacia arriba o hacia abajo para un grupo de instancias.

"ModifyInstanceGroupByName": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:modifyInstanceGroupByName", "Arguments": { "ClusterId": "j-account-id3", "InstanceGroupName": "MyCoreGroup", "InstanceGroup": { "InstanceCount": 8 } }, "End": true }

El código siguiente incluye un estado Task que escala un clúster hacia arriba o hacia abajo para una flota de instancias.

"ModifyInstanceFleetByName": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:modifyInstanceFleetByName", "Arguments": { "ClusterId": "j-account-id3", "InstanceFleetName": "MyCoreFleet", "InstanceFleet": { "TargetOnDemandCapacity": 8, "TargetSpotCapacity": 0 } }, "End": true }

Políticas de IAM para llamar a Amazon EMR

En estas plantillas de ejemplo, se muestra cómo AWS Step Functions genera políticas de IAM basadas en los recursos de la definición de la máquina de estado. Para obtener más información, consulte Generación de políticas de IAM para servicios integrados por Steps Functions y Descubrimiento de los patrones de integración de servicios en Step Functions.

addStep

Recursos estáticos

{ "Version":"2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:AddJobFlowSteps", "elasticmapreduce:DescribeStep", "elasticmapreduce:CancelSteps" ], "Resource": [ "arn:aws:elasticmapreduce:us-east-1:123456789012:cluster/clusterId" ] } ] }

Recursos dinámicos

{ "Version":"2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:AddJobFlowSteps", "elasticmapreduce:DescribeStep", "elasticmapreduce:CancelSteps" ], "Resource": "arn:aws:elasticmapreduce:*:*:cluster/*" } ] }

cancelStep

Recursos estáticos

{ "Version":"2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "elasticmapreduce:CancelSteps", "Resource": [ "arn:aws:elasticmapreduce:us-east-1:123456789012:cluster/myCluster-id" ] } ] }

Recursos dinámicos

{ "Version":"2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "elasticmapreduce:CancelSteps", "Resource": "arn:aws:elasticmapreduce:*:*:cluster/*" } ] }

createCluster

Recursos estáticos

{ "Version":"2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:RunJobFlow", "elasticmapreduce:DescribeCluster", "elasticmapreduce:TerminateJobFlows" ], "Resource": "*" }, { "Effect": "Allow", "Action": "iam:PassRole", "Resource": [ "arn:aws:iam::123456789012:role/myRoleName" ] } ] }

setClusterTerminationProtection

Recursos estáticos

{ "Version":"2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "elasticmapreduce:SetTerminationProtection", "Resource": [ "arn:aws:elasticmapreduce:us-east-1:123456789012:cluster/myCluster-id" ] } ] }

Recursos dinámicos

{ "Version":"2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "elasticmapreduce:SetTerminationProtection", "Resource": "arn:aws:elasticmapreduce:*:*:cluster/*" } ] }

modifyInstanceFleetByName

Recursos estáticos

{ "Version":"2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:ModifyInstanceFleet", "elasticmapreduce:ListInstanceFleets" ], "Resource": [ "arn:aws:elasticmapreduce:us-east-1:123456789012:cluster/myCluster-id" ] } ] }

Recursos dinámicos

{ "Version":"2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:ModifyInstanceFleet", "elasticmapreduce:ListInstanceFleets" ], "Resource": "arn:aws:elasticmapreduce:*:*:cluster/*" } ] }

modifyInstanceGroupByName

Recursos estáticos

{ "Version":"2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:ModifyInstanceGroups", "elasticmapreduce:ListInstanceGroups" ], "Resource": [ "arn:aws:elasticmapreduce:us-east-1:123456789012:cluster/myCluster-id" ] } ] }

Recursos dinámicos

{ "Version":"2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:ModifyInstanceGroups", "elasticmapreduce:ListInstanceGroups" ], "Resource": "*" } ] }

terminateCluster

Recursos estáticos

{ "Version":"2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:TerminateJobFlows", "elasticmapreduce:DescribeCluster" ], "Resource": [ "arn:aws:elasticmapreduce:us-east-1:123456789012:cluster/myCluster-id" ] } ] }

Recursos dinámicos

{ "Version":"2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:TerminateJobFlows", "elasticmapreduce:DescribeCluster" ], "Resource": "arn:aws:elasticmapreduce:*:*:cluster/*" } ] }