

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# リモート関数を呼び出す
<a name="train-remote-decorator-invocation"></a>

@remote デコレータ内の関数を呼び出すには、次のいずれかの方法を使用します。
+ [@remote デコレータを使用して関数を呼び出す](#train-remote-decorator-invocation-decorator).
+ [`RemoteExecutor` API を使用して関数を呼び出す](#train-remote-decorator-invocation-api).

@remote デコレータメソッドを使用して関数を呼び出す場合、トレーニングジョブでは、関数が完了するのを待ってから新しいタスクを開始します。ただし、`RemoteExecutor` API を使用すると、複数のジョブを並列で実行できます。以下のセクションでは、関数を呼び出す両方の方法を示します。

## @remote デコレータを使用して関数を呼び出す
<a name="train-remote-decorator-invocation-decorator"></a>

@remote デコレータを使用して関数に注釈を付けることができます。SageMaker AI はデコレータ内部のコードを SageMaker トレーニングジョブに変換します。その後、トレーニングジョブでは、デコレータ内部の関数を呼び出し、ジョブが完了するのを待ちます。次のコードサンプルは、必要なライブラリをインポートし、SageMaker AI インスタンスを起動して、@remote デコレータで行列乗算に注釈を付ける方法を示しています。

```
from sagemaker.remote_function import remote
import numpy as np

@remote(instance_type="{{ml.m5.large}}")
def matrix_multiply(a, b):
    return np.matmul(a, b)
    
a = np.array([[1, 0],
             [0, 1]])
b = np.array([1, 2])

assert (matrix_multiply(a, b) == np.array([1,2])).all()
```

デコレータは次のように定義されます。

```
def remote(
    *,
    **kwarg):
        ...
```

修飾された関数を呼び出すと、SageMaker Python SDK はエラーによって発生した例外をローカルメモリに読み込みます。次のコードサンプルでは、最初の divide 関数の呼び出しが正常に完了し、結果がローカルメモリに読み込まれます。2 回目の divide 関数の呼び出しで、コードからエラーが返され、このエラーがローカルメモリに読み込まれます。

```
from sagemaker.remote_function import remote
import pytest

@remote()
def divide(a, b):
    return a/b

# the underlying job is completed successfully 
# and the function return is loaded
assert divide(10, 5) == 2

# the underlying job fails with "AlgorithmError" 
# and the function exception is loaded into local memory 
with pytest.raises(ZeroDivisionError):
    divide(10, 0)
```

**注記**  
修飾された関数はリモートジョブとして実行されます。スレッドが中断されても、基本となるジョブは停止しません。

### ローカル変数の値を変更する方法
<a name="train-remote-decorator-invocation-decorator-value"></a>

デコレータ関数はリモートマシン上で実行されます。修飾された関数内部の非ローカル変数や入力引数を変更しても、ローカル値は変更されません。

次のコードサンプルでは、デコレータ関数内部にリスト (list) と 辞書 (dict) が追加されています。これはデコレータ関数が呼び出されても変更されません。

```
a = []

@remote
def func():
    a.append(1)

# when func is invoked, a in the local memory is not modified        
func() 
func()

# a stays as []
    
a = {}
@remote
def func(a):
    # append new values to the input dictionary
    a["key-2"] = "value-2"
    
a = {"key": "value"}
func(a)

# a stays as {"key": "value"}
```

デコレータ関数内で宣言されたローカル変数の値を変更するには、関数から変数を返します。次のコードサンプルでは、ローカル変数の値が関数から返されるときに変更されることを示しています。

```
a = {"key-1": "value-1"}

@remote
def func(a):
    a["key-2"] = "value-2"
    return a

a = func(a)

-> {"key-1": "value-1", "key-2": "value-2"}
```

### データのシリアル化と逆シリアル化
<a name="train-remote-decorator-invocation-input-output"></a>

リモート関数を呼び出すと、SageMaker AI は入力ステージと出力ステージ中に関数の引数を自動的にシリアル化します。関数の引数と戻り値は [cloudpickle](https://github.com/cloudpipe/cloudpickle) を使用してシリアル化されます。SageMaker AI は、以下の Python オブジェクトと関数のシリアル化をサポートしています。
+ 辞書、リスト、浮動小数点数、整数、文字列、ブール値、タプルを含む組み込みの Python オブジェクト
+ Numpy 配列
+ Pandas DataFrame
+ Scikit-learn のデータセットと推定器
+ PyTorch モデル
+ TensorFlow モデル
+ XGBoost 用のブースタークラス

以下を使用する場合は、いくつか制限があります。
+ Dask DataFrames
+ XGBoost Dmatrix クラス
+ TensorFlow データセットとサブクラス
+ PyTorch モデル

以下のセクションには、リモート関数での制限がある以前の Python クラスを使用する際のベストプラクティス、シリアル化されたデータを SageMaker AI が保存する場所に関する情報、およびそのデータへのアクセスを管理する方法が含まれています。

#### リモートデータシリアル化のサポートが制限されている Python クラスのベストプラクティス
<a name="train-remote-decorator-invocation-input-output-bestprac"></a>

このセクションに記載されている Python クラスは制限付きで使用できます。次のセクションでは、次の Python クラスの使用方法に関するベストプラクティスについて説明します。
+ [Dask](https://www.dask.org/) DataFrames
+ XGBoost DMatric クラス
+ TensorFlow データセットとサブクラス
+ PyTorch モデル

##### Dask のベストプラクティス
<a name="train-remote-decorator-invocation-input-output-bestprac-dask"></a>

[Dask](https://www.dask.org/) は、Python のパラレルコンピューティングに使用されるオープンソースのライブラリです。このセクションでは、次の内容を説明します。
+ Dask DataFrame をリモート関数に渡す方法
+ サマリー統計を Dask DataFrame から Pandas DataFrame に変換する方法

##### Dask DataFrame をリモート関数に渡す方法
<a name="train-remote-decorator-invocation-input-output-bestprac-dask-pass"></a>

[Dask DataFrames](https://docs.dask.org/en/latest/dataframe.html) は、使用可能な量よりも多くのメモリを必要とするデータセットを格納できるため、大規模なデータセットの処理によく使用されます。これは、Dask DataFrame はローカルデータをメモリにロードしないためです。Dask DataFrame を関数引数としてリモート関数に渡すと、Dask はローカルディスクまたはクラウドストレージ内のデータにデータそのものではなく、参照を渡すことがあります。次のコードは、空の DataFrame を操作するリモート関数内に Dask DataFrame を渡す例を示しています。

```
#Do not pass a Dask DataFrame  to your remote function as follows
def clean(df: dask.DataFrame ):
    cleaned = df[] \ ...
```

Dask は、DataFrame を使用する場合にのみ Dask DataFrame からメモリにデータをロードします。リモート関数内で Dask DataFrame を使用する場合は、データへのパスを指定します。そうすると、Dask はコードの実行時に指定したデータパスから直接データセットを読み取ります。

次のコード例は、リモート関数 `clean` 内で Dask DataFrame を使用する方法を示しています。コード例では、`raw_data_path` は Dask DataFrame の代わりに clean に渡されます。コードを実行すると、`raw_data_path` で指定された Amazon S3 バケットの場所からデータセットが直接読み取られます。次に、`persist` 関数は、データセットをメモリに保持して後続の `random_split` 関数の実行を円滑化し、Dask DataFrame API 関数を使用して S3 バケットの出力データパスに書き戻します。

```
import dask.dataframe as dd

@remote(
   instance_type='{{ml.m5.24xlarge}}',
   volume_size={{300}}, 
   keep_alive_period_in_seconds={{600}})
#pass the data path to your remote function rather than the Dask DataFrame  itself
def clean(raw_data_path: str, output_data_path: str: split_ratio: list[float]):
    df = dd.read_parquet(raw_data_path) #pass the path to your DataFrame 
    cleaned = df[(df.column_a >= 1) & (df.column_a < 5)]\
        .drop(['column_b', 'column_c'], axis=1)\
        .persist() #keep the data in memory to facilitate the following random_split operation

    train_df, test_df = cleaned.random_split(split_ratio, random_state=10)

    train_df.to_parquet(os.path.join(output_data_path, 'train')
    test_df.to_parquet(os.path.join(output_data_path, 'test'))
    
clean("{{s3://amzn-s3-demo-bucket/raw/}}", "{{s3://amzn-s3-demo-bucket/cleaned/}}", split_ratio={{[0.7, 0.3]}})
```

##### サマリー統計を Dask DataFrame から Pandas DataFrame に変換する方法
<a name="train-remote-decorator-invocation-input-output-bestprac-dask-pd"></a>

Dask DataFrame からのサマリー統計は、以下のコード例のように `compute` メソッドを呼び出すことで Pandas DataFrame に変換できます。この例では、S3 バケットには、メモリまたは Pandas DataFrame に適合しない大きな Dask データフレームが含まれています。次の例では、リモート関数がデータセットをスキャンし、出力統計を含む Dask DataFrame を `describe` から Pandas DataFrame に返します。

```
executor = RemoteExecutor(
    instance_type='{{ml.m5.24xlarge}}',
    volume_size={{300}}, 
    keep_alive_period_in_seconds={{600}})

future = executor.submit(lambda: dd.read_parquet("{{s3://amzn-s3-demo-bucket/raw/}}").describe().compute())

future.result()
```

##### XGBoost DMatric クラスのベストプラクティス
<a name="train-remote-decorator-invocation-input-output-bestprac-xgboost"></a>

DMatrix は、XGBoost がデータをロードするために使用する内部データ構造です。DMatrix オブジェクトは、コンピューティングセッション間で簡単に移動するためにピクル化することはできません。DMatrix インスタンスを直接渡すと、`SerializationError` のために失敗します。

##### データオブジェクトをリモート関数に渡して XGBoost でトレーニングする方法
<a name="train-remote-decorator-invocation-input-output-bestprac-xgboost-pass"></a>

Pandas DataFrame を DMatrix インスタンスに変換してリモート関数のトレーニングに使用するには、次のコードサンプルに示すように、リモート関数に直接渡します。

```
import xgboost as xgb

@remote
def train(df, params):
    #Convert a pandas dataframe into a DMatrix DataFrame and use it for training
    dtrain = DMatrix(df) 
    return xgb.train(dtrain, params)
```

##### TensorFlow データセットとサブクラスのベストプラクティス
<a name="train-remote-decorator-invocation-input-output-bestprac-tf"></a>

TensorFlow データセットとサブクラスは、TensorFlow がトレーニング中にデータをロードするために使用する内部オブジェクトです。TensorFlow のデータセットとサブクラスは、コンピューティングセッション間を簡単に移動するためにピクル化することはできません。Tensorflow のデータセットまたはサブクラスを直接渡すと、`SerializationError` のために失敗します。次のコードサンプルに示すように、Tensorflow I/O API を使用してストレージからデータをロードします。

```
import tensorflow as tf
import tensorflow_io as tfio

@remote
def train(data_path: str, params):
    
    dataset = tf.data.TextLineDataset(tf.data.Dataset.list_files(f"{data_path}/*.txt"))
    ...
    
train("{{s3://amzn-s3-demo-bucket/data}}", {})
```

##### PyTorch モデルのベストプラクティス
<a name="train-remote-decorator-invocation-input-output-bestprac-pytorch"></a>

PyTorch モデルはシリアル化可能で、ローカル環境とリモート関数の間で渡すことができます。ローカル環境とリモート環境のデバイスタイプ (GPU と CPU) が異なる場合、トレーニング済みのモデルをローカル環境に返すことはできません。たとえば、次のコードが GPU のないローカル環境で開発され、GPU を備えたインスタンスで実行された場合、トレーニング済みのモデルを直接返すと、`DeserializationError` という結果になります。

```
# Do not return a model trained on GPUs to a CPU-only environment as follows

@remote(instance_type='{{ml.g4dn.xlarge}}')
def train(...):
    if torch.cuda.is_available():
        device = torch.device("cuda")
    else:
        device = torch.device("cpu") # a device without GPU capabilities
    
    model = Net().to(device)
    
    # train the model
    ...
    
    return model
    
model = train(...) #returns a DeserializationError if run on a device with GPU
```

GPU 環境でトレーニングされたモデルを CPU 機能のみを含むモデルに返すには、次のコード例に示すように PyTorch モデル I/O API を直接使用します。

```
import s3fs

model_path = "{{s3://amzn-s3-demo-bucket/folder/}}"

@remote(instance_type='{{ml.g4dn.xlarge}}')
def train(...):
    if torch.cuda.is_available():
        device = torch.device("cuda")
    else:
        device = torch.device("cpu")
    
    model = Net().to(device)
    
    # train the model
    ...
    
    fs = s3fs.FileSystem()
    with fs.open(os.path.join(model_path, 'model.pt'), 'wb') as file:
        torch.save(model.state_dict(), file) #this writes the model in a device-agnostic way (CPU vs GPU)
    
train(...) #use the model to train on either CPUs or GPUs

model = Net()
fs = s3fs.FileSystem()with fs.open(os.path.join(model_path, 'model.pt'), 'rb') as file:
    model.load_state_dict(torch.load(file, map_location=torch.device('cpu')))
```

#### シリアル化されたデータを SageMaker AI が保存する場所
<a name="train-remote-decorator-invocation-input-output-storage"></a>

リモート関数を呼び出すと、SageMaker AI は入力ステージと出力ステージ中に関数の引数をシリアル化し、値を返します。このシリアル化されたデータは、S3 バケットのルートディレクトリに保存されます。設定ファイルでルートディレクトリ `<s3_root_uri>` を指定します。パラメータ `job_name` は自動的に生成されます。

ルートディレクトリの下に、SageMaker AI は現在の作業ディレクトリ、シリアル化された関数、シリアル化された関数の引数、結果、およびシリアル化された関数の呼び出しによって発生した例外を格納する `<job_name>` フォルダを作成します。

`<job_name>` 下のディレクトリ `workdir` には、現在の作業ディレクトリの zip アーカイブのアーカイブが含まれています。zip アーカイブには、作業ディレクトリ内のすべての Python ファイルと、リモート関数を実行するのに必要な依存関係を指定する `requirements.txt` ファイルが含まれます。

以下は、設定ファイルで指定する S3 バケットのフォルダ構造の例です。

```
{{<s3_root_uri>}}/ # specified by s3_root_uri or S3RootUri
    <job_name>/ #automatically generated for you
        workdir/workspace.zip # archive of the current working directory (workdir)
        function/ # serialized function
        arguments/ # serialized function arguments
        results/ # returned output from the serialized function including the model
        exception/ # any exceptions from invoking the serialized function
```

S3 バケットで指定するルートディレクトリは、長期保存用ではありません。シリアル化されたデータは、シリアル化中に使用された Python バージョンと機械学習 (ML) フレームワークのバージョンと密接に関連付けられます。Python バージョンまたは機械学習フレームワークをアップグレードすると、シリアル化されたデータを使用できなくなる場合があります。代わりに、以下の手順を実行します。
+ モデルとモデルアーティファクトを、ご使用の Python バージョンおよび機械学習フレームワークに依存しない形式で保存します。
+ Python または機械学習フレームワークをアップグレードする場合、長期ストレージからモデル結果にアクセスします。

**重要**  
指定された時間の経過後にシリアル化されたデータを削除するには、S3 バケットに[有効期限設定](https://docs.aws.amazon.com/AmazonS3/latest/userguide/how-to-set-lifecycle-configuration-intro.html)を行います。

**注記**  
Python [pickle](https://docs.python.org/3/library/pickle.html) モジュールを使用してシリアル化されたファイルは、CSV、Parquet、JSON などの他のデータ形式よりも移植性が低い場合があります。ソースが不明なピクル化ファイルをロードする場合は注意してください。

リモート関数の設定ファイルに含める内容の詳細については、「[設定ファイル](https://docs.aws.amazon.com/sagemaker/latest/dg/train-remote-decorator-config.html)」を参照してください。

#### シリアル化されたデータへのアクセス
<a name="train-remote-decorator-invocation-input-output-access"></a>

管理者は、シリアル化されたデータの場所や構成ファイル内の暗号化設定など、シリアル化されたデータの設定を行うことができます。デフォルトでは、シリアル化されたデータは AWS Key Management Service (AWS KMS) キーで暗号化されます。管理者は、設定ファイルで指定したルートディレクトリへのアクセスを[バケットポリシー](https://docs.aws.amazon.com/AmazonS3/latest/userguide/example-bucket-policies.html)で制限することもできます。設定ファイルはプロジェクトおよびジョブ間で共有して使用できます。詳細については、「[ 設定ファイル](https://docs.aws.amazon.com/sagemaker/latest/dg/train-remote-decorator-config.html)」を参照してください。

## `RemoteExecutor` API を使用して関数を呼び出す
<a name="train-remote-decorator-invocation-api"></a>

`RemoteExecutor` API を使用して関数を呼び出すことができます。SageMaker AI Python SDK は `RemoteExecutor` 呼び出し内部のコードを SageMaker AI トレーニングジョブに変換します。その後、トレーニングジョブは非同期オペレーションとして関数を呼び出し、Future を返します。`RemoteExecutor` API を使用すると、複数のジョブを並列で実行できます。Python の Future の詳細については、「[Future](https://docs.python.org/3/library/asyncio-future.html)」を参照してください。

以下のコード例は、必要なライブラリをインポートする方法、関数を定義する方法、SageMaker AI インスタンスを起動する方法、および API を使用して `2` ジョブを並列実行するリクエストを送信する方法を示しています。

```
from sagemaker.remote_function import RemoteExecutor

def matrix_multiply(a, b):
    return np.matmul(a, b)


a = np.array([[1, 0],
             [0, 1]])
b = np.array([1, 2])

with RemoteExecutor(max_parallel_job=2, instance_type="{{ml.m5.large}}") as e:
    future = e.submit(matrix_multiply, a, b)

assert (future.result() == np.array([1,2])).all()
```

`RemoteExecutor` クラスは [concurrent.futures.Executor](https://docs.python.org/3/library/concurrent.futures.html) ライブラリの実装です。

次のコード例は、`RemoteExecutorAPI` を使用して関数を定義し、それを呼び出す方法を示しています。この例では、`RemoteExecutor` は合計では `4` ジョブを送信しますが、並列では `2` のみ送信します。最後の 2 つのジョブは、最小限のオーバーヘッドでクラスターを再利用します。

```
from sagemaker.remote_function.client import RemoteExecutor

def divide(a, b):
    return a/b 

with RemoteExecutor(max_parallel_job=2, keep_alive_period_in_seconds=60) as e:
    futures = [e.submit(divide, a, 2) for a in [3, 5, 7, 9]]

for future in futures:
    print(future.result())
```

`max_parallel_job` パラメータはレート制限メカニズムとしてのみ機能し、コンピューティングリソースの割り当ては最適化されません。前のコード例では、`RemoteExecutor` では、ジョブが送信される前に 2 つの並列ジョブのコンピューティングリソースを予約しません。`max_parallel_job` または @remote デコレータのその他のパラメータの詳細については、「[Remote function classes and methods specification](https://sagemaker.readthedocs.io/en/stable/remote_function/sagemaker.remote_function.html)」を参照してください。

### `RemoteExecutor` API の Future クラス
<a name="train-remote-decorator-invocation-api-future"></a>

future クラスは、非同期で呼び出されたときのトレーニングジョブからの return 関数を表す public クラスです。future クラスは [concurrent.futures.Future](https://docs.python.org/3/library/concurrent.futures.html) クラスを実装しています。このクラスを使用すると、基になるジョブを操作したり、データをメモリに読み込んだりできます。