1 change: 1 addition & 0 deletions README.md
@@ -79,6 +79,7 @@ Some examples require extra dependencies. See each sample's directory for specif
* [patching](patching) - Alter workflows safely with `patch` and `deprecate_patch`.
* [polling](polling) - Recommended implementation of an activity that needs to periodically poll an external resource, waiting for its successful completion.
* [prometheus](prometheus) - Configure Prometheus metrics on clients/workers.
* [workflow_streams](workflow_streams) - Workflow-hosted durable event stream via `temporalio.contrib.workflow_stream`. **Experimental — requires the [`contrib/pubsub` branch](https://github.com/temporalio/sdk-python/tree/contrib/pubsub) of sdk-python.**
* [pydantic_converter](pydantic_converter) - Data converter for using Pydantic models.
* [schedules](schedules) - Demonstrates a Workflow Execution that occurs according to a schedule.
* [sentry](sentry) - Report errors to Sentry.
3 changes: 3 additions & 0 deletions openai_agents/README.md
@@ -36,3 +36,6 @@ Each directory contains a complete example with its own README for detailed inst
- **[Customer Service](./customer_service/README.md)** - Interactive customer service agent with escalation capabilities, demonstrating conversational workflows.
- **[Reasoning Content](./reasoning_content/README.md)** - Example of how to retrieve the thought process of reasoning models.
- **[Financial Research Agent](./financial_research_agent/README.md)** - Multi-agent financial research system with planner, search, analyst, writer, and verifier agents collaborating.
- **[Streaming](./streaming/README.md)** - Buffered token streaming (events coalesced into batches over a configurable flush interval, default 100ms) via `temporalio.contrib.workflow_stream`. **Experimental — requires the [`contrib/pubsub` branch][workflow-stream-branch] of sdk-python.**

[workflow-stream-branch]: https://github.com/temporalio/sdk-python/tree/contrib/pubsub
90 changes: 90 additions & 0 deletions openai_agents/streaming/README.md
@@ -0,0 +1,90 @@
# Streaming OpenAI Agents

> **Experimental.** These samples target the streaming hooks added to
> `temporalio.contrib.openai_agents` on the [`contrib/pubsub` branch of
> sdk-python][branch], which is not yet released. Install sdk-python
> from that branch (e.g. `uv pip install -e <path-to-sdk-python>` after
> checking out the branch) to run them locally.

[branch]: https://github.com/temporalio/sdk-python/tree/contrib/pubsub

The OpenAI Agents SDK supports streaming via `Runner.run_streamed`, which
yields `TResponseStreamEvent`s as the model produces them. Inside a
Temporal workflow the model call runs in an activity, so the workflow
cannot iterate the live HTTP stream directly. The plugin's streaming
support runs `model.stream_response()` in the activity and publishes
each event to the workflow's `temporalio.contrib.workflow_stream`. The
publisher coalesces events into batches over `streaming_event_batch_interval`
(default 100ms) before sending them as a signal — call this **buffered
token streaming**: deltas reach external subscribers within a batch
window of being produced, not on every byte. At typical model speeds a
single batch carries multiple tokens, so output arrives in small bursts
rather than glyph-by-glyph — close enough for most UIs, though the
cadence is visible next to a true per-token render. Tune
`streaming_event_batch_interval` to trade signal volume for smoothness.
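
The coalescing behaviour can be sketched without Temporal at all. The
stand-alone asyncio sketch below (`BatchingPublisher` and
`flush_interval` are illustrative names, not the SDK's API) buffers
events and flushes them as batches on a timer — the same trade between
signal volume and latency that the plugin makes:

```python
import asyncio


class BatchingPublisher:
    """Illustrative stand-in for the plugin's event batching.

    Events are appended to a buffer; a background task flushes the
    buffer as one batch every `flush_interval` seconds, so a fast
    producer yields a few larger signals instead of many tiny ones.
    """

    def __init__(self, flush_interval: float, send) -> None:
        self._flush_interval = flush_interval
        self._send = send  # called with each non-empty batch
        self._buffer: list = []
        self._task = None

    async def __aenter__(self):
        self._task = asyncio.create_task(self._flush_loop())
        return self

    async def __aexit__(self, *exc) -> None:
        self._task.cancel()
        try:
            await self._task
        except asyncio.CancelledError:
            pass
        self._flush()  # drain whatever is left on shutdown

    def publish(self, event) -> None:
        self._buffer.append(event)

    def _flush(self) -> None:
        if self._buffer:
            batch, self._buffer = self._buffer, []
            self._send(batch)

    async def _flush_loop(self) -> None:
        while True:
            await asyncio.sleep(self._flush_interval)
            self._flush()


async def main() -> list:
    batches: list = []
    # Flush every 50ms while producing an event roughly every 15ms,
    # so several events typically share one batch.
    async with BatchingPublisher(0.05, batches.append) as pub:
        for i in range(10):
            pub.publish(i)
            await asyncio.sleep(0.015)
    return batches


batches = asyncio.run(main())
# Every event arrives exactly once, in order, typically grouped
# into a few batches rather than 10 individual sends.
assert [e for b in batches for e in b] == list(range(10))
```

A longer `flush_interval` means fewer, larger batches (fewer signals, chunkier output); a shorter one approaches per-event delivery.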

The two samples here mirror the upstream openai-agents-python basic
streaming examples.

## `stream_text` — buffered text deltas

Adapted from [`examples/basic/stream_text.py`][upstream-text]. Subscribes
to `ResponseTextDeltaEvent`s and prints them as they arrive (batched at
the publisher's flush interval described above).

[upstream-text]: https://github.com/openai/openai-agents-python/blob/main/examples/basic/stream_text.py

```bash
# Terminal 1
uv run openai_agents/streaming/run_worker.py

# Terminal 2
uv run openai_agents/streaming/run_stream_text_workflow.py
```

## `stream_items` — agent-level events with a tool call

Adapted from [`examples/basic/stream_items.py`][upstream-items]. Renders
agent updates, tool calls, tool outputs, and message outputs as a
play-by-play.

[upstream-items]: https://github.com/openai/openai-agents-python/blob/main/examples/basic/stream_items.py

```bash
uv run openai_agents/streaming/run_stream_items_workflow.py
```

## How it works

1. The workflow constructs a `WorkflowStream` in its `@workflow.init` constructor.
2. `OpenAIAgentsPlugin` is configured with
   `streaming_event_topic="events"` and routes `Runner.run_streamed`
   calls to `invoke_model_activity_streaming`.
3. Inside that activity, each `TResponseStreamEvent` from the live HTTP
stream is appended to a list (returned to the workflow when the
activity completes) **and** published to the stream via
`WorkflowStreamClient.from_activity()`.
4. The workflow publishes a sentinel to a separate `done` topic right
before returning, so the subscriber knows the stream is finished.
5. External code subscribes with `WorkflowStreamClient.create(...).subscribe(
["events", "done"])` and breaks on the `done` event. We leave
`result_type` unset and decode events manually because the two
topics carry different types. The runner also races the consumer
against `handle.result()` so a workflow failure surfaces as an
exception rather than blocking the subscriber forever.
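
The done-sentinel handshake in steps 4–5 can be illustrated with plain
asyncio (the `Item` tuple and in-memory queue below are stand-ins for
the stream, not the `workflow_stream` API): the subscriber iterates a
merged stream of topics and exits cleanly when the sentinel topic
appears, instead of guessing when the producer is finished:

```python
import asyncio
from typing import Any, NamedTuple


class Item(NamedTuple):
    topic: str
    data: Any


async def subscribe(queue: asyncio.Queue) -> list:
    """Consume events until the 'done' sentinel topic arrives."""
    seen = []
    while True:
        item: Item = await queue.get()
        if item.topic == "done":
            return seen  # clean exit; no timeout, no polling race
        seen.append(item.data)


async def main() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    consumer = asyncio.create_task(subscribe(queue))
    for token in ["Why ", "did ", "the ", "chicken..."]:
        await queue.put(Item("events", token))
    # The producer publishes the sentinel right before finishing, so
    # the subscriber never blocks on a stream that will stay silent.
    await queue.put(Item("done", None))
    return await consumer


tokens = asyncio.run(main())
assert tokens == ["Why ", "did ", "the ", "chicken..."]
```

The real samples add one more layer on top of this: because a workflow can fail before it ever publishes the sentinel, the consumer is additionally raced against the workflow result.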

In the workflow, `stream_events()` resolves only after the activity
returns, so the workflow itself does not see deltas as they arrive — the
streaming benefit is for external observers. If you want the workflow to
react incrementally, subscribe from a child workflow or activity rather
than from the workflow that hosts the stream.

## Notes

* `streaming_event_topic` defaults to `None` (no publishing). Set it on
`ModelActivityParameters` to a topic such as `"events"` to publish raw
stream events.
* Streaming is incompatible with `use_local_activity=True`: local
activities can neither heartbeat nor send signals back to the workflow.
* The workflow must host a `WorkflowStream`. Without one, the plugin's
publish signals are unhandled and silently dropped by Temporal.
Empty file.
Empty file.
11 changes: 11 additions & 0 deletions openai_agents/streaming/activities/joke_activities.py
@@ -0,0 +1,11 @@
from __future__ import annotations

import random

from temporalio import activity


@activity.defn
async def how_many_jokes() -> int:
    """Return a random number of jokes to tell, between 1 and 10 inclusive."""
return random.randint(1, 10)
70 changes: 70 additions & 0 deletions openai_agents/streaming/run_stream_items_workflow.py
@@ -0,0 +1,70 @@
from __future__ import annotations

import asyncio
import uuid

from agents import ItemHelpers
from agents.items import TResponseStreamEvent
from temporalio.api.common.v1 import Payload
from temporalio.client import Client
from temporalio.contrib.openai_agents import OpenAIAgentsPlugin
from temporalio.contrib.workflow_stream import WorkflowStreamClient

from openai_agents.streaming.shared import (
TASK_QUEUE,
TOPIC_DONE,
TOPIC_EVENTS,
race_with_workflow,
)
from openai_agents.streaming.workflows.stream_items_workflow import (
StreamItemsInput,
StreamItemsWorkflow,
)


async def main() -> None:
client = await Client.connect(
"localhost:7233",
plugins=[OpenAIAgentsPlugin()],
)

workflow_id = f"stream-items-{uuid.uuid4().hex[:8]}"
handle = await client.start_workflow(
StreamItemsWorkflow.run,
StreamItemsInput(),
id=workflow_id,
task_queue=TASK_QUEUE,
)

stream = WorkflowStreamClient.create(client, workflow_id)
converter = client.data_converter.payload_converter

async def render() -> None:
print("=== Run starting ===")
async for item in stream.subscribe([TOPIC_EVENTS, TOPIC_DONE]):
if item.topic == TOPIC_DONE:
return
assert isinstance(item.data, Payload)
event = converter.from_payload(item.data, TResponseStreamEvent)
if event.type == "raw_response_event":
continue
if event.type == "agent_updated_stream_event":
print(f"Agent updated: {event.new_agent.name}")
elif event.type == "run_item_stream_event":
if event.item.type == "tool_call_item":
name = getattr(event.item.raw_item, "name", "Unknown Tool")
print(f"-- Tool was called: {name}")
elif event.item.type == "tool_call_output_item":
print(f"-- Tool output: {event.item.output}")
elif event.item.type == "message_output_item":
print(
"-- Message output:\n "
f"{ItemHelpers.text_message_output(event.item)}"
)

await race_with_workflow(render(), handle)
print("=== Run complete ===")


if __name__ == "__main__":
asyncio.run(main())
64 changes: 64 additions & 0 deletions openai_agents/streaming/run_stream_text_workflow.py
@@ -0,0 +1,64 @@
from __future__ import annotations

import asyncio
import uuid

from agents.items import TResponseStreamEvent
from openai.types.responses import ResponseTextDeltaEvent
from temporalio.api.common.v1 import Payload
from temporalio.client import Client
from temporalio.contrib.openai_agents import OpenAIAgentsPlugin
from temporalio.contrib.workflow_stream import WorkflowStreamClient

from openai_agents.streaming.shared import (
TASK_QUEUE,
TOPIC_DONE,
TOPIC_EVENTS,
race_with_workflow,
)
from openai_agents.streaming.workflows.stream_text_workflow import (
StreamTextInput,
StreamTextWorkflow,
)


async def main() -> None:
client = await Client.connect(
"localhost:7233",
plugins=[OpenAIAgentsPlugin()],
)

workflow_id = f"stream-text-{uuid.uuid4().hex[:8]}"
handle = await client.start_workflow(
StreamTextWorkflow.run,
StreamTextInput(prompt="Please tell me 5 jokes."),
id=workflow_id,
task_queue=TASK_QUEUE,
)

stream = WorkflowStreamClient.create(client, workflow_id)
converter = client.data_converter.payload_converter

async def render() -> None:
# Subscribe to both the streaming-event topic and the workflow's
# done-sentinel so we can break cleanly without racing
# handle.result() against the next poll. result_type is left
# unset (we get raw Payloads) because the two topics carry
# different types — we decode based on item.topic.
async for item in stream.subscribe([TOPIC_EVENTS, TOPIC_DONE]):
if item.topic == TOPIC_DONE:
return
assert isinstance(item.data, Payload)
event = converter.from_payload(item.data, TResponseStreamEvent)
if event.type == "raw_response_event" and isinstance(
event.data, ResponseTextDeltaEvent
):
print(event.data.delta, end="", flush=True)

result = await race_with_workflow(render(), handle)
print("\n--- final result ---")
print(result)


if __name__ == "__main__":
asyncio.run(main())
55 changes: 55 additions & 0 deletions openai_agents/streaming/run_worker.py
@@ -0,0 +1,55 @@
from __future__ import annotations

import asyncio
import logging
from datetime import timedelta

from temporalio.client import Client
from temporalio.contrib.openai_agents import (
ModelActivityParameters,
OpenAIAgentsPlugin,
)
from temporalio.worker import Worker

from openai_agents.streaming.activities.joke_activities import how_many_jokes
from openai_agents.streaming.shared import TASK_QUEUE, TOPIC_EVENTS
from openai_agents.streaming.workflows.stream_items_workflow import (
StreamItemsWorkflow,
)
from openai_agents.streaming.workflows.stream_text_workflow import (
StreamTextWorkflow,
)


async def main() -> None:
logging.basicConfig(level=logging.INFO)
client = await Client.connect(
"localhost:7233",
plugins=[
OpenAIAgentsPlugin(
model_params=ModelActivityParameters(
# Streaming relies on heartbeats to detect a stuck
# LLM call. Pick a heartbeat_timeout comfortably
# larger than the expected delta cadence.
heartbeat_timeout=timedelta(seconds=10),
start_to_close_timeout=timedelta(minutes=5),
# streaming_event_topic defaults to None (no
# publishing). Set to a topic to publish raw stream
# events for external subscribers.
streaming_event_topic=TOPIC_EVENTS,
),
),
],
)

worker = Worker(
client,
task_queue=TASK_QUEUE,
workflows=[StreamTextWorkflow, StreamItemsWorkflow],
activities=[how_many_jokes],
)
await worker.run()


if __name__ == "__main__":
asyncio.run(main())
79 changes: 79 additions & 0 deletions openai_agents/streaming/shared.py
@@ -0,0 +1,79 @@
from __future__ import annotations

import asyncio
from collections.abc import Coroutine
from typing import Any, TypeVar

from temporalio.client import WorkflowHandle

TASK_QUEUE = "openai-agents-streaming-task-queue"

# Topic the plugin publishes raw model stream events to. Must match
# OpenAIAgentsPlugin(model_params=ModelActivityParameters(streaming_event_topic=...)).
TOPIC_EVENTS = "events"

# Sentinel topic the workflow publishes to once Runner.run_streamed has
# finished. Subscribers iterate (events, done) and break on the done
# event — this avoids racing handle.result() against the subscriber's
# poll cycle.
TOPIC_DONE = "done"


T = TypeVar("T")


async def race_with_workflow(
consumer: Coroutine[Any, Any, None],
handle: WorkflowHandle[Any, T],
) -> T:
"""Run a subscriber concurrently with the workflow.

If the workflow finishes (success or failure) before the subscriber
sees its sentinel, cancel the subscriber and surface the workflow
result. If the subscriber finishes first (clean sentinel exit),
wait for the workflow result. A non-cancellation failure in the
subscriber is propagated either way.

Without this, a workflow that raises before publishing the sentinel
would leave the subscriber blocked on its next poll forever.
"""
consumer_task = asyncio.create_task(consumer)
result_task = asyncio.create_task(handle.result())
we_cancelled_consumer = False
try:
await asyncio.wait(
[consumer_task, result_task],
return_when=asyncio.FIRST_COMPLETED,
)
# Stop the subscriber whether it reached its sentinel or not.
if not consumer_task.done():
consumer_task.cancel()
we_cancelled_consumer = True
# gather(return_exceptions=True) drains both tasks. Child
# cancellation surfaces as a returned CancelledError; only
# cancellation we initiated is expected — anything else
# (including a third party cancelling the consumer behind
# our back) propagates.
consumer_outcome, workflow_outcome = await asyncio.gather(
consumer_task, result_task, return_exceptions=True
)
if isinstance(consumer_outcome, asyncio.CancelledError):
if not we_cancelled_consumer:
raise consumer_outcome
elif isinstance(consumer_outcome, BaseException):
raise consumer_outcome
if isinstance(workflow_outcome, BaseException):
raise workflow_outcome
return workflow_outcome
finally:
# Idempotent cleanup. try/finally re-raises the in-flight
# exception (if any) after finally completes, so swallowing
# cleanup failures here is safe.
for task in (consumer_task, result_task):
if not task.done():
task.cancel()
for task in (consumer_task, result_task):
try:
await task
except BaseException:
pass
Empty file.