Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions api-reference/pipecat-subagents/base-agent.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,14 @@ agent.active -> bool

Whether this agent is currently active.

### activation_args

```python
agent.activation_args -> Optional[dict]
```

The arguments from the most recent activation, or `None` if the agent is inactive. The value is cleared when the agent is deactivated.

### parent

```python
Expand All @@ -87,6 +95,14 @@ agent.bridged -> bool

Whether this agent is bridged (receives pipeline frames from the bus).

### ready

```python
agent.ready -> bool
```

Whether this agent's pipeline has started and is ready to operate.

### started_at

```python
Expand Down
18 changes: 17 additions & 1 deletion api-reference/pipecat-subagents/bus.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -177,12 +177,28 @@ pipeline = Pipeline([transport.input(), bridge, transport.output()])

## BusSubscriber

Mixin for objects that receive messages from an `AgentBus`. Implementors override `on_bus_message()` to handle incoming messages.
Mixin for objects that receive messages from an `AgentBus`. Implementors override `on_bus_message()` to handle incoming messages. Concrete subscribers must provide a `name` property (typically inherited from `BaseObject`) that uniquely identifies the subscriber on the bus.

```python
from pipecat_subagents.bus.subscriber import BusSubscriber

class MySubscriber(BusSubscriber):
@property
def name(self) -> str:
return "my_subscriber"

async def on_bus_message(self, message: BusMessage) -> None:
...
```

### Properties

<ParamField path="name" type="str" required>
Unique name identifying this subscriber on the bus. Built-in subscribers inherit this from `BaseObject`; custom implementations that extend `BusSubscriber` directly must provide one.
</ParamField>

### Methods

<ParamField path="on_bus_message" type="async (message: BusMessage) -> None">
Override to handle an incoming bus message.
</ParamField>
13 changes: 1 addition & 12 deletions api-reference/pipecat-subagents/decorators.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ async def get_weather(self, params, location: str, unit: str = "celsius"):

Mark an agent method as a task handler.

Decorated methods are automatically collected by [`BaseAgent`](/api-reference/pipecat-subagents/base-agent) at initialization and dispatched when matching task requests arrive.
Decorated methods are automatically collected by [`BaseAgent`](/api-reference/pipecat-subagents/base-agent) at initialization and dispatched when matching task requests arrive. Each request runs in its own asyncio task so the bus message loop is never blocked.

```python
from pipecat_subagents.agents import task
Expand All @@ -78,12 +78,6 @@ async def on_task_request(self, message):

# Named handler (receives only "research" requests)
@task(name="research")
async def on_research(self, message):
result = await do_research(message.payload)
await self.send_task_response(result)

# Parallel handler (each request runs concurrently)
@task(name="research", parallel=True)
async def on_research(self, message):
result = await do_research(message.payload)
await self.send_task_response(result)
Expand All @@ -97,11 +91,6 @@ async def on_research(self, message):
matching named handler).
</ParamField>

<ParamField path="parallel" type="bool" default="False">
When `True`, each request runs in a separate asyncio task for concurrent
execution.
</ParamField>

### Method Signature

Task handler methods receive a [`BusTaskRequestMessage`](/api-reference/pipecat-subagents/messages#bustaskrequestmessage):
Expand Down
11 changes: 2 additions & 9 deletions subagents/learn/task-coordination.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,7 @@ async with self.task("worker", name="research", payload={"topic": "AI"}) as t:
pass
```

Set `parallel=True` to allow concurrent execution of multiple requests:

```python
@task(parallel=True)
async def on_task_handler(self, message: BusTaskRequestMessage):
# Each request runs in its own asyncio task
await self.send_task_response(message.task_id, {"done": True})
```
Each request runs in its own asyncio task, so multiple requests to the same handler are executed concurrently without blocking the bus message loop.

### Overriding on_task_request

Expand All @@ -112,7 +105,7 @@ class MyWorker(BaseAgent):
This is useful when you need custom routing logic or want to integrate with an existing pipeline, as shown in the example below.

<Note>
`send_task_response()`, `send_task_update()`, and `send_task_stream_*()` all require an explicit `task_id`. This lets a worker handle multiple concurrent tasks -- typically with `@task(parallel=True)` -- and respond to each one correctly. For simple handlers, pass `message.task_id` from the request. For asynchronous responses (see the example below), track the `task_id` yourself until you're ready to respond.
`send_task_response()`, `send_task_update()`, and `send_task_stream_*()` all require an explicit `task_id`. This lets a worker handle multiple concurrent tasks and respond to each one correctly. For simple handlers, pass `message.task_id` from the request. For asynchronous responses (see the example below), track the `task_id` yourself until you're ready to respond.
</Note>

## Building a task system
Expand Down
Loading