

# Pipelines

Amazon SageMaker Pipelines is a purpose-built workflow orchestration service to automate machine learning (ML) development.

Pipelines provide the following advantages over other AWS workflow offerings:

**Auto-scaling serverless infrastructure** You don't need to manage the underlying orchestration infrastructure to run Pipelines, which allows you to focus on core ML tasks. SageMaker AI automatically provisions, scales, and shuts down the pipeline orchestration compute resources as your ML workload demands.

**Intuitive user experience** Pipelines can be created and managed through your interface of choice: visual editor, SDK, APIs, or JSON. You can drag-and-drop the various ML steps to author your pipelines in the Amazon SageMaker Studio visual interface. The following screenshot shows the Studio visual editor for pipelines.

![\[Screenshot of the visual drag-and-drop interface for Pipelines in Studio.\]](http://docs.aws.amazon.com/sagemaker/latest/dg/images/pipelines/pipelines-studio-overview.png)


If you prefer managing your ML workflows programmatically, the SageMaker Python SDK offers advanced orchestration features. For more information, see [Amazon SageMaker Pipelines](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html) in the SageMaker Python SDK documentation.

**AWS integrations** Pipelines provide seamless integration with all SageMaker AI features and other AWS services to automate data processing, model training, fine-tuning, evaluation, deployment, and monitoring jobs. You can incorporate the SageMaker AI features in your Pipelines and navigate across them using deep links to create, monitor, and debug your ML workflows at scale.

**Reduced costs** With Pipelines, you only pay for the SageMaker Studio environment and the underlying jobs that are orchestrated by Pipelines (for example, SageMaker Training, SageMaker Processing, SageMaker AI Inference, and Amazon S3 data storage).

**Auditability and lineage tracking** With Pipelines, you can track the history of pipeline updates and executions using built-in versioning. Amazon SageMaker ML Lineage Tracking helps you analyze the data sources and data consumers in the end-to-end ML development lifecycle.

**Topics**
+ [Pipelines overview](pipelines-overview.md)
+ [Pipelines actions](pipelines-build.md)

# Pipelines overview

An Amazon SageMaker AI pipeline is a series of interconnected steps in a directed acyclic graph (DAG) that are defined using the drag-and-drop UI or the [Pipelines SDK](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html). You can also build your pipeline using the [pipeline definition JSON schema](https://aws-sagemaker-mlops.github.io/sagemaker-model-building-pipeline-definition-JSON-schema/). This DAG JSON definition describes the requirements for and the relationships between each step of your pipeline. The structure of a pipeline's DAG is determined by the data dependencies between steps. These data dependencies are created when the properties of a step's output are passed as the input to another step. The following image is an example of a pipeline DAG:

![\[\]](http://docs.aws.amazon.com/sagemaker/latest/dg/images/pipeline-full.png)


**The example DAG includes the following steps:**

1. `AbaloneProcess`, an instance of the [Processing](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-processing) step, runs a preprocessing script on the data used for training. For example, the script could fill in missing values, normalize numerical data, or split data into the train, validation, and test datasets.

1. `AbaloneTrain`, an instance of the [Training](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-training) step, configures hyperparameters and trains a model from the preprocessed input data.

1. `AbaloneEval`, another instance of the [Processing](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-processing) step, evaluates the accuracy of the model. This step shows an example of a data dependency: it uses the test dataset output of the `AbaloneProcess` step (see the sketch after this list).

1. `AbaloneMSECond` is an instance of a [Condition](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-condition) step which, in this example, checks whether the mean squared error from the model evaluation is below a specified threshold. If the model does not meet the criteria, the pipeline run stops.

1. If the model meets the criteria, the pipeline run proceeds with the following steps:

   1. `AbaloneRegisterModel`, where SageMaker AI calls a [RegisterModel](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-register-model) step to register the model as a versioned model package in a model package group in the Amazon SageMaker Model Registry.

   1. `AbaloneCreateModel`, where SageMaker AI calls a [CreateModel](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-create-model) step to create the model in preparation for batch transform. In `AbaloneTransform`, SageMaker AI calls a [Transform](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-transform) step to generate model predictions on a dataset you specify.
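
The following is a minimal SageMaker Python SDK sketch of the data dependency described in step 3, in which `AbaloneEval` consumes the test dataset output of `AbaloneProcess`. It assumes that `sklearn_processor` and `step_process` are defined as in the processing step examples later on this page, and that `abalone/evaluation.py` is a hypothetical evaluation script.

```
from sagemaker.processing import ProcessingInput
from sagemaker.workflow.steps import ProcessingStep

# Referencing step_process's output properties as an input creates the
# data dependency that orders AbaloneProcess before AbaloneEval in the DAG.
step_eval = ProcessingStep(
    name="AbaloneEval",
    step_args=sklearn_processor.run(
        inputs=[
            ProcessingInput(
                source=step_process.properties.ProcessingOutputConfig.Outputs[
                    "test"
                ].S3Output.S3Uri,
                destination="/opt/ml/processing/test",
            ),
        ],
        code="abalone/evaluation.py",
    ),
)
```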

The following topics describe fundamental Pipelines concepts. For a tutorial describing the implementation of these concepts, see [Pipelines actions](pipelines-build.md).

**Topics**
+ [Pipeline Structure and Execution](build-and-manage-pipeline.md)
+ [IAM Access Management](build-and-manage-access.md)
+ [Set up cross-account support for Pipelines](build-and-manage-xaccount.md)
+ [Pipeline parameters](build-and-manage-parameters.md)
+ [Pipelines steps](build-and-manage-steps.md)
+ [Lift-and-shift Python code with the @step decorator](pipelines-step-decorator.md)
+ [Pass Data Between Steps](build-and-manage-propertyfile.md)
+ [Caching pipeline steps](pipelines-caching.md)
+ [Retry Policy for Pipeline Steps](pipelines-retry-policy.md)
+ [Selective execution of pipeline steps](pipelines-selective-ex.md)
+ [Baseline calculation, drift detection and lifecycle with ClarifyCheck and QualityCheck steps in Amazon SageMaker Pipelines](pipelines-quality-clarify-baseline-lifecycle.md)
+ [Schedule Pipeline Runs](pipeline-eventbridge.md)
+ [Amazon SageMaker Experiments Integration](pipelines-experiments.md)
+ [Run pipelines using local mode](pipelines-local-mode.md)
+ [Troubleshooting Amazon SageMaker Pipelines](pipelines-troubleshooting.md)

# Pipeline Structure and Execution

**Topics**
+ [Pipeline Structure](#build-and-manage-pipeline-structure)
+ [Pipeline Execution using Parallelism Configuration](#build-and-manage-pipeline-execution)

## Pipeline Structure

An Amazon SageMaker Pipelines instance is composed of a `name`, `parameters`, and `steps`. Pipeline names must be unique within an `(account, region)` pair. All parameters used in step definitions must be defined in the pipeline. The order in which the listed pipeline steps run is determined automatically by their data dependencies on one another. The Pipelines service resolves the relationships between steps in the data dependency DAG to create a series of steps that the execution completes. The following is an example of a pipeline structure.

**Warning**  
When building a pipeline through the visual editor or the SageMaker AI Python SDK, do not include sensitive information in pipeline parameters or in any step definition field (such as environment variables). These values are visible to anyone who can view the pipeline definition, for example in the response to a `DescribePipeline` request.

```
from sagemaker.workflow.pipeline import Pipeline

pipeline_name = "AbalonePipeline"
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_type,
        processing_instance_count,
        training_instance_type,
        model_approval_status,
        input_data,
        batch_data,
    ],
    steps=[step_process, step_train, step_eval, step_cond],
)
```

## Pipeline Execution using Parallelism Configuration

By default, a pipeline performs all steps that are available to run in parallel. You can control this behavior by using the `ParallelismConfiguration` property when creating or updating a pipeline, as well as when starting or retrying a pipeline execution. 

Parallelism configurations are applied per execution. For example, if two executions are started, they can each run a maximum of 50 steps concurrently, for a total of 100 concurrently running steps. Also, a `ParallelismConfiguration` specified when starting, retrying, or updating an execution takes precedence over the parallelism configuration defined in the pipeline.

**Example Creating a pipeline execution with `ParallelismConfiguration`**  

```
pipeline = Pipeline(
    name="myPipeline",
    steps=[step_process, step_train]
)

pipeline.create(role, parallelism_config={"MaxParallelExecutionSteps": 50})
```
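
You can also supply a `ParallelismConfiguration` for an individual run, which overrides whatever was set when the pipeline was created. The following is a minimal sketch, assuming the pipeline above has already been created or upserted.

```
# Override the pipeline-level parallelism for this execution only
execution = pipeline.start(
    parallelism_config={"MaxParallelExecutionSteps": 10}
)
```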

# IAM Access Management

The following sections describe the AWS Identity and Access Management (IAM) requirements for Amazon SageMaker Pipelines. For an example of how you can implement these permissions, see [Prerequisites](define-pipeline.md#define-pipeline-prereq).

**Topics**
+ [Pipeline Role Permissions](#build-and-manage-role-permissions)
+ [Pipeline Step Permissions](#build-and-manage-step-permissions)
+ [CORS configuration with Amazon S3 buckets](#build-and-manage-cors-s3)
+ [Customize access management for Pipelines jobs](#build-and-manage-step-permissions-prefix)
+ [Customize access to pipeline versions](#build-and-manage-step-permissions-version)
+ [Service Control Policies with Pipelines](#build-and-manage-scp)

## Pipeline Role Permissions


Your pipeline requires an IAM pipeline execution role that is passed to Pipelines when you create a pipeline. The role for the SageMaker AI instance you're using to create the pipeline must have a policy with the `iam:PassRole` permission that specifies the pipeline execution role. This is because the instance needs permission to pass your pipeline execution role to the Pipelines service for use in creating and running pipelines. For more information on IAM roles, see [IAM Roles](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles.html).

Your pipeline execution role requires the following permissions:
+ You can use a unique or customized role for any of the SageMaker AI job steps in your pipeline (rather than the pipeline execution role, which is used by default). If you do, make sure that your pipeline execution role has a policy with the `iam:PassRole` permission that specifies each of these roles.
+  `Create` and `Describe` permissions for each of the job types in the pipeline. 
+  Amazon S3 permissions to use the `JsonGet` function. You control access to your Amazon S3 resources using resource-based policies and identity-based policies. A resource-based policy is applied to your Amazon S3 bucket and grants Pipelines access to the bucket. An identity-based policy gives your pipeline the ability to make Amazon S3 calls from your account. For more information on resource-based policies and identity-based policies, see [Identity-based policies and resource-based policies](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_identity-vs-resource.html). A usage sketch of `JsonGet` follows this list. 

  ```
  {
      "Action": [
          "s3:GetObject"
      ],
      "Resource": "arn:aws:s3:::<your-bucket-name>/*",
      "Effect": "Allow"
  }
  ```
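
The following is a minimal sketch of how `JsonGet` is typically used in a condition step, which is why the pipeline execution role needs `s3:GetObject` on the bucket that stores the property file. It assumes that `step_eval`, `evaluation_report`, `step_register`, and `step_create_model` are defined elsewhere.

```
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet

# JsonGet reads a value from a property file that an earlier step wrote to Amazon S3
mse = JsonGet(
    step_name=step_eval.name,
    property_file=evaluation_report,  # a PropertyFile attached to step_eval
    json_path="regression_metrics.mse.value",
)

step_cond = ConditionStep(
    name="AbaloneMSECond",
    conditions=[ConditionLessThanOrEqualTo(left=mse, right=6.0)],
    if_steps=[step_register, step_create_model],
    else_steps=[],
)
```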

## Pipeline Step Permissions


Pipelines include steps that run SageMaker AI jobs. In order for the pipeline steps to run these jobs, they require an IAM role in your account that provides access to the needed resources. This role is passed to the SageMaker AI service principal by your pipeline. For more information on IAM roles, see [IAM Roles](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles.html). 

By default, each step takes on the pipeline execution role. You can optionally pass a different role to any of the steps in your pipeline. This ensures that the code in each step does not have the ability to impact resources used in other steps unless there is a direct relationship between the two steps specified in the pipeline definition. You pass these roles when defining the processor or estimator for your step. For examples of how to include these roles in these definitions, see the [SageMaker AI Python SDK documentation](https://sagemaker.readthedocs.io/en/stable/overview.html#using-estimators). 
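
The following is a minimal sketch of passing a step-specific role when defining a processor; the role ARN shown is a placeholder.

```
from sagemaker.sklearn.processing import SKLearnProcessor

# This step runs with its own role instead of the pipeline execution role
sklearn_processor = SKLearnProcessor(
    framework_version="1.0-1",
    role="arn:aws:iam::111122223333:role/StepSpecificProcessingRole",
    instance_type="ml.m5.xlarge",
    instance_count=1,
)
```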

## CORS configuration with Amazon S3 buckets


To ensure that your images are imported into Pipelines from an Amazon S3 bucket in a predictable manner, you must add a CORS configuration to the Amazon S3 buckets that the images are imported from. This section provides instructions on how to set the required CORS configuration on your Amazon S3 bucket. The XML `CORSConfiguration` required for Pipelines differs from the one in [CORS Requirement for Input Image Data](sms-cors-update.md); otherwise, you can use the information there to learn more about the CORS requirement for Amazon S3 buckets.

Use the following CORS configuration code for the Amazon S3 buckets that host your images. For instructions on configuring CORS, see [Configuring cross-origin resource sharing (CORS)](https://docs.aws.amazon.com/AmazonS3/latest/user-guide/add-cors-configuration.html) in the Amazon Simple Storage Service User Guide. If you use the Amazon S3 console to add the policy to your bucket, you must use the JSON format.

**JSON**

```
[
    {
        "AllowedHeaders": [
            "*"
        ],
        "AllowedMethods": [
            "PUT"
        ],
        "AllowedOrigins": [
            "*"
        ],
        "ExposeHeaders": [
            "Access-Control-Allow-Origin"
        ]
    }
]
```

**XML**

```
<CORSConfiguration>
 <CORSRule>
   <AllowedHeader>*</AllowedHeader>
   <AllowedOrigin>*</AllowedOrigin>
   <AllowedMethod>PUT</AllowedMethod>
   <ExposeHeader>Access-Control-Allow-Origin</ExposeHeader>
 </CORSRule>
</CORSConfiguration>
```

The following GIF demonstrates the instructions found in the Amazon S3 documentation to add a CORS header policy using the Amazon S3 console.

![\[Gif on how to add a CORS header policy using the Amazon S3 console.\]](http://docs.aws.amazon.com/sagemaker/latest/dg/images/sms/gifs/cors-config.gif)


## Customize access management for Pipelines jobs


You can further customize your IAM policies so that selected members of your organization can run any or all pipeline steps. For example, you can give certain users permission to create training jobs, another group of users permission to create processing jobs, and all of your users permission to run the remaining steps. To use this feature, you select a custom string that prefixes your job name. Your admin prepends the permitted ARNs with the prefix, while your data scientists include this prefix in pipeline instantiations. Because the IAM policy for permitted users contains a job ARN with the specified prefix, the jobs launched by your pipeline steps have the necessary permissions to proceed. Job prefixing is off by default; you must toggle on this option in your `Pipeline` class to use it. 

For jobs with prefixing turned off, the job name is formatted as shown and is a concatenation of fields described in the following table:

`pipelines-<executionId>-<stepNamePrefix>-<entityToken>-<failureCount>`


| Field | Definition | 
| --- | --- | 
|  pipelines   |  A static string always prepended. This string identifies the pipeline orchestration service as the job's source.  | 
|  executionId  |  A randomized buffer for the running instance of the pipeline.  | 
|  stepNamePrefix  |  The user-specified step name (given in the `name` argument of the pipeline step), limited to the first 20 characters.  | 
|  entityToken  |  A randomized token to ensure idempotency of the step entity.  | 
|  failureCount  |  The current number of retries attempted to complete the job.  | 

In this case, no custom prefix is prepended to the job name, and the corresponding IAM policy must match this string.

For users who turn on job prefixing, the underlying job name takes the following form, with the custom prefix specified as `MyBaseJobName`:

`<MyBaseJobName>-<executionId>-<entityToken>-<failureCount>`

The custom prefix replaces the static `pipelines` string to help you narrow the selection of users who can run the SageMaker AI job as a part of a pipeline.

**Prefix length restrictions**

The job names have internal length constraints specific to individual pipeline steps. This constraint also limits the length of the allowed prefix. The prefix length requirements are as follows:


| Pipeline step | Prefix length | 
| --- | --- | 
|   `[TrainingStep](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#trainingstep)`, `[ModelStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#step-collections)`, `[TransformStep](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#transformstep)`, `[ProcessingStep](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#processingstep)`, `[ClarifyCheckStep](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#clarifycheckstep)`, `[QualityCheckStep](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#qualitycheckstep)`, `[RegisterModelStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#step-collections)`   |  38  | 
|  `[TuningStep](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#tuningstep)`, `[AutoML](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#automlstep)`  |  6  | 

### Apply job prefixes to an IAM policy


Your admin creates IAM policies allowing users of specific prefixes to create jobs. The following example policy permits data scientists to create training jobs if they use the `MyBaseJobName` prefix. 

```
{
    "Action": "sagemaker:CreateTrainingJob",
    "Effect": "Allow",
    "Resource": [
        "arn:aws:sagemaker:region:account-id:*/MyBaseJobName-*"
    ]
}
```

### Apply job prefixes to pipeline instantiations


You specify your prefix with the `base_job_name` argument of the job instance class.

**Note**  
You pass your job prefix with the `base_job_name` argument to the job instance before creating a pipeline step. This job instance contains the necessary information for the job to run as a step in a pipeline. This argument varies depending upon the job instance used. The following list shows which argument to use for each pipeline step type:  
+ `base_job_name` for the `[Estimator](https://sagemaker.readthedocs.io/en/stable/api/training/estimators.html)` (`[TrainingStep](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#trainingstep)`), `[Processor](https://sagemaker.readthedocs.io/en/stable/api/training/processing.html)` (`[ProcessingStep](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#processingstep)`), and `[AutoML](https://sagemaker.readthedocs.io/en/stable/api/training/automl.html)` (`[AutoMLStep](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#automlstep)`) classes
+ `tuning_base_job_name` for the `[Tuner](https://sagemaker.readthedocs.io/en/stable/api/training/tuner.html)` class (`[TuningStep](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#tuningstep)`)
+ `transform_base_job_name` for the `[Transformer](https://sagemaker.readthedocs.io/en/stable/api/inference/transformer.html)` class (`[TransformStep](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#transformstep)`)
+ `base_job_name` of `[CheckJobConfig](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#checkjobconfig)` for the `[QualityCheckStep](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#qualitycheckstep)` (Quality Check) and `[ClarifyCheckStep](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#clarifycheckstep)` (Clarify Check) classes
+ For the `[Model](https://sagemaker.readthedocs.io/en/stable/api/inference/model.html)` class, the argument used depends on whether you call `create` or `register` on your model before passing the result to `[ModelStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#step-collections)`:
  + If you call `create`, the custom prefix comes from the `name` argument when you construct your model (for example, `Model(name=)`)
  + If you call `register`, the custom prefix comes from the `model_package_name` argument of your call to `register` (for example, `my_model.register(model_package_name=)`)

The following example shows how to specify a prefix for a new training job instance.

```
# Create a job instance
xgb_train = Estimator(
    image_uri=image_uri,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    output_path=model_path,
    role=role,
    subnets=["subnet-0ab12c34567de89f0"],
    base_job_name="MyBaseJobName"
    security_group_ids=["sg-1a2bbcc3bd4444e55"],
    tags = [ ... ]
    encrypt_inter_container_traffic=True, 
)

# Attach your job instance to a pipeline step
step_train = TrainingStep(
    name="TestTrainingJob",
    estimator=xgb_train, 
    inputs={
        "train": TrainingInput(...), 
        "validation": TrainingInput(...) 
    }
)
```

Job prefixing is off by default. To opt into this feature, use the `use_custom_job_prefix` option of `PipelineDefinitionConfig` as shown in the following snippet:

```
from sagemaker.workflow.pipeline_definition_config import PipelineDefinitionConfig

# Create a definition configuration and toggle on custom prefixing
definition_config = PipelineDefinitionConfig(use_custom_job_prefix=True)

# Create a pipeline with a custom prefix
pipeline = Pipeline(
    name="MyJobPrefixedPipeline",
    parameters=[...],
    steps=[...],
    pipeline_definition_config=definition_config
)
```

Create and run your pipeline. The following example creates and runs a pipeline, and also demonstrates how you can turn off job prefixing and rerun your pipeline.

```
pipeline.create(role_arn=sagemaker.get_execution_role())

# Optionally, call definition() to confirm your prefixed job names are in the built JSON
pipeline.definition()
pipeline.start()
      
# To run a pipeline without custom-prefixes, toggle off use_custom_job_prefix, update the pipeline 
# via upsert() or update(), and start a new run
definition_config = PipelineDefinitionConfig(use_custom_job_prefix=False)
pipeline.pipeline_definition_config = definition_config
pipeline.update()
execution = pipeline.start()
```

Similarly, you can toggle the feature on for existing pipelines and start a new run which uses job prefixes.

```
definition_config = PipelineDefinitionConfig(use_custom_job_prefix=True)
pipeline.pipeline_definition_config = definition_config
pipeline.update()
execution = pipeline.start()
```

Finally, you can view your custom-prefixed job by calling `list_steps` on the pipeline execution.

```
steps = execution.list_steps()

prefixed_training_job_name = steps['PipelineExecutionSteps'][0]['Metadata']['TrainingJob']['Arn']
```

## Customize access to pipeline versions


You can grant customized access to specific versions of Amazon SageMaker Pipelines by using the `sagemaker:PipelineVersionId` condition key. For example, the following policy grants permission to start executions or update pipeline versions only for pipeline versions with an ID of 6 or higher.

------
#### [ JSON ]


```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": {
        "Sid": "AllowStartPipelineExecution",
        "Effect": "Allow",
        "Action": [
            "sagemaker:StartPipelineExecution",
            "sagemaker:UpdatePipelineVersion"
        ],
        "Resource": "*",
        "Condition": {
            "NumericGreaterThanEquals": {
                "sagemaker:PipelineVersionId": 6
            }
        }
    }
}
```

------

For more information about supported condition keys, see [Condition keys for Amazon SageMaker AI](https://docs.aws.amazon.com//service-authorization/latest/reference/list_amazonsagemaker.html#amazonsagemaker-policy-keys).

## Service Control Policies with Pipelines


Service control policies (SCPs) are a type of organization policy that you can use to manage permissions in your organization. SCPs offer central control over the maximum available permissions for all accounts in your organization. By using Pipelines within your organization, you can ensure that data scientists manage your pipeline executions without having to interact with the AWS console. 

If you're using a VPC with your SCP that restricts access to Amazon S3, you need to take steps to allow your pipeline to access other Amazon S3 resources. 

To allow Pipelines to access Amazon S3 outside of your VPC with the `JsonGet` function, update your organization's SCP to ensure that the role using Pipelines can access Amazon S3. To do this, create an exception for the roles that Pipelines uses through the pipeline execution role by using a principal tag and a condition key. 

**To allow Pipelines to access Amazon S3 outside of your VPC**

1. Create a unique tag for your pipeline execution role following the steps in [Tagging IAM users and roles](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_tags.html) (a tagging sketch follows this list). 

1. Grant an exception in your SCP using the `aws:PrincipalTag` IAM condition key for the tag you created. For more information, see [Creating, updating, and deleting service control policies](https://docs.aws.amazon.com/organizations/latest/userguide/orgs_manage_policies_scps_create.html). 
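
As a sketch of step 1, you can attach the tag with a boto3 call similar to the following; the role name and the tag key and value are placeholders.

```
import boto3

iam = boto3.client("iam")

# Tag the pipeline execution role so the SCP exception can match on aws:PrincipalTag
iam.tag_role(
    RoleName="MyPipelineExecutionRole",
    Tags=[{"Key": "PipelinesS3Access", "Value": "allowed"}],
)
```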

# Set up cross-account support for Pipelines

Cross-account support for Amazon SageMaker Pipelines enables you to collaborate on machine learning pipelines with other teams or organizations that operate in different AWS accounts. By setting up cross-account pipeline sharing, you can grant controlled access to pipelines, allow other accounts to view pipeline details, trigger executions, and monitor runs. The following topic covers how to set up cross-account pipeline sharing, the different permission policies available for shared resources, and how to access and interact with shared pipeline entities through direct API calls to SageMaker AI.

## Set up cross-account pipeline sharing

SageMaker AI uses [AWS Resource Access Manager](https://docs.aws.amazon.com/ram/latest/userguide/what-is.html) (AWS RAM) to help you securely share your pipeline entities across accounts. 

### Create a resource share


1. Select **Create a resource share** through the [AWS RAM console](https://console.aws.amazon.com/ram/home).

1. When specifying resource share details, choose the Pipelines resource type and select one or more pipelines that you want to share. When you share a pipeline with any other account, all of its executions are also shared implicitly.

1. Associate permissions with your resource share. Choose either the default read-only permission policy or the extended pipeline execution permission policy. For more detailed information, see [Permission policies for Pipelines resources](#build-and-manage-xaccount-permissions). 
**Note**  
If you select the extended pipeline execution policy, note that any start, stop, and retry commands called by shared accounts use resources in the AWS account that shared the pipeline.

1. Use AWS account IDs to specify the accounts to which you want to grant access to your shared resources.

1. Review your resource share configuration and select **Create resource share**. It may take a few minutes for the resource share and principal associations to complete.

For more information, see [Sharing your AWS resources](https://docs.aws.amazon.com/ram/latest/userguide/getting-started-sharing.html) in the *AWS Resource Access Manager User Guide*.

### Get responses to your resource share invitation


Once the resource share and principal associations are set, the specified AWS accounts receive an invitation to join the resource share. The AWS accounts must accept the invite to gain access to any shared resources.

For more information on accepting a resource share invite through AWS RAM, see [Using shared AWS resources](https://docs.aws.amazon.com/ram/latest/userguide/getting-started-shared.html) in the *AWS Resource Access Manager User Guide*.

## Permission policies for Pipelines resources


When creating your resource share, choose one of two supported permission policies to associate with the SageMaker AI pipeline resource type. Both policies grant access to any selected pipeline and all of its executions. 

### Default read-only permissions


The `AWSRAMDefaultPermissionSageMakerPipeline` policy allows the following read-only actions:

```
"sagemaker:DescribePipeline"
"sagemaker:DescribePipelineDefinitionForExecution"   
"sagemaker:DescribePipelineExecution"
"sagemaker:ListPipelineExecutions"
"sagemaker:ListPipelineExecutionSteps"
"sagemaker:ListPipelineParametersForExecution"
"sagemaker:Search"
```

### Extended pipeline execution permissions


The `AWSRAMPermissionSageMakerPipelineAllowExecution` policy includes all of the read-only permissions from the default policy and also allows shared accounts to start, stop, and retry pipeline executions.

**Note**  
Be mindful of AWS resource usage when using the extended pipeline execution permission policy. With this policy, shared accounts are allowed to start, stop, and retry pipeline executions. Any resources used for shared pipeline executions are consumed by the owner account. 

The extended pipeline execution permission policy allows the following actions:

```
"sagemaker:DescribePipeline"
"sagemaker:DescribePipelineDefinitionForExecution"   
"sagemaker:DescribePipelineExecution"
"sagemaker:ListPipelineExecutions"
"sagemaker:ListPipelineExecutionSteps"
"sagemaker:ListPipelineParametersForExecution"
"sagemaker:StartPipelineExecution"
"sagemaker:StopPipelineExecution"
"sagemaker:RetryPipelineExecution"
"sagemaker:Search"
```

## Access shared pipeline entities through direct API calls


Once cross-account pipeline sharing is set up, you can call the following SageMaker API actions using a pipeline ARN (a boto3 sketch follows the list):

**Note**  
You can only call API commands if they are included in the permissions associated with your resource share. If you select the `AWSRAMPermissionSageMakerPipelineAllowExecution` policy, then the start, stop, and retry commands use resources in the AWS account that shared the pipeline.
+ [DescribePipeline](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribePipeline.html)
+ [DescribePipelineDefinitionForExecution](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribePipelineDefinitionForExecution.html)
+ [DescribePipelineExecution](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribePipelineExecution.html)
+ [ListPipelineExecutions](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_ListPipelineExecutions.html)
+ [ListPipelineExecutionSteps](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_ListPipelineExecutionSteps.html)
+ [ListPipelineParametersForExecution](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_ListPipelineParametersForExecution.html)
+ [StartPipelineExecution](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_StartPipelineExecution.html)
+ [StopPipelineExecution](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_StopPipelineExecution.html)
+ [RetryPipelineExecution](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_RetryPipelineExecution.html)
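
The following is a minimal boto3 sketch of calling a shared pipeline from the consuming account. The pipeline ARN is a placeholder, and `start_pipeline_execution` succeeds only if the extended pipeline execution permission policy was selected for the resource share.

```
import boto3

sagemaker_client = boto3.client("sagemaker")

# Shared pipelines are addressed by their full ARN rather than by name
shared_pipeline_arn = (
    "arn:aws:sagemaker:us-west-2:111122223333:pipeline/shared-pipeline-name"
)

# Read-only calls are allowed by the default permission policy
pipeline_details = sagemaker_client.describe_pipeline(PipelineName=shared_pipeline_arn)

# Starting an execution requires the extended pipeline execution permission policy
response = sagemaker_client.start_pipeline_execution(PipelineName=shared_pipeline_arn)
```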

# Pipeline parameters


You can introduce variables into your pipeline definition using parameters. You can reference parameters that you define throughout your pipeline definition. Parameters have a default value, which you can override by specifying parameter values when starting a pipeline execution. The default value must be an instance matching the parameter type. All parameters used in step definitions must be defined in your pipeline definition. This topic describes the parameters that you can define and how to implement them.

Amazon SageMaker Pipelines supports the following parameter types: 
+  `ParameterString` – Representing a string parameter. 
+  `ParameterInteger` – Representing an integer parameter. 
+  `ParameterFloat` – Representing a float parameter.
+  `ParameterBoolean` – Representing a Boolean Python type.

Parameters take the following format:

```
<parameter> = <parameter_type>(
    name="<parameter_name>",
    default_value=<default_value>
)
```

The following example shows a sample parameter implementation.

```
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
    ParameterBoolean
)

processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1
)
```

You pass the parameter when creating your pipeline as shown in the following example.

```
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count
    ],
    steps=[step_process]
)
```

You can also pass a parameter value that differs from the default value to a pipeline execution, as shown in the following example.

```
execution = pipeline.start(
    parameters=dict(
        ProcessingInstanceCount="2",
        ModelApprovalStatus="Approved"
    )
)
```

You can manipulate parameters with SageMaker Python SDK functions like `[sagemaker.workflow.functions.Join](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.functions.Join)`. For more information on parameters, see [SageMaker Pipelines Parameters](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#parameters).
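
For example, the following is a minimal sketch that uses `Join` to build an S3 prefix from a parameter at runtime; `input_prefix` is a hypothetical `ParameterString`.

```
from sagemaker.workflow.functions import Join
from sagemaker.workflow.parameters import ParameterString

input_prefix = ParameterString(name="InputPrefix", default_value="abalone")

# Resolved at execution time, after parameter values are substituted
processed_data_uri = Join(
    on="/",
    values=["s3://amzn-s3-demo-bucket", input_prefix, "processed"],
)
```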

For known limitations of Pipelines Parameters, see *[Limitations - Parameterization](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#parameterization)* in the [Amazon SageMaker Python SDK](https://sagemaker.readthedocs.io/en/stable).

# Pipelines steps

Pipelines are composed of steps. These steps define the actions that the pipeline takes and the relationships between steps using properties. The following page describes the types of steps, their properties, and the relationships between them.

**Topics**
+ [Add a step](build-and-manage-steps-types.md)
+ [Add integration](build-and-manage-steps-integration.md)
+ [Step properties](#build-and-manage-properties)
+ [Step parallelism](#build-and-manage-parallelism)
+ [Data dependency between steps](#build-and-manage-data-dependency)
+ [Custom dependency between steps](#build-and-manage-custom-dependency)
+ [Custom images in a step](#build-and-manage-images)

# Add a step


The following describes the requirements of each step type and provides an example implementation of the step, as well as how to add the step to a pipeline. These are not working implementations because they don't provide the resources and inputs needed. For a tutorial that implements these steps, see [Pipelines actions](pipelines-build.md).

**Note**  
You can also create a step from your local machine learning code by converting it to a Pipelines step with the `@step` decorator. For more information, see [@step decorator](#step-type-custom).

Amazon SageMaker Pipelines supports the following step types:
+ [Execute code](#step-type-executecode)
+ [Processing](#step-type-processing)
+ [Training](#step-type-training)
+ [Tuning](#step-type-tuning)
+ [AutoML](#step-type-automl)
+ [`Model`](#step-type-model)
+ [`Create model`](#step-type-create-model)
+ [`Register model`](#step-type-register-model)
+ [`Deploy model (endpoint)`](#step-type-deploy-model-endpoint)
+ [Transform](#step-type-transform)
+ [Condition](#step-type-condition)
+ [`Callback`](#step-type-callback)
+ [Lambda](#step-type-lambda)
+ [`ClarifyCheck`](#step-type-clarify-check)
+ [`QualityCheck`](#step-type-quality-check)
+ [EMR](#step-type-emr)
+ [Notebook Job](#step-type-notebook-job)
+ [Fail](#step-type-fail)

## @step decorator


If you want to orchestrate a custom ML job that leverages advanced SageMaker AI features or other AWS services in the drag-and-drop Pipelines UI, use the [Execute code step](#step-type-executecode).

You can create a step from local machine learning code using the `@step` decorator. After you test your code, you can convert the function to a SageMaker AI pipeline step by annotating it with the `@step` decorator. Pipelines creates and runs a pipeline when you pass the output of the `@step`-decorated function as a step to your pipeline. You can also create a multi-step DAG pipeline that includes one or more `@step`-decorated functions as well as traditional SageMaker AI pipeline steps. For more details about how to create a step with `@step` decorator, see [Lift-and-shift Python code with the @step decorator](pipelines-step-decorator.md).
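
The following is a minimal sketch of the pattern; the function body, instance type, pipeline name, and bucket are illustrative.

```
from sagemaker.workflow.function_step import step
from sagemaker.workflow.pipeline import Pipeline

@step(instance_type="ml.m5.xlarge")
def preprocess(input_s3_uri: str) -> str:
    # your existing local preprocessing code goes here
    return input_s3_uri

# The delayed return value of the decorated function is passed as a pipeline step
step_output = preprocess("s3://amzn-s3-demo-bucket/raw-data")

pipeline = Pipeline(
    name="StepDecoratorPipeline",
    steps=[step_output],
)
```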

## Execute code step

In the Pipelines drag-and-drop UI, you can use an **Execute code** step to run your own code as a pipeline step. You can upload a Python function, script, or notebook to be executed as part of your pipeline. You should use this step if you want to orchestrate a custom ML job that leverages advanced SageMaker AI features or other AWS services.

The **Execute Code** step uploads files to your default Amazon S3 bucket for Amazon SageMaker AI. This bucket might not have the required Cross-Origin Resource Sharing (CORS) permissions set. To learn more about configuring CORS permissions, see [CORS Requirement for Input Image Data](sms-cors-update.md).

The **Execute Code** step uses an Amazon SageMaker training job to run your code. Ensure that your IAM role has the `sagemaker:DescribeTrainingJob` and `sagemaker:CreateTrainingJob` API permissions. To learn more about all the required permissions for Amazon SageMaker AI and how to set them up, see [Amazon SageMaker AI API Permissions: Actions, Permissions, and Resources Reference](api-permissions-reference.md).

To add an execute code step to a pipeline using the Pipeline Designer, do the following:

1. Open the Amazon SageMaker Studio console by following the instructions in [Launch Amazon SageMaker Studio](studio-updated-launch.md).

1. In the left navigation pane, select **Pipelines**.

1. Choose **Create**.

1. Choose **Blank**.

1. In the left sidebar, choose **Execute code** and drag it to the canvas.

1. In the canvas, choose the **Execute code** step you added.

1. In the right sidebar, complete the forms in the **Setting** and **Details** tabs.

1. You can upload a single file to execute or upload a compressed folder containing multiple artifacts.

1. For single file uploads, you can provide optional parameters for notebooks, python functions, or scripts.

1. When providing Python functions, a handler must be provided in the format `file.py:<function_name>`.

1. For compressed folder uploads, relative paths to your code must be provided, and you can optionally provide paths to a `requirements.txt` file or initialization script inside the compressed folder.

1. If the canvas includes any step that immediately precedes the **Execute code** step you added, click and drag the cursor from the step to the **Execute code** step to create an edge.

1. If the canvas includes any step that immediately succeeds the **Execute code** step you added, click and drag the cursor from the **Execute code** step to the step to create an edge. Outputs from **Execute code** steps can be referenced for Python functions.

## Processing step

Use a processing step to create a processing job for data processing. For more information on processing jobs, see [Process Data and Evaluate Models](https://docs.aws.amazon.com/sagemaker/latest/dg/processing-job.html).

------
#### [ Pipeline Designer ]

To add a processing step to a pipeline using the Pipeline Designer, do the following:

1. Open the Amazon SageMaker Studio console by following the instructions in [Launch Amazon SageMaker Studio](studio-updated-launch.md).

1. In the left navigation pane, select **Pipelines**.

1. Choose **Create**.

1. In the left sidebar, choose **Process data** and drag it to the canvas.

1. In the canvas, choose the **Process data** step you added.

1. In the right sidebar, complete the forms in the **Setting** and **Details** tabs. For information about the fields in these tabs, see [ sagemaker.workflow.steps.ProcessingStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.ProcessingStep).

1. If the canvas includes any step that immediately precedes the **Process data** step you added, click and drag the cursor from the step to the **Process data** step to create an edge.

1. If the canvas includes any step that immediately succeeds the **Process data** step you added, click and drag the cursor from the **Process data** step to the step to create an edge.

------
#### [ SageMaker Python SDK ]

A processing step requires a processor, a Python script that defines the processing code, outputs for processing, and job arguments. The following example shows how to create a `ProcessingStep` definition. 

```
from sagemaker.sklearn.processing import SKLearnProcessor

sklearn_processor = SKLearnProcessor(framework_version='1.0-1',
                                     role=<role>,
                                     instance_type='ml.m5.xlarge',
                                     instance_count=1)
```

```
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

inputs = [
    ProcessingInput(source=<input_data>, destination="/opt/ml/processing/input"),
]

outputs = [
    ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
    ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
    ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
]

step_process = ProcessingStep(
    name="AbaloneProcess",
    step_args = sklearn_processor.run(inputs=inputs, outputs=outputs,
        code="abalone/preprocessing.py")
)
```

**Pass runtime parameters**

The following example shows how to pass runtime parameters from a PySpark processor to a `ProcessingStep`.

```
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.spark.processing import PySparkProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

pipeline_session = PipelineSession()

pyspark_processor = PySparkProcessor(
    framework_version='2.4',
    role=<role>,
    instance_type='ml.m5.xlarge',
    instance_count=1,
    sagemaker_session=pipeline_session,
)

step_args = pyspark_processor.run(
    inputs=[ProcessingInput(source=<input_data>, destination="/opt/ml/processing/input"),],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
    ],
    code="preprocess.py",
    arguments=None,
)


step_process = ProcessingStep(
    name="AbaloneProcess",
    step_args=step_args,
)
```

For more information on processing step requirements, see the [sagemaker.workflow.steps.ProcessingStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.ProcessingStep) documentation. For an in-depth example, see the [Orchestrate Jobs to Train and Evaluate Models with Amazon SageMaker Pipelines](https://github.com/aws/amazon-sagemaker-examples/blob/62de6a1fca74c7e70089d77e36f1356033adbe5f/sagemaker-pipelines/tabular/abalone_build_train_deploy/sagemaker-pipelines-preprocess-train-evaluate-batch-transform.ipynb) example notebook. The *Define a Processing Step for Feature Engineering* section includes more information.

------

## Training step

You use a training step to create a training job to train a model. For more information on training jobs, see [Train a Model with Amazon SageMaker AI](https://docs.aws.amazon.com/sagemaker/latest/dg/how-it-works-training.html).

A training step requires an estimator, as well as training and validation data inputs.

------
#### [ Pipeline Designer ]

To add a training step to a pipeline using the Pipeline Designer, do the following:

1. Open the Amazon SageMaker Studio console by following the instructions in [Launch Amazon SageMaker Studio](studio-updated-launch.md).

1. In the left navigation pane, select **Pipelines**.

1. Choose **Create**.

1. Choose **Blank**.

1. In the left sidebar, choose **Train model** and drag it to the canvas.

1. In the canvas, choose the **Train model** step you added.

1. In the right sidebar, complete the forms in the **Setting** and **Details** tabs. For information about the fields in these tabs, see [ sagemaker.workflow.steps.TrainingStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.TrainingStep).

1. If the canvas includes any step that immediately precedes the **Train model** step you added, click and drag the cursor from the step to the **Train model** step to create an edge.

1. If the canvas includes any step that immediately succeeds the **Train model** step you added, click and drag the cursor from the **Train model** step to the step to create an edge.

------
#### [ SageMaker Python SDK ]

The following example shows how to create a `TrainingStep` definition. For more information about training step requirements, see the [sagemaker.workflow.steps.TrainingStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.TrainingStep) documentation.

```
from sagemaker.workflow.pipeline_context import PipelineSession

from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep

from sagemaker.xgboost.estimator import XGBoost

pipeline_session = PipelineSession()

xgb_estimator = XGBoost(..., sagemaker_session=pipeline_session)

step_args = xgb_estimator.fit(
    inputs={
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "train"
            ].S3Output.S3Uri,
            content_type="text/csv"
        ),
        "validation": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "validation"
            ].S3Output.S3Uri,
            content_type="text/csv"
        )
    }
)

step_train = TrainingStep(
    name="TrainAbaloneModel",
    step_args=step_args,
)
```

------

## Tuning step

You use a tuning step to create a hyperparameter tuning job, also known as hyperparameter optimization (HPO). A hyperparameter tuning job runs multiple training jobs, with each job producing a model version. For more information on hyperparameter tuning, see [Automatic model tuning with SageMaker AI](automatic-model-tuning.md).

The tuning job is associated with the SageMaker AI experiment for the pipeline, with the training jobs created as trials. For more information, see [Experiments Integration](pipelines-experiments.md).

A tuning step requires a [HyperparameterTuner](https://sagemaker.readthedocs.io/en/stable/api/training/tuner.html) and training inputs. You can retrain previous tuning jobs by specifying the `warm_start_config` parameter of the `HyperparameterTuner`. For more information on hyperparameter tuning and warm start, see [Run a Warm Start Hyperparameter Tuning Job](automatic-model-tuning-warm-start.md).
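
The following is a minimal sketch of a warm start configuration; the parent tuning job name is a placeholder, and the remaining `HyperparameterTuner` arguments are elided as in the example below.

```
from sagemaker.tuner import HyperparameterTuner, WarmStartConfig, WarmStartTypes

# Reuse results from a previous tuning job as a starting point
warm_start_config = WarmStartConfig(
    warm_start_type=WarmStartTypes.IDENTICAL_DATA_AND_ALGORITHM,
    parents={"previous-tuning-job-name"},
)

tuner = HyperparameterTuner(..., warm_start_config=warm_start_config)
```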

You use the [get_top_model_s3_uri](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.TuningStep.get_top_model_s3_uri) method of the [sagemaker.workflow.steps.TuningStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.TuningStep) class to get the model artifact from one of the top-performing model versions. For a notebook that shows how to use a tuning step in a SageMaker AI pipeline, see [sagemaker-pipelines-tuning-step.ipynb](https://github.com/aws/amazon-sagemaker-examples/blob/main/sagemaker-pipelines/tabular/tuning-step/sagemaker-pipelines-tuning-step.ipynb).

**Important**  
Tuning steps were introduced in Amazon SageMaker Python SDK v2.48.0 and Amazon SageMaker Studio Classic v3.8.0. You must update Studio Classic before you use a tuning step or the pipeline DAG doesn't display. To update Studio Classic, see [Shut Down and Update Amazon SageMaker Studio Classic](studio-tasks-update-studio.md).

The following example shows how to create a `TuningStep` definition.

```
from sagemaker.workflow.pipeline_context import PipelineSession

from sagemaker.tuner import HyperparameterTuner
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TuningStep

tuner = HyperparameterTuner(..., sagemaker_session=PipelineSession())
    
step_tuning = TuningStep(
    name = "HPTuning",
    step_args = tuner.fit(inputs=TrainingInput(s3_data="s3://amzn-s3-demo-bucket/my-data"))
)
```

**Get the best model version**

The following example shows how to get the best model version from the tuning job using the `get_top_model_s3_uri` method. At most, the top 50 performing versions are available ranked according to [HyperParameterTuningJobObjective](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_HyperParameterTuningJobObjective.html). The `top_k` argument is an index into the versions, where `top_k=0` is the best-performing version and `top_k=49` is the worst-performing version.

```
best_model = Model(
    image_uri=image_uri,
    model_data=step_tuning.get_top_model_s3_uri(
        top_k=0,
        s3_bucket=sagemaker_session.default_bucket()
    ),
    ...
)
```

For more information on tuning step requirements, see the [sagemaker.workflow.steps.TuningStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.TuningStep) documentation.

## Fine-tuning step

Fine-tuning trains a pretrained foundation model from Amazon SageMaker JumpStart on a new dataset. This process, also known as transfer learning, can produce accurate models with smaller datasets and less training time. When you fine-tune a model, you can use the default dataset or choose your own data. To learn more about fine-tuning a foundation model from JumpStart, see [Fine-Tune a Model](jumpstart-fine-tune.md).

The fine-tuning step uses an Amazon SageMaker training job to customize your model. Ensure that your IAM role has the `sagemaker:DescribeTrainingJob` and `sagemaker:CreateTrainingJob` API permissions to execute the fine-tuning job in your pipeline. To learn more about the required permissions for Amazon SageMaker AI and how to set them up, see [Amazon SageMaker AI API Permissions: Actions, Permissions, and Resources Reference](api-permissions-reference.md).

To add a **Fine-tune model** step to your pipeline using the drag-and-drop editor, follow these steps:

1. Open the Studio console by following the instructions in [Launch Amazon SageMaker Studio](studio-updated-launch.md).

1. In the left navigation pane, select **Pipelines**.

1. Choose **Create**.

1. Choose **Blank**.

1. In the left sidebar, choose **Fine-tune model** and drag it to the canvas.

1. In the canvas, choose the **Fine-tune model** step you added.

1. In the right sidebar, complete the forms in the **Setting** and **Details** tabs.

1. If the canvas includes any step that immediately precedes the **Fine-tune model** step you added, click and drag the cursor from the step to the **Fine-tune model** step to create an edge.

1. If the canvas includes any step that immediately succeeds the **Fine-tune model** step you added, click and drag the cursor from the **Fine-tune model** step to the step to create an edge.

## AutoML step

Use the [AutoML](https://sagemaker.readthedocs.io/en/stable/api/training/automl.html) API to create an AutoML job to automatically train a model. For more information on AutoML jobs, see [Automate model development with Amazon SageMaker Autopilot](https://docs.aws.amazon.com/sagemaker/latest/dg/autopilot-automate-model-development.html). 

**Note**  
Currently, the AutoML step supports only [ensembling training mode](https://docs.aws.amazon.com/sagemaker/latest/dg/autopilot-model-support-validation.html).

The following example shows how to create a definition using `AutoMLStep`.

```
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.automl_step import AutoMLStep
from sagemaker.automl.automl import AutoML, AutoMLInput

pipeline_session = PipelineSession()

auto_ml = AutoML(...,
    role="<role>",
    target_attribute_name="my_target_attribute_name",
    mode="ENSEMBLING",
    sagemaker_session=pipeline_session) 

input_training = AutoMLInput(
    inputs="s3://amzn-s3-demo-bucket/my-training-data",
    target_attribute_name="my_target_attribute_name",
    channel_type="training",
)
input_validation = AutoMLInput(
    inputs="s3://amzn-s3-demo-bucket/my-validation-data",
    target_attribute_name="my_target_attribute_name",
    channel_type="validation",
)

step_args = auto_ml.fit(
    inputs=[input_training, input_validation]
)

step_automl = AutoMLStep(
    name="AutoMLStep",
    step_args=step_args,
)
```

**Get the best model version**

The AutoML step automatically trains several model candidates. Get the model with the best objective metric from the AutoML job using the `get_best_auto_ml_model` method as follows. You must also use an IAM `role` to access model artifacts.

```
best_model = step_automl.get_best_auto_ml_model(role=<role>)
```

For more information, see the [AutoML](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.automl_step.AutoMLStep) step in the SageMaker Python SDK.

## Model step

Use a `ModelStep` to create or register a SageMaker AI model. For more information on `ModelStep` requirements, see the [sagemaker.workflow.model_step.ModelStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.model_step.ModelStep) documentation.

### Create a model


You can use a `ModelStep` to create a SageMaker AI model. A `ModelStep` requires model artifacts and information about the SageMaker AI instance type that you need to use to create the model. For more information about SageMaker AI models, see [Train a Model with Amazon SageMaker AI](https://docs.aws.amazon.com/sagemaker/latest/dg/how-it-works-training.html).

The following example shows how to create a `ModelStep` definition.

```
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.model import Model
from sagemaker.workflow.model_step import ModelStep

step_train = TrainingStep(...)
model = Model(
    image_uri=pytorch_estimator.training_image_uri(),
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=PipelineSession(),
    role=role,
)

step_model_create = ModelStep(
   name="MyModelCreationStep",
   step_args=model.create(instance_type="ml.m5.xlarge"),
)
```

### Register a model


You can use a `ModelStep` to register a `sagemaker.model.Model` or a `sagemaker.pipeline.PipelineModel` with the Amazon SageMaker Model Registry. A `PipelineModel` represents an inference pipeline, which is a model composed of a linear sequence of containers that process inference requests. For more information about how to register a model, see [Model Registration Deployment with Model Registry](model-registry.md).

The following example shows how to create a `ModelStep` that registers a `PipelineModel`.

```
import time

from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.sklearn import SKLearnModel
from sagemaker.xgboost import XGBoostModel

pipeline_session = PipelineSession()

code_location = 's3://{0}/{1}/code'.format(bucket_name, prefix)

sklearn_model = SKLearnModel(
   model_data=processing_step.properties.ProcessingOutputConfig.Outputs['model'].S3Output.S3Uri,
   entry_point='inference.py',
   source_dir='sklearn_source_dir/',
   code_location=code_location,
   framework_version='1.0-1',
   role=role,
   sagemaker_session=pipeline_session,
   py_version='py3'
)

xgboost_model = XGBoostModel(
   model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts,
   entry_point='inference.py',
   source_dir='xgboost_source_dir/',
   code_location=code_location,
   framework_version='0.90-2',
   py_version='py3',
   sagemaker_session=pipeline_session,
   role=role
)

from sagemaker.workflow.model_step import ModelStep
from sagemaker import PipelineModel

pipeline_model = PipelineModel(
   models=[sklearn_model, xgboost_model],
   role=role,
   sagemaker_session=pipeline_session,
)

register_model_step_args = pipeline_model.register(
   content_types=["application/json"],
   response_types=["application/json"],
   inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
   transform_instances=["ml.m5.xlarge"],
   model_package_group_name='sipgroup',
)

step_model_registration = ModelStep(
   name="AbaloneRegisterModel",
   step_args=register_model_step_args,
)
```

## Create model step
`Create model`

You use a Create model step to create a SageMaker AI model. For more information on SageMaker AI models, see [Train a Model with Amazon SageMaker](how-it-works-training.md).

A create model step requires model artifacts and information about the SageMaker AI instance type that you need to use to create the model. The following examples show how to create a Create model step definition. For more information about Create model step requirements, see the [sagemaker.workflow.steps.CreateModelStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.CreateModelStep) documentation.

------
#### [ Pipeline Designer ]

To add a create model step to your pipeline, do the following:

1. Open the Studio console by following the instructions in [Launch Amazon SageMaker Studio](studio-updated-launch.md).

1. In the left navigation pane, select **Pipelines**.

1. Choose **Create**.

1. Choose **Blank**.

1. In the left sidebar, choose **Create model** and drag it to the canvas.

1. In the canvas, choose the **Create model** step you added.

1. In the right sidebar, complete the forms in the **Setting** and **Details** tabs. For information about the fields in these tabs, see [ sagemaker.workflow.steps.CreateModelStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.CreateModelStep).

1. If the canvas includes any step that immediately precedes the **Create model** step you added, click and drag the cursor from the step to the **Create model** step to create an edge.

1. If the canvas includes any step that immediately succeeds the **Create model** step you added, click and drag the cursor from the **Create model** step to the step to create an edge.

------
#### [ SageMaker Python SDK ]

**Important**  
We recommend using the [Model step](#step-type-model) to create models as of v2.90.0 of the SageMaker Python SDK. `CreateModelStep` will continue to work in previous versions of the SageMaker Python SDK, but is no longer actively supported.

```
from sagemaker.workflow.steps import CreateModelStep

step_create_model = CreateModelStep(
    name="AbaloneCreateModel",
    model=best_model,
    inputs=inputs
)
```

------

## Register model step
`Register model`

The Register model step registers a model into the SageMaker Model Registry.

------
#### [ Pipeline Designer ]

To register a model from a pipeline using the Pipeline Designer, do the following:

1. Open the Amazon SageMaker Studio console by following the instructions in [Launch Amazon SageMaker Studio](studio-updated-launch.md).

1. In the left navigation pane, select **Pipelines**.

1. Choose **Create**.

1. Choose **Blank**.

1. In the left sidebar, choose **Register model** and drag it to the canvas.

1. In the canvas, choose the **Register model** step you added.

1. In the right sidebar, complete the forms in the **Setting** and **Details** tabs. For information about the fields in these tabs, see [sagemaker.workflow.step_collections.RegisterModel](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.step_collections.RegisterModel).

1. If the canvas includes any step that immediately precedes the **Register model** step you added, click and drag the cursor from the step to the **Register model** step to create an edge.

1. If the canvas includes any step that immediately succeeds the **Register model** step you added, click and drag the cursor from the **Register model** step to the step to create an edge.

------
#### [ SageMaker Python SDK ]

**Important**  
We recommend using the [Model step](#step-type-model) to register models as of v2.90.0 of the SageMaker Python SDK. `RegisterModel` will continue to work in previous versions of the SageMaker Python SDK, but is no longer actively supported.

You use a `RegisterModel` step to register a [sagemaker.model.Model](https://sagemaker.readthedocs.io/en/stable/api/inference/model.html) or a [sagemaker.pipeline.PipelineModel](https://sagemaker.readthedocs.io/en/stable/api/inference/pipeline.html#pipelinemodel) with the Amazon SageMaker Model Registry. A `PipelineModel` represents an inference pipeline, which is a model composed of a linear sequence of containers that process inference requests.

For more information about how to register a model, see [Model Registration Deployment with Model Registry](model-registry.md). For more information on `RegisterModel` step requirements, see the [sagemaker.workflow.step_collections.RegisterModel](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.step_collections.RegisterModel) documentation.

The following example shows how to create a `RegisterModel` step that registers a `PipelineModel`.

```
import time
from sagemaker.sklearn import SKLearnModel
from sagemaker.xgboost import XGBoostModel

code_location = 's3://{0}/{1}/code'.format(bucket_name, prefix)

sklearn_model = SKLearnModel(
    model_data=processing_step.properties.ProcessingOutputConfig.Outputs['model'].S3Output.S3Uri,
    entry_point='inference.py',
    source_dir='sklearn_source_dir/',
    code_location=code_location,
    framework_version='1.0-1',
    role=role,
    sagemaker_session=sagemaker_session,
    py_version='py3'
)

xgboost_model = XGBoostModel(
    model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts,
    entry_point='inference.py',
    source_dir='xgboost_source_dir/',
    code_location=code_location,
    framework_version='0.90-2',
    py_version='py3',
    sagemaker_session=sagemaker_session,
    role=role
)

from sagemaker.workflow.step_collections import RegisterModel
from sagemaker import PipelineModel

pipeline_model = PipelineModel(
    models=[sklearn_model, xgboost_model],
    role=role,
    sagemaker_session=sagemaker_session
)

step_register = RegisterModel(
    name="AbaloneRegisterModel",
    model=pipeline_model,
    content_types=["application/json"],
    response_types=["application/json"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name='sipgroup',
)
```

If `model` isn't provided, the register model step requires an estimator as shown in the following example.

```
from sagemaker.workflow.step_collections import RegisterModel

step_register = RegisterModel(
    name="AbaloneRegisterModel",
    estimator=xgb_train,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics
)
```

------

## Deploy model (endpoint) step
`Deploy model (endpoint)`

In the Pipeline Designer, use the Deploy model (endpoint) step to deploy your model to an endpoint. You can create a new endpoint or use an existing endpoint. Real-time inference is ideal for inference workloads where you have real-time, interactive, low latency requirements. You can deploy your model to SageMaker AI Hosting services and get a real-time endpoint that can be used for inference. These endpoints are fully managed and support auto-scaling. To learn more about real-time inference in SageMaker AI, see [Real-time inference](realtime-endpoints.md).

Before adding a deploy model step to your pipeline, make sure that your IAM role has the following permissions:
+ `sagemaker:CreateModel`
+ `sagemaker:CreateEndpointConfig`
+ `sagemaker:CreateEndpoint`
+ `sagemaker:UpdateEndpoint`
+ `sagemaker:DescribeModel`
+ `sagemaker:DescribeEndpointConfig`
+ `sagemaker:DescribeEndpoint`

To learn more about all the required permissions for SageMaker AI and how to set them up, see [Amazon SageMaker AI API Permissions: Actions, Permissions, and Resources Reference](api-permissions-reference.md).

To add a model deployment step to your Pipeline in the drag-and-drop editor, complete the following steps:

1. Open the Studio console by following the instructions in [Launch Amazon SageMaker Studio](studio-updated-launch.md).

1. In the left navigation pane, select **Pipelines**.

1. Choose **Create**.

1. Choose **Blank**.

1. In the left sidebar, choose **Deploy model (endpoint)** and drag it to the canvas.

1. In the canvas, choose the **Deploy model (endpoint)** step you added.

1. In the right sidebar, complete the forms in the **Setting** and **Details** tabs.

1. If the canvas includes any step that immediately precedes the **Deploy model (endpoint)** step you added, click and drag the cursor from the step to the **Deploy model (endpoint)** step to create an edge.

1. If the canvas includes any step that immediately succeeds the **Deploy model (endpoint)** step you added, click and drag the cursor from the **Deploy model (endpoint)** step to the step to create an edge.

## Transform step
Transform

You use a transform step for batch transformation to run inference on an entire dataset. For more information about batch transformation, see [Batch transforms with inference pipelines](inference-pipeline-batch.md).

A transform step requires a transformer and the data on which to run batch transformation. The following example shows how to create a Transform step definition. For more information on Transform step requirements, see the [sagemaker.workflow.steps.TransformStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.TransformStep) documentation.

------
#### [ Pipeline Designer ]

To add a batch transform step to your pipeline using the drag-and-drop visual editor, do the following:

1. Open the Studio console by following the instructions in [Launch Amazon SageMaker Studio](studio-updated-launch.md).

1. In the left navigation pane, select **Pipelines**.

1. Choose **Create**.

1. Choose **Blank**.

1. In the left sidebar, choose **Deploy model (batch transform)** and drag it to the canvas.

1. In the canvas, choose the **Deploy model (batch transform)** step you added.

1. In the right sidebar, complete the forms in the **Setting** and **Details** tabs. For information about the fields in these tabs, see [ sagemaker.workflow.steps.TransformStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.TransformStep).

1. If the canvas includes any step that immediately precedes the **Deploy model (batch transform)** step you added, click and drag the cursor from the step to the **Deploy model (batch transform)** step to create an edge.

1. If the canvas includes any step that immediately succeeds the **Deploy model (batch transform)** step you added, click and drag the cursor from the **Deploy model (batch transform)** step to the step to create an edge.

------
#### [ SageMaker Python SDK ]

```
from sagemaker.workflow.pipeline_context import PipelineSession

from sagemaker.transformer import Transformer
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep

transformer = Transformer(..., sagemaker_session=PipelineSession())

step_transform = TransformStep(
    name="AbaloneTransform",
    step_args=transformer.transform(data="s3://amzn-s3-demo-bucket/my-data"),
)
```

------

## Condition step
Condition

You use a condition step to evaluate the condition of step properties to assess which action should be taken next in the pipeline.

A condition step requires:
+ A list of conditions.
+ A list of steps to run if the condition evaluates to `true`.
+ A list of steps to run if the condition evaluates to `false`.

------
#### [ Pipeline Designer ]

To add a condition step to a pipeline using the Pipeline Designer, do the following:

1. Open the Amazon SageMaker Studio console by following the instructions in [Launch Amazon SageMaker Studio](studio-updated-launch.md).

1. In the left navigation pane, select **Pipelines**.

1. Choose **Create**.

1. Choose **Blank**.

1. In the left sidebar, choose **Condition** and drag it to the canvas.

1. In the canvas, choose the **Condition** step you added.

1. In the right sidebar, complete the forms in the **Setting** and **Details** tabs. For information about the fields in these tabs, see [sagemaker.workflow.condition_step.ConditionStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.condition_step.ConditionStep).

1. If the canvas includes any step that immediately precedes the **Condition** step you added, click and drag the cursor from the step to the **Condition** step to create an edge.

1. If the canvas includes any step that immediately succeeds the **Condition** step you added, click and drag the cursor from the **Condition** step to the step to create an edge.

------
#### [ SageMaker Python SDK ]

 The following example shows how to create a `ConditionStep` definition. 

**Limitations**
+ Pipelines doesn't support the use of nested condition steps. You can't pass a condition step as the input for another condition step.
+ A condition step can't use identical steps in both branches. If you need the same step functionality in both branches, duplicate the step and give it a different name.

```
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet

cond_lte = ConditionLessThanOrEqualTo(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="regression_metrics.mse.value"
    ),
    right=6.0
)

step_cond = ConditionStep(
    name="AbaloneMSECond",
    conditions=[cond_lte],
    if_steps=[step_register, step_create_model, step_transform],
    else_steps=[]
)
```

For more information on `ConditionStep` requirements, see the [sagemaker.workflow.condition_step.ConditionStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#conditionstep) API reference. For more information on supported conditions, see *[Amazon SageMaker Pipelines - Conditions](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#conditions)* in the SageMaker Python SDK documentation.

------

## Callback step
`Callback`

Use a `Callback` step to add additional processes and AWS services into your workflow that aren't directly provided by Amazon SageMaker Pipelines. When a `Callback` step runs, the following procedure occurs:
+ Pipelines sends a message to a customer-specified Amazon Simple Queue Service (Amazon SQS) queue. The message contains a Pipelines–generated token and a customer-supplied list of input parameters. After sending the message, Pipelines waits for a response from the customer.
+ The customer retrieves the message from the Amazon SQS queue and starts their custom process.
+ When the process finishes, the customer calls one of the following APIs and submits the Pipelines–generated token:
  +  [SendPipelineExecutionStepSuccess](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_SendPipelineExecutionStepSuccess.html), along with a list of output parameters
  +  [SendPipelineExecutionStepFailure](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_SendPipelineExecutionStepFailure.html), along with a failure reason
+ The API call causes Pipelines to either continue the pipeline process or fail the process.

For more information on `Callback` step requirements, see the [sagemaker.workflow.callback_step.CallbackStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.callback_step.CallbackStep) documentation. For a complete solution, see [Extend SageMaker Pipelines to include custom steps using callback steps](https://aws.amazon.com/blogs/machine-learning/extend-amazon-sagemaker-pipelines-to-include-custom-steps-using-callback-steps/).

**Important**  
`Callback` steps were introduced in Amazon SageMaker Python SDK v2.45.0 and Amazon SageMaker Studio Classic v3.6.2. You must update Studio Classic before you use a `Callback` step or the pipeline DAG doesn't display. To update Studio Classic, see [Shut Down and Update Amazon SageMaker Studio Classic](studio-tasks-update-studio.md).

The following sample shows an implementation of the preceding procedure.

```
from sagemaker.workflow.callback_step import CallbackStep

step_callback = CallbackStep(
    name="MyCallbackStep",
    sqs_queue_url="https://sqs.us-east-2.amazonaws.com/012345678901/MyCallbackQueue",
    inputs={...},
    outputs=[...]
)

callback_handler_code = '''
    import boto3
    import json

    def handler(event, context):
        sagemaker_client = boto3.client("sagemaker")

        for record in event["Records"]:
            payload = json.loads(record["body"])
            token = payload["token"]

            # Custom processing

            # Call SageMaker AI to complete the step
            sagemaker_client.send_pipeline_execution_step_success(
                CallbackToken=token,
                OutputParameters=[...]  # list of {"Name": ..., "Value": ...} pairs
            )
'''
```

**Note**  
Output parameters for `CallbackStep` should not be nested. For example, if you use a nested dictionary as your output parameter, then the dictionary is treated as a single string (ex. `{"output1": "{\"nested_output1\":\"my-output\"}"}`). If you provide a nested value, then when you try to refer to a particular output parameter, SageMaker AI throws a non-retryable client error.

**Stopping behavior**

A pipeline process doesn't stop while a `Callback` step is running.

When you call [StopPipelineExecution](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_StopPipelineExecution.html) on a pipeline process with a running `Callback` step, Pipelines sends an Amazon SQS message to the SQS queue. The body of the SQS message contains a **Status** field, which is set to `Stopping`. The following shows an example SQS message body.

```
{
  "token": "26vcYbeWsZ",
  "pipelineExecutionArn": "arn:aws:sagemaker:us-east-2:012345678901:pipeline/callback-pipeline/execution/7pinimwddh3a",
  "arguments": {
    "number": 5,
    "stringArg": "some-arg",
    "inputData": "s3://sagemaker-us-west-2-012345678901/abalone/abalone-dataset.csv"
  },
  "status": "Stopping"
}
```

You should add logic to your Amazon SQS message consumer to take any needed action (for example, resource cleanup) upon receipt of the message. Then add a call to `SendPipelineExecutionStepSuccess` or `SendPipelineExecutionStepFailure`.

Only when Pipelines receives one of these calls does it stop the pipeline process.
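
The following is a minimal sketch of consumer logic that handles both the normal and `Stopping` cases. The message-handling entry point, cleanup, and output parameter values are assumptions to replace with your own process.

```
import json

import boto3

sagemaker_client = boto3.client("sagemaker")

def handle_callback_message(message_body):
    payload = json.loads(message_body)
    token = payload["token"]

    if payload.get("status") == "Stopping":
        # Clean up any resources started for this step, then acknowledge the stop
        sagemaker_client.send_pipeline_execution_step_failure(
            CallbackToken=token,
            FailureReason="Pipeline execution was stopped",
        )
        return

    # Run the custom process, then report success with any output parameters
    sagemaker_client.send_pipeline_execution_step_success(
        CallbackToken=token,
        OutputParameters=[{"Name": "result", "Value": "done"}],  # hypothetical output
    )
```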

## Lambda step
Lambda

You use a Lambda step to run an AWS Lambda function. You can run an existing Lambda function, or SageMaker AI can create and run a new Lambda function. If you choose to use an existing Lambda function, it must be in the same AWS Region as the SageMaker AI pipeline. For a notebook that shows how to use a Lambda step in a SageMaker AI pipeline, see [sagemaker-pipelines-lambda-step.ipynb](https://github.com/aws/amazon-sagemaker-examples/blob/main/sagemaker-pipelines/tabular/lambda-step/sagemaker-pipelines-lambda-step.ipynb).

**Important**  
Lambda steps were introduced in Amazon SageMaker Python SDK v2.51.0 and Amazon SageMaker Studio Classic v3.9.1. You must update Studio Classic before you use a Lambda step or the pipeline DAG doesn't display. To update Studio Classic, see [Shut Down and Update Amazon SageMaker Studio Classic](studio-tasks-update-studio.md).

SageMaker AI provides the [sagemaker.lambda_helper.Lambda](https://sagemaker.readthedocs.io/en/stable/api/utility/lambda_helper.html) class to create, update, invoke, and delete Lambda functions. `Lambda` has the following signature.

```
Lambda(
    function_arn,       # Only required argument to invoke an existing Lambda function

    # The following arguments are required to create a Lambda function:
    function_name,
    execution_role_arn,
    zipped_code_dir,    # Specify either zipped_code_dir and s3_bucket, OR script
    s3_bucket,          # S3 bucket where zipped_code_dir is uploaded
    script,             # Path of Lambda function script
    handler,            # Lambda handler specified as "lambda_script.lambda_handler"
    timeout,            # Maximum time the Lambda function can run before the lambda step fails
    ...
)
```

The [sagemaker.workflow.lambda_step.LambdaStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.lambda_step.LambdaStep) class has a `lambda_func` argument of type `Lambda`. To invoke an existing Lambda function, the only requirement is to supply the Amazon Resource Name (ARN) of the function to `function_arn`. If you don't supply a value for `function_arn`, you must specify `handler` and one of the following:
+ `zipped_code_dir` – The path of the zipped Lambda function

  `s3_bucket` – Amazon S3 bucket where `zipped_code_dir` is to be uploaded
+ `script` – The path of the Lambda function script file

The following example shows how to create a `Lambda` step definition that invokes an existing Lambda function.

```
from sagemaker.workflow.lambda_step import LambdaStep
from sagemaker.lambda_helper import Lambda

step_lambda = LambdaStep(
    name="ProcessingLambda",
    lambda_func=Lambda(
        function_arn="arn:aws:lambda:us-west-2:012345678910:function:split-dataset-lambda"
    ),
    inputs={
        "s3_bucket": s3_bucket,
        "data_file": data_file
    },
    outputs=[
        "train_file", "test_file"
    ]
)
```

The following example shows how to create a `Lambda` step definition that creates and invokes a Lambda function using a Lambda function script.

```
from sagemaker.workflow.lambda_step import LambdaStep
from sagemaker.lambda_helper import Lambda

step_lambda = LambdaStep(
    name="ProcessingLambda",
    lambda_func=Lambda(
      function_name="split-dataset-lambda",
      execution_role_arn=execution_role_arn,
      script="lambda_script.py",
      handler="lambda_script.lambda_handler",
      ...
    ),
    inputs={
        "s3_bucket": s3_bucket,
        "data_file": data_file
    },
    outputs=[
        "train_file", "test_file"
    ]
)
```

**Inputs and outputs**

If your `Lambda` function has inputs or outputs, these must also be defined in your `Lambda` step.

**Note**  
Input and output parameters should not be nested. For example, if you use a nested dictionary as your output parameter, then the dictionary is treated as a single string (ex. `{"output1": "{\"nested_output1\":\"my-output\"}"}`). If you provide a nested value and try to refer to it later, a non-retryable client error is thrown.

When defining the `Lambda` step, `inputs` must be a dictionary of key-value pairs. Each value of the `inputs` dictionary must be a primitive type (string, integer, or float). Nested objects are not supported. If left undefined, the `inputs` value defaults to `None`.

The `outputs` value must be a list of keys. These keys refer to a dictionary defined in the output of the `Lambda` function. Like `inputs`, these keys must be primitive types, and nested objects are not supported.
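
As a minimal sketch, assuming the `step_lambda` definition and the `train_file` output key from the earlier examples, a downstream step can reference a `Lambda` step output through its step properties:

```
# Placeholder that resolves at runtime to the "train_file" value returned by the Lambda function
train_file_uri = step_lambda.properties.Outputs["train_file"]
```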

**Timeout and stopping behavior**

The `Lambda` class has a `timeout` argument that specifies the maximum time that the Lambda function can run. The default value is 120 seconds with a maximum value of 10 minutes. If the Lambda function is running when the timeout is met, the Lambda step fails; however, the Lambda function continues to run.

A pipeline process can't be stopped while a Lambda step is running because the Lambda function invoked by the Lambda step can't be stopped. If you stop the process while the Lambda function is running, the pipeline waits until either the function finishes or the timeout is hit, whichever occurs first, and then stops. If the Lambda function finishes first, the pipeline process status is `Stopped`. If the timeout is hit first, the pipeline process status is `Failed`.
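
For example, the following sketch, which reuses the hypothetical script and role names from the earlier example, creates the Lambda function with the 10-minute maximum timeout:

```
from sagemaker.lambda_helper import Lambda

lambda_func = Lambda(
    function_name="split-dataset-lambda",
    execution_role_arn=execution_role_arn,
    script="lambda_script.py",
    handler="lambda_script.lambda_handler",
    timeout=600,  # maximum allowed value, in seconds
)
```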

## ClarifyCheck step
`ClarifyCheck`

You can use the `ClarifyCheck` step to conduct baseline drift checks against previous baselines for bias analysis and model explainability. You can then generate and [register your baselines](https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-quality-clarify-baseline-lifecycle.html#pipelines-quality-clarify-baseline-calculations) with the `model.register()` method and pass the output of that method to [Model step](#step-type-model) using [`step_args`](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#model-step). These baselines for drift check can be used by Amazon SageMaker Model Monitor for your model endpoints. As a result, you don’t need to do a [baseline](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-create-baseline.html) suggestion separately. 

The `ClarifyCheck` step can also pull baselines for drift check from the model registry. The `ClarifyCheck` step uses the SageMaker Clarify prebuilt container. This container provides a range of model monitoring capabilities, including constraint suggestion and constraint validation against a given baseline. For more information, see [Prebuilt SageMaker Clarify Containers](clarify-processing-job-configure-container.md).

### Configuring the ClarifyCheck step
Configuring `ClarifyCheck`

You can configure the `ClarifyCheck` step to conduct only one of the following check types each time it’s used in a pipeline.
+ Data bias check
+ Model bias check
+ Model explainability check

To do this, set the `clarify_check_config` parameter with one of the following check type values:
+ `DataBiasCheckConfig`
+ `ModelBiasCheckConfig`
+ `ModelExplainabilityCheckConfig`

The `ClarifyCheck` step launches a processing job that runs the SageMaker Clarify prebuilt container and requires dedicated [configurations for the check and the processing job](https://docs.aws.amazon.com/sagemaker/latest/dg/clarify-configure-processing-jobs.html). `ClarifyCheckConfig` and `CheckJobConfig` are helper functions for these configurations. These helper functions are aligned with how the SageMaker Clarify processing job computes model bias, data bias, or model explainability checks. For more information, see [Run SageMaker Clarify Processing Jobs for Bias Analysis and Explainability](clarify-processing-job-run.md). 

### Controlling step behaviors for drift check
Configuring `ClarifyCheck`

The `ClarifyCheck` step requires the following two boolean flags to control its behavior:
+ `skip_check`: This parameter indicates whether the drift check against the previous baseline is skipped. If it is set to `False`, the previous baseline of the configured check type must be available.
+ `register_new_baseline`: This parameter indicates whether the newly calculated baseline is registered and made accessible through the step property `BaselineUsedForDriftCheckConstraints`. If it is set to `False`, the previous baseline of the configured check type must also be available, and it is accessible through the same property. 

For more information, see [Baseline calculation, drift detection and lifecycle with ClarifyCheck and QualityCheck steps in Amazon SageMaker Pipelines](pipelines-quality-clarify-baseline-lifecycle.md).

### Working with baselines
Baselines

You can optionally specify the `model_package_group_name` to locate the existing baseline. Then, the `ClarifyCheck` step pulls the `DriftCheckBaselines` on the latest approved model package in the model package group. 

Or, you can provide a previous baseline through the `supplied_baseline_constraints` parameter. If you specify both the `model_package_group_name` and the `supplied_baseline_constraints`, the `ClarifyCheck` step uses the baseline specified by the `supplied_baseline_constraints` parameter.

For more information on `ClarifyCheck` step requirements, see [sagemaker.workflow.clarify_check_step.ClarifyCheckStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.clarify_check_step.ClarifyCheckStep) in the *Amazon SageMaker Python SDK*. For an Amazon SageMaker Studio Classic notebook that shows how to use the `ClarifyCheck` step in Pipelines, see [sagemaker-pipeline-model-monitor-clarify-steps.ipynb](https://github.com/aws/amazon-sagemaker-examples/blob/main/sagemaker-pipelines/tabular/model-monitor-clarify-pipelines/sagemaker-pipeline-model-monitor-clarify-steps.ipynb).

**Example Create a `ClarifyCheck` step for data bias check**  

```
from sagemaker.clarify import BiasConfig, DataConfig
from sagemaker.workflow.check_job_config import CheckJobConfig
from sagemaker.workflow.clarify_check_step import DataBiasCheckConfig, ClarifyCheckStep
from sagemaker.workflow.execution_variables import ExecutionVariables
from sagemaker.workflow.functions import Join

check_job_config = CheckJobConfig(
    role=role,
    instance_count=1,
    instance_type="ml.c5.xlarge",
    volume_size_in_gb=120,
    sagemaker_session=sagemaker_session,
)

data_bias_data_config = DataConfig(
    s3_data_input_path=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
    s3_output_path=Join(on='/', values=['s3:/', your_bucket, base_job_prefix, ExecutionVariables.PIPELINE_EXECUTION_ID, 'databiascheckstep']),
    label=0,
    dataset_type="text/csv",
    s3_analysis_config_output_path=data_bias_analysis_cfg_output_path,
)

data_bias_config = BiasConfig(
    label_values_or_threshold=[15.0], facet_name=[8], facet_values_or_threshold=[[0.5]]  
)

data_bias_check_config = DataBiasCheckConfig(
    data_config=data_bias_data_config,
    data_bias_config=data_bias_config,
)

data_bias_check_step = ClarifyCheckStep(
    name="DataBiasCheckStep",
    clarify_check_config=data_bias_check_config,
    check_job_config=check_job_config,
    skip_check=False,
    register_new_baseline=False,
    supplied_baseline_constraints="s3://sagemaker-us-west-2-111122223333/baseline/analysis.json",
    model_package_group_name="MyModelPackageGroup"
)
```

## QualityCheck step
`QualityCheck`

Use the `QualityCheck` step to conduct [baseline suggestions](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-create-baseline.html) and drift checks against a previous baseline for data quality or model quality in a pipeline. You can then generate and [register your baselines](https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-quality-clarify-baseline-lifecycle.html#pipelines-quality-clarify-baseline-calculations) with the `model.register()` method and pass the output of that method to [Model step](#step-type-model) using [`step_args`](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#model-step).

Model Monitor can use these baselines for drift check for your model endpoints so that you don’t need to do a baseline suggestion separately. The `QualityCheck` step can also pull baselines for drift check from the model registry. The `QualityCheck` step leverages the Amazon SageMaker AI Model Monitor prebuilt container. This container has a range of model monitoring capabilities including constraint suggestion, statistics generation, and constraint validation against a baseline. For more information, see [Amazon SageMaker Model Monitor prebuilt container](model-monitor-pre-built-container.md).

### Configuring the QualityCheck step
Configuring `QualityCheck`

You can configure the `QualityCheck` step to run only one of the following check types each time it’s used in a pipeline.
+ Data quality check
+ Model quality check

You do this by setting the `quality_check_config` parameter with one of the following check type values:
+ `DataQualityCheckConfig`
+ `ModelQualityCheckConfig`

The `QualityCheck` step launches a processing job that runs the Model Monitor prebuilt container and requires dedicated configurations for the check and the processing job. The `QualityCheckConfig` and `CheckJobConfig` are helper functions for these configurations. These helper functions are aligned with how Model Monitor creates a baseline for the model quality or data quality monitoring. For more information on the Model Monitor baseline suggestions, see [Create a Baseline](model-monitor-create-baseline.md) and [Create a model quality baseline](model-monitor-model-quality-baseline.md).

### Controlling step behaviors for drift check
Configuring `QualityCheck`

The `QualityCheck` step requires the following two Boolean flags to control its behavior:
+ `skip_check`: This parameter indicates if the drift check against the previous baseline is skipped or not. If it is set to `False`, the previous baseline of the configured check type must be available.
+ `register_new_baseline`: This parameter indicates if a newly calculated baseline can be accessed through step properties `BaselineUsedForDriftCheckConstraints` and `BaselineUsedForDriftCheckStatistics`. If it is set to `False`, the previous baseline of the configured check type must also be available. These can be accessed through the `BaselineUsedForDriftCheckConstraints` and `BaselineUsedForDriftCheckStatistics` properties.

For more information, see [Baseline calculation, drift detection and lifecycle with ClarifyCheck and QualityCheck steps in Amazon SageMaker Pipelines](pipelines-quality-clarify-baseline-lifecycle.md).

### Working with baselines
Baselines

You can specify a previous baseline directly through the `supplied_baseline_statistics` and `supplied_baseline_constraints` parameters. You can also specify the `model_package_group_name` and the `QualityCheck` step pulls the `DriftCheckBaselines` on the latest approved model package in the model package group. 

If you specify all of the following parameters, the `QualityCheck` step uses the baseline specified by `supplied_baseline_constraints` and `supplied_baseline_statistics` for the configured check type:
+ `model_package_group_name`
+ `supplied_baseline_constraints`
+ `supplied_baseline_statistics`

For more information on `QualityCheck` step requirements, see [sagemaker.workflow.quality_check_step.QualityCheckStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.quality_check_step.QualityCheckStep) in the *Amazon SageMaker Python SDK*. For an Amazon SageMaker Studio Classic notebook that shows how to use the `QualityCheck` step in Pipelines, see [sagemaker-pipeline-model-monitor-clarify-steps.ipynb](https://github.com/aws/amazon-sagemaker-examples/blob/main/sagemaker-pipelines/tabular/model-monitor-clarify-pipelines/sagemaker-pipeline-model-monitor-clarify-steps.ipynb). 

**Example Create a `QualityCheck` step for data quality check**  

```
from sagemaker.model_monitor.dataset_format import DatasetFormat
from sagemaker.workflow.check_job_config import CheckJobConfig
from sagemaker.workflow.quality_check_step import DataQualityCheckConfig, QualityCheckStep
from sagemaker.workflow.execution_variables import ExecutionVariables
from sagemaker.workflow.functions import Join

check_job_config = CheckJobConfig(
    role=role,
    instance_count=1,
    instance_type="ml.c5.xlarge",
    volume_size_in_gb=120,
    sagemaker_session=sagemaker_session,
)

data_quality_check_config = DataQualityCheckConfig(
    baseline_dataset=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
    dataset_format=DatasetFormat.csv(header=False, output_columns_position="START"),
    output_s3_uri=Join(on='/', values=['s3:/', your_bucket, base_job_prefix, ExecutionVariables.PIPELINE_EXECUTION_ID, 'dataqualitycheckstep'])
)

data_quality_check_step = QualityCheckStep(
    name="DataQualityCheckStep",
    skip_check=False,
    register_new_baseline=False,
    quality_check_config=data_quality_check_config,
    check_job_config=check_job_config,
    supplied_baseline_statistics="s3://sagemaker-us-west-2-555555555555/baseline/statistics.json",
    supplied_baseline_constraints="s3://sagemaker-us-west-2-555555555555/baseline/constraints.json",
    model_package_group_name="MyModelPackageGroup"
)
```

## EMR step
EMR

Use the Amazon SageMaker Pipelines [EMR](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-overview.html) step to:
+ Process [Amazon EMR steps](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-work-with-steps.html) on a running Amazon EMR cluster.
+ Have the pipeline create and manage an Amazon EMR cluster for you.

For more information about Amazon EMR, see [Getting started with Amazon EMR](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-gs.html).

The EMR step requires that `EMRStepConfig` include the location of the JAR file used by the Amazon EMR cluster and any arguments to be passed. You also provide the Amazon EMR cluster ID if you want to run the step on a running EMR cluster. You can also pass the cluster configuration to run the EMR step on a cluster that it creates, manages, and terminates for you. The following sections include examples and links to sample notebooks demonstrating both methods.

**Note**  
EMR steps require that the role passed to your pipeline has additional permissions. Attach the [AWS managed policy: `AmazonSageMakerPipelinesIntegrations`](https://docs.aws.amazon.com/sagemaker/latest/dg/security-iam-awsmanpol-pipelines.html#security-iam-awsmanpol-AmazonSageMakerPipelinesIntegrations) to your pipeline role, or ensure that the role includes the permissions in that policy.
If you process an EMR step on a running cluster, you can only use a cluster that is in one of the following states:
+ `STARTING`
+ `BOOTSTRAPPING`
+ `RUNNING`
+ `WAITING`
If you process EMR steps on a running cluster, you can have at most 256 EMR steps in a `PENDING` state on an EMR cluster. EMR steps submitted beyond this limit result in pipeline execution failure. You may consider using [Retry Policy for Pipeline Steps](pipelines-retry-policy.md).
You can specify either cluster ID or cluster configuration, but not both.
The EMR step relies on Amazon EventBridge to monitor changes in the EMR step or cluster state. If you process your Amazon EMR job on a running cluster, the EMR step uses the `SageMakerPipelineExecutionEMRStepStatusUpdateRule` rule to monitor EMR step state. If you process your job on a cluster that the EMR step creates, the step uses the `SageMakerPipelineExecutionEMRClusterStatusRule` rule to monitor changes in cluster state. If you see either of these EventBridge rules in your AWS account, do not delete them or else your EMR step may not complete.

**Add an Amazon EMR step to your pipeline**

To add an EMR step to your pipeline, do the following:
+ Open the Studio console by following the instructions in [Launch Amazon SageMaker Studio](https://docs.aws.amazon.com/sagemaker/latest/dg/studio-updated-launch.html).
+ In the left navigation pane, select **Pipelines**.
+ Choose **Create**.
+ Choose **Blank**.
+ In the left sidebar, choose **Process data** and drag it to the canvas.
+ In the canvas, choose the **Process data** step you added.
+ In the right sidebar, under mode, choose **EMR (managed)**.
+ In the right sidebar, complete the forms in the **Setting** and **Details** tabs. For information about the fields in these tabs, see [sagemaker.workflow.emr_step.EMRStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.emr_step.EMRStep).

**Launch a new job on a running Amazon EMR cluster**

To launch a new job on a running Amazon EMR cluster, pass the cluster ID as a string to the `cluster_id` argument of `EMRStep`. The following example demonstrates this procedure.

```
from sagemaker.workflow.emr_step import EMRStep, EMRStepConfig

emr_config = EMRStepConfig(
    jar="jar-location", # required, path to jar file used
    args=["--verbose", "--force"], # optional list of arguments to pass to the jar
    main_class="com.my.Main1", # optional main class, this can be omitted if jar above has a manifest 
    properties=[ # optional list of Java properties that are set when the step runs
    {
        "key": "mapred.tasktracker.map.tasks.maximum",
        "value": "2"
    },
    {
        "key": "mapreduce.map.sort.spill.percent",
        "value": "0.90"
   },
   {
       "key": "mapreduce.tasktracker.reduce.tasks.maximum",
       "value": "5"
    }
  ]
)

step_emr = EMRStep(
    name="EMRSampleStep", # required
    cluster_id="j-1ABCDEFG2HIJK", # include cluster_id to use a running cluster
    step_config=emr_config, # required
    display_name="My EMR Step",
    description="Pipeline step to execute EMR job"
)
```

For a sample notebook that guides you through a complete example, see [ Pipelines EMR Step With Running EMR Cluster](https://github.com/aws/amazon-sagemaker-examples/blob/main/sagemaker-pipelines/tabular/emr-step/sagemaker-pipelines-emr-step-with-running-emr-cluster.ipynb).

**Launch a new job on a new Amazon EMR cluster**

To launch a new job on a new cluster that `EMRStep` creates for you, provide your cluster configuration as a dictionary. The dictionary must have the same structure as a [RunJobFlow](https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html) request. However, do not include the following fields in your cluster configuration:
+ [`Name`]
+ [`Steps`]
+ [`AutoTerminationPolicy`]
+ [`Instances`][`KeepJobFlowAliveWhenNoSteps`]
+ [`Instances`][`TerminationProtected`]

All other `RunJobFlow` arguments are available for use in your cluster configuration. For details about the request syntax, see [RunJobFlow](https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html).

The following example passes a cluster configuration to an EMR step definition. This prompts the step to launch a new job on a new EMR cluster. The EMR cluster configuration in this example includes specifications for primary and core EMR cluster nodes. For more information about Amazon EMR node types, see [ Understand node types: primary, core, and task nodes](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-master-core-task-nodes.html).

```
from sagemaker.workflow.emr_step import EMRStep, EMRStepConfig

emr_step_config = EMRStepConfig(
    jar="jar-location", # required, path to jar file used
    args=["--verbose", "--force"], # optional list of arguments to pass to the jar
    main_class="com.my.Main1", # optional main class, this can be omitted if jar above has a manifest 
    properties=[ # optional list of Java properties that are set when the step runs
    {
        "key": "mapred.tasktracker.map.tasks.maximum",
        "value": "2"
    },
    {
        "key": "mapreduce.map.sort.spill.percent",
        "value": "0.90"
   },
   {
       "key": "mapreduce.tasktracker.reduce.tasks.maximum",
       "value": "5"
    }
  ]
)

# include your cluster configuration as a dictionary
emr_cluster_config = {
    "Applications": [
        {
            "Name": "Spark", 
        }
    ],
    "Instances":{
        "InstanceGroups":[
            {
                "InstanceRole": "MASTER",
                "InstanceCount": 1,
                "InstanceType": "m5.2xlarge"
            },
            {
                "InstanceRole": "CORE",
                "InstanceCount": 2,
                "InstanceType": "m5.2xlarge"
            }
        ]
    },
    "BootstrapActions":[],
    "ReleaseLabel": "emr-6.6.0",
    "JobFlowRole": "job-flow-role",
    "ServiceRole": "service-role"
}

emr_step = EMRStep(
    name="emr-step",
    cluster_id=None,
    display_name="emr_step",
    description="MyEMRStepDescription",
    step_config=emr_step_config,
    cluster_config=emr_cluster_config
)
```

For a sample notebook that guides you through a complete example, see [ Pipelines EMR Step With Cluster Lifecycle Management](https://github.com/aws/amazon-sagemaker-examples/blob/main/sagemaker-pipelines/tabular/emr-step/sagemaker-pipelines-emr-step-with-cluster-lifecycle-management.ipynb).

## EMR serverless step
EMR serverless step

To add an EMR serverless step to your pipeline, do the following:
+ Open the Studio console by following the instructions in [Launch Amazon SageMaker Studio](https://docs.aws.amazon.com/sagemaker/latest/dg/studio-updated-launch.html).
+ In the left navigation pane, select **Pipelines**.
+ Choose **Create**.
+ Choose **Blank**.
+ In the left sidebar, choose **Process data** and drag it to the canvas.
+ In the canvas, choose the **Process data** step you added.
+ In the right sidebar, under mode, choose **EMR (serverless)**.
+ In the right sidebar, complete the forms in the **Setting** and **Details** tabs.

## Notebook job step
Notebook Job

Use a `NotebookJobStep` to run your SageMaker Notebook Job non-interactively as a pipeline step. If you build your pipeline in the Pipelines drag-and-drop UI, use the [Execute code step](#step-type-executecode) to run your notebook. For more information about SageMaker Notebook Jobs, see [SageMaker Notebook Jobs](notebook-auto-run.md).

A `NotebookJobStep` requires at minimum an input notebook, image URI and kernel name. For more information about Notebook Job step requirements and other parameters you can set to customize your step, see [sagemaker.workflow.steps.NotebookJobStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.notebook_job_step.NotebookJobStep).

The following example uses minimum arguments to define a `NotebookJobStep`.

```
from sagemaker.workflow.notebook_job_step import NotebookJobStep


notebook_job_step = NotebookJobStep(
    input_notebook=input_notebook,
    image_uri=image_uri,
    kernel_name=kernel_name
)
```

Your `NotebookJobStep` pipeline step is treated as a SageMaker notebook job. Because of this, you can track the execution status in the Studio Classic UI notebook job dashboard by including specific tags with the `tags` argument. For more details about tags to include, see [View your notebook jobs in the Studio UI dashboard](create-notebook-auto-run-sdk.md#create-notebook-auto-run-dash).

Also, if you schedule your notebook job using the SageMaker Python SDK, you can only specify certain images to run your notebook job. For more information, see [Image constraints for SageMaker AI Python SDK notebook jobs](notebook-auto-run-constraints.md#notebook-auto-run-constraints-image-sdk).

## Fail step
Fail

Use a Fail step to stop an Amazon SageMaker Pipelines execution when a desired condition or state is not achieved. The Fail step also allows you to enter a custom error message, indicating the cause of the pipeline's execution failure.

**Note**  
When a Fail step and other pipeline steps execute at the same time, the pipeline does not terminate until all concurrent steps are completed.

### Limitations for using Fail step
Fail step limitations
+ You cannot add a Fail step to the `DependsOn` list of other steps. For more information, see [Custom dependency between steps](build-and-manage-steps.md#build-and-manage-custom-dependency).
+ Other steps cannot reference the Fail step. It is *always* the last step in a pipeline's execution.
+ You cannot retry a pipeline execution ending with a Fail step.

You can create the Fail step error message in the form of a static text string. Alternatively, you can also use [Pipeline Parameters](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-parameters.html), a [Join](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html?highlight=Join#sagemaker.workflow.functions.Join) operation, or other [step properties](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#build-and-manage-properties) to create a more informative error message if you use the SDK.

------
#### [ Pipeline Designer ]

To add a Fail step to your pipeline, do the following:

1. Open the Studio console by following the instructions in [Launch Amazon SageMaker Studio](studio-updated-launch.md).

1. In the left navigation pane, select **Pipelines**.

1. Choose **Create**.

1. Choose **Blank**.

1. In the left sidebar, choose **Fail** and drag it to the canvas.

1. In the canvas, choose the **Fail** step you added.

1. In the right sidebar, complete the forms in the **Setting** and **Details** tabs. For information about the fields in these tabs, see [sagemaker.workflow.fail_step.FailStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.fail_step.FailStep).

1. If the canvas includes any step that immediately precedes the **Fail** step you added, click and drag the cursor from the step to the **Fail** step to create an edge.

1. If the canvas includes any step that immediately succeeds the **Fail** step you added, click and drag the cursor from the **Fail** step to the step to create an edge.

------
#### [ SageMaker Python SDK ]

**Example**  
The following example code snippet uses a `FailStep` with an `ErrorMessage` configured with Pipeline Parameters and a `Join` operation.  

```
from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join
from sagemaker.workflow.parameters import ParameterInteger

mse_threshold_param = ParameterInteger(name="MseThreshold", default_value=5)
step_fail = FailStep(
    name="AbaloneMSEFail",
    error_message=Join(
        on=" ", values=["Execution failed due to MSE >", mse_threshold_param]
    ),
)
```

------

# Add integration


MLflow integration allows you to use MLflow with pipelines to select a tracking server or serverless application, choose an experiment, and log metrics.

## Key concepts


**Default app creation** - A default MLflow application will be created when you enter the pipeline visual editor.

**Integrations panel** - A new integrations panel includes MLflow, which you can select and configure.

**Update app and experiment** - The option to override the selected application and experiment during pipeline execution.

## How it works

+ Go to **Pipeline Visual Editor**
+ Choose **Integration** on the toolbar
+ Choose **MLflow**
+ Configure the MLflow app and experiment

## Example screenshots


Integrations side panel

![\[Integrations side panel in the pipeline visual editor.\]](http://docs.aws.amazon.com/sagemaker/latest/dg/images/screenshot-pipeline-1.png)


MLflow configuration

![\[MLflow configuration panel in the pipeline visual editor.\]](http://docs.aws.amazon.com/sagemaker/latest/dg/images/screenshot-pipeline-2.png)


How to override experiment during pipeline execution

![\[Overriding the MLflow experiment during pipeline execution.\]](http://docs.aws.amazon.com/sagemaker/latest/dg/images/screenshot-pipeline-3.png)


## Step properties


Use the `properties` attribute to add data dependencies between steps in the pipeline. Pipelines use these data dependencies to construct the DAG from the pipeline definition. These properties can be referenced as placeholder values and are resolved at runtime. 

The `properties` attribute of a Pipelines step matches the object returned by a `Describe` call for the corresponding SageMaker AI job type. For each job type, the `Describe` call returns the following response object:
+ `ProcessingStep` – [DescribeProcessingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeProcessingJob.html)
+ `TrainingStep` – [DescribeTrainingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeTrainingJob.html)
+ `TransformStep` – [DescribeTransformJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeTransformJob.html)

To check which properties are referrable for each step type during data dependency creation, see *[Data Dependency - Property Reference](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#data-dependency-property-reference)* in the [Amazon SageMaker Python SDK](https://sagemaker.readthedocs.io/en/stable).
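
For example, the following sketch, which assumes a training step named `step_train`, references the location of the trained model artifacts; the value is a placeholder that resolves only at runtime.

```
# Placeholder that resolves to the S3 URI of the model artifacts when the training job finishes
model_artifacts = step_train.properties.ModelArtifacts.S3ModelArtifacts
```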

## Step parallelism


When a step does not depend on any other step, it runs immediately upon pipeline execution. However, executing too many pipeline steps in parallel can quickly exhaust available resources. Control the number of concurrent steps for a pipeline execution with `ParallelismConfiguration`.

The following example uses `ParallelismConfiguration` to set the concurrent step limit to five.

```
from sagemaker.workflow.parallelism_config import ParallelismConfiguration

pipeline.create(
    parallelism_config=ParallelismConfiguration(5),
)
```

## Data dependency between steps


You define the structure of your DAG by specifying the data relationships between steps. To create data dependencies between steps, pass the properties of one step as the input to another step in the pipeline. The step receiving the input isn't started until after the step providing the input finishes running.

A data dependency uses JsonPath notation in the following format. This format traverses the JSON property file. This means you can append as many *<property>* instances as needed to reach the desired nested property in the file. For more information on JsonPath notation, see the [JsonPath repo](https://github.com/json-path/JsonPath).

```
<step_name>.properties.<property>.<property>
```

The following shows how to specify an Amazon S3 bucket using the `ProcessingOutputConfig` property of a processing step.

```
step_process.properties.ProcessingOutputConfig.Outputs["train_data"].S3Output.S3Uri
```

To create the data dependency, pass the bucket to a training step as follows.

```
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.sklearn import SKLearn
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep

sklearn_train = SKLearn(..., sagemaker_session=PipelineSession())

step_train = TrainingStep(
    name="CensusTrain",
    step_args=sklearn_train.fit(inputs=TrainingInput(
        s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
            "train_data"].S3Output.S3Uri
    ))
)
```

To check which properties are referrable for each step type during data dependency creation, see *[Data Dependency - Property Reference](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#data-dependency-property-reference)* in the [Amazon SageMaker Python SDK](https://sagemaker.readthedocs.io/en/stable).

## Custom dependency between steps


When you specify a data dependency, Pipelines provides the data connection between the steps. Alternatively, one step can access the data from a previous step without directly using Pipelines. In this case, you can create a custom dependency that tells Pipelines not to start a step until after another step has finished running. You create a custom dependency by specifying a step's `DependsOn` attribute.

As an example, the following defines a step `C` that starts only after both step `A` and step `B` finish running.

```
{
  'Steps': [
    {'Name':'A', ...},
    {'Name':'B', ...},
    {'Name':'C', 'DependsOn': ['A', 'B']}
  ]
}
```

Pipelines throws a validation exception if a dependency would create a cycle in the DAG.

The following example creates a training step that starts after a processing step finishes running.

```
processing_step = ProcessingStep(...)
training_step = TrainingStep(...)

training_step.add_depends_on([processing_step])
```

The following example creates a training step that doesn't start until two different processing steps finish running.

```
processing_step_1 = ProcessingStep(...)
processing_step_2 = ProcessingStep(...)

training_step = TrainingStep(...)

training_step.add_depends_on([processing_step_1, processing_step_2])
```

The following provides an alternate way to create the custom dependency.

```
training_step.add_depends_on([processing_step_1])
training_step.add_depends_on([processing_step_2])
```

The following example creates a training step that receives input from one processing step and waits for a different processing step to finish running.

```
processing_step_1 = ProcessingStep(...)
processing_step_2 = ProcessingStep(...)

training_step = TrainingStep(
    ...,
    inputs=TrainingInput(
        s3_data=processing_step_1.properties.ProcessingOutputConfig.Outputs[
            "train_data"
        ].S3Output.S3Uri
    )
)

training_step.add_depends_on([processing_step_2])
```

The following example shows how to retrieve a string list of the custom dependencies of a step.

```
custom_dependencies = training_step.depends_on
```

## Custom images in a step


 You can use any of the available SageMaker AI [Deep Learning Container images](https://github.com/aws/deep-learning-containers/blob/master/available_images.md) when you create a step in your pipeline. 

You can also use your own container with pipeline steps. Because you can’t create an image from within Studio Classic, you must create your image using another method before using it with Pipelines.

To use your own container when creating the steps for your pipeline, include the image URI in the estimator definition. For more information on using your own container with SageMaker AI, see [Using Docker Containers with SageMaker AI](https://docs.aws.amazon.com/sagemaker/latest/dg/docker-containers.html).
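
For example, the following sketch, assuming a hypothetical training image hosted in Amazon ECR, sets the `image_uri` of an estimator used by a pipeline training step:

```
from sagemaker.estimator import Estimator
from sagemaker.workflow.pipeline_context import PipelineSession

# Hypothetical ECR image URI; replace it with the URI of your own container image.
custom_image_uri = "123456789012.dkr.ecr.us-west-2.amazonaws.com/my-training-image:latest"

estimator = Estimator(
    image_uri=custom_image_uri,
    role="arn:aws:iam::123456789012:role/SagemakerExecutionRole",
    instance_count=1,
    instance_type="ml.m5.xlarge",
    sagemaker_session=PipelineSession(),
)

# The estimator can then be used with a TrainingStep as shown in the earlier examples.
```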

# Lift-and-shift Python code with the @step decorator


The `@step` decorator is a feature that converts your local machine learning (ML) code into one or more pipeline steps. You can write your ML function as you would for any ML project. Once tested locally or as a training job using the `@remote` decorator, you can convert the function to a SageMaker AI pipeline step by adding a `@step` decorator. You can then pass the output of the `@step`-decorated function call as a step to Pipelines to create and run a pipeline. You can chain a series of functions with the `@step` decorator to create a multi-step directed acyclic graph (DAG) pipeline as well.

The setup for the `@step` decorator is the same as for the `@remote` decorator. You can refer to the remote function documentation for details about how to [set up the environment](https://docs.aws.amazon.com/sagemaker/latest/dg/train-remote-decorator.html#train-remote-decorator-env) and [use a configuration file](https://docs.aws.amazon.com/sagemaker/latest/dg/train-remote-decorator-config.html) to set defaults. For more information about the `@step` decorator, see [sagemaker.workflow.function_step.step](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.function_step.step).

To view sample notebooks that demonstrate the use of the `@step` decorator, see [@step decorator sample notebooks](https://github.com/aws/amazon-sagemaker-examples/tree/main/sagemaker-pipelines/step-decorator).

The following sections explain how you can annotate your local ML code with a `@step` decorator to create a step, create and run a pipeline using the step, and customize the experience for your use case.

**Topics**
+ [

# Create a pipeline with `@step`-decorated functions
](pipelines-step-decorator-create-pipeline.md)
+ [

# Run a pipeline
](pipelines-step-decorator-run-pipeline.md)
+ [

# Configure your pipeline
](pipelines-step-decorator-cfg-pipeline.md)
+ [

# Best Practices
](pipelines-step-decorator-best.md)
+ [

# Limitations
](pipelines-step-decorator-limit.md)

# Create a pipeline with `@step`-decorated functions


You can create a pipeline by converting Python functions into pipeline steps using the `@step` decorator, creating dependencies between those functions to create a pipeline graph (or directed acyclic graph (DAG)), and passing the leaf nodes of that graph as a list of steps to the pipeline. The following sections explain this procedure in detail with examples.

**Topics**
+ [

## Convert a function to a step
](#pipelines-step-decorator-run-pipeline-convert)
+ [

## Create dependencies between the steps
](#pipelines-step-decorator-run-pipeline-link)
+ [

## Use `ConditionStep` with `@step`-decorated steps
](#pipelines-step-decorator-condition)
+ [

## Define a pipeline using the `DelayedReturn` output of steps
](#pipelines-step-define-delayed)
+ [

## Create a pipeline
](#pipelines-step-decorator-pipeline-create)

## Convert a function to a step


To create a step using the `@step` decorator, annotate the function with `@step`. The following example shows a `@step`-decorated function that preprocesses the data.

```
import pandas

from sagemaker.workflow.function_step import step

@step
def preprocess(raw_data):
    df = pandas.read_csv(raw_data)
    ...
    return processed_dataframe
    
step_process_result = preprocess(raw_data)
```

When you invoke a `@step`-decorated function, SageMaker AI returns a `DelayedReturn` instance instead of running the function. A `DelayedReturn` instance is a proxy for the actual return of that function. The `DelayedReturn` instance can be passed to another function as an argument or directly to a pipeline instance as a step. For information about the `DelayedReturn` class, see [sagemaker.workflow.function_step.DelayedReturn](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.function_step.DelayedReturn).

## Create dependencies between the steps


When you create a dependency between two steps, you create a connection between the steps in your pipeline graph. The following sections introduce multiple ways you can create a dependency between your pipeline steps.

### Data dependencies through input arguments


Passing in the `DelayedReturn` output of one function as an input to another function automatically creates a data dependency in the pipeline DAG. In the following example, passing in the `DelayedReturn` output of the `preprocess` function to the `train` function creates a dependency between `preprocess` and `train`.

```
import pandas

from sagemaker.workflow.function_step import step

@step
def preprocess(raw_data):
    df = pandas.read_csv(raw_data)
    ...
    return processed_dataframe

@step
def train(training_data):
    ...
    return trained_model

step_process_result = preprocess(raw_data)    
step_train_result = train(step_process_result)
```

The previous example defines a training function that is decorated with `@step`. When this function is invoked, it receives the `DelayedReturn` output of the preprocessing pipeline step as input. Invoking the training function returns another `DelayedReturn` instance. This instance holds the information about all the previous steps defined in that function (that is, the `preprocess` step in this example), which form the pipeline DAG.

In the previous example, the `preprocess` function returns a single value. For more complex return types like lists or tuples, refer to [Limitations](pipelines-step-decorator-limit.md).

### Define custom dependencies


In the previous example, the `train` function received the `DelayedReturn` output of `preprocess` and created a dependency. If you want to define the dependency explicitly without passing the previous step output, use the `add_depends_on` function with the step. You can use the `get_step()` function to retrieve the underlying step from its `DelayedReturn` instance, and then call `add_depends_on()` with the dependency as input. To view the `get_step()` function definition, see [sagemaker.workflow.step_outputs.get_step](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.step_outputs.get_step). The following example shows you how to create a dependency between `preprocess` and `train` using `get_step()` and `add_depends_on()`.

```
import pandas

from sagemaker.workflow.function_step import step
from sagemaker.workflow.step_outputs import get_step

@step
def preprocess(raw_data):
    df = pandas.read_csv(raw_data)
    ...
    processed_data = ...
    return s3.upload(processed_data)

@step
def train():
    training_data = s3.download(....)
    ...
    return trained_model

step_process_result = preprocess(raw_data)    
step_train_result = train()

get_step(step_train_result).add_depends_on([step_process_result])
```

### Pass data to and from a `@step`-decorated function to a traditional pipeline step


You can create a pipeline that includes a `@step`-decorated step and a traditional pipeline step and passes data between them. For example, you can use `ProcessingStep` to process the data and pass its result to the `@step`-decorated training function. In the following example, a `@step`-decorated training step references the output of a processing step.

```
# Define processing step

from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

sklearn_processor = SKLearnProcessor(
    framework_version='1.2-1',
    role='arn:aws:iam::123456789012:role/SagemakerExecutionRole',
    instance_type='ml.m5.large',
    instance_count=1,
)

inputs = [
    ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
]
outputs = [
    ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
    ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
    ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
]

process_step = ProcessingStep(
    name="MyProcessStep",
    step_args=sklearn_processor.run(inputs=inputs, outputs=outputs,code='preprocessing.py'),
)
```

```
# Define a @step-decorated train step which references the 
# output of a processing step

@step
def train(train_data_path, test_data_path):
    ...
    return trained_model
    
step_train_result = train(
   process_step.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
   process_step.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
)
```

## Use `ConditionStep` with `@step`-decorated steps


Pipelines supports a `ConditionStep` class which evaluates the results of preceding steps to decide what action to take in the pipeline. You can use `ConditionStep` with a `@step`-decorated step as well. To use the output of any `@step`-decorated step with `ConditionStep`, enter the output of that step as an argument to `ConditionStep`. In the following example, the condition step receives the output of the `@step`-decorated model evaluation step.

```
# Define steps

@step(name="evaluate")
def evaluate_model():
    # code to evaluate the model
    return {
        "rmse":rmse_value
    }
    
@step(name="register")
def register_model():
    # code to register the model
    ...
```

```
# Define ConditionStep

from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.fail_step import FailStep

conditionally_register = ConditionStep(
    name="conditional_register",
    conditions=[
        ConditionGreaterThanOrEqualTo(
            # Output of the evaluate step must be JSON serializable
            left=evaluate_model()["rmse"],
            right=5,
        )
    ],
    if_steps=[FailStep(name="Fail", error_message="Model performance is not good enough")],
    else_steps=[register_model()],
)
```

## Define a pipeline using the `DelayedReturn` output of steps


You define a pipeline the same way whether or not you use a `@step` decorator. When you pass a `DelayedReturn` instance to your pipeline, you don't need to pass a full list of steps to build the pipeline. The SDK automatically infers the previous steps based on the dependencies you define. All the previous steps of the `Step` or `DelayedReturn` objects that you pass to the pipeline are included in the pipeline graph. In the following example, the pipeline receives the `DelayedReturn` object for the `train` function. SageMaker AI adds the `preprocess` step, as a previous step of `train`, to the pipeline graph.

```
from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
    name="<pipeline-name>",
    steps=[step_train_result],
    sagemaker_session=<sagemaker-session>,
)
```

If there are no data or custom dependencies between the steps and you run multiple steps in parallel, the pipeline graph has more than one leaf node. Pass all of these leaf nodes in a list to the `steps` argument in your pipeline definition, as shown in the following example:

```
@step
def process1():
    ...
    return data
    
@step
def process2():
   ...
   return data
   
step_process1_result = process1()
step_process2_result = process2()

pipeline = Pipeline(
    name="<pipeline-name>",
    steps=[step_process1_result, step_process2_result],
    sagemaker_session=<sagemaker-session>,
)
```

When the pipeline runs, both steps run in parallel.

You only pass the leaf nodes of the graph to the pipeline because the leaf nodes contain information about all the previous steps defined through data or custom dependencies. When it compiles the pipeline, SageMaker AI infers all of the steps that form the pipeline graph and adds each of them as a separate step to the pipeline.

## Create a pipeline


Create a pipeline by calling `pipeline.create()`, as shown in the following snippet. For details about `create()`, see [sagemaker.workflow.pipeline.Pipeline.create](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.pipeline.Pipeline.create).

```
# IAM role that Pipelines uses to create and run the pipeline
role = "pipeline-role"
pipeline.create(role)
```

When you call `pipeline.create()`, SageMaker AI compiles all of the steps defined as part of the pipeline instance. SageMaker AI uploads the serialized function, arguments, and all the other step-related artifacts to Amazon S3.

Data resides in the S3 bucket according to the following structure:

```
s3_root_uri/
    pipeline_name/
        sm_rf_user_ws/
            workspace.zip  # archive of the current working directory (workdir)
        step_name/
            timestamp/
                arguments/                # serialized function arguments
                function/                 # serialized function
                pre_train_dependencies/   # any dependencies and pre_execution scripts provided for the step       
        execution_id/
            step_name/
                results     # returned output from the serialized function including the model
```

`s3_root_uri` is defined in the SageMaker AI config file and applies to the entire pipeline. If undefined, the default SageMaker AI bucket is used.

**Note**  
Every time SageMaker AI compiles a pipeline, SageMaker AI saves the steps' serialized functions, arguments, and dependencies in a folder timestamped with the current time. This occurs every time you run `pipeline.create()`, `pipeline.update()`, `pipeline.upsert()` or `pipeline.definition()`.

# Run a pipeline


The following page describes how to run a pipeline with Amazon SageMaker Pipelines, either with SageMaker AI resources or locally.

Start a new pipeline run with the `pipeline.start()` function as you would for a traditional SageMaker AI pipeline run. For information about the `start()` function, see [sagemaker.workflow.pipeline.Pipeline.start](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.pipeline.Pipeline.start).

**Note**  
A step defined using the `@step` decorator runs as a training job. Therefore, be aware of the following limits:  
+ Instance limits and training job limits in your accounts. Update your limits accordingly to avoid any throttling or resource limit issues.
+ The monetary costs associated with every run of a training step in the pipeline. For more details, refer to [Amazon SageMaker Pricing](https://aws.amazon.com/sagemaker/pricing/).

## Retrieve results from a pipeline run locally


To view the result of any step of a pipeline run, use [execution.result()](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.pipeline._PipelineExecution.result), as shown in the following snippet:

```
execution = pipeline.start()
execution.result(step_name="train")
```

**Note**  
Pipelines does not support `execution.result()` in local mode.

You can only retrieve results for one step at a time. If the step name was generated by SageMaker AI, you can retrieve the step name by calling `list_steps` as follows:

```
execution.list_steps()
```

## Run a pipeline locally


You can run a pipeline with `@step`-decorated steps locally as you would for traditional pipeline steps. For details about local mode pipeline runs, see [Run pipelines using local mode](pipelines-local-mode.md). To use local mode, provide a `LocalPipelineSession` instead of a `SageMakerSession` to your pipeline definition, as shown in the following example:

```
from sagemaker.workflow.function_step import step
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_context import LocalPipelineSession

@step
def train():
    training_data = s3.download(....)
    ...
    return trained_model
    
step_train_result = train()

local_pipeline_session = LocalPipelineSession()

local_pipeline = Pipeline(
    name="<pipeline-name>",
    steps=[step_train_result],
    sagemaker_session=local_pipeline_session # needed for local mode
)

local_pipeline.create(role_arn="role_arn")

# pipeline runs locally
execution = local_pipeline.start()
```

# Configure your pipeline


We recommend using the SageMaker AI config file to set the defaults for the pipeline. For information about the SageMaker AI configuration file, see [Configuring and using defaults with the SageMaker Python SDK](https://sagemaker.readthedocs.io/en/stable/overview.html#configuring-and-using-defaults-with-the-sagemaker-python-sdk). Any configuration added to the config file applies to all steps in the pipeline. If you want to override options for any of the steps, provide new values in the `@step` decorator arguments. The following topic describes how to set up a config file.

The `@step` decorator's configuration in the config file is identical to the `@remote` decorator's configuration. To set up the pipeline role ARN and pipeline tags in the config file, use the `Pipeline` section shown in the following snippet:

```
SchemaVersion: '1.0'
SageMaker:
  Pipeline:
    RoleArn: 'arn:aws:iam::555555555555:role/IMRole'
    Tags:
    - Key: 'tag_key'
      Value: 'tag_value'
```

You can override most of the defaults that you set in the configuration file by passing new values to the `@step` decorator. For example, you can override the instance type set in the config file for your preprocessing step, as shown in the following example:

```
@step(instance_type="ml.m5.large")
def preprocess(raw_data):
    df = pandas.read_csv(raw_data)
    ...
    return processed_dataframe
```

A few arguments are not part of the `@step` decorator parameter list; you can configure them for the entire pipeline only through the SageMaker AI configuration file. They are as follows:
+ `sagemaker_session` (`sagemaker.session.Session`): The underlying SageMaker AI session to which SageMaker AI delegates service calls. If unspecified, a session is created using a default configuration as follows:

  ```
  SageMaker:
    PythonSDK:
      Modules:
        Session:
          DefaultS3Bucket: 'default_s3_bucket'
          DefaultS3ObjectKeyPrefix: 'key_prefix'
  ```
+ `custom_file_filter` (`CustomFileFilter`): A `CustomFileFilter` object that specifies the local directories and files to include in the pipeline step. If unspecified, this value defaults to `None`. For `custom_file_filter` to take effect, you must set `IncludeLocalWorkDir` to `true`. The following example shows a configuration that ignores all notebook files, and files and directories named `data`.

  ```
  SchemaVersion: '1.0'
  SageMaker:
    PythonSDK:
      Modules:
        RemoteFunction:
          IncludeLocalWorkDir: true
          CustomFileFilter: 
            IgnoreNamePatterns: # files or directories to ignore
            - "*.ipynb" # all notebook files
            - "data" # folder or file named "data"
  ```

  For more details about how to use `IncludeLocalWorkDir` with `CustomFileFilter`, see [Using modular code with the @remote decorator](train-remote-decorator-modular.md).
+ `s3_root_uri (str)`: The root Amazon S3 folder to which SageMaker AI uploads the code archives and data. If unspecified, the default SageMaker AI bucket is used.
+ `s3_kms_key (str)`: The key used to encrypt the input and output data. You can only configure this argument in the SageMaker AI config file and the argument applies to all steps defined in the pipeline. If unspecified, the value defaults to `None`. See the following snippet for an example S3 KMS key configuration:

  ```
  SchemaVersion: '1.0'
  SageMaker:
    PythonSDK:
      Modules:
        RemoteFunction:
          S3KmsKeyId: 's3kmskeyid'
          S3RootUri: 's3://amzn-s3-demo-bucket/my-project'
  ```

# Best Practices


The following sections suggest best practices to follow when you use the `@step` decorator for your pipeline steps.

## Use warm pools


For faster pipeline step runs, use the warm pooling functionality provided for training jobs. You can turn on the warm pool functionality by providing the `keep_alive_period_in_seconds` argument to the `@step` decorator as demonstrated in the following snippet:

```
@step(
    keep_alive_period_in_seconds=900
)
def train():
    ...
```

For more information about warm pools, see [SageMaker AI Managed Warm Pools](train-warm-pools.md). 

## Structure your directory


We recommend organizing your code into modules when you use the `@step` decorator. Put the `pipeline.py` module, in which you invoke the step functions and define the pipeline, at the root of the workspace. The recommended structure is as follows:

```
.
├── config.yaml       # the configuration file that defines the infra settings
├── requirements.txt  # dependencies
├── pipeline.py       # invoke @step-decorated functions and define the pipeline here
├── steps/
│   ├── processing.py
│   ├── train.py
├── data/
├── test/
```

# Limitations


The following sections outline the limitations that you should be aware of when you use the `@step` decorator for your pipeline steps.

## Function argument limitations


When you pass an input argument to the `@step`-decorated function, the following limitations apply:
+ You can pass the `DelayedReturn`, `Properties` (of steps of other types), `Parameter`, and `ExecutionVariable` objects to `@step`-decorated functions as arguments. But `@step`-decorated functions do not support `JsonGet` and `Join` objects as arguments.
+ You cannot directly access a pipeline variable from a `@step` function. The following example produces an error:

  ```
  param = ParameterInteger(name="<parameter-name>", default_value=10)
  
  @step
  def func():
      print(param)
  
  func() # this raises a SerializationError
  ```
+ You cannot nest a pipeline variable in another object and pass it to a `@step` function. The following example produces an error:

  ```
  param = ParameterInteger(name="<parameter-name>", default_value=10)
  
  @step
  def func(arg):
      print(arg)
  
  func(arg=(param,)) # this raises a SerializationError because param is nested in a tuple
  ```
+ Since inputs and outputs of a function are serialized, there are restrictions on the type of data that can be passed as input or output from a function. See the *Data serialization and deserialization* section of [Invoke a remote function](train-remote-decorator-invocation.md) for more details. The same restrictions apply to `@step`-decorated functions.
+ Any object that has a boto client cannot be serialized, hence you cannot pass such objects as input to or output from a `@step`-decorated function. For example, SageMaker Python SDK client classes such as `Estimator`, `Predictor`, and `Processor` can't be serialized.

## Function imports


Import the libraries required by the step inside the function rather than outside it. If you import them at global scope, you risk an import collision when the function is serialized. For example, `sklearn.pipeline.Pipeline` could be overridden by `sagemaker.workflow.pipeline.Pipeline`.
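
The following sketch, assuming a hypothetical preprocessing step, shows the required libraries imported inside the function body:

```
from sagemaker.workflow.function_step import step

@step
def preprocess(raw_data_uri):
    # Import inside the function so the names are resolved in the step's
    # execution environment and don't collide during serialization.
    import pandas
    from sklearn.preprocessing import StandardScaler

    df = pandas.read_csv(raw_data_uri)
    scaled = StandardScaler().fit_transform(df)
    return scaled
```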

## Referencing child members of function return value


If you reference child members of a `@step`-decorated function's return value, the following limitations apply:
+ You can reference the child members with `[]` if the `DelayedReturn` object represents a tuple, list or dict, as shown in the following example:

  ```
  delayed_return[0]
  delayed_return["a_key"]
  delayed_return[1]["a_key"]
  ```
+ You cannot unpack a tuple or list output because the exact length of the underlying tuple or list can't be known when you invoke the function. The following example produces an error:

  ```
  a, b, c = func() # this raises ValueError
  ```
+ You cannot iterate over a `DelayedReturn` object. The following example raises an error:

  ```
  for item in func(): # this raises a NotImplementedError
  ```
+ You cannot reference arbitrary child members with '`.`'. The following example produces an error:

  ```
  delayed_return.a_child # raises AttributeError
  ```

## Existing pipeline features that are not supported


You cannot use the `@step` decorator with the following pipeline features:
+ [Pipeline step caching](https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-caching.html)
+ [Property files](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-propertyfile.html#build-and-manage-propertyfile-property)

# Pass Data Between Steps


When building pipelines with Amazon SageMaker Pipelines, you might need to pass data from one step to the next. For example, you might want to use the model artifacts generated by a training step as input to a model evaluation or deployment step. You can use this functionality to create interdependent pipeline steps and build your ML workflows.

When you need to retrieve information from the output of a pipeline step, you can use `JsonGet`. `JsonGet` helps you extract information from Amazon S3 or property files. The following sections explain methods you can use to extract step outputs with `JsonGet`.

## Pass data between steps with Amazon S3


You can use `JsonGet` in a `ConditionStep` to fetch the JSON output directly from Amazon S3. The Amazon S3 URI can be a `Std:Join` function containing primitive strings, pipeline run variables, or pipeline parameters. The following example shows how you can use `JsonGet` in a `ConditionStep`:

```
# Example json file in s3 bucket generated by a processing_step
{
   "Output": [5, 10]
}

cond_lte = ConditionLessThanOrEqualTo(
    left=JsonGet(
        step_name="<step-name>",
        s3_uri="<s3-path-to-json>",
        json_path="Output[1]"
    ),
    right=6.0
)
```

If you are using `JsonGet` with an Amazon S3 path in the condition step, you must explicitly add a dependency between the condition step and the step generating the JSON output. In the following example, the condition step is created with a dependency on the processing step:

```
cond_step = ConditionStep(
        name="<step-name>",
        conditions=[cond_lte],
        if_steps=[fail_step],
        else_steps=[register_model_step],
        depends_on=[processing_step],
)
```

## Pass data between steps with property files


Use property files to store information from the output of a processing step. This is particularly useful when analyzing the results of a processing step to decide how a conditional step should be executed. The `JsonGet` function processes a property file and enables you to use JsonPath notation to query the property JSON file. For more information on JsonPath notation, see the [JsonPath repo](https://github.com/json-path/JsonPath).

To store a property file for later use, you must first create a `PropertyFile` instance with the following format. The `path` parameter is the name of the JSON file to which the property file is saved. Any `output_name` must match the `output_name` of the `ProcessingOutput` that you define in your processing step. This enables the property file to capture the `ProcessingOutput` in the step.

```
from sagemaker.workflow.properties import PropertyFile

<property_file_instance> = PropertyFile(
    name="<property_file_name>",
    output_name="<processingoutput_output_name>",
    path="<path_to_json_file>"
)
```

When you create your `ProcessingStep` instance, add the `property_files` parameter to list all of the property files that the Amazon SageMaker Pipelines service must index. This saves the property file for later use.

```
property_files=[<property_file_instance>]
```
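
For example, the following sketch registers a hypothetical `evaluation.json` report, written by an evaluation script, as a property file on a processing step. The `sklearn_processor`, the `evaluation.py` script, and the output names are assumptions for illustration:

```
from sagemaker.processing import ProcessingOutput
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.steps import ProcessingStep

# Hypothetical property file capturing the "evaluation" output of the step
evaluation_report = PropertyFile(
    name="EvaluationReport",
    output_name="evaluation",
    path="evaluation.json",
)

step_eval = ProcessingStep(
    name="EvaluateModel",
    step_args=sklearn_processor.run(
        outputs=[
            ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
        ],
        code="evaluation.py",
    ),
    property_files=[evaluation_report],
)
```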

To use your property file in a condition step, add the `property_file` to the condition that you pass to your condition step, and use the `json_path` parameter to query the JSON file for the desired property, as shown in the following example.

```
cond_lte = ConditionLessThanOrEqualTo(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=<property_file_instance>,
        json_path="mse"
    ),
    right=6.0
)
```

For more in-depth examples, see *[Property File](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#property-file)* in the [Amazon SageMaker Python SDK](https://sagemaker.readthedocs.io/en/stable).

# Caching pipeline steps


In Amazon SageMaker Pipelines, you can use step caching to save time and resources when rerunning pipelines. Step caching reuses the output of a previous successful run of a step (instead of recomputing it) when the step has the same configuration and inputs. This helps you achieve consistent results across pipeline reruns with identical parameters. The following topic shows you how to configure and turn on step caching for your pipelines.

When you use step signature caching, Pipelines tries to find a previous run of your current pipeline step with the same values for certain attributes. If found, Pipelines propagates the outputs from the previous run rather than recomputing the step. The attributes checked are specific to the step type, and are listed in [Default cache key attributes by pipeline step type](pipelines-default-keys.md).

Step caching is off by default; you must opt in to use it. When you turn on step caching, you must also define a timeout. This timeout defines how old a previous run can be and still remain a candidate for reuse.

Step caching only considers successful runs; it never reuses failed runs. When multiple successful runs exist within the timeout period, Pipelines uses the result from the most recent successful run. If no successful runs match within the timeout period, Pipelines reruns the step. If the executor finds a previous run that meets the criteria but is still in progress, both steps continue running and update the cache if they're successful.

Step caching is scoped to individual pipelines, so you can’t reuse a step from another pipeline even if there is a step signature match.

Step caching is available for the following step types: 
+ [Processing](build-and-manage-steps-types.md#step-type-processing)
+ [Training](build-and-manage-steps-types.md#step-type-training)
+ [Tuning](build-and-manage-steps-types.md#step-type-tuning)
+ [AutoML](build-and-manage-steps-types.md#step-type-automl)
+ [Transform](build-and-manage-steps-types.md#step-type-transform)
+ [`ClarifyCheck`](build-and-manage-steps-types.md#step-type-clarify-check)
+ [`QualityCheck`](build-and-manage-steps-types.md#step-type-quality-check)
+ [EMR](build-and-manage-steps-types.md#step-type-emr)

**Topics**
+ [

# Turn on step caching
](pipelines-caching-enabling.md)
+ [

# Turn off step caching
](pipelines-caching-disabling.md)
+ [

# Default cache key attributes by pipeline step type
](pipelines-default-keys.md)
+ [

# Cached data access control
](pipelines-access-control.md)

# Turn on step caching


To turn on step caching, you must add a `CacheConfig` property to the step definition. `CacheConfig` properties use the following format in the pipeline definition file:

```
{
    "CacheConfig": {
        "Enabled": false,
        "ExpireAfter": "<time>"
    }
}
```

The `Enabled` field indicates whether caching is turned on for the particular step. You can set the field to `true`, which tells SageMaker AI to try to find a previous run of the step with the same attributes. Or, you can set the field to `false`, which tells SageMaker AI to run the step every time the pipeline runs. `ExpireAfter` is a string in [ISO 8601 duration](https://en.wikipedia.org/wiki/ISO_8601#Durations) format that defines the timeout period. The `ExpireAfter` duration can be a year, month, week, day, hour, or minute value. Each value consists of a number followed by a letter indicating the unit of duration. For example:
+ "30d" = 30 days
+ "5y" = 5 years
+ "T16m" = 16 minutes
+ "30dT5h" = 30 days and 5 hours.

The following discussion describes the procedure to turn on caching for new or pre-existing pipelines using the Amazon SageMaker Python SDK.

**Turn on caching for new pipelines**

For new pipelines, initialize a `CacheConfig` instance with `enable_caching=True` and provide it as an input to your pipeline step. The following example turns on caching with a 1-hour timeout period for a training step: 

```
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.steps import CacheConfig
      
cache_config = CacheConfig(enable_caching=True, expire_after="PT1H")
estimator = Estimator(..., sagemaker_session=PipelineSession())

step_train = TrainingStep(
    name="TrainAbaloneModel",
    step_args=estimator.fit(inputs=inputs),
    cache_config=cache_config
)
```

**Turn on caching for pre-existing pipelines**

To turn on caching for pre-existing, already-defined pipelines, turn on the `enable_caching` property for the step, set `expire_after` to a timeout value, and then update the pipeline with `pipeline.upsert()` or `pipeline.update()`. The change takes effect the next time the pipeline runs. The following code example turns on caching with a 1-hour timeout period for a training step:

```
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.steps import CacheConfig
from sagemaker.workflow.pipeline import Pipeline

cache_config = CacheConfig(enable_caching=True, expire_after="PT1H")
estimator = Estimator(..., sagemaker_session=PipelineSession())

step_train = TrainingStep(
    name="TrainAbaloneModel",
    step_args=estimator.fit(inputs=inputs),
    cache_config=cache_config
)

# define pipeline
pipeline = Pipeline(
    steps=[step_train]
)

# additional step for existing pipelines
pipeline.update()
# or, call upsert() to update the pipeline
# pipeline.upsert()
```

Alternatively, update the cache config after you have already defined the (pre-existing) pipeline, allowing one continuous code run. The following code sample demonstrates this method:

```
# turn on caching with timeout period of one hour
pipeline.steps[0].cache_config.enable_caching = True 
pipeline.steps[0].cache_config.expire_after = "PT1H" 

# additional step for existing pipelines
pipeline.update()
# or, call upsert() to update the pipeline
# pipeline.upsert()
```

For more detailed code examples and a discussion about how Python SDK parameters affect caching, see [ Caching Configuration](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#caching-configuration) in the Amazon SageMaker Python SDK documentation.

# Turn off step caching


A pipeline step does not rerun if you change any attributes that are not listed in [Default cache key attributes by pipeline step type](pipelines-default-keys.md) for its step type. However, you may decide that you want the pipeline step to rerun anyway. In this case, you need to turn off step caching.

To turn off step caching, set the `Enabled` attribute of the step definition’s `CacheConfig` property to `false`, as shown in the following code snippet:

```
{
    "CacheConfig": {
        "Enabled": false,
        "ExpireAfter": "<time>"
    }
}
```

Note that the `ExpireAfter` attribute is ignored when `Enabled` is `false`.

To turn off caching for a pipeline step using the Amazon SageMaker Python SDK, define the pipeline that contains your pipeline step, turn off the `enable_caching` property, and update the pipeline. The change takes effect the next time the pipeline runs. The following code example turns off caching for a training step:

```
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.steps import CacheConfig
from sagemaker.workflow.pipeline import Pipeline

cache_config = CacheConfig(enable_caching=False, expire_after="PT1H")
estimator = Estimator(..., sagemaker_session=PipelineSession())

step_train = TrainingStep(
    name="TrainAbaloneModel",
    step_args=estimator.fit(inputs=inputs),
    cache_config=cache_config
)

# define pipeline
pipeline = Pipeline(
    steps=[step_train]
)

# update the pipeline
pipeline.update()
# or, call upsert() to update the pipeline
# pipeline.upsert()
```

Alternatively, turn off the `enable_caching` property after you have already defined the pipeline, allowing one continuous code run. The following code sample demonstrates this solution:

```
# turn off caching for the training step
pipeline.steps[0].cache_config.enable_caching = False

# update the pipeline
pipeline.update()
# or, call upsert() to update the pipeline
# pipeline.upsert()
```

For more detailed code examples and a discussion about how Python SDK parameters affect caching, see [Caching Configuration](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#caching-configuration) in the Amazon SageMaker Python SDK documentation.

# Default cache key attributes by pipeline step type


When deciding whether to reuse a previous pipeline step or rerun the step, Pipelines checks to see if certain attributes have changed. If the set of attributes is different from all previous runs within the timeout period, the step runs again. These attributes include input artifacts, app or algorithm specification, and environment variables. The following list shows each pipeline step type and the attributes that, if changed, initiate a rerun of the step. For more information about which Python SDK parameters are used to create the following attributes, see [ Caching Configuration](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#caching-configuration) in the Amazon SageMaker Python SDK documentation.

## [Processing step](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_CreateProcessingJob.html)

+ AppSpecification
+ Environment
+ ProcessingInputs. This attribute contains information about the preprocessing script.

  

## [Training step](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_CreateTrainingJob.html)

+ AlgorithmSpecification
+ CheckpointConfig
+ DebugHookConfig
+ DebugRuleConfigurations
+ Environment
+ HyperParameters
+ InputDataConfig. This attribute contains information about the training script.

  

## [Tuning step](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_CreateHyperParameterTuningJob.html)

+ HyperParameterTuningJobConfig
+ TrainingJobDefinition. This attribute is composed of multiple child attributes, not all of which cause the step to rerun. The child attributes that could incur a rerun (if changed) are:
  + AlgorithmSpecification
  + HyperParameterRanges
  + InputDataConfig
  + StaticHyperParameters
  + TuningObjective
+ TrainingJobDefinitions

  

## [AutoML step](https://docs.aws.amazon.com//sagemaker/latest/APIReference/API_AutoMLJobConfig.html)

+ AutoMLJobConfig. This attribute is composed of multiple child attributes, not all of which cause the step to rerun. The child attributes that could incur a rerun (if changed) are:
  + CompletionCriteria
  + CandidateGenerationConfig
  + DataSplitConfig
  + Mode
+ AutoMLJobObjective
+ InputDataConfig
+ ProblemType

  

## [Transform step](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_CreateTransformJob.html)

+ DataProcessing
+ Environment
+ ModelName
+ TransformInput

  

## [ClarifyCheck step](build-and-manage-steps-types.md#step-type-clarify-check)

+ ClarifyCheckConfig
+ CheckJobConfig
+ SkipCheck
+ RegisterNewBaseline
+ ModelPackageGroupName
+ SuppliedBaselineConstraints

  

## [QualityCheck step](build-and-manage-steps-types.md#step-type-quality-check)

+ QualityCheckConfig
+ CheckJobConfig
+ SkipCheck
+ RegisterNewBaseline
+ ModelPackageGroupName
+ SuppliedBaselineConstraints
+ SuppliedBaselineStatistics

  

## [EMR Step](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-emr)

+ ClusterId
+ StepConfig

  

# Cached data access control


When a SageMaker AI pipeline runs, it caches the parameters and metadata associated with the SageMaker AI jobs launched by the pipeline and saves them for reuse in subsequent runs. This metadata is accessible through a variety of sources in addition to cached pipeline steps, and includes the following types:
+ `Describe*Job` requests
+ CloudWatch Logs
+ CloudWatch Events
+ CloudWatch Metrics
+ SageMaker AI Search

Note that access to each data source in the list is controlled by its own set of IAM permissions. Removing a particular role’s access to one data source does not affect the level of access to the others. For example, an account admin might remove IAM permissions for `Describe*Job` requests from a caller’s role. While the caller can no longer make `Describe*Job` requests, they can still retrieve the metadata from a pipeline run with cached steps as long as they have permission to run the pipeline. If an account admin wants to remove access to the metadata from a particular SageMaker AI job completely, they need to remove permissions for each of the relevant services that provide access to the data. 

# Retry Policy for Pipeline Steps
Retry Policy

Retry policies help you automatically retry your Pipelines steps after an error occurs. Any pipeline step can encounter exceptions, and exceptions happen for various reasons. In some cases, a retry can resolve these issues. With a retry policy for pipeline steps, you can choose whether to retry a particular pipeline step or not.

The retry policy only supports the following pipeline steps:
+ [Processing step](build-and-manage-steps-types.md#step-type-processing) 
+ [Training step](build-and-manage-steps-types.md#step-type-training) 
+ [Tuning step](build-and-manage-steps-types.md#step-type-tuning) 
+ [AutoML step](build-and-manage-steps-types.md#step-type-automl) 
+ [Create model step](build-and-manage-steps-types.md#step-type-create-model) 
+ [Register model step](build-and-manage-steps-types.md#step-type-register-model) 
+ [Transform step](build-and-manage-steps-types.md#step-type-transform) 
+ [Notebook job step](build-and-manage-steps-types.md#step-type-notebook-job) 

**Note**  
Jobs running inside both the tuning and AutoML steps conduct retries internally and will not retry the `SageMaker.JOB_INTERNAL_ERROR` exception type, even if a retry policy is configured. You can program your own [ Retry Strategy](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_RetryStrategy.html) using the SageMaker API.

## Supported exception types for the retry policy


The retry policy for pipeline steps supports the following exception types:
+ `Step.SERVICE_FAULT`: These exceptions occur when an internal server error or transient error happens when calling downstream services. Pipelines retries on this type of error automatically. With a retry policy, you can override the default retry operation for this exception type.
+ `Step.THROTTLING`: Throttling exceptions can occur while calling the downstream services. Pipelines retries on this type of error automatically. With a retry policy, you can override the default retry operation for this exception type.
+ `SageMaker.JOB_INTERNAL_ERROR`: These exceptions occur when the SageMaker AI job returns `InternalServerError`. In this case, starting a new job may fix a transient issue.
+ `SageMaker.CAPACITY_ERROR`: The SageMaker AI job may encounter Amazon EC2 `InsufficientCapacityErrors`, which leads to the SageMaker AI job’s failure. You can retry by starting a new SageMaker AI job to avoid the issue. 
+ `SageMaker.RESOURCE_LIMIT`: You can exceed the resource limit quota when running a SageMaker AI job. You can wait and retry running the SageMaker AI job after a short period and see if resources are released.

## The JSON schema for the retry policy


The retry policy for Pipelines has the following JSON schema:

```
"RetryPolicy": {
   "ExceptionType": [String]
   "IntervalSeconds": Integer
   "BackoffRate": Double
   "MaxAttempts": Integer
   "ExpireAfterMin": Integer
}
```
+ `ExceptionType`: This field requires the following exception types in a string array format.
  + `Step.SERVICE_FAULT`
  + `Step.THROTTLING`
  + `SageMaker.JOB_INTERNAL_ERROR`
  + `SageMaker.CAPACITY_ERROR`
  + `SageMaker.RESOURCE_LIMIT`
+ `IntervalSeconds` (optional): The number of seconds before the first retry attempt (1 by default). `IntervalSeconds` has a maximum value of 43200 seconds (12 hours).
+ `BackoffRate` (optional): The multiplier by which the retry interval increases during each attempt (2.0 by default).
+ `MaxAttempts` (optional): A positive integer that represents the maximum number of retry attempts (5 by default). If the error recurs more times than `MaxAttempts` specifies, retries cease and normal error handling resumes. A value of 0 specifies that errors are never retried. `MaxAttempts` has a maximum value of 20.
+ `ExpireAfterMin` (optional): A positive integer that represents the maximum timespan of retries. If the error recurs more than `ExpireAfterMin` minutes after the step starts running, retries cease and normal error handling resumes. A value of 0 specifies that errors are never retried. `ExpireAfterMin` has a maximum value of 14,400 minutes (10 days).
**Note**  
Specify only one of `MaxAttempts` or `ExpireAfterMin`, not both; if neither is specified, the `MaxAttempts` default applies. If both properties are specified within one policy, the retry policy generates a validation error.

# Configuring a retry policy
Retry policy example

While SageMaker Pipelines provides a robust and automated way to orchestrate machine learning workflows, you might encounter failures when you run them. To handle such scenarios gracefully and improve the reliability of your pipelines, you can configure retry policies that define how and when to automatically retry specific steps after encountering an exception. A retry policy lets you specify the types of exceptions to retry, the maximum number of retry attempts, the interval between retries, and the backoff rate for increasing the retry intervals. The following section provides examples of how to configure a retry policy for a training step in your pipeline, both in JSON and using the SageMaker Python SDK.

The following is an example of a training step with a retry policy.

```
{
    "Steps": [
        {
            "Name": "MyTrainingStep",
            "Type": "Training",
            "RetryPolicies": [
                {
                    "ExceptionType": [
                        "SageMaker.JOB_INTERNAL_ERROR",
                        "SageMaker.CAPACITY_ERROR"
                    ],
                    "IntervalSeconds": 1,
                    "BackoffRate": 2,
                    "MaxAttempts": 5
                }
            ]
        }
    ]
}
```



The following is an example of how to build a `TrainingStep` with a retry policy using the SageMaker Python SDK.

```
from sagemaker.workflow.retry import (
    StepRetryPolicy, 
    StepExceptionTypeEnum,
    SageMakerJobExceptionTypeEnum,
    SageMakerJobStepRetryPolicy
)

step_train = TrainingStep(
    name="MyTrainingStep",
    ...,
    retry_policies=[
        # override the default retry behavior for step-level exceptions
        StepRetryPolicy(
            exception_types=[
                StepExceptionTypeEnum.SERVICE_FAULT, 
                StepExceptionTypeEnum.THROTTLING
            ],
            expire_after_mins=5,
            interval_seconds=10,
            backoff_rate=2.0 
        ),
        # retry when the resource limit quota gets exceeded
        SageMakerJobStepRetryPolicy(
            exception_types=[SageMakerJobExceptionTypeEnum.RESOURCE_LIMIT],
            expire_after_mins=120,
            interval_seconds=60,
            backoff_rate=2.0
        ),
        # retry when the job fails due to a transient error or EC2 ICE
        SageMakerJobStepRetryPolicy(
            failure_reason_types=[
                SageMakerJobExceptionTypeEnum.INTERNAL_ERROR,
                SageMakerJobExceptionTypeEnum.CAPACITY_ERROR,
            ],
            max_attempts=10,
            interval_seconds=30,
            backoff_rate=2.0
        )
    ]
)
```

For more information on configuring retry behavior for certain step types, see *[Amazon SageMaker Pipelines - Retry Policy](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#retry-policy)* in the Amazon SageMaker Python SDK documentation.

# Selective execution of pipeline steps
Selective Execution

As you use Pipelines to create workflows and orchestrate your ML training steps, you might need to undertake multiple experimentation phases. Instead of running the full pipeline each time, you might only want to repeat certain steps. With Pipelines, you can execute pipeline steps selectively. This helps optimize your ML training. Selective execution is useful in the following scenarios: 
+ You want to restart a specific step with updated instance type, hyperparameters, or other variables while keeping the parameters from upstream steps.
+ Your pipeline fails an intermediate step. Previous steps in the execution, such as data preparation or feature extraction, are expensive to rerun. You might need to introduce a fix and rerun certain steps manually to complete the pipeline. 

Using selective execution, you can choose to run any subset of steps as long as they are connected in the directed acyclic graph (DAG) of your pipeline. The following DAG shows an example pipeline workflow:

![\[A directed acyclic graph (DAG) of an example pipeline.\]](http://docs.aws.amazon.com/sagemaker/latest/dg/images/pipeline-full.png)


You can select steps `AbaloneTrain` and `AbaloneEval` in a selective execution, but you cannot select just `AbaloneTrain` and `AbaloneMSECond` steps because these steps are not connected in the DAG. For non-selected steps in the workflow, the selective execution reuses the outputs from a reference pipeline execution rather than rerunning the steps. Also, non-selected steps that are downstream from the selected steps do not run in a selective execution. 

If you choose to run a subset of intermediate steps in your pipeline, your steps may depend on previous steps. SageMaker AI needs a reference pipeline execution from which to source these dependencies. For example, if you choose to run the steps `AbaloneTrain` and `AbaloneEval`, you need the outputs from the `AbaloneProcess` step. You can either provide a reference execution ARN or direct SageMaker AI to use the latest pipeline execution, which is the default behavior. If you have a reference execution, you can also build the runtime parameters from your reference run and supply them to your selective execution run with overrides. For details, see [Reuse runtime parameter values from a reference execution](#pipelines-selective-ex-reuse).

In detail, you provide a configuration for your selective execution pipeline run using `SelectiveExecutionConfig`. If you include an ARN for a reference pipeline execution (with the `source_pipeline_execution_arn` argument), SageMaker AI uses the previous step dependencies from the pipeline execution you provided. If you do not include an ARN and a latest pipeline execution exists, SageMaker AI uses it as a reference by default. If you do not include an ARN and do not want SageMaker AI to use your latest pipeline execution, set `reference_latest_execution` to `False`. The pipeline execution which SageMaker AI ultimately uses as a reference, whether the latest or user-specified, must be in `Success` or `Failed` state.

The following table summarizes how SageMaker AI chooses a reference execution.


| The `source_pipeline_execution_arn` argument value | The `reference_latest_execution` argument value | The reference execution used | 
| --- | --- | --- | 
| A pipeline ARN | `True` or unspecified | The specified pipeline ARN | 
| A pipeline ARN | `False` | The specified pipeline ARN | 
| null or unspecified | `True` or unspecified | The latest pipeline execution | 
| null or unspecified | `False` | None—in this case, select steps without upstream dependencies | 

For more information about selective execution configuration requirements, see the [sagemaker.workflow.selective_execution_config.SelectiveExecutionConfig](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#selective-execution-config) documentation.

The following discussion includes examples for the cases in which you want to specify a pipeline reference execution, use the latest pipeline execution as a reference, or run selective execution without a reference pipeline execution.

## Selective execution with a user-specified pipeline reference


The following example demonstrates a selective execution of the steps `AbaloneTrain` and `AbaloneEval` using a reference pipeline execution.

```
from sagemaker.workflow.selective_execution_config import SelectiveExecutionConfig

selective_execution_config = SelectiveExecutionConfig(
    source_pipeline_execution_arn="arn:aws:sagemaker:us-west-2:123123123123:pipeline/abalone/execution/123ab12cd3ef", 
    selected_steps=["AbaloneTrain", "AbaloneEval"]
)

selective_execution = pipeline.start(
    execution_display_name=f"Sample-Selective-Execution-1",
    parameters={"MaxDepth":6, "NumRound":60},
    selective_execution_config=selective_execution_config,
)
```

## Selective execution with the latest pipeline execution as a reference


The following example demonstrates a selective execution of the steps `AbaloneTrain` and `AbaloneEval` using the latest pipeline execution as a reference. Since SageMaker AI uses the latest pipeline execution by default, you can optionally set the `reference_latest_execution` argument to `True`.

```
# Prepare a new selective execution. Select the AbaloneTrain and AbaloneEval steps without providing source_pipeline_execution_arn.
selective_execution_config = SelectiveExecutionConfig(
    selected_steps=["AbaloneTrain", "AbaloneEval"],
    # optional
    reference_latest_execution=True
)

# Start pipeline execution without source_pipeline_execution_arn
pipeline.start(
    execution_display_name=f"Sample-Selective-Execution-1",
    parameters={"MaxDepth":6, "NumRound":60},
    selective_execution_config=selective_execution_config,
)
```

## Selective execution without a reference pipeline


The following example demonstrates a selective execution of the steps `AbaloneProcess` and `AbaloneTrain` without providing a reference ARN and with the option to use the latest pipeline run as a reference turned off. SageMaker AI permits this configuration because this subset of steps doesn’t depend on previous steps.

```
# Prepare a new selective execution. Select the AbaloneProcess and AbaloneTrain steps without providing source_pipeline_execution_arn.
selective_execution_config = SelectiveExecutionConfig(
    selected_steps=["AbaloneProcess", "AbaloneTrain"],
    reference_latest_execution=False
)

# Start pipeline execution without source_pipeline_execution_arn
pipeline.start(
    execution_display_name=f"Sample-Selective-Execution-1",
    parameters={"MaxDepth":6, "NumRound":60},
    selective_execution_config=selective_execution_config,
)
```

## Reuse runtime parameter values from a reference execution


You can build the parameters from your reference pipeline execution using `build_parameters_from_execution`, and supply the result to your selective execution pipeline. You can use the original parameters from the reference execution, or apply any overrides using the `parameter_value_overrides` argument.

The following example shows you how to build parameters from a reference execution and apply an override for the `MseThreshold` parameter.

```
# Prepare a new selective execution.
selective_execution_config = SelectiveExecutionConfig(
    source_pipeline_execution_arn="arn:aws:sagemaker:us-west-2:123123123123:pipeline/abalone/execution/123ab12cd3ef",
    selected_steps=["AbaloneTrain", "AbaloneEval", "AbaloneMSECond"],
)
# Define a new parameters list to test.
new_parameters_mse={
    "MseThreshold": 5,
}

# Build parameters from reference execution and override with new parameters to test.
new_parameters = pipeline.build_parameters_from_execution(
    pipeline_execution_arn="arn:aws:sagemaker:us-west-2:123123123123:pipeline/abalone/execution/123ab12cd3ef",
    parameter_value_overrides=new_parameters_mse
)

# Start pipeline execution with new parameters.
execution = pipeline.start(
    selective_execution_config=selective_execution_config,
    parameters=new_parameters
)
```

# Baseline calculation, drift detection and lifecycle with ClarifyCheck and QualityCheck steps in Amazon SageMaker Pipelines
ClarifyCheck QualityCheck Baselines

The following topic discusses how baselines and model versions evolve in the Amazon SageMaker Pipelines when using the [`ClarifyCheck`](build-and-manage-steps-types.md#step-type-clarify-check) and [`QualityCheck`](build-and-manage-steps-types.md#step-type-quality-check) steps.

For the `ClarifyCheck` step, a baseline is a single file that resides in the step properties with the suffix `constraints`. For the `QualityCheck` step, a baseline is a combination of two files that reside in the step properties: one with the suffix `statistics` and the other with the suffix `constraints`. In the following topics, we discuss these properties along with the prefixes that describe how they are used and how they affect baseline behavior and lifecycle in these two pipeline steps. For example, the `ClarifyCheck` step always calculates and assigns new baselines to the `CalculatedBaselineConstraints` property, and the `QualityCheck` step does the same for the `CalculatedBaselineConstraints` and `CalculatedBaselineStatistics` properties.

## Baseline calculation and registration for ClarifyCheck and QualityCheck steps
Baseline calculation and registration

Both the `ClarifyCheck` and `QualityCheck` steps always calculate new baselines based on the step inputs through the underlying processing job run. These newly calculated baselines are accessed through the properties with the prefix `CalculatedBaseline`. You can record these properties as the `ModelMetrics` of your model package in the [Model step](build-and-manage-steps-types.md#step-type-model). The model package can be registered with up to five different baselines, one for each check type: data bias, model bias, and model explainability from running the `ClarifyCheck` step, and data quality and model quality from running the `QualityCheck` step. The `register_new_baseline` parameter determines the value set in the properties with the prefix `BaselineUsedForDriftCheck` after a step runs.
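
The following is a minimal, illustrative sketch (not part of the Abalone example) of a `QualityCheckStep` configured with these parameters. It assumes that `role` and `pipeline_session` are already defined and uses placeholder S3 URIs and a placeholder model package group name.

```
from sagemaker.model_monitor.dataset_format import DatasetFormat
from sagemaker.workflow.check_job_config import CheckJobConfig
from sagemaker.workflow.quality_check_step import DataQualityCheckConfig, QualityCheckStep

# Placeholder job configuration; role and pipeline_session are assumed to exist.
check_job_config = CheckJobConfig(
    role=role,
    instance_count=1,
    instance_type="ml.c5.xlarge",
    sagemaker_session=pipeline_session,
)

data_quality_check_step = QualityCheckStep(
    name="DataQualityCheck",
    quality_check_config=DataQualityCheckConfig(
        baseline_dataset="s3://amzn-s3-demo-bucket/abalone/train.csv",
        dataset_format=DatasetFormat.csv(header=False),
        output_s3_uri="s3://amzn-s3-demo-bucket/abalone/data-quality-check",
    ),
    check_job_config=check_job_config,
    skip_check=False,             # run a drift check against the existing baseline
    register_new_baseline=False,  # carry the previous baseline forward as BaselineUsedForDriftCheck
    model_package_group_name="AbaloneModelPackageGroupName",
)
```

After the step runs, the newly calculated baseline is available through `data_quality_check_step.properties.CalculatedBaselineStatistics` and `data_quality_check_step.properties.CalculatedBaselineConstraints`.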

The following table of potential use cases shows different behaviors resulting from the step parameters you can set for the `ClarifyCheck` and `QualityCheck` steps:


| Possible use case that you may consider for selecting this configuration  | `skip_check` / `register_new_baseline` | Does step do a drift check? | Value of step property `CalculatedBaseline` | Value of step property `BaselineUsedForDriftCheck` | 
| --- | --- | --- | --- | --- | 
| You are doing regular retraining with checks enabled to get a new model version, but you *want to carry over the previous baselines* as the `DriftCheckBaselines` in the model registry for your new model version. | False/ False | Drift check runs against existing baselines | New baselines calculated by running the step | Baseline from the latest approved model in Model Registry or the baseline supplied as step parameter | 
| You are doing regular retraining with checks enabled to get a new model version, but you *want to refresh the `DriftCheckBaselines` in the model registry with the newly calculated baselines* for your new model version. | False/ True | Drift check runs against existing baselines | New baselines calculated by running the step | Newly calculated baseline by running the step (value of property CalculatedBaseline) | 
| You are initiating the pipeline to retrain a new model version because there is a violation detected by Amazon SageMaker Model Monitor on an endpoint for a particular type of check, and you want to *skip this type of check against the previous baseline, but carry over the previous baseline as `DriftCheckBaselines` in the model registry* for your new model version. | True/ False | No drift check | New baselines calculated by running the step | Baseline from the latest approved model in the model registry or the baseline supplied as step parameter | 
| This happens in the following cases: [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-quality-clarify-baseline-lifecycle.html)  | True/ True | No drift check | New baselines calculated by running the step | Newly calculated baseline by running the step (value of property CalculatedBaseline) | 

**Note**  
If you use scientific notation in your constraint, you need to convert it to a float. For a preprocessing script example of how to do this, see [Create a Model Quality Baseline](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-model-quality-baseline.html).

When you register a model with the [Model step](build-and-manage-steps-types.md#step-type-model), you can record the `BaselineUsedForDriftCheck` properties as `DriftCheckBaselines`. These baseline files can then be used by Model Monitor for model and data quality checks. In addition, these baselines can be used by the `ClarifyCheck` and `QualityCheck` steps in future pipeline runs to compare newly trained models against the models already registered in the model registry.
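
As a rough sketch of that wiring, and assuming the data quality check step from the previous example, the calculated and drift-check baselines could be recorded with the `ModelMetrics` and `DriftCheckBaselines` classes from the SageMaker Python SDK as follows; the exact properties you record depend on the check types you run.

```
from sagemaker.drift_check_baselines import DriftCheckBaselines
from sagemaker.model_metrics import MetricsSource, ModelMetrics

# Assumes a QualityCheck step named data_quality_check_step (as in the earlier sketch).
model_metrics = ModelMetrics(
    model_data_statistics=MetricsSource(
        s3_uri=data_quality_check_step.properties.CalculatedBaselineStatistics,
        content_type="application/json",
    ),
    model_data_constraints=MetricsSource(
        s3_uri=data_quality_check_step.properties.CalculatedBaselineConstraints,
        content_type="application/json",
    ),
)

drift_check_baselines = DriftCheckBaselines(
    model_data_statistics=MetricsSource(
        s3_uri=data_quality_check_step.properties.BaselineUsedForDriftCheckStatistics,
        content_type="application/json",
    ),
    model_data_constraints=MetricsSource(
        s3_uri=data_quality_check_step.properties.BaselineUsedForDriftCheckConstraints,
        content_type="application/json",
    ),
)
```

You can then pass `model_metrics` and `drift_check_baselines` to your model registration step, for example through `model.register()` wrapped in a Model step.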

## Drift Detection against Previous Baselines in Pipelines
Drift Detection

In the case of the `QualityCheck` step, when you initiate the pipeline for regular retraining to get a new model version, you may not want to run the training step if the data quality or data bias checks detect violations (see [Schema for Violations (constraint_violations.json file)](model-monitor-interpreting-violations.md)) against the baselines of your previously approved model version. Similarly, you may not want to register the newly trained model version if the model quality, model bias, or model explainability checks of the `ClarifyCheck` step violate the registered baselines of your previously approved model version. In these cases, enable the checks you want by setting the `skip_check` property of the corresponding check step to `False`, so that the `ClarifyCheck` or `QualityCheck` step fails if a violation is detected against the previous baselines. The pipeline execution then stops, and a model that drifted from the baseline isn't registered. The `ClarifyCheck` and `QualityCheck` steps can pull the `DriftCheckBaselines` of the latest approved model version in a given model package group to compare against. You can also supply previous baselines directly through `supplied_baseline_constraints` (and `supplied_baseline_statistics` for the `QualityCheck` step); supplied baselines always take priority over any baselines pulled from the model package group. 

## Baseline and model version lifecycle and evolution with Pipelines
Baseline Lifecycle and Evolution

By setting `register_new_baseline` of your `ClarifyCheck` and `QualityCheck` steps to `False`, your previous baseline is accessible through the step properties with the prefix `BaselineUsedForDriftCheck`. You can then register these baselines as the `DriftCheckBaselines` of the new model version when you register a model with the [Model step](build-and-manage-steps-types.md#step-type-model). Once you approve this new model version in the model registry, the `DriftCheckBaselines` in this model version become available to the `ClarifyCheck` and `QualityCheck` steps in the next pipeline run. If you want to refresh the baseline of a certain check type for future model versions, set `register_new_baseline` to `True` so that the properties with the prefix `BaselineUsedForDriftCheck` become the newly calculated baseline. In this way, you can preserve your preferred baselines for models trained in the future, or refresh the baselines for drift checks when needed, managing your baseline evolution and lifecycle throughout your model training iterations. 

The following diagram illustrates a model-version-centric view of the baseline evolution and lifecycle.

![\[A model-version-centric view of the baseline evolution and lifecycle.\]](http://docs.aws.amazon.com/sagemaker/latest/dg/images/pipelines/Baseline-Lifecycle.png)


# Schedule Pipeline Runs
Schedule Pipeline Runs

You can schedule your Amazon SageMaker Pipelines executions using [Amazon EventBridge](https://docs.aws.amazon.com/eventbridge/latest/userguide/what-is-amazon-eventbridge.html), which supports Amazon SageMaker Pipelines as a target. This allows you to initiate an execution of your model building pipeline based on any event in your event bus. With EventBridge, you can automate your pipeline executions and respond automatically to events such as training job or endpoint status changes. Example events include a new file being uploaded to your Amazon S3 bucket, a change in status of your Amazon SageMaker AI endpoint due to drift, or a message published to an *Amazon Simple Notification Service* (SNS) topic.

The following Pipelines actions can be automatically initiated:  
+  `StartPipelineExecution` 

For more information on scheduling SageMaker AI jobs, see [Automating SageMaker AI with Amazon EventBridge.](https://docs.aws.amazon.com/sagemaker/latest/dg/automating-sagemaker-with-eventbridge.html) 

**Topics**
+ [

## Schedule a Pipeline with Amazon EventBridge
](#pipeline-eventbridge-schedule)
+ [

## Schedule a pipeline with the SageMaker Python SDK
](#build-and-manage-scheduling)

## Schedule a Pipeline with Amazon EventBridge


To start a pipeline execution with Amazon EventBridge, you must create an EventBridge [rule](https://docs.aws.amazon.com/eventbridge/latest/APIReference/API_Rule.html). When you create a rule for events, you specify a target action to take when EventBridge receives an event that matches the rule. When an event matches the rule, EventBridge sends the event to the specified target and initiates the action defined in the rule. 

 The following tutorials show how to schedule a pipeline execution with EventBridge using the EventBridge console or the AWS CLI.  

### Prerequisites

+ A role that EventBridge can assume with the `sagemaker:StartPipelineExecution` permission. This role can be created automatically if you create a rule from the EventBridge console; otherwise, you need to create this role yourself. For information on creating a SageMaker AI role, see [SageMaker Roles](https://docs.aws.amazon.com/sagemaker/latest/dg/sagemaker-roles.html).
+ An Amazon SageMaker AI Pipeline to schedule. To create an Amazon SageMaker AI Pipeline, see [Define a Pipeline](https://docs.aws.amazon.com/sagemaker/latest/dg/define-pipeline.html).

### Create an EventBridge rule using the EventBridge console


 The following procedure shows how to create an EventBridge rule using the EventBridge console.  

1. Navigate to the [EventBridge console](https://console.aws.amazon.com/events). 

1. Select **Rules** on the left hand side. 

1.  Select **Create Rule**. 

1. Enter a name and description for your rule.

1.  Select how you want to initiate this rule. You have the following choices for your rule: 
   + **Event pattern**: Your rule is initiated when an event matching the pattern occurs. You can choose a predefined pattern that matches a certain type of event, or you can create a custom pattern. If you select a predefined pattern, you can edit the pattern to customize it. For more information on Event patterns, see [Event Patterns in CloudWatch Events](https://docs.aws.amazon.com/AmazonCloudWatch/latest/events/CloudWatchEventsandEventPatterns.html). 
   + **Schedule**: Your rule is initiated regularly on a specified schedule. You can use a fixed-rate schedule that initiates at a regular interval of a specified number of minutes, hours, or days. You can also use a [cron expression](https://docs.aws.amazon.com/AmazonCloudWatch/latest/events/ScheduledEvents.html#CronExpressions) to create a more fine-grained schedule, such as “the first Monday of each month at 8am.” Schedule is not supported on a custom or partner event bus. 

1. Select your desired Event bus. 

1. Select the target(s) to invoke when an event matches your event pattern or when the schedule is initiated. You can add up to 5 targets per rule. Select **SageMaker Pipeline** in the target dropdown list. 

1. Select the pipeline you want to initiate from the pipeline dropdown list. 

1. Add parameters to pass to your pipeline execution using a name and value pair. Parameter values can be static or dynamic. For more information on Amazon SageMaker AI Pipeline parameters, see [AWS::Events::Rule SagemakerPipelineParameters](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-sagemaker-pipeline.html#aws-resource-sagemaker-pipeline-properties).
   + Static values are passed to the pipeline execution every time the pipeline is initiated. For example, if `{"Name": "Instance_type", "Value": "ml.4xlarge"}` is specified in the parameter list, then it is passed as a parameter in `StartPipelineExecutionRequest` every time EventBridge initiates the pipeline. 
   + Dynamic values are specified using a JSON path. EventBridge parses the value from an event payload, then passes it to the pipeline execution. For example: *`$.detail.param.value`* 

1. Select the role to use for this rule. You can either use an existing role or create a new one. 

1. (Optional) Add tags. 

1. Select **Create** to finalize your rule. 

 Your rule is now in effect and ready to initiate your pipeline executions. 

### Create an EventBridge rule using the [AWS CLI](https://docs.aws.amazon.com/cli/latest/reference/events/index.html)


 The following procedure shows how to create an EventBridge rule using the AWS CLI. 

1. Create a rule to be initiated. When creating an EventBridge rule using the AWS CLI, you have two options for how your rule is initiated, event pattern and schedule.
   +  **Event pattern**: Your rule is initiated when an event matching the pattern occurs. You can choose a predefined pattern that matches a certain type of event, or you can create a custom pattern. If you select a predefined pattern, you can edit the pattern to customize it.  You can create a rule with event pattern using the following command: 

     ```
     aws events put-rule --name <RULE_NAME> --event-pattern <YOUR_EVENT_PATTERN> --description <RULE_DESCRIPTION> --role-arn <ROLE_TO_EXECUTE_PIPELINE> --tags <TAGS>
     ```
   +  **Schedule**: Your rule is initiated regularly on a specified schedule. You can use a fixed-rate schedule that initiates at a regular interval of a specified number of minutes, hours, or days. You can also use a cron expression to create a more fine-grained schedule, such as “the first Monday of each month at 8am.” Schedule is not supported on a custom or partner event bus. You can create a rule with a schedule using the following command: 

     ```
     aws events put-rule --name <RULE_NAME> --schedule-expression <YOUR_CRON_EXPRESSION> --description <RULE_DESCRIPTION> --role-arn <ROLE_TO_EXECUTE_PIPELINE> --tags <TAGS>
     ```

1. Add target(s) to invoke when an event matches your event pattern or when the schedule is initiated. You can add up to 5 targets per rule.  For each target, you must specify:  
   +  ARN: The resource ARN of your pipeline. 
   +  Role ARN: The ARN of the role EventBridge should assume to execute the pipeline. 
   +  Parameters:  Amazon SageMaker AI pipeline parameters to pass. 

1. Run the following command to pass an Amazon SageMaker AI pipeline as a target to your rule using [put-targets](https://docs.aws.amazon.com/cli/latest/reference/events/put-targets.html): 

   ```
   aws events put-targets --rule <RULE_NAME> --event-bus-name <EVENT_BUS_NAME> --targets "[{\"Id\": <ID>, \"Arn\": <RESOURCE_ARN>, \"RoleArn\": <ROLE_ARN>, \"SageMakerPipelineParameters\": {\"PipelineParameterList\": [{\"Name\": <NAME>, \"Value\": <VALUE>}]}}]"
   ```
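
If you prefer to script these calls in Python rather than the AWS CLI, the following is a rough boto3 equivalent; the rule name, ARNs, and parameter values are placeholders.

```
import boto3

events = boto3.client("events")

# Hypothetical rule that starts the pipeline every day at 8:00 UTC.
events.put_rule(
    Name="my-pipeline-schedule",
    ScheduleExpression="cron(0 8 * * ? *)",
    State="ENABLED",
    Description="Daily retraining",
)

# Attach the pipeline as the rule target, with a sample pipeline parameter.
events.put_targets(
    Rule="my-pipeline-schedule",
    Targets=[
        {
            "Id": "my-pipeline-target",
            "Arn": "arn:aws:sagemaker:us-west-2:111122223333:pipeline/my-pipeline",
            "RoleArn": "arn:aws:iam::111122223333:role/my-eventbridge-role",
            "SageMakerPipelineParameters": {
                "PipelineParameterList": [
                    {"Name": "InputData", "Value": "s3://amzn-s3-demo-bucket/my-data"}
                ]
            },
        }
    ],
)
```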

## Schedule a pipeline with the SageMaker Python SDK


The following sections show you how to set up permissions to access EventBridge resources and create your pipeline schedule using the SageMaker Python SDK. 

### Required permissions


You need to have necessary permissions to use the pipeline scheduler. Complete the following steps to set up your permissions:

1. Attach the following minimum privilege policy to the IAM role used to create the pipeline triggers, or use the AWS managed policy `AmazonEventBridgeSchedulerFullAccess`.

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement":
       [
           {
               "Action":
               [
                   "scheduler:ListSchedules",
                   "scheduler:GetSchedule",
                   "scheduler:CreateSchedule",
                   "scheduler:UpdateSchedule",
                   "scheduler:DeleteSchedule"
               ],
               "Effect": "Allow",
               "Resource":
               [
                   "*"
               ]
           },
           {
               "Effect": "Allow",
               "Action": "iam:PassRole",
               "Resource": "arn:aws:iam::*:role/*", 
               "Condition": {
                   "StringLike": {
                       "iam:PassedToService": "scheduler.amazonaws.com"
                   }
               }
           }
       ]
   }
   ```

------

1. Establish a trust relationship with EventBridge by adding the service principal `scheduler.amazonaws.com` to this role’s trust policy. Make sure you attach the following trust policy to the execution role if you launch the notebook in SageMaker Studio.

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": [
                    "scheduler.amazonaws.com",
                    "sagemaker.amazonaws.com"
                ]
            },
        "Action": "sts:AssumeRole"
        }
    ]
}
```

------

### Create a pipeline schedule


Using the `PipelineSchedule` constructor, you can schedule a pipeline to run once or at a predetermined interval. A pipeline schedule must be of the type `at`, `rate`, or `cron`. This set of scheduling types is an extension of the [EventBridge scheduling options](https://docs.aws.amazon.com/scheduler/latest/UserGuide/schedule-types.html). For more information about how to use the `PipelineSchedule` class, see [sagemaker.workflow.triggers.PipelineSchedule](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#pipeline-schedule). The following example demonstrates how to create each scheduling type with `PipelineSchedule`.

```
from datetime import datetime
from sagemaker.workflow.triggers import PipelineSchedule

# schedules a pipeline run for 12/13/2023 at time 10:15:20 UTC
my_datetime_schedule = PipelineSchedule(
    name="<schedule-name>", 
    at=datetime(2023, 12, 13, 10, 15, 20)
)

# schedules a pipeline run every 5 minutes
my_rate_schedule = PipelineSchedule(
    name="<schedule-name>", 
    rate=(5, "minutes")
)

# schedules a pipeline run at 10:15am UTC on the last Friday of each month during the years 2022 to 2023
my_cron_schedule = PipelineSchedule(
    name="<schedule-name>", 
    cron="15 10 ? * 6L 2022-2023"
)
```

**Note**  
If you create a one-time schedule and need to access the current time, use `datetime.utcnow()` instead of `datetime.now()`. The latter does not store the current zone context and results in an incorrect time passed to EventBridge.
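
For example, a one-time schedule a few minutes in the future might look like the following sketch; the schedule name is a placeholder.

```
from datetime import datetime, timedelta
from sagemaker.workflow.triggers import PipelineSchedule

# Schedule a single run five minutes from now, expressed in UTC as recommended above.
one_time_schedule = PipelineSchedule(
    name="<schedule-name>",
    at=datetime.utcnow() + timedelta(minutes=5),
)
```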

### Attach the trigger to your pipeline


To attach your `PipelineSchedule` to your pipeline, invoke the `put_triggers` call on your created pipeline object with a list of triggers. If you get a response ARN, you successfully created the schedule in your account and EventBridge begins to invoke the target pipeline at the time or rate specified. You must specify a role with correct permissions to attach triggers to a parent pipeline. If you don't provide one, Pipelines fetches the default role used to create the pipeline from the [configuration file](https://docs.aws.amazon.com/sagemaker/latest/dg/train-remote-decorator-config.html).

The following example demonstrates how to attach a schedule to a pipeline.

```
from datetime import datetime

from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.triggers import PipelineSchedule

scheduled_pipeline = Pipeline(
    name="<pipeline-name>",
    steps=[...],
    sagemaker_session=<sagemaker-session>,
)
custom_schedule = PipelineSchedule(
    name="<schedule-name>", 
    at=datetime(year=2023, month=12, day=25, hour=10, minute=30, second=30)
)
scheduled_pipeline.put_triggers(triggers=[custom_schedule], role_arn=<role>)
```

### Describe current triggers


To retrieve information about your created pipeline triggers, you can invoke the `describe_trigger()` API with the trigger name. This command returns details about the created schedule expression such as its start time, enabled state, and other useful information. The following snippet shows a sample invocation:

```
scheduled_pipeline.describe_trigger(name="<schedule-name>")
```

### Cleanup trigger resources


Before you delete your pipeline, clean up its existing triggers to avoid leaking resources in your account. Delete the triggers before you delete the parent pipeline by passing a list of trigger names to the `delete_triggers` API. The following snippet demonstrates how to delete triggers.

```
pipeline.delete_triggers(trigger_names=["<schedule-name>"])
```

**Note**  
Be aware of the following limitations when you delete your triggers:  
+ The option to delete the triggers by specifying trigger names is only available in the SageMaker Python SDK. Deleting the pipeline in the CLI or through a `DeletePipeline` API call does not delete your triggers. As a result, the triggers become orphaned and SageMaker AI attempts to start a run for a pipeline that no longer exists.
+ If you are using another notebook session or have already deleted the pipeline target, clean up orphaned schedules through the scheduler [CLI](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/scheduler/delete-schedule.html) or the EventBridge console.
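
As a sketch of that cleanup, assuming the orphaned schedule lives in the default schedule group, you could remove it with the EventBridge Scheduler API through boto3:

```
import boto3

scheduler = boto3.client("scheduler")

# "<schedule-name>" is the trigger name that was passed to put_triggers earlier.
for schedule in scheduler.list_schedules()["Schedules"]:
    if schedule["Name"] == "<schedule-name>":
        # If the schedule is in a non-default group, also pass its group name to delete_schedule.
        scheduler.delete_schedule(Name=schedule["Name"])
```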

# Amazon SageMaker Experiments Integration
Experiments Integration

Amazon SageMaker Pipelines is closely integrated with Amazon SageMaker Experiments. By default, when Pipelines creates and executes a pipeline, the following SageMaker Experiments entities are created if they don't exist:
+ An experiment for the pipeline
+ A run group for every execution of the pipeline
+ A run that's added to the run group for each SageMaker AI job created in a pipeline execution step

You can compare metrics such as model training accuracy across multiple pipeline executions just as you can compare such metrics across multiple run groups of a SageMaker AI model training experiment.
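
For example, a minimal sketch using the SDK's `ExperimentAnalytics` helper, assuming the default integration where the experiment is named after the pipeline, could pull those metrics into a dataframe for comparison:

```
from sagemaker.analytics import ExperimentAnalytics

# Assumes the default integration, where the experiment name is the pipeline name.
analytics = ExperimentAnalytics(experiment_name="MyPipeline")
df = analytics.dataframe()

# Each row is a run (one per SageMaker AI job started by a pipeline execution step).
print(df.head())
```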

The following sample shows the relevant parameters of the [Pipeline](https://github.com/aws/sagemaker-python-sdk/blob/v2.41.0/src/sagemaker/workflow/pipeline.py) class in the [Amazon SageMaker Python SDK](https://sagemaker.readthedocs.io/en/stable).

```
Pipeline(
    name="MyPipeline",
    parameters=[...],
    pipeline_experiment_config=PipelineExperimentConfig(
      ExecutionVariables.PIPELINE_NAME,
      ExecutionVariables.PIPELINE_EXECUTION_ID
    ),
    steps=[...]
)
```

If you don't want an experiment and run group created for the pipeline, set `pipeline_experiment_config` to `None`.

**Note**  
Experiments integration was introduced in the Amazon SageMaker Python SDK v2.41.0.

The following naming rules apply based on what you specify for the `ExperimentName` and `TrialName` parameters of `pipeline_experiment_config`:
+ If you don't specify `ExperimentName`, the pipeline `name` is used for the experiment name.

  If you do specify `ExperimentName`, it's used for the experiment name. If an experiment with that name exists, the pipeline-created run groups are added to the existing experiment. If an experiment with that name doesn't exist, a new experiment is created.
+ If you don't specify `TrialName`, the pipeline execution ID is used for the run group name.

  If you do specify `TrialName`, it's used for the run group name. If a run group with that name exists, the pipeline-created runs are added to the existing run group. If a run group with that name doesn't exist, a new run group is created.

**Note**  
The experiment entities aren't deleted when the pipeline that created the entities is deleted. You can use the SageMaker Experiments API to delete the entities.
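
The following is a rough cleanup sketch using the low-level API through boto3; it assumes the experiment was created with the default integration and is named after the pipeline.

```
import boto3

sm = boto3.client("sagemaker")
experiment_name = "MyPipeline"  # assumed: experiment named after the pipeline

# Delete trial components, then trials, then the experiment itself.
for trial in sm.list_trials(ExperimentName=experiment_name)["TrialSummaries"]:
    for tc in sm.list_trial_components(TrialName=trial["TrialName"])["TrialComponentSummaries"]:
        sm.disassociate_trial_component(
            TrialComponentName=tc["TrialComponentName"], TrialName=trial["TrialName"]
        )
        sm.delete_trial_component(TrialComponentName=tc["TrialComponentName"])
    sm.delete_trial(TrialName=trial["TrialName"])
sm.delete_experiment(ExperimentName=experiment_name)
```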

For information about how to view the SageMaker AI Experiment entities associated with a pipeline, see [Access experiment data from a pipeline](pipelines-studio-experiments.md). For more information on SageMaker Experiments, see [Amazon SageMaker Experiments in Studio Classic](experiments.md).

The following sections show examples of the previous rules and how they are represented in the pipeline definition file. For more information on pipeline definition files, see [Pipelines overview](pipelines-overview.md).

**Topics**
+ [

# Default Behavior
](pipelines-experiments-default.md)
+ [

# Disable Experiments Integration
](pipelines-experiments-none.md)
+ [

# Specify a Custom Experiment Name
](pipelines-experiments-custom-experiment.md)
+ [

# Specify a Custom Run Group Name
](pipelines-experiments-custom-trial.md)

# Default Behavior


**Create a pipeline**

The default behavior when creating a SageMaker AI Pipeline is to automatically integrate it with SageMaker Experiments. If you don't specify any custom configuration, SageMaker AI creates an experiment with the same name as the pipeline, a run group for each execution of the pipeline using the pipeline execution ID as the name, and individual runs within each run group for every SageMaker AI job launched as part of the pipeline steps. You can seamlessly track and compare metrics across different pipeline executions, similar to how you would analyze a model training experiment. The following section demonstrates this default behavior when defining a pipeline without explicitly configuring the experiment integration.

The `pipeline_experiment_config` is omitted. `ExperimentName` defaults to the pipeline `name`. `TrialName` defaults to the execution ID.

```
pipeline_name = f"MyPipeline"
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[...],
    steps=[step_train]
)
```

**Pipeline definition file**

```
{
  "Version": "2020-12-01",
  "Parameters": [
    {
      "Name": "InputDataSource"
    },
    {
      "Name": "InstanceCount",
      "Type": "Integer",
      "DefaultValue": 1
    }
  ],
  "PipelineExperimentConfig": {
    "ExperimentName": {"Get": "Execution.PipelineName"},
    "TrialName": {"Get": "Execution.PipelineExecutionId"}
  },
  "Steps": [...]
}
```

# Disable Experiments Integration


**Create a pipeline**

You can disable your pipeline's integration with SageMaker Experiments by setting the `pipeline_experiment_config` parameter to `None` when you define your pipeline. This way, SageMaker AI will not automatically create an experiment, run groups, or individual runs for tracking metrics and artifacts associated with your pipeline executions. The following example sets the pipeline config parameter to `None`.

```
pipeline_name = f"MyPipeline"
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[...],
    pipeline_experiment_config=None,
    steps=[step_train]
)
```

**Pipeline definition file**

This is the same as the preceding default example, without the `PipelineExperimentConfig`.

# Specify a Custom Experiment Name


While the default behavior is to use the pipeline name as the experiment name in SageMaker Experiments, you can override this and specify a custom experiment name instead. This can be useful if you want to group multiple pipeline executions under the same experiment for easier analysis and comparison. The run group name will still default to the pipeline execution ID unless you explicitly set a custom name for that as well. The following section demonstrates how to create a pipeline with a custom experiment name while leaving the run group name as the default execution ID.

**Create a pipeline**

```
pipeline_name = f"MyPipeline"
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[...],
    pipeline_experiment_config=PipelineExperimentConfig(
      "CustomExperimentName",
      ExecutionVariables.PIPELINE_EXECUTION_ID
    ),
    steps=[step_train]
)
```

**Pipeline definition file**

```
{
  ...,
  "PipelineExperimentConfig": {
    "ExperimentName": "CustomExperimentName",
    "TrialName": {"Get": "Execution.PipelineExecutionId"}
  },
  "Steps": [...]
}
```

# Specify a Custom Run Group Name


In addition to setting a custom experiment name, you can also specify a custom name for the run groups created by SageMaker Experiments during pipeline executions. This name is appended with the pipeline execution ID to ensure uniqueness. You can specify a custom run group name to identify and analyze related pipeline runs within the same experiment. The following section shows how to define a pipeline with a custom run group name while using the default pipeline name for the experiment name.

**Create a pipeline**

```
pipeline_name = f"MyPipeline"
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[...],
    pipeline_experiment_config=PipelineExperimentConfig(
      ExecutionVariables.PIPELINE_NAME,
      Join(on="-", values=["CustomTrialName", ExecutionVariables.PIPELINE_EXECUTION_ID])
    ),
    steps=[step_train]
)
```

**Pipeline definition file**

```
{
  ...,
  "PipelineExperimentConfig": {
    "ExperimentName": {"Get": "Execution.PipelineName"},
    "TrialName": {
      "On": "-",
      "Values": [
         "CustomTrialName",
         {"Get": "Execution.PipelineExecutionId"}
       ]
    }
  },
  "Steps": [...]
}
```

# Run pipelines using local mode


SageMaker Pipelines local mode is an easy way to test your training, processing and inference scripts, as well as the runtime compatibility of [pipeline parameters](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#pipeline-parameters) before you execute your pipeline on the managed SageMaker AI service. By using local mode, you can test your SageMaker AI pipeline locally using a smaller dataset. This allows quick and easy debugging of errors in user scripts and the pipeline definition itself without incurring the costs of using the managed service. The following topic shows you how to define and run pipelines locally.

Pipelines local mode leverages [SageMaker AI jobs local mode](https://sagemaker.readthedocs.io/en/stable/overview.html#local-mode) under the hood. This is a feature in the SageMaker Python SDK that allows you to run SageMaker AI built-in or custom images locally using Docker containers. Pipelines local mode is built on top of SageMaker AI jobs local mode. Therefore, you can expect to see the same results as if you were running those jobs separately. For example, local mode still uses Amazon S3 to upload model artifacts and processing outputs. If you want data generated by local jobs to reside on local disk, you can use the setup mentioned in [Local Mode](https://sagemaker.readthedocs.io/en/stable/overview.html#local-mode).

Pipeline local mode currently supports the following step types:
+ [Training step](build-and-manage-steps-types.md#step-type-training)
+ [Processing step](build-and-manage-steps-types.md#step-type-processing)
+ [Transform step](build-and-manage-steps-types.md#step-type-transform)
+ [Model Step](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-model-create) (with Create Model arguments only)
+ [Condition step](build-and-manage-steps-types.md#step-type-condition)
+ [Fail step](build-and-manage-steps-types.md#step-type-fail)

As opposed to the managed Pipelines service, which allows multiple steps to execute in parallel using [Parallelism Configuration](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#parallelism-configuration), the local pipeline executor runs the steps sequentially. Therefore, the overall execution performance of a local pipeline may be worse than that of a pipeline that runs on the cloud, depending mostly on the size of the dataset, the algorithm, and the power of your local computer. Also note that Pipelines runs in local mode are not recorded in [SageMaker Experiments](https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-experiments.html).

**Note**  
Pipelines local mode is not compatible with SageMaker AI algorithms such as XGBoost. If you want to use these algorithms, you must use them in [script mode](https://sagemaker-examples.readthedocs.io/en/latest/sagemaker-script-mode/sagemaker-script-mode.html).

In order to execute a pipeline locally, the `sagemaker_session` fields associated with the pipeline steps and the pipeline itself need to be of type `LocalPipelineSession`. The following example shows how you can define a SageMaker AI pipeline to execute locally.

```
import sagemaker

from sagemaker.inputs import TrainingInput
from sagemaker.pytorch import PyTorch
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_context import LocalPipelineSession
from sagemaker.workflow.steps import TrainingStep

local_pipeline_session = LocalPipelineSession()

pytorch_estimator = PyTorch(
    sagemaker_session=local_pipeline_session,
    role=sagemaker.get_execution_role(),
    instance_type="ml.c5.xlarge",
    instance_count=1,
    framework_version="1.8.0",
    py_version="py36",
    entry_point="./entry_point.py",
)

step = TrainingStep(
    name="MyTrainingStep",
    step_args=pytorch_estimator.fit(
        inputs=TrainingInput(s3_data="s3://amzn-s3-demo-bucket/my-data/train"),
    )
)

pipeline = Pipeline(
    name="MyPipeline",
    steps=[step],
    sagemaker_session=local_pipeline_session
)

pipeline.create(
    role_arn=sagemaker.get_execution_role(), 
    description="local pipeline example"
)

# The pipeline executes locally
execution = pipeline.start()

steps = execution.list_steps()

training_job_name = steps['PipelineExecutionSteps'][0]['Metadata']['TrainingJob']['Arn']

step_outputs = local_pipeline_session.sagemaker_client.describe_training_job(TrainingJobName=training_job_name)
```

Once you are ready to execute the pipeline on the managed SageMaker Pipelines service, you can do so by replacing `LocalPipelineSession` in the previous code snippet with `PipelineSession` (as shown in the following code sample) and rerunning the code.

```
from sagemaker.workflow.pipeline_context import PipelineSession

pipeline_session = PipelineSession()
```

# Troubleshooting Amazon SageMaker Pipelines
Troubleshooting Pipelines

When using Amazon SageMaker Pipelines, you might run into issues for various reasons. This topic provides information about common errors and how to resolve them. 

 **Pipeline Definition Issues** 

Your pipeline definition might not be formatted correctly. This can result in your execution failing or your job being inaccurate. These errors can be caught when the pipeline is created or when an execution occurs. If your definition doesn’t validate, Pipelines returns an error message identifying the character where the JSON file is malformed. To fix this problem, review the steps created using the SageMaker AI Python SDK for accuracy. 

You can only include a step once in a pipeline definition. Because of this, the same step cannot appear both inside a condition step and elsewhere in the same pipeline. 
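
One way to review the generated definition before creating or updating the pipeline is to parse it locally, for example:

```
import json

# pipeline is the Pipeline object built with the SageMaker Python SDK.
definition = json.loads(pipeline.definition())

# Inspect the step names and types that will be submitted to the service.
for step in definition["Steps"]:
    print(step["Name"], step["Type"])
```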

 **Examining Pipeline Logs** 

You can view the status of your steps using the following command: 

```
execution.list_steps()
```

Each step includes the following information:
+ The ARN of the entity launched by the pipeline, such as a SageMaker AI job ARN, model ARN, or model package ARN. 
+ The failure reason, which includes a brief explanation of the step failure.
+ If the step is a condition step, whether the condition evaluated to true or false.  
+ If the execution reuses a previous job execution, the `CacheHit` field lists the source execution.  
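
As a sketch, you can also pull the same details with the low-level boto3 API, which is useful when you only have the execution ARN; the ARN below is a placeholder.

```
import boto3

sm_client = boto3.client("sagemaker")
response = sm_client.list_pipeline_execution_steps(
    PipelineExecutionArn="arn:aws:sagemaker:us-west-2:111122223333:pipeline/my-pipeline/execution/abc123"
)

# Print the name, status, and any failure reason for each step.
for step in response["PipelineExecutionSteps"]:
    print(step["StepName"], step["StepStatus"], step.get("FailureReason", ""))
```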

You can also view the error messages and logs in the Amazon SageMaker Studio interface. For information about how to see the logs in Studio, see [View the details of a pipeline run](pipelines-studio-view-execution.md).

 **Missing Permissions** 

Correct permissions are required for the role that creates the pipeline execution, and the steps that create each of the jobs in your pipeline execution. Without these permissions, you may not be able to submit your pipeline execution or run your SageMaker AI jobs as expected. To ensure that your permissions are properly set up, see [IAM Access Management](build-and-manage-access.md). 

 **Job Execution Errors** 

You may run into issues when executing your steps because of issues in the scripts that define the functionality of your SageMaker AI jobs. Each job has a set of CloudWatch logs. To view these logs from Studio, see [View the details of a pipeline run](pipelines-studio-view-execution.md). For information about using CloudWatch logs with SageMaker AI, see [CloudWatch Logs for Amazon SageMaker AI](logging-cloudwatch.md). 

 **Property File Errors** 

You may have issues when incorrectly implementing property files with your pipeline. To ensure that your implementation of property files works as expected, see [Pass Data Between Steps](build-and-manage-propertyfile.md). 

 **Issues copying the script to the container in the Dockerfile** 

You can either copy the script into the container in your Dockerfile, or pass it through the `entry_point` argument of your estimator or the `code` argument of your processor, as demonstrated in the following code sample.

```
step_process = ProcessingStep(
    name="PreprocessAbaloneData",
    processor=sklearn_processor,
    inputs=[
        ProcessingInput(
            input_name='dataset',
            source=...,
            destination="/opt/ml/processing/code",
        )
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train", destination=processed_data_path),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation", destination=processed_data_path),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test", destination=processed_data_path),
    ],
    code=os.path.join(BASE_DIR, "process.py"), ## Code is passed through an argument
    cache_config=cache_config,
    job_arguments=['--input', 'arg1']
)

sklearn_estimator = SKLearn(
    entry_point=os.path.join(BASE_DIR, "train.py"), ## Code is passed through the entry_point
    framework_version="0.23-1",
    instance_type=training_instance_type,
    role=role,
    output_path=model_path,
    sagemaker_session=sagemaker_session,
    instance_count=1,
    base_job_name=f"{base_job_prefix}/pilot-train",
    metric_definitions=[
        {'Name': 'train:accuracy', 'Regex': 'accuracy_train=(.*?);'},
        {'Name': 'validation:accuracy', 'Regex': 'accuracy_validation=(.*?);'}
    ],
)
```

# Pipelines actions
Pipelines actions

You can use either the Amazon SageMaker Pipelines Python SDK or the drag-and-drop visual designer in Amazon SageMaker Studio to author, view, edit, execute, and monitor your ML workflows.

The following screenshot shows the visual designer that you can use to create and manage your Amazon SageMaker Pipelines.

![\[Screenshot of the visual drag-and-drop interface for Pipelines in Studio.\]](http://docs.aws.amazon.com/sagemaker/latest/dg/images/pipelines/pipelines-studio-overview.png)


After your pipeline is deployed, you can view the directed acyclic graph (DAG) for your pipeline and manage your executions using Amazon SageMaker Studio. Using SageMaker Studio, you can get information about your current and historical pipelines, compare executions, see the DAG for your executions, get metadata information, and more. To learn about how to view pipelines from Studio, see [View the details of a pipeline](pipelines-studio-list.md). 

**Topics**
+ [

# Define a pipeline
](define-pipeline.md)
+ [

# Edit a pipeline
](edit-pipeline-before-execution.md)
+ [

# Run a pipeline
](run-pipeline.md)
+ [

# Stop a pipeline
](pipelines-studio-stop.md)
+ [

# View the details of a pipeline
](pipelines-studio-list.md)
+ [

# View the details of a pipeline run
](pipelines-studio-view-execution.md)
+ [

# Download a pipeline definition file
](pipelines-studio-download.md)
+ [

# Access experiment data from a pipeline
](pipelines-studio-experiments.md)
+ [

# Track the lineage of a pipeline
](pipelines-lineage-tracking.md)

# Define a pipeline
Define a pipeline

To orchestrate your workflows with Amazon SageMaker Pipelines, you must generate a directed acyclic graph (DAG) in the form of a JSON pipeline definition. The DAG specifies the different steps involved in your ML process, such as data preprocessing, model training, model evaluation, and model deployment, as well as the dependencies and flow of data between these steps. The following topic shows you how to generate a pipeline definition.

You can generate your JSON pipeline definition using either the SageMaker Python SDK or the visual drag-and-drop Pipeline Designer feature in Amazon SageMaker Studio. The following image is a representation of the pipeline DAG that you create in this tutorial:

![\[Screenshot of the visual drag-and-drop interface for Pipelines in Studio.\]](http://docs.aws.amazon.com/sagemaker/latest/dg/images/pipelines/pipelines-studio-overview.png)


The pipeline that you define in the following sections solves a regression problem to determine the age of an abalone based on its physical measurements. For a runnable Jupyter notebook that includes the content in this tutorial, see [Orchestrating Jobs with Amazon SageMaker Model Building Pipelines](https://sagemaker-examples.readthedocs.io/en/latest/sagemaker-pipelines/tabular/abalone_build_train_deploy/sagemaker-pipelines-preprocess-train-evaluate-batch-transform.html).

**Note**  
You can reference the model location as a property of the training step, as shown in the end-to-end example [CustomerChurn pipeline](https://github.com/aws-samples/customer-churn-sagemaker-pipelines-sample/blob/main/pipelines/customerchurn/pipeline.py) in Github.

**Topics**

## Define a pipeline (Pipeline Designer)


The following walkthrough guides you through the steps to create a barebones pipeline using the drag-and-drop Pipeline Designer. If you need to pause or end your Pipeline editing session in the visual designer at any time, click on the **Export** option. This allows you to download the current definition of your Pipeline to your local environment. Later, when you want to resume the Pipeline editing process, you can import the same JSON definition file into the visual designer.

### Create a Processing step


To create a data processing job step, do the following:

1. Open the Studio console by following the instructions in [Launch Amazon SageMaker Studio](studio-updated-launch.md).

1. In the left navigation pane, select **Pipelines**.

1. Choose **Create**.

1. Choose **Blank**.

1. In the left sidebar, choose **Process data** and drag it to the canvas.

1. In the canvas, choose the **Process data** step you added.

1. To add an input dataset, choose **Add** under **Data (input)** in the right sidebar and select a dataset.

1. To add a location to save output datasets, choose **Add** under **Data (output)** in the right sidebar and navigate to the destination.

1. Complete the remaining fields in the right sidebar. For information about the fields in these tabs, see [ sagemaker.workflow.steps.ProcessingStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.ProcessingStep).

### Create a Training step


To set up a model training step, do the following:

1. In the left sidebar, choose **Train model** and drag it to the canvas.

1. In the canvas, choose the **Train model** step you added.

1. To add an input dataset, choose **Add** under **Data (input)** in the right sidebar and select a dataset.

1. To choose a location to save your model artifacts, enter an Amazon S3 URI in the **Location (S3 URI)** field, or choose **Browse S3** to navigate to the destination location.

1. Complete the remaining fields in the right sidebar. For information about the fields in these tabs, see [ sagemaker.workflow.steps.TrainingStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.TrainingStep).

1. Click and drag the cursor from the **Process data** step you added in the previous section to the **Train model** step to create an edge connecting the two steps.

### Create a model package with a Register model step


To create a model package with a model registration step, do the following:

1. In the left sidebar, choose **Register model** and drag it to the canvas.

1. In the canvas, choose the **Register model** step you added.

1. To select a model to register, choose **Add** under **Model (input)**.

1. Choose **Create a model group** to add your model to a new model group.

1. Complete the remaining fields in the right sidebar. For information about the fields in these tabs, see [ sagemaker.workflow.step\$1collections.RegisterModel](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.step_collections.RegisterModel).

1. Click and drag the cursor from the **Train model** step you added in the previous section to the **Register model** step to create an edge connecting the two steps.

### Deploy the model to an endpoint with a Deploy model (endpoint) step


To deploy your model using a model deployment step, do the following:

1. In the left sidebar, choose **Deploy model (endpoint)** and drag it to the canvas.

1. In the canvas, choose the **Deploy model (endpoint)** step you added.

1. To choose a model to deploy, choose **Add** under **Model (input)**.

1. Choose the **Create endpoint** radio button to create a new endpoint.

1. Enter a **Name** and **Description** for your endpoint.

1. Click and drag the cursor from the **Register model** step you added in the previous section to the **Deploy model (endpoint)** step to create an edge connecting the two steps.

1. Complete the remaining fields in the right sidebar.

### Define the Pipeline parameters


You can configure a set of Pipeline parameters whose values can be updated for every execution. To define the pipeline parameters and set the default values, click on the gear icon at the bottom of the visual designer.

### Save Pipeline


After you have entered all the required information to create your pipeline, click **Save** at the bottom of the visual designer. This validates your pipeline for potential runtime errors and notifies you of any issues. The **Save** operation won't succeed until you address all errors flagged by the automated validation checks. If you want to resume editing at a later point, you can export your in-progress pipeline as a JSON definition file to your local environment by clicking the **Export** button at the bottom of the visual designer. Later, to resume updating your pipeline, upload that JSON definition file by clicking the **Import** button.

## Define a pipeline (SageMaker Python SDK)


### Prerequisites


 To run the following tutorial, complete the following: 
+ Set up your notebook instance as outlined in [Create a notebook instance](https://docs.aws.amazon.com/sagemaker/latest/dg/howitworks-create-ws.html). This gives your role permissions to read and write to Amazon S3, and create training, batch transform, and processing jobs in SageMaker AI. 
+ Grant your notebook permissions to get and pass its own role as shown in [Modifying a role permissions policy](https://docs.aws.amazon.com/IAM/latest/UserGuide/roles-managingrole-editing-console.html#roles-modify_permissions-policy). Add the following JSON snippet to attach this policy to your role. Replace `<your-role-arn>` with the ARN used to create your notebook instance. 

------
#### [ JSON ]

****  

  ```
  {
      "Version":"2012-10-17",		 	 	 
      "Statement": [
          {
              "Effect": "Allow",
              "Action": [
                  "iam:GetRole",
                  "iam:PassRole"
              ],
              "Resource": "arn:aws:iam::111122223333:role/role-name"
          }
      ]
  }
  ```

------
+  Trust the SageMaker AI service principal by following the steps in [Modifying a role trust policy](https://docs.aws.amazon.com/IAM/latest/UserGuide/roles-managingrole-editing-cli.html#roles-managingrole_edit-trust-policy-cli). Add the following statement fragment to the trust relationship of your role: 

  ```
  {
        "Sid": "",
        "Effect": "Allow",
        "Principal": {
          "Service": "sagemaker.amazonaws.com"
        },
        "Action": "sts:AssumeRole"
      }
  ```

#### Set up your environment


Create a new SageMaker AI session using the following code block. This returns the role ARN for the session. This role ARN should be the execution role ARN that you set up as a prerequisite. 

```
import boto3
import sagemaker
import sagemaker.session
from sagemaker.workflow.pipeline_context import PipelineSession

region = boto3.Session().region_name
sagemaker_session = sagemaker.session.Session()
role = sagemaker.get_execution_role()
default_bucket = sagemaker_session.default_bucket()

pipeline_session = PipelineSession()

model_package_group_name = f"AbaloneModelPackageGroupName"
```

### Create a pipeline


**Important**  
Custom IAM policies that allow Amazon SageMaker Studio or Amazon SageMaker Studio Classic to create Amazon SageMaker resources must also grant permissions to add tags to those resources. The permission to add tags to resources is required because Studio and Studio Classic automatically tag any resources they create. If an IAM policy allows Studio and Studio Classic to create resources but does not allow tagging, "AccessDenied" errors can occur when trying to create resources. For more information, see [Provide permissions for tagging SageMaker AI resources](security_iam_id-based-policy-examples.md#grant-tagging-permissions).  
[AWS managed policies for Amazon SageMaker AI](security-iam-awsmanpol.md) that give permissions to create SageMaker resources already include permissions to add tags while creating those resources.

Run the following steps from your SageMaker AI notebook instance to create a pipeline that includes steps for:
+ preprocessing
+ training
+ evaluation
+ conditional evaluation
+ model registration

**Note**  
You can use [ExecutionVariables](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#execution-variables) and the [Join](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#execution-variables) function to specify your output location. `ExecutionVariables` is resolved at runtime. For instance, `ExecutionVariables.PIPELINE_EXECUTION_ID` is resolved to the ID of the current execution, which can be used as a unique identifier across different runs.
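
For example, a sketch of a unique, per-execution output prefix (the bucket name is a placeholder) might look like the following:

```
from sagemaker.workflow.execution_variables import ExecutionVariables
from sagemaker.workflow.functions import Join

# Resolves at runtime to s3://amzn-s3-demo-bucket/abalone-output/<execution-id>
output_path = Join(
    on="/",
    values=[
        "s3://amzn-s3-demo-bucket/abalone-output",
        ExecutionVariables.PIPELINE_EXECUTION_ID,
    ],
)
```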

#### Step 1: Download the dataset


This notebook uses the UCI Machine Learning Abalone Dataset. The dataset contains the following features: 
+ `length` – The longest shell measurement of the abalone.
+ `diameter` – The diameter of the abalone perpendicular to its length.
+ `height` – The height of the abalone with meat in the shell.
+ `whole_weight` – The weight of the whole abalone.
+ `shucked_weight` – The weight of the meat removed from the abalone.
+ `viscera_weight` – The weight of the abalone viscera after bleeding.
+ `shell_weight` – The weight of the abalone shell after meat removal and drying.
+ `sex` – The sex of the abalone. One of 'M', 'F', or 'I', where 'I' is an infant abalone.
+ `rings` – The number of rings in the abalone shell.

The number of rings in the abalone shell is a good approximation for its age using the formula `age=rings + 1.5`. However, getting this number is a time-consuming task. You must cut the shell through the cone, stain the section, and count the number of rings through a microscope. However, the other physical measurements are easier to get. This notebook uses the dataset to build a predictive model of the variable rings using the other physical measurements.

**To download the dataset**

1. Download the dataset into your account's default Amazon S3 bucket.

   ```
   !mkdir -p data
   local_path = "data/abalone-dataset.csv"
   
   s3 = boto3.resource("s3")
   s3.Bucket(f"sagemaker-servicecatalog-seedcode-{region}").download_file(
       "dataset/abalone-dataset.csv",
       local_path
   )
   
   base_uri = f"s3://{default_bucket}/abalone"
   input_data_uri = sagemaker.s3.S3Uploader.upload(
       local_path=local_path, 
       desired_s3_uri=base_uri,
   )
   print(input_data_uri)
   ```

1. Download a second dataset for batch transformation after your model is created.

   ```
   local_path = "data/abalone-dataset-batch.csv"
   
   s3 = boto3.resource("s3")
   s3.Bucket(f"sagemaker-servicecatalog-seedcode-{region}").download_file(
       "dataset/abalone-dataset-batch",
       local_path
   )
   
   base_uri = f"s3://{default_bucket}/abalone"
   batch_data_uri = sagemaker.s3.S3Uploader.upload(
       local_path=local_path, 
       desired_s3_uri=base_uri,
   )
   print(batch_data_uri)
   ```

#### Step 2: Define pipeline parameters


 This code block defines the following parameters for your pipeline: 
+  `processing_instance_count` – The instance count of the processing job. 
+  `input_data` – The Amazon S3 location of the input data. 
+  `batch_data` – The Amazon S3 location of the input data for batch transformation. 
+  `model_approval_status` – The approval status to register the trained model with for CI/CD. For more information, see [MLOps Automation With SageMaker Projects](sagemaker-projects.md).

```
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)

processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1
)
model_approval_status = ParameterString(
    name="ModelApprovalStatus",
    default_value="PendingManualApproval"
)
input_data = ParameterString(
    name="InputData",
    default_value=input_data_uri,
)
batch_data = ParameterString(
    name="BatchData",
    default_value=batch_data_uri,
)
```

#### Step 3: Define a processing step for feature engineering


This section shows how to create a processing step to prepare the data from the dataset for training.

**To create a processing step**

1.  Create a directory for the processing script.

   ```
   !mkdir -p abalone
   ```

1. Create a file in the `/abalone` directory named `preprocessing.py` with the following content. This preprocessing script is passed in to the processing step for running on the input data. The training step then uses the preprocessed training features and labels to train a model. The evaluation step uses the trained model and preprocessed test features and labels to evaluate the model. The script uses `scikit-learn` to do the following:
   +  Fill in missing `sex` categorical data and encode it so it's suitable for training. 
   +  Scale and normalize all numerical fields except for `rings` and `sex`. 
   +  Split the data into training, test, and validation datasets. 

   ```
   %%writefile abalone/preprocessing.py
   import argparse
   import os
   import requests
   import tempfile
   import numpy as np
   import pandas as pd
   
   
   from sklearn.compose import ColumnTransformer
   from sklearn.impute import SimpleImputer
   from sklearn.pipeline import Pipeline
   from sklearn.preprocessing import StandardScaler, OneHotEncoder
   
   
   # Because this is a headerless CSV file, specify the column names here.
   feature_columns_names = [
       "sex",
       "length",
       "diameter",
       "height",
       "whole_weight",
       "shucked_weight",
       "viscera_weight",
       "shell_weight",
   ]
   label_column = "rings"
   
   feature_columns_dtype = {
       "sex": str,
       "length": np.float64,
       "diameter": np.float64,
       "height": np.float64,
       "whole_weight": np.float64,
       "shucked_weight": np.float64,
       "viscera_weight": np.float64,
       "shell_weight": np.float64
   }
   label_column_dtype = {"rings": np.float64}
   
   
   def merge_two_dicts(x, y):
       z = x.copy()
       z.update(y)
       return z
   
   
   if __name__ == "__main__":
       base_dir = "/opt/ml/processing"
   
       df = pd.read_csv(
           f"{base_dir}/input/abalone-dataset.csv",
           header=None, 
           names=feature_columns_names + [label_column],
           dtype=merge_two_dicts(feature_columns_dtype, label_column_dtype)
       )
       numeric_features = list(feature_columns_names)
       numeric_features.remove("sex")
       numeric_transformer = Pipeline(
           steps=[
               ("imputer", SimpleImputer(strategy="median")),
               ("scaler", StandardScaler())
           ]
       )
   
       categorical_features = ["sex"]
       categorical_transformer = Pipeline(
           steps=[
               ("imputer", SimpleImputer(strategy="constant", fill_value="missing")),
               ("onehot", OneHotEncoder(handle_unknown="ignore"))
           ]
       )
   
       preprocess = ColumnTransformer(
           transformers=[
               ("num", numeric_transformer, numeric_features),
               ("cat", categorical_transformer, categorical_features)
           ]
       )
       
       y = df.pop("rings")
       X_pre = preprocess.fit_transform(df)
       y_pre = y.to_numpy().reshape(len(y), 1)
       
       X = np.concatenate((y_pre, X_pre), axis=1)
       
       np.random.shuffle(X)
       train, validation, test = np.split(X, [int(.7*len(X)), int(.85*len(X))])
   
       
       pd.DataFrame(train).to_csv(f"{base_dir}/train/train.csv", header=False, index=False)
       pd.DataFrame(validation).to_csv(f"{base_dir}/validation/validation.csv", header=False, index=False)
       pd.DataFrame(test).to_csv(f"{base_dir}/test/test.csv", header=False, index=False)
   ```

1.  Create an instance of an `SKLearnProcessor` to pass in to the processing step. 

   ```
   from sagemaker.sklearn.processing import SKLearnProcessor
   
   
   framework_version = "0.23-1"
   
   sklearn_processor = SKLearnProcessor(
       framework_version=framework_version,
       instance_type="ml.m5.xlarge",
       instance_count=processing_instance_count,
       base_job_name="sklearn-abalone-process",
       sagemaker_session=pipeline_session,
       role=role,
   )
   ```

1. Create a processing step. This step takes in the `SKLearnProcessor`, the input and output channels, and the `preprocessing.py` script that you created. This is very similar to calling a processor instance's `run` method in the SageMaker AI Python SDK. The `input_data` pipeline parameter is the step's input data, which the processor instance uses when it runs. 

   Note the `"train"`, `"validation"`, and `"test"` named channels specified in the output configuration for the processing job. Step `Properties` such as these can be used in subsequent steps and resolve to their values at runtime. 

   ```
   from sagemaker.processing import ProcessingInput, ProcessingOutput
   from sagemaker.workflow.steps import ProcessingStep
      
   
   processor_args = sklearn_processor.run(
       inputs=[
         ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
       ],
       outputs=[
           ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
           ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
           ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
       ],
       code="abalone/preprocessing.py",
   ) 
   
   step_process = ProcessingStep(
       name="AbaloneProcess",
       step_args=processor_args
   )
   ```

#### Step 4: Define a training step


This section shows how to use the SageMaker AI [XGBoost Algorithm](https://docs.aws.amazon.com/sagemaker/latest/dg/xgboost.html) to train a model on the training data output from the processing steps. 

**To define a training step**

1.  Specify the model path where you want to save the models from training. 

   ```
   model_path = f"s3://{default_bucket}/AbaloneTrain"
   ```

1. Configure an estimator for the XGBoost algorithm and the input dataset. The training instance type is passed into the estimator. A typical training script:
   + loads data from the input channels
   + configures training with hyperparameters
   + trains a model
   + saves a model to `model_dir` so that it can be hosted later

   SageMaker AI uploads the model to Amazon S3 in the form of a `model.tar.gz` at the end of the training job.

   ```
   from sagemaker.estimator import Estimator
   
   
   image_uri = sagemaker.image_uris.retrieve(
       framework="xgboost",
       region=region,
       version="1.0-1",
       py_version="py3",
       instance_type="ml.m5.xlarge"
   )
   xgb_train = Estimator(
       image_uri=image_uri,
       instance_type="ml.m5.xlarge",
       instance_count=1,
       output_path=model_path,
       sagemaker_session=pipeline_session,
       role=role,
   )
   xgb_train.set_hyperparameters(
       objective="reg:linear",
       num_round=50,
       max_depth=5,
       eta=0.2,
       gamma=4,
       min_child_weight=6,
       subsample=0.7,
       silent=0
   )
   ```

1. Create a `TrainingStep` using the estimator instance and properties of the `ProcessingStep`. Pass in the `S3Uri` of the `"train"` and `"validation"` output channels to the `TrainingStep`.  

   ```
   from sagemaker.inputs import TrainingInput
   from sagemaker.workflow.steps import TrainingStep
   
   
   train_args = xgb_train.fit(
       inputs={
           "train": TrainingInput(
               s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                   "train"
               ].S3Output.S3Uri,
               content_type="text/csv"
           ),
           "validation": TrainingInput(
               s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                   "validation"
               ].S3Output.S3Uri,
               content_type="text/csv"
           )
       },
   )
   
   step_train = TrainingStep(
       name="AbaloneTrain",
       step_args = train_args
   )
   ```

#### Step 5: Define a processing step for model evaluation


This section shows how to create a processing step to evaluate the accuracy of the model. The result of this model evaluation is used in the condition step to determine which run path to take.

**To define a processing step for model evaluation**

1. Create a file in the `/abalone` directory named `evaluation.py`. This script is used in a processing step to perform model evaluation. It takes a trained model and the test dataset as input, then produces a JSON file containing regression evaluation metrics.

   ```
   %%writefile abalone/evaluation.py
   import json
   import pathlib
   import pickle
   import tarfile
   import joblib
   import numpy as np
   import pandas as pd
   import xgboost
   
   
   from sklearn.metrics import mean_squared_error
   
   
   if __name__ == "__main__":
       model_path = f"/opt/ml/processing/model/model.tar.gz"
       with tarfile.open(model_path) as tar:
           tar.extractall(path=".")
       
       model = pickle.load(open("xgboost-model", "rb"))
   
       test_path = "/opt/ml/processing/test/test.csv"
       df = pd.read_csv(test_path, header=None)
       
       y_test = df.iloc[:, 0].to_numpy()
       df.drop(df.columns[0], axis=1, inplace=True)
       
       X_test = xgboost.DMatrix(df.values)
       
       predictions = model.predict(X_test)
   
       mse = mean_squared_error(y_test, predictions)
       std = np.std(y_test - predictions)
       report_dict = {
           "regression_metrics": {
               "mse": {
                   "value": mse,
                   "standard_deviation": std
               },
           },
       }
   
       output_dir = "/opt/ml/processing/evaluation"
       pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)
       
       evaluation_path = f"{output_dir}/evaluation.json"
       with open(evaluation_path, "w") as f:
           f.write(json.dumps(report_dict))
   ```
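
   For reference, the report written by this script has the following shape when loaded with `json.loads`. The metric values shown here are illustrative only:

   ```
   {'regression_metrics': {'mse': {'value': 4.95, 'standard_deviation': 2.19}}}
   ```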

1.  Create an instance of a `ScriptProcessor` that is used to create a `ProcessingStep`. 

   ```
   from sagemaker.processing import ScriptProcessor
   
   
   script_eval = ScriptProcessor(
       image_uri=image_uri,
       command=["python3"],
       instance_type="ml.m5.xlarge",
       instance_count=1,
       base_job_name="script-abalone-eval",
       sagemaker_session=pipeline_session,
       role=role,
   )
   ```

1.  Create a `ProcessingStep` using the processor instance, the input and output channels, and the  `evaluation.py` script. Pass in:
   + the `S3ModelArtifacts` property from the `step_train` training step
   + the `S3Uri` of the `"test"` output channel of the `step_process` processing step

   This is very similar to a processor instance's `run` method in the SageMaker AI Python SDK.  

   ```
   from sagemaker.workflow.properties import PropertyFile
   
   
   evaluation_report = PropertyFile(
       name="EvaluationReport",
       output_name="evaluation",
       path="evaluation.json"
   )
   
   eval_args = script_eval.run(
       inputs=[
           ProcessingInput(
               source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
               destination="/opt/ml/processing/model"
           ),
           ProcessingInput(
               source=step_process.properties.ProcessingOutputConfig.Outputs[
                   "test"
               ].S3Output.S3Uri,
               destination="/opt/ml/processing/test"
           )
       ],
       outputs=[
           ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
       ],
       code="abalone/evaluation.py",
   )
   
   step_eval = ProcessingStep(
       name="AbaloneEval",
       step_args=eval_args,
       property_files=[evaluation_report],
   )
   ```

#### Step 6: Define a CreateModelStep for batch transformation


**Important**  
We recommend using [Model step](build-and-manage-steps-types.md#step-type-model) to create models as of v2.90.0 of the SageMaker Python SDK. `CreateModelStep` will continue to work in previous versions of the SageMaker Python SDK, but is no longer actively supported.

This section shows how to create a SageMaker AI model from the output of the training step. This model is used for batch transformation on a new dataset. This step is passed into the condition step and only runs if the condition step evaluates to `true`.

**To define a CreateModelStep for batch transformation**

1.  Create a SageMaker AI model. Pass in the `S3ModelArtifacts` property from the `step_train` training step.

   ```
   from sagemaker.model import Model
   
   
   model = Model(
       image_uri=image_uri,
       model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
       sagemaker_session=pipeline_session,
       role=role,
   )
   ```

1. Define the model input for your SageMaker AI model.

   ```
   from sagemaker.inputs import CreateModelInput
   
   
   inputs = CreateModelInput(
       instance_type="ml.m5.large",
       accelerator_type="ml.eia1.medium",
   )
   ```

1. Create your `CreateModelStep` using the `CreateModelInput` and SageMaker AI model instance you defined.

   ```
   from sagemaker.workflow.steps import CreateModelStep
   
   
   step_create_model = CreateModelStep(
       name="AbaloneCreateModel",
       model=model,
       inputs=inputs,
   )
   ```
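
   If you use v2.90.0 or later of the SageMaker Python SDK, a roughly equivalent sketch using the recommended `ModelStep` (and the same `model` instance defined in the previous step) looks like the following:

   ```
   from sagemaker.workflow.model_step import ModelStep
   
   
   # Sketch only: ModelStep is the recommended replacement for CreateModelStep.
   # Because the model uses a PipelineSession, model.create() returns step
   # arguments instead of creating the model immediately.
   step_create_model = ModelStep(
       name="AbaloneCreateModel",
       step_args=model.create(instance_type="ml.m5.large"),
   )
   ```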

#### Step 7: Define a TransformStep to perform batch transformation


This section shows how to create a `TransformStep` to perform batch transformation on a dataset after the model is trained. This step is passed into the condition step and only runs if the condition step evaluates to `true`.

**To define a TransformStep to perform batch transformation**

1. Create a transformer instance with the appropriate compute instance type, instance count, and desired output Amazon S3 bucket URI. Pass in the `ModelName` property from the `step_create_model` `CreateModel` step. 

   ```
   from sagemaker.transformer import Transformer
   
   
   transformer = Transformer(
       model_name=step_create_model.properties.ModelName,
       instance_type="ml.m5.xlarge",
       instance_count=1,
       output_path=f"s3://{default_bucket}/AbaloneTransform"
   )
   ```

1. Create a `TransformStep` using the transformer instance you defined and the `batch_data` pipeline parameter.

   ```
   from sagemaker.inputs import TransformInput
   from sagemaker.workflow.steps import TransformStep
   
   
   step_transform = TransformStep(
       name="AbaloneTransform",
       transformer=transformer,
       inputs=TransformInput(data=batch_data)
   )
   ```

#### Step 8: Define a RegisterModel step to create a model package


**Important**  
We recommend using [Model step](build-and-manage-steps-types.md#step-type-model) to register models as of v2.90.0 of the SageMaker Python SDK. `RegisterModel` will continue to work in previous versions of the SageMaker Python SDK, but is no longer actively supported.

This section shows how to create an instance of `RegisterModel`. The result of running `RegisterModel` in a pipeline is a model package. A model package is a reusable abstraction of model artifacts that packages all of the ingredients necessary for inference. It consists of an inference specification that defines the inference image to use, along with an optional model weights location. A model package group is a collection of model packages. You can use a `ModelPackageGroup` with Pipelines to add a new model package version to the group for every pipeline run. For more information about the model registry, see [Model Registration Deployment with Model Registry](model-registry.md).

This step is passed into the condition step and only runs if the condition step evaluates to `true`.

**To define a RegisterModel step to create a model package**
+  Construct a `RegisterModel` step using the estimator instance you used for the training step. Pass in the `S3ModelArtifacts` property from the `step_train` training step and specify a `ModelPackageGroup`. Pipelines creates this `ModelPackageGroup` for you.

  ```
  from sagemaker.model_metrics import MetricsSource, ModelMetrics 
  from sagemaker.workflow.step_collections import RegisterModel
  
  
  model_metrics = ModelMetrics(
      model_statistics=MetricsSource(
          s3_uri="{}/evaluation.json".format(
              step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
          ),
          content_type="application/json"
      )
  )
  step_register = RegisterModel(
      name="AbaloneRegisterModel",
      estimator=xgb_train,
      model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
      content_types=["text/csv"],
      response_types=["text/csv"],
      inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
      transform_instances=["ml.m5.xlarge"],
      model_package_group_name=model_package_group_name,
      approval_status=model_approval_status,
      model_metrics=model_metrics
  )
  ```
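
  If you use v2.90.0 or later of the SageMaker Python SDK, a roughly equivalent sketch using the recommended `ModelStep` (and the same `model`, `model_metrics`, and parameters defined earlier in this tutorial) looks like the following:

  ```
  from sagemaker.workflow.model_step import ModelStep
  
  
  # Sketch only: registering the model through ModelStep instead of RegisterModel.
  # Because the model uses a PipelineSession, model.register() returns step
  # arguments instead of registering the model immediately.
  register_args = model.register(
      content_types=["text/csv"],
      response_types=["text/csv"],
      inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
      transform_instances=["ml.m5.xlarge"],
      model_package_group_name=model_package_group_name,
      approval_status=model_approval_status,
      model_metrics=model_metrics,
  )
  step_register = ModelStep(
      name="AbaloneRegisterModel",
      step_args=register_args,
  )
  ```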

#### Step 9: Define a condition step to verify model accuracy


A `ConditionStep` allows Pipelines to support conditional execution in your pipeline DAG based on conditions evaluated against step properties. In this case, you only want to register a model package if the accuracy of that model exceeds the required value. The accuracy of the model is determined by the model evaluation step. If the accuracy exceeds the required value, the pipeline also creates a SageMaker AI Model and runs batch transformation on a dataset. This section shows how to define the condition step.

**To define a condition step to verify model accuracy**

1.  Define a `ConditionLessThanOrEqualTo` condition using the accuracy value found in the output of the model evaluation processing step, `step_eval`. Get this output using the property file you indexed in the processing step and the respective JSONPath of the mean squared error value, `"mse"`.

   ```
   from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
   from sagemaker.workflow.condition_step import ConditionStep
   from sagemaker.workflow.functions import JsonGet
   
   
   cond_lte = ConditionLessThanOrEqualTo(
       left=JsonGet(
           step_name=step_eval.name,
           property_file=evaluation_report,
           json_path="regression_metrics.mse.value"
       ),
       right=6.0
   )
   ```

1.  Construct a `ConditionStep`. Pass in the `ConditionLessThanOrEqualTo` condition, then set the model package registration and batch transformation steps as the next steps if the condition passes. 

   ```
   step_cond = ConditionStep(
       name="AbaloneMSECond",
       conditions=[cond_lte],
       if_steps=[step_register, step_create_model, step_transform],
       else_steps=[], 
   )
   ```

#### Step 10: Create a pipeline


Now that you’ve created all of the steps, combine them into a pipeline.

**To create a pipeline**

1.  Define the following for your pipeline: `name`, `parameters`, and `steps`. Names must be unique within an `(account, region)` pair.
**Note**  
A step can only appear once in either the pipeline's step list or the if/else step lists of the condition step. It cannot appear in both. 

   ```
   from sagemaker.workflow.pipeline import Pipeline
   
   
   pipeline_name = "AbalonePipeline"
   pipeline = Pipeline(
       name=pipeline_name,
       parameters=[
           processing_instance_count,
           model_approval_status,
           input_data,
           batch_data,
       ],
       steps=[step_process, step_train, step_eval, step_cond],
   )
   ```

1.  (Optional) Examine the JSON pipeline definition to ensure that it's well-formed.

   ```
   import json
   
   json.loads(pipeline.definition())
   ```

 This pipeline definition is ready to submit to SageMaker AI. In the next tutorial, you submit this pipeline to SageMaker AI and start a run. 

## Define a pipeline (JSON)


You can also use [boto3](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#SageMaker.Client.create_pipeline) or [CloudFormation](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-sagemaker-pipeline.html) to create a pipeline. Creating a pipeline requires a pipeline definition, which is a JSON object that defines each step of the pipeline. The SageMaker Python SDK offers a simple way to construct this definition, which you can then use with any of the APIs previously mentioned to create the pipeline itself. Without the SDK, you must write the raw JSON definition yourself, without the error checks that the SageMaker Python SDK provides. To see the schema for the pipeline JSON definition, see [SageMaker AI Pipeline Definition JSON Schema](https://aws-sagemaker-mlops.github.io/sagemaker-model-building-pipeline-definition-JSON-schema/). The following code sample shows an example of a SageMaker AI pipeline definition JSON object:

```
{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'ProcessingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m5.xlarge'},
  {'Name': 'ProcessingInstanceCount', 'Type': 'Integer', 'DefaultValue': 1},
  {'Name': 'TrainingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m5.xlarge'},
  {'Name': 'ModelApprovalStatus',
   'Type': 'String',
   'DefaultValue': 'PendingManualApproval'},
  {'Name': 'ProcessedData',
   'Type': 'String',
   'DefaultValue': 'S3_URL'},
  {'Name': 'InputDataUrl',
   'Type': 'String',
   'DefaultValue': 'S3_URL'}],
 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'},
  'TrialName': {'Get': 'Execution.PipelineExecutionId'}},
 'Steps': [{'Name': 'ReadTrainDataFromFS',
   'Type': 'Processing',
   'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': 'ml.m5.4xlarge',
      'InstanceCount': 2,
      'VolumeSizeInGB': 30}},
    'AppSpecification': {'ImageUri': 'IMAGE_URI',
     'ContainerArguments': [....]},
    'RoleArn': 'ROLE',
      'ProcessingInputs': [...],
    'ProcessingOutputConfig': {'Outputs': [.....]},
    'StoppingCondition': {'MaxRuntimeInSeconds': 86400}},
   'CacheConfig': {'Enabled': True, 'ExpireAfter': '30d'}},
   ...
   ...
   ...
 ]}
```
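
If you build a definition like this yourself (or export one from the SDK with `pipeline.definition()`), you can pass it to boto3 to create the pipeline. The following is a minimal sketch; `pipeline_definition` (a dictionary like the example above) and `role_arn` are assumed to be defined already:

```
import json

import boto3

# Sketch only: create a pipeline from a raw JSON definition. Use
# update_pipeline to change an existing pipeline.
sm_client = boto3.client("sagemaker")
sm_client.create_pipeline(
    PipelineName="MyPipeline",
    PipelineDefinition=json.dumps(pipeline_definition),
    RoleArn=role_arn,
)
```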

 **Next step:** [Run a pipeline](run-pipeline.md) 

# Edit a pipeline


To make changes to a pipeline before running it, do the following:

1. Open SageMaker Studio by following the instructions in [Launch Amazon SageMaker Studio](https://docs.aws.amazon.com/sagemaker/latest/dg/studio-updated-launch.html).

1. In the left navigation pane of Studio, select **Pipelines**.

1. Select a pipeline name to view details about the pipeline.

1. Choose the **Executions** tab.

1. Select the name of a pipeline execution.

1. Choose **Edit** to open the Pipeline Designer.

1. Update the edges between steps or the step configuration as required and choose **Save**. 

   Saving a pipeline after editing automatically generates a new version number.

1. Choose **Run**.

# Run a pipeline


After defining the steps of your pipeline as a directed acyclic graph (DAG), you can run your pipeline, which executes the steps defined in your DAG. The following walkthroughs show you how to run an Amazon SageMaker AI pipeline using either the drag-and-drop visual editor in Amazon SageMaker Studio or the Amazon SageMaker Python SDK.

## Run a pipeline (Pipeline designer)


To start a new execution of your pipeline, do the following:

------
#### [ Studio ]

1. Open SageMaker Studio by following the instructions in [Launch Amazon SageMaker Studio](https://docs.aws.amazon.com/sagemaker/latest/dg/studio-updated-launch.html).

1. In the left navigation pane, choose **Pipelines**.

1. (Optional) To filter the list of pipelines by name, enter a full or partial pipeline name in the search field.

1. Choose a pipeline name to open the pipeline details view.

1. Choose **Visual Editor** on the top right.

1. To start an execution from the latest version, choose **Executions**.

1. To start an execution from a specific version, follow these steps:
   + Choose the version icon in the bottom toolbar to open the version panel.
   + Choose the pipeline version you want to execute.
   + Hover over the version item to reveal the three-dot menu, then choose **Execute**.
   + (Optional) To view a previous version of the pipeline, choose **Preview** from the three-dot menu in the version panel. You can also edit the version by choosing **Edit** in the notification bar.

**Note**  
If your pipeline fails, the status banner will show a **Failed** status. After troubleshooting the failed step, choose **Retry** on the status banner to resume running the pipeline from that step.

------
#### [ Studio Classic ]

1. Sign in to Amazon SageMaker Studio Classic. For more information, see [Launch Amazon SageMaker Studio Classic](https://docs.aws.amazon.com/sagemaker/latest/dg/studio-launch.html).

1. In the Studio Classic sidebar, choose the **Home** icon ( ![\[Black square icon representing a placeholder or empty image.\]](http://docs.aws.amazon.com/sagemaker/latest/dg/images/studio/icons/house.png)).

1. Select **Pipelines** from the menu.

1. To narrow the list of pipelines by name, enter a full or partial pipeline name in the search field.

1. Select a pipeline name.

1. From the **Executions** or **Graph** tab in the execution list, choose **Create execution**.

1. Enter or update the following required information:
   + **Name** – Must be unique to your account in the AWS Region.
   + **ProcessingInstanceCount** – The number of instances to use for processing.
   + **ModelApprovalStatus** – The approval status to register the trained model with. The default is `PendingManualApproval`.
   + **InputDataUrl** – The Amazon S3 URI of the input data.

1. Choose **Start**.

Once your pipeline is running, you can view the details of the execution by choosing **View details** on the status banner.

To stop the run, choose **Stop** on the status banner. To resume the execution from where it was stopped, choose **Resume** on the status banner.

**Note**  
If your pipeline fails, the status banner will show a **Failed** status. After troubleshooting the failed step, choose **Retry** on the status banner to resume running the pipeline from that step.

------

## Run a pipeline (SageMaker Python SDK)


After you’ve created a pipeline definition using the SageMaker AI Python SDK, you can submit it to SageMaker AI to start your execution. The following tutorial shows how to submit a pipeline, start an execution, examine the results of that execution, and delete your pipeline. 

**Topics**
+ [

### Prerequisites
](#run-pipeline-prereq)
+ [

### Step 1: Start the Pipeline
](#run-pipeline-submit)
+ [

### Step 2: Examine a Pipeline Execution
](#run-pipeline-examine)
+ [

### Step 3: Override Default Parameters for a Pipeline Execution
](#run-pipeline-parametrized)
+ [

### Step 4: Stop and Delete a Pipeline Execution
](#run-pipeline-delete)

### Prerequisites


This tutorial requires the following: 
+  A SageMaker notebook instance.  
+  A Pipelines pipeline definition. This tutorial assumes you're using the pipeline definition created by completing the [Define a pipeline](define-pipeline.md) tutorial. 

### Step 1: Start the Pipeline


First, you need to start the pipeline. 

**To start the pipeline**

1. Examine the JSON pipeline definition to ensure that it's well-formed.

   ```
   import json
   
   json.loads(pipeline.definition())
   ```

1. Submit the pipeline definition to the Pipelines service to create a pipeline if it doesn't exist, or update the pipeline if it does. The role passed in is used by Pipelines to create all of the jobs defined in the steps. 

   ```
   pipeline.upsert(role_arn=role)
   ```

1. Start a pipeline execution.

   ```
   execution = pipeline.start()
   ```

### Step 2: Examine a Pipeline Execution


Next, you need to examine the pipeline execution. 

**To examine a pipeline execution**

1.  Describe the pipeline execution status to ensure that it has been created and started successfully.

   ```
   execution.describe()
   ```

1. Wait for the execution to finish. 

   ```
   execution.wait()
   ```

1. List the execution steps and their status.

   ```
   execution.list_steps()
   ```

   Your output should look like the following:

   ```
   [{'StepName': 'AbaloneTransform',
     'StartTime': datetime.datetime(2020, 11, 21, 2, 41, 27, 870000, tzinfo=tzlocal()),
     'EndTime': datetime.datetime(2020, 11, 21, 2, 45, 50, 492000, tzinfo=tzlocal()),
     'StepStatus': 'Succeeded',
     'CacheHitResult': {'SourcePipelineExecutionArn': ''},
     'Metadata': {'TransformJob': {'Arn': 'arn:aws:sagemaker:us-east-2:111122223333:transform-job/pipelines-cfvy1tjuxdq8-abalonetransform-ptyjoef3jy'}}},
    {'StepName': 'AbaloneRegisterModel',
     'StartTime': datetime.datetime(2020, 11, 21, 2, 41, 26, 929000, tzinfo=tzlocal()),
     'EndTime': datetime.datetime(2020, 11, 21, 2, 41, 28, 15000, tzinfo=tzlocal()),
     'StepStatus': 'Succeeded',
     'CacheHitResult': {'SourcePipelineExecutionArn': ''},
     'Metadata': {'RegisterModel': {'Arn': 'arn:aws:sagemaker:us-east-2:111122223333:model-package/abalonemodelpackagegroupname/1'}}},
    {'StepName': 'AbaloneCreateModel',
     'StartTime': datetime.datetime(2020, 11, 21, 2, 41, 26, 895000, tzinfo=tzlocal()),
     'EndTime': datetime.datetime(2020, 11, 21, 2, 41, 27, 708000, tzinfo=tzlocal()),
     'StepStatus': 'Succeeded',
     'CacheHitResult': {'SourcePipelineExecutionArn': ''},
     'Metadata': {'Model': {'Arn': 'arn:aws:sagemaker:us-east-2:111122223333:model/pipelines-cfvy1tjuxdq8-abalonecreatemodel-jl94rai0ra'}}},
    {'StepName': 'AbaloneMSECond',
     'StartTime': datetime.datetime(2020, 11, 21, 2, 41, 25, 558000, tzinfo=tzlocal()),
     'EndTime': datetime.datetime(2020, 11, 21, 2, 41, 26, 329000, tzinfo=tzlocal()),
     'StepStatus': 'Succeeded',
     'CacheHitResult': {'SourcePipelineExecutionArn': ''},
     'Metadata': {'Condition': {'Outcome': 'True'}}},
    {'StepName': 'AbaloneEval',
     'StartTime': datetime.datetime(2020, 11, 21, 2, 37, 34, 767000, tzinfo=tzlocal()),
     'EndTime': datetime.datetime(2020, 11, 21, 2, 41, 18, 80000, tzinfo=tzlocal()),
     'StepStatus': 'Succeeded',
     'CacheHitResult': {'SourcePipelineExecutionArn': ''},
     'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:us-east-2:111122223333:processing-job/pipelines-cfvy1tjuxdq8-abaloneeval-zfraozhmny'}}},
    {'StepName': 'AbaloneTrain',
     'StartTime': datetime.datetime(2020, 11, 21, 2, 34, 55, 867000, tzinfo=tzlocal()),
     'EndTime': datetime.datetime(2020, 11, 21, 2, 37, 34, 34000, tzinfo=tzlocal()),
     'StepStatus': 'Succeeded',
     'CacheHitResult': {'SourcePipelineExecutionArn': ''},
     'Metadata': {'TrainingJob': {'Arn': 'arn:aws:sagemaker:us-east-2:111122223333:training-job/pipelines-cfvy1tjuxdq8-abalonetrain-tavd6f3wdf'}}},
    {'StepName': 'AbaloneProcess',
     'StartTime': datetime.datetime(2020, 11, 21, 2, 30, 27, 160000, tzinfo=tzlocal()),
     'EndTime': datetime.datetime(2020, 11, 21, 2, 34, 48, 390000, tzinfo=tzlocal()),
     'StepStatus': 'Succeeded',
     'CacheHitResult': {'SourcePipelineExecutionArn': ''},
     'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:us-east-2:111122223333:processing-job/pipelines-cfvy1tjuxdq8-abaloneprocess-mgqyfdujcj'}}}]
   ```

1. After your pipeline execution is complete, download the resulting  `evaluation.json` file from Amazon S3 to examine the report. 

   ```
   evaluation_json = sagemaker.s3.S3Downloader.read_file("{}/evaluation.json".format(
       step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
   ))
   json.loads(evaluation_json)
   ```

### Step 3: Override Default Parameters for a Pipeline Execution


You can run additional executions of the pipeline by specifying different pipeline parameters to override the defaults.

**To override default parameters**

1. Create the pipeline execution. This starts another pipeline execution with the model approval status override set to "Approved". This means that the model package version generated by the `RegisterModel` step is automatically ready for deployment through CI/CD pipelines, such as with SageMaker Projects. For more information, see [MLOps Automation With SageMaker Projects](sagemaker-projects.md).

   ```
   execution = pipeline.start(
       parameters=dict(
           ModelApprovalStatus="Approved",
       )
   )
   ```

1. Wait for the execution to finish. 

   ```
   execution.wait()
   ```

1. List the execution steps and their status.

   ```
   execution.list_steps()
   ```

1. After your pipeline execution is complete, download the resulting  `evaluation.json` file from Amazon S3 to examine the report. 

   ```
   evaluation_json = sagemaker.s3.S3Downloader.read_file("{}/evaluation.json".format(
       step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
   ))
   json.loads(evaluation_json)
   ```

### Step 4: Stop and Delete a Pipeline Execution


When you're finished with your pipeline, you can stop any ongoing executions and delete the pipeline.

**To stop and delete a pipeline execution**

1. Stop the pipeline execution.

   ```
   execution.stop()
   ```

1. Delete the pipeline.

   ```
   pipeline.delete()
   ```

# Stop a pipeline


You can stop a pipeline run in the Amazon SageMaker Studio console.

To stop a pipeline execution in the Amazon SageMaker Studio console, complete the following steps based on whether you use Studio or Studio Classic.

------
#### [ Studio ]

1. In the left navigation pane, select **Pipelines**.

1. (Optional) To filter the list of pipelines by name, enter a full or partial pipeline name in the search field.

1. Select a pipeline name.

1. Choose the **Executions** tab.

1. Select the execution to stop.

1. Choose **Stop**. To resume the execution from where it was stopped, choose **Resume**.

------
#### [ Studio Classic ]

1. Sign in to Amazon SageMaker Studio Classic. For more information, see [Launch Amazon SageMaker Studio Classic](https://docs.aws.amazon.com/sagemaker/latest/dg/studio-launch.html).

1. In the Studio Classic sidebar, choose the **Home** icon ( ![\[Black square icon representing a placeholder or empty image.\]](http://docs.aws.amazon.com/sagemaker/latest/dg/images/studio/icons/house.png)).

1. Select **Pipelines** from the menu.

1. To narrow the list of pipelines by name, enter a full or partial pipeline name in the search field.

1. To stop a pipeline run, choose **View details** on the status banner of the pipeline, and then choose **Stop**. To resume the execution from where it was stopped, choose **Resume**.

------

# View the details of a pipeline


You can view the details of a SageMaker AI pipeline to understand its parameters, the dependencies of its steps, or monitor its progress and status. This can help you troubleshoot or optimize your workflow. You can access the details of a given pipeline using the Amazon SageMaker Studio console and explore its execution history, definition, parameters, and metadata.

Alternatively, if your pipeline is associated with a SageMaker AI Project, you can access the pipeline details from the project's details page. For more information, see [View Project Resources](sagemaker-projects-resources.md).

To view the details of a SageMaker AI pipeline, complete the following steps based on whether you use Studio or Studio Classic.

**Note**  
Model repacking happens when the pipeline needs to include a custom script in the compressed model file (model.tar.gz) that is uploaded to Amazon S3 and used to deploy a model to a SageMaker AI endpoint. When a SageMaker AI pipeline trains a model and registers it to the model registry, it introduces a repack step *if* the trained model output from the training job needs to include a custom inference script. The repack step uncompresses the model, adds a new script, and recompresses the model. Running the pipeline adds the repack step as a training job.

------
#### [ Studio ]

1. Open the SageMaker Studio console by following the instructions in [Launch Amazon SageMaker Studio](https://docs.aws.amazon.com/sagemaker/latest/dg/studio-updated-launch.html).

1. In the left navigation pane, select **Pipelines**.

1. (Optional) To filter the list of pipelines by name, enter a full or partial pipeline name in the search field.

1. Select a pipeline name to view details about the pipeline.

1. Choose one of the following tabs to view pipeline details:
   + **Executions** – Details about the executions.
   + **Graph** – The pipeline graph, including all steps.
   + **Parameters** – The run parameters and metrics related to the pipeline.
   + **Information** – The metadata associated with the pipeline, such as tags, the pipeline Amazon Resource Name (ARN), and role ARN. You can also edit the pipeline description from this page.

------
#### [ Studio Classic ]

1. Sign in to Amazon SageMaker Studio Classic. For more information, see [Launch Amazon SageMaker Studio Classic](https://docs.aws.amazon.com/sagemaker/latest/dg/studio-launch.html).

1. In the Studio Classic sidebar, choose the **Home** icon ( ![\[Black square icon representing a placeholder or empty image.\]](http://docs.aws.amazon.com/sagemaker/latest/dg/images/studio/icons/house.png)).

1. Select **Pipelines** from the menu.

1. To narrow the list of pipelines by name, enter a full or partial pipeline name in the search field.

1. Select a pipeline name to view details about the pipeline. The pipeline details tab opens and displays a list of pipeline executions. You can start an execution or choose one of the other tabs for more information about the pipeline. Use the **Property Inspector** icon (![\[Black square icon representing a placeholder or empty image.\]](http://docs.aws.amazon.com/sagemaker/latest/dg/images/studio/icons/gears.png)) to choose which columns to display.

1. From the pipeline details page, choose one of the following tabs to view details about the pipeline:
   + **Executions** – Details about the executions. You can create an execution from this tab or the **Graph** tab.
   + **Graph** – The DAG for the pipeline.
   + **Parameters** – Includes the model approval status.
   + **Settings** – The metadata associated with the pipeline. You can download the pipeline definition file and edit the pipeline name and description from this tab.

------

# View the details of a pipeline run


You can review the details of a particular SageMaker AI pipeline run. This can help you:
+ Identify and resolve problems that may have occurred during the run, such as failed steps or unexpected errors.
+ Compare the results of different pipeline executions to understand how changes in input data or parameters impact the overall workflow.
+ Identify bottlenecks and opportunities for optimization.

To view the details of a pipeline run, complete the following steps based on whether you use Studio or Studio Classic.

------
#### [ Studio ]

1. Open the SageMaker Studio console by following the instructions in [Launch Amazon SageMaker Studio](https://docs.aws.amazon.com/sagemaker/latest/dg/studio-updated-launch.html).

1. In the left navigation pane, select **Pipelines**.

1. (Optional) To filter the list of pipelines by name, enter a full or partial pipeline name in the search field.

1. Select a pipeline name to view details about the pipeline.

1. Choose the **Executions** tab.

1. Select the name of a pipeline execution to view. The pipeline graph for that execution appears.

1. Choose any of the pipeline steps in the graph to see step settings in the right sidebar.

1. Choose one of the following tabs to view more pipeline details:
   + **Definition** — The pipeline graph, including all steps.
   + **Parameters** – Includes the model approval status.
   + **Details** – The metadata associated with the pipeline, such as tags, the pipeline Amazon Resource Name (ARN), and role ARN. You can also edit the pipeline description from this page.

------
#### [ Studio Classic ]

1. Sign in to Amazon SageMaker Studio Classic. For more information, see [Launch Amazon SageMaker Studio Classic](https://docs.aws.amazon.com/sagemaker/latest/dg/studio-launch.html).

1. In the Studio Classic sidebar, choose the **Home** icon ( ![\[Black square icon representing a placeholder or empty image.\]](http://docs.aws.amazon.com/sagemaker/latest/dg/images/studio/icons/house.png)).

1. Select **Pipelines** from the menu.

1. To narrow the list of pipelines by name, enter a full or partial pipeline name in the search field.

1. Select a pipeline name. The pipeline's **Executions** page opens.

1. In the **Executions** page, select an execution name to view details about the execution. The execution details tab opens and displays a graph of the steps in the pipeline.

1. To search for a step by name, type characters that match a step name in the search field. Use the resizing icons on the lower-right side of the graph to zoom in and out of the graph, fit the graph to screen, and expand the graph to full screen. To focus on a specific part of the graph, you can select a blank area of the graph and drag the graph to center on that area.   
![\[\]](http://docs.aws.amazon.com/sagemaker/latest/dg/images/yosemite/execution-graph-w-input.png)

1. Choose one of the pipeline steps in the graph to see details about the step. In the preceding screenshot, a training step is chosen and displays the following tabs:
   + **Input** – The training inputs. If an input source is from Amazon Simple Storage Service (Amazon S3), choose the link to view the file in the Amazon S3 console.
   + **Output** – The training outputs, such as metrics, charts, files, and evaluation outcome. The graphs are produced using the [Tracker](https://sagemaker-experiments.readthedocs.io/en/latest/tracker.html#smexperiments.tracker.Tracker.log_precision_recall) APIs.
   + **Logs** – The Amazon CloudWatch logs produced by the step.
   + **Info** – The parameters and metadata associated with the step.  
![\[\]](http://docs.aws.amazon.com/sagemaker/latest/dg/images/yosemite/execution-graph-info.png)

------

# Download a pipeline definition file


You can download the definition file for your SageMaker AI pipeline directly from the Amazon SageMaker Studio UI. You can use this pipeline definition file for:
+ Backup and restoration: Use the downloaded file to create a backup of your pipeline configuration, which you can restore in case of infrastructure failures or accidental changes.
+ Version control: Store the pipeline definition file in a source control system to track changes to the pipeline and revert to previous versions if needed.
+ Programmatic interactions: Use the pipeline definition file as input to the SageMaker SDK or AWS CLI.
+ Integration with automation processes: Integrate the pipeline definition into your CI/CD workflows or other automation processes.

To download the definition file of a pipeline, complete the following steps based on whether you use Studio or Studio Classic.

------
#### [ Studio ]

1. Open the SageMaker Studio console by following the instructions in [Launch Amazon SageMaker Studio](https://docs.aws.amazon.com/sagemaker/latest/dg/studio-updated-launch.html).

1. In the left navigation pane, select **Pipelines**.

1. (Optional) To filter the list of pipelines by name, enter a full or partial pipeline name in the search field.

1. Select a pipeline name. The **Executions** page opens and displays a list of pipeline executions.

1. Stay on the **Executions** page or choose the **Graph**, **Information**, or **Parameters** page to the left of the pipeline executions table. You can download the pipeline definition from any of these pages.

1. At the top right of the page, choose the vertical ellipsis and choose **Download pipeline definition (JSON)**.

------
#### [ Studio Classic ]

1. Sign in to Amazon SageMaker Studio Classic. For more information, see [Launch Amazon SageMaker Studio Classic](https://docs.aws.amazon.com/sagemaker/latest/dg/studio-launch.html).

1. In the Studio Classic sidebar, choose the **Home** icon ( ![\[Black square icon representing a placeholder or empty image.\]](http://docs.aws.amazon.com/sagemaker/latest/dg/images/studio/icons/house.png)).

1. Select **Pipelines** from the menu.

1. To narrow the list of pipelines by name, enter a full or partial pipeline name in the search field.

1. Select a pipeline name.

1. Choose the **Settings** tab.

1. Choose **Download pipeline definition file**.

------

# Access experiment data from a pipeline
Access experiment data from a pipeline

**Note**  
SageMaker Experiments is a feature provided in Studio Classic only.

When you create a pipeline and specify [pipeline\_experiment\_config](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.pipeline.Pipeline.pipeline_experiment_config), Pipelines creates the following SageMaker Experiments entities by default if they don't exist:
+ An experiment for the pipeline
+ A run group for every execution of the pipeline
+ A run for each SageMaker AI job created in a pipeline step

For information about how experiments are integrated with pipelines, see [Amazon SageMaker Experiments Integration](pipelines-experiments.md). For more information about SageMaker Experiments, see [Amazon SageMaker Experiments in Studio Classic](experiments.md).
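
The following is a minimal sketch of setting this configuration explicitly when you construct a pipeline. It reuses the parameter and step names from the [Define a pipeline](define-pipeline.md) tutorial and reproduces the default naming behavior, so it is illustrative rather than required:

```
from sagemaker.workflow.execution_variables import ExecutionVariables
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_experiment_config import PipelineExperimentConfig

# Sketch only: name the experiment after the pipeline and the run group
# after the execution ID, which matches the default behavior.
pipeline = Pipeline(
    name="AbalonePipeline",
    pipeline_experiment_config=PipelineExperimentConfig(
        experiment_name=ExecutionVariables.PIPELINE_NAME,
        trial_name=ExecutionVariables.PIPELINE_EXECUTION_ID,
    ),
    parameters=[processing_instance_count, model_approval_status, input_data, batch_data],
    steps=[step_process, step_train, step_eval, step_cond],
)
```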

You can get to the list of runs associated with a pipeline from either the pipeline executions list or the experiments list.

**To view the runs list from the pipeline executions list**

1. To view the pipeline executions list, follow the first five steps in the *Studio Classic* tab of [View the details of a pipeline](pipelines-studio-list.md).

1. On the top right of the screen, choose the **Filter** icon (![\[Funnel or filter icon representing data filtering or narrowing down options.\]](http://docs.aws.amazon.com/sagemaker/latest/dg/images/jumpstart/jumpstart-filter-icon.png)).

1. Choose **Experiment**. If experiment integration wasn't deactivated when the pipeline was created, the experiment name is displayed in the executions list. 
**Note**  
Experiments integration was introduced in v2.41.0 of the [Amazon SageMaker Python SDK](https://sagemaker.readthedocs.io/en/stable). Pipelines created with an earlier version of the SDK aren't integrated with experiments by default.

1. Select the experiment of your choice to view run groups and runs related to that experiment.

**To view the runs list from the experiments list**

1. In the left sidebar of Studio Classic, choose the **Home** icon ( ![\[Black square icon representing a placeholder or empty image.\]](http://docs.aws.amazon.com/sagemaker/latest/dg/images/studio/icons/house.png)).

1. Select **Experiments** from the menu.

1. Use the search bar or the **Filter** icon (![\[Funnel or filter icon representing data filtering or narrowing down options.\]](http://docs.aws.amazon.com/sagemaker/latest/dg/images/jumpstart/jumpstart-filter-icon.png)) to filter the list to experiments created by a pipeline.

1. Open an experiment name and view a list of runs created by the pipeline.

# Track the lineage of a pipeline
Track the lineage of a pipeline

In this tutorial, you use Amazon SageMaker Studio to track the lineage of an Amazon SageMaker AI ML Pipeline.

The pipeline was created by the [Orchestrating Jobs with Amazon SageMaker Model Building Pipelines](https://sagemaker-examples.readthedocs.io/en/latest/sagemaker-pipelines/tabular/abalone_build_train_deploy/sagemaker-pipelines-preprocess-train-evaluate-batch-transform.html) notebook in the [Amazon SageMaker example GitHub repository](https://github.com/awslabs/amazon-sagemaker-examples). For detailed information on how the pipeline was created, see [Define a pipeline](define-pipeline.md).

Lineage tracking in Studio is centered around a directed acyclic graph (DAG). The DAG represents the steps in a pipeline. From the DAG you can track the lineage from any step to any other step. The following diagram displays the steps in the pipeline. These steps appear as a DAG in Studio.

![\[\]](http://docs.aws.amazon.com/sagemaker/latest/dg/images/yosemite/pipeline-tutorial-steps.png)


To track the lineage of a pipeline in the Amazon SageMaker Studio console, complete the following steps based on whether you use Studio or Studio Classic.

------
#### [ Studio ]

**To track the lineage of a pipeline**

1. Open the SageMaker Studio console by following the instructions in [Launch Amazon SageMaker Studio](https://docs.aws.amazon.com/sagemaker/latest/dg/studio-updated-launch.html).

1. In the left navigation pane, select **Pipelines**.

1. (Optional) To filter the list of pipelines by name, enter a full or partial pipeline name in the search field.

1. In the **Name** column, select a pipeline name to view details about the pipeline.

1. Choose the **Executions** tab.

1. In the **Name** column of the **Executions** table, select the name of a pipeline execution to view.

1. At the top right of the **Executions** page, choose the vertical ellipsis and choose **Download pipeline definition (JSON)**. You can view the file to see how the pipeline graph was defined. 

1. Choose **Edit** to open the Pipeline Designer.

1. Use the resizing and zoom controls at the top right corner of the canvas to zoom in and out of the graph, fit the graph to screen, or expand the graph to full screen.

1. To view your training, validation, and test datasets, complete the following steps:

   1. Choose the Processing step in your pipeline graph.

   1. In the right sidebar, choose the **Overview** tab.

   1. In the **Files** section, find the Amazon S3 paths to the training, validation, and test datasets.

1. To view your model artifacts, complete the following steps:

   1. Choose the Training step in your pipeline graph.

   1. In the right sidebar, choose the **Overview** tab.

   1. In the **Files** section, find the Amazon S3 paths to the model artifact.

1. To find the model package ARN, complete the following steps:

   1. Choose the Register model step.

   1. In the right sidebar, choose the **Overview** tab.

   1. In the **Files** section, find the ARN of the model package.

------
#### [ Studio Classic ]

**To track the lineage of a pipeline**

1. Sign in to Amazon SageMaker Studio Classic. For more information, see [Launch Amazon SageMaker Studio Classic](https://docs.aws.amazon.com/sagemaker/latest/dg/studio-launch.html).

1. In the left sidebar of Studio, choose the **Home** icon ( ![\[Black square icon representing a placeholder or empty image.\]](http://docs.aws.amazon.com/sagemaker/latest/dg/images/studio/icons/house.png)).

1. In the menu, select **Pipelines**.

1. Use the **Search** box to filter the pipelines list.

1. Choose the `AbalonePipeline` pipeline to view the execution list and other details about the pipeline.

1. Choose the **Property Inspector** icon (![\[Black square icon representing a placeholder or empty image.\]](http://docs.aws.amazon.com/sagemaker/latest/dg/images/studio/icons/gears.png)) in the right sidebar to open the **TABLE PROPERTIES** pane, where you can choose which properties to view.

1. Choose the **Settings** tab and then choose **Download pipeline definition file**. You can view the file to see how the pipeline graph was defined.

1. On the **Execution** tab, select the first row in the execution list to view its execution graph and other details about the execution. Note that the graph matches the diagram displayed at the beginning of the tutorial.

   Use the resizing icons on the lower-right side of the graph to zoom in and out of the graph, fit the graph to screen, or expand the graph to full screen. To focus on a specific part of the graph, you can select a blank area of the graph and drag the graph to center on that area. The inset on the lower-right side of the graph displays your location in the graph.  
![\[\]](http://docs.aws.amazon.com/sagemaker/latest/dg/images/yosemite/pipeline-tutorial-execution-graph.png)

1. On the **Graph** tab, choose the `AbaloneProcess` step to view details about the step.

1. Find the Amazon S3 paths to the training, validation, and test datasets in the **Output** tab, under **Files**.
**Note**  
To get the full paths, right-click the path and then choose **Copy cell contents**.

   ```
   s3://sagemaker-eu-west-1-acct-id/sklearn-abalone-process-2020-12-05-17-28-28-509/output/train
   s3://sagemaker-eu-west-1-acct-id/sklearn-abalone-process-2020-12-05-17-28-28-509/output/validation
   s3://sagemaker-eu-west-1-acct-id/sklearn-abalone-process-2020-12-05-17-28-28-509/output/test
   ```

1. Choose the `AbaloneTrain` step.

1. Find the Amazon S3 path to the model artifact in the **Output** tab, under **Files**:

   ```
   s3://sagemaker-eu-west-1-acct-id/AbaloneTrain/pipelines-6locnsqz4bfu-AbaloneTrain-NtfEpI0Ahu/output/model.tar.gz
   ```

1. Choose the `AbaloneRegisterModel` step.

1. Find the ARN of the model package in the **Output** tab, under **Files**:

   ```
   arn:aws:sagemaker:eu-west-1:acct-id:model-package/abalonemodelpackagegroupname/2
   ```

------