

# Lift-and-shift Python code with the @step decorator
<a name="pipelines-step-decorator"></a>

The `@step` decorator is a feature that converts your local machine learning (ML) code into one or more pipeline steps. You can write your ML function as you would for any ML project. Once tested locally or as a training job using the `@remote` decorator, you can convert the function to a SageMaker AI pipeline step by adding a `@step` decorator. You can then pass the output of the `@step`-decorated function call as a step to Pipelines to create and run a pipeline. You can chain a series of functions with the `@step` decorator to create a multi-step directed acyclic graph (DAG) pipeline as well.

The setup to use the `@step` decorator is the same as the setup to use the `@remote` decorator. You can refer to the remote function documentation for details about how to [set up the environment](https://docs.aws.amazon.com/sagemaker/latest/dg/train-remote-decorator.html#train-remote-decorator-env) and [use a configuration file](https://docs.aws.amazon.com/sagemaker/latest/dg/train-remote-decorator-config.html) to set defaults. For more information about the `@step` decorator, see [sagemaker.workflow.function\_step.step](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.function_step.step).

To view sample notebooks that demonstrate the use of the `@step` decorator, see [@step decorator sample notebooks](https://github.com/aws/amazon-sagemaker-examples/tree/main/sagemaker-pipelines/step-decorator).

The following sections explain how you can annotate your local ML code with a `@step` decorator to create a step, create and run a pipeline using the step, and customize the experience for your use case.

**Topics**
+ [Create a pipeline with `@step`-decorated functions](pipelines-step-decorator-create-pipeline.md)
+ [Run a pipeline](pipelines-step-decorator-run-pipeline.md)
+ [Configure your pipeline](pipelines-step-decorator-cfg-pipeline.md)
+ [Best Practices](pipelines-step-decorator-best.md)
+ [Limitations](pipelines-step-decorator-limit.md)

# Create a pipeline with `@step`-decorated functions
<a name="pipelines-step-decorator-create-pipeline"></a>

You can create a pipeline by converting Python functions into pipeline steps using the `@step` decorator, creating dependencies between those functions to create a pipeline graph (or directed acyclic graph (DAG)), and passing the leaf nodes of that graph as a list of steps to the pipeline. The following sections explain this procedure in detail with examples.

**Topics**
+ [Convert a function to a step](#pipelines-step-decorator-run-pipeline-convert)
+ [Create dependencies between the steps](#pipelines-step-decorator-run-pipeline-link)
+ [Use `ConditionStep` with `@step`-decorated steps](#pipelines-step-decorator-condition)
+ [Define a pipeline using the `DelayedReturn` output of steps](#pipelines-step-define-delayed)
+ [Create a pipeline](#pipelines-step-decorator-pipeline-create)

## Convert a function to a step
<a name="pipelines-step-decorator-run-pipeline-convert"></a>

To create a step using the `@step` decorator, annotate the function with `@step`. The following example shows a `@step`-decorated function that preprocesses the data.

```
import pandas

from sagemaker.workflow.function_step import step

@step
def preprocess(raw_data):
    df = pandas.read_csv(raw_data)
    ...
    return processed_dataframe

step_process_result = preprocess(raw_data)
```

When you invoke a `@step`-decorated function, SageMaker AI returns a `DelayedReturn` instance instead of running the function. A `DelayedReturn` instance is a proxy for the actual return of that function. The `DelayedReturn` instance can be passed to another function as an argument or directly to a pipeline instance as a step. For information about the `DelayedReturn` class, see [sagemaker.workflow.function\_step.DelayedReturn](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.function_step.DelayedReturn).

## Create dependencies between the steps
<a name="pipelines-step-decorator-run-pipeline-link"></a>

When you create a dependency between two steps, you create a connection between the steps in your pipeline graph. The following sections introduce multiple ways you can create a dependency between your pipeline steps.

### Data dependencies through input arguments
<a name="pipelines-step-decorator-run-pipeline-link-interstep"></a>

Passing in the `DelayedReturn` output of one function as an input to another function automatically creates a data dependency in the pipeline DAG. In the following example, passing in the `DelayedReturn` output of the `preprocess` function to the `train` function creates a dependency between `preprocess` and `train`.

```
import pandas

from sagemaker.workflow.function_step import step

@step
def preprocess(raw_data):
    df = pandas.read_csv(raw_data)
    ...
    return processed_dataframe

@step
def train(training_data):
    ...
    return trained_model

step_process_result = preprocess(raw_data)    
step_train_result = train(step_process_result)
```

The previous example defines a training function that is decorated with `@step`. When this function is invoked, it receives the `DelayedReturn` output of the preprocessing pipeline step as input. Invoking the training function returns another `DelayedReturn` instance. This instance holds the information about all the previous steps defined in that function (that is, the `preprocess` step in this example), which form the pipeline DAG.

In the previous example, the `preprocess` function returns a single value. For more complex return types like lists or tuples, refer to [Limitations](pipelines-step-decorator-limit.md).
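The proxy mechanics can be pictured with a short plain-Python sketch. This is not the SageMaker SDK; `step_sketch` and `DelayedReturnSketch` are hypothetical names used only to illustrate how a decorator can record a call and its upstream dependencies instead of running the function:

```python
import functools

class DelayedReturnSketch:
    """Toy proxy (not the SDK class): records the function and its
    arguments instead of computing a result."""
    def __init__(self, func, args):
        self.func = func
        self.args = args

    @property
    def upstream(self):
        # Any argument that is itself a proxy is an upstream dependency.
        return [a for a in self.args if isinstance(a, DelayedReturnSketch)]

def step_sketch(func):
    @functools.wraps(func)
    def wrapper(*args):
        # Invoking the decorated function returns a proxy, not a result.
        return DelayedReturnSketch(func, args)
    return wrapper

@step_sketch
def preprocess(raw_data):
    ...

@step_sketch
def train(training_data):
    ...

d1 = preprocess("s3://amzn-s3-demo-bucket/raw.csv")
d2 = train(d1)

# Passing d1 into train() created a data dependency.
assert d2.upstream == [d1]
```

The real `DelayedReturn` carries more information, such as step metadata needed to resolve the value at run time, but the dependency-tracking idea is the same.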

### Define custom dependencies
<a name="pipelines-step-decorator-run-pipeline-link-custom"></a>

In the previous example, the `train` function received the `DelayedReturn` output of `preprocess` and created a dependency. If you want to define the dependency explicitly without passing the previous step output, use the `add_depends_on` function with the step. You can use the `get_step()` function to retrieve the underlying step from its `DelayedReturn` instance, and then call `add_depends_on()` with the dependency as input. To view the `get_step()` function definition, see [sagemaker.workflow.step\_outputs.get\_step](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.step_outputs.get_step). The following example shows you how to create a dependency between `preprocess` and `train` using `get_step()` and `add_depends_on()`.

```
from sagemaker.workflow.step_outputs import get_step

@step
def preprocess(raw_data):
    df = pandas.read_csv(raw_data)
    ...
    processed_data = ...
    return s3.upload(processed_data)

@step
def train():
    training_data = s3.download(....)
    ...
    return trained_model

step_process_result = preprocess(raw_data)    
step_train_result = train()

get_step(step_train_result).add_depends_on([step_process_result])
```

### Pass data to and from a `@step`-decorated function to a traditional pipeline step
<a name="pipelines-step-decorator-run-pipeline-link-pass"></a>

You can create a pipeline that includes a `@step`-decorated step and a traditional pipeline step, and pass data between them. For example, you can use `ProcessingStep` to process the data and pass its result to the `@step`-decorated training function. In the following example, a `@step`-decorated training step references the output of a processing step.

```
# Define processing step

from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

sklearn_processor = SKLearnProcessor(
    framework_version='1.2-1',
    role='arn:aws:iam::123456789012:role/SagemakerExecutionRole',
    instance_type='ml.m5.large',
    instance_count=1,
)

inputs = [
    ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
]
outputs = [
    ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
    ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
    ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
]

process_step = ProcessingStep(
    name="MyProcessStep",
    step_args=sklearn_processor.run(inputs=inputs, outputs=outputs, code='preprocessing.py'),
)
```

```
# Define a @step-decorated train step which references the 
# output of a processing step

@step
def train(train_data_path, test_data_path):
    ...
    return trained_model
    
step_train_result = train(
   process_step.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
   process_step.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
)
```

## Use `ConditionStep` with `@step`-decorated steps
<a name="pipelines-step-decorator-condition"></a>

Pipelines supports a `ConditionStep` class which evaluates the results of preceding steps to decide what action to take in the pipeline. You can use `ConditionStep` with a `@step`-decorated step as well. To use the output of any `@step`-decorated step with `ConditionStep`, enter the output of that step as an argument to `ConditionStep`. In the following example, the condition step receives the output of the `@step`-decorated model evaluation step.

```
# Define steps

@step(name="evaluate")
def evaluate_model():
    # code to evaluate the model
    return {
        "rmse":rmse_value
    }
    
@step(name="register")
def register_model():
    # code to register the model
    ...
```

```
# Define ConditionStep

from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.fail_step import FailStep

conditionally_register = ConditionStep(
    name="conditional_register",
    conditions=[
        ConditionGreaterThanOrEqualTo(
            # Output of the evaluate step must be json serializable
            left=evaluate_model()["rmse"],
            right=5,
        )
    ],
    if_steps=[FailStep(name="Fail", error_message="Model performance is not good enough")],
    else_steps=[register_model()],
)
```

## Define a pipeline using the `DelayedReturn` output of steps
<a name="pipelines-step-define-delayed"></a>

You define a pipeline the same way whether or not you use the `@step` decorator. When you pass a `DelayedReturn` instance to your pipeline, you don't need to pass a full list of steps to build the pipeline. The SDK automatically infers the previous steps based on the dependencies you define. All of the previous steps of the `Step` or `DelayedReturn` objects that you pass to the pipeline are included in the pipeline graph. In the following example, the pipeline receives the `DelayedReturn` object for the `train` function. SageMaker AI adds the `preprocess` step, as a previous step of `train`, to the pipeline graph.

```
from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
    name="<pipeline-name>",
    steps=[step_train_result],
    sagemaker_session=<sagemaker-session>,
)
```

If there are no data or custom dependencies between the steps and you run multiple steps in parallel, the pipeline graph has more than one leaf node. Pass all of these leaf nodes in a list to the `steps` argument in your pipeline definition, as shown in the following example:

```
@step
def process1():
    ...
    return data
    
@step
def process2():
   ...
   return data
   
step_process1_result = process1()
step_process2_result = process2()

pipeline = Pipeline(
    name="<pipeline-name>",
    steps=[step_process1_result, step_process2_result],
    sagemaker_session=<sagemaker-session>,
)
```

When the pipeline runs, both steps run in parallel.

You only pass the leaf nodes of the graph to the pipeline because the leaf nodes contain information about all the previous steps defined through data or custom dependencies. When it compiles the pipeline, SageMaker AI infers all of the preceding steps that form the pipeline graph and adds each of them as a separate step to the pipeline.
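Conceptually, this inference is a reachability walk over the dependency graph. The following toy sketch (plain Python, not SDK code; `deps` and `collect` are illustrative names) shows how the full step set is recoverable from the leaf nodes alone:

```python
# Each step records only its direct upstream dependencies.
deps = {
    "preprocess": [],
    "train": ["preprocess"],
    "evaluate": ["train"],
}

def collect(leaves):
    """Walk upstream from the leaf nodes to find every step in the DAG."""
    seen = set()
    stack = list(leaves)
    while stack:
        step = stack.pop()
        if step not in seen:
            seen.add(step)
            stack.extend(deps[step])
    return seen

# Passing only the leaf node recovers the whole graph.
assert collect(["evaluate"]) == {"preprocess", "train", "evaluate"}
```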

## Create a pipeline
<a name="pipelines-step-decorator-pipeline-create"></a>

Create a pipeline by calling `pipeline.create()`, as shown in the following snippet. For details about `create()`, see [sagemaker.workflow.pipeline.Pipeline.create](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.pipeline.Pipeline.create).

```
role = "pipeline-role"
pipeline.create(role)
```

When you call `pipeline.create()`, SageMaker AI compiles all of the steps defined as part of the pipeline instance. SageMaker AI uploads the serialized function, arguments, and all the other step-related artifacts to Amazon S3.

Data resides in the S3 bucket according to the following structure:

```
s3_root_uri/
    pipeline_name/
        sm_rf_user_ws/
            workspace.zip  # archive of the current working directory (workdir)
        step_name/
            timestamp/
                arguments/                # serialized function arguments
                function/                 # serialized function
                pre_train_dependencies/   # any dependencies and pre_execution scripts provided for the step       
        execution_id/
            step_name/
                results     # returned output from the serialized function including the model
```

`s3_root_uri` is defined in the SageMaker AI config file and applies to the entire pipeline. If undefined, the default SageMaker AI bucket is used.

**Note**  
Every time SageMaker AI compiles a pipeline, SageMaker AI saves the steps' serialized functions, arguments, and dependencies in a folder timestamped with the current time. This occurs every time you run `pipeline.create()`, `pipeline.update()`, `pipeline.upsert()`, or `pipeline.definition()`.

# Run a pipeline
<a name="pipelines-step-decorator-run-pipeline"></a>

This page describes how to run a pipeline with Amazon SageMaker Pipelines, either with SageMaker AI resources or locally.

Start a new pipeline run with the `pipeline.start()` function as you would for a traditional SageMaker AI pipeline run. For information about the `start()` function, see [sagemaker.workflow.pipeline.Pipeline.start](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.pipeline.Pipeline.start).

**Note**  
A step defined using the `@step` decorator runs as a training job. Therefore, be aware of the following:  
+ Instance limits and training job limits in your account. Update your limits accordingly to avoid throttling or resource limit issues.
+ The monetary costs associated with every run of a training step in the pipeline. For more details, refer to [Amazon SageMaker Pricing](https://aws.amazon.com/sagemaker/pricing/).

## Retrieve results from a pipeline run locally
<a name="pipelines-step-decorator-run-pipeline-retrieve"></a>

To view the result of any step of a pipeline run, use [execution.result()](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.pipeline._PipelineExecution.result), as shown in the following snippet:

```
execution = pipeline.start()
execution.result(step_name="train")
```

**Note**  
Pipelines does not support `execution.result()` in local mode.

You can only retrieve results for one step at a time. If the step name was generated by SageMaker AI, you can retrieve the step name by calling `list_steps` as follows:

```
execution.list_steps()
```

## Run a pipeline locally
<a name="pipelines-step-decorator-run-pipeline-local"></a>

You can run a pipeline with `@step`-decorated steps locally as you would for traditional pipeline steps. For details about local mode pipeline runs, see [Run pipelines using local mode](pipelines-local-mode.md). To use local mode, provide a `LocalPipelineSession` instead of a `SageMakerSession` to your pipeline definition, as shown in the following example:

```
from sagemaker.workflow.function_step import step
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_context import LocalPipelineSession

@step
def train():
    training_data = s3.download(....)
    ...
    return trained_model
    
step_train_result = train()

local_pipeline_session = LocalPipelineSession()

local_pipeline = Pipeline(
    name="<pipeline-name>",
    steps=[step_train_result],
    sagemaker_session=local_pipeline_session # needed for local mode
)

local_pipeline.create(role_arn="role_arn")

# pipeline runs locally
execution = local_pipeline.start()
```

# Configure your pipeline
<a name="pipelines-step-decorator-cfg-pipeline"></a>

We recommend that you use the SageMaker AI config file to set the defaults for the pipeline. For information about the SageMaker AI configuration file, see [Configuring and using defaults with the SageMaker Python SDK](https://sagemaker.readthedocs.io/en/stable/overview.html#configuring-and-using-defaults-with-the-sagemaker-python-sdk). Any configuration added to the config file applies to all steps in the pipeline. If you want to override options for any of the steps, provide new values in the `@step` decorator arguments. The following sections describe how to set up the config file.

The `@step` decorator's configuration in the config file is identical to the `@remote` decorator's configuration. To set up the pipeline role ARN and pipeline tags in the config file, use the `Pipeline` section shown in the following snippet:

```
SchemaVersion: '1.0'
SageMaker:
  Pipeline:
    RoleArn: 'arn:aws:iam::555555555555:role/IMRole'
    Tags:
    - Key: 'tag_key'
      Value: 'tag_value'
```

You can override most of the defaults that you set in the configuration file by passing new values to the `@step` decorator. For example, you can override the instance type set in the config file for your preprocessing step, as shown in the following example:

```
@step(instance_type="ml.m5.large")
def preprocess(raw_data):
    df = pandas.read_csv(raw_data)
    ...
    return processed_dataframe
```

A few arguments are not part of the `@step` decorator parameter list. You can configure these arguments for the entire pipeline only through the SageMaker AI configuration file. They are as follows:
+ `sagemaker_session` (`sagemaker.session.Session`): The underlying SageMaker AI session to which SageMaker AI delegates service calls. If unspecified, a session is created using a default configuration as follows:

  ```
  SageMaker:
    PythonSDK:
      Modules:
        Session:
          DefaultS3Bucket: 'default_s3_bucket'
          DefaultS3ObjectKeyPrefix: 'key_prefix'
  ```
+ `custom_file_filter` (`CustomFileFilter`): A `CustomFileFilter` object that specifies the local directories and files to include in the pipeline step. If unspecified, this value defaults to `None`. For `custom_file_filter` to take effect, you must set `IncludeLocalWorkDir` to `true`. The following example shows a configuration that ignores all notebook files, as well as files and directories named `data`.

  ```
  SchemaVersion: '1.0'
  SageMaker:
    PythonSDK:
      Modules:
        RemoteFunction:
          IncludeLocalWorkDir: true
          CustomFileFilter: 
            IgnoreNamePatterns: # files or directories to ignore
            - "*.ipynb" # all notebook files
            - "data" # folder or file named "data"
  ```

  For more details about how to use `IncludeLocalWorkDir` with `CustomFileFilter`, see [Using modular code with the @remote decorator](train-remote-decorator-modular.md).
+ `s3_root_uri (str)`: The root Amazon S3 folder to which SageMaker AI uploads the code archives and data. If unspecified, the default SageMaker AI bucket is used.
+ `s3_kms_key (str)`: The key used to encrypt the input and output data. You can only configure this argument in the SageMaker AI config file and the argument applies to all steps defined in the pipeline. If unspecified, the value defaults to `None`. See the following snippet for an example S3 KMS key configuration:

  ```
  SchemaVersion: '1.0'
  SageMaker:
    PythonSDK:
      Modules:
        RemoteFunction:
          S3KmsKeyId: 's3kmskeyid'
          S3RootUri: 's3://amzn-s3-demo-bucket/my-project'
  ```

# Best Practices
<a name="pipelines-step-decorator-best"></a>

The following sections suggest best practices to follow when you use the `@step` decorator for your pipeline steps.

## Use warm pools
<a name="pipelines-step-decorator-best-warmpool"></a>

For faster pipeline step runs, use the warm pooling functionality provided for training jobs. You can turn on the warm pool functionality by providing the `keep_alive_period_in_seconds` argument to the `@step` decorator as demonstrated in the following snippet:

```
@step(
   keep_alive_period_in_seconds=900
)
```

For more information about warm pools, see [SageMaker AI Managed Warm Pools](train-warm-pools.md). 

## Structure your directory
<a name="pipelines-step-decorator-best-dir"></a>

We recommend that you use code modules when you work with the `@step` decorator. Put the `pipeline.py` module, in which you invoke the step functions and define the pipeline, at the root of the workspace. The recommended structure is as follows:

```
.
├── config.yaml       # configuration file that defines the infrastructure settings
├── requirements.txt  # dependencies
├── pipeline.py       # invoke @step-decorated functions and define the pipeline here
├── steps/
│   ├── processing.py
│   └── train.py
├── data/
└── test/
```

# Limitations
<a name="pipelines-step-decorator-limit"></a>

The following sections outline the limitations that you should be aware of when you use the `@step` decorator for your pipeline steps.

## Function argument limitations
<a name="pipelines-step-decorator-arg"></a>

When you pass an input argument to the `@step`-decorated function, the following limitations apply:
+ You can pass the `DelayedReturn`, `Properties` (of steps of other types), `Parameter`, and `ExecutionVariable` objects to `@step`-decorated functions as arguments. But `@step`-decorated functions do not support `JsonGet` and `Join` objects as arguments.
+ You cannot directly access a pipeline variable from a `@step` function. The following example produces an error:

  ```
  param = ParameterInteger(name="<parameter-name>", default_value=10)
  
  @step
  def func():
      print(param)
  
  func() # this raises a SerializationError
  ```
+ You cannot nest a pipeline variable in another object and pass it to a `@step` function. The following example produces an error:

  ```
  param = ParameterInteger(name="<parameter-name>", default_value=10)
  
  @step
  def func(arg):
      print(arg)
  
  func(arg=(param,)) # this raises a SerializationError because param is nested in a tuple
  ```
+ Since inputs and outputs of a function are serialized, there are restrictions on the type of data that can be passed as input or output from a function. See the *Data serialization and deserialization* section of [Invoke a remote function](train-remote-decorator-invocation.md) for more details. The same restrictions apply to `@step`-decorated functions.
+ Any object that holds a boto client cannot be serialized, so you can't pass such objects as input to or output from a `@step`-decorated function. For example, SageMaker Python SDK client classes such as `Estimator`, `Predictor`, and `Processor` can't be serialized.
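The restriction on boto clients is an instance of a general Python serialization rule: objects that wrap live OS resources have no meaningful byte representation. The following stdlib-only sketch uses a raw socket (the kind of resource a boto client holds internally) to show the failure mode; it is an illustration, not SageMaker code.

```python
import pickle
import socket

# A live network socket stands in for the connection held by a boto client.
sock = socket.socket()
try:
    pickle.dumps(sock)      # serializing a live OS resource fails
    serializable = True
except TypeError:
    serializable = False
finally:
    sock.close()

assert serializable is False
```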

## Function imports
<a name="pipelines-step-decorator-best-import"></a>

Import the libraries that a step requires inside the function rather than at global scope. If you import them at global scope, serializing the function risks an import collision. For example, `sklearn.pipeline.Pipeline` could be overridden by `sagemaker.workflow.pipeline.Pipeline`.
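The scoping benefit of in-function imports can be seen in plain Python. In the following hypothetical sketch, a module-level name `Pipeline` (standing in for `sagemaker.workflow.pipeline.Pipeline`) stays untouched because the import inside the function binds the colliding name only locally:

```python
# Module-level name, standing in for sagemaker.workflow.pipeline.Pipeline
Pipeline = "sagemaker.workflow.pipeline.Pipeline"

def evaluate():
    # Importing inside the function binds the name locally, so it cannot
    # shadow the module-level Pipeline when the function is serialized.
    from collections import namedtuple as Pipeline  # stand-in for sklearn.pipeline.Pipeline
    Report = Pipeline("Report", ["rmse"])
    return Report(rmse=3.2)

result = evaluate()
assert result.rmse == 3.2
assert Pipeline == "sagemaker.workflow.pipeline.Pipeline"  # global name unaffected
```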

## Referencing child members of function return value
<a name="pipelines-step-decorator-best-child"></a>

If you reference child members of a `@step`-decorated function's return value, the following limitations apply:
+ You can reference the child members with `[]` if the `DelayedReturn` object represents a tuple, list or dict, as shown in the following example:

  ```
  delayed_return[0]
  delayed_return["a_key"]
  delayed_return[1]["a_key"]
  ```
+ You cannot unpack a tuple or list output because the exact length of the underlying tuple or list can't be known when you invoke the function. The following example produces an error:

  ```
  a, b, c = func() # this raises ValueError
  ```
+ You cannot iterate over a `DelayedReturn` object. The following example raises an error:

  ```
  for item in func(): # this raises a NotImplementedError
      ...
  ```
+ You cannot reference arbitrary child members with '`.`'. The following example produces an error:

  ```
  delayed_return.a_child # raises AttributeError
  ```
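These rules follow from `DelayedReturn` being a reference-building proxy. The following toy class (hypothetical, not the SDK implementation) shows why indexing can work while unpacking and iteration cannot:

```python
class DelayedProxySketch:
    """Toy stand-in for DelayedReturn (not the SDK class)."""
    def __init__(self, path=()):
        self.path = path

    def __getitem__(self, key):
        # Indexing just records a reference to resolve at run time.
        return DelayedProxySketch(self.path + (key,))

    def __iter__(self):
        # The real result's length is unknown before the step runs.
        raise NotImplementedError("cannot iterate over a delayed result")

out = DelayedProxySketch()
ref = out[1]["a_key"]            # allowed: builds a reference path
assert ref.path == (1, "a_key")

try:
    a, b = out                   # unpacking requires iteration
    unpacked = True
except NotImplementedError:
    unpacked = False
assert unpacked is False
```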

## Existing pipeline features that are not supported
<a name="pipelines-step-decorator-best-unsupported"></a>

You cannot use the `@step` decorator with the following pipeline features:
+ [Pipeline step caching](https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-caching.html)
+ [Property files](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-propertyfile.html#build-and-manage-propertyfile-property)