Invocare modelli per l’inferenza in tempo reale - Amazon SageMaker AI

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Invocare modelli per l’inferenza in tempo reale

Dopo aver utilizzato Amazon SageMaker AI per distribuire un modello su un endpoint, puoi interagire con il modello inviandogli richieste di inferenza. Per inviare una richiesta di inferenza a un modello, invoca l’endpoint che lo ospita. Puoi richiamare i tuoi endpoint utilizzando Amazon SageMaker Studio AWS SDKs, the o il. AWS CLI

Richiama il tuo modello utilizzando Amazon Studio SageMaker

Dopo aver distribuito il modello su un endpoint, puoi visualizzare l'endpoint tramite Amazon SageMaker Studio e testarlo inviando singole richieste di inferenza.

Nota

SageMaker L'intelligenza artificiale supporta i test degli endpoint in Studio solo per endpoint in tempo reale.

Per inviare una richiesta di inferenza di test al tuo endpoint
  1. Avvia Amazon SageMaker Studio.

  2. Nel riquadro di navigazione sinistro scegli Implementazioni.

  3. Dal menu a discesa, scegli Endpoint.

  4. Cerca il tuo endpoint per nome e scegli il nome nella tabella. I nomi degli endpoint elencati nel pannello Endpoint vengono definiti quando si distribuisce un modello. Lo spazio di lavoro Studio mostra la pagina Endpoint in una nuova scheda.

  5. Scegli la scheda Testa inferenza.

  6. Per Opzioni di test, seleziona una delle seguenti opzioni:

    1. Seleziona Testa la richiesta di esempio per inviare immediatamente una richiesta al tuo endpoint. Utilizza l’editor JSON per fornire dati di esempio in formato JSON e scegli Invia richiesta per inviare la richiesta al tuo endpoint. Dopo l’invio della richiesta, Studio mostra l’output dell’inferenza in una scheda a destra dell’editor JSON.

    2. Seleziona Usa codice di esempio Python SDK per visualizzare il codice per l’invio di una richiesta all’endpoint. Quindi, copia l’esempio di codice dalla sezione Richiesta di inferenza di esempio ed esegui il codice dal tuo ambiente di test.

La parte superiore della scheda mostra il tipo di richiesta che è stata inviata all'endpoint (è accettato solo JSON). La scheda mostra i seguenti campi:

  • Stato: visualizza uno dei seguenti tipi di stato:

    • Success: la richiesta è riuscita.

    • Failed: esito negativo della richiesta. Una risposta viene visualizzata in Motivo dell'errore.

    • Pending: mentre la richiesta di inferenza è in sospeso, lo stato mostra un'icona circolare rotante.

  • Durata dell'esecuzione: quanto tempo è durato l'invocazione (ora di fine meno l'ora di inizio) in millisecondi.

  • Ora della richiesta: quanti minuti sono trascorsi dall'invio della richiesta.

  • Ora del risultato: quanti minuti sono trascorsi dalla restituzione del risultato.

Invocare un modello mediante AWS SDK per Python (Boto3)

Se desideri richiamare un endpoint modello nel codice dell'applicazione, puoi utilizzarne uno AWSSDKs, incluso il. AWS SDK per Python (Boto3) Per invocare l’endpoint con questo SDK, utilizza uno dei seguenti metodi Python:

  • invoke_endpoint - Invia una richiesta di inferenza a un endpoint del modello e restituisce la risposta generata dal modello.

    Questo metodo restituisce il payload di inferenza come una risposta dopo che il modello ha terminato la sua generazione. Per ulteriori informazioni, consulta invoke_endpoint nella documentazione di riferimento dell’API per AWS SDK per Python (Boto3).

  • invoke_endpoint_with_response_stream - Invia una richiesta di inferenza a un endpoint del modello e trasmette la risposta in modo incrementale mentre il modello la genera.

    Con questo metodo, l’applicazione riceve parti della risposta non appena le parti diventano disponibili. Per ulteriori informazioni, consulta invoke_endpoint nella documentazione di riferimento dell’API per AWS SDK per Python (Boto3).

    Utilizza questo metodo solo per richiamare modelli che supportano lo streaming inferenziale.

Prima di poter utilizzare questi metodi nel codice dell'applicazione, è necessario inizializzare un client SageMaker AI Runtime e specificare il nome dell'endpoint. L'esempio seguente configura il client e l'endpoint per il resto degli esempi seguenti:

import boto3 sagemaker_runtime = boto3.client( "sagemaker-runtime", region_name='aws_region') endpoint_name='endpoint-name'

Richiamare per ottenere una risposta di inferenza

L'esempio seguente utilizza il metodo invoke_endpoint per richiamare un endpoint con AWS SDK per Python (Boto3).

# Gets inference from the model hosted at the specified endpoint: response = sagemaker_runtime.invoke_endpoint( EndpointName=endpoint_name, Body=bytes('{"features": ["This is great!"]}', 'utf-8') ) # Decodes and prints the response body: print(response['Body'].read().decode('utf-8'))

Questo esempio fornisce dati di input nel Body campo affinché l' SageMaker IA li trasmetta al modello. Questi dati devono essere nello stesso formato utilizzato per l'addestramento. L’esempio assegna la risposta alla variabile response.

La variabile response fornisce l'accesso allo stato HTTP, al nome del modello distribuito e ad altri campi. Il seguente frammento di codice mostra il codice di stato HTTP:

print(response["HTTPStatusCode"])

Richiamare per trasmettere una risposta di inferenza

Se hai distribuito un modello che supporta lo streaming di inferenza, puoi richiamare il modello per ricevere il payload di inferenza sotto forma di flusso di parti. Il modello fornisce queste parti in modo incrementale man mano che il modello le genera. Quando un'applicazione riceve un flusso di inferenza, non deve attendere che il modello generi l'intero payload di risposta. Invece, l’applicazione riceve immediatamente parti della risposta non appena diventano disponibili.

Utilizzando un flusso di inferenza nell'applicazione, è possibile creare interazioni in cui gli utenti percepiscono l'inferenza come rapida perché ottengono immediatamente la prima parte. Puoi implementare lo streaming per supportare esperienze interattive veloci, come chatbot, assistenti virtuali e generatori di musica. Ad esempio, è possibile creare un chatbot che mostri in modo incrementale il testo generato da un modello di linguaggio di grandi dimensioni (LLM).

Per ottenere un flusso di inferenza, è possibile utilizzare il metodo invoke_endpoint_with_response_stream. Nel corpo della risposta, l'SDK fornisce un oggetto EventStream, che fornisce l'inferenza sotto forma di una serie di oggetti PayloadPart.

Esempio Flusso di inferenza

Di seguito è riportato un esempio di un flusso di oggetti PayloadPart:

{'PayloadPart': {'Bytes': b'{"outputs": [" a"]}\n'}} {'PayloadPart': {'Bytes': b'{"outputs": [" challenging"]}\n'}} {'PayloadPart': {'Bytes': b'{"outputs": [" problem"]}\n'}} . . .

In ogni parte del payload, il campo Bytes fornisce una parte della risposta di inferenza del modello. Questa parte può essere qualsiasi tipo di contenuto generato da un modello, ad esempio testo, immagini o dati audio. In questo esempio, le porzioni sono oggetti JSON che contengono testo generato da un LLM.

Di solito, la parte del payload contiene una parte discreta di dati del modello. In questo esempio i blocchi discreti sono oggetti JSON interi. Occasionalmente, la risposta in streaming divide i blocchi su più parti del payload oppure combina più blocchi in un'unica parte del payload. L'esempio seguente mostra un blocco di dati in formato JSON suddiviso in due parti del payload:

{'PayloadPart': {'Bytes': b'{"outputs": '}} {'PayloadPart': {'Bytes': b'[" problem"]}\n'}}

Quando scrivi codice applicativo che elabora un flusso di inferenza, includi la logica che gestisca queste suddivisioni e combinazioni occasionali di dati. Come strategia, potete scrivere codice che concatena i contenuti di Bytes mentre l'applicazione riceve le parti del payload. Concatenando qui i dati JSON di esempio, puoi combinare i dati in un corpo JSON delimitato da una nuova riga. Quindi, il codice potrebbe elaborare lo stream analizzando l'intero oggetto JSON su ogni riga.

L'esempio seguente mostra il codice JSON delimitato da nuove righe che creereste quando concatenate il contenuto di esempio di Bytes:

{"outputs": [" a"]} {"outputs": [" challenging"]} {"outputs": [" problem"]} . . .
Esempio Codice per elaborare un flusso di inferenza

Il seguente esempio di classe Python, SmrInferenceStream, dimostra come è possibile elaborare un flusso di inferenza che invia dati di testo in formato JSON:

import io import json # Example class that processes an inference stream: class SmrInferenceStream: def __init__(self, sagemaker_runtime, endpoint_name): self.sagemaker_runtime = sagemaker_runtime self.endpoint_name = endpoint_name # A buffered I/O stream to combine the payload parts: self.buff = io.BytesIO() self.read_pos = 0 def stream_inference(self, request_body): # Gets a streaming inference response # from the specified model endpoint: response = self.sagemaker_runtime\ .invoke_endpoint_with_response_stream( EndpointName=self.endpoint_name, Body=json.dumps(request_body), ContentType="application/json" ) # Gets the EventStream object returned by the SDK: event_stream = response['Body'] for event in event_stream: # Passes the contents of each payload part # to be concatenated: self._write(event['PayloadPart']['Bytes']) # Iterates over lines to parse whole JSON objects: for line in self._readlines(): resp = json.loads(line) part = resp.get("outputs")[0] # Returns parts incrementally: yield part # Writes to the buffer to concatenate the contents of the parts: def _write(self, content): self.buff.seek(0, io.SEEK_END) self.buff.write(content) # The JSON objects in buffer end with '\n'. # This method reads lines to yield a series of JSON objects: def _readlines(self): self.buff.seek(self.read_pos) for line in self.buff.readlines(): self.read_pos += len(line) yield line[:-1]

Questo esempio elabora il flusso di inferenza effettuando le seguenti operazioni:

  • Inizializza un client SageMaker AI Runtime e imposta il nome di un endpoint del modello. Prima di poter ottenere un flusso di inferenza, il modello ospitato dall'endpoint deve supportare lo streaming di inferenza.

  • Nel metodo stream_inference di esempio, riceve il corpo della richiesta e lo passa al metodo invoke_endpoint_with_response_stream dell'SDK.

  • Itera su ogni evento nell'oggetto EventStream restituito dall'SDK.

  • Da ogni evento, ottiene il contenuto dell'oggetto Bytes nell'oggetto PayloadPart.

  • Nel metodo _write di esempio, scrive in un buffer per concatenare il contenuto degli oggetti Bytes. I contenuti combinati formano un corpo JSON delimitato da una nuova riga.

  • Utilizza il metodo _readlines di esempio per ottenere una serie iterabile di oggetti JSON.

  • In ogni oggetto JSON, ottiene una parte dell'inferenza.

  • Con l'espressione yield, restituisce i pezzi in modo incrementale.

L'esempio seguente crea e utilizza un oggetto SmrInferenceStream:

request_body = {"inputs": ["Large model inference is"], "parameters": {"max_new_tokens": 100, "enable_sampling": "true"}} smr_inference_stream = SmrInferenceStream( sagemaker_runtime, endpoint_name) stream = smr_inference_stream.stream_inference(request_body) for part in stream: print(part, end='')

Questo esempio passa un corpo della richiesta al metodo stream_inference. Esegue un'iterazione sulla risposta per stampare ogni pezzo restituito dal flusso di inferenza.

L'esempio presuppone che il modello nell'endpoint specificato sia un LLM che genera testo. L'output di questo esempio è un corpo di testo generato che viene stampato in modo incrementale:

a challenging problem in machine learning. The goal is to . . .

Invocare un modello mediante AWS CLI

È possibile richiamare l'endpoint del modello eseguendo comandi con (). AWS Command Line Interface AWS CLI AWS CLI supporta richieste di inferenza standard con il comando invoke-endpoint e supporta richieste di inferenza asincrone con il comando invoke-endpoint-async.

Nota

AWS CLINon supporta le richieste di inferenza in streaming.

L'esempio seguente utilizza il comando invoke-endpoint per inviare una richiesta di inferenza a un endpoint del modello:

aws sagemaker-runtime invoke-endpoint \ --endpoint-name endpoint_name \ --body fileb://$file_name \ output_file.txt

Per il parametro --endpoint-name fornisci il nome specificato quando hai creato l’endpoint. Per il --body parametro, fornisci i dati di input che l' SageMaker IA deve passare al modello. I dati devono essere nello stesso formato utilizzato per l'addestramento. Questo esempio mostra come inviare dati binari all'endpoint.

Per ulteriori informazioni su quando utilizzare file:// over fileb:// quando si passa il contenuto di un file a un parametro diAWS CLI, vedere Best Practices for Local File Parameters.

Per ulteriori informazioni e per visualizzare i parametri aggiuntivi che è possibile passare, consulta invoke-endpoint nel Riferimento al comando AWS CLI.

Se il comando invoke-endpoint ha esito positivo, restituisce una risposta come la seguente:

{ "ContentType": "<content_type>; charset=utf-8", "InvokedProductionVariant": "<Variant>" }

Se il comando non ha esito positivo, controlla se il payload di input è nel formato corretto.

Visualizza l'output della chiamata controllando il file di output del file (output_file.txt in questo esempio).

more output_file.txt

Invoca il tuo modello usando l'AWSSDK per Python

Invoca per trasmettere in streaming bidirezionale una richiesta e una risposta di inferenza

Se desideri richiamare un endpoint modello nel codice dell'applicazione per supportare lo streaming bidirezionale, puoi utilizzare il nuovo SDK sperimentale per Python che supporta la funzionalità di streaming bidirezionale con supporto HTTP/2. Questo SDK consente una comunicazione bidirezionale in tempo reale tra l'applicazione client e l' SageMaker endpoint, consentendoti di inviare richieste di inferenza in modo incrementale e ricevere contemporaneamente risposte in streaming man mano che il modello le genera. Ciò è particolarmente utile per le applicazioni interattive in cui sia il client che il server devono scambiarsi dati in modo continuo tramite una connessione persistente.

Nota

Il nuovo SDK sperimentale è diverso dall'SDK Boto3 standard e supporta connessioni bidirezionali persistenti per lo scambio di dati. Durante l'utilizzo dell'SDK Python sperimentale, consigliamo vivamente di associare rigorosamente una versione dell'SDK per qualsiasi caso d'uso non sperimentale.

Per richiamare il tuo endpoint con streaming bidirezionale, usa il metodo. invoke_endpoint_with_bidirectional_stream Questo metodo stabilisce una connessione persistente che consente di trasmettere più blocchi di payload al modello mentre si ricevono risposte in tempo reale mentre il modello elabora i dati. La connessione rimane aperta finché non si chiude esplicitamente il flusso di input o finché l'endpoint non chiude la connessione, supportando fino a 30 minuti di tempo di connessione.

Prerequisiti

Prima di poter utilizzare lo streaming bidirezionale nel codice dell'applicazione, devi:

  1. Installa l'SDK sperimentale SageMaker Runtime HTTP/2

  2. Imposta le AWS credenziali per il tuo client Runtime SageMaker

  3. Implementa un modello che supporti lo streaming bidirezionale su un endpoint SageMaker

Configura il client di streaming bidirezionale

L'esempio seguente mostra come inizializzare i componenti richiesti per lo streaming bidirezionale:

from sagemaker_runtime_http2.client import SageMakerRuntimeHTTP2Client from sagemaker_runtime_http2.config import Config, HTTPAuthSchemeResolver from smithy_aws_core.identity import EnvironmentCredentialsResolver from smithy_aws_core.auth.sigv4 import SigV4AuthScheme # Configuration AWS_REGION = "us-west-2" BIDI_ENDPOINT = f"https://runtime.sagemaker.{AWS_REGION}.amazonaws.com:8443" ENDPOINT_NAME = "your-endpoint-name" # Initialize the client configuration config = Config( endpoint_uri=BIDI_ENDPOINT, region=AWS_REGION, aws_credentials_identity_resolver=EnvironmentCredentialsResolver(), auth_scheme_resolver=HTTPAuthSchemeResolver(), auth_schemes={"aws.auth#sigv4": SigV4AuthScheme(service="sagemaker")} ) # Create the SageMaker Runtime HTTP/2 client client = SageMakerRuntimeHTTP2Client(config=config)

Client di streaming bidirezionale completo

L'esempio seguente dimostra come creare un client di streaming bidirezionale che invia più payload di testo a un SageMaker endpoint ed elabora le risposte in tempo reale:

import asyncio import logging from sagemaker_runtime_http2.client import SageMakerRuntimeHTTP2Client from sagemaker_runtime_http2.config import Config, HTTPAuthSchemeResolver from sagemaker_runtime_http2.models import ( InvokeEndpointWithBidirectionalStreamInput, RequestStreamEventPayloadPart, RequestPayloadPart ) from smithy_aws_core.identity import EnvironmentCredentialsResolver from smithy_aws_core.auth.sigv4 import SigV4AuthScheme logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class SageMakerBidirectionalClient: def __init__(self, endpoint_name, region="us-west-2"): self.endpoint_name = endpoint_name self.region = region self.client = None self.stream = None self.response_task = None self.is_active = False def _initialize_client(self): bidi_endpoint = f"runtime.sagemaker.{self.region}.amazonaws.com:8443" config = Config( endpoint_uri=bidi_endpoint, region=self.region, aws_credentials_identity_resolver=EnvironmentCredentialsResolver(), auth_scheme_resolver=HTTPAuthSchemeResolver(), auth_schemes={"aws.auth#sigv4": SigV4AuthScheme(service="sagemaker")} ) self.client = SageMakerRuntimeHTTP2Client(config=config) async def start_session(self): """Establish a bidirectional streaming connection with the endpoint.""" if not self.client: self._initialize_client() logger.info(f"Starting session with endpoint: {self.endpoint_name}") self.stream = await self.client.invoke_endpoint_with_bidirectional_stream( InvokeEndpointWithBidirectionalStreamInput(endpoint_name=self.endpoint_name) ) self.is_active = True # Start processing responses concurrently self.response_task = asyncio.create_task(self._process_responses()) async def send_message(self, message): """Send a single message to the endpoint.""" if not self.is_active: raise RuntimeError("Session not active. Call start_session() first.") logger.info(f"Sending message: {message}") payload = RequestPayloadPart(bytes_=message.encode('utf-8')) event = RequestStreamEventPayloadPart(value=payload) await self.stream.input_stream.send(event) async def send_multiple_messages(self, messages, delay=1.0): """Send multiple messages with a delay between each.""" for message in messages: await self.send_message(message) await asyncio.sleep(delay) async def end_session(self): """Close the bidirectional streaming connection.""" if not self.is_active: return await self.stream.input_stream.close() self.is_active = False logger.info("Stream closed") # Cancel the response processing task if self.response_task and not self.response_task.done(): self.response_task.cancel() async def _process_responses(self): """Process incoming responses from the endpoint.""" try: output = await self.stream.await_output() output_stream = output[1] while self.is_active: result = await output_stream.receive() if result is None: logger.info("No more responses") break if result.value and result.value.bytes_: response_data = result.value.bytes_.decode('utf-8') logger.info(f"Received: {response_data}") except Exception as e: logger.error(f"Error processing responses: {e}") # Example usage async def run_bidirectional_client(): client = SageMakerBidirectionalClient(endpoint_name="your-endpoint-name") try: # Start the session await client.start_session() # Send multiple messages messages = [ "I need help with", "my account balance", "I can help with that", "and recent charges" ] await client.send_multiple_messages(messages) # Wait for responses to be processed await asyncio.sleep(2) # End the session await client.end_session() logger.info("Session ended successfully") except Exception as e: logger.error(f"Client error: {e}") await client.end_session() if __name__ == "__main__": asyncio.run(run_bidirectional_client())

Il client inizializza il client SageMaker Runtime HTTP/2 con l'URI dell'endpoint regionale sulla porta 8443, necessario per le connessioni di streaming bidirezionali. Il session() metodo start_ chiama invoke_endpoint_with_bidirectional_stream() per stabilire la connessione persistente e crea un'attività asincrona per elaborare contemporaneamente le risposte in arrivo.

Il send_event() metodo racchiude i dati del payload negli oggetti di richiesta appropriati e li invia tramite il flusso di input, mentre ascolta ed elabora continuamente le risposte dall'_process_responses()endpoint non appena arrivano. Questo approccio bidirezionale consente l'interazione in tempo reale in cui sia l'invio delle richieste che la ricezione delle risposte avvengono contemporaneamente sulla stessa connessione.