具託管服務的自訂推論程式碼 - Amazon SageMaker AI

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

具託管服務的自訂推論程式碼

本節說明 Amazon SageMaker AI 與執行您自有的推論程式碼以取得託管服務之 Docker 容器的互動方式。請運用本文資訊來撰寫推論程式碼和建立 Docker 映像。

SageMaker AI 執行推論映像的方式

欲設定容器做為可執行檔來執行,請使用 Dockerfile 的 ENTRYPOINT 指示。注意下列事項:

  • 對於模型推論,SageMaker AI 執行容器的方法如下所示:

    docker run image serve

    SageMaker AI 在映像名稱後指定 serve 引數,藉此覆寫容器中預設的 CMD 陳述式。您在 Dockerfile 中搭配 CMD 指令提供的引數,會被 serve 引數覆寫。

     

  • SageMaker AI 預期所有容器都使用根使用者執行。建立您的容器,使其僅使用根使用者。SageMaker AI 執行您的容器時,沒有根層級存取權的使用者可能會造成許可問題。

     

  • 建議使用 ENTRYPOINT 指示的 exec 格式:

    ENTRYPOINT ["executable", "param1", "param2"]

    例如:

    ENTRYPOINT ["python", "k_means_inference.py"]

    ENTRYPOINT 指示的 exec 格式會直接做為可執行檔啟動,而非 /bin/sh 的子項。如此其便能自 SageMaker API 作業接收 SIGTERMSIGKILL 等訊號,而這是一項要求。

     

    例如,使用 CreateEndpoint API 建立端點時,SageMaker AI 所佈建的機器學習 (ML) 運算執行個體數量,正是您在請求中所指定的端點組態所需的數量。SageMaker AI 會在這些執行個體上執行 Docker 容器。

     

    若減少支援端點的執行個體數目 (藉由呼叫 UpdateEndpointWeightsAndCapacities API),SageMaker AI 會執行命令,讓被終止的執行個體上的 Docker 容器停止。該命令會傳送出 SIGTERM 訊號,三十秒後再傳送 SIGKILL 訊號。

     

    若更新端點 (藉由呼叫 UpdateEndpoint API),SageMaker AI 會啟動另一組機器學習 (ML) 運算執行個體,並在上面執行包含您自有的推論程式碼的 Docker 容器。然後會執行命令,將前一個 Docker 容器停止。若要停止 Docker 容器,命令會傳送 SIGTERM 訊號,30 秒後再傳送 SIGKILL 訊號。

     

  • SageMaker AI 會使用您在 CreateModel 請求中提供的容器定義,設定環境變數和容器的 DNS 主機名稱,如下所示:

     

    • 使用 ContainerDefinition.Environment 字串對字串的映射來設定環境變數。

    • 它使用 ContainerDefinition.ContainerHostname 設定 DNS 主機名稱。

       

  • 如果您計劃使用 GPU 裝置進行模型推論 (在您的 CreateEndpointConfig 請求中指定以 GPU 為基礎的機器學習 (ML) 運算執行個體),請確保您的容器與 nvidia-docker 相容。請勿將 NVIDIA 驅動程式與映像整合成套件。如需 nvidia-docker 的詳細資訊,請參閱 NVIDIA/nvidia-docker

     

  • 您無法將 tini 初始設定式作為 SageMaker AI 容器的進入點使用,因為 trainserve 引數會混淆該設定式。

SageMaker AI 載入模型成品的方式

CreateModel API 請求中,您可以使用 ModelDataUrlS3DataSource 參數來識別存放模型成品的 S3 位置。SageMaker AI 會從 S3 位置將模型成品複製到 /opt/ml/model 目錄內,以供您的推論程式碼使用。您的容器只有 /opt/ml/model 的唯讀存取權限。請勿寫入此目錄。

ModelDataUrl 必須指向 tar.gz 檔案。否則,SageMaker AI 不會下載檔案。

如果在 SageMaker AI 訓練您的模型,則模型成品會儲存為 Amazon S3 的單一壓縮 tar 檔案。如果您在 SageMaker AI 外部訓練模型,則需要建立此單一壓縮 tar 檔案,並將其儲存在 S3 位置。容器開始之前,SageMaker AI 會將此 tar 檔案解壓縮至 /opt/ml/model 目錄。

對於部署大型模型,我們建議您依照 部署未壓縮的模型

容器對推論請求應有的回應方式

為了取得推論,用戶端應用程式會傳送 POST 請求至 SageMaker AI 端點。SageMaker AI 將請求傳遞至容器,再將容器所得的推論結果傳回用戶端。

如需容器將收到的推論請求的詳細資訊,請參閱 Amazon SageMaker AI API 參考中的下列動作:

推論容器的需求

若要回應推論請求,您的容器須符合下列要求:

  • SageMaker AI 會移除所有 POST 標題,僅會留下由 InvokeEndpoint 支援的部分。SageMaker AI 可能會新增其他標題。推論容器必須能安全地忽略這類額外的標題。

  • 為了接收推論請求,容器須擁有可以監聽 8080 連接埠的 Web 伺服器,且需接受傳至 /invocations/ping 端點的 POST 請求。

  • 客戶的模型容器必須在 250 毫秒內接受插槽連線請求。

  • 客戶的模型容器必須在 60 秒內回應請求。模型本身在回應 /invocations 之前的處理時間上限為 60 秒。如果您的模型處理時間需要 50-60 秒,則 SDK 的插槽逾時應設為 70 秒。

  • 支援雙向串流的客戶模型容器必須:

    • 根據預設, 支援連接埠 8080 到 /invocations-bidirectional-stream 的 WebSockets 連線。

    • 讓 Web 伺服器接聽連接埠 8080,且必須接受對 /ping 端點的 POST 請求。

    • 除了透過 HTTP 進行容器運作狀態檢查之外,容器還必須回應每個 (RFC6455) 的 Pong Frame,以便傳送 WebSocket Ping Frame。

範例調用函式

下列範例示範容器中的程式碼如何處理推論請求。這些範例處理用戶端應用程式使用 InvokeEndpoint 動作傳送的請求。

FastAPI

FastAPI 是使用 Python 建置 API 的 Web 架構。

from fastapi import FastAPI, status, Request, Response . . . app = FastAPI() . . . @app.post('/invocations') async def invocations(request: Request): # model() is a hypothetical function that gets the inference output: model_resp = await model(Request) response = Response( content=model_resp, status_code=status.HTTP_200_OK, media_type="text/plain", ) return response . . .

在此範例中,invocations 函式處理 SageMaker AI 傳送至 /invocations 端點的推論請求。

Flask

Flask 是使用 Python 開發 Web 應用程式的架構。

import flask . . . app = flask.Flask(__name__) . . . @app.route('/invocations', methods=["POST"]) def invoke(request): # model() is a hypothetical function that gets the inference output: resp_body = model(request) return flask.Response(resp_body, mimetype='text/plain')

在此範例中,invoke 函式處理 SageMaker AI 傳送至 /invocations 端點的推論請求。

範例串流請求的調用函式

下列範例示範推論容器中的程式碼如何處理串流推論請求。這些範例處理用戶端應用程式使用 InvokeEndpointWithResponseStream 動作傳送的請求。

容器處理串流推論請求時,在模型產生推論時會以遞增方式連續傳回一部分的模型推論。用戶端應用程式在可用時立即開始接收回應。該應用程式不需要等待模型產生整個回應。您可以實作串流以支援快速互動體驗,例如聊天機器人、虛擬助理和音樂產生器。

FastAPI

FastAPI 是使用 Python 建置 API 的 Web 架構。

from starlette.responses import StreamingResponse from fastapi import FastAPI, status, Request . . . app = FastAPI() . . . @app.post('/invocations') async def invocations(request: Request): # Streams inference response using HTTP chunked encoding async def generate(): # model() is a hypothetical function that gets the inference output: yield await model(Request) yield "\n" response = StreamingResponse( content=generate(), status_code=status.HTTP_200_OK, media_type="text/plain", ) return response . . .

在此範例中,invocations 函式處理 SageMaker AI 傳送至 /invocations 端點的推論請求。為了串流回應,該範例使用 Starlette 架構中的StreamingResponse 類別。

Flask

Flask 是使用 Python 開發 Web 應用程式的架構。

import flask . . . app = flask.Flask(__name__) . . . @app.route('/invocations', methods=["POST"]) def invocations(request): # Streams inference response using HTTP chunked encoding def generate(): # model() is a hypothetical function that gets the inference output: yield model(request) yield "\n" return flask.Response( flask.stream_with_context(generate()), mimetype='text/plain') . . .

在此範例中,invocations 函式處理 SageMaker AI 傳送至 /invocations 端點的推論請求。為了串流回應,該範例使用 Flask 架構中的 flask.stream_with_context 函式。

範例雙向串流的範例調用函數

下列範例示範容器中的程式碼如何處理串流推論請求和回應。這些範例使用 InvokeEndpointWithBidirectionalStream 動作來處理用戶端應用程式傳送的串流請求。

具有雙向串流功能的容器會處理串流推論請求,其中部分會在用戶端遞增產生並串流至容器。它會在模型產生模型時,以一系列部分的形式將模型的推論傳回給用戶端。用戶端應用程式在可用時立即開始接收回應。他們不需要等待在用戶端完全產生的 請求,也不需要等待模型產生整個回應。您可以實作雙向串流以支援快速互動式體驗,例如聊天機器人、互動式語音 AI 助理和即時翻譯,以獲得更即時的體驗。

FastAPI

FastAPI 是使用 Python 建置 API 的 Web 架構。

import sys import asyncio import json from fastapi import FastAPI, WebSocket, WebSocketDisconnect from fastapi.responses import JSONResponse import uvicorn app = FastAPI() ... @app.websocket("/invocations-bidirectional-stream") async def websocket_invoke(websocket: WebSocket): """ WebSocket endpoint with RFC 6455 ping/pong and fragmentation support Handles: - Text messages (JSON) - including fragmented frames - Binary messages - including fragmented frames - Ping frames (automatically responds with pong) - Pong frames (logs receipt) - Fragmented frames per RFC 6455 Section 5.4 """ await manager.connect(websocket) # Fragment reassembly buffers per RFC 6455 Section 5.4 text_fragments = [] binary_fragments = [] while True: # Use receive() to handle all WebSocket frame types message = await websocket.receive() print(f"Received message: {message}") if message["type"] == "websocket.receive": if "text" in message: # Handle text frames (including fragments) text_data = message["text"] more_body = message.get("more_body", False) if more_body: # This is a fragment, accumulate it text_fragments.append(text_data) print(f"Received text fragment: {len(text_data)} chars (more coming)") else: # This is the final frame or a complete message if text_fragments: # Reassemble fragmented message text_fragments.append(text_data) complete_text = "".join(text_fragments) text_fragments.clear() print(f"Reassembled fragmented text message: {len(complete_text)} chars total") await handle_text_message(websocket, complete_text) else: # Complete message in single frame await handle_text_message(websocket, text_data) elif "bytes" in message: # Handle binary frames (including fragments) binary_data = message["bytes"] more_body = message.get("more_body", False) if more_body: # This is a fragment, accumulate it binary_fragments.append(binary_data) print(f"Received binary fragment: {len(binary_data)} bytes (more coming)") else: # This is the final frame or a complete message if binary_fragments: # Reassemble fragmented message binary_fragments.append(binary_data) complete_binary = b"".join(binary_fragments) binary_fragments.clear() print(f"Reassembled fragmented binary message: {len(complete_binary)} bytes total") await handle_binary_message(websocket, complete_binary) else: # Complete message in single frame await handle_binary_message(websocket, binary_data) elif message["type"] == "websocket.ping": # Handle ping frames - RFC 6455 Section 5.5.2 ping_data = message.get("bytes", b"") print(f"Received PING frame with payload: {ping_data}") # FastAPI automatically sends pong response elif message["type"] == "websocket.pong": # Handle pong frames pong_data = message.get("bytes", b"") print(f"Received PONG frame with payload: {pong_data}") elif message["type"] == "websocket.close": # Handle close frames - RFC 6455 Section 5.5.1 close_code = message.get("code", 1000) close_reason = message.get("reason", "") print(f"Received CLOSE frame - Code: {close_code}, Reason: '{close_reason}'") # Send close frame response if not already closing try: await websocket.close(code=close_code, reason=close_reason) print(f"Sent CLOSE frame response - Code: {close_code}") except Exception as e: print(f"Error sending close frame: {e}") break elif message["type"] == "websocket.disconnect": print("Client initiated disconnect") break else: print(f"Received unknown message type: {message['type']}") break async def handle_binary_message(websocket: WebSocket, binary_data: bytes): """Handle incoming binary messages (complete or reassembled from fragments)""" print(f"Processing complete binary message: {len(binary_data)} bytes") try: # Echo back the binary data await websocket.send_bytes(binary_data) except Exception as e: print(f"Error handling binary message: {e}") async def handle_text_message(websocket: WebSocket, data: str): """Handle incoming text messages""" try: # Send response back to the same client await manager.send_personal_message(data, websocket) except Exception as e: print(f"Error handling text message: {e}") def main(): if len(sys.argv) > 1 and sys.argv[1] == "serve": print("Starting server on port 8080...") uvicorn.run(app, host="0.0.0.0", port=8080) else: print("Usage: python app.py serve") sys.exit(1) if __name__ == "__main__": main()

在此範例中,websocket_invoke 函式處理 SageMaker AI 傳送至 /invocations-bidirectional-stream 端點的推論請求。它會顯示處理串流請求並將回應串流回用戶端。

容器對運作狀態檢查 (Ping) 請求應有的回應方式

SageMaker AI 會在以下情況啟動新的推論容器:

  • 回應 CreateEndpointUpdateEndpointUpdateEndpointWeightsAndCapacities API 呼叫

  • 安全性修補程式

  • 取代狀態不佳的執行個體

容器啟動後不久,SageMaker AI 會開始定期傳送 GET 請求至 /ping 端點。

容器上最簡單的請求是以 HTTP 200 狀態碼和空內文做為回應。這是通知 SageMaker AI,容器已準備好可在 /invocations 端點接受推論請求。

如果容器並未開始通過運作狀態檢查,即在啟動後的 8 分鐘內持續回應 200 狀態碼,則新執行個體啟動會失敗。這會導致 CreateEndpoint 失敗,使端點處於失敗狀態。UpdateEndpoint 請求的更新不會完成,不會套用安全修補程式,且不會取代狀況不良的執行個體。

雖然容器的最低標準是傳回靜態的 200,容器開發人員也能運用此功能來進行更加深入的檢查。/ping 嘗試的請求逾時為 2 秒。

此外,能夠處理雙向串流請求的容器必須使用 Pong Frame (每個 WebSocket 通訊協定 RFC6455) 回應 Ping Frame。如果連續 5 個 Ping 未收到 Pong Frame,SageMaker AI 平台會關閉與容器的連線。SageMaker AI 平台也會使用 Pong Frames 從模型容器回應 Ping Frames。

支援雙向串流功能的容器合約

如果您想要將模型容器託管為支援雙向串流功能的 SageMaker AI 端點,則模型容器必須支援以下合約:

1。雙向 Docker 標籤

模型容器應該有一個 Docker 標籤,向 SageMaker AI 平台指出此容器支援雙向串流功能。

com.amazonaws.sagemaker.capabilities.bidirectional-streaming=true

2. 支援叫用 WebSocket 連線

根據/invocations-bidirectional-stream預設,支援雙向串流的客戶模型容器必須支援連接埠 8080 到 的 WebSockets 連線。

叫用 InvokeEndpointWithBidirectionalStream API 時,傳遞 X-Amzn-SageMaker-Model-Invocation-Path 標頭可以覆寫此路徑。此外,使用者可以指定要附加至此路徑的查詢字串,方法是在叫用 InvokeEndpointWithBidirectionalStream API 時傳遞 X-Amzn-SageMaker-Model-Query-String 標頭。

3. 請求串流處理

InvokeEndpointWithBidirectionalStream API 輸入承載會以一系列的 PayloadParts 串流,這只是二進位區塊的包裝函式 (「位元組」:<Blob>):

{ "PayloadPart": { "Bytes": <Blob>, "DataType": <String: UTF8 | BINARY>, "CompletionState": <String: PARTIAL | COMPLETE> "P": <String> } }

3.1. 資料框架

SageMaker AI 會將輸入 PayloadParts 傳遞至模型容器做為 WebSocket Data Frames (RFC6455-Section-5.6)

  1. SageMaker AI 不會檢查為二進位區塊。

  2. 接收輸入 PayloadPart 時

    • SageMaker AI 從 建立恰好一個 WebSocket 資料框架PayloadPart.Bytes,然後將其傳遞至模型容器。

    • 如果為 PayloadPart.DataType = UTF8,SageMaker AI 會建立文字資料框架

    • 如果 PayloadPart.DataType 不存在 或 PayloadPart.DataType = BINARY,SageMaker AI 會建立二進位資料框架

  3. 對於一系列具有 的 PayloadPartsPayloadPart.CompletionState = PARTIAL,以及由具有 的 PayloadPart 終止PayloadPart.CompletionState = COMPLETE,SageMaker AI 會將它們轉換為 WebSocket 分段訊息 RFC6455-Section-5.4: Fragmentation

    • 具有 的初始 PayloadPart PayloadPart.CompletionState = PARTIAL會轉譯為 WebSocket Data Frame,其中 FIN 位元清除。

    • 搭配 的後續 PayloadParts PayloadPart.CompletionState = PARTIAL會轉譯為具有 FIN 位元清除的 WebSocket 持續影格。

    • 使用 的最終 PayloadPart PayloadPart.CompletionState = COMPLETE將轉譯為具有 FIN 位元集的 WebSocket 持續架構。

  4. SageMaker AI 不會從輸入 PayloadPart 編碼或解碼二進位區塊,位元組會依原狀傳遞至模型容器。

  5. SageMaker AI 不會將多個輸入 PayloadParts 合併為一個 BinaryDataFrame。

  6. SageMaker AI 不會將一個輸入 PayloadPart 區塊化為多個 BinaryDataFrames。

範例:分段訊息流程

Client sends: PayloadPart 1: {Bytes: "Hello ", DataType: "UTF8", CompletionState: "PARTIAL"} PayloadPart 2: {Bytes: "World", DataType: "UTF8", CompletionState: "COMPLETE"} Container receives: Frame 1: Text Data Frame with "Hello " (FIN=0) Frame 2: Continuation Frame with "World" (FIN=1)

3.2. 控制影格

除了資料影格之外,SageMaker AI 還會將控制影格傳送至模型容器 (RFC6455-Section-5.5):

  1. 關閉影格:如果連線因任何原因關閉,SageMaker AI 可能會傳送關閉影格 (RFC6455-Section-5.5.1) 給模型容器。

  2. Ping Frame:SageMaker AI 每 60 秒傳送一次 Ping Frame (RFC6455-Section-5.5.2),模型容器必須使用 Pong Frame 回應。如果連續 5 個 Ping 未收到 Pong Frame (RFC6455-Section-5.5.3),SageMaker AI 會關閉連線。

  3. Pong Frame:SageMaker AI 將使用 Pong Frames 從模型容器回應 Ping Frames。

4. 回應串流處理

輸出會串流為一系列的 PayloadParts、ModelStreamErrors 或 InternalStreamFailures。

{ "PayloadPart": { "Bytes": <Blob>, "DataType": <String: UTF8 | BINARY>, "CompletionState": <String: PARTIAL | COMPLETE>, }, "ModelStreamError": { "ErrorCode": <String>, "Message": <String> }, "InternalStreamFailure": { "Message": <String> } }

4.1. 資料框架

SageMaker AI 會將從模型容器接收的資料框架轉換為輸出 PayloadParts:

  1. 從模型容器接收 WebSocket Text Data Frame 時,SageMaker AI 會從文字資料框架取得原始位元組,並將其包裝為回應 PayloadPart,同時設定 PayloadPart.DataType = UTF8

  2. 從模型容器接收 WebSocket 二進位資料框架時,SageMaker AI 會將資料框架中的位元組直接包裝為回應 PayloadPart,同時設定 PayloadPart.DataType = BINARY

  3. 對於 RFC6455-Section-5.4: Fragmentation 中定義的分段訊息:

    • 具有 FIN 位元清除的初始資料框架將轉換為具有 的 PayloadPartPayloadPart.CompletionState = PARTIAL

    • 具有 FIN 位元清除的後續連續影格將使用 轉換為 PayloadPartsPayloadPart.CompletionState = PARTIAL

    • 具有 FIN 位元集的最終 Continuation Frame 將使用 轉換為 PayloadPartPayloadPart.CompletionState = COMPLETE

  4. SageMaker AI 不會對從模型容器收到的位元組進行編碼或解碼,位元組會依原狀傳遞到模型容器。

  5. SageMaker AI 不會將從模型容器收到的多個資料框架合併為一個回應 PayloadPart。

  6. SageMaker AI 不會將從模型容器接收的資料框架分成多個回應 PayloadParts。

範例:串流回應流程

Container sends: Frame 1: Text Data Frame with "Generating" (FIN=0) Frame 2: Continuation Frame with " response..." (FIN=1) Client receives: PayloadPart 1: {Bytes: "Generating", DataType: "UTF8", CompletionState: "PARTIAL"} PayloadPart 2: {Bytes: " response...", DataType: "UTF8", CompletionState: "COMPLETE"}

4.2. 控制影格

SageMaker AI 從模型容器回應下列控制框架:

  1. 從模型容器接收關閉影格 (RFC6455-Section-5.5.1) 時,SageMaker AI 會將狀態碼 (RFC6455-Section-7.4) 和失敗訊息包裝到 ModelStreamError,並將其串流回最終使用者。

  2. 從模型容器接收 Ping Frame (RFC6455-Section-5.5.2) 時,SageMaker AI 將回應 Pong Frame。

  3. Pong Frame(RFC6455-Section-5.5.3):如果連續 5 個 Ping 未收到 Pong Frame,SageMaker AI 會關閉連線。