Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 116 additions & 12 deletions cli/serve/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@
import sys
import time
import uuid
from typing import Any

try:
import typer
import uvicorn
from fastapi import FastAPI, Request
from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse, StreamingResponse
from pydantic import BaseModel, create_model
except ImportError as e:
raise ImportError(
"The 'm serve' command requires extra dependencies. "
Expand Down Expand Up @@ -90,6 +92,58 @@ def create_openai_error_response(
)


def _json_schema_to_pydantic(
    schema: dict[str, Any], model_name: str = "DynamicModel"
) -> type[BaseModel]:
    """Convert a JSON Schema to a Pydantic model dynamically.

    Only a flat subset of JSON Schema is supported: a top-level
    ``"type": "object"`` with properties of primitive type (``string``,
    ``integer``, ``number``, ``boolean``) or untyped ``array``/``object``
    containers. The following constructs are NOT interpreted and are
    silently ignored if present: ``enum``, ``$ref``, ``allOf``/``anyOf``/
    ``oneOf``, ``additionalProperties``, and nested ``items``/``properties``
    schemas. A property with an unknown or missing ``type`` falls back to
    ``str``.

    Args:
        schema: JSON Schema definition (must have 'properties' and 'type': 'object').
        model_name: Name for the generated Pydantic model.

    Returns:
        A dynamically created Pydantic model class.

    Raises:
        ValueError: If the schema is invalid or unsupported.
    """
    if not isinstance(schema, dict):
        raise ValueError("Schema must be a dictionary")

    if schema.get("type") != "object":
        raise ValueError("Only object-type schemas are supported")

    properties = schema.get("properties", {})
    required = schema.get("required", [])

    if not properties:
        raise ValueError("Schema must have 'properties' field")

    # Map JSON Schema primitive types to Python types. Container types are
    # mapped to bare list/dict: element/value schemas are not enforced.
    type_mapping: dict[str, type] = {
        "string": str,
        "integer": int,
        "number": float,
        "boolean": bool,
        "array": list,
        "object": dict,
    }

    # Build field definitions for create_model
    field_definitions: dict[str, Any] = {}
    for field_name, field_schema in properties.items():
        # Guard against malformed entries such as {"name": "string"}; raising
        # ValueError (not AttributeError) keeps this inside the caller's
        # `except ValueError` path, which maps to an HTTP 400 response.
        if not isinstance(field_schema, dict):
            raise ValueError(
                f"Property '{field_name}' must be a schema object, "
                f"got {type(field_schema).__name__}"
            )
        field_type = field_schema.get("type", "string")
        python_type = type_mapping.get(field_type, str)

        # Required fields get Ellipsis (no default); optional fields become
        # nullable with a None default.
        if field_name in required:
            field_definitions[field_name] = (python_type, ...)
        else:
            field_definitions[field_name] = (python_type | None, None)

    return create_model(model_name, **field_definitions)


def _build_model_options(request: ChatCompletionRequest) -> dict:
"""Build model_options dict from OpenAI-compatible request parameters."""
excluded_fields = {
Expand All @@ -108,7 +162,7 @@ def _build_model_options(request: ChatCompletionRequest) -> dict:
"presence_penalty", # Presence penalty - not yet implemented
"frequency_penalty", # Frequency penalty - not yet implemented
"logit_bias", # Logit bias - not yet implemented
"response_format", # Response format (json_object) - not yet implemented
"response_format", # Response format - handled separately
"functions", # Legacy function calling - not yet implemented
"function_call", # Legacy function calling - not yet implemented
"tools", # Tool calling - not yet implemented
Expand Down Expand Up @@ -154,22 +208,71 @@ async def endpoint(request: ChatCompletionRequest):

model_options = _build_model_options(request)

# Handle response_format
format_model: type[BaseModel] | None = None
if request.response_format is not None:
if request.response_format.type == "json_schema":
if request.response_format.json_schema is None:
return create_openai_error_response(
status_code=400,
message="json_schema field is required when response_format.type is 'json_schema'",
error_type="invalid_request_error",
param="response_format.json_schema",
)
try:
format_model = _json_schema_to_pydantic(
request.response_format.json_schema.schema_,
request.response_format.json_schema.name,
)
except ValueError as e:
return create_openai_error_response(
status_code=400,
message=f"Invalid JSON schema: {e!s}",
error_type="invalid_request_error",
param="response_format.json_schema.schema",
)
elif request.response_format.type == "json_object":
# For json_object, we don't enforce a specific schema
# The backend will handle JSON mode if supported
pass

# Check if serve function accepts format parameter
serve_sig = inspect.signature(module.serve)
accepts_format = "format" in serve_sig.parameters
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cacheable/could be done up front? Here it's done in every request but won't change?


# Detect if serve is async or sync and handle accordingly
if inspect.iscoroutinefunction(module.serve):
Copy link
Copy Markdown
Contributor

@planetf1 planetf1 Apr 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

similar (not identical) code is repeated multiple times - possible opportunity for making common - minor.

# It's async, await it directly
output = await module.serve(
input=request.messages,
requirements=request.requirements,
model_options=model_options,
)
if accepts_format:
output = await module.serve(
input=request.messages,
requirements=request.requirements,
model_options=model_options,
format=format_model,
)
else:
output = await module.serve(
input=request.messages,
requirements=request.requirements,
model_options=model_options,
)
Comment on lines +246 to +258
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can these calls be combined? If format defaults to None, is the expectation that module.serve handles that differently? Does module.serve default to a different format value?

else:
# It's sync, run in thread pool to avoid blocking event loop
output = await asyncio.to_thread(
module.serve,
input=request.messages,
requirements=request.requirements,
model_options=model_options,
)
if accepts_format:
output = await asyncio.to_thread(
module.serve,
input=request.messages,
requirements=request.requirements,
model_options=model_options,
format=format_model,
)
else:
output = await asyncio.to_thread(
module.serve,
input=request.messages,
requirements=request.requirements,
model_options=model_options,
)

# system_fingerprint represents backend config hash, not model name
# The model name is already in response.model (line 73)
Expand All @@ -186,6 +289,7 @@ async def endpoint(request: ChatCompletionRequest):
created=created_timestamp,
stream_options=request.stream_options,
system_fingerprint=system_fingerprint,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The non-streaming path (return ChatCompletion just below, line 297) returns output.value without validating against format_model. Suggest adding before that return (also needs import json at the top and ValidationError added to the pydantic import):

if format_model is not None and output.value is not None:
    try:
        format_model.model_validate(json.loads(output.value))
    except (json.JSONDecodeError, ValidationError) as e:
        return create_openai_error_response(
            status_code=400,
            message=f"Output does not match required schema: {e!s}",
            error_type="invalid_response_error",
        )

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe OpenAI responses can return output that is not valid for a given schema if things like token limits are hit. Do we want to match that behavior? Or should we always error on our side if the format isn't met?

format_model=format_model,
),
media_type="text/event-stream",
)
Expand Down
20 changes: 19 additions & 1 deletion cli/serve/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,26 @@ class ToolFunction(BaseModel):
function: FunctionDefinition


class JsonSchemaFormat(BaseModel):
    """JSON Schema definition for structured output."""

    name: str
    """Name of the schema."""

    # Aliased because "schema" shadows BaseModel.schema in pydantic;
    # clients still send the key as "schema".
    schema_: dict[str, Any] = Field(alias="schema")
    """JSON Schema definition."""

    # NOTE(review): accepted for OpenAI API compatibility but appears unused
    # in the visible request handling — confirm before relying on it.
    strict: bool | None = None
    """Whether to enforce strict schema validation."""

    # Accept construction by either the field name ("schema_") or its alias.
    model_config = {"populate_by_name": True}


class ResponseFormat(BaseModel):
    """OpenAI-compatible ``response_format`` request parameter."""

    # Output mode: plain text, free-form JSON, or schema-constrained JSON.
    type: Literal["text", "json_object", "json_schema"]

    json_schema: JsonSchemaFormat | None = None
    """JSON Schema definition when type is 'json_schema'."""


class StreamOptions(BaseModel):
Expand Down
46 changes: 46 additions & 0 deletions cli/serve/streaming.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
"""Streaming utilities for OpenAI-compatible server responses."""

import json
from collections.abc import AsyncGenerator

from pydantic import BaseModel, ValidationError

from mellea.core.base import ModelOutputThunk
from mellea.core.utils import MelleaLogger
from mellea.helpers.openai_compatible_helpers import build_completion_usage
Expand All @@ -23,6 +26,7 @@ async def stream_chat_completion_chunks(
created: int,
stream_options: StreamOptions | None = None,
system_fingerprint: str | None = None,
format_model: type[BaseModel] | None = None,
) -> AsyncGenerator[str, None]:
"""Generate OpenAI-compatible SSE chat completion chunks from a model output.

Expand All @@ -36,6 +40,9 @@ async def stream_chat_completion_chunks(
``include_usage`` field.
system_fingerprint: Backend configuration fingerprint to include in chunks.
Defaults to ``None``.
format_model: Optional Pydantic model for validating structured output.
When provided, the complete streamed output will be validated against
this schema before the final chunk is sent.

Yields:
Server-sent event payload strings representing OpenAI-compatible chat
Expand Down Expand Up @@ -98,6 +105,45 @@ async def stream_chat_completion_chunks(
)
yield f"data: {chunk.model_dump_json()}\n\n"

# Validate format if format_model is provided
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Validation runs after all content chunks are already sent (lines 68–106), so the error arrives after the client has consumed the data. A few options:

  1. Buffer when format_model is set, validate, then stream or error before emitting anything.
  2. Return a 400 upfront when stream=True + json_schema — simplest for now.
  3. Keep post-hoc but document it — callers can pass format= to the backend for constrained decoding instead.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

related to #891 right?

if format_model is not None:
if output.value is None:
error_response = OpenAIErrorResponse(
error=OpenAIError(
message="Output value is None, cannot validate format",
type="invalid_response_error",
)
)
yield f"data: {error_response.model_dump_json()}\n\n"
yield "data: [DONE]\n\n"
return

try:
# Parse the complete output as JSON
output_json = json.loads(output.value)
# Validate against the Pydantic model
format_model.model_validate(output_json)
except json.JSONDecodeError as e:
error_response = OpenAIErrorResponse(
error=OpenAIError(
message=f"Output is not valid JSON: {e!s}",
type="invalid_response_error",
)
)
yield f"data: {error_response.model_dump_json()}\n\n"
yield "data: [DONE]\n\n"
return
except ValidationError as e:
error_response = OpenAIErrorResponse(
error=OpenAIError(
message=f"Output does not match required schema: {e!s}",
type="invalid_response_error",
)
)
yield f"data: {error_response.model_dump_json()}\n\n"
yield "data: [DONE]\n\n"
return

# Include usage in final chunk only if explicitly requested via stream_options
# Per OpenAI spec: usage is only included when stream_options.include_usage=True
include_usage = stream_options is not None and stream_options.include_usage
Expand Down
91 changes: 91 additions & 0 deletions docs/examples/m_serve/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@ A dedicated streaming example for `m serve` that supports both modes:
- `stream=True` returns an uncomputed thunk so the server can emit
incremental Server-Sent Events (SSE) chunks

### m_serve_example_response_format.py
Example demonstrating structured output with the `response_format` parameter.

**Key Features:**
- Support for the `format` parameter in serve functions
- Structured output validation with JSON schemas
- Three format types: `text`, `json_object`, `json_schema`

### pii_serve.py
Example of serving a PII (Personally Identifiable Information) detection service.

Expand All @@ -29,6 +37,9 @@ Client code for testing the served API endpoints with non-streaming requests.
Client code demonstrating streaming responses using Server-Sent Events (SSE)
against `m_serve_example_streaming.py`.

### client_response_format.py
Client code demonstrating all three `response_format` types with examples.

## Concepts Demonstrated

- **API Deployment**: Exposing Mellea programs as REST APIs
Expand All @@ -37,6 +48,7 @@ against `m_serve_example_streaming.py`.
- **Validation in Production**: Using requirements in deployed services
- **Model Options**: Passing model configuration through API
- **Streaming Responses**: Real-time token streaming via Server-Sent Events (SSE)
- **Structured Output**: Using `response_format` for JSON schema validation

## Basic Pattern

Expand Down Expand Up @@ -84,6 +96,85 @@ m serve docs/examples/m_serve/m_serve_example_streaming.py
python docs/examples/m_serve/client_streaming.py
```

### Response Format

```bash
# Start the response_format example server
m serve docs/examples/m_serve/m_serve_example_response_format.py

# In another terminal, test with the response_format client
python docs/examples/m_serve/client_response_format.py
```

## Response Format Support

The server supports structured output via the `response_format` parameter, which allows you to control the format of the model's response. This is compatible with OpenAI's response format API.

**Three Format Types:**

1. **`text`** (default): Plain text output
2. **`json_object`**: Unstructured JSON output (model decides the schema)
3. **`json_schema`**: Structured output validated against a JSON schema

**Key Features:**
- Automatic JSON schema to Pydantic model conversion
- Schema validation for structured outputs
- OpenAI-compatible API
- Works with the `format` parameter in serve functions

**Example - JSON Schema:**
```python
import openai

client = openai.OpenAI(api_key="na", base_url="http://0.0.0.0:8080/v1")

# Define a schema for structured output
person_schema = {
"type": "object",
"properties": {
"name": {"type": "string"},
"age": {"type": "integer"},
"email": {"type": "string"},
},
"required": ["name", "age", "email"],
}

response = client.chat.completions.create(
messages=[{"role": "user", "content": "Generate a person named Alice"}],
model="granite4:micro-h",
response_format={
"type": "json_schema",
"json_schema": {
"name": "Person",
"schema": person_schema,
"strict": True,
},
},
)

# Response will be valid JSON matching the schema
print(response.choices[0].message.content)
```

**Server Implementation:**
Your serve function must accept a `format` parameter to support `json_schema`:

```python
def serve(
input: list[ChatMessage],
requirements: list[str] | None = None,
model_options: dict | None = None,
format: type | None = None, # Add this parameter
) -> ModelOutputThunk:
result = session.instruct(
description=input[-1].content,
requirements=requirements,
model_options=model_options,
format=format, # Pass to instruct()
)
return result
```

## Streaming Support

The server supports streaming responses via Server-Sent Events (SSE) when the
Expand Down
Loading
Loading