

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# `@step` でデコレートした関数を使用してパイプラインを作成する
<a name="pipelines-step-decorator-create-pipeline"></a>

`@step` デコレータを使用して Python 関数をパイプラインステップに変換し、それらの関数間に依存関係を作成してパイプライングラフ (または有向非巡回グラフ (DAG)) を作成して、このグラフのリーフノードをステップのリストとしてパイプラインに渡すことで、パイプラインを作成できます。以下のセクションでは、この手順を例を使って詳しく説明します。

**Topics**
+ [関数をステップに変換する](#pipelines-step-decorator-run-pipeline-convert)
+ [ステップ間の依存関係を作成する](#pipelines-step-decorator-run-pipeline-link)
+ [`@step` でデコレートしたステップで `ConditionStep` でを使用する](#pipelines-step-decorator-condition)
+ [ステップの `DelayedReturn` 出力を使用してパイプラインを定義する](#pipelines-step-define-delayed)
+ [パイプラインを作成する](#pipelines-step-decorator-pipeline-create)

## 関数をステップに変換する
<a name="pipelines-step-decorator-run-pipeline-convert"></a>

`@step` デコレータを使用してステップを作成するには、`@step` 関数を使用して注釈を付けます。次の例は、データを事前処理する `@step` でデコレートした関数を説明しています。

```
from sagemaker.workflow.function_step import step

@step
def preprocess(raw_data):
    df = pandas.read_csv(raw_data)
    ...
    return procesed_dataframe
    
step_process_result = preprocess(raw_data)
```

`@step` でデコレートした関数を呼び出すと、SageMaker AI は関数を実行する代わりに `DelayedReturn` インスタンスを返します。`DelayedReturn` インスタンスは、その関数の実際の戻り値のプロキシです。`DelayedReturn` インスタンスは、別の関数に引数として渡すことも、ステップとしてパイプラインインスタンスに直接渡すこともできます。`DelayedReturn` クラスの詳細については、「[sagemaker.workflow.function\_step.DelayedReturn](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.function_step.DelayedReturn)」を参照してください。

## ステップ間の依存関係を作成する
<a name="pipelines-step-decorator-run-pipeline-link"></a>

2 つのステップ間に依存関係を作成する際は、パイプライングラフのステップ間に接続を作成します。以下のセクションでは、パイプラインステップ間の依存関係を作成する複数の方法を紹介します。

### 入力引数を使用したデータ依存関係
<a name="pipelines-step-decorator-run-pipeline-link-interstep"></a>

ある関数の `DelayedReturn` 出力を別の関数への入力として渡すと、パイプライン DAG にデータ依存関係が自動的に作成されます。次の例では、`preprocess` 関数の `DelayedReturn` 出力を `train` 関数に渡す `preprocess` と、`train` との間に依存関係が作成されます。

```
from sagemaker.workflow.function_step import step

@step
def preprocess(raw_data):
    df = pandas.read_csv(raw_data)
    ...
    return procesed_dataframe

@step
def train(training_data):
    ...
    return trained_model

step_process_result = preprocess(raw_data)    
step_train_result = train(step_process_result)
```

前の例では、`@step` でデコレートされたトレーニング関数を定義しています。この関数が呼び出されると、前処理パイプラインステップの `DelayedReturn` 出力を入力として受け取ります。トレーニング関数を呼び出すと、別の `DelayedReturn` インスタンスが返されます。このインスタンスは、パイプライン DAG を形成する、その関数で定義された以前のすべてのステップ (この例の `preprocess` ステップ) に関する情報を保持します。

前の例では、`preprocess` 関数は単一の値を返します。リストやタプルなどのより複雑な戻り値タイプについては、「[制限事項](pipelines-step-decorator-limit.md)」を参照してください。

### カスタム依存関係を定義する
<a name="pipelines-step-decorator-run-pipeline-link-custom"></a>

前の例では、`train` 関数は `preprocess` の `DelayedReturn` 出力を受け取り、依存関係を作成しました。前のステップの出力を渡さずに依存関係を明示的に定義する場合は、`add_depends_on` 関数をステップで使用します。`get_step()` 関数を使用して、その `DelayedReturn` インスタンスから基盤となるステップを取得し、依存関係を入力として `add_depends_on`\_on を呼び出します。`get_step()` 関数の定義を表示するには、「[sagemaker.workflow.step\_outputs.get\_step](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.step_outputs.get_step)」を参照してください。次の例は、`get_step()` と `add_depends_on()` を使用して、`preprocess` と `train` の依存関係を作成する方法を説明しています。

```
from sagemaker.workflow.step_outputs import get_step

@step
def preprocess(raw_data):
    df = pandas.read_csv(raw_data)
    ...
    processed_data = ..
    return s3.upload(processed_data)

@step
def train():
    training_data = s3.download(....)
    ...
    return trained_model

step_process_result = preprocess(raw_data)    
step_train_result = train()

get_step(step_train_result).add_depends_on([step_process_result])
```

### `@step` でデコレートした関数から従来のパイプラインステップにデータを渡す
<a name="pipelines-step-decorator-run-pipeline-link-pass"></a>

`@step` でデコレートしたステップと従来のパイプラインステップを含むパイプラインを作成し、両者間でデータを渡すパイプラインを作成できます。例えば、`ProcessingStep` を使用してデータを処理し、その結果を `@step` でデコレートしたトレーニング関数に渡すことができます。次の例では、`@step` でデコレートしたトレーニングステップは、処理ステップの出力を参照します。

```
# Define processing step

from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

sklearn_processor = SKLearnProcessor(
    framework_version='1.2-1',
    role='arn:aws:iam::123456789012:role/SagemakerExecutionRole',
    instance_type='ml.m5.large',
    instance_count='1',
)

inputs = [
    ProcessingInput(source={{input_data}}, destination="/opt/ml/processing/input"),
]
outputs = [
    ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
    ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
    ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
]

process_step = ProcessingStep(
    name="MyProcessStep",
    step_args=sklearn_processor.run(inputs=inputs, outputs=outputs,code='preprocessing.py'),
)
```

```
# Define a @step-decorated train step which references the 
# output of a processing step

@step
def train(train_data_path, test_data_path):
    ...
    return trained_model
    
step_train_result = train(
   process_step.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
   process_step.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
)
```

## `@step` でデコレートしたステップで `ConditionStep` でを使用する
<a name="pipelines-step-decorator-condition"></a>

Pipelines は、先行するステップの結果を評価してパイプラインで実行するアクションを決定する、`ConditionStep` クラスをサポートしています。`ConditionStep` は、`@step` でデコレートしたステップでも使用できます。`ConditionStep` で任意の `@step` でデコレートしたステップの出力を使用するには、そのステップの出力を `ConditionStep` の引数として入力します。次の例の条件ステップは、`@step` でデコレートしたモデル評価ステップの出力を受け取ります。

```
# Define steps

@step(name="evaluate")
def evaluate_model():
    # code to evaluate the model
    return {
        "rmse":rmse_value
    }
    
@step(name="register")
def register_model():
    # code to register the model
    ...
```

```
# Define ConditionStep

from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.fail_step import FailStep

conditionally_register = ConditionStep(
    name="conditional_register",
    conditions=[
        ConditionGreaterThanOrEqualTo(
            # Output of the evaluate step must be json serializable
            left=evaluate_model()["rmse"],  # 
            right=5,
        )
    ],
    if_steps=[FailStep(name="Fail", error_message="Model performance is not good enough")],
    else_steps=[register_model()],
)
```

## ステップの `DelayedReturn` 出力を使用してパイプラインを定義する
<a name="pipelines-step-define-delayed"></a>

パイプラインの定義方法は、`@step` デコレータを使用する場合も使用しない場合も同じです。`DelayedReturn` インスタンスをパイプラインに渡す場合、パイプラインを構築するためのステップの完全なリストを渡す必要はありません。SDK は、定義した依存関係に基づいて前のステップを自動的に推測します。パイプラインに渡した `Step` オブジェクトまたは `DelayedReturn` オブジェクトの前のすべてのステップすべてがパイプライングラフに含まれます。次の例では、パイプラインは `train` 関数の `DelayedReturn` オブジェクトを受け取ります。SageMaker AI は、`preprocess` ステップを `train` の前のステップとしてパイプライングラフに追加します。

```
from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
    name="{{<pipeline-name>}}",
    steps=[step_train_result],
    sagemaker_session={{<sagemaker-session>}},
)
```

ステップ間にデータやカスタム依存関係がなく、複数のステップを並行して実行する場合、パイプライングラフには複数のリーフノードが含まれます。このようなリーフノードをすべてリストにして、パイプライン定義の `steps` 引数に渡します。

```
@step
def process1():
    ...
    return data
    
@step
def process2():
   ...
   return data
   
step_process1_result = process1()
step_process2_result = process2()

pipeline = Pipeline(
    name="{{<pipeline-name>}}",
    steps=[step_process1_result, step_process2_result],
    sagemaker_session={{sagemaker-session}},
)
```

パイプラインを実行すると、両方のステップが並行して実行されます。

リーフノードには、データまたはカスタム依存関係を通じて定義された以前のすべてのステップに関する情報が含まれているため、パイプラインに渡すのはグラフのリーフノードのみです。パイプラインをコンパイルすると、SageMaker AI はパイプライングラフを形成する後続のすべてのステップも推測し、それぞれを個別のステップとしてパイプラインに追加します。

## パイプラインを作成する
<a name="pipelines-step-decorator-pipeline-create"></a>

次のスニペットに示されるとおり、`pipeline.create()` を呼び出してパイプラインを作成します。`create()` の詳細については、「[sagemaker.workflow.pipeline.Pipeline.create](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.pipeline.Pipeline.create)」を参照してください。

```
role = "{{pipeline-role}}"
pipeline.create(role)
```

`pipeline.create()` を呼び出すと、SageMaker AI はパイプラインインスタンスの一部として定義されたすべてのステップをコンパイルします。SageMaker AI は、シリアル化された関数、引数、その他のすべてのステップ関連のアーティファクトを Amazon S3 にアップロードします。

データは、次の構造に従って S3 バケットに存在します。

```
s3_root_uri/
    {{pipeline_name}}/
        sm_rf_user_ws/
            workspace.zip  # archive of the current working directory (workdir)
        {{step_name}}/
            {{timestamp}}/
                arguments/                # serialized function arguments
                function/                 # serialized function
                pre_train_dependencies/   # any dependencies and pre_execution scripts provided for the step       
        {{execution_id}}/
            {{step_name}}/
                results     # returned output from the serialized function including the model
```

`s3_root_uri` は、SageMaker AI 設定ファイルで定義され、パイプライン全体にわたって適用されます。定義されていない場合は、デフォルトの SageMaker AI バケットが使用されます。

**注記**  
SageMaker AI がパイプラインをコンパイルする都度、SageMaker AI はステップのシリアル化された関数、引数、依存関係をその時点の時刻でタイムスタンプされたフォルダに保存します。これは、`pipeline.create()`、`pipeline.update()`、`pipeline.upsert()` または `pipeline.definition()` を実行する都度、発生します。