Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
0afc0e2
Add TCK report gap summary gate
MisterVVP Jun 16, 2026
4e0265d
Preserve TCK reports and add zero-gap summarizer/checker
MisterVVP Jun 16, 2026
230a03d
Preserve TCK reports and add zero-gap summarizer/checker
MisterVVP Jun 16, 2026
ba90149
Preserve TCK reports and add zero-gap summarizer/checker
MisterVVP Jun 17, 2026
9575805
Add Task subscription stream support and TCK report verification
MisterVVP Jun 17, 2026
8f847ce
Add Task subscription streaming support and TCK report verification
MisterVVP Jun 17, 2026
db0eecd
Add task subscription streaming (SubscribeTask) and TCK report verifi…
MisterVVP Jun 17, 2026
edfdf3f
Add task subscription streaming (SubscribeTask) with SSE support, tas…
MisterVVP Jun 17, 2026
902ccf1
ci: report tck gaps without failing workflow
MisterVVP Jun 17, 2026
d245b34
fix: allow transports to cancel live stream sessions
MisterVVP Jun 17, 2026
6e1ce01
fix: expose subscription session cancellation
MisterVVP Jun 17, 2026
d8cf384
fix: wake blocked subscription sessions on cancel
MisterVVP Jun 17, 2026
78dda91
fix: cancel live grpc streams on client disconnect
MisterVVP Jun 17, 2026
39dd328
fix: use get for http task subscription
MisterVVP Jun 17, 2026
2698a66
fix: restore http json transport and use get for subscribe
MisterVVP Jun 17, 2026
b4df5d8
fix: keep aggregate tck gaps in summary
MisterVVP Jun 17, 2026
39f01cc
clang fmt
Jun 17, 2026
d727a5d
fix: loop conditions
Jun 18, 2026
3ba8175
fix: restore finite stream draining loops
MisterVVP Jun 18, 2026
866d283
fix: make stream session readiness explicit
MisterVVP Jun 18, 2026
ded3bc3
fix: accept get task subscriptions and use stream loop conditions
MisterVVP Jun 18, 2026
39e7f1b
fix: keep finite streams drainable
MisterVVP Jun 18, 2026
54e168c
fix: keep finite stream sessions drainable
MisterVVP Jun 18, 2026
ef1a3ef
fix: handle rest subscriptions with get
MisterVVP Jun 18, 2026
716f33b
fix: clang fmt
Jun 18, 2026
370d980
fix: align stream tests with get subscriptions
MisterVVP Jun 19, 2026
99ba294
fix: restore tck subscribe method and conditional stream drains
MisterVVP Jun 19, 2026
6fb0b56
fix: bugfix
Jun 19, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .github/workflows/tck.yml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ jobs:
TCK_LOG_DIR: tck-artifacts/logs/inmemory
run: ./scripts/run_tck_mandatory.sh

- name: Summarize in-memory TCK report gaps
run: python3 scripts/summarize_tck_report.py tck-artifacts/reports/inmemory/compatibility.json

- name: Stop deterministic in-memory SUT
if: always()
run: ./scripts/stop_tck_sut.sh
Expand All @@ -89,6 +92,9 @@ jobs:
TCK_LOG_DIR: tck-artifacts/logs/postgres
run: ./scripts/run_tck_mandatory.sh

- name: Summarize PostgreSQL TCK report gaps
run: python3 scripts/summarize_tck_report.py tck-artifacts/reports/postgres/compatibility.json

- name: Stop deterministic PostgreSQL SUT
if: always()
run: ./scripts/stop_tck_sut.sh
Expand Down
23 changes: 18 additions & 5 deletions examples/example_support.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "a2a/server/request_context.h"
#include "a2a/server/server_stream_session.h"
#include "a2a/server/task_id_generator.h"
#include "a2a/server/task_subscription_service.h"
#include "a2a/server/tasks/in_memory_task_store.h"
#include "a2a/server/tasks/list_tasks.h"
#include "a2a/server/tasks/task_history.h"
Expand All @@ -50,6 +51,7 @@ constexpr std::string_view kStructuredDataKey = "key";
constexpr std::string_view kStructuredDataValue = "value";
constexpr std::string_view kStructuredDataCountKey = "count";
constexpr double kStructuredDataCount = 42.0;
constexpr std::string_view kStreamingTaskIdPrefix = "task-stream-";

[[nodiscard]] bool IsTaskNotFoundError(const core::Error& error) {
return error.code() == core::ErrorCode::kRemoteProtocol && error.protocol_code().has_value() &&
Expand Down Expand Up @@ -195,6 +197,7 @@ class ExampleExecutor final : public server::AgentExecutor {
if (!notify.ok()) {
return notify.error();
}
subscriptions_.PublishTaskUpdated(task);

lf::a2a::v1::SendMessageResponse response;
response.mutable_message()->set_role(lf::a2a::v1::ROLE_AGENT);
Expand All @@ -218,14 +221,13 @@ class ExampleExecutor final : public server::AgentExecutor {

core::Result<std::unique_ptr<server::ServerStreamSession>> SendStreamingMessage(
const lf::a2a::v1::SendMessageRequest& request, server::RequestContext& context) override {
(void)context;
std::string task_id = request.has_message() ? request.message().task_id() : "";
if (task_id.empty()) {
if (request.has_message() && !request.message().message_id().empty()) {
auto task_id_result = lifecycle_.ResolveTaskIdForSendRequest(request, context);
if (!task_id_result.ok()) {
return task_id_result.error();
}
task_id = task_id_result.value();
task_id.reserve(kStreamingTaskIdPrefix.size() + request.message().message_id().size());
task_id.append(kStreamingTaskIdPrefix);
task_id.append(request.message().message_id());
} else {
task_id = "task-test-stream-default";
}
Expand Down Expand Up @@ -281,6 +283,15 @@ class ExampleExecutor final : public server::AgentExecutor {
return task;
}

core::Result<std::unique_ptr<server::ServerStreamSession>> SubscribeTask(const lf::a2a::v1::GetTaskRequest& request,
server::RequestContext& context) override {
auto task = GetTask(request, context);
if (!task.ok()) {
return task.error();
}
return subscriptions_.Subscribe(task.value());
}

core::Result<server::ListTasksResponse> ListTasks(const server::ListTasksRequest& request,
server::RequestContext& context) override {
(void)context;
Expand All @@ -298,6 +309,7 @@ class ExampleExecutor final : public server::AgentExecutor {
if (!notify.ok()) {
return notify.error();
}
subscriptions_.PublishTaskUpdated(task.value());
return task.value();
}

Expand Down Expand Up @@ -336,6 +348,7 @@ class ExampleExecutor final : public server::AgentExecutor {
std::shared_ptr<server::TaskIdGenerator> task_id_generator_;
server::TaskLifecycleService lifecycle_;
server::PushNotificationService push_notifications_;
server::TaskSubscriptionService subscriptions_;
std::uint64_t status_timestamp_counter_ = 0;
};

Expand Down
32 changes: 32 additions & 0 deletions include/a2a/server/agent_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@
#pragma once

#include <memory>
#include <optional>
#include <utility>

#include "a2a/core/protocol_errors.h"
#include "a2a/core/result.h"
#include "a2a/core/task_states.h"
#include "a2a/server/request_context.h"
#include "a2a/server/server_stream_session.h"
#include "a2a/server/tasks/list_tasks.h"
Expand All @@ -27,6 +30,35 @@ class AgentExecutor {
[[nodiscard]] virtual core::Result<lf::a2a::v1::Task> GetTask(const lf::a2a::v1::GetTaskRequest& request,
RequestContext& context) = 0;

[[nodiscard]] virtual core::Result<std::unique_ptr<ServerStreamSession>> SubscribeTask(
const lf::a2a::v1::GetTaskRequest& request, RequestContext& context) {
class CurrentTaskStreamSession final : public ServerStreamSession {
public:
explicit CurrentTaskStreamSession(lf::a2a::v1::Task task) { *event_.mutable_task() = std::move(task); }

[[nodiscard]] core::Result<std::optional<lf::a2a::v1::StreamResponse>> Next() override {
if (sent_) {
return std::optional<lf::a2a::v1::StreamResponse>{};
}
sent_ = true;
return std::optional<lf::a2a::v1::StreamResponse>{event_};
}

private:
lf::a2a::v1::StreamResponse event_;
bool sent_ = false;
};

auto task = GetTask(request, context);
if (!task.ok()) {
return task.error();
}
if (core::IsTerminalTaskState(task.value().status().state())) {
return core::protocol_errors::UnsupportedOperation("task is already terminal");
}
return std::unique_ptr<ServerStreamSession>(std::make_unique<CurrentTaskStreamSession>(std::move(task.value())));
}

[[nodiscard]] virtual core::Result<ListTasksResponse> ListTasks(const ListTasksRequest& request,
RequestContext& context) = 0;

Expand Down
1 change: 1 addition & 0 deletions include/a2a/server/dispatch_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ enum class DispatcherOperation : std::uint8_t {
kSendMessage,
kSendStreamingMessage,
kGetTask,
kSubscribeTask,
kListTasks,
kCancelTask,
kCreateTaskPushNotificationConfig,
Expand Down
4 changes: 4 additions & 0 deletions include/a2a/server/rest_server_transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#pragma once

#include <chrono>
#include <functional>
#include <optional>
#include <string>
#include <string_view>
Expand All @@ -15,6 +16,8 @@

namespace a2a::server {

class HttpByteTransport;

struct HttpServerRequest final {
std::string method;
std::string target;
Expand All @@ -28,6 +31,7 @@ struct HttpServerResponse final {
int status_code = kDefaultStatusCode;
std::unordered_map<std::string, std::string> headers;
std::string body;
std::function<core::Result<void>(HttpByteTransport&)> stream_writer;
};

struct RestServerTransportOptions final {
Expand Down
4 changes: 4 additions & 0 deletions include/a2a/server/rest_transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#pragma once

#include <functional>
#include <optional>
#include <string>
#include <string_view>
Expand All @@ -16,6 +17,8 @@

namespace a2a::server {

class HttpByteTransport;

struct RestEndpointPaths final {
static constexpr std::string_view kSendMessage = "/message:send";
static constexpr std::string_view kSendStreamingMessage = "/message:stream";
Expand All @@ -37,6 +40,7 @@ struct RestResponse final {
int http_status = kDefaultHttpStatus;
std::unordered_map<std::string, std::string> headers;
std::string body;
std::function<core::Result<void>(HttpByteTransport&)> stream_writer;
};

struct RestRoute final {
Expand Down
2 changes: 2 additions & 0 deletions include/a2a/server/server_stream_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ class ServerStreamSession {
virtual ~ServerStreamSession() = default;

[[nodiscard]] virtual core::Result<std::optional<lf::a2a::v1::StreamResponse>> Next() = 0;
[[nodiscard]] virtual bool IsLive() const noexcept { return true; }
virtual void Cancel() noexcept {}
};

} // namespace a2a::server
77 changes: 77 additions & 0 deletions include/a2a/server/stream_response_coroutine.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright 2026 Vladimir Pavlov <mistervvp@outlook.com> (https://github.com/MisterVVP)

#pragma once

#include <coroutine>
#include <exception>
#include <optional>
#include <utility>

#include "a2a/v1/a2a.pb.h"

namespace a2a::server {

class StreamResponseCoroutine final {
public:
struct promise_type final {
[[nodiscard]] StreamResponseCoroutine get_return_object() {
return StreamResponseCoroutine(std::coroutine_handle<promise_type>::from_promise(*this));
}
[[nodiscard]] std::suspend_always initial_suspend() noexcept { return {}; }
[[nodiscard]] std::suspend_always final_suspend() noexcept { return {}; }
void return_void() noexcept {}
void unhandled_exception() { exception_ = std::current_exception(); }
[[nodiscard]] std::suspend_always yield_value(lf::a2a::v1::StreamResponse value) noexcept {
current_value_ = std::move(value);
return {};
}

std::optional<lf::a2a::v1::StreamResponse> current_value_;
std::exception_ptr exception_;
};

StreamResponseCoroutine() = default;
StreamResponseCoroutine(const StreamResponseCoroutine&) = delete;
StreamResponseCoroutine& operator=(const StreamResponseCoroutine&) = delete;

StreamResponseCoroutine(StreamResponseCoroutine&& other) noexcept : handle_(std::exchange(other.handle_, {})) {}
StreamResponseCoroutine& operator=(StreamResponseCoroutine&& other) noexcept {
if (this != &other) {
Destroy();
handle_ = std::exchange(other.handle_, {});
}
return *this;
}

~StreamResponseCoroutine() { Destroy(); }

[[nodiscard]] std::optional<lf::a2a::v1::StreamResponse> Next() {
if (!handle_ || handle_.done()) {
return std::nullopt;
}
handle_.promise().current_value_.reset();
handle_.resume();
if (handle_.promise().exception_ != nullptr) {
std::rethrow_exception(handle_.promise().exception_);
}
if (handle_.done()) {
return std::nullopt;
}
return std::move(handle_.promise().current_value_);
}

private:
explicit StreamResponseCoroutine(std::coroutine_handle<promise_type> handle) : handle_(handle) {}

void Destroy() noexcept {
if (handle_) {
handle_.destroy();
handle_ = {};
}
}

std::coroutine_handle<promise_type> handle_;
};

} // namespace a2a::server
65 changes: 65 additions & 0 deletions include/a2a/server/task_subscription_service.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright 2026 Vladimir Pavlov <mistervvp@outlook.com> (https://github.com/MisterVVP)

#pragma once

#include <condition_variable>
#include <deque>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <unordered_map>
#include <vector>

#include "a2a/core/protocol_errors.h"
#include "a2a/core/result.h"
#include "a2a/core/task_states.h"
#include "a2a/server/server_stream_session.h"
#include "a2a/server/stream_response_coroutine.h"
#include "a2a/v1/a2a.pb.h"

namespace a2a::server {

class TaskSubscriptionService final {
public:
[[nodiscard]] core::Result<std::unique_ptr<ServerStreamSession>> Subscribe(const lf::a2a::v1::Task& task);
void PublishTaskUpdated(const lf::a2a::v1::Task& task);

private:
struct SubscriberState final {
std::string task_id;
lf::a2a::v1::Task current_task;
std::deque<lf::a2a::v1::StreamResponse> events;
bool closed = false;
std::mutex mutex;
std::condition_variable ready;
};

class SubscriptionSession final : public ServerStreamSession {
public:
SubscriptionSession(TaskSubscriptionService* owner, std::shared_ptr<SubscriberState> state);
~SubscriptionSession() override;

[[nodiscard]] core::Result<std::optional<lf::a2a::v1::StreamResponse>> Next() override;
[[nodiscard]] bool IsLive() const noexcept override { return true; }
void Cancel() noexcept override;

private:
TaskSubscriptionService* owner_ = nullptr;
std::shared_ptr<SubscriberState> state_;
StreamResponseCoroutine coroutine_;
};

void RemoveSubscriber(const std::shared_ptr<SubscriberState>& state);
static std::optional<lf::a2a::v1::StreamResponse> WaitForPublishedEvent(
const std::shared_ptr<SubscriberState>& state);
static StreamResponseCoroutine RunSubscription(std::shared_ptr<SubscriberState> state);
static lf::a2a::v1::StreamResponse BuildCurrentTaskEvent(const lf::a2a::v1::Task& task);
static lf::a2a::v1::StreamResponse BuildStatusUpdateEvent(const lf::a2a::v1::Task& task);

std::mutex mutex_;
std::unordered_map<std::string, std::vector<std::weak_ptr<SubscriberState>>> subscribers_by_task_id_;
};

} // namespace a2a::server
Loading
Loading