Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
Speech-to-speech Exemple
Note
Cette documentation concerne la version 1 d'Amazon Nova. Pour consulter le guide Amazon Nova 2 Sonic, consultez Getting started.
Cet exemple step-by-step explique comment implémenter une application de streaming audio simple en temps réel à l'aide du modèle Amazon Nova Sonic. Cette version simplifiée présente les fonctionnalités essentielles nécessaires pour créer une conversation audio avec le modèle Amazon Nova Sonic.
Vous pouvez accéder à l'exemple suivant dans notre GitHub référentiel d'échantillons Amazon Nova.
-
Définir les importations et la configuration
Cette section importe les bibliothèques nécessaires et définit les paramètres de configuration audio :
-
asyncio: pour la programmation asynchrone -
base64: pour l’encodage et le décodage des données audio -
pyaudio: pour la capture et la lecture audio -
Composants du kit SDK Amazon Bedrock pour le streaming
-
Les constantes audio définissent le format de capture audio (fréquence d’échantillonnage de 16 kHz, canal mono)
import os import asyncio import base64 import json import uuid import pyaudio from aws_sdk_bedrock_runtime.client import BedrockRuntimeClient, InvokeModelWithBidirectionalStreamOperationInput from aws_sdk_bedrock_runtime.models import InvokeModelWithBidirectionalStreamInputChunk, BidirectionalInputPayloadPart from aws_sdk_bedrock_runtime.config import Config, HTTPAuthSchemeResolver, SigV4AuthScheme from smithy_aws_core.credentials_resolvers.environment import EnvironmentCredentialsResolver # Audio configuration INPUT_SAMPLE_RATE = 16000 OUTPUT_SAMPLE_RATE = 24000 CHANNELS = 1 FORMAT = pyaudio.paInt16 CHUNK_SIZE = 1024 -
-
Définir la classe
SimpleNovaSonicLa classe
SimpleNovaSonicest la classe principale qui gère l’interaction avec Amazon Nova Sonic :-
model_id: l’ID du modèle Amazon Nova Sonic (amazon.nova-sonic-v1:0) -
region: Le Région AWS, la valeur par défaut estus-east-1 -
Unique IDs pour le suivi rapide et le suivi du contenu
-
Une file d’attente asynchrone pour la lecture audio
class SimpleNovaSonic: def __init__(self, model_id='amazon.nova-sonic-v1:0', region='us-east-1'): self.model_id = model_id self.region = region self.client = None self.stream = None self.response = None self.is_active = False self.prompt_name = str(uuid.uuid4()) self.content_name = str(uuid.uuid4()) self.audio_content_name = str(uuid.uuid4()) self.audio_queue = asyncio.Queue() self.display_assistant_text = False -
-
Initialiser le client
Cette méthode configure le client Amazon Bedrock avec les éléments suivants :
-
Le point de terminaison approprié pour la région spécifiée
-
Informations d'authentification utilisant des variables d'environnement pour les AWS informations d'identification
-
Le schéma d'authentification SigV4 pour les appels AWS d'API
def _initialize_client(self): """Initialize the Bedrock client.""" config = Config( endpoint_uri=f"https://bedrock-runtime.{self.region}.amazonaws.com", region=self.region, aws_credentials_identity_resolver=EnvironmentCredentialsResolver(), http_auth_scheme_resolver=HTTPAuthSchemeResolver(), http_auth_schemes={"aws.auth#sigv4": SigV4AuthScheme()} ) self.client = BedrockRuntimeClient(config=config) -
-
Gérer les événements
Cette méthode d’aide envoie des événements JSON au flux bidirectionnel, qui est utilisé pour toutes les communications avec le modèle Amazon Nova Sonic :
async def send_event(self, event_json): """Send an event to the stream.""" event = InvokeModelWithBidirectionalStreamInputChunk( value=BidirectionalInputPayloadPart(bytes_=event_json.encode('utf-8')) ) await self.stream.input_stream.send(event) -
Démarrer la séance
Cette méthode lance la session et configure les événements restants pour démarrer la diffusion audio. Ces événements doivent être envoyés dans le même ordre.
async def start_session(self): """Start a new session with Nova Sonic.""" if not self.client: self._initialize_client() # Initialize the stream self.stream = await self.client.invoke_model_with_bidirectional_stream( InvokeModelWithBidirectionalStreamOperationInput(model_id=self.model_id) ) self.is_active = True # Send session start event session_start = ''' { "event": { "sessionStart": { "inferenceConfiguration": { "maxTokens": 1024, "topP": 0.9, "temperature": 0.7 } } } } ''' await self.send_event(session_start) # Send prompt start event prompt_start = f''' {{ "event": {{ "promptStart": {{ "promptName": "{self.prompt_name}", "textOutputConfiguration": {{ "mediaType": "text/plain" }}, "audioOutputConfiguration": {{ "mediaType": "audio/lpcm", "sampleRateHertz": 24000, "sampleSizeBits": 16, "channelCount": 1, "voiceId": "matthew", "encoding": "base64", "audioType": "SPEECH" }} }} }} }} ''' await self.send_event(prompt_start) # Send system prompt text_content_start = f''' {{ "event": {{ "contentStart": {{ "promptName": "{self.prompt_name}", "contentName": "{self.content_name}", "type": "TEXT", "interactive": true, "role": "SYSTEM", "textInputConfiguration": {{ "mediaType": "text/plain" }} }} }} }} ''' await self.send_event(text_content_start) system_prompt = "You are a friendly assistant. The user and you will engage in a spoken dialog " \ "exchanging the transcripts of a natural real-time conversation. Keep your responses short, " \ "generally two or three sentences for chatty scenarios." text_input = f''' {{ "event": {{ "textInput": {{ "promptName": "{self.prompt_name}", "contentName": "{self.content_name}", "content": "{system_prompt}" }} }} }} ''' await self.send_event(text_input) text_content_end = f''' {{ "event": {{ "contentEnd": {{ "promptName": "{self.prompt_name}", "contentName": "{self.content_name}" }} }} }} ''' await self.send_event(text_content_end) # Start processing responses self.response = asyncio.create_task(self._process_responses()) -
Gérer l’entrée audio
Ces méthodes gèrent le cycle de vie de l’entrée audio :
-
start_audio_input: configure et démarre le flux d’entrée audio -
send_audio_chunk: encode et envoie des segments audio au modèle -
end_audio_input: ferme correctement le flux d’entrée audio
async def start_audio_input(self): """Start audio input stream.""" audio_content_start = f''' {{ "event": {{ "contentStart": {{ "promptName": "{self.prompt_name}", "contentName": "{self.audio_content_name}", "type": "AUDIO", "interactive": true, "role": "USER", "audioInputConfiguration": {{ "mediaType": "audio/lpcm", "sampleRateHertz": 16000, "sampleSizeBits": 16, "channelCount": 1, "audioType": "SPEECH", "encoding": "base64" }} }} }} }} ''' await self.send_event(audio_content_start) async def send_audio_chunk(self, audio_bytes): """Send an audio chunk to the stream.""" if not self.is_active: return blob = base64.b64encode(audio_bytes) audio_event = f''' {{ "event": {{ "audioInput": {{ "promptName": "{self.prompt_name}", "contentName": "{self.audio_content_name}", "content": "{blob.decode('utf-8')}" }} }} }} ''' await self.send_event(audio_event) async def end_audio_input(self): """End audio input stream.""" audio_content_end = f''' {{ "event": {{ "contentEnd": {{ "promptName": "{self.prompt_name}", "contentName": "{self.audio_content_name}" }} }} }} ''' await self.send_event(audio_content_end) -
-
Terminer la session
Cette méthode ferme correctement la session en :
-
Envoi d’un événement
promptEnd -
Envoi d’un événement
sessionEnd -
Fermeture du flux d’entrée
async def end_session(self): """End the session.""" if not self.is_active: return prompt_end = f''' {{ "event": {{ "promptEnd": {{ "promptName": "{self.prompt_name}" }} }} }} ''' await self.send_event(prompt_end) session_end = ''' { "event": { "sessionEnd": {} } } ''' await self.send_event(session_end) # close the stream await self.stream.input_stream.close() -
-
Gérer les réponses
Cette méthode traite en continu les réponses du modèle et effectue les opérations suivantes :
-
Attend la sortie du flux.
-
Analyse la réponse JSON.
-
Gère la sortie texte en l’imprimant sur la console avec la reconnaissance vocale automatique et la transcription.
-
Gère la sortie audio en la décodant et en la mettant en file d’attente pour la lecture.
async def _process_responses(self): """Process responses from the stream.""" try: while self.is_active: output = await self.stream.await_output() result = await output[1].receive() if result.value and result.value.bytes_: response_data = result.value.bytes_.decode('utf-8') json_data = json.loads(response_data) if 'event' in json_data: # Handle content start event if 'contentStart' in json_data['event']: content_start = json_data['event']['contentStart'] # set role self.role = content_start['role'] # Check for speculative content if 'additionalModelFields' in content_start: additional_fields = json.loads(content_start['additionalModelFields']) if additional_fields.get('generationStage') == 'SPECULATIVE': self.display_assistant_text = True else: self.display_assistant_text = False # Handle text output event elif 'textOutput' in json_data['event']: text = json_data['event']['textOutput']['content'] if (self.role == "ASSISTANT" and self.display_assistant_text): print(f"Assistant: {text}") elif self.role == "USER": print(f"User: {text}") # Handle audio output elif 'audioOutput' in json_data['event']: audio_content = json_data['event']['audioOutput']['content'] audio_bytes = base64.b64decode(audio_content) await self.audio_queue.put(audio_bytes) except Exception as e: print(f"Error processing responses: {e}") -
-
Lire l’audio
Cette méthode effectue les tâches suivantes :
-
Initialise un flux d’entrée
PyAudio -
Récupère en continu les données audio de la file d’attente
-
Lit l’audio via les haut-parleurs
-
Nettoie correctement les ressources une fois terminé
async def play_audio(self): """Play audio responses.""" p = pyaudio.PyAudio() stream = p.open( format=FORMAT, channels=CHANNELS, rate=OUTPUT_SAMPLE_RATE, output=True ) try: while self.is_active: audio_data = await self.audio_queue.get() stream.write(audio_data) except Exception as e: print(f"Error playing audio: {e}") finally: stream.stop_stream() stream.close() p.terminate() -
-
Capturer l’audio
Cette méthode effectue les tâches suivantes :
-
Initialise un flux de sortie
PyAudio -
Démarre la session d’entrée audio
-
Capture en continu des segments audio à partir du microphone
-
Envoie chaque segment au modèle Amazon Nova Sonic
-
Nettoie correctement les ressources une fois terminé
async def capture_audio(self): """Capture audio from microphone and send to Nova Sonic.""" p = pyaudio.PyAudio() stream = p.open( format=FORMAT, channels=CHANNELS, rate=INPUT_SAMPLE_RATE, input=True, frames_per_buffer=CHUNK_SIZE ) print("Starting audio capture. Speak into your microphone...") print("Press Enter to stop...") await self.start_audio_input() try: while self.is_active: audio_data = stream.read(CHUNK_SIZE, exception_on_overflow=False) await self.send_audio_chunk(audio_data) await asyncio.sleep(0.01) except Exception as e: print(f"Error capturing audio: {e}") finally: stream.stop_stream() stream.close() p.terminate() print("Audio capture stopped.") await self.end_audio_input() -
-
Exécution de la fonction principale
La fonction principale orchestre l’ensemble du processus en effectuant les opérations suivantes :
-
Crée un client Amazon Nova Sonic
-
Démarre la session
-
Crée des tâches de simultanéité pour la lecture et la capture audio
-
Attend que l’utilisateur appuie sur Entrée pour arrêter
-
Termine correctement la session et nettoie les tâches
async def main(): # Create Nova Sonic client nova_client = SimpleNovaSonic() # Start session await nova_client.start_session() # Start audio playback task playback_task = asyncio.create_task(nova_client.play_audio()) # Start audio capture task capture_task = asyncio.create_task(nova_client.capture_audio()) # Wait for user to press Enter to stop await asyncio.get_event_loop().run_in_executor(None, input) # End session nova_client.is_active = False # First cancel the tasks tasks = [] if not playback_task.done(): tasks.append(playback_task) if not capture_task.done(): tasks.append(capture_task) for task in tasks: task.cancel() if tasks: await asyncio.gather(*tasks, return_exceptions=True) # cancel the response task if nova_client.response and not nova_client.response.done(): nova_client.response.cancel() await nova_client.end_session() print("Session ended") if __name__ == "__main__": # Set AWS credentials if not using environment variables # os.environ['AWS_ACCESS_KEY_ID'] = "your-access-key" # os.environ['AWS_SECRET_ACCESS_KEY'] = "your-secret-key" # os.environ['AWS_DEFAULT_REGION'] = "us-east-1" asyncio.run(main()) -