翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
@step でデコレートした関数を使用してパイプラインを作成する
@step デコレータを使用して Python 関数をパイプラインステップに変換し、それらの関数間に依存関係を作成してパイプライングラフ (または有向非巡回グラフ (DAG)) を作成して、このグラフのリーフノードをステップのリストとしてパイプラインに渡すことで、パイプラインを作成できます。以下のセクションでは、この手順を例を使って詳しく説明します。
トピック
関数をステップに変換する
@step デコレータを使用してステップを作成するには、@step 関数を使用して注釈を付けます。次の例は、データを事前処理する @step でデコレートした関数を説明しています。
from sagemaker.workflow.function_step import step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... return procesed_dataframe step_process_result = preprocess(raw_data)
@stepデコレートされた関数を呼び出すと、SageMaker AI は関数を実行する代わりにDelayedReturnインスタンスを返します。DelayedReturn インスタンスは、その関数の実際の戻り値のプロキシです。DelayedReturn インスタンスは、別の関数に引数として渡すことも、ステップとしてパイプラインインスタンスに直接渡すこともできます。DelayedReturn クラスの詳細については、「sagemaker.workflow.function_step.DelayedReturn
ステップ間の依存関係を作成する
2 つのステップ間に依存関係を作成する際は、パイプライングラフのステップ間に接続を作成します。以下のセクションでは、パイプラインステップ間の依存関係を作成する複数の方法を紹介します。
入力引数を使用したデータ依存関係
ある関数の DelayedReturn 出力を別の関数への入力として渡すと、パイプライン DAG にデータ依存関係が自動的に作成されます。次の例では、preprocess 関数の DelayedReturn 出力を train 関数に渡す preprocess と、train との間に依存関係が作成されます。
from sagemaker.workflow.function_step import step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... return procesed_dataframe @step def train(training_data): ... return trained_model step_process_result = preprocess(raw_data) step_train_result = train(step_process_result)
前の例では、@step でデコレートされたトレーニング関数を定義しています。この関数が呼び出されると、前処理パイプラインステップの DelayedReturn 出力を入力として受け取ります。トレーニング関数を呼び出すと、別の DelayedReturn インスタンスが返されます。このインスタンスは、パイプライン DAG を形成する、その関数で定義された以前のすべてのステップ (この例の preprocess ステップ) に関する情報を保持します。
前の例では、preprocess 関数は単一の値を返します。リストやタプルなどのより複雑な戻り値タイプについては、「制限」を参照してください。
カスタム依存関係を定義する
前の例では、train 関数は preprocess の DelayedReturn 出力を受け取り、依存関係を作成しました。前のステップの出力を渡さずに依存関係を明示的に定義する場合は、add_depends_on 関数をステップで使用します。get_step() 関数を使用して、その DelayedReturn インスタンスから基盤となるステップを取得し、依存関係を入力として add_depends_on_on を呼び出します。get_step() 関数の定義を表示するには、「sagemaker.workflow.step_outputs.get_stepget_step() と add_depends_on() を使用して、preprocess と train の依存関係を作成する方法を説明しています。
from sagemaker.workflow.step_outputs import get_step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... processed_data = .. return s3.upload(processed_data) @step def train(): training_data = s3.download(....) ... return trained_model step_process_result = preprocess(raw_data) step_train_result = train() get_step(step_train_result).add_depends_on([step_process_result])
@step でデコレートした関数から従来のパイプラインステップにデータを渡す
@step でデコレートしたステップと従来のパイプラインステップを含むパイプラインを作成し、両者間でデータを渡すパイプラインを作成できます。例えば、ProcessingStep を使用してデータを処理し、その結果を @step でデコレートしたトレーニング関数に渡すことができます。次の例では、@step でデコレートしたトレーニングステップは、処理ステップの出力を参照します。
# Define processing step from sagemaker.sklearn.processing import SKLearnProcessor from sagemaker.processing import ProcessingInput, ProcessingOutput from sagemaker.workflow.steps import ProcessingStep sklearn_processor = SKLearnProcessor( framework_version='1.2-1', role='arn:aws:iam::123456789012:role/SagemakerExecutionRole', instance_type='ml.m5.large', instance_count='1', ) inputs = [ ProcessingInput(source=input_data, destination="/opt/ml/processing/input"), ] outputs = [ ProcessingOutput(output_name="train", source="/opt/ml/processing/train"), ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"), ProcessingOutput(output_name="test", source="/opt/ml/processing/test") ] process_step = ProcessingStep( name="MyProcessStep", step_args=sklearn_processor.run(inputs=inputs, outputs=outputs,code='preprocessing.py'), )
# Define a@step-decorated train step which references the # output of a processing step @step def train(train_data_path, test_data_path): ... return trained_model step_train_result = train( process_step.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri, process_step.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri, )
@step でデコレートしたステップで ConditionStep でを使用する
Pipelines は、先行するステップの結果を評価してパイプラインで実行するアクションを決定する、ConditionStep クラスをサポートしています。ConditionStep は、@step でデコレートしたステップでも使用できます。ConditionStep で任意の @step でデコレートしたステップの出力を使用するには、そのステップの出力を ConditionStep の引数として入力します。次の例の条件ステップは、@step でデコレートしたモデル評価ステップの出力を受け取ります。
# Define steps @step(name="evaluate") def evaluate_model(): # code to evaluate the model return { "rmse":rmse_value } @step(name="register") def register_model(): # code to register the model ...
# Define ConditionStep from sagemaker.workflow.condition_step import ConditionStep from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo from sagemaker.workflow.fail_step import FailStep conditionally_register = ConditionStep( name="conditional_register", conditions=[ ConditionGreaterThanOrEqualTo( # Output of the evaluate step must be json serializable left=evaluate_model()["rmse"], # right=5, ) ], if_steps=[FailStep(name="Fail", error_message="Model performance is not good enough")], else_steps=[register_model()], )
ステップの DelayedReturn 出力を使用してパイプラインを定義する
パイプラインの定義方法は、@step デコレータを使用する場合も使用しない場合も同じです。DelayedReturn インスタンスをパイプラインに渡す場合、パイプラインを構築するためのステップの完全なリストを渡す必要はありません。SDK は、定義した依存関係に基づいて前のステップを自動的に推測します。パイプラインに渡した Step オブジェクトまたは DelayedReturn オブジェクトの前のすべてのステップすべてがパイプライングラフに含まれます。次の例では、パイプラインは train 関数の DelayedReturn オブジェクトを受け取ります。SageMaker AI は、preprocessステップを の前のステップとしてパイプライングラフに追加trainします。
from sagemaker.workflow.pipeline import Pipeline pipeline = Pipeline( name="<pipeline-name>", steps=[step_train_result], sagemaker_session=<sagemaker-session>, )
ステップ間にデータやカスタム依存関係がなく、複数のステップを並行して実行する場合、パイプライングラフには複数のリーフノードが含まれます。このようなリーフノードをすべてリストにして、パイプライン定義の steps 引数に渡します。
@step def process1(): ... return data @step def process2(): ... return data step_process1_result = process1() step_process2_result = process2() pipeline = Pipeline( name="<pipeline-name>", steps=[step_process1_result, step_process2_result], sagemaker_session=sagemaker-session, )
パイプラインを実行すると、両方のステップが並行して実行されます。
リーフノードには、データまたはカスタム依存関係を通じて定義された以前のすべてのステップに関する情報が含まれているため、パイプラインに渡すのはグラフのリーフノードのみです。パイプラインをコンパイルすると、SageMaker AI はパイプライングラフを形成する後続のすべてのステップも推測し、それぞれを個別のステップとしてパイプラインに追加します。
パイプラインを作成する
次のスニペットに示されるとおり、pipeline.create() を呼び出してパイプラインを作成します。create() の詳細については、「sagemaker.workflow.pipeline.Pipeline.create
role = "pipeline-role" pipeline.create(role)
を呼び出すとpipeline.create()、SageMaker AI はパイプラインインスタンスの一部として定義されたすべてのステップをコンパイルします。SageMaker AI は、シリアル化された関数、引数、およびその他のすべてのステップ関連のアーティファクトを Amazon S3 にアップロードします。
データは、次の構造に従って S3 バケットに存在します。
s3_root_uri/pipeline_name/ sm_rf_user_ws/ workspace.zip # archive of the current working directory (workdir)step_name/timestamp/ arguments/ # serialized function arguments function/ # serialized function pre_train_dependencies/ # any dependencies and pre_execution scripts provided for the stepexecution_id/step_name/ results # returned output from the serialized function including the model
s3_root_uri は SageMaker AI 設定ファイルで定義され、パイプライン全体に適用されます。定義されていない場合は、デフォルトの SageMaker AI バケットが使用されます。
注記
SageMaker AI がパイプラインをコンパイルするたびに、SageMaker AI はステップのシリアル化された関数、引数、依存関係を現在の時刻のタイムスタンプが付けられたフォルダに保存します。これは、pipeline.create()、pipeline.update()、pipeline.upsert() または pipeline.definition() を実行する都度、発生します。