

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 通过流式处理来处理数据
<a name="UseCase_Streaming"></a>

Hadoop 流媒体是 Hadoop 附带的一个实用程序，它使您能够使用 Java 以外的语言开发可 MapReduce执行文件。流式处理是以 JAR 文件的形式实现的，这样您就可以像运行标准 JAR 文件一样，从 Amazon EMR API 或命令行运行它。

此部分介绍如何结合使用 Streaming 与 Amazon EMR。

**注意**  
Apache Hadoop Streaming 是一种独立工具。因此，这里并不介绍其所有函数和参数。[有关 Hadoop 直播的更多信息，请访问 http://hadoop.apache。 org/docs/stable/hadoop-streaming/HadoopStreaming.html。](http://hadoop.apache.org/docs/stable/hadoop-streaming/HadoopStreaming.html)

## 使用 Hadoop Streaming 实用工具
<a name="HadoopStreamCommands"></a>

此部分介绍如何使用 Hadoop 的 Streaming 实用工具。


**Hadoop 进程**  

|  |  | 
| --- |--- |
| 1 |  以您所选择的编程语言编写映射器和 Reducer 可执行文件。 按照 Hadoop 的文档中的指示编写流式处理可执行文件。该等程序应从标准输入读取其输入内容，并通过标准输出来输出数据。默认情况下，输入/输出的每一行都代表一条记录，并且每一行中的第一个制表符都用作密钥与值之间的分隔符。  | 
| 2 |  在本地测试您的可执行文件，并将它们上载到 Amazon S3。  | 
| 3 |  使用 Amazon EMR 命令行界面或 Amazon EMR 控制台可运行您的应用程序。  | 

每个映射器脚本都会以单独进程的形式在集群中启动。每个 Reducer 可执行文件都会通过任务流程将映射器可执行文件的输出转到数据输出中。

大多数 Streaming 应用程序都需要 `input`、`output`、`mapper` 和 `reducer` 参数。下表描述了上述参数和其它可选参数。


| 参数 | 说明 | 必填 | 
| --- | --- | --- | 
| -input |  输入数据在 Amazon S3 上的位置。 类型：字符串 默认值：无 约束：URI。如果没有指定协议，那么它就可以使用集群的默认文件系统。  | 是 | 
| -output |  Amazon S3 上的位置，该位置为 Amazon EMR 上载已处理数据的地方。 类型：字符串 默认值：无 约束：URI 默认值：如果没有指定位置，那么 `input` 会将数据上载至 Amazon EMR 指定的位置。  | 是 | 
| -mapper |  映射器可执行文件的名称。 类型：字符串 默认值：无  | 是 | 
| -reducer |  Reducer 可执行文件的名称。 类型：字符串 默认值：无  | 是 | 
| -cacheFile |  一个 Amazon S3 位置，其中包含一些文件可供 Hadoop 复制到本地工作目录（主要目的是提高性能）。 类型：字符串 默认值：无 约束：[URI]\$1[要在工作目录中创建的符号链接名称]   | 否 | 
| -cacheArchive |  提取到工作目录的 JAR 文件。 类型：字符串 默认值：无 约束：[URI]\$1[要在工作目录中创建的符号链接目录名称]   | 否 | 
| -combiner |  合并结果 类型：字符串 默认值：无 约束：Java 类名  | 否 | 

以下示例代码是写入 Python 的映射器可执行文件。此脚本是 WordCount 示例应用程序的一部分。

```
 1. #!/usr/bin/python
 2. import sys
 3. 
 4. def main(argv):
 5.   line = sys.stdin.readline()
 6.   try:
 7.     while line:
 8.       line = line.rstrip()
 9.       words = line.split()
10.       for word in words:
11.         print "LongValueSum:" + word + "\t" + "1"
12.       line = sys.stdin.readline()
13.   except "end of file":
14.     return None
15. if __name__ == "__main__":
16.   main(sys.argv)
```

# 提交流式处理步骤
<a name="CLI_CreateStreaming"></a>

本节介绍向集群提交流式处理步骤的基本知识。Streaming 应用程序会从标准输入读取输入内容，然后针对每个输入运行脚本或可执行文件（称为映射器）。每个输入的结果都会保存在本地，通常位于 Hadoop Distributed File System（HDFS）分区上。所有输入经过映射器处理后，第二个脚本或可执行文件 (名为 Reducer) 会处理映射器结果。将 Reducer 的结果发送到标准输出。您可以将一系列 Streaming 步骤串联起来，让一个步骤的输出作为另一个步骤的输入。

映射器和 Reducer 都能够以文件的形式进行引用，或者您也可以提供一个 Java 类。您能够以任一种受支持的语言 (包括 Ruby、Perl、Python、PHP 或 Bash) 来执行映射器和 Reducer。

## 使用控制台提交流式处理步骤
<a name="emr-dev-create-stream-console"></a>

此示例介绍如何使用 Amazon EMR 控制台向正在运行的集群提交流式处理步骤。

**提交流式处理步骤**

1. [在 /emr 上打开亚马逊 EMR 控制台。https://console.aws.amazon.com](https://console.aws.amazon.com/emr/)

1. 在 **Cluster List (集群列表)** 中，选择您的集群的名称。

1. 滚动到 **Steps (步骤)** 部分并展开它，然后选择 **Add step (添加步骤)**。

1. 在 **Add Step (添加步骤)** 对话框中：
   + 对于 **Step type (步骤类型)**，选择 **Streaming program (流式程序)**。
   + 对于 **Name (名称)**，请接受默认名称（流式程序）或键入新名称。
   + 对于**映射器**，键入或浏览到 Hadoop 中映射器类所在的位置或映射器可执行文件（如 Python 程序）所在的 S3 存储桶。路径值必须采用*BucketName*/*path*/的形式*MapperExecutable*。
   + 对于 **Reducer**，键入或浏览到 Hadoop 中 Reducer 类所在的位置或 Reducer 可执行文件（如 Python 程序）所在的 S3 存储桶。路径值必须采用*BucketName*/*path*/的形式*MapperExecutable*。Amazon EMR 支持特殊 *aggregate* 关键字。有关更多信息，请转到 Hadoop 提供的 Aggregate 库。
   + 对于 **Input S3 location (输入 S3 位置)**，键入或浏览到输入数据的位置。
   + 对于 **Output S3 location (输出 S3 位置)**，键入或浏览到您的 Amazon S3 输出存储桶的名称。
   + 对于 **Arguments (参数)**，将该字段保留为空白。
   + 对于 **Action on failure (出现故障时的操作)**，接受默认选项 **Continue (继续)**。

1. 选择**添加**。步骤会出现在控制台中，其状态为“Pending”。

1. 步骤的状态会随着步骤的运行从“Pending”变为“Running”，再变为“Completed”。要更新状态，请选择 Actions (操作) 列上方的 **Refresh (刷新)** 图标。

## AWS CLI
<a name="emr-dev-create-stream-cli"></a>

这些示例演示了 AWS CLI 如何使用创建集群和提交流式处理步骤。

**要创建集群并提交流式处理步骤，请使用 AWS CLI**
+ 要使用创建集群并提交流式处理步骤 AWS CLI，请键入以下命令并*myKey*替换为您的 EC2 key pair 的名称。请注意，`--files` 的实际参数应该是指向您脚本位置的 Amazon S3 路径，并且 `-mapper` 和 `-reducer` 的实际参数应该是各自脚本文件的名称。

  ```
  aws emr create-cluster --name "Test cluster" --release-label emr-7.12.0 --applications Name=Hue Name=Hive Name=Pig --use-default-roles \
  --ec2-attributes KeyName=myKey --instance-type m5.xlarge --instance-count 3 \
  --steps Type=STREAMING,Name="Streaming Program",ActionOnFailure=CONTINUE,Args=[--files,pathtoscripts,-mapper,mapperscript,-reducer,reducerscript,aggregate,-input,pathtoinputdata,-output,pathtooutputbucket]
  ```
**注意**  
为了便于读取，包含 Linux 行继续符（\$1）。它们可以通过 Linux 命令删除或使用。对于 Windows，请将它们删除或替换为脱字号 (^)。

  如果不使用 `--instance-groups` 参数指定实例计数，则将启动单个主节点，其余实例将作为核心节点启动。所有节点都使用该命令中指定的实例类型。
**注意**  
如果您之前未创建默认 Amazon EMR 服务角色和 EC2 实例配置文件，请先键入 aws `emr create-default-roles` 创建它们，然后再键入 `create-cluster` 子命令。

  有关在中使用 Amazon EMR 命令的更多信息 AWS CLI，请参阅。[https://docs.aws.amazon.com/cli/latest/reference/emr](https://docs.aws.amazon.com/cli/latest/reference/emr)