Fix websocket interruption (#291)

* Code

* Fix

* add code

* interruptions

* Add code

* code

* Add code

* Add code

* code
This commit is contained in:
Freddy Boulton
2025-04-22 14:40:19 -04:00
committed by GitHub
parent a68023101d
commit 074e9c9345
9 changed files with 287 additions and 21 deletions

View File

@@ -81,17 +81,11 @@ class WebSocketHandler:
self._graceful_shutdown_task: asyncio.Task | None = None
def _clear_queue(self):
old_queue = self.queue
self.queue = asyncio.Queue()
logger.debug("clearing queue")
i = 0
while not old_queue.empty():
try:
old_queue.get_nowait()
i += 1
except asyncio.QueueEmpty:
break
logger.debug("popped %d items from queue", i)
while not self.queue.empty():
self.queue.get_nowait()
i += 1
logger.debug("websocket: popped %d items from queue", i)
def set_args(self, args: list[Any]):
self.stream_handler.set_args(args)
@@ -260,8 +254,8 @@ class WebSocketHandler:
async def _emit_loop(self):
try:
while not self.quit.is_set():
wait_duration = 0.02
output = await self.queue.get()
if output is not None:
frame, output = split_output(output)
if isinstance(output, AdditionalOutputs):
@@ -279,6 +273,7 @@ class WebSocketHandler:
if self.stream_handler.phone_mode
else self.stream_handler.output_sample_rate
)
duration = np.atleast_2d(frame[1]).shape[1] / frame[0]
mulaw_audio = convert_to_mulaw(
frame[1],
frame[0],
@@ -287,9 +282,6 @@ class WebSocketHandler:
audio_payload = base64.b64encode(mulaw_audio).decode("utf-8")
if self.websocket and self.stream_id:
sample_rate, audio_array = frame[:2]
duration = len(audio_array) / sample_rate
self.playing_durations.append(duration)
payload = {
@@ -298,9 +290,10 @@ class WebSocketHandler:
}
if self.stream_handler.phone_mode:
payload["streamSid"] = self.stream_id
# yield audio slightly faster than real-time
wait_duration = 0.75 * duration
await self.websocket.send_json(payload)
await asyncio.sleep(0.02)
await asyncio.sleep(wait_duration)
except asyncio.CancelledError:
logger.debug("Emit loop cancelled")