

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# リアルタイム推論用のモデルを呼び出す
<a name="realtime-endpoints-test-endpoints"></a>

Amazon SageMaker AI を使用してモデルをエンドポイントにデプロイしたら、そこに推論リクエストを送信してモデルと対話できます。モデルに推論リクエストを送信するには、それをホストするエンドポイントを呼び出します。Amazon SageMaker Studio、 AWS SDK、または AWS CLIを使って、エンドポイントを呼び出します。

## Amazon SageMaker Studio を使ってモデルを呼び出す
<a name="realtime-endpoints-test-endpoints-studio"></a>

モデルをエンドポイントにデプロイした後、Amazon SageMaker Studio を使用してエンドポイントを表示し、単一の推論リクエストを送信することでエンドポイントをテストできます。

**注記**  
SageMaker AI はリアルタイムエンドポイントに対するエンドポイントテストのみを Studio でサポートしています。

**エンドポイントにテスト推論リクエストを送信するには**

1. Amazon SageMaker Studio を起動します。

1. 左側のナビゲーションペインで、**[デプロイ]** を選択します。

1. ドロップダウンから **[Endpoints]** (エンドポイント) を選択します。

1. エンドポイントを名前で検索し、表から名前を選択します。**[エンドポイント]** パネルに一覧表示されているエンドポイント名は、モデルをデプロイするときに定義されます。Studio ワークスペースで、**[エンドポイント]** ページが新しいタブに開きます。

1. **[推論のテスト]** タブを選択します。

1. **[テストオプション]** で、次のいずれかのオプションを選択します。

   1. **[サンプルリクエストをテストする]** を選択すると、エンドポイントにリクエストがすぐに送信されます。**[JSON エディタ]** を使用してサンプルデータを JSON 形式で送信し、**[リクエストの送信]** を選択してリクエストをエンドポイントに送信します。リクエストを送信すると、Studio の JSON エディタの右側のカードに推論の出力が表示されます。

   1. **[Python SDK サンプルコードを使用する]** を選択すると、エンドポイントにリクエストを送信するためのコードが表示されます。そのコードを **[推論リクエストの例]** セクションからコピーし、テスト環境で実行します。

カード上部に、エンドポイントに送信されたリクエストのタイプが表示されます (現在は JSON のみが受け入れられます)。カードには次のフィールドが表示されます。
+ **ステータス** – 次のいずれかのステータスタイプが表示されます。
  + `Success` – 成功したリクエスト。
  + `Failed` – 失敗したリクエスト。**[失敗の理由]** の下にレスポンスが表示されます。
  + `Pending` – 推論リクエストの保留中は、ステータスに回転する円形のアイコンが表示されます。
+ **実行時間** – 呼び出しにかかった時間 (終了時刻から開始時刻を引いた時間) をミリ秒単位で表します。
+ **リクエスト時間**: リクエストが送信されてから何分経過したか。
+ **結果時間**: 結果が返されてから何分経過したか。

## を使用してモデルを呼び出す AWS SDK for Python (Boto3)
<a name="realtime-endpoints-test-endpoints-api"></a>

アプリケーションコードでモデルエンドポイントを呼び出す場合は、 を含むいずれかの AWS SDKs を使用できます AWS SDK for Python (Boto3)。この SDK を使用してエンドポイントを呼び出すには、次のいずれかの Python メソッドを使用します。
+ `invoke_endpoint` – モデルエンドポイントに推論リクエストを送信し、モデルが生成したレスポンスを返します。

  このメソッドは、モデルが生成を完了した後、推論ペイロードを 1 つのレスポンスとして返します。詳細については、「*AWS SDK for Python (Boto3) API リファレンス*」の「[invoke\$1endpoint](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker-runtime/client/invoke_endpoint.html)」を参照してください。
+ `invoke_endpoint_with_response_stream` - モデルエンドポイントに推論リクエストを送信し、モデルが推論を生成している間、レスポンスを順番にストリーミングします。

  このメソッドを使用すると、アプリケーションはレスポンスの一部が使用可能になるとすぐに受信します。詳細については、「*AWS SDK for Python (Boto3) API リファレンス*」の「[invoke\$1endpoint](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker-runtime/client/invoke_endpoint.html)」を参照してください。

  このメソッドは、推論ストリーミングをサポートするモデルを呼び出す場合にのみ使用してください。

これらのメソッドをアプリケーションコードで使用する前に、SageMaker AI ランタイムクライアントを起動し、エンドポイントの名前を指定する必要があります。次の例で、残りの例のクライアントとエンドポイントを設定します。

```
import boto3

sagemaker_runtime = boto3.client(
    "sagemaker-runtime", region_name='aws_region')

endpoint_name='endpoint-name'
```

### 推論レスポンスを取得するために呼び出す
<a name="test-invoke-endpoint"></a>

次の例では、`invoke_endpoint` メソッドを使用し、 AWS SDK for Python (Boto3)でエンドポイントを呼び出します。

```
# Gets inference from the model hosted at the specified endpoint:
response = sagemaker_runtime.invoke_endpoint(
    EndpointName=endpoint_name, 
    Body=bytes('{"features": ["This is great!"]}', 'utf-8')
    )

# Decodes and prints the response body:
print(response['Body'].read().decode('utf-8'))
```

この例では、SageMaker AI への入力データを `Body` フィールドで指定し、モデルに渡しています。このデータは、トレーニングに使われたのと同じ形式である必要があります。例では、レスポンスを `response` 変数に割り当てています。

`response` 変数は、HTTP ステータス、デプロイされたモデルの名前、および他のフィールドへのアクセスを提供します。次のスニペットは HTTP ステータスコードを出力します:

```
print(response["HTTPStatusCode"])
```

### 推論レスポンスをストリーミングするために呼び出す
<a name="test-invoke-endpoint-with-response-stream"></a>

推論ストリーミングをサポートするモデルをデプロイした場合、そのモデルを呼び出して、その推論ペイロードをパートのストリームとして受け取ることができます。モデルがこれらのパートを生成すると、段階的に配信されます。推論ストリームを受信する場合、アプリケーションはモデルがレスポンスペイロード全体を生成するのを待つ必要はありません。代わりに、アプリケーションはレスポンスの一部が使用可能になるとすぐに受信します。

アプリケーションで推論ストリームを使用することで、ユーザーは最初のパートをすぐに取得できるので、推論が速いと認識できるインタラクションを作成できます。ストリーミングを実装し、チャットボット、仮想アシスタント、ミュージックジェネレーターなどの高速でインタラクティブな体験をサポートできます。例えば、大規模言語モデル (LLM) が生成するテキストを段階的に表示するチャットボットを作成できます。

推論ストリームを取得するには、`invoke_endpoint_with_response_stream` メソッドを使用します。レスポンスの本文で、SDK は推論を一連の `PayloadPart` オブジェクトとして出力する `EventStream` オブジェクトを提供します。

**Example 推論ストリーム**  
`PayloadPart` オブジェクトのストリームの例を次に示します。  

```
{'PayloadPart': {'Bytes': b'{"outputs": [" a"]}\n'}}
{'PayloadPart': {'Bytes': b'{"outputs": [" challenging"]}\n'}}
{'PayloadPart': {'Bytes': b'{"outputs": [" problem"]}\n'}}
. . .
```
各ペイロード部分において、`Bytes` フィールドがモデルからの推論レスポンスの一部を提供します。この部分は、テキスト、イメージ、オーディオデータなど、モデルが生成するどのコンテンツタイプにもなります。この例では、LLM から生成されたテキストを含む JSON オブジェクトです。  
通常、ペイロード部分にはモデルからの個別のデータチャンクが含まれます。この例では、個別のチャンクは JSON オブジェクト全体です。ストリーミングレスポンスは、チャンクを複数のペイロード部分に分割したり、複数のチャンクを 1 つのペイロードパートに組み合わせたりすることがあります。次の例は、2 つのペイロードパートに分割された JSON 形式のデータチャンクを示しています。  

```
{'PayloadPart': {'Bytes': b'{"outputs": '}}
{'PayloadPart': {'Bytes': b'[" problem"]}\n'}}
```
推論ストリームを処理するアプリケーションコードを記述するときは、このような時折発生するデータの分割や組み合わせを処理するロジックを含めてください。1 つの方法として、アプリケーションがペイロードパートを受け取る間、`Bytes` のコンテンツを連結するコードを記述することができます。ここで例の JSON データを連結することで、データを改行で区切られた JSON 本文に結合できます。その後、コードで各行の JSON オブジェクト全体を解析してストリームを処理できます。  
次の例は、例の `Bytes` のコンテンツを連結したときに作成される、改行で区切られた JSON です。  

```
{"outputs": [" a"]}
{"outputs": [" challenging"]}
{"outputs": [" problem"]}
. . .
```

**Example 推論ストリームを処理するコード**  

次の Python クラス `SmrInferenceStream` の例は、JSON 形式でテキストデータを送信する推論ストリームを処理する方法を示しています。

```
import io
import json

# Example class that processes an inference stream:
class SmrInferenceStream:
    
    def __init__(self, sagemaker_runtime, endpoint_name):
        self.sagemaker_runtime = sagemaker_runtime
        self.endpoint_name = endpoint_name
        # A buffered I/O stream to combine the payload parts:
        self.buff = io.BytesIO() 
        self.read_pos = 0
        
    def stream_inference(self, request_body):
        # Gets a streaming inference response 
        # from the specified model endpoint:
        response = self.sagemaker_runtime\
            .invoke_endpoint_with_response_stream(
                EndpointName=self.endpoint_name, 
                Body=json.dumps(request_body), 
                ContentType="application/json"
        )
        # Gets the EventStream object returned by the SDK:
        event_stream = response['Body']
        for event in event_stream:
            # Passes the contents of each payload part
            # to be concatenated:
            self._write(event['PayloadPart']['Bytes'])
            # Iterates over lines to parse whole JSON objects:
            for line in self._readlines():
                resp = json.loads(line)
                part = resp.get("outputs")[0]
                # Returns parts incrementally:
                yield part
    
    # Writes to the buffer to concatenate the contents of the parts:
    def _write(self, content):
        self.buff.seek(0, io.SEEK_END)
        self.buff.write(content)

    # The JSON objects in buffer end with '\n'.
    # This method reads lines to yield a series of JSON objects:
    def _readlines(self):
        self.buff.seek(self.read_pos)
        for line in self.buff.readlines():
            self.read_pos += len(line)
            yield line[:-1]
```

この例では、次のように推論ストリームを処理します。
+ SageMaker AI ランタイムクライアントを初期化し、モデルエンドポイントの名前を設定します。推論ストリームを取得する前に、エンドポイントがホストするモデルは推論ストリーミングをサポートしている必要があります。
+ 例の `stream_inference` メソッドでリクエスト本文を受け取り、SDK の `invoke_endpoint_with_response_stream` メソッドに渡します。
+ SDK が返す `EventStream` オブジェクト内の各イベントを反復処理します。
+ 各イベントから、`PayloadPart` オブジェクト内の `Bytes` オブジェクトのコンテンツを取得します。
+ 例の `_write` メソッドで、`Bytes` オブジェクトのコンテンツを連結するためにバッファーに書き込みます。組み合わせられたコンテンツは、改行で区切られた JSON ボディを形成します。
+ 例の `_readlines` メソッドを使用して、反復可能な一連の JSON オブジェクトを取得します。
+ 各 JSON オブジェクトで、推論のピースを取得します。
+ `yield` 式を使用すると、パートを段階的に返します。

次の例では、`SmrInferenceStream` オブジェクトを作成し、使用します。

```
request_body = {"inputs": ["Large model inference is"],
                "parameters": {"max_new_tokens": 100,
                               "enable_sampling": "true"}}
smr_inference_stream = SmrInferenceStream(
    sagemaker_runtime, endpoint_name)
stream = smr_inference_stream.stream_inference(request_body)
for part in stream:
    print(part, end='')
```

この例では、リクエスト本文を `stream_inference` メソッドに渡しています。レスポンスを反復処理して、推論ストリームが返す各ピースを出力しています。

この例では、指定されたエンドポイントのモデルがテキストを生成する LLM であることを前提としています。この例の出力は、段階的に出力される生成テキストの本文です。

```
a challenging problem in machine learning. The goal is to . . .
```

## を使用してモデルを呼び出す AWS CLI
<a name="realtime-endpoints-test-endpoints-cli"></a>

 AWS Command Line Interface () でコマンドを実行することで、モデルエンドポイントを呼び出すことができますAWS CLI。 AWS CLI は `invoke-endpoint` コマンドによる標準的な推論リクエストをサポートし、`invoke-endpoint-async` コマンドにより非同期推論リクエストもサポートします。

**注記**  
 AWS CLI は、ストリーミング推論リクエストをサポートしていません。

次の例で、`invoke-endpoint` コマンドを使用して推論リクエストをモデルエンドポイントに送信します。

```
aws sagemaker-runtime invoke-endpoint \
    --endpoint-name endpoint_name \
    --body fileb://$file_name \
    output_file.txt
```

`--endpoint-name` パラメータには、エンドポイントを作成したときに指定した名前を入力します。`--body` パラメータには、SageMaker AI がモデルに渡す入力データを指定します。データは、トレーニングに使われたのと同じ形式である必要があります。この例は、バイナリデータをエンドポイントに送信する方法を示しています。

ファイルの内容を のパラメータに渡す`fileb://`ときに を`file://`いつ上書きするかの詳細については AWS CLI、[「ローカルファイルパラメータのベストプラクティス](https://aws.amazon.com/blogs/developer/best-practices-for-local-file-parameters/)」を参照してください。

渡すことができる追加のパラメータの詳細については、「AWS CLI コマンドリファレンス」の「[https://docs.aws.amazon.com/cli/latest/reference/sagemaker-runtime/invoke-endpoint.html](https://docs.aws.amazon.com/cli/latest/reference/sagemaker-runtime/invoke-endpoint.html)」を参照してください。

`invoke-endpoint` コマンドが成功すると、次のようなレスポンスが返されます。

```
{
    "ContentType": "<content_type>; charset=utf-8",
    "InvokedProductionVariant": "<Variant>"
}
```

コマンドが成功しない場合は、入力ペイロードが正しい形式かどうかを確認してください。

ファイル出力ファイル (この例では `output_file.txt`) を確認することで、呼び出しの出力を表示します。

```
more output_file.txt
```

## AWS SDK for Python を使用してモデルを呼び出す
<a name="realtime-endpoints-test-endpoints-sdk"></a>

### を呼び出して推論リクエストとレスポンスを双方向にストリーミングする
<a name="realtime-endpoints-test-endpoints-sdk-overview"></a>

アプリケーションコードでモデルエンドポイントを呼び出して双方向ストリーミングをサポートする場合は、HTTP/2 をサポートする双方向ストリーミング機能をサポートする[新しい実験的な SDK for Python](https://github.com/awslabs/aws-sdk-python) を使用できます。この SDK により、クライアントアプリケーションと SageMaker エンドポイント間のリアルタイムの双方向通信が可能になり、モデルが生成するストリーミングレスポンスを同時に受信しながら、推論リクエストを段階的に送信できます。これは、クライアントとサーバーの両方が永続的な接続を介して継続的にデータを交換する必要があるインタラクティブアプリケーションに特に役立ちます。

**注記**  
新しい実験的な SDK は、標準の Boto3 SDK とは異なり、データ交換のための永続的な双方向接続をサポートしています。実験的な Python SDK を使用する場合は、実験以外のユースケースのために SDK のバージョンに厳密に固定することを強くお勧めします。

双方向ストリーミングでエンドポイントを呼び出すには、 `invoke_endpoint_with_bidirectional_stream`メソッドを使用します。このメソッドは、モデルがデータを処理するときにリアルタイムでレスポンスを受信しながら、複数のペイロードチャンクをモデルにストリーミングできる永続的な接続を確立します。入力ストリームを明示的に閉じるか、エンドポイントが接続を閉じるまで接続は開いたままになり、最大 30 分の接続時間をサポートします。

### 前提条件
<a name="realtime-endpoints-test-endpoints-sdk-prereq"></a>

アプリケーションコードで双方向ストリーミングを使用する前に、以下を行う必要があります。

1. 実験的な SageMaker ランタイム HTTP/2 SDK をインストールする

1. SageMaker ランタイムクライアントの AWS 認証情報を設定する

1. 双方向ストリーミングをサポートするモデルを SageMaker エンドポイントにデプロイする

### 双方向ストリーミングクライアントを設定する
<a name="realtime-endpoints-test-endpoints-sdk-setup-client"></a>

次の例は、双方向ストリーミングに必要なコンポーネントを初期化する方法を示しています。

```
from sagemaker_runtime_http2.client import SageMakerRuntimeHTTP2Client
from sagemaker_runtime_http2.config import Config, HTTPAuthSchemeResolver
from smithy_aws_core.identity import EnvironmentCredentialsResolver
from smithy_aws_core.auth.sigv4 import SigV4AuthScheme

# Configuration
AWS_REGION = "us-west-2"
BIDI_ENDPOINT = f"https://runtime.sagemaker.{AWS_REGION}.amazonaws.com:8443"
ENDPOINT_NAME = "your-endpoint-name"

# Initialize the client configuration
config = Config(
    endpoint_uri=BIDI_ENDPOINT,
    region=AWS_REGION,
    aws_credentials_identity_resolver=EnvironmentCredentialsResolver(),
    auth_scheme_resolver=HTTPAuthSchemeResolver(),
    auth_schemes={"aws.auth#sigv4": SigV4AuthScheme(service="sagemaker")}
)

# Create the SageMaker Runtime HTTP/2 client
client = SageMakerRuntimeHTTP2Client(config=config)
```

### 双方向ストリーミングクライアントを完了する
<a name="realtime-endpoints-test-endpoints-sdk-complete-client"></a>

次の例は、複数のテキストペイロードを SageMaker エンドポイントに送信し、レスポンスをリアルタイムで処理する双方向ストリーミングクライアントを作成する方法を示しています。

```
import asyncio
import logging
from sagemaker_runtime_http2.client import SageMakerRuntimeHTTP2Client
from sagemaker_runtime_http2.config import Config, HTTPAuthSchemeResolver
from sagemaker_runtime_http2.models import (
    InvokeEndpointWithBidirectionalStreamInput, 
    RequestStreamEventPayloadPart, 
    RequestPayloadPart
)
from smithy_aws_core.identity import EnvironmentCredentialsResolver
from smithy_aws_core.auth.sigv4 import SigV4AuthScheme

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class SageMakerBidirectionalClient:
    
    def __init__(self, endpoint_name, region="us-west-2"):
        self.endpoint_name = endpoint_name
        self.region = region
        self.client = None
        self.stream = None
        self.response_task = None
        self.is_active = False
        
    def _initialize_client(self):
        bidi_endpoint = f"runtime.sagemaker.{self.region}.amazonaws.com:8443"
        config = Config(
            endpoint_uri=bidi_endpoint,
            region=self.region,
            aws_credentials_identity_resolver=EnvironmentCredentialsResolver(),
            auth_scheme_resolver=HTTPAuthSchemeResolver(),
            auth_schemes={"aws.auth#sigv4": SigV4AuthScheme(service="sagemaker")}
        )
        self.client = SageMakerRuntimeHTTP2Client(config=config)
    
    async def start_session(self):
        """Establish a bidirectional streaming connection with the endpoint."""
        if not self.client:
            self._initialize_client()
            
        logger.info(f"Starting session with endpoint: {self.endpoint_name}")
        self.stream = await self.client.invoke_endpoint_with_bidirectional_stream(
            InvokeEndpointWithBidirectionalStreamInput(endpoint_name=self.endpoint_name)
        )
        self.is_active = True
        
        # Start processing responses concurrently
        self.response_task = asyncio.create_task(self._process_responses())
    
    async def send_message(self, message):
        """Send a single message to the endpoint."""
        if not self.is_active:
            raise RuntimeError("Session not active. Call start_session() first.")
            
        logger.info(f"Sending message: {message}")
        payload = RequestPayloadPart(bytes_=message.encode('utf-8'))
        event = RequestStreamEventPayloadPart(value=payload)
        await self.stream.input_stream.send(event)
    
    async def send_multiple_messages(self, messages, delay=1.0):
        """Send multiple messages with a delay between each."""
        for message in messages:
            await self.send_message(message)
            await asyncio.sleep(delay)
    
    async def end_session(self):
        """Close the bidirectional streaming connection."""
        if not self.is_active:
            return
            
        await self.stream.input_stream.close()
        self.is_active = False
        logger.info("Stream closed")
        
        # Cancel the response processing task
        if self.response_task and not self.response_task.done():
            self.response_task.cancel()
    
    async def _process_responses(self):
        """Process incoming responses from the endpoint."""
        try:
            output = await self.stream.await_output()
            output_stream = output[1]
            
            while self.is_active:
                result = await output_stream.receive()
                
                if result is None:
                    logger.info("No more responses")
                    break
                
                if result.value and result.value.bytes_:
                    response_data = result.value.bytes_.decode('utf-8')
                    logger.info(f"Received: {response_data}")
                    
        except Exception as e:
            logger.error(f"Error processing responses: {e}")

# Example usage
async def run_bidirectional_client():
    client = SageMakerBidirectionalClient(endpoint_name="your-endpoint-name")
    
    try:
        # Start the session
        await client.start_session()
        
        # Send multiple messages
        messages = [
            "I need help with", 
            "my account balance", 
            "I can help with that", 
            "and recent charges"
        ]
        await client.send_multiple_messages(messages)
        
        # Wait for responses to be processed
        await asyncio.sleep(2)
        
        # End the session
        await client.end_session()
        logger.info("Session ended successfully")
        
    except Exception as e:
        logger.error(f"Client error: {e}")
        await client.end_session()

if __name__ == "__main__":
    asyncio.run(run_bidirectional_client())
```

クライアントは、双方向ストリーミング接続に必要なポート 8443 のリージョンエンドポイント URI を使用して SageMaker Runtime HTTP/2 クライアントを初期化します。start\$1`session()` メソッドは `invoke_endpoint_with_bidirectional_stream()`を呼び出して永続的な接続を確立し、受信レスポンスを同時に処理する非同期タスクを作成します。

`send_event()` メソッドはペイロードデータを適切なリクエストオブジェクトにラップし、入力ストリームを介して送信します。一方、`_process_responses()`メソッドはエンドポイントからのレスポンスを継続的にリッスンして処理します。この双方向アプローチにより、リクエストの送信とレスポンスの受信の両方が同じ接続で同時に行われるリアルタイムのやり取りが可能になります。