

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 管道概述
<a name="pipelines-overview"></a>

Amazon SageMaker AI 管道是定向无环图 (DAG) 中一系列相互关联的步骤，这些步骤是使用 drag-and-drop UI 或 Pipelin [es SDK](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html) 定义的。您还可以使用[管道定义 JSON 模式](https://aws-sagemaker-mlops.github.io/sagemaker-model-building-pipeline-definition-JSON-schema/)构建管道。此 DAG JSON 定义提供了管道中每个步骤的要求和关系信息。管道 DAG 的结构由步骤之间的数据依赖关系决定。当一个步骤的输出属性作为输入传递给另一个步骤时，就会产生这些数据依赖关系。下图是管道 DAG 的示例：

![\[\]](http://docs.aws.amazon.com/zh_cn/sagemaker/latest/dg/images/pipeline-full.png)


**示例 DAG 包括以下步骤：**

1. `AbaloneProcess` 是[处理](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-processing)步骤的一个实例，在用于训练的数据上运行预处理脚本。例如，脚本可以填充缺失值，对数值数据进行规范化处理，或将数据分成训练数据集、验证数据集和测试数据集。

1. `AbaloneTrain` 是[训练](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-training)步骤的一个实例，用于配置超参数，并根据预处理的输入数据训练模型。

1. `AbaloneEval` 是[处理](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-processing)步骤的另一个实例，用于评估模型的准确性。本步骤展示了一个数据依赖的例子 - 本步骤使用 `AbaloneProcess` 的测试数据集输出。

1. `AbaloneMSECond`是 Condit [i](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-condition) on 步骤的一个实例，在本例中，该步骤进行检查以确保模型评估的 mean-square-error结果低于特定限制。如果模型不符合标准，管道运行就会停止。

1. 管道运行按以下步骤进行：

   1. `AbaloneRegisterModel`，其中 SageMaker AI 调用[RegisterModel](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-register-model)步骤将模型作为版本控制的模型包组注册到 Amazon SageMaker 模型注册表中。

   1. `AbaloneCreateModel`，其中 SageMaker AI 调用创建模型的[CreateModel](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-create-model)步骤，为批量转换做准备。在中`AbaloneTransform`， SageMaker AI 调用 “[转换](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-transform)” 步骤，对您指定的数据集生成模型预测。

以下主题介绍了 Pipelines 的基本概念。有关描述这些概念的实施的教程，请参阅[Pipelines 操作](pipelines-build.md)。

**Topics**
+ [

# 管道结构和执行
](build-and-manage-pipeline.md)
+ [

# IAM 访问管理
](build-and-manage-access.md)
+ [

# 为 Pipelines 设置跨账户支持
](build-and-manage-xaccount.md)
+ [

# Pipeline 参数
](build-and-manage-parameters.md)
+ [

# Pipelines 步骤
](build-and-manage-steps.md)
+ [

# Lift-and-shift 使用 @step 装饰器的 Python 代码
](pipelines-step-decorator.md)
+ [

# 在步骤之间传递数据
](build-and-manage-propertyfile.md)
+ [

# 缓存管道步骤
](pipelines-caching.md)
+ [

# 管道步骤的重试策略
](pipelines-retry-policy.md)
+ [

# 选择性执行管道步骤
](pipelines-selective-ex.md)
+ [

# 基准计算、偏差检测和生命周期以及 Amazon Pi ClarifyCheck pelin SageMaker es 中的 QualityCheck 步骤
](pipelines-quality-clarify-baseline-lifecycle.md)
+ [

# 安排管道运行
](pipeline-eventbridge.md)
+ [

# 亚马逊 SageMaker 实验集成
](pipelines-experiments.md)
+ [

# 使用本地模式运行管道
](pipelines-local-mode.md)
+ [

# 对亚马逊 SageMaker 管道进行故障排除
](pipelines-troubleshooting.md)

# 管道结构和执行
<a name="build-and-manage-pipeline"></a>

**Topics**
+ [

## 管线结构
](#build-and-manage-pipeline-structure)
+ [

## 使用并行配置执行管道
](#build-and-manage-pipeline-execution)

## 管线结构
<a name="build-and-manage-pipeline-structure"></a>

Amaz SageMaker on Pipelines 实例由`name`、`parameters`、和组成`steps`。`(account, region)` 对中的管道名称必须是唯一的。步骤定义中使用的所有参数都必须在管道中定义。列出的管道步骤会根据彼此之间的数据依赖关系自动确定其执行顺序。管道服务会解析数据依赖 DAG 中各步骤之间的关系，创建一系列执行完成的步骤。以下是管道结构的示例。

**警告**  
通过可视化编辑器或 SageMaker AI Python SDK 构建管道时，请勿在管道参数或任何步骤定义字段（例如环境变量）中包含敏感信息。这些字段在未来通过 `DescribePipeline` 请求返回时将可见。

```
from sagemaker.workflow.pipeline import Pipeline
  
  pipeline_name = f"AbalonePipeline"
  pipeline = Pipeline(
      name=pipeline_name,
      parameters=[
          processing_instance_type, 
          processing_instance_count,
          training_instance_type,
          model_approval_status,
          input_data,
          batch_data,
      ],
      steps=[step_process, step_train, step_eval, step_cond],
  )
```

## 使用并行配置执行管道
<a name="build-and-manage-pipeline-execution"></a>

默认情况下，管线会执行所有可并行运行的步骤。创建或更新管线以及启动或重试管线执行时，您可以使用 `ParallelismConfiguration` 属性来控制此行为。

每个执行都会应用并行配置。例如，如果启动了两个执行，则每个执行最多可以同时运行 50 个步骤，总共可以同时运行 100 个步骤。此外，启动、重试或更新执行时指定的 `ParallelismConfiguration` 优先于管道中定义的并行配置。

**Example 使用 `ParallelismConfiguration` 创建管道执行**  

```
pipeline = Pipeline(
        name="myPipeline",
        steps=[step_process, step_train]
    )

  pipeline.create(role, parallelism_config={"MaxParallelExecutionSteps": 50})
```

# IAM 访问管理
<a name="build-and-manage-access"></a>

以下各节描述了亚马逊 SageMaker 管道的 AWS Identity and Access Management (IAM) 要求。有关如何实施这些权限的示例，请参阅[先决条件](define-pipeline.md#define-pipeline-prereq)。

**Topics**
+ [

## 管道角色权限
](#build-and-manage-role-permissions)
+ [

## 管道步骤权限
](#build-and-manage-step-permissions)
+ [

## 使用 Amazon S3 存储桶进行 CORS 配置
](#build-and-manage-cors-s3)
+ [

## 自定义 Pipelines 作业的访问管理
](#build-and-manage-step-permissions-prefix)
+ [

## 自定义对管道版本的访问权限
](#build-and-manage-step-permissions-version)
+ [

## 使用管道的服务控制策略
](#build-and-manage-scp)

## 管道角色权限
<a name="build-and-manage-role-permissions"></a>

您的管道需要一个 IAM 管道执行角色，该角色将在您创建管道时传递给管道。 您用于创建管道的 SageMaker AI 实例的角色必须具有指定管道执行角色的`iam:PassRole`权限的策略。这是因为实例需要权限才能将您的管道执行角色传递给 Pipelines 服务，以用于创建和运行管道。有关 IAM 角色的更多信息，请参阅 [IAM 角色](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles.html)。

您的管道执行角色需要以下权限：
+ 您可以为管道中的任何 SageMaker AI 任务步骤使用唯一角色或自定义角色（而不是默认使用的管道执行角色）。请确保您的管道执行角色已添加一个策略，该策略具有指定每个角色的`iam:PassRole`权限。
+  管道中每种作业类型的 `Create` 和 `Describe` 权限。
+  Amazon S3 使用 `JsonGet` 函数的权限。您可以使用基于资源的策略和基于身份的策略来控制对 Amazon S3 资源的访问。基于资源的策略会应用到 Amazon S3 存储桶，并授予 Pipelines 对存储桶的访问权限。基于身份的策略使您的管道能够从您的账户调用 Amazon S3。有关基于资源的策略和基于身份的策略的更多信息，请参阅[基于身份的策略和基于资源的策略](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_identity-vs-resource.html)。

  ```
  {
      "Action": [
          "s3:GetObject"
      ],
      "Resource": "arn:aws:s3:::<your-bucket-name>/*",
      "Effect": "Allow"
  }
  ```

## 管道步骤权限
<a name="build-and-manage-step-permissions"></a>

管道包括运行 A SageMaker I 作业的步骤。为了让管道步骤运行这些作业，需要在您的账户中设置一个 IAM 角色，以提供对所需资源的访问权限。此角色由您的管道传递给 SageMaker AI 服务主体。有关 IAM 角色的更多信息，请参阅 [IAM 角色](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles.html)。

默认情况下，每个步骤都扮演管道执行角色。您可以选择将不同的角色传递给管道中的任何步骤。这样可以确保每个步骤中的代码不会影响其他步骤中使用的资源，除非管道定义中指定的两个步骤之间存在直接关系。您可以在为步骤定义处理器或估算器时传递这些角色。有关如何在这些定义中包含这些角色的示例，请参阅 [SageMaker AI Python SDK 文档](https://sagemaker.readthedocs.io/en/stable/overview.html#using-estimators)。

## 使用 Amazon S3 存储桶进行 CORS 配置
<a name="build-and-manage-cors-s3"></a>

为确保映像以可预测的方式从 Amazon S3 存储桶导入到管道中，必须向从中导入映像的 Amazon S3 存储桶添加 CORS 配置。此部分说明了如何为 Amazon S3 存储桶设置所需的 CORS 配置。Pipelines 所需的 XML `CORSConfiguration` 与[输入映像数据的 CORS 要求](sms-cors-update.md)中的不同，不过您仍可借助其中的信息，进一步了解 Amazon S3 存储桶的 CORS 要求。

对托管映像的 Amazon S3 存储桶使用以下 CORS 配置代码。有关如何配置 CORS 的说明，请参阅《Amazon Simple Storage Service 用户指南》中的[配置跨源资源共享（CORS）](https://docs.aws.amazon.com/AmazonS3/latest/user-guide/add-cors-configuration.html)。如果您使用 Amazon S3 控制台将策略添加到存储桶，则必须使用 JSON 格式。

**JSON**

```
[
    {
        "AllowedHeaders": [
            "*"
        ],
        "AllowedMethods": [
            "PUT"
        ],
        "AllowedOrigins": [
            "*"
        ],
        "ExposeHeaders": [
            "Access-Control-Allow-Origin"
        ]
    }
]
```

**XML**

```
<CORSConfiguration>
 <CORSRule>
   <AllowedHeader>*</AllowedHeader>
   <AllowedOrigin>*</AllowedOrigin>
   <AllowedMethod>PUT</AllowedMethod>
   <ExposeHeader>Access-Control-Allow-Origin</ExposeHeader>
 </CORSRule>
</CORSConfiguration>
```

下面的 GIF 演示了 Amazon S3 文档中使用 Amazon S3 控制台添加 CORS 头策略的说明。

![\[关于如何使用 Amazon S3 管理控制台添加 CORS 标头策略的图片。\]](http://docs.aws.amazon.com/zh_cn/sagemaker/latest/dg/images/sms/gifs/cors-config.gif)


## 自定义 Pipelines 作业的访问管理
<a name="build-and-manage-step-permissions-prefix"></a>

您可以进一步自定义 IAM 策略，以便组织中的选定成员可以运行任意或所有管道步骤。例如，您可以向某些用户授予创建训练作业的权限，向另一组用户授予创建处理作业的权限，以及向所有用户授予运行其余步骤的权限。要使用此特征，请选择一个以您的作业名称为前缀的自定义字符串。您的管理员在允许 ARNs 的前面加上前缀，而您的数据科学家则在管道实例化中包含此前缀。由于获允许用户的 IAM 策略包含带有指定前缀的作业 ARN，因此您的管道步骤的后续作业具有继续操作所需的权限。作业前缀默认情况下关闭，您必须在 `Pipeline` 类中启用此选项才能使用它。

对于关闭了前缀的作业，作业名称的格式如图所示，该名称由下表中所述的字段串联而成：

`pipelines-<executionId>-<stepNamePrefix>-<entityToken>-<failureCount>`


| 字段 | 定义 | 
| --- | --- | 
|  pipelines   |  静态字符串，始终前置。此字符串将管道编排服务标识为作业的来源。  | 
|  executionId  |  正在运行的管道实例的随机缓冲区。  | 
|  stepNamePrefix  |  用户指定的步骤名称（在管道步骤的 `name` 参数中给出），限于前 20 个字符。  | 
|  entityToken  |  随机令牌，用于确保步骤实体的幂等性。  | 
|  failureCount  |  当前为完成作业而尝试重试的次数。  | 

在这种情况下，作业名称前面没有自定义前缀，相应的 IAM 策略必须与此字符串匹配。

对于开启作业前缀的用户，底层作业名称采用以下形式，自定义前缀指定为 `MyBaseJobName`：

*<MyBaseJobName>*-*<executionId>*-*<entityToken>*-*<failureCount>*

自定义前缀取代了静态`pipelines`字符串，以帮助您缩小可以作为管道一部分运行 SageMaker AI 作业的用户的选择范围。

**前缀长度限制**

作业名称具有特定于各个管道步骤的内部长度限制。此约束还限定了允许的前缀长度。前缀长度要求如下：


| 管道步骤 | 前缀长度 | 
| --- | --- | 
|   `[TrainingStep](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#trainingstep)`, `[ModelStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#step-collections)`, `[TransformStep](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#transformstep)`, `[ProcessingStep](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#processingstep)`, `[ClarifyCheckStep](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#clarifycheckstep)`, `[QualityCheckStep](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#qualitycheckstep)`, `[RegisterModelStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#step-collections)`   |  38  | 
|  `[TuningStep](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#tuningstep)`, `[AutoML](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#automlstep)`  |  6  | 

### 将作业前缀应用于 IAM 策略
<a name="build-and-manage-step-permissions-prefix-iam"></a>

您的管理员创建 IAM 策略，允许具有特定前缀的用户创建作业。以下示例策略允许数据科学家在使用 `MyBaseJobName` 前缀的情况下创建训练作业。

```
{
    "Action": "sagemaker:CreateTrainingJob",
    "Effect": "Allow",
    "Resource": [
        "arn:aws:sagemaker:region:account-id:*/MyBaseJobName-*"
    ]
}
```

### 将作业前缀应用于管道实例化
<a name="build-and-manage-step-permissions-prefix-inst"></a>

您可以使用作业实例类的 `*base_job_name` 参数来指定前缀。

**注意**  
创建管道步骤之前，您可以将带有 `*base_job_name` 参数的作业前缀传递给作业实例。此作业实例包含使作业作为管道中的一个步骤运行的必要信息。该参数因所使用的作业实例而异。以下列表显示了每种管道步骤类型应使用的参数：  
对于 `[Estimator](https://sagemaker.readthedocs.io/en/stable/api/training/estimators.html)` (`[TrainingStep](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#trainingstep)`)、`[Processor](https://sagemaker.readthedocs.io/en/stable/api/training/processing.html)` (`[ProcessingStep](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#processingstep)`) 和 `[AutoML](https://sagemaker.readthedocs.io/en/stable/api/training/automl.html)` (`[AutoMLStep](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#automlstep)`) 类，使用 `base_job_name`
对于 `[Tuner](https://sagemaker.readthedocs.io/en/stable/api/training/tuner.html)` 类 (`[TuningStep](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#tuningstep)`)，使用 `tuning_base_job_name`
对于 `[Transformer](https://sagemaker.readthedocs.io/en/stable/api/inference/transformer.html)` 类 (`[TransformStep](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#transformstep)`)，使用 `transform_base_job_name`
对于 `[QualityCheckStep](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#qualitycheckstep)` (Quality Check) 和 `[ClarifyCheckstep](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#clarifycheckstep)` (Clarify Check) 类，使用 `[CheckJobConfig](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#checkjobconfig)` 的 `base_job_name`
对于 `[Model](https://sagemaker.readthedocs.io/en/stable/api/inference/model.html)` 类，使用的参数取决于您在将结果传递给 `[ModelStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#step-collections)` 之前是运行 `create` 还是 `register`  
如果您调用 `create`，则自定义前缀来自您构造模型（即 `Model(name=)`）时的 `name` 参数
如果您调用 `register`，则自定义前缀来自您调用 `register`（即 `my_model.register(model_package_name=)`）时的 `model_package_name` 参数

以下示例说明了如何为新的训练作业实例指定前缀。

```
# Create a job instance
xgb_train = Estimator(
    image_uri=image_uri,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    output_path=model_path,
    role=role,
    subnets=["subnet-0ab12c34567de89f0"],
    base_job_name="MyBaseJobName"
    security_group_ids=["sg-1a2bbcc3bd4444e55"],
    tags = [ ... ]
    encrypt_inter_container_traffic=True, 
)

# Attach your job instance to a pipeline step
step_train = TrainingStep(
    name="TestTrainingJob",
    estimator=xgb_train, 
    inputs={
        "train": TrainingInput(...), 
        "validation": TrainingInput(...) 
    }
)
```

作业前缀默认情况下关闭。要选择使用此特征，请使用 `PipelineDefinitionConfig` 的 `use_custom_job_prefix` 选项，如以下代码片段所示：

```
from sagemaker.workflow.pipeline_definition_config import PipelineDefinitionConfig
        
# Create a definition configuration and toggle on custom prefixing
definition_config = PipelineDefinitionConfig(use_custom_job_prefix=True);

# Create a pipeline with a custom prefix
 pipeline = Pipeline(
     name="MyJobPrefixedPipeline",
     parameters=[...]
     steps=[...]
     pipeline_definition_config=definition_config
)
```

创建并运行管道。以下示例创建并运行管道，还演示了如何关闭作业前缀并重新运行管道。

```
pipeline.create(role_arn=sagemaker.get_execution_role())

# Optionally, call definition() to confirm your prefixed job names are in the built JSON
pipeline.definition()
pipeline.start()
      
# To run a pipeline without custom-prefixes, toggle off use_custom_job_prefix, update the pipeline 
# via upsert() or update(), and start a new run
definition_config = PipelineDefinitionConfig(use_custom_job_prefix=False)
pipeline.pipeline_definition_config = definition_config
pipeline.update()
execution = pipeline.start()
```

同样，您可以为现有管道开启该特征，然后启动使用作业前缀的新运行。

```
definition_config = PipelineDefinitionConfig(use_custom_job_prefix=True)
pipeline.pipeline_definition_config = definition_config
pipeline.update()
execution = pipeline.start()
```

最后，您可以在管道执行时调用 `list_steps` 来查看自定义前缀的作业。

```
steps = execution.list_steps()

prefixed_training_job_name = steps['PipelineExecutionSteps'][0]['Metadata']['TrainingJob']['Arn']
```

## 自定义对管道版本的访问权限
<a name="build-and-manage-step-permissions-version"></a>

您可以使用`sagemaker:PipelineVersionId`条件键授予对特定版本的 Amazon Pi SageMaker pelines 的自定义访问权限。例如，以下策略仅允许对版本 ID 为 6 及以上的管道启动执行或更新管道版本。

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": {
        "Sid": "AllowStartPipelineExecution",
        "Effect": "Allow",
        "Action": [
            "sagemaker:StartPipelineExecution",
            "sagemaker:UpdatePipelineVersion"
        ],
        "Resource": "*",
        "Condition": {
            "NumericGreaterThanEquals": {
                "sagemaker:PipelineVersionId": 6
            }
        }
    }
}
```

------

有关支持的条件键的更多信息，请参阅 [Amazon A SageMaker I 的条件键](https://docs.aws.amazon.com//service-authorization/latest/reference/list_amazonsagemaker.html#amazonsagemaker-policy-keys)。

## 使用管道的服务控制策略
<a name="build-and-manage-scp"></a>

服务控制策略 (SCPs) 是一种组织策略，可用于管理组织中的权限。 SCPs 集中控制组织中所有账户的最大可用权限。通过在组织中使用 Pipelines，您可以确保数据科学家无需与 AWS 控制台交互即可管理您的管道执行。 

如果您的 SCP 使用的 VPC 限制访问 Amazon S3，则需要采取措施允许您的管道访问其他 Amazon S3 资源。

要允许 Pipelines 使用 `JsonGet` 功能访问 VPC 外部的 Amazon S3，请更新组织的 SCP，确保使用 Pipelines 的角色可以访问 Amazon S3。为此，请使用主体标签和条件键为管道执行器通过管道执行角色使用的角色创建一个异常情况。

**允许 Pipelines 访问 VPC 以外的 Amazon S3**

1. 按照[标记 IAM 用户和角色](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_tags.html)中的步骤为管道执行角色创建唯一标签。

1. 使用您创建的标签的 `Aws:PrincipalTag IAM` 条件键，在您的 SCP 中授予一个例外。有关更多信息，请参阅[创建、更新和删除服务控制策略](https://docs.aws.amazon.com/organizations/latest/userguide/orgs_manage_policies_scps_create.html)。

# 为 Pipelines 设置跨账户支持
<a name="build-and-manage-xaccount"></a>

Amazon Pipel SageMaker ines 的跨账户支持使您能够与使用不同 AWS 账户运营的其他团队或组织在机器学习管道上进行协作。通过设置跨账户管道共享，您可以对管道授予受控访问权限，允许其他账户查看管道详情、触发执行和监控运行。以下主题介绍如何设置跨账户管道共享、共享资源可用的不同权限策略，以及如何通过对 SageMaker AI 的直接 API 调用来访问共享管道实体并与之交互。

## 设置跨账户管道共享
<a name="build-and-manage-xaccount-set-up"></a>

SageMaker AI 使用 R [es AWS ource Access Manager](https://docs.aws.amazon.com/ram/latest/userguide/what-is.html) (AWS RAM) 来帮助你安全地跨账户共享管道实体。

### 创建资源共享
<a name="build-and-manage-xaccount-set-up-console"></a>

1. 通过 [AWS RAM 控制台](https://console.aws.amazon.com/ram/home)选择**创建资源共享**。

1. 指定资源共享详细信息时，请选择管道资源类型，并选择要共享的一个或多个管道。当您与任何其他账户共享管道时，其所有执行也会被隐式共享。

1. 将权限与资源共享关联。选择默认只读权限策略或扩展的管道执行权限策略。有关更多详细信息，请参阅 [Pipelines 资源的权限策略](#build-and-manage-xaccount-permissions)。
**注意**  
如果您选择扩展管道执行策略，请注意共享账户调用的任何启动、停止和重试命令都使用共享管道的 AWS 账户中的资源。

1. 使用 AWS 账户指定 IDs 要向其授予共享资源访问权限的账户。

1. 查看您的资源共享配置，然后选择**创建资源共享**。可能需要几分钟时间来完成资源共享和主体关联。

有关更多信息，请参阅《AWS Resource Access Manager 用户指南》**中的[共享 AWS 资源](https://docs.aws.amazon.com/ram/latest/userguide/getting-started-sharing.html)。

### 获取对资源共享邀请的回复
<a name="build-and-manage-xaccount-set-up-responses"></a>

设置资源共享和主体关联后，指定的 AWS 账户会收到加入资源共享的邀请。 AWS 账户必须接受邀请才能访问任何共享资源。

有关通过接受资源共享邀请的更多信息 AWS RAM，请参阅 [Resource Acc *ess Manager 用户指南*中的使用共享 AWS 资源](https://docs.aws.amazon.com/ram/latest/userguide/getting-started-shared.html)。AWS 

## Pipelines 资源的权限策略
<a name="build-and-manage-xaccount-permissions"></a>

创建资源共享时，请选择两个支持的权限策略之一，将其与 SageMaker AI 管道资源类型相关联。这两个策略都授予对任何选定管道及其所有执行的访问权限。

### 默认只读权限
<a name="build-and-manage-xaccount-permissions-default"></a>

`AWSRAMDefaultPermissionSageMakerPipeline` 策略允许执行以下只读操作：

```
"sagemaker:DescribePipeline"
"sagemaker:DescribePipelineDefinitionForExecution"   
"sagemaker:DescribePipelineExecution"
"sagemaker:ListPipelineExecutions"
"sagemaker:ListPipelineExecutionSteps"
"sagemaker:ListPipelineParametersForExecution"
"sagemaker:Search"
```

### 扩展的管道执行权限
<a name="build-and-manage-xaccount-permissions-extended"></a>

`AWSRAMPermissionSageMakerPipelineAllowExecution` 策略包括默认策略中的所有只读权限，还允许共享账户启动、停止和重试管道执行。

**注意**  
使用扩展管道执行权限策略时，请注意 AWS 资源使用情况。使用此策略时，允许共享账户启动、停止和重试管道执行。用于共享管道执行的所有资源均由拥有者账户使用。

扩展的管道执行权限策略允许执行以下操作：

```
"sagemaker:DescribePipeline"
"sagemaker:DescribePipelineDefinitionForExecution"   
"sagemaker:DescribePipelineExecution"
"sagemaker:ListPipelineExecutions"
"sagemaker:ListPipelineExecutionSteps"
"sagemaker:ListPipelineParametersForExecution"
"sagemaker:StartPipelineExecution"
"sagemaker:StopPipelineExecution"
"sagemaker:RetryPipelineExecution"
"sagemaker:Search"
```

## 通过直接 API 调用访问共享管道实体
<a name="build-and-manage-xaccount-api-calls"></a>

跨账户渠道共享设置完成后，您可以使用管道 ARN 调用以下 SageMaker API 操作：

**注意**  
只有当 API 命令包含在与资源共享关联的权限中时，才能调用这些命令。如果您选择`AWSRAMPermissionSageMakerPipelineAllowExecution`策略，则启动、停止和重试命令将使用共享管道的 AWS 账户中的资源。
+ [DescribePipeline](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribePipeline.html)
+ [DescribePipelineDefinitionForExecution](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribePipelineDefinitionForExecution.html)
+ [DescribePipelineExecution](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribePipelineExecution.html)
+ [ListPipelineExecutions](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_ListPipelineExecutions.html)
+ [ListPipelineExecutionSteps](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_ListPipelineExecutionSteps.html)
+ [ListPipelineParametersForExecution](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_ListPipelineParametersForExecution.html)
+ [StartPipelineExecution](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_StartPipelineExecution.html)
+ [StopPipelineExecution](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_StopPipelineExecution.html)
+ [RetryPipelineExecution](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_RetryPipelineExecution.html)

# Pipeline 参数
<a name="build-and-manage-parameters"></a>

您可以使用参数在管道定义中引入变量。您可以在整个管道定义过程中引用您定义的参数。参数具有默认值，您可以在开始执行管道时通过指定参数值来覆盖该值。默认值必须是与参数类型匹配的实例。步骤定义中使用的所有参数都必须在管道定义中定义。本主题介绍可以定义的参数以及如何执行这些参数。

Amaz SageMaker on Pipelines 支持以下参数类型：
+  `ParameterString` - 表示字符串参数。
+  `ParameterInteger` - 表示整数参数。
+  `ParameterFloat` - 表示浮点参数。
+  `ParameterBoolean` - 表示布尔 Python 类型。

参数采用以下格式：

```
<parameter> = <parameter_type>(
    name="<parameter_name>",
    default_value=<default_value>
)
```

下面的示例显示了一个示例参数实施。

```
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
    ParameterBoolean
)

processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1
)
```

创建管道时传递参数，如以下示例所示。

```
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count
    ],
    steps=[step_process]
)
```

还可将不同于默认值的参数值传递给管道执行，如以下示例所示。

```
execution = pipeline.start(
    parameters=dict(
        ProcessingInstanceCount="2",
        ModelApprovalStatus="Approved"
    )
)
```

你可以使用 SageMaker Python SDK 函数来操作参数，比如`[ sagemaker.workflow.functions.Join](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.functions.Join)`。有关参数的更多信息，请参阅[ SageMaker 管道参数](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#parameters)。

有关管道参数的已知限制，请参阅 Amaz [on Pyth SageMaker on](https://sagemaker.readthedocs.io/en/stable) SD *[K 中的限制-参数化](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#parameterization)*。

# Pipelines 步骤
<a name="build-and-manage-steps"></a>

Pipelines 由多个步骤组成。这些步骤使用属性定义管道采取的操作以及各步骤之间的关系。下一页将介绍步骤的类型、属性以及它们之间的关系。

**Topics**
+ [

# 增加一个步骤
](build-and-manage-steps-types.md)
+ [

# 添加集成
](build-and-manage-steps-integration.md)
+ [

## 步骤属性
](#build-and-manage-properties)
+ [

## 步骤并行
](#build-and-manage-parallelism)
+ [

## 步骤之间的数据依赖性
](#build-and-manage-data-dependency)
+ [

## 步骤之间的自定义依赖关系
](#build-and-manage-custom-dependency)
+ [

## 自定义映像一步到位
](#build-and-manage-images)

# 增加一个步骤
<a name="build-and-manage-steps-types"></a>

下文介绍了每种步骤类型的要求，并提供了步骤的实现示例，以及如何将步骤添加到 Pipelines 中。这些都不是可行的实施方案，因为它们没有提供所需的资源和投入。有关实施这些步骤的教程，请参阅[Pipelines 操作](pipelines-build.md)。

**注意**  
您还可以使用 `@step` 装饰器将本地机器学习代码转换为 Pipelines 步骤，从而从本地机器学习代码中创建步骤。有关更多信息，请参阅 [@step decorator](#step-type-custom)。

Amaz SageMaker on Pipelines 支持以下步骤类型：
+ [执行代码](#step-type-executecode)

  [Processing](#step-type-processing)
+ [训练](#step-type-training)
+ [优化](#step-type-tuning)
+ [AutoML](#step-type-automl)
+ [`Model`](#step-type-model)
+ [`Create model`](#step-type-create-model)
+ [`Register model`](#step-type-register-model)
+ [`Deploy model (endpoint)`](#step-type-deploy-model-endpoint)
+ [转换](#step-type-transform)
+ [条件](#step-type-condition)
+ [`Callback`](#step-type-callback)
+ [Lambda](#step-type-lambda)
+ [`ClarifyCheck`](#step-type-clarify-check)
+ [`QualityCheck`](#step-type-quality-check)
+ [EMR](#step-type-emr)
+ [笔记本作业](#step-type-notebook-job)
+ [Fail](#step-type-fail)

## @step decorator
<a name="step-type-custom"></a>

如果您想在 Pipelines 用户界面中编排利用高级 SageMaker AI 功能或其他 AWS 服务的自定义 ML 作业，请使用。 drag-and-drop [执行代码步骤](#step-type-executecode)

您可以使用 `@step` 装饰器从本地机器学习代码中创建一个步骤。测试代码后，您可以通过使用装饰器对其进行注释来将该函数转换为 SageMaker AI 管道步骤。`@step`当您将 `@step` 装饰功能的输出作为一个步骤传递给管道时，Pipelines 会创建并运行管道。您还可以创建一个多步骤 DAG 管道，其中包括一个或多个`@step`经过装饰的函数以及传统的 SageMaker AI 工作流步骤。有关如何使用 `@step` 装饰器创建步骤的更多详情，请参阅 [Lift-and-shift 使用 @step 装饰器的 Python 代码](pipelines-step-decorator.md)。

## 执行代码步骤
<a name="step-type-executecode"></a>

在 Pipelines drag-and-drop UI 中，您可以使用 “**执行代码**” 步骤将自己的代码作为工作流步骤运行。您可以上传一个 Python 函数、脚本或笔记本，作为管道的一部分执行。如果您想编排利用高级 SageMaker AI 功能或其他 AWS 服务的自定义 ML 作业，则应使用此步骤。

“**执行代码**” 步骤将文件上传到你的 Amazon A SageMaker I 默认 Amazon S3 存储桶。该存储桶可能未设置所需的跨源资源共享 (CORS) 权限。要了解有关配置 CORS 权限的更多信息，请参阅 [输入映像数据的 CORS 要求](sms-cors-update.md)。

“**执行代码”** 步骤使用 Amazon SageMaker 训练作业来运行您的代码。确保您的 IAM 角色具有 `sagemaker:DescribeTrainingJob` 和 `sagemaker:CreateTrainingJob` API 权限。要详细了解 Amazon A SageMaker I 的所有必需权限以及如何设置这些权限，请参阅[Amazon SageMaker AI API 权限：操作、权限和资源参考](api-permissions-reference.md)。

要使用管道设计器向管道添加执行代码步骤，请执行以下操作：

1. 按照中的说明打开 Amazon SageMaker Studio 控制台[启动亚马逊 SageMaker Studio](studio-updated-launch.md)。

1. 在左侧导航窗格中，选择 **Pipelines**。

1. 选择**创建**。

1. 选择**空白**。

1. 在左侧边栏中选择**执行代码**，然后将其拖到画布上。

1. 在画布中，选择添加的**执行代码**步骤。

1. 在右侧边栏中，填写**设置**和**详情**标签中的表格。

1. 您可以上传单个文件来执行，也可以上传包含多个构件的压缩文件夹。

1. 对于单个文件上传，您可以为笔记本、python 函数或脚本提供可选参数。

1. 在提供 Python 函数时，必须以 `file.py:<function_name>` 格式提供处理程序。

1. 对于上传压缩文件夹，必须提供代码的相对路径，您还可以选择提供压缩文件夹内 `requirements.txt` 文件或初始化脚本的路径。

1. 如果画布上有任何步骤紧接在您添加的**执行代码**步骤之前，请单击并拖动光标从该步骤到**执行代码**步骤，以创建边缘。

1. 如果画布中包含任何紧接您添加的**执行代码**步骤的步骤，请单击并拖动光标从**执行代码**步骤到该步骤，以创建边缘。可以引用**执行代码**步骤的输出作为 Python 函数。

## 处理步骤
<a name="step-type-processing"></a>

使用处理步骤创建用于数据处理的处理作业。有关处理作业的更多信息，请参阅[处理数据和评估模型](https://docs.aws.amazon.com/sagemaker/latest/dg/processing-job.html)。

------
#### [ Pipeline Designer ]

要使用管道设计器向管道添加处理步骤，请执行以下操作：

1. 按照中的说明打开 Amazon SageMaker Studio 控制台[启动亚马逊 SageMaker Studio](studio-updated-launch.md)。

1. 在左侧导航窗格中，选择 **Pipelines**。

1. 选择**创建**。

1. 在左侧边栏中选择**处理数据**，然后将其拖到画布上。

1. 在画布中，选择添加的**处理数据**步骤。

1. 在右侧边栏中，填写**设置**和**详情**标签中的表格。有关这些选项卡中字段的信息，请参阅 [sagemaker.workflow.steps。 ProcessingStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.ProcessingStep)。

1. 如果画布上有任何步骤紧接在您添加的**处理数据**步骤之前，请单击并拖动光标从该步骤到**处理数据**步骤，以创建边缘。

1. 如果画布中包含任何紧接您添加的**处理数据**步骤的步骤，请单击并拖动光标从**处理数据**步骤到该步骤，以创建边缘。

------
#### [ SageMaker Python SDK ]

处理步骤需要处理器、定义处理代码的 Python 脚本、用于处理的输出以及作业参数。以下示例说明如何创建 `ProcessingStep` 定义。

```
from sagemaker.sklearn.processing import SKLearnProcessor

sklearn_processor = SKLearnProcessor(framework_version='1.0-1',
                                     role=<role>,
                                     instance_type='ml.m5.xlarge',
                                     instance_count=1)
```

```
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

inputs = [
    ProcessingInput(source=<input_data>, destination="/opt/ml/processing/input"),
]

outputs = [
    ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
    ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
    ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
]

step_process = ProcessingStep(
    name="AbaloneProcess",
    step_args = sklearn_processor.run(inputs=inputs, outputs=outputs,
        code="abalone/preprocessing.py")
)
```

**传递运行时参数**

以下示例说明如何将运行时参数从 PySpark处理器传递给`ProcessingStep`。

```
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.spark.processing import PySparkProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

pipeline_session = PipelineSession()

pyspark_processor = PySparkProcessor(
    framework_version='2.4',
    role=<role>,
    instance_type='ml.m5.xlarge',
    instance_count=1,
    sagemaker_session=pipeline_session,
)

step_args = pyspark_processor.run(
    inputs=[ProcessingInput(source=<input_data>, destination="/opt/ml/processing/input"),],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
    ],
    code="preprocess.py",
    arguments=None,
)


step_process = ProcessingStep(
    name="AbaloneProcess",
    step_args=step_args,
)
```

有关处理步骤要求的更多信息，请参阅 [sagemaker.workflow.steps。 ProcessingStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.ProcessingStep)文档。有关深入的示例，请参阅使用 [Amazon Pipelines 编排任务以训练和评估模型的 SageMaker 示例](https://github.com/aws/amazon-sagemaker-examples/blob/62de6a1fca74c7e70089d77e36f1356033adbe5f/sagemaker-pipelines/tabular/abalone_build_train_deploy/sagemaker-pipelines-preprocess-train-evaluate-batch-transform.ipynb)笔记本。*为特征工程定义处理步骤*部分包含更多信息。

------

## 训练步骤
<a name="step-type-training"></a>

您可以使用训练步骤创建训练作业来训练模型。有关训练作业的更多信息，请参阅[使用 Amazon A SageMaker I 训练模型](https://docs.aws.amazon.com/sagemaker/latest/dg/how-it-works-training.html)。

训练步骤需要估算器以及训练和验证数据输入。

------
#### [ Pipeline Designer ]

要使用管道设计器向管道添加训练步骤，请执行以下操作：

1. 按照中的说明打开 Amazon SageMaker Studio 控制台[启动亚马逊 SageMaker Studio](studio-updated-launch.md)。

1. 在左侧导航窗格中，选择 **Pipelines**。

1. 选择**创建**。

1. 选择**空白**。

1. 在左侧边栏中选择**训练模型**，然后将其拖到画布上。

1. 在画布中选择添加的**训练模型**步骤。

1. 在右侧边栏中，填写**设置**和**详情**标签中的表格。有关这些选项卡中字段的信息，请参阅 [sagemaker.workflow.steps。 TrainingStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.TrainingStep)。

1. 如果画布上有任何步骤紧接在您添加的**训练模型**步骤之前，请单击并拖动光标从该步骤到**训练模型**步骤，以创建边缘。

1. 如果画布上有任何步骤紧接着您添加的**训练模型**步骤，请单击并拖动光标从**训练模型**步骤到该步骤，以创建边缘。

------
#### [ SageMaker Python SDK ]

以下示例说明如何创建 `TrainingStep` 定义。有关训练步骤要求的更多信息，请参阅 [sagemaker.workflow.steps。 TrainingStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.TrainingStep)文档。

```
from sagemaker.workflow.pipeline_context import PipelineSession

from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep

from sagemaker.xgboost.estimator import XGBoost

pipeline_session = PipelineSession()

xgb_estimator = XGBoost(..., sagemaker_session=pipeline_session)

step_args = xgb_estimator.fit(
    inputs={
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "train"
            ].S3Output.S3Uri,
            content_type="text/csv"
        ),
        "validation": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "validation"
            ].S3Output.S3Uri,
            content_type="text/csv"
        )
    }
)

step_train = TrainingStep(
    name="TrainAbaloneModel",
    step_args=step_args,
)
```

------

## 优化步骤
<a name="step-type-tuning"></a>

您可以使用优化步骤来创建超参数优化作业，也称为超参数优化 (HPO)。超参数调整作业运行多个训练作业，每个作业产生一个模型版本。有关超参数优化的更多信息，请参阅[使用 SageMaker AI 自动调整模型](automatic-model-tuning.md)。

调优作业与管道的 SageMaker AI 实验相关联，而训练作业是作为试验创建的。有关更多信息，请参阅 [实验集成](pipelines-experiments.md)。

调整步骤需要[HyperparameterTuner](https://sagemaker.readthedocs.io/en/stable/api/training/tuner.html)和训练输入。您可以通过指定 `HyperparameterTuner` 的 `warm_start_config` 参数来再训练以前的优化作业。有关超参数优化和温启动的更多信息，请参阅[运行热启动超参数调优作业](automatic-model-tuning-warm-start.md)。

[你可以使用 sagemaker.workflow.step [s 的 get\$1top\$1model\$1s3\$1uri](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.TuningStep.get_top_model_s3_uri) 方法。 TuningStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.TuningStep)class，用于从性能最好的模型版本之一中获取模型工件。有关显示如何在 SageMaker AI 管道中使用调整步骤的笔记本，请参阅 [sagemaker-pipelines-tuning-step.ipynb。](https://github.com/aws/amazon-sagemaker-examples/blob/main/sagemaker-pipelines/tabular/tuning-step/sagemaker-pipelines-tuning-step.ipynb)

**重要**  
Amaz SageMaker on Python SDK v2.48.0 和 Amazon Studio Classic v3.8.0 中引入了调整步骤。 SageMaker 使用调整步骤前必须更新 Studio Classic，否则不会显示管道 DAG。要更新 Studio Classic，请参阅 [关闭并更新 Amazon SageMaker Studio 经典版](studio-tasks-update-studio.md)。

以下示例说明如何创建 `TuningStep` 定义。

```
from sagemaker.workflow.pipeline_context import PipelineSession

from sagemaker.tuner import HyperparameterTuner
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TuningStep

tuner = HyperparameterTuner(..., sagemaker_session=PipelineSession())
    
step_tuning = TuningStep(
    name = "HPTuning",
    step_args = tuner.fit(inputs=TrainingInput(s3_data="s3://amzn-s3-demo-bucket/my-data"))
)
```

**获取最佳模型版本**

以下示例说明如何使用 `get_top_model_s3_uri` 方法从优化作业中获取最佳模型版本。最多只能根据排名前50位的版本进行排名[HyperParameterTuningJobObjective](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_HyperParameterTuningJobObjective.html)。`top_k` 参数是版本的索引，其中 `top_k=0` 是性能最好的版本，`top_k=49` 是性能最差的版本。

```
best_model = Model(
    image_uri=image_uri,
    model_data=step_tuning.get_top_model_s3_uri(
        top_k=0,
        s3_bucket=sagemaker_session.default_bucket()
    ),
    ...
)
```

有关调整步骤要求的更多信息，请参阅 [sagemaker.workflow.steps。 TuningStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.TuningStep)文档。

## 微调步骤
<a name="step-type-fine-tuning"></a>

Fine-tuning 在新数据集上训练来自 Amaz SageMaker JumpStart on 的预训练基础模型。这个过程也称为转移学习，可以使用较小数据集和较短的训练时间生成准确模型。对模型进行微调时，可以使用默认数据集，也可以选择自己的数据。要了解有关微调基础模型的更多信息 JumpStart，请参阅[微调模型](jumpstart-fine-tune.md)。

微调步骤使用 Amazon SageMaker 训练作业自定义您的模型。确保您的 IAM 角色具有 `sagemaker:DescribeTrainingJob` 和 `sagemaker:CreateTrainingJob` API 权限，以便在管道中执行微调作业。API 权限，以便在管道中执行微调作业。要了解有关 Amazon A SageMaker I 所需权限以及如何设置权限的更多信息，请参阅[Amazon SageMaker AI API 权限：操作、权限和资源参考](api-permissions-reference.md)。

要使用 drag-and-drop编辑器向管道中添加 Fine-**Tune 模型**步骤，请执行以下步骤：

1. 按照 [启动亚马逊 SageMaker Studio](studio-updated-launch.md) 中的说明打开 Studio 管理控制台。

1. 在左侧导航窗格中，选择 **Pipelines**。

1. 选择**创建**。

1. 选择**空白**。

1. 在左侧边栏中选择**微调模型**，然后将其拖动到画布上。

1. 在画布中，选择添加的**微调模型**步骤。

1. 在右侧边栏中，填写**设置**和**详情**标签中的表格。

1. 如果画布上有任何步骤紧接在您添加的**微调模型**步骤之前，请单击并拖动光标从该步骤到**微调模型**步骤，以创建边缘。

1. 如果画布上有任何步骤紧接着您添加的**微调模型**步骤，请单击并拖动光标从**微调模型**步骤到该步骤，以创建边缘。

## AutoML 步骤
<a name="step-type-automl"></a>

使用 [AutoML](https://sagemaker.readthedocs.io/en/stable/api/training/automl.html) API 创建 AutoML 作业以自动训练模型。有关 AutoML 任务的更多信息，请参阅[使用 Ama SageMaker zon Autopilot 自动开发模型](https://docs.aws.amazon.com/sagemaker/latest/dg/autopilot-automate-model-development.html)。

**注意**  
目前，AutoML 步骤仅支持[组合训练模式](https://docs.aws.amazon.com/sagemaker/latest/dg/autopilot-model-support-validation.html)。

以下示例说明如何使用 `AutoMLStep` 创建定义。

```
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.automl_step import AutoMLStep

pipeline_session = PipelineSession()

auto_ml = AutoML(...,
    role="<role>",
    target_attribute_name="my_target_attribute_name",
    mode="ENSEMBLING",
    sagemaker_session=pipeline_session) 

input_training = AutoMLInput(
    inputs="s3://amzn-s3-demo-bucket/my-training-data",
    target_attribute_name="my_target_attribute_name",
    channel_type="training",
)
input_validation = AutoMLInput(
    inputs="s3://amzn-s3-demo-bucket/my-validation-data",
    target_attribute_name="my_target_attribute_name",
    channel_type="validation",
)

step_args = auto_ml.fit(
    inputs=[input_training, input_validation]
)

step_automl = AutoMLStep(
    name="AutoMLStep",
    step_args=step_args,
)
```

**获取最佳模型版本**

AutoML 步骤会自动训练多个候选模型。使用 `get_best_auto_ml_model` 方法从 AutoML 作业中获取目标指标最佳的模型，如下所示。您还必须使用 IAM `role` 访问模型构件。

```
best_model = step_automl.get_best_auto_ml_model(role=<role>)
```

有关更多信息，请参阅 Pyth SageMaker on 软件开发工具包中的 [AutoML](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.automl_step.AutoMLStep) 步骤。

## 模型步骤
<a name="step-type-model"></a>

`ModelStep`使用创建或注册 A SageMaker I 模型。有关`ModelStep`要求的更多信息，请参阅 sagemaker.workflow. [model\$1step。 ModelStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.model_step.ModelStep)文档。

### 创建模型
<a name="step-type-model-create"></a>

您可以使用创建 `ModelStep` A SageMaker I 模型。A `ModelStep` 需要模型工件和有关创建模型所需的 SageMaker AI 实例类型的信息。有关 SageMaker AI 模型的更多信息，请参阅使用 [Amazon A SageMaker I 训练模型](https://docs.aws.amazon.com/sagemaker/latest/dg/how-it-works-training.html)。

以下示例说明如何创建 `ModelStep` 定义。

```
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.model import Model
from sagemaker.workflow.model_step import ModelStep

step_train = TrainingStep(...)
model = Model(
    image_uri=pytorch_estimator.training_image_uri(),
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=PipelineSession(),
    role=role,
)

step_model_create = ModelStep(
   name="MyModelCreationStep",
   step_args=model.create(instance_type="ml.m5.xlarge"),
)
```

### 注册模型
<a name="step-type-model-register"></a>

您可以使用 a `ModelStep` 在 Amazon SageMaker 模型注册中注册`sagemaker.model.Model`或。`sagemaker.pipeline.PipelineModel``PipelineModel` 表示推理管道，是一个由处理推理请求的容器线性序列组成的模型。有关如何注册模型的更多信息，请参阅[利用模型注册中心进行模型注册部署](model-registry.md)。

以下示例说明如何创建一个注册 `PipelineModel` 的 `ModelStep`。

```
import time

from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.sklearn import SKLearnModel
from sagemaker.xgboost import XGBoostModel

pipeline_session = PipelineSession()

code_location = 's3://{0}/{1}/code'.format(bucket_name, prefix)

sklearn_model = SKLearnModel(
   model_data=processing_step.properties.ProcessingOutputConfig.Outputs['model'].S3Output.S3Uri,
   entry_point='inference.py',
   source_dir='sklearn_source_dir/',
   code_location=code_location,
   framework_version='1.0-1',
   role=role,
   sagemaker_session=pipeline_session,
   py_version='py3'
)

xgboost_model = XGBoostModel(
   model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts,
   entry_point='inference.py',
   source_dir='xgboost_source_dir/',
   code_location=code_location,
   framework_version='0.90-2',
   py_version='py3',
   sagemaker_session=pipeline_session,
   role=role
)

from sagemaker.workflow.model_step import ModelStep
from sagemaker import PipelineModel

pipeline_model = PipelineModel(
   models=[sklearn_model, xgboost_model],
   role=role,sagemaker_session=pipeline_session,
)

register_model_step_args = pipeline_model.register(
    content_types=["application/json"],
   response_types=["application/json"],
   inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
   transform_instances=["ml.m5.xlarge"],
   model_package_group_name='sipgroup',
)

step_model_registration = ModelStep(
   name="AbaloneRegisterModel",
   step_args=register_model_step_args,
)
```

## 创建模型步骤
<a name="step-type-create-model"></a>

您可以使用 “创建模型” 步骤来创建 A SageMaker I 模型。有关 SageMaker AI 模型的更多信息，请参阅[使用 Amazon 训练模型 SageMaker](how-it-works-training.md)。

创建模型步骤需要模型工件和有关创建模型所需的 SageMaker AI 实例类型的信息。下面的示例展示了如何创建创建模型步骤定义。有关创建模型步骤要求的更多信息，请参阅 [sagemaker.workflow.steps。 CreateModelStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.CreateModelStep)文档。

------
#### [ Pipeline Designer ]

要在管道中添加创建模型步骤，请执行以下操作：

1. 按照 [启动亚马逊 SageMaker Studio](studio-updated-launch.md) 中的说明打开 Studio 管理控制台。

1. 在左侧导航窗格中，选择 **Pipelines**。

1. 选择**创建**。

1. 选择**空白**。

1. 在左侧边栏中选择**创建模型**，然后将其拖到画布上。

1. 在画布中，选择添加的**创建模型**步骤。

1. 在右侧边栏中，填写**设置**和**详情**标签中的表格。有关这些选项卡中字段的信息，请参阅 [sagemaker.workflow.steps。 CreateModelStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.CreateModelStep)。

1. 如果画布上有任何步骤紧接在您添加的**创建模型**步骤之前，请单击并拖动光标从该步骤到**创建模型**步骤，以创建边缘。

1. 如果画布上有任何步骤紧接着您添加的**创建模型**步骤，请单击并拖动光标从**创建模型**步骤到该步骤，以创建边缘。

------
#### [ SageMaker Python SDK ]

**重要**  
从 AI SageMaker Py [模型步骤](#step-type-model) thon SDK 版本 2.90.0 起，我们建议使用创建模型。 `CreateModelStep`将继续在以前版本的 SageMaker Python SDK 中运行，但不再受支持。

```
from sagemaker.workflow.steps import CreateModelStep

step_create_model = CreateModelStep(
    name="AbaloneCreateModel",
    model=best_model,
    inputs=inputs
)
```

------

## 注册模型步骤
<a name="step-type-register-model"></a>

“注册模型” 步骤将模型注册到 “ SageMaker 模型注册表”。

------
#### [ Pipeline Designer ]

要使用管道设计器从管道中注册模型，请执行以下操作：

1. 按照中的说明打开 Amazon SageMaker Studio 控制台[启动亚马逊 SageMaker Studio](studio-updated-launch.md)。

1. 在左侧导航窗格中，选择 **Pipelines**。

1. 选择**创建**。

1. 选择**空白**。

1. 在左侧边栏中选择**注册模型**，然后将其拖到画布上。

1. 在画布中，选择添加的**注册模型**步骤。

1. 在右侧边栏中，填写**设置**和**详情**标签中的表格。有关这些选项卡中字段的信息，请参阅 [sagemaker.workflow.step\$1collections。 RegisterModel](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.step_collections.RegisterModel)。

1. 如果画布上有任何步骤紧接在您添加的**注册模型**步骤之前，请单击并拖动光标从该步骤到**注册模型**步骤，以创建边缘。

1. 如果画布中包含任何紧接您添加的**注册模型**步骤的步骤，请单击并拖动光标从**注册模型**步骤到该步骤，以创建边缘。

------
#### [ SageMaker Python SDK ]

**重要**  
我们建议使用[模型步骤](#step-type-model)从 AI SageMaker Python SDK 版本 2.90.0 起注册模型。 `RegisterModel`将继续在以前版本的 SageMaker Python SDK 中运行，但不再受支持。

[你可以使用`RegisterModel`步骤注册 sagemaker.model.model 或 [sagemaker.pipeline。](https://sagemaker.readthedocs.io/en/stable/api/inference/model.html) PipelineModel](https://sagemaker.readthedocs.io/en/stable/api/inference/pipeline.html#pipelinemodel)使用 Amazon SageMaker 模型注册表。`PipelineModel` 表示推理管道，是一个由处理推理请求的容器线性序列组成的模型。

有关如何注册模型的更多信息，请参阅[利用模型注册中心进行模型注册部署](model-registry.md)。有关`RegisterModel`步骤要求的更多信息，请参阅 s [agemaker.workflow.step\$1collections。 RegisterModel](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.step_collections.RegisterModel)文档。

以下示例说明如何创建注册 `PipelineModel` 的 `RegisterModel` 步骤。

```
import time
from sagemaker.sklearn import SKLearnModel
from sagemaker.xgboost import XGBoostModel

code_location = 's3://{0}/{1}/code'.format(bucket_name, prefix)

sklearn_model = SKLearnModel(model_data=processing_step.properties.ProcessingOutputConfig.Outputs['model'].S3Output.S3Uri,
 entry_point='inference.py',
 source_dir='sklearn_source_dir/',
 code_location=code_location,
 framework_version='1.0-1',
 role=role,
 sagemaker_session=sagemaker_session,
 py_version='py3')

xgboost_model = XGBoostModel(model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts,
 entry_point='inference.py',
 source_dir='xgboost_source_dir/',
 code_location=code_location,
 framework_version='0.90-2',
 py_version='py3',
 sagemaker_session=sagemaker_session,
 role=role)

from sagemaker.workflow.step_collections import RegisterModel
from sagemaker import PipelineModel
pipeline_model = PipelineModel(models=[sklearn_model,xgboost_model],role=role,sagemaker_session=sagemaker_session)

step_register = RegisterModel(
 name="AbaloneRegisterModel",
 model=pipeline_model,
 content_types=["application/json"],
 response_types=["application/json"],
 inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
 transform_instances=["ml.m5.xlarge"],
 model_package_group_name='sipgroup',
)
```

如果未提供 `model`，则注册模型步骤需要使用估算器，如以下示例所示。

```
from sagemaker.workflow.step_collections import RegisterModel

step_register = RegisterModel(
    name="AbaloneRegisterModel",
    estimator=xgb_train,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics
)
```

------

## 部署模型（端点）步骤
<a name="step-type-deploy-model-endpoint"></a>

在管道设计器中，使用部署模型（端点）步骤将模型部署到端点。您可以创建一个新端点或使用现有端点。实时推理非常适合有实时、交互式、低延迟要求的推理工作负载。您可以将模型部署到 SageMaker AI Hosting 服务，并获得可用于推理的实时终端节点。这些端点可完全托管，并支持自动扩缩。要了解有关 SageMaker AI 中实时推理的更多信息，请参阅[实时推理](realtime-endpoints.md)。

在管道中添加部署模型步骤之前，请确保您的 IAM 角色具有以下权限：
+ `sagemaker:CreateModel`
+ `sagemaker:CreateEndpointConfig`
+ `sagemaker:CreateEndpoint`
+ `sagemaker:UpdateEndpoint`
+ `sagemaker:DescribeModel`
+ `sagemaker:DescribeEndpointConfig`
+ `sagemaker:DescribeEndpoint`

要详细了解 SageMaker AI 所需的所有权限以及如何设置这些权限，请参阅[Amazon SageMaker AI API 权限：操作、权限和资源参考](api-permissions-reference.md)。

要在 drag-and-drop编辑器中向流水线添加模型部署步骤，请完成以下步骤：

1. 按照 [启动亚马逊 SageMaker Studio](studio-updated-launch.md) 中的说明打开 Studio 管理控制台。

1. 在左侧导航窗格中，选择 **Pipelines**。

1. 选择**创建**。

1. 选择**空白**。

1. 在左侧边栏中选择**部署模型（端点）**，然后将其拖到画布上。

1. 在画布中，选择添加的**部署模型（端点）**步骤。

1. 在右侧边栏中，填写**设置**和**详情**标签中的表格。

1. 如果画布上有任何步骤紧接在您添加的**部署模型（端点）**步骤之前，请单击并拖动光标，从该步骤到**部署模型（端点）**步骤，以创建边缘。

1. 如果画布中包含任何紧接您添加的**部署模型（端点）**步骤的步骤，请单击并将光标从**部署模型（端点）**步骤拖动到该步骤，以创建边缘。

## 转换步骤
<a name="step-type-transform"></a>

您可以使用转换步骤进行批量转换，在整个数据集上运行推理。有关批量转换的更多信息，请参阅[利用推理管道进行批量转换](inference-pipeline-batch.md)。

转换步骤需要转换器和用于运行批量转换的数据。下面的示例显示了如何创建变换步骤定义。有关转换步骤要求的更多信息，请参阅 [sagemaker.workflow.steps。 TransformStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.TransformStep)文档。

------
#### [ Pipeline Designer ]

要使用 drag-and-drop可视化编辑器向管道添加批量转换步骤，请执行以下操作：

1. 按照 [启动亚马逊 SageMaker Studio](studio-updated-launch.md) 中的说明打开 Studio 管理控制台。

1. 在左侧导航窗格中，选择 **Pipelines**。

1. 选择**创建**。

1. 选择**空白**。

1. 在左侧边栏中选择**部署模型（批量转换）**，然后将其拖到画布上。

1. 在画布中，选择添加的**部署模型（批量转换）**步骤。

1. 在右侧边栏中，填写**设置**和**详情**标签中的表格。有关这些选项卡中字段的信息，请参阅 [sagemaker.workflow.steps。 TransformStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.TransformStep)。

1. 如果画布上有任何步骤紧接在您添加的**部署模型（批量转换）**步骤之前，请单击并拖动光标从该步骤到**部署模型（批量转换）**步骤，以创建边缘。

1. 如果画布中包含任何紧接您添加的**部署模型（批量转换）**步骤的步骤，请单击并将光标从**部署模型（批量转换）**步骤拖动到该步骤，以创建边缘。

------
#### [ SageMaker Python SDK ]

```
from sagemaker.workflow.pipeline_context import PipelineSession

from sagemaker.transformer import Transformer
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep

transformer = Transformer(..., sagemaker_session=PipelineSession())

step_transform = TransformStep(
    name="AbaloneTransform",
    step_args=transformer.transform(data="s3://amzn-s3-demo-bucket/my-data"),
)
```

------

## 条件步骤
<a name="step-type-condition"></a>

您可以使用条件步骤来评估步骤属性的条件，以评估管道中下一步应该采取的操作。

需要一个条件步骤：
+ 一个条件列表。
+ 如果条件评估结果为 `true`，则要运行的步骤列表。
+ 如果条件评估结果为 `false`，则要运行的步骤列表。

------
#### [ Pipeline Designer ]

要使用 Pipeline Designer 向管道添加条件步骤，请执行以下操作：

1. 按照中的说明打开 Amazon SageMaker Studio 控制台[启动亚马逊 SageMaker Studio](studio-updated-launch.md)。

1. 在左侧导航窗格中，选择 **Pipelines**。

1. 选择**创建**。

1. 选择**空白**。

1. 在左侧边栏中选择**条件**，然后将其拖到画布上。

1. 在画布中，选择添加的**条件**步骤。

1. 在右侧边栏中，填写**设置**和**详情**标签中的表格。有关这些选项卡中字段的信息，请参阅 [sagemaker.workflow.condition\$1step。 ConditionStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.condition_step.ConditionStep)。

1. 如果画布上有任何步骤紧接在您添加的**条件**步骤之前，请单击并拖动光标从该步骤到**条件**步骤，以创建边缘。

1. 如果画布中包含任何紧接您添加的**条件**步骤的步骤，请单击并拖动光标从**条件**步骤到该步骤，以创建边缘。

------
#### [ SageMaker Python SDK ]

 以下示例说明如何创建 `ConditionStep` 定义。

**限制**
+ 管道不支持使用嵌套条件步骤。不能将一个条件步骤作为另一个条件步骤的输入进行传递。
+ 一个条件步骤不能在两个分支中使用相同的步骤。如果需要在两个分支中使用相同的步骤功能，请复制该步骤并为其指定不同的名称。

```
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet

cond_lte = ConditionLessThanOrEqualTo(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="regression_metrics.mse.value"
    ),
    right=6.0
)

step_cond = ConditionStep(
    name="AbaloneMSECond",
    conditions=[cond_lte],
    if_steps=[step_register, step_create_model, step_transform],
    else_steps=[]
)
```

有关`ConditionStep`要求的更多信息，请参阅 sagemaker.workflow.con [dition\$1step。 ConditionStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#conditionstep)API 参考。有关支持条件的更多信息，请参阅 *[A SageMaker I Python SDK 文档中的 Amazon Pipelin SageMaker es-条件](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#conditions)*。

------

## 回调步骤
<a name="step-type-callback"></a>

使用`Callback`步骤向您的工作流程中添加并非由 Amazon Pipelines 直接提供的其他流程和 AWS SageMaker 服务。当 `Callback` 步骤运行时，会发生以下过程：
+ Pipelines 向客户指定的 Amazon Simple Queue Service (Amazon SQS) 队列发送消息。该信息包含一个 Pipelines 生成的标记和客户提供的输入参数列表。发送信息后，Pipelines 会等待客户的回复。
+ 客户从 Amazon SQS 队列中检索消息并启动他们的自定义流程。
+ 该过程完成后，客户调用以下方法之一 APIs 并提交 Pipelines 生成的令牌：
  +  [SendPipelineExecutionStepSuccess](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_SendPipelineExecutionStepSuccess.html)，以及输出参数列表
  +  [SendPipelineExecutionStepFailure](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_SendPipelineExecutionStepFailure.html)，以及失败原因
+ API 调用会导致 Pipelines 继续执行管道流程或使流程失败。

有关`Callback`步骤要求的更多信息，请参阅 sagemaker.workflow.cal [lback\$1step。 CallbackStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.callback_step.CallbackStep)文档。有关完整的解决方案，请参阅[使用回调步骤扩展 SageMaker 管道以包含自定义步骤](https://aws.amazon.com/blogs/machine-learning/extend-amazon-sagemaker-pipelines-to-include-custom-steps-using-callback-steps/)。

**重要**  
`Callback`亚马逊 SageMaker Python SDK v2.45.0 和 Amazon SageMaker Studio Classic v3.6.2 中引入了步骤。在使用 `Callback` 步骤之前，您必须更新 Studio Classic，否则不会显示管道 DAG。要更新 Studio Classic，请参阅 [关闭并更新 Amazon SageMaker Studio 经典版](studio-tasks-update-studio.md)。

下面的示例展示了上述程序的执行过程。

```
from sagemaker.workflow.callback_step import CallbackStep

step_callback = CallbackStep(
    name="MyCallbackStep",
    sqs_queue_url="https://sqs.us-east-2.amazonaws.com/012345678901/MyCallbackQueue",
    inputs={...},
    outputs=[...]
)

callback_handler_code = '
    import boto3
    import json

    def handler(event, context):
        sagemaker_client=boto3.client("sagemaker")

        for record in event["Records"]:
            payload=json.loads(record["body"])
            token=payload["token"]

            # Custom processing

            # Call SageMaker AI to complete the step
            sagemaker_client.send_pipeline_execution_step_success(
                CallbackToken=token,
                OutputParameters={...}
            )
'
```

**注意**  
不得嵌套 `CallbackStep` 的输出参数。例如，如果您使用嵌套字典作为输出参数，则该字典将被视为单个字符串（例如 `{"output1": "{\"nested_output1\":\"my-output\"}"}`)。 如果您提供嵌套值，则当您尝试引用特定的输出参数时， SageMaker AI 会抛出不可重试的客户端错误。

**停止行为**

当 `Callback` 步骤运行时，管道进程不会停止。

当您使用正在运行的`Callback`步骤调用[StopPipelineExecution](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_StopPipelineExecution.html)管道进程时，Pipelines 会向 SQS 队列发送一条 Amazon SQS 消息。SQS 消息正文包含一个**状态**字段，该字段设置为 `Stopping`。以下是 SQS 消息正文示例。

```
{
  "token": "26vcYbeWsZ",
  "pipelineExecutionArn": "arn:aws:sagemaker:us-east-2:012345678901:pipeline/callback-pipeline/execution/7pinimwddh3a",
  "arguments": {
    "number": 5,
    "stringArg": "some-arg",
    "inputData": "s3://sagemaker-us-west-2-012345678901/abalone/abalone-dataset.csv"
  },
  "status": "Stopping"
}
```

您应该在 Amazon SQS 消息用户中添加逻辑，以便在收到消息后采取任何必要的措施（例如，资源清理）。然后添加对 `SendPipelineExecutionStepSuccess` 或 `SendPipelineExecutionStepFailure` 的调用。

只有当 Pipelines 收到这些调用时，它才会停止管道进程。

## Lambda 步骤
<a name="step-type-lambda"></a>

您可以使用 Lambda 步骤来运行函数。 AWS Lambda 您可以运行现有的 Lambda 函数，或者 SageMaker AI 可以创建并运行新的 Lambda 函数。如果您选择使用现有 Lambda 函数，则该函数必须与 SageMaker AI 管 AWS 区域 道相同。[有关展示如何在 SageMaker AI 管道中使用 Lambda 步骤的笔记本，请参阅 sagemaker-pipelines-lambda-step .ipynb。](https://github.com/aws/amazon-sagemaker-examples/blob/main/sagemaker-pipelines/tabular/lambda-step/sagemaker-pipelines-lambda-step.ipynb)

**重要**  
亚马逊 Pyth SageMaker on SDK v2.51.0 和亚马 SageMaker 逊 Studio Classic v3.9.1 中引入了 Lambda 步骤。您必须在使用 Lambda 步骤之前更新 Studio Classic，否则不会显示管道 DAG。要更新 Studio Classic，请参阅 [关闭并更新 Amazon SageMaker Studio 经典版](studio-tasks-update-studio.md)。

SageMaker AI 提供了 [sagemaker.lambda\$1Helper.Lambda 类来创建、更新、调用和删除 Lambda 函](https://sagemaker.readthedocs.io/en/stable/api/utility/lambda_helper.html)数。 `Lambda`具有以下签名。

```
Lambda(
    function_arn,       # Only required argument to invoke an existing Lambda function

    # The following arguments are required to create a Lambda function:
    function_name,
    execution_role_arn,
    zipped_code_dir,    # Specify either zipped_code_dir and s3_bucket, OR script
    s3_bucket,          # S3 bucket where zipped_code_dir is uploaded
    script,             # Path of Lambda function script
    handler,            # Lambda handler specified as "lambda_script.lambda_handler"
    timeout,            # Maximum time the Lambda function can run before the lambda step fails
    ...
)
```

sagemaker. [workflow.lambda\$1step。 LambdaStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.lambda_step.LambdaStep)类的`lambda_func`参数类型为`Lambda`。要调用现有 Lambda 函数，唯一的要求是将函数的 Amazon 资源名称 (ARN) 提供给 `function_arn`。如果不提供 `function_arn` 的值，则必须指定 `handler` 和以下内容之一：
+ `zipped_code_dir` - 压缩后的 Lambda 函数的路径

  `s3_bucket` - 要上传 `zipped_code_dir` 的 Amazon S3 存储桶
+ `script` - Lambda 函数脚本文件的路径

以下示例说明如何创建调用现有 Lambda 函数的 `Lambda` 步骤定义。

```
from sagemaker.workflow.lambda_step import LambdaStep
from sagemaker.lambda_helper import Lambda

step_lambda = LambdaStep(
    name="ProcessingLambda",
    lambda_func=Lambda(
        function_arn="arn:aws:lambda:us-west-2:012345678910:function:split-dataset-lambda"
    ),
    inputs={
        s3_bucket = s3_bucket,
        data_file = data_file
    },
    outputs=[
        "train_file", "test_file"
    ]
)
```

以下示例说明如何创建 `Lambda` 步骤定义，该定义使用 Lambda 函数脚本创建和调用 Lambda 函数。

```
from sagemaker.workflow.lambda_step import LambdaStep
from sagemaker.lambda_helper import Lambda

step_lambda = LambdaStep(
    name="ProcessingLambda",
    lambda_func=Lambda(
      function_name="split-dataset-lambda",
      execution_role_arn=execution_role_arn,
      script="lambda_script.py",
      handler="lambda_script.lambda_handler",
      ...
    ),
    inputs={
        s3_bucket = s3_bucket,
        data_file = data_file
    },
    outputs=[
        "train_file", "test_file"
    ]
)
```

**输入和输出**

如果您的 `Lambda` 函数有输入或输出，则还必须在 `Lambda` 步骤中定义这些输入或输出。

**注意**  
不得嵌套输入和输出参数。例如，如果您使用嵌套字典作为输出参数，则该字典将被视为单个字符串（例如 `{"output1": "{\"nested_output1\":\"my-output\"}"}`）。如果您提供嵌套值并尝试稍后再引用该值，则会引发不可重试的客户端错误。

定义 `Lambda` 步骤时，`inputs` 必须是键值对的字典。`inputs` 字典的每个值都必须是基元类型（字符串、整数或浮点数）。不支持嵌套对象。如果 `inputs` 值未定义，则默认为 `None`。

`outputs` 值必须是键列表。这些键是指 `Lambda` 函数输出中定义的字典。比如 `inputs`，这些键必须是基元类型，并且不支持嵌套对象。

**超时和停止行为**

`Lambda` 类有一个 `timeout` 参数，用于指定 Lambda 函数可以运行的最长时间。默认值为 120 秒，最大值为 10 分钟。如果在达到超时条件时 Lambda 函数正在运行，则 Lambda 步骤将失败；但 Lambda 函数会继续运行。

当 Lambda 步骤正在运行时，无法停止管道进程，因为无法停止 Lambda 步骤调用的 Lambda 函数。如果您在 Lambda 函数运行时停止进程，管道会等待函数完成或超时。这取决于哪个先发生。进程随即停止。如果 Lambda 函数完成，则管道进程状态为 `Stopped`。如果达到超时，则管道进程状态为 `Failed`。

## ClarifyCheck 步
<a name="step-type-clarify-check"></a>

您可以使用 `ClarifyCheck` 步骤对照先前的基准进行基准偏差检查，以进行偏差分析和实现模型可解释性。然后，您可以使用 `model.register()` 方法生成和[注册基准](https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-quality-clarify-baseline-lifecycle.html#pipelines-quality-clarify-baseline-calculations)，并使用 `[step\$1args](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#model-step)` 将该方法的输出传递给 [模型步骤](#step-type-model)。Amazon SageMaker 模型监控器可以将这些偏差检查基准用于您的模型终端节点。因此，您无需单独提出[基线](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-create-baseline.html)建议。

`ClarifyCheck` 步骤还可以从模型注册表中提取偏差检查基准。该`ClarifyCheck`步骤使用 Clari SageMaker fy 预建容器。该容器提供一系列模型监测功能，包括约束建议和根据给定基线进行约束验证。有关更多信息，请参阅 [预建的 SageMaker 澄清容器](clarify-processing-job-configure-container.md)。

### 配置 ClarifyCheck 步骤
<a name="configuring-step-type-clarify"></a>

您可以将 `ClarifyCheck` 步骤配置为每次在管道中使用时仅执行以下一种类型的检查。
+ 数据偏差检查
+ 模型偏差检查
+ 模型可解释性检查

为此，请将 `clarify_check_config` 参数设置为以下检查类型值之一：
+ `DataBiasCheckConfig`
+ `ModelBiasCheckConfig`
+ `ModelExplainabilityCheckConfig`

该`ClarifyCheck`步骤启动一个处理作业，该作业运行 SageMaker AI Clarify 预建容器，并且需要[为支票和处理作业进行专用配置](https://docs.aws.amazon.com/sagemaker/latest/dg/clarify-configure-processing-jobs.html)。 `ClarifyCheckConfig`和`CheckJobConfig`是这些配置的辅助函数。这些辅助函数与 Clarify 处理 SageMaker 作业的计算方式一致，用于检查模型偏差、数据偏差或模型可解释性。有关更多信息，请参阅 [运行 Cl SageMaker arify 处理作业以实现偏见分析和可解释性](clarify-processing-job-run.md)。

### 控制偏差检查的步骤行为
<a name="controlling-step-type-clarify"></a>

`ClarifyCheck` 步骤需要以下两个布尔标志来控制其行为：
+ `skip_check`：此参数表示是否跳过针对先前基准的偏差检查。如果将其设置为 `False`，则配置的检查类型的先前基准必须可用。
+ `register_new_baseline`：此参数表示是否可以通过步骤属性 `BaselineUsedForDriftCheckConstraints` 访问新计算的基准。如果将其设置为 `False`，则配置的检查类型的先前基准也必须可用。这可以通过 `BaselineUsedForDriftCheckConstraints` 属性访问。

有关更多信息，请参阅 [基准计算、偏差检测和生命周期以及 Amazon Pi ClarifyCheck pelin SageMaker es 中的 QualityCheck 步骤](pipelines-quality-clarify-baseline-lifecycle.md)。

### 使用基准
<a name="step-type-clarify-working-with-baselines"></a>

您可以选择指定 `model_package_group_name` 来定位现有基线。然后，`ClarifyCheck` 步骤在模型软件包组中最新批准的模型软件包上提取 `DriftCheckBaselines`。

或者，您可以通过 `supplied_baseline_constraints` 参数提供先前的基准。如果同时指定 `model_package_group_name` 和 `supplied_baseline_constraints`，则 `ClarifyCheck` 步骤将使用 `supplied_baseline_constraints` 参数指定的基准。

有关使用`ClarifyCheck`步骤要求的更多信息，请参阅 [sagemaker.workflow.steps。 ClarifyCheckStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.clarify_check_step.ClarifyCheckStep)在适用于 *Python 的亚马逊 SageMaker AI SageMaker 人工智能开发工具包*中。有关展示如何使用 Pipelines 中`ClarifyCheck`步骤的 Amazon SageMaker Studio Classic 笔记本，请参阅 [sagemaker-pipeline-model-monitor-clarify-steps.ipynb](https://github.com/aws/amazon-sagemaker-examples/blob/main/sagemaker-pipelines/tabular/model-monitor-clarify-pipelines/sagemaker-pipeline-model-monitor-clarify-steps.ipynb)。

**Example 创建 `ClarifyCheck` 步骤以进行数据偏差检查**  

```
from sagemaker.workflow.check_job_config import CheckJobConfig
from sagemaker.workflow.clarify_check_step import DataBiasCheckConfig, ClarifyCheckStep
from sagemaker.workflow.execution_variables import ExecutionVariables

check_job_config = CheckJobConfig(
    role=role,
    instance_count=1,
    instance_type="ml.c5.xlarge",
    volume_size_in_gb=120,
    sagemaker_session=sagemaker_session,
)

data_bias_data_config = DataConfig(
    s3_data_input_path=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
    s3_output_path=Join(on='/', values=['s3:/', your_bucket, base_job_prefix, ExecutionVariables.PIPELINE_EXECUTION_ID, 'databiascheckstep']),
    label=0,
    dataset_type="text/csv",
    s3_analysis_config_output_path=data_bias_analysis_cfg_output_path,
)

data_bias_config = BiasConfig(
    label_values_or_threshold=[15.0], facet_name=[8], facet_values_or_threshold=[[0.5]]  
)

data_bias_check_config = DataBiasCheckConfig(
    data_config=data_bias_data_config,
    data_bias_config=data_bias_config,
)h

data_bias_check_step = ClarifyCheckStep(
    name="DataBiasCheckStep",
    clarify_check_config=data_bias_check_config,
    check_job_config=check_job_config,
    skip_check=False,
    register_new_baseline=False
   supplied_baseline_constraints="s3://sagemaker-us-west-2-111122223333/baseline/analysis.json",
    model_package_group_name="MyModelPackageGroup"
)
```

## QualityCheck 步
<a name="step-type-quality-check"></a>

使用 `QualityCheck` 步骤对管道中的数据质量或模型质量进行[基线建议](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-create-baseline.html)和漂移检查。然后，您可以使用 `model.register()` 方法生成和[注册基准](https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-quality-clarify-baseline-lifecycle.html#pipelines-quality-clarify-baseline-calculations)，并使用 `[step\$1args](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#model-step)` 将该方法的输出传递给 [模型步骤](#step-type-model)。

Model Monitor 可以将这些偏差检查基准用于模型端点，这样您就无需单独提出基准建议。`QualityCheck` 步骤还可以从模型注册表中提取偏差检查基准。该`QualityCheck`步骤利用了 Amazon A SageMaker I 模型监控器预先构建的容器。该容器具有一系列模型监测功能，包括约束建议、统计数据生成和对照基线进行约束验证。有关更多信息，请参阅 [Amazon SageMaker 模型监控器预建容器](model-monitor-pre-built-container.md)。

### 配置 QualityCheck 步骤
<a name="configuring-step-type-quality"></a>

您可以对 `QualityCheck` 步骤进行配置，使其每次在管道中使用时只运行以下一种检查类型。
+ 数据质量检查
+ 模型质量检查

您可以通过将 `quality_check_config` 参数设置为以下检查类型值之一来实现此目的：
+ `DataQualityCheckConfig`
+ `ModelQualityCheckConfig`

`QualityCheck` 步骤会启动一个处理作业，该作业运行 Model Monitor 预构建容器，并且需要专门的用于检查和处理作业的配置。`QualityCheckConfig` 和 `CheckJobConfig` 是这些配置的辅助功能。这些辅助功能与模型监控器为模型质量或数据质量监控创建基线的方式一致。有关 Model Monitor 基准建议的更多信息，请参阅[创建基准](model-monitor-create-baseline.md)和[创建模型质量基线](model-monitor-model-quality-baseline.md)。

### 控制偏差检查的步骤行为
<a name="controlling-step-type-quality"></a>

`QualityCheck` 步骤需要以下两个布尔标志来控制其行为：
+ `skip_check`：此参数表示是否跳过针对先前基准的偏差检查。如果将其设置为 `False`，则配置的检查类型的先前基准必须可用。
+ `register_new_baseline`：此参数表示是否可以通过步骤属性 `BaselineUsedForDriftCheckConstraints` 和 `BaselineUsedForDriftCheckStatistics` 访问新计算的基准。如果将其设置为 `False`，则配置的检查类型的先前基准也必须可用。它们可以通过 `BaselineUsedForDriftCheckConstraints` 和 `BaselineUsedForDriftCheckStatistics` 属性进行访问。

有关更多信息，请参阅[基准计算、偏差检测和生命周期以及 Amazon Pi ClarifyCheck pelin SageMaker es 中的 QualityCheck 步骤](pipelines-quality-clarify-baseline-lifecycle.md)。

### 使用基准
<a name="step-type-quality-working-with-baselines"></a>

您可以通过 `supplied_baseline_statistics` 和 `supplied_baseline_constraints` 参数直接指定上一条基线。您还可以指定 `model_package_group_name` 和 `QualityCheck` 步骤，在模型软件包组中最新批准的模型软件包上拉动 `DriftCheckBaselines`。

指定以下内容时，`QualityCheck` 步骤将使用 `supplied_baseline_constraints` 和 `supplied_baseline_statistics` 在 `QualityCheck` 步骤的校验类型上指定的基线。
+ `model_package_group_name`
+ `supplied_baseline_constraints`
+ `supplied_baseline_statistics`

有关使用`QualityCheck`步骤要求的更多信息，请参阅 [sagemaker.workflow.steps。 QualityCheckStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.quality_check_step.QualityCheckStep)在适用于 *Python 的亚马逊 SageMaker AI SageMaker 人工智能开发工具包*中。有关展示如何使用 Pipelines 中`QualityCheck`步骤的 Amazon SageMaker Studio Classic 笔记本，请参阅 [sagemaker-pipeline-model-monitor-clarify-steps.ipynb](https://github.com/aws/amazon-sagemaker-examples/blob/main/sagemaker-pipelines/tabular/model-monitor-clarify-pipelines/sagemaker-pipeline-model-monitor-clarify-steps.ipynb)。

**Example 创建 `QualityCheck` 步骤以进行数据质量检查**  

```
from sagemaker.workflow.check_job_config import CheckJobConfig
from sagemaker.workflow.quality_check_step import DataQualityCheckConfig, QualityCheckStep
from sagemaker.workflow.execution_variables import ExecutionVariables

check_job_config = CheckJobConfig(
    role=role,
    instance_count=1,
    instance_type="ml.c5.xlarge",
    volume_size_in_gb=120,
    sagemaker_session=sagemaker_session,
)

data_quality_check_config = DataQualityCheckConfig(
    baseline_dataset=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
    dataset_format=DatasetFormat.csv(header=False, output_columns_position="START"),
    output_s3_uri=Join(on='/', values=['s3:/', your_bucket, base_job_prefix, ExecutionVariables.PIPELINE_EXECUTION_ID, 'dataqualitycheckstep'])
)

data_quality_check_step = QualityCheckStep(
    name="DataQualityCheckStep",
    skip_check=False,
    register_new_baseline=False,
    quality_check_config=data_quality_check_config,
    check_job_config=check_job_config,
    supplied_baseline_statistics="s3://sagemaker-us-west-2-555555555555/baseline/statistics.json",
    supplied_baseline_constraints="s3://sagemaker-us-west-2-555555555555/baseline/constraints.json",
    model_package_group_name="MyModelPackageGroup"
)
```

## EMR 步骤
<a name="step-type-emr"></a>

使用 Amazon P [ip SageMaker elines EMR 步骤可](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-overview.html)以：
+ 在运行中的 Amazon EMR 集群上处理 [Amazon EMR 步骤](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-work-with-steps.html)。
+ 让管道为您创建和管理 Amazon EMR 集群。

有关 Amazon EMR 的更多信息，请参阅 [Amazon EMR 入门](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-gs.html)。

EMR 步骤要求 `EMRStepConfig` 包括 Amazon EMR 集群使用的 JAR 文件的位置以及要传递的任何参数。如果您要在运行中的 EMR 集群上运行该步骤，还需提供 Amazon EMR 集群 ID。您还可以通过集群配置，在它为您创建、管理和终止的集群上运行 EMR 步骤。以下几节包括演示这两种方法的示例笔记本的示例和链接。

**注意**  
EMR 步骤要求传递给管道的角色具有其他权限。将 [AWS 托管式策略附加到管道角色：`AmazonSageMakerPipelinesIntegrations`](https://docs.aws.amazon.com/sagemaker/latest/dg/security-iam-awsmanpol-pipelines.html#security-iam-awsmanpol-AmazonSageMakerPipelinesIntegrations) 到管道角色，或确保角色包含该策略中的权限。
如果您在运行中的集群上处理 EMR 步骤，则只能使用处于以下状态之一的集群：  
`STARTING`
`BOOTSTRAPPING`
`RUNNING`
`WAITING`
如果您在正在运行的集群上处理 EMR 步骤，则在 EMR 集群上最多可以有 256 个处于 `PENDING` 状态的 EMR 步骤。提交的 EMR 步骤超过此限制会导致管道执行失败。您可以考虑使用[管道步骤的重试策略](pipelines-retry-policy.md)。
您可以指定集群 ID 或集群配置，但不能同时指定两者。
EMR 步骤依靠 Amazon EventBridge 来监控 EMR 步骤或集群状态的变化。如果您在正在运行的集群上处理 Amazon EMR 作业，则 EMR 步骤将使用 `SageMakerPipelineExecutionEMRStepStatusUpdateRule` 规则来监控 EMR 步骤的状态。如果您在 EMR 步骤创建的集群上处理作业，则该步骤会使用 `SageMakerPipelineExecutionEMRClusterStatusRule` 规则监控集群状态的变化。如果您在 AWS 账户中看到其中任何一条 EventBridge 规则，请不要将其删除，否则您的 EMR 步骤可能无法完成。

**在您的管道中添加 Amazon EMR 步骤**

要向您的管道添加 EMR 步骤，请执行以下操作：
+ 按照[启动 Amazon Studio 中的说明打开 SageMaker Studio](https://docs.aws.amazon.com/sagemaker/latest/dg/studio-updated-launch.html) 控制台。
+ 在左侧导航窗格中，选择 **Pipelines**。
+ 选择**创建**。
+ 选择**空白**。
+ 在左侧边栏中选择**处理数据**，然后将其拖到画布上。
+ 在画布中，选择添加的**处理数据**步骤。
+ 在右侧边栏的 “模式” 下，选择 “**EMR（托管）**”。
+ 在右侧边栏中，填写 “**设置” 和 “详细信息**” 选项卡中的表单。有关这些选项卡中字段的信息，请参阅 [sagemaker.workflow.fail\$1step。 EMRstep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.emr_step.EMRStep)。

**在正在运行的 Amazon EMR 集群上启动新作业**

要在运行中的 Amazon EMR 集群上启动新作业，请将集群 ID 作为字符串传递给 `EMRStep` 的 `cluster_id` 参数。以下示例演示了此过程。

```
from sagemaker.workflow.emr_step import EMRStep, EMRStepConfig

emr_config = EMRStepConfig(
    jar="jar-location", # required, path to jar file used
    args=["--verbose", "--force"], # optional list of arguments to pass to the jar
    main_class="com.my.Main1", # optional main class, this can be omitted if jar above has a manifest 
    properties=[ # optional list of Java properties that are set when the step runs
    {
        "key": "mapred.tasktracker.map.tasks.maximum",
        "value": "2"
    },
    {
        "key": "mapreduce.map.sort.spill.percent",
        "value": "0.90"
   },
   {
       "key": "mapreduce.tasktracker.reduce.tasks.maximum",
       "value": "5"
    }
  ]
)

step_emr = EMRStep (
    name="EMRSampleStep", # required
    cluster_id="j-1ABCDEFG2HIJK", # include cluster_id to use a running cluster
    step_config=emr_config, # required
    display_name="My EMR Step",
    description="Pipeline step to execute EMR job"
)
```

有关指导您完成完整示例的示例笔记本，请参阅[运行 EMR 集群的 Pipelines EMR 步骤](https://github.com/aws/amazon-sagemaker-examples/blob/main/sagemaker-pipelines/tabular/emr-step/sagemaker-pipelines-emr-step-with-running-emr-cluster.ipynb)。

**在新的 Amazon EMR 集群上启动新作业**

要在 `EMRStep` 为您创建的新集群上启动新作业，请将集群配置作为字典提供。字典必须与[RunJobFlow](https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html)请求具有相同的结构。但是，请勿在集群配置中包含以下字段：
+ [`Name`]
+ [`Steps`]
+ [`AutoTerminationPolicy`]
+ [`Instances`][`KeepJobFlowAliveWhenNoSteps`]
+ [`Instances`][`TerminationProtected`]

所有其他 `RunJobFlow` 参数均可在您的集群配置中使用。有关请求语法的详细信息，请参阅[RunJobFlow](https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html)。

下面的示例将集群配置传递给 EMR 步骤定义。这将提示在新的 EMR 集群上启动新作业的步骤。此示例中的 EMR 集群配置包括主节点和核心 EMR 集群节点的规范。有关 Amazon EMR 节点类型的更多信息，请参阅[了解节点类型：主节点、核心节点和任务节点](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-master-core-task-nodes.html)。

```
from sagemaker.workflow.emr_step import EMRStep, EMRStepConfig

emr_step_config = EMRStepConfig(
    jar="jar-location", # required, path to jar file used
    args=["--verbose", "--force"], # optional list of arguments to pass to the jar
    main_class="com.my.Main1", # optional main class, this can be omitted if jar above has a manifest 
    properties=[ # optional list of Java properties that are set when the step runs
    {
        "key": "mapred.tasktracker.map.tasks.maximum",
        "value": "2"
    },
    {
        "key": "mapreduce.map.sort.spill.percent",
        "value": "0.90"
   },
   {
       "key": "mapreduce.tasktracker.reduce.tasks.maximum",
       "value": "5"
    }
  ]
)

# include your cluster configuration as a dictionary
emr_cluster_config = {
    "Applications": [
        {
            "Name": "Spark", 
        }
    ],
    "Instances":{
        "InstanceGroups":[
            {
                "InstanceRole": "MASTER",
                "InstanceCount": 1,
                "InstanceType": "m5.2xlarge"
            },
            {
                "InstanceRole": "CORE",
                "InstanceCount": 2,
                "InstanceType": "m5.2xlarge"
            }
        ]
    },
    "BootstrapActions":[],
    "ReleaseLabel": "emr-6.6.0",
    "JobFlowRole": "job-flow-role",
    "ServiceRole": "service-role"
}

emr_step = EMRStep(
    name="emr-step",
    cluster_id=None,
    display_name="emr_step",
    description="MyEMRStepDescription",
    step_config=emr_step_config,
    cluster_config=emr_cluster_config
)
```

有关指导您完成完整示例的示例笔记本，请参阅[使用集群生命周期管理的管道 EMR 步骤](https://github.com/aws/amazon-sagemaker-examples/blob/main/sagemaker-pipelines/tabular/emr-step/sagemaker-pipelines-emr-step-with-cluster-lifecycle-management.ipynb)。

## EMR 无服务器步骤
<a name="step-type-serverless"></a>

要向您的管道中添加 EMR 无服务器步骤，请执行以下操作：
+ 按照[启动 Amazon Studio 中的说明打开 SageMaker Studio](https://docs.aws.amazon.com/sagemaker/latest/dg/studio-updated-launch.html) 控制台。
+ 在左侧导航窗格中，选择 **Pipelines**。
+ 选择**创建**。
+ 选择**空白**。
+ 在左侧边栏中选择**处理数据**，然后将其拖到画布上。
+ 在画布中，选择添加的**处理数据**步骤。
+ 在右侧边栏的模式下，选择 **EMR（无服务器）**。
+ 在右侧边栏中，填写 “**设置” 和 “详细信息**” 选项卡中的表单。

## 笔记本作业步骤
<a name="step-type-notebook-job"></a>

使用`NotebookJobStep`以非交互方式运行您的 SageMaker Notebook Job 作为工作流步骤。如果您在 Pipelines drag-and-drop 用户界面中构建管道，请使用[执行代码步骤](#step-type-executecode)来运行您的笔记本。有关 SageMaker 笔记本作业的更多信息，请参阅[SageMaker 笔记本职位](notebook-auto-run.md)。

`NotebookJobStep` 至少需要输入笔记本、映像 URI 和内核名称。有关 Notebook Job 步骤要求以及可以设置为自定义步骤的其他参数的更多信息，请参阅 [sagemaker.workflow.steps。 NotebookJobStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.notebook_job_step.NotebookJobStep)。

下面的示例使用最小参数定义了 `NotebookJobStep`。

```
from sagemaker.workflow.notebook_job_step import NotebookJobStep


notebook_job_step = NotebookJobStep(
    input_notebook=input_notebook,
    image_uri=image_uri,
    kernel_name=kernel_name
)
```

您的工作`NotebookJobStep`流步骤被视为 SageMaker 笔记本作业。因此，在 Studio Classic UI 笔记本作业控制面板中，通过使用 `tags` 参数包含特定标签来跟踪执行状态。有关包含标签的更多详情，请参阅 [在 Studio UI 面板上查看笔记本作业](create-notebook-auto-run-sdk.md#create-notebook-auto-run-dash)。

此外，如果您使用 SageMaker Python SDK 安排笔记本作业，则只能指定某些图像来运行笔记本作业。有关更多信息，请参阅 [SageMaker AI Python SDK 笔记本作业的图像限制](notebook-auto-run-constraints.md#notebook-auto-run-constraints-image-sdk)。

## Fail 步骤
<a name="step-type-fail"></a>

当未达到所需条件或状态时，使用 “失败” 步骤停止 Amazon Pipelin SageMaker es 的执行。在 Fail 步骤中还可以输入自定义错误信息，说明管道执行失败的原因。

**注意**  
当一个 Fail 步骤和其他管道步骤同时执行时，管道在所有并发步骤完成之前不会终止。

### 使用 Fail 步骤的限制
<a name="step-type-fail-limitations"></a>
+ 您不能将 Fail 步骤添加到其他步骤的 `DependsOn` 列表中。有关更多信息，请参阅 [步骤之间的自定义依赖关系](build-and-manage-steps.md#build-and-manage-custom-dependency)。
+ 其他步骤不能引用 Fail 步骤。它*始终* 是管道执行的最后一步。
+ 您不能重试以 Fail 步骤结束的管道执行。

您可以创建静态文本字符串形式的 Fail 步骤错误信息。另外，如果您使用 SDK，也可以使用 [Pipeline Parameters](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-parameters.html)、[Join](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html?highlight=Join#sagemaker.workflow.functions.Join) 操作或其他[步骤属性](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#build-and-manage-properties)来创建信息量更大的错误信息。

------
#### [ Pipeline Designer ]

要在管道中添加 Fail 步骤，请执行以下操作：

1. 按照 [启动亚马逊 SageMaker Studio](studio-updated-launch.md) 中的说明打开 Studio 管理控制台。

1. 在左侧导航窗格中，选择 **Pipelines**。

1. 选择**创建**。

1. 选择**空白**。

1. 在左侧边栏中选择 **Fail**，然后将其拖到画布上。

1. 在画布中，选择添加的 **Fail** 步骤。

1. 在右侧边栏中，填写**设置**和**详情**标签中的表格。有关这些选项卡中字段的信息，请参阅 [sagemaker.workflow.fail\$1step。 FailStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.fail_step.FailStep)。

1. 如果画布上有任何步骤紧接在您添加的 **Fail** 步骤之前，请单击并拖动光标从该步骤到 **Fail** 步骤，以创建边缘。

1. 如果画布上有任何步骤紧接着您添加的 **Fail** 步骤，请单击并拖动光标从 **Fail** 步骤到该步骤，以创建边缘。

------
#### [ SageMaker Python SDK ]

**Example**  
以下示例代码片段使用了一个 `FailStep`（带有 `ErrorMessage`，并配置了管道参数）和一项 `Join` 操作。  

```
from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join
from sagemaker.workflow.parameters import ParameterInteger

mse_threshold_param = ParameterInteger(name="MseThreshold", default_value=5)
step_fail = FailStep(
    name="AbaloneMSEFail",
    error_message=Join(
        on=" ", values=["Execution failed due to MSE >", mse_threshold_param]
    ),
)
```

------

# 添加集成
<a name="build-and-manage-steps-integration"></a>

MLflow 集成允许您 MLflow 与管道一起使用来选择跟踪服务器或无服务器应用程序、选择实验和记录指标。

## 重要概念
<a name="add-integration-key-concepts"></a>

**默认应用程序创建**-当您进入管道可视化编辑器时，将创建默认 MLflow 应用程序。

**集成面板**-包含一个新的集成面板 MLflow，您可以选择和配置该面板。

**更新应用程序和实验**-在管道执行期间覆盖所选应用程序和实验的选项。

## 工作原理
<a name="add-integration-how-it-works"></a>
+ 前往**管道可视化编辑器**
+ 在工具栏上选择 “**集成**”
+ 选择 **MLflow**
+ 配置 MLflow 应用程序并进行实验

## 屏幕截图示例
<a name="add-integration-example-screenshots"></a>

集成侧面板

![\[待办事项描述。\]](http://docs.aws.amazon.com/zh_cn/sagemaker/latest/dg/images/screenshot-pipeline-1.png)


MLflow 配置

![\[待办事项描述。\]](http://docs.aws.amazon.com/zh_cn/sagemaker/latest/dg/images/screenshot-pipeline-2.png)


如何在管道执行期间重写实验

![\[待办事项描述。\]](http://docs.aws.amazon.com/zh_cn/sagemaker/latest/dg/images/screenshot-pipeline-3.png)


## 步骤属性
<a name="build-and-manage-properties"></a>

使用 `properties` 属性在管道中的步骤之间添加数据依赖关系。管道使用这些数据依赖关系来根据管道定义构建 DAG。这些属性可以作为占位符值引用，并在运行时解析。

Pi `properties` pelines 步骤的属性与`Describe`调用相应 SageMaker AI 作业类型返回的对象相匹配。对于每种作业类型，`Describe` 调用都会返回以下响应对象：
+ `ProcessingStep` – [DescribeProcessingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeProcessingJob.html)
+ `TrainingStep` – [DescribeTrainingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeTrainingJob.html)
+ `TransformStep` – [DescribeTransformJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeTransformJob.html)

要在创建数据依赖关系期间查看每种步骤类型哪些属性是可引用的，请参阅 [Amaz SageMaker on Python SDK](https://sagemaker.readthedocs.io/en/stable) 中的*[数据依赖关系-属性参考](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#data-dependency-property-reference)*。

## 步骤并行
<a name="build-and-manage-parallelism"></a>

当一个步骤不依赖于任何其他步骤时，它会在管道执行后立即运行。但是，并行执行过多的管道步骤可能会很快耗尽可用资源。使用 `ParallelismConfiguration` 控制管道执行的并发步骤数。

以下示例使用 `ParallelismConfiguration` 将并发步骤数限制设置为 5。

```
pipeline.create(
    parallelism_config=ParallelismConfiguration(5),
)
```

## 步骤之间的数据依赖性
<a name="build-and-manage-data-dependency"></a>

您可以通过指定步骤之间的数据关系来定义 DAG 的结构。要在步骤之间创建数据依赖关系，请将一个步骤的属性作为输入传递给管道中的另一个步骤。接收输入的步骤要等到提供输入的步骤完成运行后才会开始。

数据依赖关系使用以下格式的 JsonPath 符号。这种格式会遍历 JSON 属性文件。这意味着您可以根据需要追加任意数量的*<property>*实例，以达到文件中所需的嵌套属性。有关 JsonPath 符号的更多信息，请参阅 [JsonPath repo](https://github.com/json-path/JsonPath)。

```
<step_name>.properties.<property>.<property>
```

以下内容说明了如何使用处理步骤的 `ProcessingOutputConfig` 属性指定 Amazon S3 存储桶。

```
step_process.properties.ProcessingOutputConfig.Outputs["train_data"].S3Output.S3Uri
```

要创建数据依赖关系，请按如下方式将存储桶传递给训练步骤。

```
from sagemaker.workflow.pipeline_context import PipelineSession

sklearn_train = SKLearn(..., sagemaker_session=PipelineSession())

step_train = TrainingStep(
    name="CensusTrain",
    step_args=sklearn_train.fit(inputs=TrainingInput(
        s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
            "train_data"].S3Output.S3Uri
    ))
)
```

要在创建数据依赖关系期间查看每种步骤类型哪些属性是可引用的，请参阅 [Amaz SageMaker on Python SDK](https://sagemaker.readthedocs.io/en/stable) 中的*[数据依赖关系-属性参考](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#data-dependency-property-reference)*。

## 步骤之间的自定义依赖关系
<a name="build-and-manage-custom-dependency"></a>

指定数据依赖关系时，Pipelines 会在各步骤之间提供数据连接。另外，一个步骤可以访问前一个步骤的数据，而无需直接使用管道。在这种情况下，您可以创建一个自定义依赖关系，告诉 Pipelines 在另一个步骤运行完之前不要启动该步骤。您可以通过指定步骤的 `DependsOn` 属性来创建自定义依赖关系。

例如，以下内容定义了步骤 `C`，该步骤在步骤 `A` 和步骤 `B` 都完成运行后才开始。

```
{
  'Steps': [
    {'Name':'A', ...},
    {'Name':'B', ...},
    {'Name':'C', 'DependsOn': ['A', 'B']}
  ]
}
```

如果依赖关系会产生循环依赖关系，Pipelines 会抛出验证异常。

以下示例创建了一个在处理步骤完成运行后开始的训练步骤。

```
processing_step = ProcessingStep(...)
training_step = TrainingStep(...)

training_step.add_depends_on([processing_step])
```

以下示例创建了一个训练步骤，该步骤要等到两个不同的处理步骤完成运行后才会开始。

```
processing_step_1 = ProcessingStep(...)
processing_step_2 = ProcessingStep(...)

training_step = TrainingStep(...)

training_step.add_depends_on([processing_step_1, processing_step_2])
```

下面提供了另一种创建自定义依赖关系的方法。

```
training_step.add_depends_on([processing_step_1])
training_step.add_depends_on([processing_step_2])
```

以下示例创建了一个训练步骤，该步骤接收来自一个处理步骤的输入，然后等待另一个处理步骤完成运行。

```
processing_step_1 = ProcessingStep(...)
processing_step_2 = ProcessingStep(...)

training_step = TrainingStep(
    ...,
    inputs=TrainingInput(
        s3_data=processing_step_1.properties.ProcessingOutputConfig.Outputs[
            "train_data"
        ].S3Output.S3Uri
    )

training_step.add_depends_on([processing_step_2])
```

以下示例说明如何检索步骤的自定义依赖关系的字符串列表。

```
custom_dependencies = training_step.depends_on
```

## 自定义映像一步到位
<a name="build-and-manage-images"></a>

 在管道中创建步骤时，您可以使用任何可用的 SageMaker AI [深度学习容器镜像](https://github.com/aws/deep-learning-containers/blob/master/available_images.md)。

您还可以在管道步骤中使用自己的容器。由于无法在 Studio Classic 中创建映像，因此在使用 Pipelines 之前，必须使用其他方法创建映像。

要在为管道创建步骤时使用自己的容器，请在估算器定义中包含映像 URI。有关将自己的容器与 SageMaker AI 配合使用的更多信息，请参阅将 [Docker 容器与 SageMaker AI 配合使用](https://docs.aws.amazon.com/sagemaker/latest/dg/docker-containers.html)。

# Lift-and-shift 使用 @step 装饰器的 Python 代码
<a name="pipelines-step-decorator"></a>

`@step` 装饰器是将本地机器学习 (ML) 代码转换为一个或多个管道步骤的功能。您可以像编写任何 ML 项目一样编写 ML 函数。在本地测试或使用装饰器作为训练作业进行测试后，您可以通过添加`@remote`装饰器将该函数转换为 A SageMaker I 管道步骤。`@step`然后，您可以将 `@step` 装饰函数调用的输出作为一个步骤传递给 Pipelines，以创建并运行管道。您还可以使用 `@step` 装饰器串联一系列函数，创建多步骤有向无环图 (DAG) 管道。

使用 `@step` 装饰器的设置与使用 `@remote` 装饰器的设置相同。关于如何[设置环境](https://docs.aws.amazon.com/sagemaker/latest/dg/train-remote-decorator.html#train-remote-decorator-env)和[使用配置文件](https://docs.aws.amazon.com/sagemaker/latest/dg/train-remote-decorator-config.html)设置默认值，请参阅远程功能文档。有关 `@step` 装饰器的更多信息，请参阅 [sagemaker.workflow.function\$1step.step](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.function_step.step)。

要查看演示如何使用 `@step` 装饰器的示例笔记本，请参阅 [@step 装饰器示例笔记本](https://github.com/aws/amazon-sagemaker-examples/tree/main/sagemaker-pipelines/step-decorator)。

下文将介绍如何使用 `@step` 装饰器注释本地 ML 代码以创建步骤、使用步骤创建和运行管道，以及如何根据使用场景定制体验。

**Topics**
+ [

# 使用 `@step` 装饰函数创建管道
](pipelines-step-decorator-create-pipeline.md)
+ [

# 运行管道
](pipelines-step-decorator-run-pipeline.md)
+ [

# 配置您的管道
](pipelines-step-decorator-cfg-pipeline.md)
+ [

# 最佳实践
](pipelines-step-decorator-best.md)
+ [

# 限制
](pipelines-step-decorator-limit.md)

# 使用 `@step` 装饰函数创建管道
<a name="pipelines-step-decorator-create-pipeline"></a>

您可以使用 `@step` 装饰器将 Python 函数转换为管道步骤，在这些函数之间创建依赖关系以创建管道图（或有向无环图 (DAG)），并将该图的叶节点作为步骤列表传递给管道，从而创建管道。下文将结合示例详细解释这一程序。

**Topics**
+ [

## 将函数转换为步骤
](#pipelines-step-decorator-run-pipeline-convert)
+ [

## 在各步骤之间建立依赖关系
](#pipelines-step-decorator-run-pipeline-link)
+ [

## 使用 `ConditionStep` 和 `@step` 装饰步骤
](#pipelines-step-decorator-condition)
+ [

## 使用步骤的 `DelayedReturn` 输出定义管道
](#pipelines-step-define-delayed)
+ [

## 创建管道
](#pipelines-step-decorator-pipeline-create)

## 将函数转换为步骤
<a name="pipelines-step-decorator-run-pipeline-convert"></a>

要使用 `@step` 装饰器创建一个步骤，请使用 `@step` 对功能进行注释。下面的示例展示了一个预处理数据的 `@step` 装饰功能。

```
from sagemaker.workflow.function_step import step

@step
def preprocess(raw_data):
    df = pandas.read_csv(raw_data)
    ...
    return procesed_dataframe
    
step_process_result = preprocess(raw_data)
```

当你调用`@step`装饰过的函数时， SageMaker AI 会返回一个`DelayedReturn`实例，而不是运行该函数。`DelayedReturn` 实例是该功能实际返回值的代理。`DelayedReturn` 实例可以作为参数传递给其他函数，也可以作为步骤直接传递给管道实例。有关该`DelayedReturn`课程的信息，请参阅 [sagemaker.workflow.function\$1step。 DelayedReturn](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.function_step.DelayedReturn)。

## 在各步骤之间建立依赖关系
<a name="pipelines-step-decorator-run-pipeline-link"></a>

在两个步骤之间创建依赖关系时，就在管道图中的步骤之间创建了连接。下文将介绍在管道步骤之间创建依赖关系的多种方法。

### 通过输入参数实现数据依赖
<a name="pipelines-step-decorator-run-pipeline-link-interstep"></a>

将一个函数的 `DelayedReturn` 输出作为另一个函数的输入，会自动在管道 DAG 中创建数据依赖关系。在下面的示例中，将 `preprocess` 函数的 `DelayedReturn` 输出传递给 `train` 函数会在 `preprocess` 和 `train` 之间产生依赖关系。

```
from sagemaker.workflow.function_step import step

@step
def preprocess(raw_data):
    df = pandas.read_csv(raw_data)
    ...
    return procesed_dataframe

@step
def train(training_data):
    ...
    return trained_model

step_process_result = preprocess(raw_data)    
step_train_result = train(step_process_result)
```

上一个示例定义了一个训练函数，该函数用 `@step` 修饰。调用该函数时，它会接收预处理管道步骤的 `DelayedReturn` 输出作为输入。调用训练函数会返回另一个 `DelayedReturn` 实例。该实例保存了该函数中定义的所有先前步骤的信息（即本例中的 `preprocess` 步骤），这些步骤构成了管道 DAG。

在上一个示例中，`preprocess` 函数返回一个值。有关列表或元组等更复杂的返回类型，请参阅 [限制](pipelines-step-decorator-limit.md)。

### 定义自定义依赖关系
<a name="pipelines-step-decorator-run-pipeline-link-custom"></a>

在上一个示例中，`train` 函数接收了 `preprocess` 的 `DelayedReturn` 输出，并创建了一个依赖关系。如果想明确定义依赖关系，而不传递前一步的输出结果，请在步骤中使用 `add_depends_on` 函数。您可以使用 `get_step()` 函数从其 `DelayedReturn` 实例中获取基础步骤，然后将依赖关系作为输入调用 `add_depends_on`\$1on。要查看 `get_step()` 函数定义，请参阅 [sagemaker.workflow.step\$1outputs.get\$1step](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.step_outputs.get_step)。下面的示例演示了如何使用 `get_step()` 和 `add_depends_on()` 在 `preprocess` 和 `train` 之间创建依赖关系。

```
from sagemaker.workflow.step_outputs import get_step

@step
def preprocess(raw_data):
    df = pandas.read_csv(raw_data)
    ...
    processed_data = ..
    return s3.upload(processed_data)

@step
def train():
    training_data = s3.download(....)
    ...
    return trained_model

step_process_result = preprocess(raw_data)    
step_train_result = train()

get_step(step_train_result).add_depends_on([step_process_result])
```

### 将数据从 `@step` 装饰函数传递到传统管道步骤
<a name="pipelines-step-decorator-run-pipeline-link-pass"></a>

您可以创建一个包含 `@step` 装饰步骤和传统管道步骤的管道，并在两者之间传递数据。例如，您可以使用 `ProcessingStep` 处理数据，并将处理结果传递给 `@step` 装饰的训练函数。在下面的示例中，`@step` 装饰的训练步骤引用了处理步骤的输出。

```
# Define processing step

from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

sklearn_processor = SKLearnProcessor(
    framework_version='1.2-1',
    role='arn:aws:iam::123456789012:role/SagemakerExecutionRole',
    instance_type='ml.m5.large',
    instance_count='1',
)

inputs = [
    ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
]
outputs = [
    ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
    ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
    ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
]

process_step = ProcessingStep(
    name="MyProcessStep",
    step_args=sklearn_processor.run(inputs=inputs, outputs=outputs,code='preprocessing.py'),
)
```

```
# Define a @step-decorated train step which references the 
# output of a processing step

@step
def train(train_data_path, test_data_path):
    ...
    return trained_model
    
step_train_result = train(
   process_step.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
   process_step.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
)
```

## 使用 `ConditionStep` 和 `@step` 装饰步骤
<a name="pipelines-step-decorator-condition"></a>

管道支持一个 `ConditionStep` 类，该类会评估前几个步骤的结果，以决定在管道中采取什么行动。您也可以在 `@step` 装饰的步骤中使用 `ConditionStep`。要使用 `ConditionStep` 中任何 `@step` 装饰步骤的输出，请将该步骤的输出作为 `ConditionStep` 的参数输入。在下面的示例中，条件步骤接收 `@step` 装饰模型评测步骤的输出。

```
# Define steps

@step(name="evaluate")
def evaluate_model():
    # code to evaluate the model
    return {
        "rmse":rmse_value
    }
    
@step(name="register")
def register_model():
    # code to register the model
    ...
```

```
# Define ConditionStep

from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.fail_step import FailStep

conditionally_register = ConditionStep(
    name="conditional_register",
    conditions=[
        ConditionGreaterThanOrEqualTo(
            # Output of the evaluate step must be json serializable
            left=evaluate_model()["rmse"],  # 
            right=5,
        )
    ],
    if_steps=[FailStep(name="Fail", error_message="Model performance is not good enough")],
    else_steps=[register_model()],
)
```

## 使用步骤的 `DelayedReturn` 输出定义管道
<a name="pipelines-step-define-delayed"></a>

无论是否使用 `@step` 装饰器，定义管道的方式都是一样的。向管道传递 `DelayedReturn` 实例时，无需传递完整的步骤列表来构建管道。SDK 会根据您定义的依赖关系自动推导出前面的步骤。您传递给管道的所有上一步骤 `Step` 对象或相关 `DelayedReturn` 对象都包含在管道图中。在下面的示例中，管道为 `train` 函数接收 `DelayedReturn` 对象。 SageMaker AI 将该`preprocess`步骤作为上一步添加到管道图中。`train`

```
from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
    name="<pipeline-name>",
    steps=[step_train_result],
    sagemaker_session=<sagemaker-session>,
)
```

如果步骤之间没有数据或自定义依赖关系，并且并行运行多个步骤，管道图就会有多个叶节点。如下例所示，将所有这些叶节点以列表形式传递给管道定义中的 `steps` 参数：

```
@step
def process1():
    ...
    return data
    
@step
def process2():
   ...
   return data
   
step_process1_result = process1()
step_process2_result = process2()

pipeline = Pipeline(
    name="<pipeline-name>",
    steps=[step_process1_result, step_process2_result],
    sagemaker_session=sagemaker-session,
)
```

管道运行时，两个步骤并行。

您只需将图的叶节点传递给管道，因为叶节点包含通过数据或自定义依赖关系定义的所有先前步骤的信息。在编译管道时， SageMaker AI 还会推断出构成管道图的所有后续步骤，并将每个步骤作为单独的步骤添加到管道中。

## 创建管道
<a name="pipelines-step-decorator-pipeline-create"></a>

通过调用 `pipeline.create()` 创建管道，如以下代码所示。有关 `create()` 的详细信息，请参阅 [sagemaker.workflow.pipeline.Pipeline.create](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.pipeline.Pipeline.create)。

```
role = "pipeline-role"
pipeline.create(role)
```

当你调用时`pipeline.create()`， SageMaker AI 会编译所有定义为管道实例一部分的步骤。 SageMaker AI 将序列化函数、参数和所有其他与步骤相关的项目上传到 Amazon S3。

数据按照以下结构存放在 S3 存储桶中：

```
s3_root_uri/
    pipeline_name/
        sm_rf_user_ws/
            workspace.zip  # archive of the current working directory (workdir)
        step_name/
            timestamp/
                arguments/                # serialized function arguments
                function/                 # serialized function
                pre_train_dependencies/   # any dependencies and pre_execution scripts provided for the step       
        execution_id/
            step_name/
                results     # returned output from the serialized function including the model
```

`s3_root_uri`在 SageMaker AI 配置文件中定义，适用于整个管道。如果未定义，则使用默认 SageMaker AI 存储桶。

**注意**  
每次 SageMaker AI 编译管道时， SageMaker AI 都会将步骤的序列化函数、参数和依赖项保存在带有当前时间戳的文件夹中。每次运行 `pipeline.create()`、`pipeline.update()`、`pipeline.upsert()` 或 `pipeline.definition()` 时都会出现这种情况。

# 运行管道
<a name="pipelines-step-decorator-run-pipeline"></a>

以下页面介绍如何使用 Amazon Pipelines 运行 SageMaker 管道，无论是使用 SageMaker AI 资源还是本地运行。

使用该`pipeline.start()`函数开始新的管道运行，就像传统的 A SageMaker I 管道运行一样。有关 `start()` 函数的信息，请参阅 [sagemaker.workflow.pipeline.Pipeline.start](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.pipeline.Pipeline.start)。

**注意**  
使用 `@step` 装饰器定义的步骤将作为训练作业运行。因此，请注意以下限制：  
账户中的实例限制和训练作业限制。相应更新您的限制，以避免任何节流或资源限制问题。
管道中每运行一个训练步骤的相关货币成本。有关更多详情，请参阅 [Amazon SageMaker 定价](https://aws.amazon.com/sagemaker/pricing/)。

## 从本地运行的管道中读取结果
<a name="pipelines-step-decorator-run-pipeline-retrieve"></a>

要查看管道运行的任何步骤的结果，请使用 [execution.result()](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.pipeline._PipelineExecution.result           )，如以下代码段所示：

```
execution = pipeline.start()
execution.result(step_name="train")
```

**注意**  
Pipelines 在本地模式下不支持 `execution.result()`。

每次只能检索一个步骤的结果。如果步骤名称由 SageMaker AI 生成，则可以通过以下方式调用`list_steps`来检索步骤名称：

```
execution.list_step()
```

## 在本地运行管道
<a name="pipelines-step-decorator-run-pipeline-local"></a>

您可以像运行传统管道步骤一样，在本地运行带有 `@step` 装饰步骤的管道。有关本地模式管道运行的详细信息，请参阅 [使用本地模式运行管道](pipelines-local-mode.md)。要使用本地模式，请在管道定义中使用 `LocalPipelineSession` 代替 `SageMakerSession`，如下例所示：

```
from sagemaker.workflow.function_step import step
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_context import LocalPipelineSession

@step
def train():
    training_data = s3.download(....)
    ...
    return trained_model
    
step_train_result = train()

local_pipeline_session = LocalPipelineSession()

local_pipeline = Pipeline(
    name="<pipeline-name>",
    steps=[step_train_result],
    sagemaker_session=local_pipeline_session # needed for local mode
)

local_pipeline.create(role_arn="role_arn")

# pipeline runs locally
execution = local_pipeline.start()
```

# 配置您的管道
<a name="pipelines-step-decorator-cfg-pipeline"></a>

建议您使用 SageMaker AI 配置文件来设置管道的默认值。有关 SageMaker AI 配置文件的信息，请参阅[在 SageMaker Python SDK 中配置和使用默认值](https://sagemaker.readthedocs.io/en/stable/overview.html#configuring-and-using-defaults-with-the-sagemaker-python-sdk)。添加到配置文件中的任何配置都适用于管道中的所有步骤。如果您要覆盖任何步骤的选项，请在 `@step` 装饰器参数中提供新值。下面的主题介绍了如何设置配置文件。

配置文件中 `@remote` 装饰器的配置与 `@step` 装饰器的配置完全相同。要在配置文件中设置管道角色 ARN 和管道标签，请使用以下代码段中的 `Pipeline` 部分：

```
SchemaVersion: '1.0'
SageMaker:
  Pipeline:
    RoleArn: 'arn:aws:iam::555555555555:role/IMRole'
    Tags:
    - Key: 'tag_key'
      Value: 'tag_value'
```

在配置文件中设置的大部分默认值，都可以通过向 `@step` 装饰器传递新值来覆盖。例如，您可以覆盖配置文件中为预处理步骤设置的实例类型，如下例所示：

```
@step(instance_type="ml.m5.large")
def preprocess(raw_data):
    df = pandas.read_csv(raw_data)
    ...
    return procesed_dataframe
```

一些参数不是`@step`装饰器参数列表的一部分，只能通过 SageMaker AI 配置文件为整个管道配置这些参数。它们列举如下：
+ `sagemaker_session`(`sagemaker.session.Session`): SageMaker AI 委托服务调用的底层 SageMaker AI 会话。如果未指定，则使用默认配置创建会话，如下所示：

  ```
  SageMaker:
    PythonSDK:
      Modules:
        Session:
          DefaultS3Bucket: 'default_s3_bucket'
          DefaultS3ObjectKeyPrefix: 'key_prefix'
  ```
+ `custom_file_filter`（`CustomFileFilter)`：`CustomFileFilter` 对象，用于指定要包含在管道步骤中的本地目录和文件。如果未指定，默认为 `None`。要使 `custom_file_filter` 生效，您必须将 `IncludeLocalWorkdir` 设置为 `True`。下面的示例显示了忽略所有笔记本文件以及名为 `data` 的文件和目录的配置。

  ```
  SchemaVersion: '1.0'
  SageMaker:
    PythonSDK:
      Modules:
        RemoteFunction:
          IncludeLocalWorkDir: true
          CustomFileFilter: 
            IgnoreNamePatterns: # files or directories to ignore
            - "*.ipynb" # all notebook files
            - "data" # folder or file named "data"
  ```

  有关如何使用 `IncludeLocalWorkdir` 和 `CustomFileFilter` 的更多详情，请参阅 [将模块化代码用于 @remote 装饰器](train-remote-decorator-modular.md)。
+ `s3_root_uri (str)`：Amazon S3 根文件夹， SageMaker AI 将代码档案和数据上传到该文件夹。如果未指定，则使用默认 SageMaker AI 存储桶。
+ `s3_kms_key (str)`：用于加密输入和输出数据的键。您只能在 SageMaker AI 配置文件中配置此参数，并且该参数适用于管道中定义的所有步骤。如果未指定，默认为 `None`。请参阅下面的 S3 KMS 密钥配置示例片段：

  ```
  SchemaVersion: '1.0'
  SageMaker:
    PythonSDK:
      Modules:
        RemoteFunction:
          S3KmsKeyId: 's3kmskeyid'
          S3RootUri: 's3://amzn-s3-demo-bucket/my-project
  ```

# 最佳实践
<a name="pipelines-step-decorator-best"></a>

以下各节建议了在管道步骤中使用 `@step` 装饰器时应遵循的最佳实践。

## 使用暖池
<a name="pipelines-step-decorator-best-warmpool"></a>

要加快管道步骤运行速度，可使用为训练作业提供的热池功能。您可以通过为 `@step` 装饰器提供 `keep_alive_period_in_seconds` 参数来开启暖池功能，如以下代码段所示：

```
@step(
   keep_alive_period_in_seconds=900
)
```

有关暖池的更多信息，请参阅 [SageMaker AI 管理的暖池](train-warm-pools.md)。

## 目录结构
<a name="pipelines-step-decorator-best-dir"></a>

建议您在使用 `@step` 装饰器时使用代码模块。将调用步骤函数和定义管道的 `pipeline.py` 模块放在工作区的根部。建议的结构如下：

```
.
├── config.yaml # the configuration file that define the infra settings
├── requirements.txt # dependencies
├── pipeline.py  # invoke @step-decorated functions and define the pipeline here
├── steps/
| ├── processing.py
| ├── train.py
├── data/
├── test/
```

# 限制
<a name="pipelines-step-decorator-limit"></a>

以下各节概述了在管道步骤中使用 `@step` 装饰器时应注意的限制。

## 函数参数限制
<a name="pipelines-step-decorator-arg"></a>

向 `@step` 装饰函数传递输入参数时，会受到以下限制：
+ 您可以将 `DelayedReturn`、`Properties`（其他类型的步骤）、`Parameter` 和 `ExecutionVariable` 对象作为参数传递给 `@step` 装饰函数。但 `@step` 装饰的函数不支持将 `JsonGet` 和 `Join` 对象作为参数。
+ 您不能通过 `@step` 函数直接访问管道变量。下面的示例会产生一个错误：

  ```
  param = ParameterInteger(name="<parameter-name>", default_value=10)
  
  @step
  def func():
      print(param)
  
  func() # this raises a SerializationError
  ```
+ 您不能将管道变量嵌套到另一个对象中，并将其传递给 `@step` 函数。下面的示例会产生一个错误：

  ```
  param = ParameterInteger(name="<parameter-name>", default_value=10)
  
  @step
  def func(arg):
      print(arg)
  
  func(arg=(param,)) # this raises a SerializationError because param is nested in a tuple
  ```
+ 由于函数的输入和输出是序列化的，因此可以作为函数的输入或输出传递的数据类型受到限制。更多详情，请参阅 [调用远程函数](train-remote-decorator-invocation.md) 中的*数据序列化和反序列化*部分。同样的限制也适用于 `@step` 装饰函数。
+ 任何具有 boto 客户端的对象都不能被序列化，因此不能将此类对象作为 `@step` 装饰函数的输入或输出。例如， SageMaker Python SDK 客户端类（例如`Estimator``Predictor`、和）`Processor`无法序列化。

## 功能导入
<a name="pipelines-step-decorator-best-import"></a>

应在函数内部而不是外部导入步骤所需的库。如果在全局范围内导入，就有可能在序列化函数时发生导入碰撞。例如，`sklearn.pipeline.Pipeline` 可以被 `sagemaker.workflow.pipeline.Pipeline` 覆盖。

## 引用函数返回值的子成员
<a name="pipelines-step-decorator-best-child"></a>

如果引用 `@step` 装饰函数返回值的子成员，则会受到以下限制：
+ 如果 `DelayedReturn` 对象表示元组、列表或 dict，则可以用 `[]` 引用子成员，如下例所示：

  ```
  delayed_return[0]
  delayed_return["a_key"]
  delayed_return[1]["a_key"]
  ```
+ 由于调用函数时无法知道基础元组或列表的确切长度，因此无法解包元组或列表输出。下面的示例会产生一个错误：

  ```
  a, b, c = func() # this raises ValueError
  ```
+ 您不能遍历 `DelayedReturn` 对象。以下示例会引发错误：

  ```
  for item in func(): # this raises a NotImplementedError
  ```
+ 您不能使用“`.`”引用任意子成员。下面的示例会产生一个错误：

  ```
  delayed_return.a_child # raises AttributeError
  ```

## 不支持的现有管道功能
<a name="pipelines-step-decorator-best-unsupported"></a>

您不能使用具有以下管道功能的 `@step` 装饰器：
+ [Pipeline 步骤缓存](https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-caching.html)
+ [Property 文件](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-propertyfile.html#build-and-manage-propertyfile-property)

# 在步骤之间传递数据
<a name="build-and-manage-propertyfile"></a>

使用 Amazon Pipelines 构建 SageMaker 管道时，您可能需要将数据从一个步骤传递到下一个步骤。例如，您可能希望将训练步骤生成的模型构件作为模型评测或部署步骤的输入。您可以使用该功能创建相互依赖的管道步骤，并构建您的 ML 工作流程。

当您需要从管道步骤的输出中获取信息时，可以使用 `JsonGet`。`JsonGet` 可以帮助您从 Amazon S3 或属性文件中提取信息。下文将介绍使用 `JsonGet` 提取步进输出的方法。

## 使用 Amazon S3 在不同步骤之间传递数据
<a name="build-and-manage-propertyfile-s3"></a>

您可以在 `ConditionStep` 中使用 `JsonGet`，直接从 Amazon S3 获取 JSON 输出。Amazon S3 URI 可以是包含原始字符串、管道运行变量或管道参数的 `Std:Join` 函数。下面的示例展示了如何在 `ConditionStep` 中使用 `JsonGet`：

```
# Example json file in s3 bucket generated by a processing_step
{
   "Output": [5, 10]
}

cond_lte = ConditionLessThanOrEqualTo(
    left=JsonGet(
        step_name="<step-name>",
        s3_uri="<s3-path-to-json>",
        json_path="Output[1]"
    ),
    right=6.0
)
```

如果在条件步骤中使用带有 Amazon S3 路径的 `JsonGet`，则必须在条件步骤和生成 JSON 输出的步骤之间明确添加依赖关系。在下面的示例中，条件步骤的创建依赖于处理步骤：

```
cond_step = ConditionStep(
        name="<step-name>",
        conditions=[cond_lte],
        if_steps=[fail_step],
        else_steps=[register_model_step],
        depends_on=[processing_step],
)
```

## 使用属性文件在各步骤之间传递数据
<a name="build-and-manage-propertyfile-property"></a>

使用属性文件存储处理步骤输出中的信息。这在分析处理步骤的结果以决定如何执行条件步骤时特别有用。该`JsonGet`函数处理属性文件，并允许您使用 JsonPath 符号来查询属性 JSON 文件。有关 JsonPath 符号的更多信息，请参阅 [JsonPath repo](https://github.com/json-path/JsonPath)。

要存储属性文件以备日后使用，必须先创建一个具有以下格式的 `PropertyFile` 实例。`path` 参数是保存属性文件的 JSON 文件的名称。任何 `output_name` 都必须与您在处理步骤中定义的 `ProcessingOutput` 的 `output_name` 相匹配。这使属性文件能够捕获步骤中的 `ProcessingOutput`。

```
from sagemaker.workflow.properties import PropertyFile

<property_file_instance> = PropertyFile(
    name="<property_file_name>",
    output_name="<processingoutput_output_name>",
    path="<path_to_json_file>"
)
```

创建`ProcessingStep`实例时，添加`property_files`参数以列出 Amazon SageMaker Pipelines 服务必须索引的所有参数文件。这将保存属性文件以备日后使用。

```
property_files=[<property_file_instance>]
```

要在条件步骤中使用属性文件，请将 `property_file` 添加到传递给条件步骤的条件中（如以下示例所示），以便使用 `json_path` 参数查询 JSON 文件中的所需属性。

```
cond_lte = ConditionLessThanOrEqualTo(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=<property_file_instance>,
        json_path="mse"
    ),
    right=6.0
)
```

有关更深入的示例，请参阅 [Amaz SageMaker on Python 软件开发工具包](https://sagemaker.readthedocs.io/en/stable)中的*[属性文件](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#property-file)*。

# 缓存管道步骤
<a name="pipelines-caching"></a>

在 Amazon Pipelin SageMaker es 中，您可以在重新运行管道时使用步骤缓存来节省时间和资源。当某个步骤的配置和输入相同时，步骤缓存会重复使用该步骤前一次成功运行的输出（而不是重新计算）。这可帮助您在使用相同参数重新运行管道时获得一致的结果。下面的主题将向您介绍如何为管道配置和开启步骤缓存。

使用步骤签名缓存时，Pipelines 会尝试查找当前管道步骤中某些属性值相同的前一次运行。如果发现，Pipelines 会传播上一次运行的输出结果，而不是重新计算步骤。所检查的属性特定于步骤类型，并在 [按管道步骤类型划分的默认缓存键属性](pipelines-default-keys.md) 中列出。

您必须选择步骤缓存 - 默认情况下，它处于关闭状态。开启步骤缓存时，还必须定义超时。此超时定义了之前运行可以持续多久才能继续作为重复使用的候选项。

步骤缓存仅考虑成功的运行 - 它从不重复使用失败的运行。如果超时时间内有多次成功运行，Pipelines 会使用最近一次成功运行的结果。如果在超时时间内没有成功运行，Pipelines 会重新运行该步骤。如果执行程序发现之前的运行符合条件但仍在进行中，则两个步骤都将继续运行，如果成功运行，则会更新缓存。

步骤缓存仅适用于单个管道，因此即使存在步骤签名匹配项，也无法重复使用另一个管道中的步骤。

步骤缓存可用于以下步骤类型：
+ [Processing](build-and-manage-steps-types.md#step-type-processing)
+ [训练](build-and-manage-steps-types.md#step-type-training)
+ [优化](build-and-manage-steps-types.md#step-type-tuning)
+ [AutoML](build-and-manage-steps-types.md#step-type-automl)
+ [转换](build-and-manage-steps-types.md#step-type-transform)
+ [`ClarifyCheck`](build-and-manage-steps-types.md#step-type-clarify-check)
+ [`QualityCheck`](build-and-manage-steps-types.md#step-type-quality-check)
+ [EMR](build-and-manage-steps-types.md#step-type-emr)

**Topics**
+ [

# 开启步骤缓存
](pipelines-caching-enabling.md)
+ [

# 关闭步骤缓存
](pipelines-caching-disabling.md)
+ [

# 按管道步骤类型划分的默认缓存键属性
](pipelines-default-keys.md)
+ [

# 缓存数据访问控制
](pipelines-access-control.md)

# 开启步骤缓存
<a name="pipelines-caching-enabling"></a>

要开启步骤缓存，必须在步骤定义中添加 `CacheConfig` 属性。管道定义文件中的 `CacheConfig` 属性使用以下格式：

```
{
    "CacheConfig": {
        "Enabled": false,
        "ExpireAfter": "<time>"
    }
}
```

`Enabled` 字段指示是否为特定步骤开启了缓存。您可以将该字段设置为`true`，这会让 SageMaker AI 尝试查找具有相同属性的该步骤的上一次运行。或者，您可以将该字段设置为`false`，这会让 SageMaker AI 在每次管道运行时运行该步骤。 `ExpireAfter`是 [ISO 8601 持续时间](https://en.wikipedia.org/wiki/ISO_8601#Durations)格式的字符串，用于定义超时时间。`ExpireAfter` 持续时间可以是年、月、周、日、小时或分钟值。每个值都由一个数字和一个表示持续时间单位的字母组成。例如：
+ “30d”= 30 天
+ “5y”= 5 年
+ “T16m”= 16 分钟
+ “30dT5h”= 30 天零 5 小时。

以下讨论描述了使用 Amaz SageMaker on Python 软件开发工具包为新的或预先存在的管道开启缓存的过程。

**为新管道开启缓存**

对于新管道，请通过 `enable_caching=True` 初始化 `CacheConfig` 实例，并将其作为管道步骤的输入。以下示例为训练步骤开启缓存，并设置 1 小时的超时时间：

```
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.steps import CacheConfig
      
cache_config = CacheConfig(enable_caching=True, expire_after="PT1H")
estimator = Estimator(..., sagemaker_session=PipelineSession())

step_train = TrainingStep(
    name="TrainAbaloneModel",
    step_args=estimator.fit(inputs=inputs),
    cache_config=cache_config
)
```

**为预先存在的管道开启缓存**

要为预先存在、已经定义的管道开启缓存，请打开该步骤的 `enable_caching` 属性，然后将 `expire_after` 设置为超时值。最后，使用 `pipeline.upsert()` 或 `pipeline.update()` 更新管道。再次运行后，以下代码示例将为训练步骤开启缓存，超时时间为 1 小时：

```
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.steps import CacheConfig
from sagemaker.workflow.pipeline import Pipeline

cache_config = CacheConfig(enable_caching=True, expire_after="PT1H")
estimator = Estimator(..., sagemaker_session=PipelineSession())

step_train = TrainingStep(
    name="TrainAbaloneModel",
    step_args=estimator.fit(inputs=inputs),
    cache_config=cache_config
)

# define pipeline
pipeline = Pipeline(
    steps=[step_train]
)

# additional step for existing pipelines
pipeline.update()
# or, call upsert() to update the pipeline
# pipeline.upsert()
```

或者，在定义（预先存在的）管道之后更新缓存配置，这样就可以连续运行一段代码。以下代码示例演示了此方法：

```
# turn on caching with timeout period of one hour
pipeline.steps[0].cache_config.enable_caching = True 
pipeline.steps[0].cache_config.expire_after = "PT1H" 

# additional step for existing pipelines
pipeline.update()
# or, call upsert() to update the pipeline
# pipeline.upsert()
```

有关更详细的代码示例以及有关 Python 软件开发工具包参数如何影响缓存的讨论，请参阅 Amaz SageMaker on Python 软件开发工具包文档中的[缓存配置](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#caching-configuration)。

# 关闭步骤缓存
<a name="pipelines-caching-disabling"></a>

如果您更改未在 [按管道步骤类型划分的默认缓存键属性](pipelines-default-keys.md) 中列出的管道步骤类型的任何属性，则不会重新运行该管道步骤。不过，您可能会决定无论如何都要重新运行该管道步骤。在这种情况下，您需要关闭步骤缓存。

要关闭步骤缓存，请将步骤定义的 `CacheConfig` 属性中的 `Enabled` 属性设置为 `false`，如以下代码片段所示：

```
{
    "CacheConfig": {
        "Enabled": false,
        "ExpireAfter": "<time>"
    }
}
```

请注意，当 `Enabled` 为 `false` 时，将忽略 `ExpireAfter` 属性。

要使用 Amaz SageMaker on Python SDK 关闭工作流工序的缓存，请定义工作流工序的管道，关闭该`enable_caching`属性，然后更新管道。

再次运行后，以下代码示例会关闭训练步骤的缓存：

```
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.steps import CacheConfig
from sagemaker.workflow.pipeline import Pipeline

cache_config = CacheConfig(enable_caching=False, expire_after="PT1H")
estimator = Estimator(..., sagemaker_session=PipelineSession())

step_train = TrainingStep(
    name="TrainAbaloneModel",
    step_args=estimator.fit(inputs=inputs),
    cache_config=cache_config
)

# define pipeline
pipeline = Pipeline(
    steps=[step_train]
)

# update the pipeline
pipeline.update()
# or, call upsert() to update the pipeline
# pipeline.upsert()
```

或者，在定义管道之后关闭 `enable_caching` 属性，这样就可以连续运行一段代码。以下代码示例演示了此解决方案：

```
# turn off caching for the training step
pipeline.steps[0].cache_config.enable_caching = False

# update the pipeline
pipeline.update()
# or, call upsert() to update the pipeline
# pipeline.upsert()
```

有关更详细的代码示例以及有关 Python 软件开发工具包参数如何影响缓存的讨论，请参阅 Amaz SageMaker on Python 软件开发工具包文档中的[缓存配置](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#caching-configuration)。

# 按管道步骤类型划分的默认缓存键属性
<a name="pipelines-default-keys"></a>

在决定是否重用之前的管道步骤或重新运行该步骤时，Pipelines 会检查某些属性是否发生了变化。如果这组属性与超时时间段内所有之前的运行不同，则将再次运行该步骤。这些属性包括输入构件、应用程序或算法规范以及环境变量。以下列表显示了每种管道步骤类型和属性，这些属性如果发生更改，则会启动该步骤的重新运行。有关使用哪些 Python 开发工具包参数来创建以下属性的更多信息，请参阅 Amaz SageMaker on Python 软件开发工具包文档中的[缓存配置](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#caching-configuration)。

## [处理步骤](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_CreateProcessingJob.html)
<a name="collapsible-caching-section-1"></a>
+ AppSpecification
+ 环境
+ ProcessingInputs。 此属性包含有关预处理脚本的信息。

  

## [训练步骤](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_CreateTrainingJob.html)
<a name="collapsible-caching-section-2"></a>
+ AlgorithmSpecification
+ CheckpointConfig
+ DebugHookConfig
+ DebugRuleConfigurations
+ 环境
+ HyperParameters
+ InputDataConfig。 此属性包含有关训练脚本的信息。

  

## [优化步骤](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_CreateHyperParameterTuningJob.html)
<a name="collapsible-caching-section-3"></a>
+ HyperParameterTuningJobConfig
+ TrainingJobDefinition。 此属性由多个子属性组成，并非所有子属性都会导致步骤重新运行。可能导致重新运行（如果已更改）的子属性如下：
  + AlgorithmSpecification
  + HyperParameterRanges
  + InputDataConfig
  + StaticHyperParameters
  + TuningObjective
+ TrainingJobDefinitions

  

## [AutoML 步骤](https://docs.aws.amazon.com//sagemaker/latest/APIReference/API_AutoMLJobConfig.html)
<a name="collapsible-caching-section-4"></a>
+ Auto MLJob Config。此属性由多个子属性组成，并非所有子属性都会导致该步骤重新运行。可能导致重新运行（如果已更改）的子属性如下：
  + CompletionCriteria
  + CandidateGenerationConfig
  + DataSplitConfig
  + Mode
+ 自动MLJob物镜
+ InputDataConfig
+ ProblemType

  

## [转换步骤](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_CreateTransformJob.html)
<a name="collapsible-caching-section-5"></a>
+ DataProcessing
+ 环境
+ ModelName
+ TransformInput

  

## [ClarifyCheck 步](build-and-manage-steps-types.md#step-type-clarify-check)
<a name="collapsible-caching-section-6"></a>
+ ClarifyCheckConfig
+ CheckJobConfig
+ SkipCheck
+ RegisterNewBaseline
+ ModelPackageGroupName
+ SuppliedBaselineConstraints

  

## [QualityCheck 步](build-and-manage-steps-types.md#step-type-quality-check)
<a name="collapsible-caching-section-7"></a>
+ QualityCheckConfig
+ CheckJobConfig
+ SkipCheck
+ RegisterNewBaseline
+ ModelPackageGroupName
+ SuppliedBaselineConstraints
+ SuppliedBaselineStatistics

  

## [EMR 步骤](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-emr)
<a name="collapsible-caching-section-8"></a>
+ ClusterId
+ StepConfig

  

# 缓存数据访问控制
<a name="pipelines-access-control"></a>

当 A SageMaker I 管道运行时，它会缓存与管道启动的 SageMaker AI 作业相关的参数和元数据，并将其保存以供后续运行中重复使用。除了缓存的管道步骤外，还可以通过各种来源访问此元数据，包括以下类型：
+ `Describe*Job` 请求
+ CloudWatch 日志
+ CloudWatch 活动
+ CloudWatch 指标
+ SageMaker 人工智能搜索

请注意，对列表中每个数据源的访问都受其自身 IAM 权限集的控制。删除特定角色对一个数据源的访问权限不会影响对其他数据源的访问级别。例如，账户管理员可能会从调用方的角色中删除对 `Describe*Job` 请求的 IAM 权限。虽然调用方无法再发出 `Describe*Job` 请求，但只要他们有权运行管道，就仍然可以从使用缓存步骤的管道运行中检索元数据。如果账户管理员想要从特定 SageMaker AI 作业中完全移除对元数据的访问权限，则需要移除提供数据访问权限的每项相关服务的权限。

# 管道步骤的重试策略
<a name="pipelines-retry-policy"></a>

重试策略可帮助您在发生错误后自动重试 Pipelines 步骤。任何管道步骤都可能遇到异常，而发生异常的原因多种多样。在某些情况下，重试可以解决这些问题。通过管道步骤的重试策略，您可以选择是否重试特定管道步骤。

重试策略仅支持以下管道步骤：
+ [处理步骤](build-and-manage-steps-types.md#step-type-processing) 
+ [训练步骤](build-and-manage-steps-types.md#step-type-training) 
+ [优化步骤](build-and-manage-steps-types.md#step-type-tuning) 
+ [AutoML 步骤](build-and-manage-steps-types.md#step-type-automl) 
+ [创建模型步骤](build-and-manage-steps-types.md#step-type-create-model) 
+ [注册模型步骤](build-and-manage-steps-types.md#step-type-register-model) 
+ [转换步骤](build-and-manage-steps-types.md#step-type-transform) 
+ [笔记本作业步骤](build-and-manage-steps-types.md#step-type-notebook-job) 

**注意**  
在优化步骤和 AutoML 步骤中运行的作业在内部进行重试，即使配置了重试策略，也不会重试 `SageMaker.JOB_INTERNAL_ERROR` 异常类型。您可以使用 SageMaker API [编写自己的重试策略](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_RetryStrategy.html)。

## 重试策略支持的异常类型
<a name="pipelines-retry-policy-supported-exceptions"></a>

管道步骤的重试策略支持以下异常类型：
+ `Step.SERVICE_FAULT`：如果调用下游服务时发生内部服务器错误或瞬时错误，则会发生这些异常。Pipelines 会自动重试此类错误。使用重试策略，您可以覆盖此异常类型的默认重试操作。
+ `Step.THROTTLING`：调用下游服务时可能会出现节流异常。Pipelines 会自动重试此类错误。使用重试策略，您可以覆盖此异常类型的默认重试操作。
+ `SageMaker.JOB_INTERNAL_ERROR`：这些异常发生在 SageMaker AI 任务返回时`InternalServerError`。在这种情况下，启动新作业可能会解决暂时性问题。
+ `SageMaker.CAPACITY_ERROR`: SageMaker AI 作业可能会遇到 Amazon EC2`InsufficientCapacityErrors`，这会导致 SageMaker AI 任务失败。您可以通过启动新的 SageMaker AI 作业来重试，以避免出现此问题。
+ `SageMaker.RESOURCE_LIMIT`：运行 SageMaker AI 作业时，您可以超出资源限制配额。您可以稍等片刻，然后在短时间后重试运行 SageMaker AI 作业，看看资源是否已释放。

## 重试策略的 JSON 架构
<a name="pipelines-retry-policy-json-schema"></a>

管道的重试策略具有以下 JSON 架构：

```
"RetryPolicy": {
   "ExceptionType": [String]
   "IntervalSeconds": Integer
   "BackoffRate": Double
   "MaxAttempts": Integer
   "ExpireAfterMin": Integer
}
```
+ `ExceptionType`：此字段需要以下字符串数组格式的异常类型。
  + `Step.SERVICE_FAULT`
  + `Step.THROTTLING`
  + `SageMaker.JOB_INTERNAL_ERROR`
  + `SageMaker.CAPACITY_ERROR`
  + `SageMaker.RESOURCE_LIMIT`
+ `IntervalSeconds`（可选）：第一次重试前的秒数（默认为 1）。`IntervalSeconds` 的最大值为 43200 秒（12 小时）。
+ `BackoffRate`（可选）：每次尝试时重试间隔增加的乘数（默认为 2.0）。
+ `MaxAttempts`（可选）：一个正整数，表示重试的最大次数（默认为 5）。如果错误重复发生超过 `MaxAttempts` 指定次数，则停止重试并恢复正常错误处理。值为 0 表示永不重试错误。`MaxAttempts` 最大值为 20。
+ `ExpireAfterMin`（可选）：一个正整数，表示重试的最大时间跨度。如果在执行步骤的 `ExpireAfterMin` 分钟数后再次出现错误，则停止重试并恢复正常的错误处理。值为 0 表示永不重试错误。`ExpireAfterMin ` 的最大值为 14400 分钟（10 天）。
**注意**  
只能给出 `MaxAttempts` 或 `ExpireAfterMin` 中的一个，但不能同时给出两个；如果两者均*未* 指定，则 `MaxAttempts` 变为默认值。如果在一个策略中标识了这两个属性，则重试策略会生成验证错误。

# 配置重试策略
<a name="pipelines-configuring-retry-policy"></a>

虽然 SageMaker Pipelines 提供了一种强大的自动化方式来编排机器学习工作流程，但在运行它们时可能会遇到故障。为了从容应对此类情况并提高管道的可靠性，您可以配置重试策略，定义遇到异常后如何以及何时自动重试特定步骤。重试策略允许您指定重试的异常类型、重试尝试的最大次数、重试间隔以及增加重试间隔的回退率。以下部分提供了一些示例，说明如何使用 JSON 和使用 SageMaker Python SDK 为管道中的训练步骤配置重试策略。

以下是使用重试策略的训练步骤的示例。

```
{
    "Steps": [
        {
            "Name": "MyTrainingStep",
            "Type": "Training",
            "RetryPolicies": [
                {
                    "ExceptionType": [
                        "SageMaker.JOB_INTERNAL_ERROR",
                        "SageMaker.CAPACITY_ERROR"
                    ],
                    "IntervalSeconds": 1,
                    "BackoffRate": 2,
                    "MaxAttempts": 5
                }
            ]
        }
    ]
}
```



以下示例说明如何使用重试策略在 SDK for Python (Boto3) 中构建 `TrainingStep`。

```
from sagemaker.workflow.retry import (
    StepRetryPolicy, 
    StepExceptionTypeEnum,
    SageMakerJobExceptionTypeEnum,
    SageMakerJobStepRetryPolicy
)

step_train = TrainingStep(
    name="MyTrainingStep",
    xxx,
    retry_policies=[
        // override the default 
        StepRetryPolicy(
            exception_types=[
                StepExceptionTypeEnum.SERVICE_FAULT, 
                StepExceptionTypeEnum.THROTTLING
            ],
            expire_after_mins=5,
            interval_seconds=10,
            backoff_rate=2.0 
        ),
        // retry when resource limit quota gets exceeded
        SageMakerJobStepRetryPolicy(
            exception_types=[SageMakerJobExceptionTypeEnum.RESOURCE_LIMIT],
            expire_after_mins=120,
            interval_seconds=60,
            backoff_rate=2.0
        ),
        // retry when job failed due to transient error or EC2 ICE.
        SageMakerJobStepRetryPolicy(
            failure_reason_types=[
                SageMakerJobExceptionTypeEnum.INTERNAL_ERROR,
                SageMakerJobExceptionTypeEnum.CAPACITY_ERROR,
            ],
            max_attempts=10,
            interval_seconds=30,
            backoff_rate=2.0
        )
    ]
)
```

有关为某些步骤类型配置重试行为的更多信息，请参阅 *[Amazon Python SDK 文档中的 Amaz SageMaker on Pipelin SageMaker es-重试政策](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#retry-policy)*。

# 选择性执行管道步骤
<a name="pipelines-selective-ex"></a>

在使用管道创建工作流程和编排 ML 训练步骤时，您可能需要进行多个实验阶段。您可能只想重复某些步骤，而不是每次都运行完整的管道。通过管道，您可以有选择性地执行管道步骤。这有助于优化您的 ML 训练。选择性执行在以下情况下很有用：
+ 您想使用更新的实例类型、超参数或其他变量重新启动特定步骤，同时保留上游步骤中的参数。
+ 您的管道未能完成一个中间步骤。执行过程中的先前步骤（如数据准备或特征提取）的重新运行成本很高。您可能需要引入一个修复程序，并手动重新运行某些步骤来完成管道。

使用选择性执行，您可以选择运行任何步骤子集，前提是这些步骤在管道的有向无环图 (DAG) 中相连。以下 DAG 显示了管道工作流示例：

![\[示例管道的有向无环图 (DAG)。\]](http://docs.aws.amazon.com/zh_cn/sagemaker/latest/dg/images/pipeline-full.png)


您可以在选择性执行中选择步骤 `AbaloneTrain` 和 `AbaloneEval`，但不能只选择 `AbaloneTrain` 和 `AbaloneMSECond` 步骤，因为这些步骤在 DAG 中没有连接。对于工作流程中的非选定步骤，选择性执行会重复使用参考管道执行的输出，而不是重新运行这些步骤。此外，处于选定步骤下游的非选定步骤不会在选择性执行中运行。

如果您选择在管道中运行中间步骤的子集，则您的步骤可能取决于之前的步骤。 SageMaker AI 需要一个参考管道执行来为这些依赖项提供资源。例如，如果选择运行步骤 `AbaloneTrain` 和 `AbaloneEval`，则需要 `AbaloneProcess` 步骤的输出。您可以提供参考执行 ARN，也可以指示 SageMaker AI 使用最新的管道执行（这是默认行为）。如果您有参考执行，还可以从参考运行中创建运行时参数，并通过重载将其提供给选择性执行运行。有关更多信息，请参阅 [重复使用参考执行中的运行时参数值](#pipelines-selective-ex-reuse)。

具体来说，您可以使用 `SelectiveExecutionConfig` 为选择性执行管道运行提供配置。如果您为参考管道执行添加 ARN（带`source_pipeline_execution_arn`参数）， SageMaker AI 将使用您提供的管道执行中前一步的依赖关系。如果您未包含 ARN 并且存在最新的管道执行，则 SageMaker AI 会默认将其用作参考。如果您不包含 ARN 且不希望 SageMaker AI 使用您最新的管道执行，请设置为`reference_latest_execution`。`False` SageMaker AI 最终用作参考的管道执行，无论是最新的还是用户指定的，都必须处于`Success`或`Failed`状态。

下表汇总了 SageMaker AI 如何选择参考执行。


| `source_pipeline_execution_arn` 参数值 | `reference_latest_execution` 参数值 | 使用的参考执行 | 
| --- | --- | --- | 
| 管道 ARN | `True` 或未指定 | 指定的管道 ARN | 
| 管道 ARN | `False` | 指定的管道 ARN | 
| null 或未指定 | `True` 或未指定 | 最新的管道执行 | 
| null 或未指定 | `False` | 无 - 在这种情况下，请选择没有上游依赖关系的步骤 | 

有关选择性执行配置要求的更多信息，请参阅 s [agemaker.workflow.selective\$1execution\$1config。 SelectiveExecutionConfig](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#selective-execution-config)文档。

以下讨论包括适用于一些情况的示例，这些情况是：您要指定管道参考执行；使用最新的管道执行作为参考；或者在没有参考管道执行的情况下运行选择性执行。

## 使用用户指定的管道参考运行选择性执行
<a name="pipelines-selective-ex-arn"></a>

下面的示例演示了使用参考管道执行方式选择性执行步骤 `AbaloneTrain` 和 `AbaloneEval`。

```
from sagemaker.workflow.selective_execution_config import SelectiveExecutionConfig

selective_execution_config = SelectiveExecutionConfig(
    source_pipeline_execution_arn="arn:aws:sagemaker:us-west-2:123123123123:pipeline/abalone/execution/123ab12cd3ef", 
    selected_steps=["AbaloneTrain", "AbaloneEval"]
)

selective_execution = pipeline.start(
    execution_display_name=f"Sample-Selective-Execution-1",
    parameters={"MaxDepth":6, "NumRound":60},
    selective_execution_config=selective_execution_config,
)
```

## 以最新管道执行作为参考的选择性执行
<a name="pipelines-selective-ex-latest"></a>

下面的示例演示了以最新的管道执行为参考，有选择地执行步骤 `AbaloneTrain` 和 `AbaloneEval`。由于 SageMaker AI 默认使用最新的管道执行，因此您可以选择将`reference_latest_execution`参数设置为`True`。

```
# Prepare a new selective execution. Select only the first step in the pipeline without providing source_pipeline_execution_arn.
selective_execution_config = SelectiveExecutionConfig(
    selected_steps=["AbaloneTrain", "AbaloneEval"],
    # optional
    reference_latest_execution=True
)

# Start pipeline execution without source_pipeline_execution_arn
pipeline.start(
    execution_display_name=f"Sample-Selective-Execution-1",
    parameters={"MaxDepth":6, "NumRound":60},
    selective_execution_config=selective_execution_config,
)
```

## 无参考管道的选择性执行
<a name="pipelines-selective-ex-none"></a>

以下示例演示了在`AbaloneTrain`不提供参考 ARN 的情况下选择性地执行这些步骤`AbaloneProcess`，并关闭了使用最新管道运行作为参考的选项。 SageMaker AI 允许这种配置，因为这部分步骤不依赖于之前的步骤。

```
# Prepare a new selective execution. Select only the first step in the pipeline without providing source_pipeline_execution_arn.
selective_execution_config = SelectiveExecutionConfig(
    selected_steps=["AbaloneProcess", "AbaloneTrain"],
    reference_latest_execution=False
)

# Start pipeline execution without source_pipeline_execution_arn
pipeline.start(
    execution_display_name=f"Sample-Selective-Execution-1",
    parameters={"MaxDepth":6, "NumRound":60},
    selective_execution_config=selective_execution_config,
)
```

## 重复使用参考执行中的运行时参数值
<a name="pipelines-selective-ex-reuse"></a>

您可以使用 `build_parameters_from_execution` 从参考管道执行中构建参数，并将结果提供给选择性执行管道。您可以使用参考执行中的原始参数，也可以使用 `parameter_value_overrides` 参数应用任何覆盖。

以下示例说明了如何从参考执行构建参数以及如何对 `MseThreshold` 参数应用覆盖。

```
# Prepare a new selective execution.
selective_execution_config = SelectiveExecutionConfig(
    source_pipeline_execution_arn="arn:aws:sagemaker:us-west-2:123123123123:pipeline/abalone/execution/123ab12cd3ef",
    selected_steps=["AbaloneTrain", "AbaloneEval", "AbaloneMSECond"],
)
# Define a new parameters list to test.
new_parameters_mse={
    "MseThreshold": 5,
}

# Build parameters from reference execution and override with new parameters to test.
new_parameters = pipeline.build_parameters_from_execution(
    pipeline_execution_arn="arn:aws:sagemaker:us-west-2:123123123123:pipeline/abalone/execution/123ab12cd3ef",
    parameter_value_overrides=new_parameters_mse
)

# Start pipeline execution with new parameters.
execution = pipeline.start(
    selective_execution_config=selective_execution_config,
    parameters=new_parameters
)
```

# 基准计算、偏差检测和生命周期以及 Amazon Pi ClarifyCheck pelin SageMaker es 中的 QualityCheck 步骤
<a name="pipelines-quality-clarify-baseline-lifecycle"></a>

以下主题讨论了使用和[`QualityCheck`](build-and-manage-steps-types.md#step-type-quality-check)步骤时，Amazon Pipelin SageMaker es 中的基准[`ClarifyCheck`](build-and-manage-steps-types.md#step-type-clarify-check)和模型版本是如何演变的。

对于 `ClarifyCheck` 步骤，基准是位于带有后缀 `constraints` 的步骤属性中的单个文件。对于 `QualityCheck` 步骤，基准是位于步骤属性中的两个文件的组合：一个文件带有后缀 `statistics`，另一个文件带有后缀 `constraints`。在以下主题中，我们将讨论这些属性，并在前缀中说明它们的使用方式，以及在这两个管道步骤中对基准行为和生命周期的影响。例如，`ClarifyCheck` 步骤始终在 `CalculatedBaselineConstraints` 属性中计算和分配新的基准，而 `QualityCheck` 步骤在 `CalculatedBaselineConstraints` 和 `CalculatedBaselineStatistics` 属性中也执行相同的操作。

## 和 QualityCheck 步骤的基线计算 ClarifyCheck 和登记
<a name="pipelines-quality-clarify-baseline-calculations"></a>

`ClarifyCheck` 和 `QualityCheck` 步骤均始终根据底层处理作业运行中的步骤输入来计算新的基准。这些新计算的基准可通过带有前缀 `CalculatedBaseline` 的属性进行访问。您可以在[模型步骤](build-and-manage-steps-types.md#step-type-model)中将这些属性记录为模型包的 `ModelMetrics`。此模型包可以注册 5 种不同的基准。您可以针对每种检查类型注册其中一种基准：运行 `ClarifyCheck` 步骤时进行的数据偏差、模型偏差和模型可解释性检查，以及运行 `QualityCheck` 步骤时进行的模型质量和数据质量检查。`register_new_baseline` 参数决定了步骤运行后在属性中设置的前缀为 `BaselineUsedForDriftCheck` 的值。

下表列出了可能的使用案例，显示了您可以为 `ClarifyCheck` 和 `QualityCheck` 步骤设置的步骤参数所产生的不同行为：


| 选择此配置时可能考虑的使用案例  | `skip_check` / `register_new_baseline` | 步骤会进行偏差检查吗？ | 步骤属性 `CalculatedBaseline` 的值 | 步骤属性 `BaselineUsedForDriftCheck` 的值 | 
| --- | --- | --- | --- | --- | 
| 您正在定期进行再训练，并启用了检查以获得新的模型版本，但您*希望保留先前基准* 作为新模型版本的模型注册表中的 `DriftCheckBaselines`。 | False/ False | 根据现有基准进行偏差检查 | 通过运行该步骤计算出的新基准 | 模型注册表中最新批准模型的基准或作为步骤参数提供的基准 | 
| 您正在定期进行再训练，并启用了检查以获得新的模型版本，但您*希望用新计算出的基准对新模型版本的模型注册表中的 `DriftCheckBaselines` 进行刷新*。 | False/ True | 根据现有基准进行偏差检查 | 通过运行该步骤计算出的新基准 | 通过运行该步骤新计算出的基准（属性 CalculatedBaseline 的值） | 
| 您之所以启动管道是为了重新训练新模型版本，是因为 Amazon SageMaker Model Monitor 在终端节点上检测到*针对特定类型检查的违规行为，并且您想跳过对照先前基准的此类检查，但要像新模型版本的模型注册表`DriftCheckBaselines`中那样延续之前的基准*。 | True/ False | 没有偏差检查 | 通过运行该步骤计算的新基线 | 模型注册表中最新批准模型的基准或作为步骤参数提供的基准 | 
| 这发生在以下情况下：[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/sagemaker/latest/dg/pipelines-quality-clarify-baseline-lifecycle.html)  | True/ True | 没有偏差检查 | 通过运行该步骤计算出的新基准 | 通过运行该步骤新计算出的基准（属性 CalculatedBaseline 的值） | 

**注意**  
如果您在约束中使用科学记数法，则需要转换为浮点数。有关如何执行此操作的预处理脚本示例，请参阅[创建模型质量基准](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-model-quality-baseline.html)。

使用[模型步骤](build-and-manage-steps-types.md#step-type-model)注册模型时，可以将 `BaselineUsedForDriftCheck` 属性注册为 `DriftCheckBaselines`。然后，Model Monitor 可以使用这些基准文件进行模型和数据质量检查。此外，这些基线还可以在 ClarifyCheckStep 和`QualityCheck`步骤中使用，将新训练的模型与在模型注册表中注册的现有模型进行比较，以备将来的管道运行。

## 根据管道中以前的基线进行漂移检测
<a name="pipelines-quality-clarify-baseline-drift-detection"></a>

就 `QualityCheck` 步骤而言，当您启动管道进行定期再训练以获得新的模型版本时，如果数据质量和数据偏差在先前批准的模型版本的基准上存在 [违规情况的架构（constraint\$1violations.json 文件）](model-monitor-interpreting-violations.md)，则您可能不想运行训练步骤。在运行 `ClarifyCheck` 步骤时，如果模型质量、模型偏差或模型可解释性违反了先前批准的模型版本的已注册基准，则您可能不想注册新训练的模型版本。在这些情况下，您可以通过将相应检查步骤的 `skip_check` 属性设置为 `False` 来启用所需的检查，如果对照先前基准检测到违规行为，则 `ClarifyCheck` 和 `QualityCheck` 步骤将失败。然后，管道进程将不再继续，这样就不会注册偏离基准的模型。`ClarifyCheck` 和 `QualityCheck` 步骤能够获得给定模型包组的最新批准模型版本的 `DriftCheckBaselines`，并与之进行比较。对于 `QualityCheck` 步骤，除了 `supplied_baseline_statistics` 之外，也可以直接通过 `supplied_baseline_constraints` 提供先前的基准，并且这些基准始终优先于从模型包组中提取的任何基准。

## 基线和模型版本的生命周期以及管道的演变
<a name="pipelines-quality-clarify-baseline-evolution"></a>

将 `ClarifyCheck` 和 `QualityCheck` 步骤的 `register_new_baseline` 设置为 `False`，即可通过步骤属性前缀 `BaselineUsedForDriftCheck` 访问先前的基准。然后，当您使用[模型步骤](build-and-manage-steps-types.md#step-type-model)注册模型时，就可以将这些基准注册为新模型版本中的 `DriftCheckBaselines`。在模型注册表中批准此新模型版本后，此模型版本中的 `DriftCheckBaseline` 将可用于下一个管道进程中的 `ClarifyCheck` 和 `QualityCheck` 步骤。如果要刷新某种检查类型的基准以用于将来的模型版本，可以将 `register_new_baseline` 设置为 `True`，以便带前缀 `BaselineUsedForDriftCheck` 的属性成为新计算出的基准。通过这些方式，您可以为将来训练的模型保留首选基准，或者在需要时刷新基准以进行偏差检查，从而在整个模型训练迭代中管理基准演进和生命周期。

下图说明了基线演变和生命周期的 model-version-centric视图。

![\[基线演变和生命周期 model-version-centric视图。\]](http://docs.aws.amazon.com/zh_cn/sagemaker/latest/dg/images/pipelines/Baseline-Lifecycle.png)


# 安排管道运行
<a name="pipeline-eventbridge"></a>

[您可以使用亚马逊安排亚马逊 SageMaker 管道的执行 EventBridge。](https://docs.aws.amazon.com/eventbridge/latest/userguide/what-is-amazon-eventbridge.html)[亚马逊支持将亚马逊 SageMaker 管道作为目标 EventBridge。](https://docs.aws.amazon.com/eventbridge/latest/userguide/what-is-amazon-eventbridge.html)这样，您就可以根据事件总线中的任何事件启动建模管线的执行。借 EventBridge助，您可以自动执行管道并自动响应诸如训练作业或端点状态更改之类的事件。事件包括上传到您的 Amazon S3 存储桶的新文件、由于偏移而导致的 Amazon SageMaker AI 终端节点状态发生变化以及*亚马逊简单通知服务* (SNS) 主题。

可自动启动以下 Pipelines 操作：  
+  `StartPipelineExecution` 

有关安排 SageMaker AI 任务的更多信息，请参阅使用 A [mazon EventBridge 自动化 SageMaker AI](https://docs.aws.amazon.com/sagemaker/latest/dg/automating-sagemaker-with-eventbridge.html)。

**Topics**
+ [

## 向 Amazon 安排管道 EventBridge
](#pipeline-eventbridge-schedule)
+ [

## 使用 SageMaker Python 软件开发工具包安排管道
](#build-and-manage-scheduling)

## 向 Amazon 安排管道 EventBridge
<a name="pipeline-eventbridge-schedule"></a>

要使用 Amazon Event CloudWatch s 开始管道执行，您必须创建 EventBridge[规则](https://docs.aws.amazon.com/eventbridge/latest/APIReference/API_Rule.html)。在为事件创建规则时，您可以指定在 EventBridge 收到与该规则匹配的事件时要采取的目标操作。当事件与规则匹配时， EventBridge 会将该事件发送到指定的目标并启动规则中定义的操作。

 以下教程展示了如何 EventBridge 使用 EventBridge控制台或来安排管道执行 AWS CLI。  

### 先决条件
<a name="pipeline-eventbridge-schedule-prerequisites"></a>
+  EventBridge 可以凭`SageMaker::StartPipelineExecution`权限代入的角色。如果您从 EventBridge控制台创建规则，则可以自动创建此角色；否则，您需要自己创建此角色。 有关创建 A SageMaker I 角色的信息，请参阅[SageMaker 角色](https://docs.aws.amazon.com/sagemaker/latest/dg/sagemaker-roles.html)。
+ 有待安排的亚马逊 SageMaker AI 管道。要创建 Amazon SageMaker AI 管道，请参阅[定义管道](https://docs.aws.amazon.com/sagemaker/latest/dg/define-pipeline.html)。

### 使用 EventBridge 控制台创建 EventBridge 规则
<a name="pipeline-eventbridge-schedule-console"></a>

 以下过程说明如何使用 EventBridge 控制台创建 EventBridge 规则。  

1. 导航至 [EventBridge 控制台](https://console.aws.amazon.com/events)。

1. 选择左侧的**规则**。

1.  选择 `Create Rule`。

1. 为您的规则输入名称和描述。

1.  选择启动此规则的方式。您可以为规则提供以下选择：
   + **事件模式**：当发生与模式匹配的事件时，您的规则会启动。您可以选择与特定类型事件相匹配的预定义模式，也可以创建自定义模式。如果选择预定义模式，则可以编辑该模式以对其进行自定义。有关事件模式的更多信息，请参阅[事件中的 CloudWatch 事件模式](https://docs.aws.amazon.com/AmazonCloudWatch/latest/events/CloudWatchEventsandEventPatterns.html)。
   + **计划**：您的规则将按指定的计划定期启动。您可以使用固定速率的计划，该计划会定期启动并持续指定的分钟数、小时数或周数。您还可以使用 [cron 表达式](https://docs.aws.amazon.com/AmazonCloudWatch/latest/events/ScheduledEvents.html#CronExpressions)来创建更精细的计划，例如“每月第一个星期一上午 8 点”。自定义或合作伙伴事件总线不支持计划。

1. 选择所需的事件总线。

1. 选择当某个事件与您的事件模式匹配或当计划启动时要调用的目标。最多可为每项规则添加 5 个目标。在目标下拉列表中选择 `SageMaker Pipeline`。

1. 从管道下拉列表中选择要启动的管道。

1. 使用名称和值对添加要传递给管道执行的参数。参数值可以是静态的，也可以是动态的。有关 Amazon A SageMaker I Pipeline 参数的更多信息，请参阅[AWS::Events::Rule SagemakerPipelineParameters](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-sagemaker-pipeline.html#aws-resource-sagemaker-pipeline-properties)。
   + 每次启动管道时，都会将静态值传递给管道执行。例如，如果`{"Name": "Instance_type", "Value": "ml.4xlarge"}`在参数列表中指定，则`StartPipelineExecutionRequest`每次 EventBridge 启动管道时都会将其作为参数传递。
   + 动态值是使用 JSON 路径指定的。 EventBridge 解析事件负载中的值，然后将其传递给管道执行。例如：*`$.detail.param.value`*

1. 选择要用于此规则的角色。您可以使用现有角色，也可以创建新的角色。

1. （可选）添加标签。

1. 选择 `Create` 以最终确定您的规则。

 您的规则现已生效，可以启动管道执行了。

### 使用创建 EventBridge 规则 [AWS CLI](https://docs.aws.amazon.com/cli/latest/reference/events/index.html)
<a name="pipeline-eventbridge-schedule-cli"></a>

 以下过程说明如何使用创建 EventBridge 规则 AWS CLI。

1. 创建要启动的规则。使用创建 EventBridge 规则时 AWS CLI，您可以选择两个启动规则的方式，即事件模式和时间表。
   +  **事件模式**：当发生与模式匹配的事件时，您的规则会启动。您可以选择与特定类型事件相匹配的预定义模式，也可以创建自定义模式。如果选择预定义模式，则可以编辑该模式以对其进行自定义。  您可以使用以下命令创建具有事件模式的规则：

     ```
     aws events put-rule --name <RULE_NAME> ----event-pattern <YOUR_EVENT_PATTERN> --description <RULE_DESCRIPTION> --role-arn <ROLE_TO_EXECUTE_PIPELINE> --tags <TAGS>
     ```
   +  **计划**：您的规则将按指定的计划定期启动。您可以使用固定速率的计划，该计划会定期启动并持续指定的分钟数、小时数或周数。您还可以使用 cron 表达式来创建更精细的计划，例如“每月第一个星期一上午 8 点”。自定义或合作伙伴事件总线不支持计划。您可以使用以下命令创建具有计划的规则：

     ```
     aws events put-rule --name <RULE_NAME> --schedule-expression <YOUR_CRON_EXPRESSION> --description <RULE_DESCRIPTION> --role-arn <ROLE_TO_EXECUTE_PIPELINE> --tags <TAGS>
     ```

1. 添加目标，以便在某个事件与您的事件模式匹配或当计划启动时调用。最多可为每项规则添加 5 个目标。  对于每个目标，您必须指定：  
   +  ARN：管道的资源 ARN。
   +  角色 ARN： EventBridge 应假设角色的 ARN 来执行管道。
   +  参数：要传递的 SageMaker Amazon AI 管道参数。

1. 运行以下命令，使用 p [ut-](https://docs.aws.amazon.com/cli/latest/reference/events/put-targets.html) targets SageMaker 将 Amazon AI 管道作为目标传递给您的规则：

   ```
   aws events put-targets --rule <RULE_NAME> --event-bus-name <EVENT_BUS_NAME> --targets "[{\"Id\": <ID>, \"Arn\": <RESOURCE_ARN>, \"RoleArn\": <ROLE_ARN>, \"SageMakerPipelineParameter\": { \"SageMakerParameterList\": [{\"Name\": <NAME>, \"Value\": <VALUE>}]} }]"] 
   ```

## 使用 SageMaker Python 软件开发工具包安排管道
<a name="build-and-manage-scheduling"></a>

以下各节介绍如何使用 SageMaker Python SDK 设置 EventBridge 资源访问权限和创建管道计划。

### 所需的权限
<a name="build-and-manage-scheduling-permissions"></a>

您需要获得必要的权限才能使用管道调度程序。完成以下步骤设置权限：

1. 将以下最低权限策略附加到用于创建管道触发器或使用 AWS 托管策略的 IAM 角色`AmazonEventBridgeSchedulerFullAccess`。

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement":
       [
           {
               "Action":
               [
                   "scheduler:ListSchedules",
                   "scheduler:GetSchedule",
                   "scheduler:CreateSchedule",
                   "scheduler:UpdateSchedule",
                   "scheduler:DeleteSchedule"
               ],
               "Effect": "Allow",
               "Resource":
               [
                   "*"
               ]
           },
           {
               "Effect": "Allow",
               "Action": "iam:PassRole",
               "Resource": "arn:aws:iam::*:role/*", 
               "Condition": {
                   "StringLike": {
                       "iam:PassedToService": "scheduler.amazonaws.com"
                   }
               }
           }
       ]
   }
   ```

------

1.  EventBridge 通过将服务主体`scheduler.amazonaws.com`添加到该角色的信任策略中，与建立信任关系。如果您在 SageMaker Studio 中启动笔记本，请务必将以下信任策略附加到执行角色。

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": [
                    "scheduler.amazonaws.com",
                    "sagemaker.amazonaws.com"
                ]
            },
        "Action": "sts:AssumeRole"
        }
    ]
}
```

------

### 创建管道时间表
<a name="build-and-manage-scheduling-create"></a>

使用 `PipelineSchedule` 构造函数，可以将管道调度为运行一次或按预定时间间隔运行。管道计划必须是 `at`、`rate` 或 `cron` 类型。这组计划类型是[EventBridge 计划选项](https://docs.aws.amazon.com/scheduler/latest/UserGuide/schedule-types.html)的扩展。有关如何使用该`PipelineSchedule`类的更多信息，请参阅 [sagemaker.workflow.triggers。 PipelineSchedule](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#pipeline-schedule)。下面的示例演示了如何使用 `PipelineSchedule` 创建每种调度类型。

```
from sagemaker.workflow.triggers import PipelineSchedule

# schedules a pipeline run for 12/13/2023 at time 10:15:20 UTC
my_datetime_schedule = PipelineSchedule(
    name="<schedule-name>", 
    at=datetime(2023, 12, 13, 10, 15, 20)
)

# schedules a pipeline run every 5 minutes
my_rate_schedule = PipelineSchedule(
    name="<schedule-name>", 
    rate=(5, "minutes")
)

# schedules a pipeline run at 10:15am UTC on the last Friday of each month during the years 2022 to 2023
my_cron_schedule = PipelineSchedule(
    name="<schedule-name>", 
    cron="15 10 ? * 6L 2022-2023"
)
```

**注意**  
如果创建的是一次性计划表，需要访问当前时间，请使用 `datetime.utcnow()` 而不是 `datetime.now()`。后者不存储当前区域上下文，并导致传递到的时间不正确 EventBridge。

### 将触发器连接到管道
<a name="build-and-manage-scheduling-attach"></a>

要将 `PipelineSchedule` 连接到管道，请在创建的管道对象上调用 `put_triggers`，并附上触发器列表。如果您收到响应 ARN，则表示您在账户中成功创建了计划，并 EventBridge 开始按指定的时间或速率调用目标管道。您必须指定具有正确权限的角色，才能将触发器附加到父管道。如果您不提供，Pipelines 会从[配置文件](https://docs.aws.amazon.com/sagemaker/latest/dg/train-remote-decorator-config.html)中获取用于创建管道的默认角色。

下面的示例演示了如何将计划附加到管道。

```
scheduled_pipeline = Pipeline(
    name="<pipeline-name>",
    steps=[...],
    sagemaker_session=<sagemaker-session>,
)
custom_schedule = PipelineSchedule(
    name="<schedule-name>", 
    at=datetime(year=2023, month=12, date=25, hour=10, minute=30, second=30)
)
scheduled_pipeline.put_triggers(triggers=[custom_schedule], role_arn=<role>)
```

### 描述当前的触发因素
<a name="build-and-manage-scheduling-describe"></a>

要检索已创建的管道触发器的相关信息，您可以调用有触发器名称的 `describe_trigger()` API。此命令返回已创建计划表达式的详细信息，如开始时间、启用状态和其他有用信息。下面的代码段显示了一个调用示例：

```
scheduled_pipeline.describe_trigger(name="<schedule-name>")
```

### 清理触发资源
<a name="build-and-manage-scheduling-clean"></a>

删除管道前，请清理现有触发器，以避免账户资源泄漏。应在销毁父管道之前删除触发器。您可以通过向 `delete_triggers` API 传递触发器名称列表来删除触发器。API 传递触发器名称列表，即可删除触发器。下面的代码段演示了如何删除触发器。

```
pipeline.delete_triggers(trigger_names=["<schedule-name>"])
```

**注意**  
删除触发器时请注意以下限制：  
通过指定触发器名称来删除触发器的选项仅在 SageMaker Python SDK 中可用。在 CLI 或 `DeletePipeline` API 调用中删除管道不会删除触发器。结果，触发器变成孤立状态， SageMaker AI 会尝试为不存在的管道开始运行。
此外，如果您正在使用另一个 notebook 会话或已经删除了管道目标，请通过调度程序 CL [I](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/scheduler/delete-schedule.html) 或 EventBridge 控制台清理孤立的计划。

# 亚马逊 SageMaker 实验集成
<a name="pipelines-experiments"></a>

Amaz SageMaker on Pipelines 与亚马逊 SageMaker 实验紧密集成。默认情况下，当 Pipelines 创建并执行管道时，如果不存在，则会创建以下 SageMaker 实验实体：
+ 管道的实验
+ 每次执行管道时的运行组
+ 为在管道执行步骤中创建的每个 SageMaker AI 作业添加到运行组的运行

您可以比较多个管道执行中的模型训练准确性等指标，就像在 SageMaker AI 模型训练实验的多个运行组中比较此类指标一样。

以下示例显示了 [Amaz SageMaker on Python 软件开发工具包](https://sagemaker.readthedocs.io/en/stable)中 Pip [elin](https://github.com/aws/sagemaker-python-sdk/blob/v2.41.0/src/sagemaker/workflow/pipeline.py) e 类的相关参数。

```
Pipeline(
    name="MyPipeline",
    parameters=[...],
    pipeline_experiment_config=PipelineExperimentConfig(
      ExecutionVariables.PIPELINE_NAME,
      ExecutionVariables.PIPELINE_EXECUTION_ID
    ),
    steps=[...]
)
```

如果您不想为管道创建实验和运行组，则将 `pipeline_experiment_config` 设置为 `None`。

**注意**  
Amaz SageMaker on Python SDK v2.41.0 中引入了实验集成。

根据为 `pipeline_experiment_config` 的 `ExperimentName` 和 `TrialName` 参数指定的内容，应用以下命名规则：
+ 如果不指定 `ExperimentName`，则将管道 `name` 用作实验名称。

  如果指定 `ExperimentName`，则将其用作实验名称。如果存在具有该名称的实验，则管道创建的运行组将添加到现有实验中。如果不存在具有该名称的实验，则会创建一个新的实验。
+ 如果不指定 `TrialName`，则将管道执行 ID 用作运行组名称。

  如果指定 `TrialName`，则将其用作运行组名称。如果存在具有该名称的运行组，则管道创建的运行将添加到现有的运行组中。如果不存在具有该名称的运行组，则会创建一个新的运行组。

**注意**  
删除创建了实验实体的管道时，不会删除这些实体。您可以使用 SageMaker 实验 API 来删除实体。

有关如何查看与管道关联的 SageMaker AI 实验实体的信息，请参阅[从管道访问实验数据](pipelines-studio-experiments.md)。有关 SageMaker 实验的更多信息，请参阅[Studio 经典版中的亚马逊 SageMaker 实验](experiments.md)。

下面几节将展示上述规则的示例，以及如何在管道定义文件中表示这些规则。有关管道定义文件的更多信息，请参阅 [管道概述](pipelines-overview.md)。

**Topics**
+ [

# 默认行为
](pipelines-experiments-default.md)
+ [

# 禁用实验集成
](pipelines-experiments-none.md)
+ [

# 指定自定义实验名称
](pipelines-experiments-custom-experiment.md)
+ [

# 指定自定义运行组名称
](pipelines-experiments-custom-trial.md)

# 默认行为
<a name="pipelines-experiments-default"></a>

**创建管道**

创建 A SageMaker I 管道时的默认行为是自动将其与 SageMaker 实验集成。如果您未指定任何自定义配置， SageMaker AI 会创建一个与管道同名的实验，使用管道执行 ID 作为名称为管道的每次执行创建一个运行组，并在每个运行组中为作为管道步骤一部分启动的每个 SageMaker AI 作业单独运行。您可以无缝跟踪和比较不同管道执行的指标，类似于分析模型训练实验的方法。下一节演示了在定义管道而未明确配置实验集成时的默认行为。

省略了 `pipeline_experiment_config`。`ExperimentName` 默认为管道 `name`。`TrialName` 默认为执行 ID。

```
pipeline_name = f"MyPipeline"
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[...],
    steps=[step_train]
)
```

**管道定义文件**

```
{
  "Version": "2020-12-01",
  "Parameters": [
    {
      "Name": "InputDataSource"
    },
    {
      "Name": "InstanceCount",
      "Type": "Integer",
      "DefaultValue": 1
    }
  ],
  "PipelineExperimentConfig": {
    "ExperimentName": {"Get": "Execution.PipelineName"},
    "TrialName": {"Get": "Execution.PipelineExecutionId"}
  },
  "Steps": [...]
}
```

# 禁用实验集成
<a name="pipelines-experiments-none"></a>

**创建管道**

通过在定义管道`None`时将`pipeline_experiment_config`参数设置为，可以禁用管道与 SageMaker 实验的集成。这样， SageMaker AI 就不会自动创建用于跟踪与您的管道执行相关的指标和工件的实验、运行组或单独运行。下面的示例将管道配置参数设置为 `None`。

```
pipeline_name = f"MyPipeline"
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[...],
    pipeline_experiment_config=None,
    steps=[step_train]
)
```

**管道定义文件**

这与前面的默认示例相同，但没有 `PipelineExperimentConfig`。

# 指定自定义实验名称
<a name="pipelines-experiments-custom-experiment"></a>

虽然默认行为是使用管道名称作为实验中的 SageMaker 实验名称，但您可以覆盖该名称并改为指定自定义实验名称。如果要将多个管道执行归入同一实验，以便于分析和比较，这将非常有用。运行组名称仍默认为管道执行 ID，除非您也明确设置了自定义名称。下一节将演示如何使用自定义实验名称创建管道，同时保留运行组名称作为默认执行 ID。

**创建管道**

```
pipeline_name = f"MyPipeline"
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[...],
    pipeline_experiment_config=PipelineExperimentConfig(
      "CustomExperimentName",
      ExecutionVariables.PIPELINE_EXECUTION_ID
    ),
    steps=[step_train]
)
```

**管道定义文件**

```
{
  ...,
  "PipelineExperimentConfig": {
    "ExperimentName": "CustomExperimentName",
    "TrialName": {"Get": "Execution.PipelineExecutionId"}
  },
  "Steps": [...]
}
```

# 指定自定义运行组名称
<a name="pipelines-experiments-custom-trial"></a>

除了设置自定义实验名称外，您还可以为管道执行期间 SageMaker 实验创建的运行组指定自定义名称。该名称会附加管道执行 ID，以确保唯一性。您可以指定自定义运行组名称，以识别和分析同一实验中的相关管道运行。下面将介绍如何使用自定义运行组名称定义管道，同时将默认管道名称用于实验名称。

**创建管道**

```
pipeline_name = f"MyPipeline"
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[...],
    pipeline_experiment_config=PipelineExperimentConfig(
      ExecutionVariables.PIPELINE_NAME,
      Join(on="-", values=["CustomTrialName", ExecutionVariables.PIPELINE_EXECUTION_ID])
    ),
    steps=[step_train]
)
```

**管道定义文件**

```
{
  ...,
  "PipelineExperimentConfig": {
    "ExperimentName": {"Get": "Execution.PipelineName"},
    "TrialName": {
      "On": "-",
      "Values": [
         "CustomTrialName",
         {"Get": "Execution.PipelineExecutionId"}
       ]
    }
  },
  "Steps": [...]
}
```

# 使用本地模式运行管道
<a name="pipelines-local-mode"></a>

SageMaker 在托管 A SageMaker I 服务上执行管道之前，Pipelines 本地模式是测试训练、处理和推理脚本以及[管道参数](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#pipeline-parameters)的运行时兼容性的简便方法。通过使用本地模式，您可以使用较小的数据集在本地测试 SageMaker AI 管道。这样可以快速轻松地调试用户脚本和管道定义本身中的错误，而不会产生使用托管服务的成本。下面的主题将介绍如何在本地定义和运行管道。

Pipelines 本地模式在幕后利用 [SageMaker AI 作业本地模式](https://sagemaker.readthedocs.io/en/stable/overview.html#local-mode)。这是 SageMaker Python SDK 中的一项功能，允许你使用 Docker 容器在本地运行 SageMaker AI 内置镜像或自定义镜像。管道本地模式建立在 SageMaker AI 作业本地模式之上。因此，您将会看到与单独运行这些作业时相同的结果。例如，本地模式仍使用 Amazon S3 上传模型构件和处理输出。如果要将本地作业生成的数据存储在本地磁盘上，可以使用[本地模式](https://sagemaker.readthedocs.io/en/stable/overview.html#local-mode)中提到的设置。

管道本地模式目前支持以下步骤类型：
+ [训练步骤](build-and-manage-steps-types.md#step-type-training)
+ [处理步骤](build-and-manage-steps-types.md#step-type-processing)
+ [转换步骤](build-and-manage-steps-types.md#step-type-transform)
+ [模型步骤](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-model-create)（仅具有“创建模型”参数）
+ [条件步骤](build-and-manage-steps-types.md#step-type-condition)
+ [Fail 步骤](build-and-manage-steps-types.md#step-type-fail)

托管管道服务允许使用[并行配置](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#parallelism-configuration)并行执行多个步骤，而本地管道执行程序则是按顺序运行步骤。因此，本地管道的总体执行性能可能比在云上运行的管道差，这主要取决于数据集的大小、算法以及本地计算机的性能。另请注意，在本地模式下运行的流水线不会记录在[SageMaker 实验](https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-experiments.html)中。

**注意**  
管道本地模式与 SageMaker AI 算法不兼容，例如 XGBoost。如果要使用这些算法，则必须在[脚本模式](https://sagemaker-examples.readthedocs.io/en/latest/sagemaker-script-mode/sagemaker-script-mode.html)下使用它们。

为了在本地执行管道，与管道步骤和管道本身关联的 `sagemaker_session` 字段必须是 `LocalPipelineSession` 类型。以下示例显示了如何定义要在本地执行的 A SageMaker I 管道。

```
from sagemaker.workflow.pipeline_context import LocalPipelineSession
from sagemaker.pytorch import PyTorch
from sagemaker.workflow.steps import TrainingStep
from sagemaker.workflow.pipeline import Pipeline

local_pipeline_session = LocalPipelineSession()

pytorch_estimator = PyTorch(
    sagemaker_session=local_pipeline_session,
    role=sagemaker.get_execution_role(),
    instance_type="ml.c5.xlarge",
    instance_count=1,
    framework_version="1.8.0",
    py_version="py36",
    entry_point="./entry_point.py",
)

step = TrainingStep(
    name="MyTrainingStep",
    step_args=pytorch_estimator.fit(
        inputs=TrainingInput(s3_data="s3://amzn-s3-demo-bucket/my-data/train"),
    )
)

pipeline = Pipeline(
    name="MyPipeline",
    steps=[step],
    sagemaker_session=local_pipeline_session
)

pipeline.create(
    role_arn=sagemaker.get_execution_role(), 
    description="local pipeline example"
)

// pipeline will execute locally
execution = pipeline.start()

steps = execution.list_steps()

training_job_name = steps['PipelineExecutionSteps'][0]['Metadata']['TrainingJob']['Arn']

step_outputs = pipeline_session.sagemaker_client.describe_training_job(TrainingJobName = training_job_name)
```

准备好在托管 Pipelin SageMaker es 服务上执行管道后，可以通过将前面的代码片段`LocalPipelineSession`中的替换为`PipelineSession`（如以下代码示例所示），然后重新运行代码来实现。

```
from sagemaker.workflow.pipeline_context import PipelineSession

pipeline_session = PipelineSession()
```

# 对亚马逊 SageMaker 管道进行故障排除
<a name="pipelines-troubleshooting"></a>

在使用 Amaz SageMaker on Pipelines 时，您可能会因为各种原因遇到问题。本主题提供有关常见错误以及如何解决这些错误的信息。

 **管道定义问题** 

您的管道定义可能格式不正确。这可能会导致  执行失败或作业不准确。可以在创建管道或执行管道时捕获这些错误。如果定义未通过验证，Pipelines 会返回一条错误信息，指出 JSON 文件畸形的字符。要修复此问题，请查看使用 SageMaker AI Python SDK 创建的步骤以确保准确性。

您只能在管道定义中包含一次步骤。因此，步骤不能作为条件步骤*和* 管道的一部分存在于同一管道中。

 **检查管道日志** 

您可以使用以下命令查看步骤的状态：

```
execution.list_steps()
```

每个步骤包含以下信息：
+ 管道启动的实体的 ARN，例如 SageMaker AI 作业 ARN、模型 ARN 或模型包 ARN。
+ 失败原因包括对步骤失败的简要说明。
+ 如果该步骤是条件步骤，则包括条件评估为 true 还是 false。  
+ 如果执行重复使用先前的作业执行，则 `CacheHit` 会列出源执行。  

您还可以在 Amazon SageMaker Studio 界面中查看错误消息和日志。有关如何在 Studio 中查看日志的信息，请参阅[查看管道运行的详细信息](pipelines-studio-view-execution.md)。

 **缺少权限** 

创建管道执行的角色以及在管道执行中创建每个作业的步骤都需要正确权限。如果没有这些权限，您可能无法按预期提交管道执行或运行 SageMaker AI 作业。要确保您的权限设置正确，请参阅 [IAM 访问管理](build-and-manage-access.md)。

 **作业执行错误** 

由于定义 SageMaker AI 作业功能的脚本存在问题，您在执行步骤时可能会遇到问题。每个作业都有一组日 CloudWatch 志。 要从 Studio 查看这些日志，请参阅[查看管道运行的详细信息](pipelines-studio-view-execution.md)。有关在 SageMaker AI 中使用 CloudWatch 日志的信息，请参阅[CloudWatch 亚马逊 A SageMaker I 的日志](logging-cloudwatch.md)。

 **属性文件错误** 

如果在管道中不正确地实施属性文件，可能会出现问题。要确保属性文件的实施按预期运行，请参阅[在步骤之间传递数据](build-and-manage-propertyfile.md)。

 **将脚本复制到 Dockerfile 中的容器的问题** 

您可以将脚本复制到容器中，也可以通过 `entry_point` 参数（估算器实体）或 `code` 参数（处理器实体）来传递脚本，如以下代码示例所示。

```
step_process = ProcessingStep(
    name="PreprocessAbaloneData",
    processor=sklearn_processor,
    inputs = [
        ProcessingInput(
            input_name='dataset',
            source=...,
            destination="/opt/ml/processing/code",
        )
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train", destination = processed_data_path),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation", destination = processed_data_path),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test", destination = processed_data_path),
    ],
    code=os.path.join(BASE_DIR, "process.py"), ## Code is passed through an argument
    cache_config = cache_config,
    job_arguments = ['--input', 'arg1']
)

sklearn_estimator = SKLearn(
    entry_point=os.path.join(BASE_DIR, "train.py"), ## Code is passed through the entry_point
    framework_version="0.23-1",
    instance_type=training_instance_type,
    role=role,
    output_path=model_path, # New
    sagemaker_session=sagemaker_session, # New
    instance_count=1, # New
    base_job_name=f"{base_job_prefix}/pilot-train",
    metric_definitions=[
        {'Name': 'train:accuracy', 'Regex': 'accuracy_train=(.*?);'},
        {'Name': 'validation:accuracy', 'Regex': 'accuracy_validation=(.*?);'}
    ],
)
```