Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions src/scope/server/livepeer.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import numpy as np
from av import AudioFrame, VideoFrame

from .kafka_publisher import publish_event # noqa: F401
from .livepeer_client import LivepeerClient

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -74,6 +75,33 @@ def webrtc_connected(self) -> bool:
"""Whether an active Livepeer job is running."""
return self._client is not None and self._client.media_connected

def _publish_cloud_error(
self,
error_message: str,
exception_type: str,
error_type: str,
extra_error_fields: dict | None = None,
) -> None:
"""Publish a cloud connection error event (fire-and-forget)."""
error = {
"error_type": error_type,
"message": error_message,
"exception_type": exception_type,
"recoverable": True,
}
if extra_error_fields:
error.update(extra_error_fields)

publish_event(
event_type="error",
connection_id=(
self._client.connection_id if self._client is not None else None
),
user_id=self._user_id,
error=error,
metadata={"app_id": "livepeer"},
)

def configure(self) -> None:
"""Enable Livepeer backend mode."""
self._configured = True
Expand Down Expand Up @@ -133,6 +161,11 @@ async def connect(
self._connect_error = str(e)
self._last_close_reason = str(e)
logger.error(f"Failed to connect job: {e}")
self._publish_cloud_error(
str(e),
type(e).__name__,
error_type="cloud_connection_failed",
)
try:
await client.disconnect()
except Exception:
Expand Down Expand Up @@ -167,6 +200,11 @@ async def _do_connect():
self._connect_error = str(e)
self._last_close_reason = str(e)
logger.exception(f"Background connect failed: {e}")
self._publish_cloud_error(
str(e),
type(e).__name__,
error_type="cloud_connection_failed",
)

self._connecting = True
self._connect_error = None
Expand Down Expand Up @@ -206,8 +244,18 @@ async def disconnect(self) -> None:
logger.warning(
"Livepeer client disconnect timed out after 15s, forcing cleanup"
)
self._publish_cloud_error(
"Livepeer client disconnect timed out after 15s",
"TimeoutError",
error_type="cloud_disconnect_error",
)
except Exception as e:
logger.warning(f"Error during Livepeer client disconnect: {e}")
self._publish_cloud_error(
str(e),
type(e).__name__,
error_type="cloud_disconnect_error",
)
self._client = None
self._configured = False
self._connecting = False
Expand Down
Loading