AWS 文档 SDK 示例
将 RunJobFlow 和 AWS SDK 结合使用
以下代码示例演示了如何使用 RunJobFlow。
- Python
-
- 适用于 Python 的 SDK (Boto3)
-
注意
查看 GitHub,了解更多信息。在 AWS 代码示例存储库
中查找完整示例,了解如何进行设置和运行。 def run_job_flow( name, log_uri, keep_alive, applications, job_flow_role, service_role, security_groups, steps, emr_client, ): """ Runs a job flow with the specified steps. A job flow creates a cluster of instances and adds steps to be run on the cluster. Steps added to the cluster are run as soon as the cluster is ready. This example uses the 'emr-5.30.1' release. A list of recent releases can be found here: https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-release-components.html. :param name: The name of the cluster. :param log_uri: The URI where logs are stored. This can be an Amazon S3 bucket URL, such as 's3://my-log-bucket'. :param keep_alive: When True, the cluster is put into a Waiting state after all steps are run. When False, the cluster terminates itself when the step queue is empty. :param applications: The applications to install on each instance in the cluster, such as Hive or Spark. :param job_flow_role: The IAM role assumed by the cluster. :param service_role: The IAM role assumed by the service. :param security_groups: The security groups to assign to the cluster instances. Amazon EMR adds all needed rules to these groups, so they can be empty if you require only the default rules. :param steps: The job flow steps to add to the cluster. These are run in order when the cluster is ready. :param emr_client: The Boto3 EMR client object. :return: The ID of the newly created cluster. """ try: response = emr_client.run_job_flow( Name=name, LogUri=log_uri, ReleaseLabel="emr-5.30.1", Instances={ "MasterInstanceType": "m5.xlarge", "SlaveInstanceType": "m5.xlarge", "InstanceCount": 3, "KeepJobFlowAliveWhenNoSteps": keep_alive, "EmrManagedMasterSecurityGroup": security_groups["manager"].id, "EmrManagedSlaveSecurityGroup": security_groups["worker"].id, }, Steps=[ { "Name": step["name"], "ActionOnFailure": "CONTINUE", "HadoopJarStep": { "Jar": "command-runner.jar", "Args": [ "spark-submit", "--deploy-mode", "cluster", step["script_uri"], *step["script_args"], ], }, } for step in steps ], Applications=[{"Name": app} for app in applications], JobFlowRole=job_flow_role.name, ServiceRole=service_role.name, EbsRootVolumeSize=10, VisibleToAllUsers=True, ) cluster_id = response["JobFlowId"] logger.info("Created cluster %s.", cluster_id) except ClientError: logger.exception("Couldn't create cluster.") raise else: return cluster_id-
有关 API 详细信息,请参阅《AWS SDK for Python (Boto3) API Reference》中的 RunJobFlow。
-
ListSteps
TerminateJobFlows