使用命令行启动集群 - AWS Data Pipeline

AWS Data Pipeline 现已不再向新客户提供。AWS Data Pipeline 的现有客户可以继续正常使用该服务。了解更多

使用命令行启动集群

如果您定期运行 Amazon EMR 集群来分析 Web 日志或科学数据,则可以使用 AWS Data Pipeline 管理您的 Amazon EMR 集群。利用 AWS Data Pipeline,您可以指定先决条件(必须先满足这些先决条件,然后才能启动集群;例如,确保将今天的数据上传到 Amazon S3。) 本教程将引导您完成启动集群的过程,该集群可以是基于 Amazon EMR 的简单管道的模型,也可以是更相关的管道的一部分。

先决条件

必须先完成以下步骤,然后才能使用 CLI:

  1. 安装和配置命令行界面(CLI)。有关更多信息,请参阅 访问 AWS Data Pipeline

  2. 确保名为 DataPipelineDefaultRoleDataPipelineDefaultResourceRole 的 IAM 角色存在。AWS Data Pipeline 控制台会自动为您创建这些角色。如果您一次也没有使用过 AWS Data Pipeline 控制台,则必须手动创建这些角色。有关更多信息,请参阅 适用于 AWS Data Pipeline 的 IAM 角色

创建管道定义文件

以下代码是简单 Amazon EMR 集群的管道定义文件,该集群运行由 Amazon EMR 提供的现有 Hadoop 流式处理作业。该示例应用程序称作 WordCount,也可使用 Amazon EMR 控制台运行它。

将此代码复制到一个文本文件中并将其另存为 MyEmrPipelineDefinition.json。您应将 Amazon S3 存储桶位置替换为您拥有的 Amazon S3 存储桶的名称。您还应替换开始日期和结束日期。要立即启动集群,请将 startDateTime 设置为过去某天的日期,并将 endDateTime 设置为将来某天的日期。随后,AWS Data Pipeline 立即开始启动“过期”集群,以尝试解决它所认为的工作积压。此回填意味着,您无需等待一个小时即会看到 AWS Data Pipeline 启动其第一个集群。

{ "objects": [ { "id": "Hourly", "type": "Schedule", "startDateTime": "2012-11-19T07:48:00", "endDateTime": "2012-11-21T07:48:00", "period": "1 hours" }, { "id": "MyCluster", "type": "EmrCluster", "masterInstanceType": "m1.small", "schedule": { "ref": "Hourly" } }, { "id": "MyEmrActivity", "type": "EmrActivity", "schedule": { "ref": "Hourly" }, "runsOn": { "ref": "MyCluster" }, "step": "/home/hadoop/contrib/streaming/hadoop-streaming.jar,-input,s3n://elasticmapreduce/samples/wordcount/input,-output,s3://myawsbucket/wordcount/output/#{@scheduledStartTime},-mapper,s3n://elasticmapreduce/samples/wordcount/wordSplitter.py,-reducer,aggregate" } ] }

此管道有三个对象:

  • Hourly,表示工作的计划。您可以将计划设置为活动上的字段之一。在执行此操作时,该活动将根据计划运行或 (在此示例中) 每小时运行一次。

  • MyCluster,表示用于运行集群的 Amazon EC2 实例组。您可以指定要作为集群运行的 EC2 实例的大小和数目。如果您不指定实例数,则集群在启动时有两个节点:一个主节点和一个任务节点。您可以指定要在其中启动集群的子网。您可以向集群添加其他配置,例如,用于将其他软件加载到由 Amazon EMR 提供的 AMI 上的引导操作。

  • MyEmrActivity,表示要使用集群处理的计算。Amazon EMR 支持多种类型的集群,包括流式处理、级联和脚本化 Hive。runsOn 字段重新引用 MyCluster,并将它用作集群基础的规范。

上传并激活管道定义

您必须上传您的管道定义并激活您的管道。在以下示例命令中,将 pipeline_name 替换为管道的标签,将 pipeline_file 替换为管道定义 .json 文件的完全限定路径。

AWS CLI

要创建管道定义并激活管道,请使用以下 create-pipeline 命令。记下您的管道 ID,因为您将在大多数 CLI 命令中使用这个值。

aws datapipeline create-pipeline --name pipeline_name --unique-id token { "pipelineId": "df-00627471SOVYZEXAMPLE" }

使用以下 put-pipeline-definition 命令更新管道定义。

aws datapipeline put-pipeline-definition --pipeline-id df-00627471SOVYZEXAMPLE --pipeline-definition file://MyEmrPipelineDefinition.json

如果您的管道成功验证,则 validationErrors 字段为空。您应该查看所有警告。

要激活管道,请使用以下 activate-pipeline 命令。

aws datapipeline activate-pipeline --pipeline-id df-00627471SOVYZEXAMPLE

您可以使用以下 list-pipelines 命令来验证您的管道是否出现在管道列表中。

aws datapipeline list-pipelines

监控管道运行

您可以使用 Amazon EMR 控制台查看由 AWS Data Pipeline 启动的集群,也可以使用 Amazon S3 控制台查看输出文件夹。

查看由 AWS Data Pipeline 启动的集群的进度
  1. 打开 Amazon EMR 控制台。

  2. 由 AWS Data Pipeline 生成的集群具有如下格式的名称:<pipeline-identifier>_@<emr-cluster-name>_<launch-time>

    Elastic MapReduce 集群 list showing three running clusters with unique identifiers.
  3. 在某个运行完成后,请打开 Amazon S3 控制台并检查带时间戳的输出文件夹是否存在并且包含集群的预期结果。

    Amazon S3 console showing folders with timestamp names in the wordcount directory.