Codice di inferenza personalizzato con servizi di hosting - Amazon SageMaker AI

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Codice di inferenza personalizzato con servizi di hosting

Questa sezione spiega come Amazon SageMaker AI interagisce con un contenitore Docker che esegue il tuo codice di inferenza per i servizi di hosting. Utilizza queste informazioni per scrivere il codice di inferenza e creare un'immagine Docker.

In che modo l' SageMaker intelligenza artificiale gestisce la tua immagine di inferenza

Per configurare un container per l'esecuzione come un eseguibile, utilizza un'istruzione ENTRYPOINT in un Dockerfile. Tenere presente quanto segue:

  • Per l'inferenza dei modelli, l' SageMaker intelligenza artificiale esegue il contenitore come:

    docker run image serve

    SageMaker L'IA sostituisce CMD le istruzioni predefinite in un contenitore specificando l'serveargomento dopo il nome dell'immagine. L'argomento serve sostituisce gli argomenti che fornisci con il comando CMD nel Dockerfile.

     

  • SageMaker L'IA si aspetta che tutti i contenitori vengano eseguiti con utenti root. Crea il tuo container in modo che utilizzi solo utenti root. Quando l' SageMaker intelligenza artificiale esegue il contenitore, gli utenti che non dispongono dell'accesso a livello di root possono causare problemi di autorizzazione.

     

  • Ti consigliamo di utilizzare il modulo exec dell'istruzione ENTRYPOINT:

    ENTRYPOINT ["executable", "param1", "param2"]

    Esempio:

    ENTRYPOINT ["python", "k_means_inference.py"]

    Il modulo exec dell'istruzione ENTRYPOINT avvia l'eseguibile direttamente, non come figlio di /bin/sh. Ciò gli consente di ricevere segnali simili SIGTERM e provenienti SIGKILL dalle operazioni dell' SageMaker API, il che è un requisito.

     

    Ad esempio, quando si utilizza l'CreateEndpointAPI per creare un endpoint, l' SageMaker intelligenza artificiale fornisce il numero di istanze di calcolo ML richieste dalla configurazione dell'endpoint, specificato nella richiesta. SageMaker L'IA esegue il contenitore Docker su tali istanze.

     

    Se riduci il numero di istanze che eseguono il backup dell'endpoint (chiamando l'UpdateEndpointWeightsAndCapacitiesAPI), SageMaker AI esegue un comando per arrestare il contenitore Docker sulle istanze che vengono terminate. Il comando invia il segnale SIGTERM e poi invia il segnale SIGKILL trenta secondi più tardi.

     

    Se aggiorni l'endpoint (chiamando l'UpdateEndpointAPI), SageMaker AI avvia un altro set di istanze di calcolo ML ed esegue i contenitori Docker che contengono il tuo codice di inferenza su di esse. Poi esegue un comando per interrompere il precedente container Docker. Per interrompere un container Docker, il comando invia il segnale SIGTERM e poi invia il segnale SIGKILL 30 secondi più tardi.

     

  • SageMaker AI utilizza la definizione del contenitore fornita nella CreateModelrichiesta per impostare le variabili di ambiente e il nome host DNS per il contenitore nel modo seguente:

     

    • Imposta le variabili di ambiente utilizzando la ContainerDefinition.Environment string-to-string mappa.

    • Imposta l'hostname DNS utilizzando ContainerDefinition.ContainerHostname.

       

  • Se prevedi di utilizzare dispositivi GPU per le inferenze di modelli (specificando le istanze di calcolo ML basate su GPU nella richiesta CreateEndpointConfig), accertati che i container siano compatibili con nvidia-docker. Non aggregare i driver NVIDIA con l'immagine. Per ulteriori informazioni su nvidia-docker, consulta NVIDIA/nvidia-docker.

     

  • Non puoi usare l'tiniinizializzatore come punto di ingresso nei contenitori SageMaker AI perché viene confuso dagli argomenti train andserve.

In che modo l' SageMaker IA carica gli artefatti del modello

Nella richiesta CreateModelAPI, puoi utilizzare il S3DataSource parametro ModelDataUrl o per identificare la posizione S3 in cui sono archiviati gli artefatti del modello. SageMaker L'intelligenza artificiale copia gli artefatti del modello dalla posizione S3 alla /opt/ml/model directory per essere utilizzati dal codice di inferenza. Il container ha accesso in sola lettura a /opt/ml/model. Non scrivere in questa directory.

Il valore di ModelDataUrl deve puntare a un file tar.gz. Altrimenti, l' SageMaker IA non scaricherà il file.

Se hai addestrato il tuo modello all' SageMaker intelligenza artificiale, gli artefatti del modello vengono salvati come un singolo file tar compresso in Amazon S3. Se hai addestrato il tuo modello al di fuori dell' SageMaker intelligenza artificiale, devi creare questo singolo file tar compresso e salvarlo in una posizione S3. SageMaker AI decomprime questo file tar nella directory/opt/ml/modelprima dell'avvio del contenitore.

Per distribuire modelli di grandi dimensioni, consigliamo di seguire Implementazione di modelli non compressi.

Come il tuo container deve rispondere alle richieste di inferenza

Per ottenere inferenze, l'applicazione client invia una richiesta POST all'endpoint AI. SageMaker SageMaker L'IA passa la richiesta al contenitore e restituisce il risultato dell'inferenza dal contenitore al client.

Per ulteriori informazioni sulle richieste di inferenza che il contenitore riceverà, consulta le seguenti azioni nel riferimento all'API di Amazon SageMaker AI:

Requisiti per i container di inferenza

Per rispondere alle richieste di inferenza, il container deve soddisfare i seguenti requisiti:

  • SageMaker AI rimuove tutte le POST intestazioni tranne quelle supportate da. InvokeEndpoint SageMaker L'IA potrebbe aggiungere intestazioni aggiuntive. I container dell'inferenza devono essere in grado di ignorare queste intestazioni aggiuntive.

  • Per ricevere le richieste di inferenza, il container deve disporre di un server Web in ascolto sulla porta 8080 e deve accettare le richieste POST agli endpoint /invocations e /ping.

  • I container di modello del cliente devono accettare le richieste di connessione socket entro 250 ms.

  • I container di modello del cliente devono rispondere alle richieste entro 60 secondi. Il modello stesso può avere un tempo di elaborazione massimo di 60 secondi prima di rispondere a /invocations. Se il modello impiega 50-60 secondi di tempo di elaborazione, il timeout del socket dell'SDK deve essere impostato su 70 secondi.

  • Il contenitore modello di un cliente che supporta lo streaming bidirezionale deve:

    • supportare WebSockets le connessioni sulla porta da 8080 a/invocations-bidirectional-stream per impostazione predefinita.

    • dispongono di un server Web in ascolto sulla porta 8080 e devono accettare le richieste POST agli endpoint /ping.

    • Oltre ai controlli dello stato del contenitore tramite HTTP, il contenitore deve rispondere con Pong Frame per (RFC6455), per l'invio di WebSocket Ping Frame.

Esempio funzioni di invocazione

Negli esempi seguenti viene illustrato come il codice nel container può elaborare richieste di inferenza. Questi esempi gestiscono le richieste inviate dalle applicazioni client utilizzando l' InvokeEndpoint azione.

FastAPI

FastAPI è un framework web per la creazione APIs con Python.

from fastapi import FastAPI, status, Request, Response . . . app = FastAPI() . . . @app.post('/invocations') async def invocations(request: Request): # model() is a hypothetical function that gets the inference output: model_resp = await model(Request) response = Response( content=model_resp, status_code=status.HTTP_200_OK, media_type="text/plain", ) return response . . .

In questo esempio, la invocations funzione gestisce la richiesta di inferenza che l' SageMaker IA invia all'endpoint. /invocations

Flask

Flask è un framework per lo sviluppo di applicazioni web con Python.

import flask . . . app = flask.Flask(__name__) . . . @app.route('/invocations', methods=["POST"]) def invoke(request): # model() is a hypothetical function that gets the inference output: resp_body = model(request) return flask.Response(resp_body, mimetype='text/plain')

In questo esempio, la invoke funzione gestisce la richiesta di inferenza che l' SageMaker IA invia all'endpoint. /invocations

Esempio funzioni di invocazione per le richieste di streaming

Gli esempi seguenti mostrano come il codice nel container di inferenza può elaborare le richieste di inferenza in streaming. Questi esempi gestiscono le richieste inviate dalle applicazioni client utilizzando l'azione InvokeEndpointWithResponseStream .

Quando un container gestisce una richiesta di inferenza in streaming, restituisce l'inferenza del modello in blocchi incrementali man mano che il modello le genera. Le applicazioni client iniziano a ricevere risposte immediatamente quando sono disponibili. Non è necessario attendere che il modello generi l'intera risposta. Puoi implementare lo streaming per supportare esperienze interattive veloci, come chatbot, assistenti virtuali e generatori di musica.

FastAPI

FastAPI è un framework web per la creazione APIs con Python.

from starlette.responses import StreamingResponse from fastapi import FastAPI, status, Request . . . app = FastAPI() . . . @app.post('/invocations') async def invocations(request: Request): # Streams inference response using HTTP chunked encoding async def generate(): # model() is a hypothetical function that gets the inference output: yield await model(Request) yield "\n" response = StreamingResponse( content=generate(), status_code=status.HTTP_200_OK, media_type="text/plain", ) return response . . .

In questo esempio, la invocations funzione gestisce la richiesta di inferenza che l' SageMaker IA invia all'endpoint. /invocations Per lo streaming della risposta, l'esempio utilizza la classe StreamingResponse dal framework Starlette.

Flask

Flask è un framework per lo sviluppo di applicazioni web con Python.

import flask . . . app = flask.Flask(__name__) . . . @app.route('/invocations', methods=["POST"]) def invocations(request): # Streams inference response using HTTP chunked encoding def generate(): # model() is a hypothetical function that gets the inference output: yield model(request) yield "\n" return flask.Response( flask.stream_with_context(generate()), mimetype='text/plain') . . .

In questo esempio, la invocations funzione gestisce la richiesta di inferenza che l' SageMaker IA invia all'endpoint. /invocations Per lo streaming della risposta, l'esempio utilizza la funzione flask.stream_with_context del framework Flask.

Esempio Esempi di funzioni di invocazione per lo streaming bidirezionale

Gli esempi seguenti mostrano come il codice del contenitore può elaborare richieste e risposte di inferenza in streaming. Questi esempi gestiscono le richieste di streaming inviate dalle applicazioni client utilizzando l' InvokeEndpointWithBidirectionalStreamazione.

Un contenitore con funzionalità di streaming bidirezionale gestisce le richieste di inferenza in streaming in cui le parti vengono generate in modo incrementale sul client e trasmesse al contenitore. Restituisce l'inferenza del modello al client sotto forma di una serie di parti man mano che il modello le genera. Le applicazioni client iniziano a ricevere risposte immediatamente quando sono disponibili. Non è necessario attendere che la richiesta sia completamente generata dal client o che il modello generi l'intera risposta. Puoi implementare lo streaming bidirezionale per supportare esperienze interattive veloci, come chatbot, assistenti vocali interattivi di intelligenza artificiale e traduzioni in tempo reale per un'esperienza più in tempo reale.

FastAPI

FastAPI è un framework web per la creazione APIs con Python.

import sys import asyncio import json from fastapi import FastAPI, WebSocket, WebSocketDisconnect from fastapi.responses import JSONResponse import uvicorn app = FastAPI() ... @app.websocket("/invocations-bidirectional-stream") async def websocket_invoke(websocket: WebSocket): """ WebSocket endpoint with RFC 6455 ping/pong and fragmentation support Handles: - Text messages (JSON) - including fragmented frames - Binary messages - including fragmented frames - Ping frames (automatically responds with pong) - Pong frames (logs receipt) - Fragmented frames per RFC 6455 Section 5.4 """ await manager.connect(websocket) # Fragment reassembly buffers per RFC 6455 Section 5.4 text_fragments = [] binary_fragments = [] while True: # Use receive() to handle all WebSocket frame types message = await websocket.receive() print(f"Received message: {message}") if message["type"] == "websocket.receive": if "text" in message: # Handle text frames (including fragments) text_data = message["text"] more_body = message.get("more_body", False) if more_body: # This is a fragment, accumulate it text_fragments.append(text_data) print(f"Received text fragment: {len(text_data)} chars (more coming)") else: # This is the final frame or a complete message if text_fragments: # Reassemble fragmented message text_fragments.append(text_data) complete_text = "".join(text_fragments) text_fragments.clear() print(f"Reassembled fragmented text message: {len(complete_text)} chars total") await handle_text_message(websocket, complete_text) else: # Complete message in single frame await handle_text_message(websocket, text_data) elif "bytes" in message: # Handle binary frames (including fragments) binary_data = message["bytes"] more_body = message.get("more_body", False) if more_body: # This is a fragment, accumulate it binary_fragments.append(binary_data) print(f"Received binary fragment: {len(binary_data)} bytes (more coming)") else: # This is the final frame or a complete message if binary_fragments: # Reassemble fragmented message binary_fragments.append(binary_data) complete_binary = b"".join(binary_fragments) binary_fragments.clear() print(f"Reassembled fragmented binary message: {len(complete_binary)} bytes total") await handle_binary_message(websocket, complete_binary) else: # Complete message in single frame await handle_binary_message(websocket, binary_data) elif message["type"] == "websocket.ping": # Handle ping frames - RFC 6455 Section 5.5.2 ping_data = message.get("bytes", b"") print(f"Received PING frame with payload: {ping_data}") # FastAPI automatically sends pong response elif message["type"] == "websocket.pong": # Handle pong frames pong_data = message.get("bytes", b"") print(f"Received PONG frame with payload: {pong_data}") elif message["type"] == "websocket.close": # Handle close frames - RFC 6455 Section 5.5.1 close_code = message.get("code", 1000) close_reason = message.get("reason", "") print(f"Received CLOSE frame - Code: {close_code}, Reason: '{close_reason}'") # Send close frame response if not already closing try: await websocket.close(code=close_code, reason=close_reason) print(f"Sent CLOSE frame response - Code: {close_code}") except Exception as e: print(f"Error sending close frame: {e}") break elif message["type"] == "websocket.disconnect": print("Client initiated disconnect") break else: print(f"Received unknown message type: {message['type']}") break async def handle_binary_message(websocket: WebSocket, binary_data: bytes): """Handle incoming binary messages (complete or reassembled from fragments)""" print(f"Processing complete binary message: {len(binary_data)} bytes") try: # Echo back the binary data await websocket.send_bytes(binary_data) except Exception as e: print(f"Error handling binary message: {e}") async def handle_text_message(websocket: WebSocket, data: str): """Handle incoming text messages""" try: # Send response back to the same client await manager.send_personal_message(data, websocket) except Exception as e: print(f"Error handling text message: {e}") def main(): if len(sys.argv) > 1 and sys.argv[1] == "serve": print("Starting server on port 8080...") uvicorn.run(app, host="0.0.0.0", port=8080) else: print("Usage: python app.py serve") sys.exit(1) if __name__ == "__main__": main()

In questo esempio, la websocket_invoke funzione gestisce la richiesta di inferenza che l' SageMaker IA invia all'endpoint. /invocations-bidirectional-stream Mostra la gestione delle richieste di streaming e lo streaming delle risposte al client.

Come il tuo container deve rispondere alle richieste di controllo dello stato (Ping)

SageMaker L'intelligenza artificiale lancia nuovi contenitori di inferenza nelle seguenti situazioni:

  • Risposta a CreateEndpoint UpdateEndpoint e chiamate API UpdateEndpointWeightsAndCapacities

  • Applicazione di patch di sicurezza

  • Sostituzione delle istanze non integre

Subito dopo l'avvio del contenitore, l' SageMaker IA inizia a inviare richieste GET periodiche all'endpoint. /ping

Il più semplice requisito per il container è di rispondere con un codice di stato HTTP 200 e un corpo vuoto. Ciò indica all' SageMaker IA che il contenitore è pronto ad accettare richieste di inferenza sull'endpoint. /invocations

Se il container non inizia a passare i controlli dell’integrità rispondendo costantemente con 200 secondi durante gli 8 minuti successivi all’avvio, l’avvio della nuova istanza ha esito negativo. Ciò causa un errore in CreateEndpoint e lascia l’endpoint in uno stato di errore. L’aggiornamento richiesto da UpdateEndpoint non viene completato, le patch di sicurezza non verranno applicate e le istanze non integre non vengono sostituite.

Nonostante la barra minima per il container è di fornire uno 200 statico, uno sviluppatore di container può utilizzare questa funzionalità per eseguire maggiori controlli. Il timeout della richiesta sui tentativi /ping è 2 secondi.

Inoltre, un contenitore in grado di gestire richieste di streaming bidirezionali deve rispondere con un Pong Frame (per WebSocket protocollo RFC6455) a un Ping Frame. Se non viene ricevuto alcun Pong Frame per 5 Ping consecutivi, la connessione al contenitore verrà chiusa dalla SageMaker piattaforma AI. SageMaker La piattaforma AI risponderà anche ai Ping Frames del container modello con Pong Frames.

Contratto container per supportare funzionalità di streaming bidirezionale

Se desideri ospitare il tuo contenitore modello come endpoint SageMaker AI che supporta funzionalità di streaming bidirezionale, il contenitore modello deve supportare il seguente contratto:

1. Etichetta Docker bidirezionale

Il contenitore del modello dovrebbe avere un'etichetta Docker che indichi alla piattaforma SageMaker AI che la funzionalità di streaming bidirezionale è supportata su questo contenitore.

com.amazonaws.sagemaker.capabilities.bidirectional-streaming=true

2. Support WebSocket Connection per le chiamate

Il contenitore modello di un cliente che supporta lo streaming bidirezionale deve supportare per impostazione predefinita WebSockets le connessioni sulla porta 8080 to. /invocations-bidirectional-stream

Questo percorso può essere sovrascritto passando X-Amzn-SageMaker-Model l'intestazione -Invocation-Path quando si richiama l'API. InvokeEndpointWithBidirectionalStream Inoltre, gli utenti possono specificare una stringa di query da aggiungere a questo percorso passando l'intestazione -Query-String quando richiamano l'API. X-Amzn-SageMaker-Model InvokeEndpointWithBidirectionalStream

3. Richiedi la gestione dello stream

<Blob>I payload di input dell' InvokeEndpointWithBidirectionalStream API vengono trasmessi in streaming come una serie di PayloadParts, che è solo un wrapper di un blocco binario («Bytes»:):

{ "PayloadPart": { "Bytes": <Blob>, "DataType": <String: UTF8 | BINARY>, "CompletionState": <String: PARTIAL | COMPLETE> "P": <String> } }

3.1. Frame di dati

SageMaker AI passa l'input PayloadParts al contenitore del modello come frame di WebSocket dati (RFC6455-Section-5.6)

  1. SageMaker L'IA non ispeziona il blocco binario.

  2. Alla ricezione di un input PayloadPart

    • SageMaker L'IA crea esattamente un WebSocket Data Frame daPayloadPart.Bytes, quindi lo passa al contenitore del modello.

    • SePayloadPart.DataType = UTF8, SageMaker AI crea un frame di dati di testo

    • Se PayloadPart.DataType non presenta oPayloadPart.DataType = BINARY, SageMaker AI crea un Binary Data Frame

  3. Per una sequenza di PayloadParts con e terminata da un conPayloadPart.CompletionState = PARTIAL, l' SageMaker IA li traduce in un PayloadPart messaggio WebSocket frammentato RFC6455-Section-5.4: Frammentazione: PayloadPart.CompletionState = COMPLETE

    • La larghezza iniziale PayloadPart.CompletionState = PARTIAL verrà tradotta in un Data Frame, PayloadPart con FIN bit clear WebSocket .

    • Il PayloadParts with successivo PayloadPart.CompletionState = PARTIAL verrà tradotto in WebSocket Continuation Frames con FIN bit clear.

    • Il risultato finale PayloadPart PayloadPart.CompletionState = COMPLETE verrà tradotto in WebSocket Continuation Frame con bit FIN impostato.

  4. SageMaker L'IA non codifica o decodifica il blocco binario dall'input PayloadPart, i byte vengono passati al contenitore del modello così com'è.

  5. SageMaker L'intelligenza artificiale non combina più input in uno solo. PayloadParts BinaryDataFrame

  6. SageMaker L'intelligenza artificiale non suddivide un input PayloadPart in più BinaryDataFrames input.

Esempio: flusso di messaggi frammentato

Client sends: PayloadPart 1: {Bytes: "Hello ", DataType: "UTF8", CompletionState: "PARTIAL"} PayloadPart 2: {Bytes: "World", DataType: "UTF8", CompletionState: "COMPLETE"} Container receives: Frame 1: Text Data Frame with "Hello " (FIN=0) Frame 2: Continuation Frame with "World" (FIN=1)

3.2. Telai di controllo

Oltre ai Data Frames, SageMaker AI invia anche Control Frames al contenitore del modello (RFC6455-Section-5.5):

  1. Close Frame: L' SageMaker IA può inviare Close Frame (RFC6455-Section-5.5.1) al contenitore del modello se la connessione viene chiusa per qualsiasi motivo.

  2. Ping Frame: l' SageMaker IA invia Ping Frame (RFC6455-Section-5.5.2) una volta ogni 60 secondi, il contenitore del modello deve rispondere con Pong Frame. Se non viene ricevuto alcun Pong Frame (RFC6455-Section-5.5.3) per 5 Ping consecutivi, la connessione verrà chiusa dall'IA. SageMaker

  3. Pong Frame: SageMaker AI risponderà ai Ping Frame del contenitore modello con Pong Frames.

4. Gestione del flusso di risposta

L'output viene trasmesso in streaming come una serie di PayloadParts, ModelStreamErrors o InternalStreamFailures.

{ "PayloadPart": { "Bytes": <Blob>, "DataType": <String: UTF8 | BINARY>, "CompletionState": <String: PARTIAL | COMPLETE>, }, "ModelStreamError": { "ErrorCode": <String>, "Message": <String> }, "InternalStreamFailure": { "Message": <String> } }

4.1. Frame di dati

SageMaker L'IA converte i frame di dati ricevuti dal contenitore del modello in output PayloadParts:

  1. Dopo aver ricevuto un WebSocket Text Data Frame dal contenitore del modello, l' SageMaker IA ottiene i byte grezzi dal Text Data Frame e li avvolge in una risposta PayloadPart, nel frattempo impostata. PayloadPart.DataType = UTF8

  2. Quando riceve un WebSocket Binary Data Frame dal contenitore del modello, l' SageMaker IA avvolge direttamente i byte dal frame di dati in una risposta, nel frattempo impostata. PayloadPart PayloadPart.DataType = BINARY

  3. Per i messaggi frammentati come definiti nella RFC6455 -Section-5.4: Frammentazione:

    • Il Data Frame iniziale con FIN bit clear verrà tradotto in un with. PayloadPart PayloadPart.CompletionState = PARTIAL

    • I successivi Continuation Frames con FIN bit clear verranno tradotti in PayloadParts withPayloadPart.CompletionState = PARTIAL.

    • Il Continuation Frame finale con set di bit FIN verrà tradotto in PayloadPart with. PayloadPart.CompletionState = COMPLETE

  4. SageMaker L'IA non codifica o decodifica i byte ricevuti dai contenitori del modello, i byte vengono passati al contenitore del modello così com'è.

  5. SageMaker L'IA non combina più frame di dati ricevuti dal contenitore del modello in un'unica risposta. PayloadPart

  6. SageMaker L'intelligenza artificiale non suddivide un Data Frame ricevuto dal contenitore del modello in più risposte PayloadParts.

Esempio: Streaming Response Flow

Container sends: Frame 1: Text Data Frame with "Generating" (FIN=0) Frame 2: Continuation Frame with " response..." (FIN=1) Client receives: PayloadPart 1: {Bytes: "Generating", DataType: "UTF8", CompletionState: "PARTIAL"} PayloadPart 2: {Bytes: " response...", DataType: "UTF8", CompletionState: "COMPLETE"}

4.2. Telai di controllo

SageMaker L'IA risponde ai seguenti Control Frames dal contenitore del modello:

  1. Dopo aver ricevuto un Close Frame (RFC6455-Section-5.5.1) dal contenitore del modello, SageMaker AI ModelStreamError inserirà il codice di stato (RFC6455-Section-7.4) e i messaggi di errore e lo trasmetterà all'utente finale.

  2. Alla ricezione di un Ping Frame (RFC6455-Section-5.5.2) dal contenitore del modello, l'IA risponderà con Pong Frame. SageMaker

  3. Pong Frame (RFC6455-Section-5.5.3): Se non viene ricevuto alcun Pong Frame per 5 ping consecutivi, la connessione verrà chiusa dall'IA. SageMaker