Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions livekit-plugins/livekit-plugins-facemarket/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# livekit-plugins-facemarket

`livekit-plugins-facemarket` is a lightweight Avatar Plugin for LiveKit Agents.

It is designed for the narrow integration model confirmed for this project:

- LiveKit Agents users only
- caller owns `STT / LLM / TTS`
- plugin is responsible for avatar session orchestration and signaling
- FaceMarket backend is responsible for starting renderer/coordinator participants

## Install

```bash
pip install livekit-plugins-facemarket
```

## Quick Start

```python
from livekit.agents import RoomOutputOptions
from livekit.plugins.facemarket import AvatarSession

avatar = AvatarSession(
avatar_id="2",
platform_api_key="your-app-key",
livekit_url="wss://your-livekit-host",
livekit_api_key="your-livekit-api-key",
livekit_api_secret="your-livekit-api-secret",
)

@avatar.on("session_ready")
async def on_session_ready() -> None:
print("avatar session is ready")

await session.start(
agent=agent,
room=ctx.room,
room_output_options=RoomOutputOptions(audio_enabled=False),
)

await avatar.start(agent_session=session, room=ctx.room)

await avatar.stop()
```

## Get More

See the [FaceMarket LiveAvatar Integration Docs](https://github.com/newportAI-lab/liveavatar-integration-guide) for more information.
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from .api import FaceMarketAPI
from .avatar import AvatarSession
from .exceptions import (
FaceMarketError,
FaceMarketPlatformError,
FaceMarketSessionError,
SessionReadyTimeoutError,
)
from .version import __version__

__all__ = [
"FaceMarketAPI",
"AvatarSession",
"FaceMarketError",
"FaceMarketPlatformError",
"FaceMarketSessionError",
"SessionReadyTimeoutError",
"__version__",
]

from livekit.agents import Plugin

from .log import logger


class FaceMarketPlugin(Plugin):
def __init__(self) -> None:
super().__init__(__name__, __version__, __package__, logger)


Plugin.register_plugin(FaceMarketPlugin())
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
from __future__ import annotations

import asyncio
import json
import ssl
from typing import Any

import aiohttp
import certifi

from .exceptions import FaceMarketPlatformError
from .log import logger
from .schemas import SessionInfo, StartSessionRequest

BASE_URL = "https://facemarket.ai/vih"
AUTH_PATH = "/dispatcher/auth/session/token"
START_PATH = "/dispatcher/v1/session/start"
STOP_PATH = "/dispatcher/v1/session/stop"
DEFAULT_TIMEOUT = 10.0
DEFAULT_RETRIES = 2


def _redact_payload(value: Any) -> Any:
if isinstance(value, dict):
return {
key: "<redacted>"
if key.lower().endswith("token") or key.lower() == "authorization"
else _redact_payload(item)
for key, item in value.items()
}
if isinstance(value, list):
return [_redact_payload(item) for item in value]
return value


class FaceMarketAPI:
def __init__(
self,
*,
platform_api_key: str,
base_url: str = BASE_URL,
timeout: float = DEFAULT_TIMEOUT,
max_retries: int = DEFAULT_RETRIES,
) -> None:
self._platform_api_key = platform_api_key
self._base_url = base_url.rstrip("/")
self._timeout = aiohttp.ClientTimeout(total=timeout)
self._max_retries = max_retries
self._auth_token: str | None = None
self._ssl_context = ssl.create_default_context(cafile=certifi.where())

async def start_session(self, request: StartSessionRequest) -> SessionInfo:
request_payload = request.to_payload()
request_headers = {"Authorization": f"Bearer {self._platform_api_key}"}
logger.info(
"FaceMarket start request payload=%s headers=%s",
_redact_payload(request_payload),
_redact_payload(request_headers),
)
payload = await self._request_json(
"POST",
START_PATH,
headers=request_headers,
json_body=request_payload,
)

data = payload.get("data") or {}
logger.info("FaceMarket start response payload=%s", _redact_payload(payload))
if data.get("roomToken") and data.get("livekitUrl"):
raise FaceMarketPlatformError(
"FaceMarket start returned hosted-room response, not plugin-mode response. "
"Expected data.sessionId/session_id and renderer/coordinator joining the provided room."
)
session_id = (
payload.get("sessionId")
or payload.get("session_id")
or data.get("sessionId")
or data.get("session_id")
)
if not session_id:
raise FaceMarketPlatformError("FaceMarket start response did not contain sessionId")

return SessionInfo(
session_id=str(session_id),
room_id=str(data["roomId"]) if data.get("roomId") else None,
livekit_url=str(data["livekitUrl"]) if data.get("livekitUrl") else None,
room_token=str(data["roomToken"]) if data.get("roomToken") else None,
green_screen=data.get("greenScreen"),
)
Comment thread
wuguangmou marked this conversation as resolved.

async def stop_session(self, session_id: str) -> None:
request_body = {"sessionId": session_id, "roomId": session_id}
request_headers = {"Authorization": f"Bearer {self._platform_api_key}"}
logger.info(
"FaceMarket stop request payload=%s headers=%s",
_redact_payload(request_body),
_redact_payload(request_headers),
)
await self._request_json(
"POST",
STOP_PATH,
headers=request_headers,
json_body=request_body,
)

async def _request_json(
self,
method: str,
path: str,
*,
params: dict[str, str] | None = None,
headers: dict[str, str] | None = None,
json_body: dict[str, Any] | None = None,
) -> dict[str, Any]:
url = f"{self._base_url}{path}"
attempts = self._max_retries + 1
last_error: Exception | None = None

for attempt in range(attempts):
try:
connector = aiohttp.TCPConnector(ssl=self._ssl_context)
async with aiohttp.ClientSession(
timeout=self._timeout, connector=connector
) as http:
async with http.request(
method,
url,
params=params,
headers=headers,
json=json_body,
) as response:
return await self._decode_response(response)
except FaceMarketPlatformError:
raise
Comment on lines +133 to +134
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 HTTP 5xx server errors bypass the retry mechanism entirely

_decode_response raises FaceMarketPlatformError for all HTTP status codes >= 400, including transient server errors like 502/503/504 (api.py:152-160). In the retry loop, FaceMarketPlatformError is caught and immediately re-raised (api.py:133-134), completely bypassing the retry logic. Only aiohttp.ClientError and asyncio.TimeoutError are retried. This means the retry mechanism (configured via DEFAULT_RETRIES = 2) is ineffective for transient server-side failures, which are exactly the kind of errors retries are meant to handle.

Prompt for agents
In api.py _request_json method (lines 119-141), the except FaceMarketPlatformError: raise clause on lines 133-134 immediately re-raises ALL platform errors, including retryable HTTP 5xx errors. The _decode_response method (lines 143-160) converts all HTTP >= 400 responses into FaceMarketPlatformError, making no distinction between 4xx (client errors, non-retryable) and 5xx (server errors, typically retryable).

To fix this, consider one of these approaches:
1. In _decode_response, raise a retryable subclass of FaceMarketPlatformError for 5xx status codes. Then in _request_json, only re-raise the non-retryable subclass immediately.
2. Alternatively, in _request_json, check the HTTP status from the exception before deciding to re-raise or retry. For example, attach the status code to FaceMarketPlatformError and check if status >= 500 before retrying.

The relevant methods are FaceMarketAPI._request_json (line 106) and FaceMarketAPI._decode_response (line 143) in api.py.
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

except (aiohttp.ClientError, asyncio.TimeoutError) as exc:
last_error = exc
if attempt >= attempts - 1:
break
await asyncio.sleep(0.25 * (attempt + 1))

raise FaceMarketPlatformError(f"FaceMarket API request failed: {last_error!r}")

async def _decode_response(self, response: aiohttp.ClientResponse) -> dict[str, Any]:
raw_text = await response.text()
try:
payload = json.loads(raw_text) if raw_text else {}
except json.JSONDecodeError as exc:
raise FaceMarketPlatformError(
f"FaceMarket API returned non-JSON response: status={response.status}"
) from exc

if response.status >= 400:
logger.info(
"FaceMarket API response status=%s payload=%s",
response.status,
_redact_payload(payload),
)
raise FaceMarketPlatformError(
f"FaceMarket API request failed: status={response.status}, body={_redact_payload(payload)!r}"
)
Comment thread
wuguangmou marked this conversation as resolved.

code = payload.get("code")
if code not in (None, 0):
logger.info(
"FaceMarket API response status=%s payload=%s",
response.status,
_redact_payload(payload),
)
message = payload.get("message") or payload.get("msg") or "unknown platform error"
raise FaceMarketPlatformError(f"FaceMarket API error {code}: {message}")

return payload
Loading
Loading