


# Responding to Amazon EMR CloudWatch events
<a name="emr-events-response"></a>

This section describes how to respond to the actionable [events](emr-manage-cloudwatch-events.md) that Amazon EMR emits as CloudWatch event messages. Ways to respond to an event include creating rules, setting alarms, and other responses. The following sections include links to procedures and recommended responses for common events.

**Topics**
+ [Creating rules for Amazon EMR events with CloudWatch](emr-events-cloudwatch-console.md)
+ [Setting alarms on Amazon EMR metrics in CloudWatch](UsingEMR_ViewingMetrics_Alarm.md)
+ [Responding to Amazon EMR cluster insufficient instance capacity events](emr-events-response-insuff-capacity.md)
+ [Responding to Amazon EMR cluster instance fleet resize timeout events](emr-events-response-timeout-events.md)

# Creating rules for Amazon EMR events with CloudWatch
<a name="emr-events-cloudwatch-console"></a>

Amazon EMR automatically sends events to the CloudWatch event stream. You can create rules that match events according to a specified pattern, and then route the events to targets that take action, such as sending an email notification. Patterns are matched against the event's JSON object. For more information about Amazon EMR event details, see [Amazon EMR events](https://docs.aws.amazon.com/AmazonCloudWatch/latest/events/EventTypes.html#emr_event_type) in the *Amazon CloudWatch Events User Guide*.

For information about setting up CloudWatch Events rules, see [Creating a CloudWatch Events rule that triggers on an event](https://docs.aws.amazon.com/AmazonCloudWatch/latest/events/Create-CloudWatch-Events-Rule.html).
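As a minimal sketch of what such a rule could match on, the following event pattern selects Amazon EMR cluster state change events for clusters that terminated with errors. The rule name in the commented-out `put_rule` call is a hypothetical placeholder; adjust the pattern to the event types you care about.

```python
import json

# Event pattern that matches EMR cluster state change events.
# The "state" filter narrows matches to clusters that terminated with errors.
event_pattern = {
    "source": ["aws.emr"],
    "detail-type": ["EMR Cluster State Change"],
    "detail": {"state": ["TERMINATED_WITH_ERRORS"]},
}

# With boto3, the rule could be registered roughly like this (untested sketch):
# import boto3
# events = boto3.client("events")
# events.put_rule(
#     Name="emr-cluster-failure-rule",   # hypothetical rule name
#     EventPattern=json.dumps(event_pattern),
#     State="ENABLED",
# )

print(json.dumps(event_pattern, indent=2))
```

A rule like this still needs a target (for example, an SNS topic or Lambda function) before it takes any action.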

# Setting alarms on Amazon EMR metrics in CloudWatch
<a name="UsingEMR_ViewingMetrics_Alarm"></a>

Amazon EMR pushes metrics to Amazon CloudWatch. In turn, you can use CloudWatch to set alarms on your Amazon EMR metrics. For example, you can configure an alarm in CloudWatch to send you an email any time HDFS utilization rises above 80%. For detailed instructions, see [Create or edit a CloudWatch alarm](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/ConsoleAlarms.html) in the *Amazon CloudWatch User Guide*.
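The HDFS-utilization alarm described above can also be created programmatically. The following is a sketch of the parameters for such an alarm; the alarm name, cluster ID, and SNS topic ARN are hypothetical placeholders.

```python
# Parameters for a CloudWatch alarm on the EMR HDFSUtilization metric.
# AlarmName, the JobFlowId value, and the SNS topic ARN are placeholders.
alarm_params = {
    "AlarmName": "emr-hdfs-utilization-high",              # hypothetical
    "Namespace": "AWS/ElasticMapReduce",
    "MetricName": "HDFSUtilization",
    "Dimensions": [{"Name": "JobFlowId", "Value": "j-XXXXXXXXXXXXX"}],
    "Statistic": "Average",
    "Period": 300,
    "EvaluationPeriods": 1,
    "Threshold": 80.0,
    "ComparisonOperator": "GreaterThanThreshold",
    "AlarmActions": ["arn:aws:sns:us-east-1:111122223333:my-topic"],
}

# With boto3, you would submit it roughly like this (untested sketch):
# import boto3
# boto3.client("cloudwatch").put_metric_alarm(**alarm_params)

print(alarm_params["MetricName"], alarm_params["Threshold"])
```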

# Responding to Amazon EMR cluster insufficient instance capacity events
<a name="emr-events-response-insuff-capacity"></a>

## Overview
<a name="emr-events-response-insuff-capacity-overview"></a>

Amazon EMR clusters return the event code `EC2 provisioning - Insufficient Instance Capacity` when the chosen Availability Zone doesn't have enough capacity to fulfill your cluster start or resize request. The event fires periodically for both instance groups and instance fleets if Amazon EMR repeatedly encounters an insufficient capacity exception and can't fulfill your provisioning request for a cluster start or cluster resize operation.

This page describes how best to respond to these events when they occur for your EMR cluster.

## Recommended responses to insufficient capacity events
<a name="emr-events-response-insuff-capacity-rec"></a>

We recommend that you respond to an insufficient capacity event in one of the following ways:
+ Wait for capacity to recover. Capacity shifts frequently, so an insufficient capacity exception can recover on its own. Your cluster will start or finish resizing as soon as Amazon EC2 capacity becomes available.
+ Alternatively, you can terminate the cluster, modify your instance type configurations, and create a new cluster with the updated cluster configuration request. For more information, see [Availability Zone flexibility for Amazon EMR clusters](emr-flexibility.md).

You can also set up rules or automated responses to insufficient capacity events, as described in the next section.
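As one possible starting point for such a rule, the following pattern matches the insufficient capacity event code; the detail type and event code mirror the constants used in the Lambda function in the next section.

```python
import json

# Pattern that matches the insufficient capacity event for instance groups.
# The detail-type and eventCode values mirror the constants used in the
# example Lambda function.
event_pattern = {
    "source": ["aws.emr"],
    "detail-type": ["EMR Instance Group Provisioning"],
    "detail": {
        "eventCode": ["EC2 provisioning - Insufficient Instance Capacity"]
    },
}

print(json.dumps(event_pattern, indent=2))
```

Pointing a rule with this pattern at a Lambda target is what wires the automation together.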

## Recovering from insufficient capacity events automatically
<a name="emr-events-response-insuff-capacity-ex"></a>

You can build automation in response to Amazon EMR events, such as those with the event code `EC2 provisioning - Insufficient Instance Capacity`. For example, the following AWS Lambda function terminates an EMR cluster with an instance group that uses On-Demand Instances, and then creates a new EMR cluster with an instance group that contains different instance types than the original request.

The following conditions trigger the automated process:
+ An insufficient capacity event for the primary (master) or core node has been emitted for more than 20 minutes.
+ The cluster is not in a **RUNNING** or **WAITING** state. For more information about EMR cluster states, see [Understanding the cluster lifecycle](emr-overview.md#emr-overview-cluster-lifecycle).

**Note**  
When you build an automated process for insufficient capacity exceptions, consider that insufficient capacity events are recoverable. Capacity shifts frequently, and your cluster will resume resizing or start as soon as Amazon EC2 capacity becomes available.

**Example function that responds to an insufficient capacity event**  

```
# Lambda code for Python 3.10; the handler is lambda_function.lambda_handler
# Note: the associated IAM role requires permission to call Amazon EMR APIs

import json
import boto3
import datetime
from datetime import timezone

INSUFFICIENT_CAPACITY_EXCEPTION_DETAIL_TYPE = "EMR Instance Group Provisioning"
INSUFFICIENT_CAPACITY_EXCEPTION_EVENT_CODE = (
    "EC2 provisioning - Insufficient Instance Capacity"
)
ALLOWED_INSTANCE_TYPES_TO_USE = [
    "m5.xlarge",
    "c5.xlarge",
    "m5.4xlarge",
    "m5.2xlarge",
    "t3.xlarge",
]
CLUSTER_START_ACCEPTABLE_STATES = ["WAITING", "RUNNING"]
CLUSTER_START_SLA = 20

CLIENT = boto3.client("emr", region_name="us-east-1")

# checks if the incoming event is 'EMR Instance Group Provisioning' with eventCode 'EC2 provisioning - Insufficient Instance Capacity'
def is_insufficient_capacity_event(event):
    if not event.get("detail"):
        return False
    else:
        return (
            event["detail-type"] == INSUFFICIENT_CAPACITY_EXCEPTION_DETAIL_TYPE
            and event["detail"]["eventCode"]
            == INSUFFICIENT_CAPACITY_EXCEPTION_EVENT_CODE
        )


# checks if the cluster is eligible for termination
def is_cluster_eligible_for_termination(event, describeClusterResponse):
    # instanceGroupType could be CORE, MASTER OR TASK
    instanceGroupType = event["detail"]["instanceGroupType"]
    clusterCreationTime = describeClusterResponse["Cluster"]["Status"]["Timeline"][
        "CreationDateTime"
    ]
    clusterState = describeClusterResponse["Cluster"]["Status"]["State"]

    now = datetime.datetime.now()
    now = now.replace(tzinfo=timezone.utc)
    isClusterStartSlaBreached = clusterCreationTime < now - datetime.timedelta(
        minutes=CLUSTER_START_SLA
    )

    # Check if the instance group receiving the insufficient capacity exception is CORE or PRIMARY (MASTER),
    # and it's been more than 20 minutes since the cluster was created but the cluster state has not reached RUNNING or WAITING
    if (
        (instanceGroupType == "CORE" or instanceGroupType == "MASTER")
        and isClusterStartSlaBreached
        and clusterState not in CLUSTER_START_ACCEPTABLE_STATES
    ):
        return True
    else:
        return False


# Choose an item from the list, excluding the exempt value
def choice_excluding(exempt):
    for i in ALLOWED_INSTANCE_TYPES_TO_USE:
        if i != exempt:
            return i


# Create a new cluster by choosing different InstanceType.
def create_cluster(event):
    # instanceGroupType could be CORE, MASTER OR TASK
    instanceGroupType = event["detail"]["instanceGroupType"]

    # The following two lines assume that the customer who created the cluster already knows which instance types were used in the original request
    instanceTypesFromOriginalRequestMaster = "m5.xlarge"
    instanceTypesFromOriginalRequestCore = "m5.xlarge"

    # Select new instance types to include in the new createCluster request
    instanceTypeForMaster = (
        instanceTypesFromOriginalRequestMaster
        if instanceGroupType != "MASTER"
        else choice_excluding(instanceTypesFromOriginalRequestMaster)
    )
    instanceTypeForCore = (
        instanceTypesFromOriginalRequestCore
        if instanceGroupType != "CORE"
        else choice_excluding(instanceTypesFromOriginalRequestCore)
    )

    print("Starting to create cluster...")
    instances = {
        "InstanceGroups": [
            {
                "InstanceRole": "MASTER",
                "InstanceCount": 1,
                "InstanceType": instanceTypeForMaster,
                "Market": "ON_DEMAND",
                "Name": "Master",
            },
            {
                "InstanceRole": "CORE",
                "InstanceCount": 1,
                "InstanceType": instanceTypeForCore,
                "Market": "ON_DEMAND",
                "Name": "Core",
            },
        ]
    }
    response = CLIENT.run_job_flow(
        Name="Test Cluster",
        Instances=instances,
        VisibleToAllUsers=True,
        JobFlowRole="EMR_EC2_DefaultRole",
        ServiceRole="EMR_DefaultRole",
        ReleaseLabel="emr-6.10.0",
    )

    return response["JobFlowId"]


# Terminates the cluster using the clusterId received in the event
def terminate_cluster(event):
    print("Trying to terminate cluster, clusterId: " + event["detail"]["clusterId"])
    response = CLIENT.terminate_job_flows(JobFlowIds=[event["detail"]["clusterId"]])
    print(f"Terminate cluster response: {response}")


def describe_cluster(event):
    response = CLIENT.describe_cluster(ClusterId=event["detail"]["clusterId"])
    return response


def lambda_handler(event, context):
    if is_insufficient_capacity_event(event):
        print(
            "Received insufficient capacity event for instanceGroup, clusterId: "
            + event["detail"]["clusterId"]
        )

        describeClusterResponse = describe_cluster(event)

        shouldTerminateCluster = is_cluster_eligible_for_termination(
            event, describeClusterResponse
        )
        if shouldTerminateCluster:
            terminate_cluster(event)

            clusterId = create_cluster(event)
            print("Created a new cluster, clusterId: " + clusterId)
        else:
            print(
                "Cluster is not eligible for termination, clusterId: "
                + event["detail"]["clusterId"]
            )

    else:
        print("Received event is not insufficient capacity event, skipping")
```

# Responding to Amazon EMR cluster instance fleet resize timeout events
<a name="emr-events-response-timeout-events"></a>

## Overview
<a name="emr-events-response-timeout-events-overview"></a>

Amazon EMR clusters emit [events](emr-manage-cloudwatch-events.md#emr-cloudwatch-instance-fleet-resize-events) during resize operations on instance fleet clusters. A provisioning timeout event is emitted when the timeout expires and Amazon EMR stops provisioning Spot or On-Demand capacity for the instance fleet. You can configure the timeout duration as part of the instance fleet's [resize specifications](https://docs.aws.amazon.com/emr/latest/APIReference/API_InstanceFleetResizingSpecifications.html). When you issue consecutive resize operations on the same instance fleet and the timeout for the current resize operation expires, Amazon EMR emits a `Spot provisioning timeout - continuing resize` or `On-Demand provisioning timeout - continuing resize` event. It then starts provisioning capacity for the fleet's next resize operation.
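For illustration, a resize request that configures the Spot provisioning timeout could be sketched as follows. The cluster and fleet IDs are hypothetical placeholders, and the `ResizeSpecifications` shape follows the API reference linked above.

```python
# Sketch of a fleet resize request with a 30-minute Spot provisioning
# timeout. ClusterId and InstanceFleetId are hypothetical placeholders.
resize_request = {
    "ClusterId": "j-XXXXXXXXXXXXX",             # placeholder
    "InstanceFleet": {
        "InstanceFleetId": "if-XXXXXXXXXXXXX",  # placeholder
        "TargetSpotCapacity": 100,
        "ResizeSpecifications": {
            "SpotResizeSpecification": {"TimeoutDurationMinutes": 30}
        },
    },
}

# With boto3, the request would be submitted roughly like this (untested sketch):
# import boto3
# boto3.client("emr").modify_instance_fleet(**resize_request)

print(resize_request["InstanceFleet"]["ResizeSpecifications"])
```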

## Responding to instance fleet resize timeout events
<a name="emr-events-response-timeout-events-rec"></a>

We recommend that you respond to a provisioning timeout event in one of the following ways:
+ Revisit the [resize specifications](https://docs.aws.amazon.com/emr/latest/APIReference/API_InstanceFleetResizingSpecifications.html), then retry the resize operation. Because capacity shifts frequently, your cluster will resize successfully as soon as Amazon EC2 capacity becomes available. We recommend configuring lower timeout duration values for jobs with more demanding SLAs.
+ Alternatively, you can:
  + Launch a new cluster with a varied set of instance types, following the [best practices for instance and Availability Zone flexibility](emr-flexibility.md#emr-flexibility-types), or
  + Launch a cluster with On-Demand capacity
+ For the `provisioning timeout - continuing resize` events, you can also wait for the resize operations to be processed. Amazon EMR continues to process the resize operations triggered for the fleet sequentially, while honoring the configured resize specifications.

You can also set up rules or automated responses to the event, as described in the next section.
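As a sketch, a CloudWatch Events rule that routes this event to automation could use the following pattern; the detail type and event code mirror the constants that the Lambda function in the next section checks for.

```python
import json

# Pattern that matches the Spot provisioning timeout event for instance
# fleet resize operations; values mirror the example Lambda's constants.
event_pattern = {
    "source": ["aws.emr"],
    "detail-type": ["EMR Instance Fleet Resize"],
    "detail": {"eventCode": ["Spot Provisioning timeout"]},
}

print(json.dumps(event_pattern, indent=2))
```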

## Recovering from provisioning timeout events automatically
<a name="emr-events-response-timeout-events-ex"></a>

You can build automation in response to Amazon EMR events with the `Spot Provisioning timeout` event code. For example, the following AWS Lambda function terminates an EMR cluster with an instance fleet that uses Spot Instances for task nodes, and then creates a new EMR cluster with an instance fleet that contains a more diversified set of instance types than the original request. In this example, a `Spot Provisioning timeout` event emitted for task nodes triggers the Lambda function.

**Example function that responds to a `Spot Provisioning timeout` event**  

```
# Lambda code for Python 3.10; the handler is lambda_function.lambda_handler
# Note: the associated IAM role requires permission to call Amazon EMR APIs
 
import json
import boto3
import datetime
from datetime import timezone
 
SPOT_PROVISIONING_TIMEOUT_EXCEPTION_DETAIL_TYPE = "EMR Instance Fleet Resize"
SPOT_PROVISIONING_TIMEOUT_EXCEPTION_EVENT_CODE = (
    "Spot Provisioning timeout"
)
 
CLIENT = boto3.client("emr", region_name="us-east-1")
 
# checks if the incoming event is 'EMR Instance Fleet Resize' with eventCode 'Spot Provisioning timeout'
def is_spot_provisioning_timeout_event(event):
    if not event.get("detail"):
        return False
    else:
        return (
            event["detail-type"] == SPOT_PROVISIONING_TIMEOUT_EXCEPTION_DETAIL_TYPE
            and event["detail"]["eventCode"]
            == SPOT_PROVISIONING_TIMEOUT_EXCEPTION_EVENT_CODE
        )
 
 
# checks if the cluster is eligible for termination
def is_cluster_eligible_for_termination(event, describeClusterResponse):
    # instanceFleetType could be CORE, MASTER OR TASK
    instanceFleetType = event["detail"]["instanceFleetType"]
 
    # Check if instance fleet receiving Spot provisioning timeout event is TASK
    if (instanceFleetType == "TASK"):
        return True
    else:
        return False
 
 
# create a new cluster by choosing different InstanceType.
def create_cluster(event):
    # instanceFleetType could be CORE, MASTER OR TASK
    instanceFleetType = event["detail"]["instanceFleetType"]
 
    # The following two lines assume that the customer who created the cluster already knows which instance types were used in the original request
    instanceTypesFromOriginalRequestMaster = "m5.xlarge"
    instanceTypesFromOriginalRequestCore = "m5.xlarge"
   
    # select new instance types to include in the new createCluster request
    instanceTypesForTask = [
        "m5.xlarge",
        "m5.2xlarge",
        "m5.4xlarge",
        "m5.8xlarge",
        "m5.12xlarge"
    ]
    
    print("Starting to create cluster...")
    instances = {
        "InstanceFleets": [
            {
                "InstanceFleetType":"MASTER",
                "TargetOnDemandCapacity":1,
                "TargetSpotCapacity":0,
                "InstanceTypeConfigs":[
                    {
                        'InstanceType': instanceTypesFromOriginalRequestMaster,
                        "WeightedCapacity":1,
                    }
                ]
            },
            {
                "InstanceFleetType":"CORE",
                "TargetOnDemandCapacity":1,
                "TargetSpotCapacity":0,
                "InstanceTypeConfigs":[
                    {
                        'InstanceType': instanceTypesFromOriginalRequestCore,
                        "WeightedCapacity":1,
                    }
                ]
            },
            {
                "InstanceFleetType":"TASK",
                "TargetOnDemandCapacity":0,
                "TargetSpotCapacity":100,
                "LaunchSpecifications":{},
                "InstanceTypeConfigs":[
                    {
                        'InstanceType': instanceTypesForTask[0],
                        "WeightedCapacity":1,
                    },
                    {
                        'InstanceType': instanceTypesForTask[1],
                        "WeightedCapacity":2,
                    },
                    {
                        'InstanceType': instanceTypesForTask[2],
                        "WeightedCapacity":4,
                    },
                    {
                        'InstanceType': instanceTypesForTask[3],
                        "WeightedCapacity":8,
                    },
                    {
                        'InstanceType': instanceTypesForTask[4],
                        "WeightedCapacity":12,
                    }
                ],
                "ResizeSpecifications": {
                    "SpotResizeSpecification": {
                        "TimeoutDurationMinutes": 30
                    }
                }
            }
        ]
    }
    response = CLIENT.run_job_flow(
        Name="Test Cluster",
        Instances=instances,
        VisibleToAllUsers=True,
        JobFlowRole="EMR_EC2_DefaultRole",
        ServiceRole="EMR_DefaultRole",
        ReleaseLabel="emr-6.10.0",
    )
 
    return response["JobFlowId"]
 
 
# Terminates the cluster using the clusterId received in the event
def terminate_cluster(event):
    print("Trying to terminate cluster, clusterId: " + event["detail"]["clusterId"])
    response = CLIENT.terminate_job_flows(JobFlowIds=[event["detail"]["clusterId"]])
    print(f"Terminate cluster response: {response}")
 
 
def describe_cluster(event):
    response = CLIENT.describe_cluster(ClusterId=event["detail"]["clusterId"])
    return response
 
 
def lambda_handler(event, context):
    if is_spot_provisioning_timeout_event(event):
        print(
            "Received spot provisioning timeout event for instanceFleet, clusterId: "
            + event["detail"]["clusterId"]
        )
 
        describeClusterResponse = describe_cluster(event)
 
        shouldTerminateCluster = is_cluster_eligible_for_termination(
            event, describeClusterResponse
        )
        if shouldTerminateCluster:
            terminate_cluster(event)
 
            clusterId = create_cluster(event)
            print("Created a new cluster, clusterId: " + clusterId)
        else:
            print(
                "Cluster is not eligible for termination, clusterId: "
                + event["detail"]["clusterId"]
            )
 
    else:
        print("Received event is not spot provisioning timeout event, skipping")
```