Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 3 additions & 10 deletions api-reference/pipecat-subagents/decorators.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,6 @@ async def on_task_request(self, message):

# Named handler (receives only "research" requests)
@task(name="research")
async def on_research(self, message):
result = await do_research(message.payload)
await self.send_task_response(result)

# Parallel handler (each request runs concurrently)
@task(name="research", parallel=True)
async def on_research(self, message):
result = await do_research(message.payload)
await self.send_task_response(result)
Expand All @@ -97,10 +91,9 @@ async def on_research(self, message):
matching named handler).
</ParamField>

<ParamField path="parallel" type="bool" default="False">
When `True`, each request runs in a separate asyncio task for concurrent
execution.
</ParamField>
<Note>
Each task request runs in its own asyncio task so the bus message loop is never blocked. Multiple tasks can be in flight simultaneously.
</Note>

### Method Signature

Expand Down
11 changes: 1 addition & 10 deletions subagents/learn/task-coordination.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -88,15 +88,6 @@ async with self.task("worker", name="research", payload={"topic": "AI"}) as t:
pass
```

Set `parallel=True` to allow concurrent execution of multiple requests:

```python
@task(parallel=True)
async def on_task_handler(self, message: BusTaskRequestMessage):
# Each request runs in its own asyncio task
await self.send_task_response(message.task_id, {"done": True})
```

### Overriding on_task_request

Alternatively, you can override `on_task_request()` directly without the `@task` decorator:
Expand All @@ -112,7 +103,7 @@ class MyWorker(BaseAgent):
This is useful when you need custom routing logic or want to integrate with an existing pipeline, as shown in the example below.

<Note>
`send_task_response()`, `send_task_update()`, and `send_task_stream_*()` all require an explicit `task_id`. This lets a worker handle multiple concurrent tasks -- typically with `@task(parallel=True)` -- and respond to each one correctly. For simple handlers, pass `message.task_id` from the request. For asynchronous responses (see the example below), track the `task_id` yourself until you're ready to respond.
`send_task_response()`, `send_task_update()`, and `send_task_stream_*()` all require an explicit `task_id`. This lets a worker handle multiple concurrent tasks and respond to each one correctly. For simple handlers, pass `message.task_id` from the request. For asynchronous responses (see the example below), track the `task_id` yourself until you're ready to respond.
</Note>

## Building a task system
Expand Down
Loading