Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
343 changes: 172 additions & 171 deletions livekit-plugins/livekit-plugins-soniox/livekit/plugins/soniox/stt.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from livekit.agents import (
APIConnectionError,
APIConnectOptions,
APIError,
APIStatusError,
APITimeoutError,
LanguageCode,
Expand All @@ -50,6 +51,13 @@
FINALIZED_TOKEN = "<fin>"


class _SessionFinished(Exception):
"""Internal sentinel raised by the recv task when Soniox signals a normal
end-of-session (`finished` frame followed by a clean WS close). `_run`
catches it so the gather unwinds and sibling tasks (e.g. send_audio
blocked on an empty queue) are cancelled by the surrounding finally."""


def is_end_token(token: dict) -> bool:
"""Return True if the given token marks an end or finalized event."""
return token.get("text") in (END_TOKEN, FINALIZED_TOKEN)
Expand Down Expand Up @@ -222,7 +230,6 @@ def __init__(
super().__init__(stt=stt, conn_options=conn_options, sample_rate=stt._params.sample_rate)
self._stt: STT = stt
self._ws: aiohttp.ClientWebSocketResponse | None = None
self._reconnect_event = asyncio.Event()

self.audio_queue: asyncio.Queue[bytes | str] = asyncio.Queue()

Expand Down Expand Up @@ -300,70 +307,45 @@ def _report_processed_audio_duration(self, total_audio_proc_ms: float) -> None:
self._reported_duration_ms = int(total_audio_proc_ms)

async def _run(self) -> None:
"""Manage connection lifecycle, spawning tasks and handling reconnection."""
while True:
try:
ws = await self._connect_ws()
self._ws = ws
# Create task for audio processing, voice turn detection and message handling.
tasks: list[asyncio.Task[None]] = [
asyncio.create_task(self._prepare_audio_task()),
asyncio.create_task(self._send_audio_task()),
asyncio.create_task(self._recv_messages_task()),
asyncio.create_task(self._keepalive_task()),
]
wait_reconnect_task = asyncio.create_task(self._reconnect_event.wait())

tasks_group: asyncio.Future[Any] = asyncio.gather(*tasks)
try:
done, _ = await asyncio.wait(
[tasks_group, wait_reconnect_task],
return_when=asyncio.FIRST_COMPLETED,
)

for task in done:
if task != wait_reconnect_task:
task.result()

if wait_reconnect_task not in done:
break

self._reconnect_event.clear()
finally:
await utils.aio.gracefully_cancel(*tasks, wait_reconnect_task)
tasks_group.cancel()
tasks_group.exception()

except asyncio.TimeoutError as e:
logger.error(
f"Timeout during Soniox Speech-to-Text API connection/initialization: {e}"
)
raise APITimeoutError(
"Timeout connecting to or initializing Soniox Speech-to-Text API session"
) from e

except aiohttp.ClientResponseError as e:
logger.error(
"Soniox Speech-to-Text API status error during session init:"
+ f"{e.status} {e.message}"
)
raise APIStatusError(
message=e.message, status_code=e.status, request_id=None, body=None
) from e

except aiohttp.ClientError as e:
logger.error(f"Soniox Speech-to-Text API connection error: {e}")
raise APIConnectionError(f"Soniox Speech-to-Text API connection error: {e}") from e

except Exception as e:
logger.exception(f"Unexpected error occurred: {e}")
raise APIConnectionError(f"An unexpected error occurred: {e}") from e
# Close the WebSocket connection on finish.
finally:
if self._ws is not None:
await self._ws.close()
self._ws = None

"""Connect to Soniox, run the audio/recv tasks, and propagate errors so the
base class can apply its retry/backoff policy."""
try:
ws = await self._connect_ws()
except asyncio.TimeoutError as e:
raise APITimeoutError(
"Timeout connecting to or initializing Soniox Speech-to-Text API session"
) from e
except aiohttp.ClientResponseError as e:
raise APIStatusError(
message=e.message, status_code=e.status, request_id=None, body=None
) from e
except aiohttp.ClientError as e:
raise APIConnectionError(f"Soniox Speech-to-Text API connection error: {e}") from e

self._ws = ws
tasks: list[asyncio.Task[None]] = [
asyncio.create_task(self._prepare_audio_task()),
asyncio.create_task(self._send_audio_task()),
asyncio.create_task(self._recv_messages_task()),
asyncio.create_task(self._keepalive_task()),
]
try:
await asyncio.gather(*tasks)
Comment thread
devin-ai-integration[bot] marked this conversation as resolved.
except _SessionFinished:
# Normal end-of-session signaled by the server; sibling tasks are
# cancelled by the finally block, then _run returns cleanly.
pass
except aiohttp.ClientError as e:
# Mid-stream transport failure (e.g. dropped connection) — surface it as
# APIError so the base class retry/backoff policy applies.
raise APIConnectionError(f"Soniox Speech-to-Text API connection error: {e}") from e
finally:
await utils.aio.gracefully_cancel(*tasks)
if self._ws is not None:
await self._ws.close()
self._ws = None

@utils.log_exceptions(logger=logger)
async def _keepalive_task(self) -> None:
"""Periodically send keepalive messages (while no audio is being sent)
to maintain the WebSocket connection."""
Expand All @@ -374,6 +356,7 @@ async def _keepalive_task(self) -> None:
except Exception as e:
logger.error(f"Error while sending keep alive message: {e}")

@utils.log_exceptions(logger=logger)
async def _prepare_audio_task(self) -> None:
"""Read audio frames and enqueue PCM data for sending."""
if not self._ws:
Expand All @@ -386,6 +369,7 @@ async def _prepare_audio_task(self) -> None:
pcm_data = data.data.tobytes()
self.audio_queue.put_nowait(pcm_data)

@utils.log_exceptions(logger=logger)
async def _send_audio_task(self) -> None:
"""Take queued audio data and transmit it over the WebSocket."""
if not self._ws:
Expand Down Expand Up @@ -448,120 +432,137 @@ def send_endpoint_transcript() -> None:
final_original.reset()

is_translation_mode = self._stt._params.translation is not None
# Method handles receiving messages from the Soniox Speech-to-Text API.
while self._ws:
try:
async for msg in self._ws:
if msg.type in (
aiohttp.WSMsgType.CLOSED,
aiohttp.WSMsgType.CLOSE,
aiohttp.WSMsgType.CLOSING,
):
break

if msg.type != aiohttp.WSMsgType.TEXT:
logger.warning(
f"Unexpected message type from Soniox Speech-to-Text API: {msg.type}"
)
continue
if not self._ws:
return

try:
content = json.loads(msg.data)
tokens = content["tokens"]

non_final = _TokenAccumulator()
non_final_original = _TokenAccumulator()
total_audio_proc_ms = content.get("total_audio_proc_ms", 0)

# 1) process tokens: accumulate final/non-final,
# flush immediately on endpoint tokens.
for token in tokens:
is_translated = token.get("translation_status") == "translation"
if (
is_translation_mode
and not is_end_token(token)
and not is_translated
):
# Original-language token: capture text for source_text only.
if token["is_final"]:
final_original.update(token)
else:
non_final_original.update(token)
continue
if token["is_final"]:
if is_end_token(token):
send_endpoint_transcript()
self._report_processed_audio_duration(
total_audio_proc_ms,
)
else:
final.update(token)
else:
non_final.update(token)

# 2) emit START_OF_SPEECH + transcript for remaining content.
if final.text or non_final.text:
if not is_speaking:
is_speaking = True
self._event_ch.send_nowait(
stt.SpeechEvent(type=SpeechEventType.START_OF_SPEECH)
)
interim_segs = _merge_lang_segments(
final_original._lang_segments, non_final_original._lang_segments
)
interim_src_langs = [
LanguageCode(lang) for lang, _ in interim_segs
] or None
interim_src_texts = [t for _, t in interim_segs] or None

# When all tokens in this batch are final (no non-final pending),
# speech has reached a stable state — emit PREFLIGHT_TRANSCRIPT to
# allow preemptive LLM generation. This mirrors Deepgram v2's
# EagerEndOfTurn behavior.
event_type = (
SpeechEventType.PREFLIGHT_TRANSCRIPT
if final.text and not non_final.text
else SpeechEventType.INTERIM_TRANSCRIPT
)
self._event_ch.send_nowait(
stt.SpeechEvent(
type=event_type,
alternatives=[
final.merged_speech_data(
non_final,
self.start_time_offset,
source_languages=interim_src_langs,
source_texts=interim_src_texts,
)
],
)
)
# Set when Soniox sends a `finished` frame, indicating the server-side
# session ended normally and the WS will close cleanly afterwards.
finished = False

async for msg in self._ws:
if msg.type in (
aiohttp.WSMsgType.CLOSED,
aiohttp.WSMsgType.CLOSE,
aiohttp.WSMsgType.CLOSING,
):
if finished:
raise _SessionFinished
raise APIStatusError(
message="Soniox Speech-to-Text API connection closed unexpectedly",
status_code=self._ws.close_code or -1,
request_id=None,
body=None,
)

if msg.type != aiohttp.WSMsgType.TEXT:
logger.warning(
f"Unexpected message type from Soniox Speech-to-Text API: {msg.type}"
)
continue

# 3) on error or finish, flush any remaining final tokens.
if (
content.get("finished")
or content.get("error_code")
or content.get("error_message")
):
try:
content = json.loads(msg.data)
tokens = content.get("tokens", [])

non_final = _TokenAccumulator()
non_final_original = _TokenAccumulator()
total_audio_proc_ms = content.get("total_audio_proc_ms", 0)

# 1) process tokens: accumulate final/non-final,
# flush immediately on endpoint tokens.
for token in tokens:
is_translated = token.get("translation_status") == "translation"
if is_translation_mode and not is_end_token(token) and not is_translated:
# Original-language token: capture text for source_text only.
if token["is_final"]:
final_original.update(token)
else:
non_final_original.update(token)
continue
if token["is_final"]:
if is_end_token(token):
send_endpoint_transcript()
self._report_processed_audio_duration(total_audio_proc_ms)
else:
final.update(token)
else:
non_final.update(token)

# 2) emit START_OF_SPEECH + transcript for remaining content.
if final.text or non_final.text:
if not is_speaking:
is_speaking = True
self._event_ch.send_nowait(
stt.SpeechEvent(type=SpeechEventType.START_OF_SPEECH)
)
interim_segs = _merge_lang_segments(
final_original._lang_segments, non_final_original._lang_segments
)
interim_src_langs = [LanguageCode(lang) for lang, _ in interim_segs] or None
interim_src_texts = [t for _, t in interim_segs] or None

# When all tokens in this batch are final (no non-final pending),
# speech has reached a stable state — emit PREFLIGHT_TRANSCRIPT to
# allow preemptive LLM generation. This mirrors Deepgram v2's
# EagerEndOfTurn behavior.
event_type = (
SpeechEventType.PREFLIGHT_TRANSCRIPT
if final.text and not non_final.text
else SpeechEventType.INTERIM_TRANSCRIPT
)
self._event_ch.send_nowait(
stt.SpeechEvent(
type=event_type,
alternatives=[
final.merged_speech_data(
non_final,
self.start_time_offset,
source_languages=interim_src_langs,
source_texts=interim_src_texts,
)
],
)
)

if content.get("error_code") or content.get("error_message"):
logger.error(
f"WebSocket error: {content.get('error_code')}"
f" - {content.get('error_message')}"
)

if content.get("finished"):
logger.debug("Transcription finished")
# 3) on error or finish, flush any remaining final tokens.
if (
content.get("finished")
or content.get("error_code")
or content.get("error_message")
):
send_endpoint_transcript()
self._report_processed_audio_duration(total_audio_proc_ms)

if content.get("error_code") or content.get("error_message"):
# Surface in-band Soniox error frames as APIStatusError so the
# base class retry/backoff policy can apply.
error_code = content.get("error_code")
raise APIStatusError(
message=(
f"Soniox Speech-to-Text API error: {content.get('error_message')}"
),
status_code=error_code if isinstance(error_code, int) else -1,
request_id=None,
body=None,
)

except Exception as e:
logger.exception(f"Error processing message: {e}")
if content.get("finished"):
logger.debug("Transcription finished")
finished = True

except aiohttp.ClientError as e:
logger.error(f"WebSocket error while receiving: {e}")
except APIError:
raise
except Exception as e:
logger.error(f"Unexpected error while receiving messages: {e}")
logger.exception(f"Error processing message: {e}")

if finished:
raise _SessionFinished
raise APIStatusError(
message="Soniox Speech-to-Text API connection ended without close frame",
status_code=self._ws.close_code or -1,
request_id=None,
body=None,
)


def _merge_lang_segments(
Expand Down
Loading