Invocación de modelos para realizar inferencias en tiempo real - Amazon SageMaker AI

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Invocación de modelos para realizar inferencias en tiempo real

Después de usar Amazon SageMaker AI para implementar un modelo en un punto final, puede interactuar con el modelo enviándole solicitudes de inferencia. Para enviar una solicitud de inferencia a un modelo, debe invocar el punto de conexión que lo aloja. Puede invocar sus puntos de enlace mediante Amazon SageMaker Studio AWS SDKs, el o el. AWS CLI

Invoca tu modelo con Amazon Studio SageMaker

Después de implementar el modelo en un punto final, puede ver el punto final a través de Amazon SageMaker Studio y probarlo enviando solicitudes de inferencia únicas.

nota

SageMaker La IA solo admite las pruebas de puntos de enlace en Studio para puntos de enlace en tiempo real.

Para enviar una solicitud de inferencia de prueba a su punto de conexión
  1. Abre Amazon SageMaker Studio.

  2. En el panel de navegación de la izquierda, elija Implementaciones.

  3. En el menú desplegable, selecciona Puntos de conexión.

  4. Busque su punto de conexión por nombre y elija el nombre en la tabla. Los nombres de los puntos de conexión que aparecen en el panel Puntos de conexión se definen al implementar un modelo. El espacio de trabajo de Studio abre la página Punto de conexión en una pestaña nueva.

  5. Seleccione la pestaña Probar inferencia.

  6. En Opciones de prueba, seleccione una de las siguientes opciones:

    1. Seleccione Probar la solicitud de ejemplo para enviar inmediatamente una solicitud al punto de conexión. Utilice la opción Editor JSON para proporcionar datos de ejemplo en formato JSON y elija Enviar solicitud para enviar la solicitud al punto de conexión. Después de enviar la solicitud, Studio muestra el resultado de la inferencia en una tarjeta a la derecha del editor JSON.

    2. Seleccione Usar código de ejemplo del Python SDK para ver el código para enviar una solicitud al punto de conexión. A continuación, copie el ejemplo de código de la sección Ejemplo de solicitud de inferencia y ejecute el código en el entorno de pruebas.

La parte superior de la tarjeta muestra el tipo de solicitud que se envió al punto de conexión (solo se acepta JSON). La tarjeta muestra los campos siguientes:

  • Estado: muestra uno de los siguientes tipos de estado:

    • Success: la solicitud se ha realizado correctamente.

    • Failed: error en la solicitud. Aparece una respuesta en Motivo del error.

    • Pending: mientras la solicitud de inferencia está pendiente, el estado muestra un icono circular giratorio.

  • Duración de la ejecución: tiempo que duró la invocación (hora de finalización menos hora de inicio) en milisegundos.

  • Tiempo de solicitud: cuántos minutos han transcurrido desde que se envió la solicitud.

  • Tiempo de resultado: cuántos minutos han transcurrido desde que se envió la solicitud.

Invoque su modelo mediante el AWS SDK para Python (Boto3)

Si desea invocar un punto final del modelo en el código de su aplicación, puede utilizar uno de los AWS SDKs, incluido el. AWS SDK para Python (Boto3) Para invocar el punto de conexión con este SDK, utilice uno de los siguientes métodos de Python:

  • invoke_endpoint: envía una solicitud de inferencia a un punto de conexión del modelo y devuelve la respuesta que genera el modelo.

    Este método devuelve la carga útil de inferencia como respuesta cuando el modelo termina de generarla. Para obtener más información, consulte invoke_endpoint en la Referencia de la API del SDK AWS para Python (Boto3).

  • invoke_endpoint_with_response_stream: envía una solicitud de inferencia a un punto de conexión del modelo y transmite la respuesta de forma incremental mientras el modelo la genera.

    Con este método, la aplicación recibe partes de la respuesta a medida que están disponibles. Para obtener más información, consulte invoke_endpoint en la Referencia de la API del SDK AWS para Python (Boto3).

    Utilice este método únicamente para invocar modelos que admitan streaming de inferencias.

Antes de poder utilizar estos métodos en el código de la aplicación, debe inicializar un cliente de SageMaker AI Runtime y especificar el nombre del punto final. En el siguiente ejemplo, se configuran el cliente y el punto de conexión para el resto de los ejemplos:

import boto3 sagemaker_runtime = boto3.client( "sagemaker-runtime", region_name='aws_region') endpoint_name='endpoint-name'

Invocar para obtener una respuesta de inferencia

En el siguiente ejemplo se utiliza el método invoke_endpoint para invocar un punto de conexión con AWS SDK para Python (Boto3):

# Gets inference from the model hosted at the specified endpoint: response = sagemaker_runtime.invoke_endpoint( EndpointName=endpoint_name, Body=bytes('{"features": ["This is great!"]}', 'utf-8') ) # Decodes and prints the response body: print(response['Body'].read().decode('utf-8'))

En este ejemplo, se proporcionan datos de entrada en el Body campo para que la SageMaker IA los pase al modelo. Estos datos deben tener el mismo formato que se utilizó para el entrenamiento. El ejemplo almacena la respuesta en la variable response.

La variable response proporciona acceso al estado HTTP, al nombre del modelo implementado y a otros campos. El siguiente fragmento de código imprime el código del estado HTTP:

print(response["HTTPStatusCode"])

Invocar para transmitir una respuesta de inferencia

Si implementó un modelo que admite streaming de inferencias, puede invocar el modelo para recibir su carga útil de inferencia como un flujo de partes. El modelo entrega estas partes de forma incremental a medida que las genera. Cuando una aplicación recibe un flujo de inferencia, no necesita esperar a que el modelo genere toda la carga útil de respuesta. Por el contrario, la aplicación recibe inmediatamente partes de la respuesta a medida que están disponibles.

Al consumir un flujo de inferencia en la aplicación, puede crear interacciones en las que los usuarios perciban que la inferencia es rápida, ya que obtienen la primera parte de forma inmediata. Puede implementar la transmisión en streaming para que admita experiencias interactivas rápidas, como chatbots, asistentes virtuales y generadores de música. Por ejemplo, puede crear un chatbot que muestre de forma incremental el texto generado por un modelo de lenguaje grande (LLM).

Para obtener una transmisión de inferencia, puede usar el método invoke_endpoint_with_response_stream. En el cuerpo de la respuesta, el SDK proporciona un objeto EventStream, que proporciona la inferencia en forma de una serie de objetos PayloadPart.

ejemplo Flujo de inferencia

A continuación se muestra un ejemplo de objetos PayloadPart:

{'PayloadPart': {'Bytes': b'{"outputs": [" a"]}\n'}} {'PayloadPart': {'Bytes': b'{"outputs": [" challenging"]}\n'}} {'PayloadPart': {'Bytes': b'{"outputs": [" problem"]}\n'}} . . .

En cada parte de la carga útil, el campo Bytes proporciona una parte de la respuesta de inferencia del modelo. Esta parte puede ser cualquier tipo de contenido que genere un modelo, como datos de texto, imagen o audio. En este ejemplo, las partes son objetos JSON que contienen texto generado a partir de un LLM.

La parte de carga útil suele contener un fragmento discreto de datos del modelo. En este ejemplo, los fragmentos discretos son objetos JSON completos. Algunas veces, la respuesta de streaming divide los fragmentos en varias partes de la carga útil o combina varios fragmentos en una sola parte de la carga útil. En el siguiente ejemplo se muestra un fragmento de datos en formato JSON dividido en dos partes de carga útil:

{'PayloadPart': {'Bytes': b'{"outputs": '}} {'PayloadPart': {'Bytes': b'[" problem"]}\n'}}

Cuando escriba el código de una aplicación que procesa un flujo de inferencia, incluya la lógica que gestiona estas divisiones y combinaciones ocasionales de los datos. Como estrategia, puede escribir código que concatene el contenido de Bytes mientras la aplicación recibe las partes de la carga. Al concatenar aquí los datos JSON del ejemplo, se combinan los datos en un cuerpo JSON delimitado por nuevas líneas. A continuación, el código podría procesar el flujo analizando todo el objeto JSON en cada línea.

El siguiente ejemplo muestra el JSON delimitado por nuevas líneas que crearía al concatenar el contenido del ejemplo de Bytes:

{"outputs": [" a"]} {"outputs": [" challenging"]} {"outputs": [" problem"]} . . .
ejemplo Código para procesar un flujo de inferencia

El siguiente ejemplo de clase de Python, SmrInferenceStream, demuestra cómo se puede procesar un flujo de inferencia que envía datos de texto en formato JSON:

import io import json # Example class that processes an inference stream: class SmrInferenceStream: def __init__(self, sagemaker_runtime, endpoint_name): self.sagemaker_runtime = sagemaker_runtime self.endpoint_name = endpoint_name # A buffered I/O stream to combine the payload parts: self.buff = io.BytesIO() self.read_pos = 0 def stream_inference(self, request_body): # Gets a streaming inference response # from the specified model endpoint: response = self.sagemaker_runtime\ .invoke_endpoint_with_response_stream( EndpointName=self.endpoint_name, Body=json.dumps(request_body), ContentType="application/json" ) # Gets the EventStream object returned by the SDK: event_stream = response['Body'] for event in event_stream: # Passes the contents of each payload part # to be concatenated: self._write(event['PayloadPart']['Bytes']) # Iterates over lines to parse whole JSON objects: for line in self._readlines(): resp = json.loads(line) part = resp.get("outputs")[0] # Returns parts incrementally: yield part # Writes to the buffer to concatenate the contents of the parts: def _write(self, content): self.buff.seek(0, io.SEEK_END) self.buff.write(content) # The JSON objects in buffer end with '\n'. # This method reads lines to yield a series of JSON objects: def _readlines(self): self.buff.seek(self.read_pos) for line in self.buff.readlines(): self.read_pos += len(line) yield line[:-1]

En este ejemplo se procesa el flujo de inferencia de la siguiente manera:

  • Inicializa un cliente SageMaker AI Runtime y establece el nombre del punto final del modelo. Para poder obtener un flujo de inferencia, el modelo que aloja el punto de conexión debe admitir streaming de inferencia.

  • En el método stream_inference de ejemplo, recibe un cuerpo de solicitud y lo pasa al método invoke_endpoint_with_response_stream del SDK.

  • Itera cada evento del objeto EventStream que devuelve el SDK.

  • De cada evento, obtiene el contenido del objeto Bytes en el objeto PayloadPart.

  • En el método _write de ejemplo, escribe en un búfer para concatenar el contenido de los objetos Bytes. El contenido combinado forma un cuerpo JSON delimitado por nuevas líneas.

  • Utiliza el método _readlines de ejemplo para obtener una serie iterable de objetos JSON.

  • En cada objeto JSON, obtiene una parte de la inferencia.

  • Con la expresión yield, devuelve las partes de forma incremental.

El siguiente ejemplo crea y utiliza un objeto SmrInferenceStream:

request_body = {"inputs": ["Large model inference is"], "parameters": {"max_new_tokens": 100, "enable_sampling": "true"}} smr_inference_stream = SmrInferenceStream( sagemaker_runtime, endpoint_name) stream = smr_inference_stream.stream_inference(request_body) for part in stream: print(part, end='')

Este ejemplo pasa un cuerpo de solicitud al método stream_inference. Itera la respuesta para imprimir cada parte que devuelve el flujo de inferencia.

En el ejemplo se da por sentado que el modelo del punto de conexión especificado es un LLM que genera texto. El resultado de este ejemplo es un cuerpo de texto generado que se imprime de forma incremental:

a challenging problem in machine learning. The goal is to . . .

Invoque su modelo mediante el AWS CLI

Puede invocar el punto final de su modelo ejecutando comandos con la tecla AWS Command Line Interface ()AWS CLI. AWS CLI admite solicitudes de inferencia estándar con el comando invoke-endpoint y admite solicitudes de inferencia asincrónicas con el comando invoke-endpoint-async.

nota

AWS CLI No admite solicitudes de inferencia de transmisión.

En el siguiente ejemplo se utiliza el comando invoke-endpoint para enviar una solicitud de inferencia a un punto de conexión del modelo:

aws sagemaker-runtime invoke-endpoint \ --endpoint-name endpoint_name \ --body fileb://$file_name \ output_file.txt

En el parámetro --endpoint-name, proporcione el nombre del punto de conexión que especificó al crearlo. Para el --body parámetro, proporcione los datos de entrada para que la SageMaker IA los pase al modelo. Los datos deben tener el mismo formato que se utilizó para el entrenamiento. En este ejemplo se muestra cómo enviar datos binarios a su punto de conexión.

Para obtener más información sobre cuándo usar file:// Over fileb:// al pasar el contenido de un archivo a un parámetro del AWS CLI, consulte Mejores prácticas para los parámetros de archivos locales.

Para obtener más información y ver parámetros adicionales que puede pasar, consulte invoke-endpoint en la Referencia de comandos de AWS CLI .

Si el comando invoke-endpoint se ejecuta correctamente, devuelve una respuesta similar a la siguiente:

{ "ContentType": "<content_type>; charset=utf-8", "InvokedProductionVariant": "<Variant>" }

Si el comando no se ejecuta correctamente, compruebe si la carga útil de entrada tiene el formato correcto.

Vea el resultado de la invocación comprobando el archivo de salida del archivo (output_file.txt en este ejemplo).

more output_file.txt

Invoque su modelo mediante el AWS SDK para Python

Invoque para transmitir bidireccionalmente una solicitud de inferencia y una respuesta

Si desea invocar un punto final modelo en el código de su aplicación para admitir la transmisión bidireccional, puede usar el nuevo SDK experimental para Python que admite la capacidad de transmisión bidireccional con compatibilidad con HTTP/2. Este SDK permite una comunicación bidireccional en tiempo real entre la aplicación cliente y el SageMaker terminal, lo que le permite enviar solicitudes de inferencia de forma incremental y, al mismo tiempo, recibir respuestas en streaming a medida que las genera el modelo. Esto resulta especialmente útil para aplicaciones interactivas en las que tanto el cliente como el servidor necesitan intercambiar datos de forma continua a través de una conexión persistente.

nota

El nuevo SDK experimental es diferente del SDK estándar de Boto3 y admite conexiones bidireccionales persistentes para el intercambio de datos. Al utilizar el SDK experimental de Python, recomendamos encarecidamente fijar estrictamente una versión del SDK para cualquier caso de uso no experimental.

Para invocar tu punto final con transmisión bidireccional, usa este método. invoke_endpoint_with_bidirectional_stream Este método establece una conexión persistente que le permite transmitir varios fragmentos de carga útil a su modelo y, al mismo tiempo, recibir respuestas en tiempo real a medida que el modelo procesa los datos. La conexión permanece abierta hasta que se cierre explícitamente el flujo de entrada o hasta que el terminal cierre la conexión, lo que permite un tiempo de conexión de hasta 30 minutos.

Requisitos previos

Para poder utilizar la transmisión bidireccional en el código de la aplicación, debe:

  1. Instale el SDK HTTP/2 experimental en SageMaker tiempo de ejecución

  2. Configure AWS las credenciales para su cliente SageMaker Runtime

  3. Implemente un modelo que admita la transmisión bidireccional a un punto final SageMaker

Configure el cliente de transmisión bidireccional

El siguiente ejemplo muestra cómo inicializar los componentes necesarios para la transmisión bidireccional:

from sagemaker_runtime_http2.client import SageMakerRuntimeHTTP2Client from sagemaker_runtime_http2.config import Config, HTTPAuthSchemeResolver from smithy_aws_core.identity import EnvironmentCredentialsResolver from smithy_aws_core.auth.sigv4 import SigV4AuthScheme # Configuration AWS_REGION = "us-west-2" BIDI_ENDPOINT = f"https://runtime.sagemaker.{AWS_REGION}.amazonaws.com:8443" ENDPOINT_NAME = "your-endpoint-name" # Initialize the client configuration config = Config( endpoint_uri=BIDI_ENDPOINT, region=AWS_REGION, aws_credentials_identity_resolver=EnvironmentCredentialsResolver(), auth_scheme_resolver=HTTPAuthSchemeResolver(), auth_schemes={"aws.auth#sigv4": SigV4AuthScheme(service="sagemaker")} ) # Create the SageMaker Runtime HTTP/2 client client = SageMakerRuntimeHTTP2Client(config=config)

Cliente de transmisión bidireccional completo

En el siguiente ejemplo, se muestra cómo crear un cliente de streaming bidireccional que envíe varias cargas de texto a un SageMaker punto final y procese las respuestas en tiempo real:

import asyncio import logging from sagemaker_runtime_http2.client import SageMakerRuntimeHTTP2Client from sagemaker_runtime_http2.config import Config, HTTPAuthSchemeResolver from sagemaker_runtime_http2.models import ( InvokeEndpointWithBidirectionalStreamInput, RequestStreamEventPayloadPart, RequestPayloadPart ) from smithy_aws_core.identity import EnvironmentCredentialsResolver from smithy_aws_core.auth.sigv4 import SigV4AuthScheme logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class SageMakerBidirectionalClient: def __init__(self, endpoint_name, region="us-west-2"): self.endpoint_name = endpoint_name self.region = region self.client = None self.stream = None self.response_task = None self.is_active = False def _initialize_client(self): bidi_endpoint = f"runtime.sagemaker.{self.region}.amazonaws.com:8443" config = Config( endpoint_uri=bidi_endpoint, region=self.region, aws_credentials_identity_resolver=EnvironmentCredentialsResolver(), auth_scheme_resolver=HTTPAuthSchemeResolver(), auth_schemes={"aws.auth#sigv4": SigV4AuthScheme(service="sagemaker")} ) self.client = SageMakerRuntimeHTTP2Client(config=config) async def start_session(self): """Establish a bidirectional streaming connection with the endpoint.""" if not self.client: self._initialize_client() logger.info(f"Starting session with endpoint: {self.endpoint_name}") self.stream = await self.client.invoke_endpoint_with_bidirectional_stream( InvokeEndpointWithBidirectionalStreamInput(endpoint_name=self.endpoint_name) ) self.is_active = True # Start processing responses concurrently self.response_task = asyncio.create_task(self._process_responses()) async def send_message(self, message): """Send a single message to the endpoint.""" if not self.is_active: raise RuntimeError("Session not active. Call start_session() first.") logger.info(f"Sending message: {message}") payload = RequestPayloadPart(bytes_=message.encode('utf-8')) event = RequestStreamEventPayloadPart(value=payload) await self.stream.input_stream.send(event) async def send_multiple_messages(self, messages, delay=1.0): """Send multiple messages with a delay between each.""" for message in messages: await self.send_message(message) await asyncio.sleep(delay) async def end_session(self): """Close the bidirectional streaming connection.""" if not self.is_active: return await self.stream.input_stream.close() self.is_active = False logger.info("Stream closed") # Cancel the response processing task if self.response_task and not self.response_task.done(): self.response_task.cancel() async def _process_responses(self): """Process incoming responses from the endpoint.""" try: output = await self.stream.await_output() output_stream = output[1] while self.is_active: result = await output_stream.receive() if result is None: logger.info("No more responses") break if result.value and result.value.bytes_: response_data = result.value.bytes_.decode('utf-8') logger.info(f"Received: {response_data}") except Exception as e: logger.error(f"Error processing responses: {e}") # Example usage async def run_bidirectional_client(): client = SageMakerBidirectionalClient(endpoint_name="your-endpoint-name") try: # Start the session await client.start_session() # Send multiple messages messages = [ "I need help with", "my account balance", "I can help with that", "and recent charges" ] await client.send_multiple_messages(messages) # Wait for responses to be processed await asyncio.sleep(2) # End the session await client.end_session() logger.info("Session ended successfully") except Exception as e: logger.error(f"Client error: {e}") await client.end_session() if __name__ == "__main__": asyncio.run(run_bidirectional_client())

El cliente inicializa el cliente HTTP/2 en SageMaker tiempo de ejecución con el URI del punto final regional en el puerto 8443, que es necesario para las conexiones de streaming bidireccionales. El session() método start_ llama invoke_endpoint_with_bidirectional_stream() para establecer la conexión persistente y crea una tarea asíncrona para procesar las respuestas entrantes de forma simultánea.

El send_event() método agrupa los datos de la carga útil en los objetos de solicitud correspondientes y los envía a través del flujo de entrada, mientras que el _process_responses() método escucha y procesa continuamente las respuestas del punto final a medida que llegan. Este enfoque bidireccional permite una interacción en tiempo real, en la que tanto el envío de solicitudes como la recepción de respuestas se realizan simultáneamente a través de la misma conexión.