ホスティングサービスでのカスタム推論コードの使用 - Amazon SageMaker AI

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

ホスティングサービスでのカスタム推論コードの使用

このセクションでは、Amazon SageMaker AI がホスティングサービス用の独自の推論コードを実行する Docker コンテナとやり取りする方法について説明します。この情報を使用して、推論コードを書き込み、Docker イメージを作成します。

SageMaker AI が推論イメージを実行する方法

コンテナを実行可能ファイルとして実行するように設定するには、Dockerfile で ENTRYPOINT 命令を使用します。次の点に注意してください。

  • モデル推論の場合、SageMaker AI はコンテナを次のように実行します。

    docker run image serve

    SageMaker AI はイメージ名の後に serve 引数を指定して、コンテナ内のデフォルトの CMD ステートメントを上書きします。serve 引数は、Dockerfile の CMD コマンドで指定した引数よりも優先されます。

     

  • SageMaker AI では、すべてのコンテナがルートユーザーで実行されることを想定しています。ルートユーザーのみを使用するようにコンテナを作成します。SageMaker AI がコンテナを実行する際、ルートレベルのアクセス権を持たないユーザーによって権限の問題が発生する場合があります。

     

  • exec 命令の ENTRYPOINT フォームを使用することをお勧めします。

    ENTRYPOINT ["executable", "param1", "param2"]

    例:

    ENTRYPOINT ["python", "k_means_inference.py"]

    exec 命令の ENTRYPOINT フォームは、/bin/sh の子としてではなく、直接実行ファイルを開始します。これにより、SageMaker API オペレーションから SIGTERMSIGKILL のようなシグナルを受信できます。これは要件です。

     

    例えば、エンドポイントを作成する CreateEndpoint API を使用する場合に、SageMaker AI はリクエストで指定するエンドポイント設定で必要な ML コンピューティングインスタンスの数をプロビジョニングします。SageMaker AI はそれらのインスタンスで Docker コンテナを実行します。

     

    エンドポイントをバックアップするインスタンスの数を減らす場合 (UpdateEndpointWeightsAndCapacities API の呼び出しによって)、SageMaker AI は終了するインスタンスの Docker コンテナを停止するコマンドを実行します。コマンドは、SIGTERM シグナルを送信し、30 秒後に SIGKILL シグナルを送信します。

     

    エンドポイントを更新する場合 (UpdateEndpoint API の呼び出しによって)、SageMaker AI は別の一連の ML コンピューティングインスタンスを起動して、推論コードを含む Docker コンテナを実行します。次に、前の Docker コンテナを停止するコマンドを実行します。Docker コンテナを停止するために、コマンドは、SIGTERM シグナルを送信し、30 秒後に SIGKILL シグナルを送信します。

     

  • SageMaker AI は CreateModel リクエストで指定したコンテナ定義を使用してコンテナの環境変数と DNS ホスト名を次のように設定します。

     

    • ContainerDefinition.Environment 文字列間マップを使用して環境変数を設定します。

    • ContainerDefinition.ContainerHostname を使用して DNS ホスト名を設定します。

       

  • モデル推論に GPU デバイスを使用 (CreateEndpointConfig リクエストで GPU ベースの ML コンピューティングインスタンスを指定) する予定の場合は、コンテナが nvidia-docker 互換であることを確認してください。NVIDIA ドライバーをイメージにバンドルしないでください。nvidia-docker の詳細については、NVIDIA/nvidia-docker を参照してください。

     

  • tini 引数と train 引数で混乱が生じるため、SageMaker AI コンテナでは、serve イニシャライザをエントリポイントとして使用することはできません。

SageMaker AI がモデルアーティファクトをロードする方法

CreateModel API リクエストでは、ModelDataUrl または S3DataSource パラメータを使用して、モデルアーティファクトが保存されている S3 の場所を特定することができます。SageMaker AI は、推論コードで使用するために、モデルアーティファクトを S3 の場所から /opt/ml/model ディレクトリにコピーします。コンテナは /opt/ml/model に読み取り専用でアクセスできます。このディレクトリには書き込まないでください。

ModelDataUrl は tar.gz ファイルを指す必要があります。この情報がないと、SageMaker AI はファイルをダウンロードできません。

モデルを SageMaker AI でトレーニングした場合、モデルアーティファクトは Amazon S3 の単一の圧縮 tar ファイルとして保存されます。SageMaker AI の外部でモデルをトレーニングした場合は、この単一の圧縮 tar ファイルを作成して S3 の場所に保存する必要があります。コンテナが起動する前に、SageMaker AI は、この tar ファイルを /opt/ml/model ディレクトリに解凍します。

大規模なモデルをデプロイする場合は、非圧縮モデルのデプロイ の配置に従うことをお勧めします。

コンテナが推論リクエストに応答する方法

推論を取得するため、クライアントアプリケーションは POST リクエストを SageMaker AI エンドポイントに送信します。SageMaker AI はリクエストをコンテナに渡し、推論結果をコンテナからクライアントに返します。

コンテナが受け取る推論リクエストの詳細については、Amazon SageMaker AI API リファレンスの以下のアクションを参照してください。

推論コンテナの要件

推論リクエストに応答するには、コンテナが次の要件を満たしている必要があります。

  • InvokeEndpoint でサポートされているものを除き、SageMaker AI はすべての POST ヘッダーを削除します。SageMaker AI は追加のヘッダーを追加する場合があります。推論コンテナはこれらの追加ヘッダーを安全に無視できる必要があります。

  • 推論リクエストを受信するには、コンテナにポート 8080 でリッスンするウェブサーバーが必要であり、/invocations および /ping エンドポイントへの POST リクエストを受け入れる必要があります。

  • 顧客のモデルコンテナは、250 ミリ秒以内にソケット接続リクエストを受け入れる必要があります。

  • 顧客のモデルコンテナは、60 秒以内にリクエストに応答する必要があります。モデル自体は、/invocations に応答するまで 60 秒の最大処理時間をかけることができます。モデルの処理時間が 50 ~ 60 秒かかる場合は、SDK ソケットタイムアウトを 70 秒に設定する必要があります。

  • 双方向ストリーミングをサポートする顧客のモデルコンテナは、以下を行う必要があります。

    • は、デフォルトでポート 8080 から /invocations-bidirectional-stream への WebSockets 接続をサポートします。 invocations-bidirectional-stream

    • ポート 8080 でリッスンしているウェブサーバーがあり、/ping エンドポイントへの POST リクエストを受け入れる必要があります。

    • HTTP 経由のコンテナヘルスチェックに加えて、コンテナは送信される WebSocket Ping Frame の (RFC6455) に従って Pong Frame で応答する必要があります。

例呼び出し関数

以下の例は、コンテナ内のコードで推論リクエストを処理する方法を示しています。これらの例は、InvokeEndpoint アクションを使用してクライアントアプリケーションが送信するリクエストを処理します。

FastAPI

FastAPI は Python を使用して API を構築するためのウェブフレームワークです。

from fastapi import FastAPI, status, Request, Response . . . app = FastAPI() . . . @app.post('/invocations') async def invocations(request: Request): # model() is a hypothetical function that gets the inference output: model_resp = await model(Request) response = Response( content=model_resp, status_code=status.HTTP_200_OK, media_type="text/plain", ) return response . . .

この例では、invocations 関数は SageMaker AI が /invocations エンドポイントに送信する推論リクエストを処理します。

Flask

Flask は、Python を使用してウェブアプリケーションを開発するためのフレームワークです。

import flask . . . app = flask.Flask(__name__) . . . @app.route('/invocations', methods=["POST"]) def invoke(request): # model() is a hypothetical function that gets the inference output: resp_body = model(request) return flask.Response(resp_body, mimetype='text/plain')

この例では、invoke 関数は SageMaker AI が /invocations エンドポイントに送信する推論リクエストを処理します。

例ストリーミングリクエストの呼び出し関数

以下の例は、推論コンテナ内のコードで推論リクエストを処理する方法を示しています。これらの例は、InvokeEndpointWithResponseStream アクションを使用してクライアントアプリケーションが送信するリクエストを処理します。

コンテナがストリーミング推論リクエストを処理すると、モデルが推論を生成する際に、モデルの推論は一連のパーツとして段階的に返されます。レスポンスが利用可能になると、クライアントアプリケーションはすぐに受信し始めます。クライアントアプリケーションは、モデルがレスポンス全体を生成するのを待つ必要はありません。ストリーミングを実装し、チャットボット、仮想アシスタント、ミュージックジェネレーターなどの高速でインタラクティブな体験をサポートできます。

FastAPI

FastAPI は Python を使用して API を構築するためのウェブフレームワークです。

from starlette.responses import StreamingResponse from fastapi import FastAPI, status, Request . . . app = FastAPI() . . . @app.post('/invocations') async def invocations(request: Request): # Streams inference response using HTTP chunked encoding async def generate(): # model() is a hypothetical function that gets the inference output: yield await model(Request) yield "\n" response = StreamingResponse( content=generate(), status_code=status.HTTP_200_OK, media_type="text/plain", ) return response . . .

この例では、invocations 関数は SageMaker AI が /invocations エンドポイントに送信する推論リクエストを処理します。この例ではレスポンスをストリーミングするために、Starlette フレームワークの StreamingResponse クラスを使用しています。

Flask

Flask は、Python を使用してウェブアプリケーションを開発するためのフレームワークです。

import flask . . . app = flask.Flask(__name__) . . . @app.route('/invocations', methods=["POST"]) def invocations(request): # Streams inference response using HTTP chunked encoding def generate(): # model() is a hypothetical function that gets the inference output: yield model(request) yield "\n" return flask.Response( flask.stream_with_context(generate()), mimetype='text/plain') . . .

この例では、invocations 関数は SageMaker AI が /invocations エンドポイントに送信する推論リクエストを処理します。この例ではレスポンスをストリーミングするために、Flask フレームワークの flask.stream_with_context 関数を使用しています。

例双方向ストリーミングの呼び出し関数の例

次の例は、コンテナ内のコードがストリーミング推論リクエストとレスポンスを処理する方法を示しています。これらの例では、InvokeEndpointWithBidirectionalStream アクションを使用してクライアントアプリケーションが送信するストリーミングリクエストを処理します。

双方向ストリーミング機能を備えたコンテナは、パートがクライアントで段階的に生成され、コンテナにストリーミングされるストリーミング推論リクエストを処理します。モデルの推論は、モデルが生成する一連のパートとしてクライアントに返されます。レスポンスが利用可能になると、クライアントアプリケーションはすぐに受信し始めます。クライアントで完全に生成された へのリクエストや、モデルがレスポンス全体を生成するのを待つ必要はありません。双方向ストリーミングを実装して、チャットボット、インタラクティブな音声 AI アシスタント、リアルタイム翻訳などの高速なインタラクティブエクスペリエンスをサポートし、よりリアルタイムのエクスペリエンスを実現できます。

FastAPI

FastAPI は Python を使用して API を構築するためのウェブフレームワークです。

import sys import asyncio import json from fastapi import FastAPI, WebSocket, WebSocketDisconnect from fastapi.responses import JSONResponse import uvicorn app = FastAPI() ... @app.websocket("/invocations-bidirectional-stream") async def websocket_invoke(websocket: WebSocket): """ WebSocket endpoint with RFC 6455 ping/pong and fragmentation support Handles: - Text messages (JSON) - including fragmented frames - Binary messages - including fragmented frames - Ping frames (automatically responds with pong) - Pong frames (logs receipt) - Fragmented frames per RFC 6455 Section 5.4 """ await manager.connect(websocket) # Fragment reassembly buffers per RFC 6455 Section 5.4 text_fragments = [] binary_fragments = [] while True: # Use receive() to handle all WebSocket frame types message = await websocket.receive() print(f"Received message: {message}") if message["type"] == "websocket.receive": if "text" in message: # Handle text frames (including fragments) text_data = message["text"] more_body = message.get("more_body", False) if more_body: # This is a fragment, accumulate it text_fragments.append(text_data) print(f"Received text fragment: {len(text_data)} chars (more coming)") else: # This is the final frame or a complete message if text_fragments: # Reassemble fragmented message text_fragments.append(text_data) complete_text = "".join(text_fragments) text_fragments.clear() print(f"Reassembled fragmented text message: {len(complete_text)} chars total") await handle_text_message(websocket, complete_text) else: # Complete message in single frame await handle_text_message(websocket, text_data) elif "bytes" in message: # Handle binary frames (including fragments) binary_data = message["bytes"] more_body = message.get("more_body", False) if more_body: # This is a fragment, accumulate it binary_fragments.append(binary_data) print(f"Received binary fragment: {len(binary_data)} bytes (more coming)") else: # This is the final frame or a complete message if binary_fragments: # Reassemble fragmented message binary_fragments.append(binary_data) complete_binary = b"".join(binary_fragments) binary_fragments.clear() print(f"Reassembled fragmented binary message: {len(complete_binary)} bytes total") await handle_binary_message(websocket, complete_binary) else: # Complete message in single frame await handle_binary_message(websocket, binary_data) elif message["type"] == "websocket.ping": # Handle ping frames - RFC 6455 Section 5.5.2 ping_data = message.get("bytes", b"") print(f"Received PING frame with payload: {ping_data}") # FastAPI automatically sends pong response elif message["type"] == "websocket.pong": # Handle pong frames pong_data = message.get("bytes", b"") print(f"Received PONG frame with payload: {pong_data}") elif message["type"] == "websocket.close": # Handle close frames - RFC 6455 Section 5.5.1 close_code = message.get("code", 1000) close_reason = message.get("reason", "") print(f"Received CLOSE frame - Code: {close_code}, Reason: '{close_reason}'") # Send close frame response if not already closing try: await websocket.close(code=close_code, reason=close_reason) print(f"Sent CLOSE frame response - Code: {close_code}") except Exception as e: print(f"Error sending close frame: {e}") break elif message["type"] == "websocket.disconnect": print("Client initiated disconnect") break else: print(f"Received unknown message type: {message['type']}") break async def handle_binary_message(websocket: WebSocket, binary_data: bytes): """Handle incoming binary messages (complete or reassembled from fragments)""" print(f"Processing complete binary message: {len(binary_data)} bytes") try: # Echo back the binary data await websocket.send_bytes(binary_data) except Exception as e: print(f"Error handling binary message: {e}") async def handle_text_message(websocket: WebSocket, data: str): """Handle incoming text messages""" try: # Send response back to the same client await manager.send_personal_message(data, websocket) except Exception as e: print(f"Error handling text message: {e}") def main(): if len(sys.argv) > 1 and sys.argv[1] == "serve": print("Starting server on port 8080...") uvicorn.run(app, host="0.0.0.0", port=8080) else: print("Usage: python app.py serve") sys.exit(1) if __name__ == "__main__": main()

この例では、websocket_invoke 関数は SageMaker AI が /invocations-bidirectional-stream エンドポイントに送信する推論リクエストを処理します。ストリームリクエストとストリームレスポンスの処理がクライアントに返されます。

コンテナがヘルスチェック (Ping) リクエストに応答する方法

SageMaker AI は、以下の状況で新しい推論コンテナを起動します。

  • CreateEndpointUpdateEndpoint、および UpdateEndpointWeightsAndCapacities API 呼び出しへの応答

  • セキュリティパッチ

  • 異常のあるインスタンスの置き換え

コンテナの起動直後に、SageMaker AI は定期的に GET リクエストを /ping エンドポイントに送信し始めます。

コンテナの最も単純な要件は、HTTP 200 のステータスコードと空の本文で応答することです。これにより SageMaker AI に対し、コンテナが /invocations エンドポイントでの推論リクエストを受け入れる準備ができていることを示されます。

コンテナがスタートアップ後 8 分間に、常に 200 秒で応答しヘルスチェックを通過しない場合、新しいインスタンスの起動は失敗します。これにより CreateEndpoint は失敗し、エンドポイントは障害状態のままになります。UpdateEndpoint によって要求された更新は完了せず、セキュリティパッチは適用されず、異常のあるインスタンスは置き換えられません。

コンテナが静的な 200 を返す最小限のバーがありますが、コンテナ開発者はこの機能を使用してより深いチェックを実行できます。/ping 試行のリクエストのタイムアウトは 2 秒です。

さらに、双方向ストリーミングリクエストを処理できるコンテナは、Pong フレーム (WebSocket プロトコル RFC6455 に準拠) で Ping フレームに応答する必要があります。Pong Frame が 5 回連続して受信されない場合、コンテナへの接続は SageMaker AI プラットフォームによって閉じられます。SageMaker AI プラットフォームは、Pong Frames を持つモデルコンテナから Ping Frames にも応答します。

双方向ストリーミング機能をサポートするコンテナ契約

双方向ストリーミング機能をサポートする SageMaker AI エンドポイントとしてモデルコンテナをホストする場合は、モデルコンテナが以下の契約をサポートしている必要があります。

1. 双方向 Docker ラベル

モデルコンテナには、このコンテナで双方向ストリーミング機能がサポートされていることを SageMaker AI プラットフォームに示す Docker ラベルが必要です。

com.amazonaws.sagemaker.capabilities.bidirectional-streaming=true

2. 呼び出しの WebSocket 接続のサポート

双方向ストリーミングをサポートする顧客のモデルコンテナは/invocations-bidirectional-stream、デフォルトでポート 8080 から への WebSockets 接続をサポートする必要があります。

このパスは、InvokeEndpointWithBidirectionalStream API を呼び出すときに X-Amzn-SageMaker-Model-Invocation-Path ヘッダーを渡すことで上書きできます。さらに、ユーザーは InvokeEndpointWithBidirectionalStream API を呼び出すときに X-Amzn-SageMaker-Model-Query-String ヘッダーを渡すことで、このパスに追加するクエリ文字列を指定できます。

3. リクエストストリームの処理

InvokeEndpointWithBidirectionalStream API 入力ペイロードは、バイナリチャンク (「バイト」: <Blob>) のラッパーにすぎない一連の PayloadParts として にストリーミングされます。

{ "PayloadPart": { "Bytes": <Blob>, "DataType": <String: UTF8 | BINARY>, "CompletionState": <String: PARTIAL | COMPLETE> "P": <String> } }

3.1。データフレーム

SageMaker AI が入力 PayloadParts をモデルコンテナに WebSocket データフレームとして渡す (RFC6455-Section-5.6)

  1. SageMaker AI はバイナリチャンクを検査しません。

  2. 入力 PayloadPart の受信時

    • SageMaker AI は、 から WebSocket データフレームを 1 つだけ作成しPayloadPart.Bytes、それをモデルコンテナに渡します。

    • の場合PayloadPart.DataType = UTF8、SageMaker AI はテキストデータフレームを作成します

    • PayloadPart.DataType に または が存在しない場合PayloadPart.DataType = BINARY、SageMaker AI はバイナリデータフレームを作成します。

  3. を使用した一連の PayloadParts でPayloadPart.CompletionState = PARTIAL、 を使用した PayloadPart によって終了された場合PayloadPart.CompletionState = COMPLETE、SageMaker AI はそれらを WebSocket フラグメント化メッセージ RFC6455-Section-5.4: フラグメント化に変換します。

    • を使用した最初の PayloadPart は、FIN ビットがクリアされた WebSocket データフレームに変換PayloadPart.CompletionState = PARTIALされます。

    • を使用した後続の PayloadParts は、FIN ビットクリアの WebSocket 継続フレームに変換PayloadPart.CompletionState = PARTIALされます。

    • を使用した最終的な PayloadPart は、FIN ビットが設定された WebSocket 継続フレームに変換PayloadPart.CompletionState = COMPLETEされます。

  4. SageMaker AI は入力 PayloadPart からバイナリチャンクをエンコードまたはデコードせず、バイトはそのままモデルコンテナに渡されます。

  5. SageMaker AI は、複数の入力 PayloadParts を 1 つの BinaryDataFrame に結合しません。

  6. SageMaker AI は、1 つの入力 PayloadPart を複数の BinaryDataFrames にチャンクしません。

例: フラグメント化されたメッセージフロー

Client sends: PayloadPart 1: {Bytes: "Hello ", DataType: "UTF8", CompletionState: "PARTIAL"} PayloadPart 2: {Bytes: "World", DataType: "UTF8", CompletionState: "COMPLETE"} Container receives: Frame 1: Text Data Frame with "Hello " (FIN=0) Frame 2: Continuation Frame with "World" (FIN=1)

3.2。コントロールフレーム

データフレームに加えて、SageMaker AI はコントロールフレームをモデルコンテナ (RFC6455-Section-5.5) に送信します。

  1. Close Frame: 何らかの理由で接続が閉じられた場合、SageMaker AI は Close Frame (RFC6455-Section-5.5.1) をモデルコンテナに送信することがあります。

  2. Ping フレーム: SageMaker AI は 60 秒に 1 回 Ping フレーム (RFC6455-Section-5.5.2) を送信します。モデルコンテナは Pong フレームで応答する必要があります。連続 5 回の Ping で Pong Frame (RFC6455-Section-5.5.3) を受信しない場合、接続は SageMaker AI によって閉じられます。

  3. Pong Frame: SageMaker AI は、Pong Frames を持つモデルコンテナから Ping Frames に応答します。

4. レスポンスストリームの処理

出力は、一連の PayloadParts、ModelStreamErrors、または InternalStreamFailures としてストリーミングされます。

{ "PayloadPart": { "Bytes": <Blob>, "DataType": <String: UTF8 | BINARY>, "CompletionState": <String: PARTIAL | COMPLETE>, }, "ModelStreamError": { "ErrorCode": <String>, "Message": <String> }, "InternalStreamFailure": { "Message": <String> } }

4.1。データフレーム

SageMaker AI は、モデルコンテナから受信したデータフレームを出力 PayloadParts に変換します。

  1. モデルコンテナから WebSocket Text Data Frame を受信すると、SageMaker AI は Text Data Frame から raw バイトを取得し、それをレスポンス PayloadPart にラップし、同時に を設定しますPayloadPart.DataType = UTF8

  2. モデルコンテナから WebSocket バイナリデータフレームを受信すると、SageMaker AI はデータフレームからのバイトをレスポンス PayloadPart に直接ラップし、同時に を設定しますPayloadPart.DataType = BINARY

  3. RFC6455-Section-5.4: Fragmentation で定義されているフラグメント化されたメッセージの場合:

    • FIN ビットがクリアされた最初のデータフレームは、 の PayloadPart に変換されますPayloadPart.CompletionState = PARTIAL

    • FIN ビットがクリアされた後続の継続フレームは、 で PayloadParts に変換されますPayloadPart.CompletionState = PARTIAL

    • FIN ビットが設定された最後の継続フレームは、 を使用して PayloadPart に変換されますPayloadPart.CompletionState = COMPLETE

  4. SageMaker AI はモデルコンテナから受信したバイトをエンコードまたはデコードせず、バイトはそのままモデルコンテナに渡されます。

  5. SageMaker AI は、モデルコンテナから受信した複数のデータフレームを 1 つのレスポンス PayloadPart に結合しません。

  6. SageMaker AI は、モデルコンテナから受信したデータフレームを複数のレスポンス PayloadParts にチャンクしません。

例: ストリーミングレスポンスフロー

Container sends: Frame 1: Text Data Frame with "Generating" (FIN=0) Frame 2: Continuation Frame with " response..." (FIN=1) Client receives: PayloadPart 1: {Bytes: "Generating", DataType: "UTF8", CompletionState: "PARTIAL"} PayloadPart 2: {Bytes: " response...", DataType: "UTF8", CompletionState: "COMPLETE"}

4.2。コントロールフレーム

SageMaker AI は、モデルコンテナから次のコントロールフレームに応答します。

  1. モデルコンテナからクローズフレーム (RFC6455-Section-5.5.1) を受信すると、SageMaker AI はステータスコード (RFC6455-Section-7.4) と失敗メッセージを ModelStreamError にラップし、エンドユーザーにストリーミングします。

  2. モデルコンテナから Ping フレーム (RFC6455-Section-5.5.2) を受信すると、SageMaker AI は Pong Frame で応答します。

  3. Pong Frame(RFC6455-Section-5.5.3): Pong Frame が 5 回連続して受信されない場合、接続は SageMaker AI によって閉じられます。