

AWS Data Pipeline 不再提供給新客戶。的現有客戶 AWS Data Pipeline 可以繼續正常使用服務。[進一步了解](https://aws.amazon.com/blogs/big-data/migrate-workloads-from-aws-data-pipeline/)

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 範例
<a name="emrcluster-example"></a>

以下為此物件類型的範例。

**Topics**
+ [使用 hadoopVersion 啟動 Amazon EMR 叢集](emrcluster-example-launch.md)
+ [使用發行標籤 emr-4.x 或更高版本啟動 Amazon EMR 叢集](emrcluster-example-release-label.md)
+ [在 Amazon EMR 叢集上安裝其他軟體](emrcluster-example-install-software.md)
+ [停用 3.x 版本的伺服器端加密](emrcluster-example1-disable-encryption.md)
+ [停用 4.x 版本的伺服器端加密](emrcluster-example2-disable-encryption.md)
+ [設定 Hadoop KMS ACL 並在 HDFS 中建立加密區域](emrcluster-example-hadoop-kms.md)
+ [指定自訂 IAM 角色](emrcluster-example-custom-iam-roles.md)
+ [在適用於 Java 的 AWS 開發套件中使用 EmrCluster 資源](emrcluster-example-java.md)
+ [在私有子網路中設定 Amazon EMR 叢集](emrcluster-example-private-subnet.md)
+ [將 EBS 磁碟區連接到叢集節點](emrcluster-example-ebs.md)

# 使用 hadoopVersion 啟動 Amazon EMR 叢集
<a name="emrcluster-example-launch"></a>

**Example**  <a name="example1"></a>
下列範例使用 AMI 版本 1.0 和 Hadoop 0.20 啟動 Amazon EMR 叢集。  

```
{
  "id" : "MyEmrCluster",
  "type" : "EmrCluster",
  "hadoopVersion" : "0.20",
  "keyPair" : "my-key-pair",
  "masterInstanceType" : "m3.xlarge",
  "coreInstanceType" : "m3.xlarge",
  "coreInstanceCount" : "10",
  "taskInstanceType" : "m3.xlarge",
  "taskInstanceCount": "10",
  "bootstrapAction" : ["s3://Region.elasticmapreduce/bootstrap-actions/configure-hadoop,arg1,arg2,arg3","s3://Region.elasticmapreduce/bootstrap-actions/configure-hadoop/configure-other-stuff,arg1,arg2"]
}
```

# 使用發行標籤 emr-4.x 或更高版本啟動 Amazon EMR 叢集
<a name="emrcluster-example-release-label"></a>

**Example**  
下列範例使用較新的`releaseLabel`欄位啟動 Amazon EMR 叢集：  

```
{
  "id" : "MyEmrCluster",
  "type" : "EmrCluster",
  "keyPair" : "my-key-pair",
  "masterInstanceType" : "m3.xlarge",
  "coreInstanceType" : "m3.xlarge",
  "coreInstanceCount" : "10",
  "taskInstanceType" : "m3.xlarge",
  "taskInstanceCount": "10",
  "releaseLabel": "emr-4.1.0",
  "applications": ["spark", "hive", "pig"],
  "configuration": {"ref":"myConfiguration"}  
}
```

# 在 Amazon EMR 叢集上安裝其他軟體
<a name="emrcluster-example-install-software"></a>

**Example**  <a name="example2"></a>
`EmrCluster` 提供在 Amazon EMR 叢集上安裝第三方軟體`supportedProducts`的欄位，例如，它可讓您安裝自訂 Hadoop 分佈，例如 MapR。它接受要讀取及採取動作的第三方軟體引數逗號分隔清單。以下範例會示範如何使用 `EmrCluster` 的 `supportedProducts` 欄位建立自訂 MapR M3 版本叢集，在其上安裝 Karmasphere Analytics，並在其上執行 `EmrActivity` 物件。  

```
{
    "id": "MyEmrActivity",
    "type": "EmrActivity",
    "schedule": {"ref": "ResourcePeriod"},
    "runsOn": {"ref": "MyEmrCluster"},
    "postStepCommand": "echo Ending job >> /mnt/var/log/stepCommand.txt",    
    "preStepCommand": "echo Starting job > /mnt/var/log/stepCommand.txt",
    "step": "/home/hadoop/contrib/streaming/hadoop-streaming.jar,-input,s3n://elasticmapreduce/samples/wordcount/input,-output, \
     hdfs:///output32113/,-mapper,s3n://elasticmapreduce/samples/wordcount/wordSplitter.py,-reducer,aggregate"
  },
  {    
    "id": "MyEmrCluster",
    "type": "EmrCluster",
    "schedule": {"ref": "ResourcePeriod"},
    "supportedProducts": ["mapr,--edition,m3,--version,1.2,--key1,value1","karmasphere-enterprise-utility"],
    "masterInstanceType": "m3.xlarge",
    "taskInstanceType": "m3.xlarge"
}
```

# 停用 3.x 版本的伺服器端加密
<a name="emrcluster-example1-disable-encryption"></a>

**Example**  <a name="example3"></a>
根據預設， 建立的 Hadoop 2.x 版`EmrCluster`活動 AWS Data Pipeline 會啟用伺服器端加密。若您想要停用伺服器端加密，您必須在叢集物件定義中指定引導操作。  
以下範例會建立停用伺服器端加密的 `EmrCluster` 活動：  

```
{  
   "id":"NoSSEEmrCluster",
   "type":"EmrCluster",
   "hadoopVersion":"2.x",
   "keyPair":"my-key-pair",
   "masterInstanceType":"m3.xlarge",
   "coreInstanceType":"m3.large",
   "coreInstanceCount":"10",
   "taskInstanceType":"m3.large",
   "taskInstanceCount":"10",
   "bootstrapAction":["s3://Region.elasticmapreduce/bootstrap-actions/configure-hadoop,-e, fs.s3.enableServerSideEncryption=false"]
}
```

# 停用 4.x 版本的伺服器端加密
<a name="emrcluster-example2-disable-encryption"></a>

**Example**  <a name="example4"></a>
您必須使用 `EmrConfiguration` 物件停用伺服器端加密。  
以下範例會建立停用伺服器端加密的 `EmrCluster` 活動：  

```
   {
      "name": "ReleaseLabelCluster",
      "releaseLabel": "emr-4.1.0",
      "applications": ["spark", "hive", "pig"],
      "id": "myResourceId",
      "type": "EmrCluster",
      "configuration": {
        "ref": "disableSSE"
      }
    },
    {
      "name": "disableSSE",
      "id": "disableSSE",
      "type": "EmrConfiguration",
      "classification": "emrfs-site",
      "property": [{
        "ref": "enableServerSideEncryption"
      }
      ]
    },
    {
      "name": "enableServerSideEncryption",
      "id": "enableServerSideEncryption",
      "type": "Property",
      "key": "fs.s3.enableServerSideEncryption",
      "value": "false"
    }
```

# 設定 Hadoop KMS ACL 並在 HDFS 中建立加密區域
<a name="emrcluster-example-hadoop-kms"></a>

**Example**  <a name="example5"></a>
下列物件會建立 Hadoop KMS 的 ACL，並在 HDFS 中建立加密區域及對應的加密金鑰：  

```
{
      "name": "kmsAcls",
      "id": "kmsAcls",
      "type": "EmrConfiguration",
      "classification": "hadoop-kms-acls",
      "property": [
        {"ref":"kmsBlacklist"},
        {"ref":"kmsAcl"}
      ]
    },
    {
      "name": "hdfsEncryptionZone",
      "id": "hdfsEncryptionZone",
      "type": "EmrConfiguration",
      "classification": "hdfs-encryption-zones",
      "property": [
        {"ref":"hdfsPath1"},
        {"ref":"hdfsPath2"}
      ]
    },
    {
      "name": "kmsBlacklist",
      "id": "kmsBlacklist",
      "type": "Property",
      "key": "hadoop.kms.blacklist.CREATE",
      "value": "foo,myBannedUser"
    },
    {
      "name": "kmsAcl",
      "id": "kmsAcl",
      "type": "Property",
      "key": "hadoop.kms.acl.ROLLOVER",
      "value": "myAllowedUser"
    },
    {
      "name": "hdfsPath1",
      "id": "hdfsPath1",
      "type": "Property",
      "key": "/myHDFSPath1",
      "value": "path1_key"
    },
    {
      "name": "hdfsPath2",
      "id": "hdfsPath2",
      "type": "Property",
      "key": "/myHDFSPath2",
      "value": "path2_key"
    }
```

# 指定自訂 IAM 角色
<a name="emrcluster-example-custom-iam-roles"></a>

**Example**  <a name="example6"></a>
根據預設， `DataPipelineDefaultRole`會以 Amazon EMR 服務角色和 Amazon EC2 執行個體描述檔`DataPipelineDefaultResourceRole`的形式 AWS Data Pipeline 傳遞，以代表您建立資源。不過，您可以建立自訂 Amazon EMR 服務角色和自訂執行個體描述檔，並改用它們。 AWS Data Pipeline 應該有足夠的許可來使用自訂角色建立叢集，而且您必須新增 AWS Data Pipeline 做為信任的實體。  
下列範例物件指定 Amazon EMR 叢集的自訂角色：  

```
{  
   "id":"MyEmrCluster",
   "type":"EmrCluster",
   "hadoopVersion":"2.x",
   "keyPair":"my-key-pair",
   "masterInstanceType":"m3.xlarge",
   "coreInstanceType":"m3.large",
   "coreInstanceCount":"10",
   "taskInstanceType":"m3.large",
   "taskInstanceCount":"10",
   "role":"emrServiceRole",
   "resourceRole":"emrInstanceProfile"
}
```

# 在適用於 Java 的 AWS 開發套件中使用 EmrCluster 資源
<a name="emrcluster-example-java"></a>

**Example**  <a name="example7"></a>
下列範例示範如何使用 `EmrCluster`和 `EmrActivity` 建立 Amazon EMR 4.x 叢集，以使用 Java 開發套件執行 Spark 步驟：  

```
public class dataPipelineEmr4 {

  public static void main(String[] args) {
    
	AWSCredentials credentials = null;
	credentials = new ProfileCredentialsProvider("/path/to/AwsCredentials.properties","default").getCredentials();
	DataPipelineClient dp = new DataPipelineClient(credentials);
	CreatePipelineRequest createPipeline = new CreatePipelineRequest().withName("EMR4SDK").withUniqueId("unique");
	CreatePipelineResult createPipelineResult = dp.createPipeline(createPipeline);
	String pipelineId = createPipelineResult.getPipelineId();
    
	PipelineObject emrCluster = new PipelineObject()
	    .withName("EmrClusterObj")
	    .withId("EmrClusterObj")
	    .withFields(
			new Field().withKey("releaseLabel").withStringValue("emr-4.1.0"),
			new Field().withKey("coreInstanceCount").withStringValue("3"),
			new Field().withKey("applications").withStringValue("spark"),
			new Field().withKey("applications").withStringValue("Presto-Sandbox"),
			new Field().withKey("type").withStringValue("EmrCluster"),
			new Field().withKey("keyPair").withStringValue("myKeyName"),
			new Field().withKey("masterInstanceType").withStringValue("m3.xlarge"),
			new Field().withKey("coreInstanceType").withStringValue("m3.xlarge")        
			);
  
	PipelineObject emrActivity = new PipelineObject()
	    .withName("EmrActivityObj")
	    .withId("EmrActivityObj")
	    .withFields(
			new Field().withKey("step").withStringValue("command-runner.jar,spark-submit,--executor-memory,1g,--class,org.apache.spark.examples.SparkPi,/usr/lib/spark/lib/spark-examples.jar,10"),
			new Field().withKey("runsOn").withRefValue("EmrClusterObj"),
			new Field().withKey("type").withStringValue("EmrActivity")
			);
      
	PipelineObject schedule = new PipelineObject()
	    .withName("Every 15 Minutes")
	    .withId("DefaultSchedule")
	    .withFields(
			new Field().withKey("type").withStringValue("Schedule"),
			new Field().withKey("period").withStringValue("15 Minutes"),
			new Field().withKey("startAt").withStringValue("FIRST_ACTIVATION_DATE_TIME")
			);
      
	PipelineObject defaultObject = new PipelineObject()
	    .withName("Default")
	    .withId("Default")
	    .withFields(
			new Field().withKey("failureAndRerunMode").withStringValue("CASCADE"),
			new Field().withKey("schedule").withRefValue("DefaultSchedule"),
			new Field().withKey("resourceRole").withStringValue("DataPipelineDefaultResourceRole"),
			new Field().withKey("role").withStringValue("DataPipelineDefaultRole"),
			new Field().withKey("pipelineLogUri").withStringValue("s3://myLogUri"),
			new Field().withKey("scheduleType").withStringValue("cron")
			);     
      
	List<PipelineObject> pipelineObjects = new ArrayList<PipelineObject>();
    
	pipelineObjects.add(emrActivity);
	pipelineObjects.add(emrCluster);
	pipelineObjects.add(defaultObject);
	pipelineObjects.add(schedule);
    
	PutPipelineDefinitionRequest putPipelineDefintion = new PutPipelineDefinitionRequest()
	    .withPipelineId(pipelineId)
	    .withPipelineObjects(pipelineObjects);
    
	PutPipelineDefinitionResult putPipelineResult = dp.putPipelineDefinition(putPipelineDefintion);
	System.out.println(putPipelineResult);
    
	ActivatePipelineRequest activatePipelineReq = new ActivatePipelineRequest()
	    .withPipelineId(pipelineId);
	ActivatePipelineResult activatePipelineRes = dp.activatePipeline(activatePipelineReq);
	
      System.out.println(activatePipelineRes);
      System.out.println(pipelineId);
    
    }

}
```

# 在私有子網路中設定 Amazon EMR 叢集
<a name="emrcluster-example-private-subnet"></a>

**Example**  <a name="example8"></a>
此範例包含組態，該組態會在 VPC 內的私有子網路中啟動叢集。如需詳細資訊，請參閱《[Amazon EMR 管理指南》中的在 VPC 中啟動 Amazon EMR 叢集](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-vpc-launching-job-flows.html)。 **此組態為選擇性。您可以在任何使用 `EmrCluster` 物件的管道中使用它。  
若要在私有子網路中啟動 Amazon EMR 叢集，請在`EmrCluster`組態`serviceAccessSecurityGroupId`中指定 `SubnetId`、`emrManagedSlaveSecurityGroupId`、 `emrManagedMasterSecurityGroupId`和 。  

```
{
  "objects": [
    {
      "output": {
        "ref": "S3BackupLocation"
      },
      "input": {
        "ref": "DDBSourceTable"
      },
      "maximumRetries": "2",
      "name": "TableBackupActivity",
      "step": "s3://dynamodb-emr-#{myDDBRegion}/emr-ddb-storage-handler/2.1.0/emr-ddb-2.1.0.jar,org.apache.hadoop.dynamodb.tools.DynamoDbExport,#{output.directoryPath},#{input.tableName},#{input.readThroughputPercent}",
      "id": "TableBackupActivity",
      "runsOn": {
        "ref": "EmrClusterForBackup"
      },
      "type": "EmrActivity",
      "resizeClusterBeforeRunning": "false"
    },
    {
      "readThroughputPercent": "#{myDDBReadThroughputRatio}",
      "name": "DDBSourceTable",
      "id": "DDBSourceTable",
      "type": "DynamoDBDataNode",
      "tableName": "#{myDDBTableName}"
    },
    {
      "directoryPath": "#{myOutputS3Loc}/#{format(@scheduledStartTime, 'YYYY-MM-dd-HH-mm-ss')}",
      "name": "S3BackupLocation",
      "id": "S3BackupLocation",
      "type": "S3DataNode"
    },
    {
      "name": "EmrClusterForBackup",
      "coreInstanceCount": "1",
      "taskInstanceCount": "1",
      "taskInstanceType": "m4.xlarge",
      "coreInstanceType": "m4.xlarge",
      "releaseLabel": "emr-4.7.0",
      "masterInstanceType": "m4.xlarge",
      "id": "EmrClusterForBackup",
      "subnetId": "#{mySubnetId}",
      "emrManagedMasterSecurityGroupId": "#{myMasterSecurityGroup}",
      "emrManagedSlaveSecurityGroupId": "#{mySlaveSecurityGroup}",
      "serviceAccessSecurityGroupId": "#{myServiceAccessSecurityGroup}",
      "region": "#{myDDBRegion}",
      "type": "EmrCluster",
      "keyPair": "user-key-pair"
    },
    {
      "failureAndRerunMode": "CASCADE",
      "resourceRole": "DataPipelineDefaultResourceRole",
      "role": "DataPipelineDefaultRole",
      "pipelineLogUri": "#{myPipelineLogUri}",
      "scheduleType": "ONDEMAND",
      "name": "Default",
      "id": "Default"
    }
  ],
  "parameters": [
    {
      "description": "Output S3 folder",
      "id": "myOutputS3Loc",
      "type": "AWS::S3::ObjectKey"
    },
    {
      "description": "Source DynamoDB table name",
      "id": "myDDBTableName",
      "type": "String"
    },
    {
      "default": "0.25",
      "watermark": "Enter value between 0.1-1.0",
      "description": "DynamoDB read throughput ratio",
      "id": "myDDBReadThroughputRatio",
      "type": "Double"
    },
    {
      "default": "us-east-1",
      "watermark": "us-east-1",
      "description": "Region of the DynamoDB table",
      "id": "myDDBRegion",
      "type": "String"
    }
  ],
  "values": {
     "myDDBRegion": "us-east-1",
      "myDDBTableName": "ddb_table",
      "myDDBReadThroughputRatio": "0.25",
      "myOutputS3Loc": "s3://s3_path",
      "mySubnetId": "subnet_id",
      "myServiceAccessSecurityGroup":  "service access security group",
      "mySlaveSecurityGroup": "slave security group",
      "myMasterSecurityGroup": "master security group",
      "myPipelineLogUri": "s3://s3_path"
  }
}
```

# 將 EBS 磁碟區連接到叢集節點
<a name="emrcluster-example-ebs"></a>

**Example**  <a name="example8"></a>
您可以將 EBS 磁碟區連接到您管道中 EMR 叢集內任何類型的節點。若要將 EBS 磁碟區連接到節點，請在您的 `EmrCluster` 組態中使用 `coreEbsConfiguration`、`masterEbsConfiguration` 和 `TaskEbsConfiguration`。  
此 Amazon EMR 叢集範例會將 Amazon EBS 磁碟區用於其主節點、任務和核心節點。如需詳細資訊，請參閱《[Amazon EMR 管理指南》中的 Amazon EMR 中的 Amazon EBS 磁碟區](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-storage.html)。 **  
這些組態都是選擇性的。您可以在任何使用 `EmrCluster` 物件的管道中使用他們。  
在管道中，按一下 `EmrCluster` 物件組態，選擇 **Master EBS Configuration (主要 EBS 組態)**、**Core EBS Configuration (核心 EBS 組態)** 或 **Task EBS Configuration (任務 EBS 組態)**，然後輸入與以下範例相似的組態詳細資訊。  

```
{
  "objects": [
    {
      "output": {
        "ref": "S3BackupLocation"
      },
      "input": {
        "ref": "DDBSourceTable"
      },
      "maximumRetries": "2",
      "name": "TableBackupActivity",
      "step": "s3://dynamodb-emr-#{myDDBRegion}/emr-ddb-storage-handler/2.1.0/emr-ddb-2.1.0.jar,org.apache.hadoop.dynamodb.tools.DynamoDbExport,#{output.directoryPath},#{input.tableName},#{input.readThroughputPercent}",
      "id": "TableBackupActivity",
      "runsOn": {
        "ref": "EmrClusterForBackup"
      },
      "type": "EmrActivity",
      "resizeClusterBeforeRunning": "false"
    },
    {
      "readThroughputPercent": "#{myDDBReadThroughputRatio}",
      "name": "DDBSourceTable",
      "id": "DDBSourceTable",
      "type": "DynamoDBDataNode",
      "tableName": "#{myDDBTableName}"
    },
    {
      "directoryPath": "#{myOutputS3Loc}/#{format(@scheduledStartTime, 'YYYY-MM-dd-HH-mm-ss')}",
      "name": "S3BackupLocation",
      "id": "S3BackupLocation",
      "type": "S3DataNode"
    },
    {
      "name": "EmrClusterForBackup",
      "coreInstanceCount": "1",
      "taskInstanceCount": "1",
      "taskInstanceType": "m4.xlarge",
      "coreInstanceType": "m4.xlarge",
      "releaseLabel": "emr-4.7.0",
      "masterInstanceType": "m4.xlarge",
      "id": "EmrClusterForBackup",
      "subnetId": "#{mySubnetId}",
      "emrManagedMasterSecurityGroupId": "#{myMasterSecurityGroup}",
      "emrManagedSlaveSecurityGroupId": "#{mySlaveSecurityGroup}",
      "region": "#{myDDBRegion}",
      "type": "EmrCluster",
      "coreEbsConfiguration": {
        "ref": "EBSConfiguration"
      },
      "masterEbsConfiguration": {
        "ref": "EBSConfiguration"
      },
      "taskEbsConfiguration": {
        "ref": "EBSConfiguration"
      },
      "keyPair": "user-key-pair"
    },
    {
       "name": "EBSConfiguration",
        "id": "EBSConfiguration",
        "ebsOptimized": "true",
        "ebsBlockDeviceConfig" : [
            { "ref": "EbsBlockDeviceConfig" }
        ],
        "type": "EbsConfiguration"
    },
    {
        "name": "EbsBlockDeviceConfig",
        "id": "EbsBlockDeviceConfig",
        "type": "EbsBlockDeviceConfig",
        "volumesPerInstance" : "2",
        "volumeSpecification" : {
            "ref": "VolumeSpecification"
        }
    },
    {
      "name": "VolumeSpecification",
      "id": "VolumeSpecification",
      "type": "VolumeSpecification",
      "sizeInGB": "500",
      "volumeType": "io1",
      "iops": "1000"
    },
    {
      "failureAndRerunMode": "CASCADE",
      "resourceRole": "DataPipelineDefaultResourceRole",
      "role": "DataPipelineDefaultRole",
      "pipelineLogUri": "#{myPipelineLogUri}",
      "scheduleType": "ONDEMAND",
      "name": "Default",
      "id": "Default"
    }
  ],
  "parameters": [
    {
      "description": "Output S3 folder",
      "id": "myOutputS3Loc",
      "type": "AWS::S3::ObjectKey"
    },
    {
      "description": "Source DynamoDB table name",
      "id": "myDDBTableName",
      "type": "String"
    },
    {
      "default": "0.25",
      "watermark": "Enter value between 0.1-1.0",
      "description": "DynamoDB read throughput ratio",
      "id": "myDDBReadThroughputRatio",
      "type": "Double"
    },
    {
      "default": "us-east-1",
      "watermark": "us-east-1",
      "description": "Region of the DynamoDB table",
      "id": "myDDBRegion",
      "type": "String"
    }
  ],
  "values": {
     "myDDBRegion": "us-east-1",
      "myDDBTableName": "ddb_table",
      "myDDBReadThroughputRatio": "0.25",
      "myOutputS3Loc": "s3://s3_path",
      "mySubnetId": "subnet_id",
      "mySlaveSecurityGroup": "slave security group",
      "myMasterSecurityGroup": "master security group",
      "myPipelineLogUri": "s3://s3_path"
  }
}
```