* Add code * add code * add code * Rename messages * rename * add code * Add demo * docs + demos + bug fixes * add code * styles * user guide * Styles * Add code * misc docs updates * print nit * whisper + pr * url for images * whsiper update * Fix bugs * remove demo files * version number * Fix pypi readme * Fix * demos * Add llama code editor * Update llama code editor and object detection cookbook * Add more cookbook demos * add code * Fix links for PR deploys * add code * Fix the install * add tts * TTS docs * Typo * Pending bubbles for reply on pause * Stream redesign (#63) * better error handling * Websocket error handling * add code --------- Co-authored-by: Freddy Boulton <freddyboulton@hf-freddy.local> * remove docs from dist * Some docs typos * more typos * upload changes + docs * docs * better phone * update docs * add code * Make demos better * fix docs + websocket start_up * remove mention of FastAPI app * fastphone tweaks * add code * ReplyOnStopWord fixes * Fix cookbook * Fix pypi readme * add code * bump versions * sambanova cookbook * Fix tags * Llm voice chat * kyutai tag * Add error message to all index.html * STT module uses Moonshine * Not required from typing extensions * fix llm voice chat * Add vpn warning * demo fixes * demos * Add more ui args and gemini audio-video * update cookbook * version 9 --------- Co-authored-by: Freddy Boulton <freddyboulton@hf-freddy.local>
14 KiB
Audio Streaming
Reply On Pause
Typically, you want to run a python function whenever a user has stopped speaking. This can be done by wrapping a python generator with the ReplyOnPause class and passing it to the handler argument of the Stream object.
=== "Code" ```python from fastrtc import ReplyOnPause, Stream
def response(audio: tuple[int, np.ndarray]): # (1)
sample_rate, audio_array = audio
# Generate response
for audio_chunk in generate_response(sample_rate, audio_array):
yield (sample_rate, audio_chunk) # (2)
stream = Stream(
handler=ReplyOnPause(response),
modality="audio",
mode="send-receive"
)
```
1. The python generator will receive the **entire** audio up until the user stopped. It will be a tuple of the form (sampling_rate, numpy array of audio). The array will have a shape of (1, num_samples). You can also pass in additional input components.
2. The generator must yield audio chunks as a tuple of (sampling_rate, numpy audio array). Each numpy audio array must have a shape of (1, num_samples).
=== "Notes" 1. The python generator will receive the entire audio up until the user stopped. It will be a tuple of the form (sampling_rate, numpy array of audio). The array will have a shape of (1, num_samples). You can also pass in additional input components.
2. The generator must yield audio chunks as a tuple of (sampling_rate, numpy audio array). Each numpy audio array must have a shape of (1, num_samples).
The ReplyOnPause class will handle the voice detection and turn taking logic automatically!
!!! warning "Argument Order"
The first argument to the function must be the audio
!!! tip "Parameters"
You can customize the voice detection parameters by passing in algo_options and model_options to the ReplyOnPause class.
```python
from fastrtc import AlgoOptions, SileroVadOptions
stream = Stream(
handler=ReplyOnPause(
response,
algo_options=AlgoOptions(
audio_chunk_duration=0.6,
started_talking_threshold=0.2,
speech_threshold=0.1
),
model_options=SileroVadOptions(
threshold=0.5,
min_speech_duration_ms=250,
min_silence_duration_ms=100
)
)
)
```
Reply On Stopwords
You can configure your AI model to run whenever a set of "stop words" are detected, like "Hey Siri" or "computer", with the ReplyOnStopWords class.
The API is similar to ReplyOnPause with the addition of a stop_words parameter.
=== "Code" ``` py from fastrtc import Stream, ReplyOnStopWords
def response(audio: tuple[int, np.ndarray]):
"""This function must yield audio frames"""
...
for numpy_array in generated_audio:
yield (sampling_rate, numpy_array, "mono")
stream = Stream(
handler=ReplyOnStopWords(generate,
input_sample_rate=16000,
stop_words=["computer"]), # (1)
modality="audio",
mode="send-receive"
)
```
1. The `stop_words` can be single words or pairs of words. Be sure to include common misspellings of your word for more robust detection, e.g. "llama", "lamma". In my experience, it's best to use two very distinct words like "ok computer" or "hello iris".
=== "Notes"
1. The stop_words can be single words or pairs of words. Be sure to include common misspellings of your word for more robust detection, e.g. "llama", "lamma". In my experience, it's best to use two very distinct words like "ok computer" or "hello iris".
!!! tip "Extra Dependencies"
The ReplyOnStopWords class requires the the stopword extra. Run pip install fastrtc[stopword] to install it.
!!! warning "English Only"
The ReplyOnStopWords class is currently only supported for English.
Stream Handler
ReplyOnPause and ReplyOnStopWords are implementations of a StreamHandler. The StreamHandler is a low-level abstraction that gives you arbitrary control over how the input audio stream and output audio stream are created. The following example echos back the user audio.
=== "Code" ``` py import gradio as gr from gradio_webrtc import WebRTC, StreamHandler from queue import Queue
class EchoHandler(StreamHandler):
def __init__(self) -> None:
super().__init__()
self.queue = Queue()
def receive(self, frame: tuple[int, np.ndarray]) -> None: # (1)
self.queue.put(frame)
def emit(self) -> None: # (2)
return self.queue.get()
def copy(self) -> StreamHandler:
return EchoHandler()
def shutdown(self) -> None: # (3)
pass
def start_up(self) -> None: # (4)
pass
stream = Stream(
handler=EchoHandler(),
modality="audio",
mode="send-receive"
)
```
1. The `StreamHandler` class implements three methods: `receive`, `emit` and `copy`. The `receive` method is called when a new frame is received from the client, and the `emit` method returns the next frame to send to the client. The `copy` method is called at the beginning of the stream to ensure each user has a unique stream handler.
2. The `emit` method SHOULD NOT block. If a frame is not ready to be sent, the method should return `None`.
3. The `shutdown` method is called when the stream is closed. It should be used to clean up any resources.
4. The `start_up` method is called when the stream is first created. It should be used to initialize any resources. See [Talk To OpenAI](https://huggingface.co/spaces/fastrtc/talk-to-openai-gradio) or [Talk To Gemini](https://huggingface.co/spaces/fastrtc/talk-to-gemini-gradio) for an example of a `StreamHandler` that uses the `start_up` method to connect to an API.
=== "Notes"
1. The StreamHandler class implements three methods: receive, emit and copy. The receive method is called when a new frame is received from the client, and the emit method returns the next frame to send to the client. The copy method is called at the beginning of the stream to ensure each user has a unique stream handler.
2. The emit method SHOULD NOT block. If a frame is not ready to be sent, the method should return None.
3. The shutdown method is called when the stream is closed. It should be used to clean up any resources.
4. The start_up method is called when the stream is first created. It should be used to initialize any resources. See Talk To OpenAI or Talk To Gemini for an example of a StreamHandler that uses the start_up method to connect to an API.
!!! tip See this Talk To Gemini for a complete example of a more complex stream handler.
Async Stream Handlers
It is also possible to create asynchronous stream handlers. This is very convenient for accessing async APIs from major LLM developers, like Google and OpenAI. The main difference is that receive, emit, and start_up are now defined with async def.
Here is a complete example of using AsyncStreamHandler for using the Google Gemini real time API:
=== "Code" ``` py
from fastrtc import AsyncStreamHandler
import asyncio
import base64
import os
import google.generativeai as genai
from google.generativeai.types import (
LiveConnectConfig, SpeechConfig,
VoiceConfig, PrebuiltVoiceConfig
)
class GeminiHandler(AsyncStreamHandler):
"""Handler for the Gemini API"""
def __init__(
self,
expected_layout: Literal["mono"] = "mono",
output_sample_rate: int = 24000,
output_frame_size: int = 480,
) -> None:
super().__init__(
expected_layout,
output_sample_rate,
output_frame_size,
input_sample_rate=16000,
)
self.input_queue: asyncio.Queue = asyncio.Queue()
self.output_queue: asyncio.Queue = asyncio.Queue()
self.quit: asyncio.Event = asyncio.Event()
def copy(self) -> "GeminiHandler":
return GeminiHandler(
expected_layout="mono",
output_sample_rate=self.output_sample_rate,
output_frame_size=self.output_frame_size,
)
async def start_up(self):
await self.wait_for_args()
api_key, voice_name = self.latest_args[1:]
client = genai.Client(
api_key=api_key or os.getenv("GEMINI_API_KEY"),
http_options={"api_version": "v1alpha"},
)
config = LiveConnectConfig(
response_modalities=["AUDIO"], # type: ignore
speech_config=SpeechConfig(
voice_config=VoiceConfig(
prebuilt_voice_config=PrebuiltVoiceConfig(
voice_name=voice_name,
)
)
),
)
async with client.aio.live.connect(
model="gemini-2.0-flash-exp", config=config
) as session:
async for audio in session.start_stream(
stream=self.stream(), mime_type="audio/pcm"
):
if audio.data:
array = np.frombuffer(audio.data, dtype=np.int16)
self.output_queue.put_nowait(array)
async def stream(self) -> AsyncGenerator[bytes, None]:
while not self.quit.is_set():
try:
audio = await asyncio.wait_for(self.input_queue.get(), 0.1)
yield audio
except (asyncio.TimeoutError, TimeoutError):
pass
async def receive(self, frame: tuple[int, np.ndarray]) -> None:
_, array = frame
array = array.squeeze()
audio_message = encode_audio(array)
self.input_queue.put_nowait(audio_message)
async def emit(self) -> tuple[int, np.ndarray]:
array = await self.output_queue.get()
return (self.output_sample_rate, array)
def shutdown(self) -> None:
self.quit.set()
self.args_set.clear()
```
Text To Speech
You can use an on-device text to speech model if you have the tts extra installed.
Import the get_tts_model function and call it with the model name you want to use.
At the moment, the only model supported is kokoro.
The get_tts_model function returns an object with three methods:
tts: Synchronous text to speech.stream_tts_sync: Synchronous text to speech streaming.stream_tts: Asynchronous text to speech streaming.
from fastrtc import get_tts_model
model = get_tts_model(model="kokoro")
for audio in model.stream_tts_sync("Hello, world!"):
yield audio
async for audio in model.stream_tts("Hello, world!"):
yield audio
audio = model.tts("Hello, world!")
!!! tip
You can customize the audio by passing in an instace of KokoroTTSOptions to the method.
See here for a list of available voices.
```python
from fastrtc import KokoroTTSOptions, get_tts_model
model = get_tts_model(model="kokoro")
options = KokoroTTSOptions(
voice="af_heart",
speed=1.0,
lang="en-us"
)
audio = model.tts("Hello, world!", options=options)
```
Speech To Text
You can use an on-device speech to text model if you have the stt or stopword extra installed.
Import the get_stt_model function and call it with the model name you want to use.
At the moment, the only models supported are moonshine/base and moonshine/tiny.
The get_stt_model function returns an object with the following method:
stt: Synchronous speech to text.
from fastrtc import get_stt_model
model = get_stt_model(model="moonshine/base")
audio = (16000, np.random.randint(-32768, 32768, size=(1, 16000)))
text = model.stt(audio)
!!! tip "Example"
See LLM Voice Chat for an example of using the stt method in a ReplyOnPause handler.
!!! warning "English Only"
The stt model is currently only supported for English.
Requesting Inputs
In ReplyOnPause and ReplyOnStopWords, any additional input data is automatically passed to your generator. For StreamHandlers, you must manually request the input data from the client.
You can do this by calling await self.wait_for_args() (for AsyncStreamHandlers) in either the emit or receive methods. For a StreamHandler, you can call self.wait_for_args_sync().
We can access the value of this component via the latest_args property of the StreamHandler. The latest_args is a list storing each of the values. The 0th index is the dummy string __webrtc_value__.
Telephone Integration
In order for your handler to work over the phone, you must make sure that your handler is not expecting any additional input data besides the audio.
If you call await self.wait_for_args() your stream will wait forever for the additional input data.
The stream handlers have a phone_mode property that is set to True if the stream is running over the phone. You can use this property to determine if you should wait for additional input data.
def emit(self):
if self.phone_mode:
self.latest_args = [None]
else:
await self.wait_for_args()
ReplyOnPause
The generator you pass to ReplyOnPause must have default arguments for all arguments except audio.
If you yield AdditionalOutputs, they will be passed in as the input arguments to the generator the next time it is called.
!!! tip
See Talk To Claude for an example of a ReplyOnPause handler that is compatible with telephone usage. Notice how the input chatbot history is yielded as an AdditionalOutput on each invocation.