Invocation de modèles pour une inférence en temps réel - Amazon SageMaker AI

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Invocation de modèles pour une inférence en temps réel

Après avoir utilisé Amazon SageMaker AI pour déployer un modèle sur un point de terminaison, vous pouvez interagir avec le modèle en lui envoyant des demandes d'inférence. Pour envoyer une demande d’inférence à un modèle, vous invoquez le point de terminaison qui l’héberge. Vous pouvez appeler vos points de terminaison à l'aide d'Amazon SageMaker Studio, du AWS SDKs, ou duAWS CLI.

Invoquez votre modèle à l'aide d'Amazon SageMaker Studio

Après avoir déployé votre modèle sur un point de terminaison, vous pouvez consulter le point de terminaison via Amazon SageMaker Studio et tester votre point de terminaison en envoyant des demandes d'inférence uniques.

Note

SageMaker L'IA ne prend en charge les tests de terminaux dans Studio que pour les points de terminaison en temps réel.

Pour envoyer une demande d’inférence de test à votre point de terminaison
  1. Lancez Amazon SageMaker Studio.

  2. Dans le panneau de navigation de gauche, sélectionnez Déploiements.

  3. Dans le menu déroulant, sélectionnez Endpoints (Points de terminaison).

  4. Recherchez votre point de terminaison par son nom, puis choisissez-le dans le tableau. Les noms de point de terminaison répertoriés dans le panneau Points de terminaison sont définis lorsque vous déployez un modèle. L’espace de travail Studio ouvre la page Point de terminaison dans un nouvel onglet.

  5. Sélectionnez l’onglet Tester l’inférence.

  6. Dans Options de test, sélectionnez l’une des options suivantes :

    1. Sélectionnez Tester l’exemple de demande pour envoyer immédiatement une demande à votre point de terminaison. Utilisez l’éditeur JSON pour fournir des exemples de données au format JSON, puis choisissez Envoyer la demande pour envoyer la demande à votre point de terminaison. Après l’envoi de votre demande, Studio affiche le résultat de l’inférence dans une carte à droite de l’éditeur JSON.

    2. Sélectionnez Utiliser un exemple de code du kit SDK Python pour afficher le code permettant d’envoyer une demande au point de terminaison. Copiez ensuite l’exemple de code depuis la section Exemple de demande d’inférence et exécutez le code depuis votre environnement de test.

Le haut de la carte affiche le type de demande qui a été envoyée au point de terminaison (seul JSON est accepté). La carte affiche les champs suivants :

  • Statut : affiche l’un des types de statut suivants :

    • Success : la demande a réussi.

    • Failed : la demande a échoué. Une réponse apparaît sous Motif de l’échec.

    • Pending : une icône circulaire et en rotation apparaît pendant que la demande d’inférence est en attente.

  • Longueur d’exécution : durée de l’invocation (heure de fin moins l’heure de début) en millisecondes.

  • Durée de la demande : nombre de minutes qui se sont écoulées depuis l’envoi de la demande.

  • Durée du résultat : nombre de minutes qui se sont écoulées depuis le renvoi du résultat.

Invocation de votre modèle à l’aide du kit AWS SDK pour Python (Boto3)

Si vous souhaitez invoquer un point de terminaison modèle dans le code de votre application, vous pouvez utiliser l'un des AWSSDKs, notamment leAWS SDK pour Python (Boto3). Pour invoquer votre point de terminaison à l’aide de ce kit SDK, vous devez utiliser l’une des méthodes Python suivantes :

  • invoke_endpoint : envoie une demande d’inférence à un point de terminaison du modèle et renvoie la réponse générée par le modèle.

    Cette méthode renvoie les données utiles d’inférence sous la forme d’une réponse une fois que le modèle a fini de la générer. Pour plus d’informations, consultez invoke_endpoint dans la Référence des API du kit AWS SDK pour Python (Boto).

  • invoke_endpoint_with_response_stream : envoie une demande d’inférence à un point de terminaison du modèle et diffuse la réponse de manière incrémentielle pendant que le modèle la génère.

    Avec cette méthode, votre application reçoit des parties de la réponse dès qu’elles sont disponibles. Pour plus d’informations, consultez invoke_endpoint dans la Référence des API du kit AWS SDK pour Python (Boto).

    Utilisez cette méthode uniquement pour invoquer des modèles qui prennent en charge le streaming d’inférence.

Avant de pouvoir utiliser ces méthodes dans le code de votre application, vous devez initialiser un client SageMaker AI Runtime et spécifier le nom de votre point de terminaison. L’exemple suivant configure le client et le point de terminaison pour les autres exemples suivants :

import boto3 sagemaker_runtime = boto3.client( "sagemaker-runtime", region_name='aws_region') endpoint_name='endpoint-name'

Invocation pour obtenir une réponse d’inférence

L’exemple suivant utilise la méthode invoke_endpoint pour invoquer un point de terminaison avec le kit AWS SDK pour Python (Boto3) :

# Gets inference from the model hosted at the specified endpoint: response = sagemaker_runtime.invoke_endpoint( EndpointName=endpoint_name, Body=bytes('{"features": ["This is great!"]}', 'utf-8') ) # Decodes and prints the response body: print(response['Body'].read().decode('utf-8'))

Cet exemple fournit des données d'entrée dans le Body champ pour que l' SageMaker IA les transmette au modèle. Ces données doivent être dans le même format que celui utilisé pour l’entraînement. L’exemple assigne la réponse à la variable response.

La variable response permet d’accéder au statut HTTP, au nom du modèle déployé et à d’autres champs. L’extrait de code suivant imprime le code de statut HTTP :

print(response["HTTPStatusCode"])

Invoquer pour diffuser une réponse d’inférence

Si vous avez déployé un modèle qui prend en charge le streaming d’inférence, vous pouvez invoquer le modèle pour recevoir ses données utiles d’inférence sous forme de flux de parties. Le modèle fournit ces pièces progressivement au fur et à mesure qu’il les génère. Lorsqu’une application reçoit un flux d’inférence, elle n’a pas besoin d’attendre que le modèle génère la totalité des données utiles de réponse. Au lieu de cela, l’application reçoit immédiatement des parties de la réponse dès qu’elles sont disponibles.

En consommant un flux d’inférence dans votre application, vous pouvez créer des interactions dans lesquelles vos utilisateurs perçoivent l’inférence comme étant rapide, car ils obtiennent immédiatement la première partie. Vous pouvez mettre en œuvre le streaming pour prendre en charge des expériences interactives rapides, telles que les chatbots, les assistants virtuels et les générateurs de musique. Par exemple, vous pouvez créer un chatbot qui affiche progressivement le texte généré par un grand modèle de langage (LLM).

Pour obtenir un flux d’inférence, vous pouvez utiliser la méthode invoke_endpoint_with_response_stream. Dans le corps de la réponse, le kit SDK fournit un objet EventStream, qui donne l’inférence sous la forme d’une série d’objets PayloadPart.

Exemple Flux d’inférence

L’exemple suivant est un flux d’objets PayloadPart :

{'PayloadPart': {'Bytes': b'{"outputs": [" a"]}\n'}} {'PayloadPart': {'Bytes': b'{"outputs": [" challenging"]}\n'}} {'PayloadPart': {'Bytes': b'{"outputs": [" problem"]}\n'}} . . .

Dans chaque partie des données utiles, le champ Bytes fournit une partie de la réponse d’inférence du modèle. Cette partie peut être n’importe quel type de contenu généré par un modèle, tel que du texte, des images ou des données audio. Dans cet exemple, les parties sont des objets JSON contenant du texte généré à partir d’un LLM.

En général, la partie de la charge utile contient un fragment discret de données du modèle. Dans cet exemple, les fragments discrets sont des objets JSON entiers. Parfois, la réponse de streaming divise les fragments sur plusieurs parties de la charge utile, ou elle combine plusieurs fragments en une seule partie de charge utile. L’exemple suivant montre un fragment de données au format JSON divisé en deux parties de données utiles :

{'PayloadPart': {'Bytes': b'{"outputs": '}} {'PayloadPart': {'Bytes': b'[" problem"]}\n'}}

Lorsque vous écrivez du code d’application qui traite un flux d’inférence, incluez une logique qui gère ces divisions et combinaisons de données occasionnelles. Une stratégie consisterait à écrire du code qui concatène le contenu d’Bytes pendant que votre application reçoit les parties des données utiles. En concaténant les données JSON d’exemple ici, vous combineriez les données dans un corps JSON délimité par de nouvelles lignes. Ensuite, votre code pourrait traiter le flux en analysant l’ensemble de l’objet JSON sur chaque ligne.

L’exemple suivant montre le JSON délimité par de nouvelles lignes que vous créeriez lorsque vous concaténez le contenu de l’exemple d’Bytes :

{"outputs": [" a"]} {"outputs": [" challenging"]} {"outputs": [" problem"]} . . .
Exemple Code pour traiter un flux d’inférence

L’exemple de classe Python suivant, SmrInferenceStream, montre comment traiter un flux d’inférence qui envoie des données texte au format JSON :

import io import json # Example class that processes an inference stream: class SmrInferenceStream: def __init__(self, sagemaker_runtime, endpoint_name): self.sagemaker_runtime = sagemaker_runtime self.endpoint_name = endpoint_name # A buffered I/O stream to combine the payload parts: self.buff = io.BytesIO() self.read_pos = 0 def stream_inference(self, request_body): # Gets a streaming inference response # from the specified model endpoint: response = self.sagemaker_runtime\ .invoke_endpoint_with_response_stream( EndpointName=self.endpoint_name, Body=json.dumps(request_body), ContentType="application/json" ) # Gets the EventStream object returned by the SDK: event_stream = response['Body'] for event in event_stream: # Passes the contents of each payload part # to be concatenated: self._write(event['PayloadPart']['Bytes']) # Iterates over lines to parse whole JSON objects: for line in self._readlines(): resp = json.loads(line) part = resp.get("outputs")[0] # Returns parts incrementally: yield part # Writes to the buffer to concatenate the contents of the parts: def _write(self, content): self.buff.seek(0, io.SEEK_END) self.buff.write(content) # The JSON objects in buffer end with '\n'. # This method reads lines to yield a series of JSON objects: def _readlines(self): self.buff.seek(self.read_pos) for line in self.buff.readlines(): self.read_pos += len(line) yield line[:-1]

Cet exemple traite le flux d’inférence en procédant comme suit :

  • Initialise un client SageMaker AI Runtime et définit le nom d'un point de terminaison modèle. Avant de pouvoir obtenir un flux d’inférence, le modèle que le point de terminaison héberge doit prendre en charge le streaming d’inférence.

  • Dans l’exemple de méthode stream_inference, il reçoit le corps d’une demande et le transmet à la méthode invoke_endpoint_with_response_stream du kit SDK.

  • Il itère sur chaque événement de l’objet EventStream renvoyé par le kit SDK.

  • À partir de chaque événement, il obtient le contenu de l’objet Bytes dans l’objet PayloadPart.

  • Dans l’exemple de méthode _write, il écrit dans un tampon pour concaténer le contenu des objets Bytes. Le contenu combiné forme un corps JSON délimité par de nouvelles lignes.

  • Il utilise l’exemple de méthode _readlines pour obtenir une série itérable d’objets JSON.

  • Dans chaque objet JSON, il obtient une partie de l’inférence.

  • Avec l’expression yield, il renvoie les pièces de manière incrémentielle.

L’exemple suivant crée et utilise un objet SmrInferenceStream :

request_body = {"inputs": ["Large model inference is"], "parameters": {"max_new_tokens": 100, "enable_sampling": "true"}} smr_inference_stream = SmrInferenceStream( sagemaker_runtime, endpoint_name) stream = smr_inference_stream.stream_inference(request_body) for part in stream: print(part, end='')

Cet exemple transmet un corps de demande à la méthode stream_inference. Il itère la réponse pour imprimer chaque élément renvoyé par le flux d’inférence.

L’exemple suppose que le modèle au point de terminaison spécifié est un LLM qui génère du texte. Le résultat de cet exemple est un corps de texte généré qui s’imprime de manière incrémentielle :

a challenging problem in machine learning. The goal is to . . .

Invocation de votre modèle à l’aide du kit AWS CLI

Vous pouvez appeler le point de terminaison de votre modèle en exécutant des commandes avec le AWS Command Line Interface (AWS CLI). L’AWS CLI prend en charge les demandes d’inférence standard avec la commande invoke-endpoint et prend en charge les demandes d’inférence asynchrones avec la commande invoke-endpoint-async.

Note

Le AWS CLI ne prend pas en charge les demandes d'inférence en streaming.

L’exemple suivant utilise la commande invoke-endpoint pour envoyer une demande d’inférence à un point de terminaison du modèle :

aws sagemaker-runtime invoke-endpoint \ --endpoint-name endpoint_name \ --body fileb://$file_name \ output_file.txt

Pour le paramètre --endpoint-name, indiquez le nom de point de terminaison que vous avez spécifié lors de sa création. Pour le --body paramètre, fournissez les données d'entrée que l' SageMaker IA doit transmettre au modèle. Les données doivent être dans le même format que celui utilisé pour l’entraînement. Cet exemple montre comment envoyer des données binaires à votre point de terminaison.

Pour plus d'informations sur les circonstances dans lesquelles utiliser le file:// over fileb:// lors du transfert du contenu d'un fichier à un paramètre duAWS CLI, consultez la section Meilleures pratiques relatives aux paramètres de fichiers locaux.

Pour plus d’informations et pour voir les paramètres supplémentaires que vous pouvez transmettre, consultez invoke-endpoint dans la Référence des commandes de l’AWS CLI.

Si la commande invoke-endpoint réussit, elle renvoie une réponse telle que la suivante :

{ "ContentType": "<content_type>; charset=utf-8", "InvokedProductionVariant": "<Variant>" }

Si la commande échoue, vérifiez si le format des données utiles d’entrée est correct.

Affichez la sortie de l’invocation en vérifiant le fichier de sortie du fichier (output_file.txt dans cet exemple).

more output_file.txt

Appelez votre modèle à l'aide du AWS SDK pour Python

Invoquer pour diffuser de manière bidirectionnelle une demande et une réponse d'inférence

Si vous souhaitez invoquer un point de terminaison modèle dans le code de votre application pour prendre en charge le streaming bidirectionnel, vous pouvez utiliser le nouveau SDK expérimental pour Python qui prend en charge la fonctionnalité de streaming bidirectionnel avec prise en charge HTTP/2. Ce SDK permet une communication bidirectionnelle en temps réel entre votre application cliente et le SageMaker point de terminaison, ce qui vous permet d'envoyer des demandes d'inférence de manière incrémentielle tout en recevant simultanément des réponses en streaming au fur et à mesure que le modèle les génère. Cela est particulièrement utile pour les applications interactives où le client et le serveur doivent échanger des données en continu via une connexion permanente.

Note

Le nouveau SDK expérimental est différent du SDK Boto3 standard et prend en charge les connexions bidirectionnelles persistantes pour l'échange de données. Lors de l'utilisation du SDK Python expérimental, nous vous conseillons vivement d'épingler strictement une version du SDK pour tous les cas d'utilisation non expérimentaux.

Pour appeler votre point de terminaison avec le streaming bidirectionnel, utilisez la invoke_endpoint_with_bidirectional_stream méthode. Cette méthode établit une connexion permanente qui vous permet de diffuser plusieurs segments de charge utile vers votre modèle tout en recevant des réponses en temps réel lorsque le modèle traite les données. La connexion reste ouverte jusqu'à ce que vous fermiez explicitement le flux d'entrée ou que le terminal ferme la connexion, ce qui permet une durée de connexion maximale de 30 minutes.

Conditions préalables

Avant de pouvoir utiliser le streaming bidirectionnel dans le code de votre application, vous devez :

  1. Installez le SDK expérimental SageMaker Runtime HTTP/2

  2. Configuration des AWS informations d'identification pour votre client SageMaker Runtime

  3. Déployez un modèle qui prend en charge le streaming bidirectionnel vers un terminal SageMaker

Configuration du client de streaming bidirectionnel

L'exemple suivant montre comment initialiser les composants requis pour le streaming bidirectionnel :

from sagemaker_runtime_http2.client import SageMakerRuntimeHTTP2Client from sagemaker_runtime_http2.config import Config, HTTPAuthSchemeResolver from smithy_aws_core.identity import EnvironmentCredentialsResolver from smithy_aws_core.auth.sigv4 import SigV4AuthScheme # Configuration AWS_REGION = "us-west-2" BIDI_ENDPOINT = f"https://runtime.sagemaker.{AWS_REGION}.amazonaws.com:8443" ENDPOINT_NAME = "your-endpoint-name" # Initialize the client configuration config = Config( endpoint_uri=BIDI_ENDPOINT, region=AWS_REGION, aws_credentials_identity_resolver=EnvironmentCredentialsResolver(), auth_scheme_resolver=HTTPAuthSchemeResolver(), auth_schemes={"aws.auth#sigv4": SigV4AuthScheme(service="sagemaker")} ) # Create the SageMaker Runtime HTTP/2 client client = SageMakerRuntimeHTTP2Client(config=config)

Client de streaming bidirectionnel complet

L'exemple suivant montre comment créer un client de streaming bidirectionnel qui envoie plusieurs charges utiles de texte à un SageMaker point de terminaison et traite les réponses en temps réel :

import asyncio import logging from sagemaker_runtime_http2.client import SageMakerRuntimeHTTP2Client from sagemaker_runtime_http2.config import Config, HTTPAuthSchemeResolver from sagemaker_runtime_http2.models import ( InvokeEndpointWithBidirectionalStreamInput, RequestStreamEventPayloadPart, RequestPayloadPart ) from smithy_aws_core.identity import EnvironmentCredentialsResolver from smithy_aws_core.auth.sigv4 import SigV4AuthScheme logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class SageMakerBidirectionalClient: def __init__(self, endpoint_name, region="us-west-2"): self.endpoint_name = endpoint_name self.region = region self.client = None self.stream = None self.response_task = None self.is_active = False def _initialize_client(self): bidi_endpoint = f"runtime.sagemaker.{self.region}.amazonaws.com:8443" config = Config( endpoint_uri=bidi_endpoint, region=self.region, aws_credentials_identity_resolver=EnvironmentCredentialsResolver(), auth_scheme_resolver=HTTPAuthSchemeResolver(), auth_schemes={"aws.auth#sigv4": SigV4AuthScheme(service="sagemaker")} ) self.client = SageMakerRuntimeHTTP2Client(config=config) async def start_session(self): """Establish a bidirectional streaming connection with the endpoint.""" if not self.client: self._initialize_client() logger.info(f"Starting session with endpoint: {self.endpoint_name}") self.stream = await self.client.invoke_endpoint_with_bidirectional_stream( InvokeEndpointWithBidirectionalStreamInput(endpoint_name=self.endpoint_name) ) self.is_active = True # Start processing responses concurrently self.response_task = asyncio.create_task(self._process_responses()) async def send_message(self, message): """Send a single message to the endpoint.""" if not self.is_active: raise RuntimeError("Session not active. Call start_session() first.") logger.info(f"Sending message: {message}") payload = RequestPayloadPart(bytes_=message.encode('utf-8')) event = RequestStreamEventPayloadPart(value=payload) await self.stream.input_stream.send(event) async def send_multiple_messages(self, messages, delay=1.0): """Send multiple messages with a delay between each.""" for message in messages: await self.send_message(message) await asyncio.sleep(delay) async def end_session(self): """Close the bidirectional streaming connection.""" if not self.is_active: return await self.stream.input_stream.close() self.is_active = False logger.info("Stream closed") # Cancel the response processing task if self.response_task and not self.response_task.done(): self.response_task.cancel() async def _process_responses(self): """Process incoming responses from the endpoint.""" try: output = await self.stream.await_output() output_stream = output[1] while self.is_active: result = await output_stream.receive() if result is None: logger.info("No more responses") break if result.value and result.value.bytes_: response_data = result.value.bytes_.decode('utf-8') logger.info(f"Received: {response_data}") except Exception as e: logger.error(f"Error processing responses: {e}") # Example usage async def run_bidirectional_client(): client = SageMakerBidirectionalClient(endpoint_name="your-endpoint-name") try: # Start the session await client.start_session() # Send multiple messages messages = [ "I need help with", "my account balance", "I can help with that", "and recent charges" ] await client.send_multiple_messages(messages) # Wait for responses to be processed await asyncio.sleep(2) # End the session await client.end_session() logger.info("Session ended successfully") except Exception as e: logger.error(f"Client error: {e}") await client.end_session() if __name__ == "__main__": asyncio.run(run_bidirectional_client())

Le client initialise le client SageMaker Runtime HTTP/2 avec l'URI du point de terminaison régional sur le port 8443, qui est requis pour les connexions de streaming bidirectionnelles. La session() méthode start_ appelle invoke_endpoint_with_bidirectional_stream() pour établir la connexion persistante et crée une tâche asynchrone pour traiter les réponses entrantes simultanément.

La send_event() méthode enveloppe les données de charge utile dans les objets de demande appropriés et les envoie via le flux d'entrée, tandis que la _process_responses() méthode écoute et traite en permanence les réponses du point de terminaison au fur et à mesure qu'elles arrivent. Cette approche bidirectionnelle permet une interaction en temps réel où l'envoi de demandes et la réception de réponses se font simultanément via la même connexion.