diff --git a/frontend/bun.lock b/frontend/bun.lock index 73669f25a..bcd8e2775 100644 --- a/frontend/bun.lock +++ b/frontend/bun.lock @@ -1,5 +1,6 @@ { "lockfileVersion": 1, + "configVersion": 0, "workspaces": { "": { "dependencies": { @@ -26,7 +27,7 @@ "@icons-pack/react-simple-icons": "^13.8.0", "@milkdown/kit": "^7.18.0", "@milkdown/react": "^7.18.0", - "@modelcontextprotocol/sdk": "^1.26.0", + "@modelcontextprotocol/sdk": "^1.29.0", "@module-federation/runtime": "^2.3.2", "@monaco-editor/react": "^4.7.0", "@redpanda-data/ui": "^4.2.0", @@ -709,7 +710,7 @@ "@milkdown/utils": ["@milkdown/utils@7.18.0", "", { "dependencies": { "@milkdown/core": "7.18.0", "@milkdown/ctx": "7.18.0", "@milkdown/exception": "7.18.0", "@milkdown/prose": "7.18.0", "@milkdown/transformer": "7.18.0", "nanoid": "^5.0.9" } }, "sha512-+o/1sky+QwbS0Y92HthTupMFziJKhZUgF7IBS55Ft4Wjt63kX8PHaLC9KtewNawpzyM/CjPJ9ySCIa+C/06Bsg=="], - "@modelcontextprotocol/sdk": ["@modelcontextprotocol/sdk@1.27.1", "", { "dependencies": { "@hono/node-server": "^1.19.9", "ajv": "^8.17.1", "ajv-formats": "^3.0.1", "content-type": "^1.0.5", "cors": "^2.8.5", "cross-spawn": "^7.0.5", "eventsource": "^3.0.2", "eventsource-parser": "^3.0.0", "express": "^5.2.1", "express-rate-limit": "^8.2.1", "hono": "^4.11.4", "jose": "^6.1.3", "json-schema-typed": "^8.0.2", "pkce-challenge": "^5.0.0", "raw-body": "^3.0.0", "zod": "^3.25 || ^4.0", "zod-to-json-schema": "^3.25.1" }, "peerDependencies": { "@cfworker/json-schema": "^4.1.1" }, "optionalPeers": ["@cfworker/json-schema"] }, "sha512-sr6GbP+4edBwFndLbM60gf07z0FQ79gaExpnsjMGePXqFcSSb7t6iscpjk9DhFhwd+mTEQrzNafGP8/iGGFYaA=="], + "@modelcontextprotocol/sdk": ["@modelcontextprotocol/sdk@1.29.0", "", { "dependencies": { "@hono/node-server": "^1.19.9", "ajv": "^8.17.1", "ajv-formats": "^3.0.1", "content-type": "^1.0.5", "cors": "^2.8.5", "cross-spawn": "^7.0.5", "eventsource": "^3.0.2", "eventsource-parser": "^3.0.0", "express": "^5.2.1", "express-rate-limit": "^8.2.1", "hono": "^4.11.4", "jose": "^6.1.3", "json-schema-typed": "^8.0.2", "pkce-challenge": "^5.0.0", "raw-body": "^3.0.0", "zod": "^3.25 || ^4.0", "zod-to-json-schema": "^3.25.1" }, "peerDependencies": { "@cfworker/json-schema": "^4.1.1" }, "optionalPeers": ["@cfworker/json-schema"] }, "sha512-zo37mZA9hJWpULgkRpowewez1y6ML5GsXJPY8FI0tBBCd77HEvza4jDqRKOXgHNn867PVGCyTdzqpz0izu5ZjQ=="], "@module-federation/bridge-react-webpack-plugin": ["@module-federation/bridge-react-webpack-plugin@2.3.2", "", { "dependencies": { "@module-federation/sdk": "2.3.2", "@types/semver": "7.5.8", "semver": "7.6.3" } }, "sha512-NMzJhTSGz6PpImjbXfpGX595i+N3EFW5RwRgQ2ftTuqT7FS3vqYnC4i/72HNgryYNTSIZZSwjTbfKLyeSTroeA=="], diff --git a/frontend/package.json b/frontend/package.json index 577240730..03b0c5201 100644 --- a/frontend/package.json +++ b/frontend/package.json @@ -87,7 +87,7 @@ "@icons-pack/react-simple-icons": "^13.8.0", "@milkdown/kit": "^7.18.0", "@milkdown/react": "^7.18.0", - "@modelcontextprotocol/sdk": "^1.26.0", + "@modelcontextprotocol/sdk": "^1.29.0", "@module-federation/runtime": "^2.3.2", "@monaco-editor/react": "^4.7.0", "@redpanda-data/ui": "^4.2.0", diff --git a/frontend/src/components/pages/mcp-servers/details/__screenshots__/remote-mcp-inspector-tab.browser.test.tsx/mcp-streaming-inspector-chromium-darwin.png b/frontend/src/components/pages/mcp-servers/details/__screenshots__/remote-mcp-inspector-tab.browser.test.tsx/mcp-streaming-inspector-chromium-darwin.png new file mode 100644 index 000000000..c041c9a27 Binary files /dev/null and b/frontend/src/components/pages/mcp-servers/details/__screenshots__/remote-mcp-inspector-tab.browser.test.tsx/mcp-streaming-inspector-chromium-darwin.png differ diff --git a/frontend/src/components/pages/mcp-servers/details/remote-mcp-inspector-tab.browser.test.tsx b/frontend/src/components/pages/mcp-servers/details/remote-mcp-inspector-tab.browser.test.tsx new file mode 100644 index 000000000..b6147a9b9 --- /dev/null +++ b/frontend/src/components/pages/mcp-servers/details/remote-mcp-inspector-tab.browser.test.tsx @@ -0,0 +1,168 @@ +/** + * Copyright 2026 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file https://github.com/redpanda-data/redpanda/blob/dev/licenses/bsl.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +import { create } from '@bufbuild/protobuf'; +import type { MCPServer } from 'protogen/redpanda/api/dataplane/v1/mcp_pb'; +import { MCPServer_State, MCPServerSchema } from 'protogen/redpanda/api/dataplane/v1/mcp_pb'; +import { describe, expect, test, vi } from 'vitest'; +import { page } from 'vitest/browser'; +import { render } from 'vitest-browser-react'; + +import { mockConnectQuery, mockRouterForBrowserTest, ScreenshotFrame } from '../../../../__tests__/browser-test-utils'; + +const RUN_TOOL_REGEX = /run tool/i; + +type StreamOptions = { + onProgress?: (p: { progress?: number; total?: number; statusMessage?: string; status?: string }) => void; +}; + +const mocks = vi.hoisted(() => ({ + getMCPServer: vi.fn<() => { data: unknown; isLoading: boolean; error: Error | null }>().mockReturnValue({ + data: undefined, + isLoading: false, + error: null, + }), + listTools: vi + .fn< + () => { data: unknown; isLoading: boolean; error: Error | null; isRefetchError: boolean; isRefetching: boolean } + >() + .mockReturnValue({ + data: { tools: [] }, + isLoading: false, + error: null, + isRefetchError: false, + isRefetching: false, + }), + listTopics: vi.fn<() => { data: unknown; refetch: () => void }>().mockReturnValue({ + data: { topics: [] }, + refetch: () => undefined, + }), + createTopic: vi.fn(), + lastCapturedOnProgress: undefined as undefined | ((p: unknown) => void), + streamState: { + data: undefined as unknown, + isPending: false as boolean, + error: null as Error | null, + }, +})); + +vi.mock('@tanstack/react-router', () => ({ + ...mockRouterForBrowserTest(), + getRouteApi: () => ({ + useParams: () => ({ id: 'mcp-server-visual' }), + useRouteContext: ({ select }: { select: (ctx: Record) => unknown }) => + select({ gatewayUrl: 'http://localhost:8090' }), + }), +})); +vi.mock('@connectrpc/connect-query', () => mockConnectQuery()); + +vi.mock('config', () => ({ + config: { jwt: 'test-jwt-token' }, + isFeatureFlagEnabled: vi.fn(() => false), + isEmbedded: vi.fn(() => false), + addBearerTokenInterceptor: vi.fn((next) => async (request: unknown) => await next(request)), +})); + +vi.mock('react-query/api/remote-mcp', () => ({ + useGetMCPServerQuery: () => mocks.getMCPServer(), + useListMCPServerTools: () => mocks.listTools(), + useStreamMCPServerToolMutation: () => ({ + data: mocks.streamState.data, + mutate: (params: StreamOptions) => { + mocks.lastCapturedOnProgress = params.onProgress as (p: unknown) => void; + mocks.streamState.isPending = true; + }, + isPending: mocks.streamState.isPending, + error: mocks.streamState.error, + reset: () => { + mocks.streamState.isPending = false; + mocks.streamState.data = undefined; + mocks.streamState.error = null; + }, + }), +})); + +vi.mock('react-query/api/topic', () => ({ + useLegacyListTopicsQuery: () => mocks.listTopics(), + useCreateTopicMutation: () => ({ mutateAsync: mocks.createTopic }), +})); + +const testServer: MCPServer = create(MCPServerSchema, { + id: 'mcp-server-visual', + displayName: 'Visual Regression Server', + url: 'http://localhost:8090/mcp', + state: MCPServer_State.RUNNING, + tools: { + 'process-events': { componentType: 2, configYaml: 'processor: identity' }, + }, +}); + +const { RemoteMCPInspectorTab } = await import('./remote-mcp-inspector-tab'); + +describe('RemoteMCPInspectorTab — browser visual regression', () => { + test('streaming inspector shows Progress bar and status line mid-call', async () => { + mocks.getMCPServer.mockReturnValue({ + data: { mcpServer: testServer }, + isLoading: false, + error: null, + }); + mocks.listTools.mockReturnValue({ + data: { + tools: [ + { + name: 'process-events', + description: 'Transform a batch of events into structured output.', + inputSchema: { + type: 'object', + properties: { + batch_size: { type: 'integer', default: 10 }, + }, + required: ['batch_size'], + }, + }, + ], + }, + isLoading: false, + error: null, + isRefetchError: false, + isRefetching: false, + }); + mocks.streamState.data = undefined; + mocks.streamState.isPending = false; + mocks.streamState.error = null; + + render( + + + + ); + + await expect.element(page.getByRole('button', { name: RUN_TOOL_REGEX })).toBeVisible(); + + // Start a call and emit a progress update so the Progress bar renders. + await page.getByRole('button', { name: RUN_TOOL_REGEX }).click(); + + // Push a progress update through the captured onProgress callback so the + // UI moves into the streaming state (Progress bar + status line). + mocks.lastCapturedOnProgress?.({ + taskId: 'task-42', + status: 'working', + statusMessage: 'Processing batch 6/10...', + progress: 6, + total: 10, + }); + + await expect.element(page.getByTestId('mcp-tool-progress-bar')).toBeVisible(); + await expect.element(page.getByText('Processing batch 6/10...')).toBeVisible(); + + await expect(page.getByTestId('screenshot-frame')).toMatchScreenshot('mcp-streaming-inspector'); + }); +}); diff --git a/frontend/src/components/pages/mcp-servers/details/remote-mcp-inspector-tab.test.tsx b/frontend/src/components/pages/mcp-servers/details/remote-mcp-inspector-tab.test.tsx new file mode 100644 index 000000000..059992a32 --- /dev/null +++ b/frontend/src/components/pages/mcp-servers/details/remote-mcp-inspector-tab.test.tsx @@ -0,0 +1,378 @@ +/** + * Copyright 2025 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file https://github.com/redpanda-data/redpanda/blob/dev/licenses/bsl.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +import { create } from '@bufbuild/protobuf'; +import { createRouterTransport } from '@connectrpc/connect'; +import userEvent from '@testing-library/user-event'; +import { + GetMCPServerResponseSchema, + ListMCPServersResponseSchema, + MCPServer_State, + MCPServerSchema, +} from 'protogen/redpanda/api/dataplane/v1/mcp_pb'; +import { getMCPServer, listMCPServers } from 'protogen/redpanda/api/dataplane/v1/mcp-MCPServerService_connectquery'; +import { ListTopicsResponseSchema } from 'protogen/redpanda/api/dataplane/v1/topic_pb'; +import { listTopics } from 'protogen/redpanda/api/dataplane/v1/topic-TopicService_connectquery'; +import { renderWithFileRoutes, screen, waitFor } from 'test-utils'; + +vi.mock('config', () => ({ + config: { + jwt: 'test-jwt-token', + }, +})); + +const SERVER_ID = 'server-1'; + +vi.mock('@tanstack/react-router', async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + getRouteApi: () => ({ + useParams: () => ({ id: SERVER_ID }), + }), + }; +}); + +const toastErrorMock = vi.fn(); +const toastSuccessMock = vi.fn(); +vi.mock('sonner', () => ({ + toast: { + error: (...args: unknown[]) => toastErrorMock(...args), + success: (...args: unknown[]) => toastSuccessMock(...args), + }, +})); + +type StreamMessage = + | { type: 'taskStatus'; task: { taskId: string; status: string; statusMessage?: string } } + | { type: 'result'; result: { content: Array<{ type: string; text?: string }> } } + | { type: 'error'; error: Error }; + +let progressBeforeGate: StreamMessage[] = []; +let messagesAfterGate: StreamMessage[] = []; +let streamGate: Promise = Promise.resolve(); +let releaseStream: () => void = () => undefined; +let lastStreamSignal: AbortSignal | undefined; +let onprogressHandoff: ((p: { progress: number; total?: number }) => void) | undefined; + +let toolsResponse: { + tools: Array<{ + name: string; + description: string; + inputSchema: { type: 'object'; properties: Record }; + }>; +} = { + tools: [ + { + name: 'echo', + description: 'Echo tool', + inputSchema: { type: 'object' as const, properties: {} }, + }, + ], +}; + +vi.mock('@modelcontextprotocol/sdk/client/index.js', () => { + class MockClient { + transport?: { sessionId?: string; onerror?: (error: Error) => void }; + connect = vi.fn((transport: { sessionId?: string; onerror?: (error: Error) => void }) => { + this.transport = transport; + return Promise.resolve(); + }); + getServerCapabilities = vi.fn(() => ({ + tasks: { requests: { tools: { call: {} } } }, + })); + listTools = vi.fn(() => Promise.resolve(toolsResponse)); + callTool = vi.fn(() => Promise.resolve({ content: [] })); + experimental = { + tasks: { + async *callToolStream( + _args: unknown, + _request: unknown, + opts?: { signal?: AbortSignal; onprogress?: (p: { progress: number; total?: number }) => void } + ) { + lastStreamSignal = opts?.signal; + onprogressHandoff = opts?.onprogress; + for (const message of progressBeforeGate) { + yield message; + } + await streamGate; + for (const message of messagesAfterGate) { + yield message; + } + }, + }, + }; + } + return { Client: MockClient }; +}); + +vi.mock('@modelcontextprotocol/sdk/client/streamableHttp.js', () => { + class MockStreamableHTTPClientTransport { + readonly url: URL; + readonly opts: unknown; + sessionId?: string; + onerror?: (error: Error) => void; + constructor(url: URL, opts: unknown) { + this.url = url; + this.opts = opts; + } + } + return { StreamableHTTPClientTransport: MockStreamableHTTPClientTransport }; +}); + +const makeTransport = () => { + const server = create(MCPServerSchema, { + id: SERVER_ID, + displayName: 'Test Server', + url: 'http://localhost:8080', + state: MCPServer_State.RUNNING, + tools: { + echo: { componentType: 1, configYaml: 'test: config' }, + }, + }); + return createRouterTransport(({ rpc }) => { + rpc(getMCPServer, () => create(GetMCPServerResponseSchema, { mcpServer: server })); + rpc(listMCPServers, () => create(ListMCPServersResponseSchema, { mcpServers: [server], nextPageToken: '' })); + rpc(listTopics, () => create(ListTopicsResponseSchema, { topics: [] })); + }); +}; + +import { RemoteMCPInspectorTab } from './remote-mcp-inspector-tab'; + +const freshStreamGate = () => { + streamGate = new Promise((resolve) => { + releaseStream = resolve; + }); +}; + +const RUN_TOOL_REGEX = /run tool/i; +const CANCEL_REGEX = /^cancel$/i; + +describe('RemoteMCPInspectorTab — streaming progress UI', () => { + beforeEach(() => { + progressBeforeGate = []; + messagesAfterGate = []; + lastStreamSignal = undefined; + onprogressHandoff = undefined; + toolsResponse = { + tools: [ + { + name: 'echo', + description: 'Echo tool', + inputSchema: { type: 'object' as const, properties: {} }, + }, + ], + }; + toastErrorMock.mockClear(); + toastSuccessMock.mockClear(); + freshStreamGate(); + }); + + test('renders the registry Progress bar and status line while a tool call is pending', async () => { + const user = userEvent.setup(); + progressBeforeGate = [{ type: 'taskStatus', task: { taskId: 't1', status: 'working', statusMessage: 'halfway' } }]; + messagesAfterGate = [{ type: 'result', result: { content: [{ type: 'text', text: '"ok"' }] } }]; + + renderWithFileRoutes(, { transport: makeTransport() }); + + const runButton = await screen.findByRole('button', { name: RUN_TOOL_REGEX }); + await waitFor(() => expect(runButton).toBeEnabled()); + + await user.click(runButton); + + const progressContainer = await screen.findByTestId('mcp-tool-progress'); + expect(progressContainer).toBeVisible(); + + const progressBar = await screen.findByTestId('mcp-tool-progress-bar'); + expect(progressBar).toHaveAttribute('data-slot', 'progress'); + + expect(await screen.findByText('halfway')).toBeVisible(); + }); + + test('hides the progress UI once the stream resolves', async () => { + const user = userEvent.setup(); + progressBeforeGate = [{ type: 'taskStatus', task: { taskId: 't1', status: 'working', statusMessage: 'halfway' } }]; + messagesAfterGate = [{ type: 'result', result: { content: [{ type: 'text', text: '"done"' }] } }]; + + renderWithFileRoutes(, { transport: makeTransport() }); + + const runButton = await screen.findByRole('button', { name: RUN_TOOL_REGEX }); + await waitFor(() => expect(runButton).toBeEnabled()); + + await user.click(runButton); + + await screen.findByTestId('mcp-tool-progress-bar'); + + releaseStream(); + + await waitFor(() => { + expect(screen.queryByTestId('mcp-tool-progress-bar')).toBeNull(); + }); + }); + + test('surfaces exactly one toast on non-abort errors — mutation owns the toast, component does not double-fire', async () => { + const user = userEvent.setup(); + messagesAfterGate = [{ type: 'error', error: new Error('server blew up') }]; + + renderWithFileRoutes(, { transport: makeTransport() }); + + const runButton = await screen.findByRole('button', { name: RUN_TOOL_REGEX }); + await waitFor(() => expect(runButton).toBeEnabled()); + + await user.click(runButton); + releaseStream(); + + await waitFor(() => { + expect(toastErrorMock).toHaveBeenCalledTimes(1); + }); + const message = toastErrorMock.mock.calls[0]?.[0] as string; + expect(message).toContain('MCP tool'); + expect(message).toContain('server blew up'); + }); + + test('clicking Cancel aborts the signal without firing a toast', async () => { + const user = userEvent.setup(); + messagesAfterGate = [{ type: 'result', result: { content: [] } }]; + + renderWithFileRoutes(, { transport: makeTransport() }); + + const runButton = await screen.findByRole('button', { name: RUN_TOOL_REGEX }); + await waitFor(() => expect(runButton).toBeEnabled()); + + await user.click(runButton); + + const cancelButton = await screen.findByRole('button', { name: CANCEL_REGEX }); + await waitFor(() => expect(lastStreamSignal).toBeDefined()); + + await user.click(cancelButton); + + expect(lastStreamSignal?.aborted).toBe(true); + releaseStream(); + + await waitFor(() => { + expect(toastErrorMock).not.toHaveBeenCalled(); + }); + }); + + test('numeric progress from SDK onprogress merges with task status from the stream', async () => { + const user = userEvent.setup(); + progressBeforeGate = [{ type: 'taskStatus', task: { taskId: 't1', status: 'working', statusMessage: 'halfway' } }]; + messagesAfterGate = [{ type: 'result', result: { content: [{ type: 'text', text: '"ok"' }] } }]; + + renderWithFileRoutes(, { transport: makeTransport() }); + + const runButton = await screen.findByRole('button', { name: RUN_TOOL_REGEX }); + await waitFor(() => expect(runButton).toBeEnabled()); + await user.click(runButton); + + await screen.findByText('halfway'); + await waitFor(() => expect(onprogressHandoff).toBeDefined()); + + onprogressHandoff?.({ progress: 50, total: 100 }); + + await waitFor(() => { + const bar = screen.queryByTestId('mcp-tool-progress-bar'); + expect(bar?.getAttribute('data-value')).toBe('50'); + }); + expect(screen.queryByText('halfway')).toBeVisible(); + + releaseStream(); + }); + + test('Progress value clamps to [0, 100] when the server sends out-of-range numbers', async () => { + const user = userEvent.setup(); + messagesAfterGate = [{ type: 'result', result: { content: [{ type: 'text', text: '"ok"' }] } }]; + + renderWithFileRoutes(, { transport: makeTransport() }); + + const runButton = await screen.findByRole('button', { name: RUN_TOOL_REGEX }); + await waitFor(() => expect(runButton).toBeEnabled()); + + await user.click(runButton); + + await waitFor(() => expect(onprogressHandoff).toBeDefined()); + + // > 100% + onprogressHandoff?.({ progress: 200, total: 100 }); + await waitFor(() => { + const bar = screen.queryByTestId('mcp-tool-progress-bar'); + expect(bar).toBeTruthy(); + const value = bar?.getAttribute('data-value'); + expect(value).not.toBeNull(); + expect(Number(value)).toBeLessThanOrEqual(100); + expect(Number(value)).toBeGreaterThanOrEqual(0); + }); + + // < 0% + onprogressHandoff?.({ progress: -5, total: 10 }); + await waitFor(() => { + const bar = screen.queryByTestId('mcp-tool-progress-bar'); + const value = bar?.getAttribute('data-value'); + expect(Number(value)).toBeGreaterThanOrEqual(0); + }); + + // NaN (total = 0 → division by zero NaN handled as undefined) + onprogressHandoff?.({ progress: 5, total: 0 }); + await waitFor(() => { + const bar = screen.queryByTestId('mcp-tool-progress-bar'); + expect(bar).toBeTruthy(); + const value = bar?.getAttribute('data-value'); + // Either indeterminate (no data-value) or a valid clamped number. + if (value !== null && value !== undefined) { + const n = Number(value); + if (!Number.isNaN(n)) { + expect(n).toBeGreaterThanOrEqual(0); + expect(n).toBeLessThanOrEqual(100); + } + } + }); + + releaseStream(); + }); + + test('switching tools mid-stream clears any in-flight progress UI', async () => { + const user = userEvent.setup(); + toolsResponse = { + tools: [ + { name: 'echo', description: 'Echo tool', inputSchema: { type: 'object' as const, properties: {} } }, + { name: 'reverse', description: 'Reverse tool', inputSchema: { type: 'object' as const, properties: {} } }, + ], + }; + progressBeforeGate = [{ type: 'taskStatus', task: { taskId: 't1', status: 'working', statusMessage: 'halfway' } }]; + messagesAfterGate = [{ type: 'result', result: { content: [] } }]; + + renderWithFileRoutes(, { transport: makeTransport() }); + + // Wait for two tools to render. + const echoButton = await screen.findByText('echo'); + await screen.findByText('reverse'); + + // Select echo first and run. + await user.click(echoButton); + const runButton = await screen.findByRole('button', { name: RUN_TOOL_REGEX }); + await waitFor(() => expect(runButton).toBeEnabled()); + await user.click(runButton); + + // Progress surfaces. + await screen.findByTestId('mcp-tool-progress-bar'); + expect(await screen.findByText('halfway')).toBeVisible(); + + // Switch to reverse — progress UI must clear. + const reverseButton = await screen.findByText('reverse'); + await user.click(reverseButton); + + await waitFor(() => { + expect(screen.queryByTestId('mcp-tool-progress-bar')).toBeNull(); + expect(screen.queryByText('halfway')).toBeNull(); + }); + + releaseStream(); + }); +}); diff --git a/frontend/src/components/pages/mcp-servers/details/remote-mcp-inspector-tab.tsx b/frontend/src/components/pages/mcp-servers/details/remote-mcp-inspector-tab.tsx index 6b2c4907a..b1580dd68 100644 --- a/frontend/src/components/pages/mcp-servers/details/remote-mcp-inspector-tab.tsx +++ b/frontend/src/components/pages/mcp-servers/details/remote-mcp-inspector-tab.tsx @@ -21,6 +21,7 @@ import { Button } from 'components/redpanda-ui/components/button'; import { Card, CardContent, CardHeader, CardTitle } from 'components/redpanda-ui/components/card'; import { CopyButton } from 'components/redpanda-ui/components/copy-button'; import { Label } from 'components/redpanda-ui/components/label'; +import { Progress } from 'components/redpanda-ui/components/progress'; import { Skeleton } from 'components/redpanda-ui/components/skeleton'; import { Text } from 'components/redpanda-ui/components/typography'; import { RedpandaConnectComponentTypeBadge } from 'components/ui/connect/redpanda-connect-component-type-badge'; @@ -35,7 +36,12 @@ import { } from 'protogen/redpanda/api/dataplane/v1/topic_pb'; import { MCPServer_State, MCPServer_Tool_ComponentType } from 'protogen/redpanda/api/dataplane/v1alpha3/mcp_pb'; import { useEffect, useRef, useState } from 'react'; -import { useCallMCPServerToolMutation, useGetMCPServerQuery, useListMCPServerTools } from 'react-query/api/remote-mcp'; +import { + type MCPStreamProgress, + useGetMCPServerQuery, + useListMCPServerTools, + useStreamMCPServerToolMutation, +} from 'react-query/api/remote-mcp'; import { useCreateTopicMutation, useLegacyListTopicsQuery } from 'react-query/api/topic'; import { toast } from 'sonner'; @@ -143,6 +149,19 @@ const getComponentTypeFromToolName = (toolName: string): MCPServer_Tool_Componen const DEFAULT_TOPIC_PARTITION_COUNT = 1; const DEFAULT_TOPIC_REPLICATION_FACTOR = 3; +const PROGRESS_MAX_PERCENT = 100; + +const normalizeProgressPercent = (progress: number | undefined, total: number | undefined): number | undefined => { + if (progress === undefined || total === undefined || total <= 0) { + return; + } + const percent = Math.round((progress / total) * PROGRESS_MAX_PERCENT); + if (!Number.isFinite(percent)) { + return; + } + return Math.max(0, Math.min(PROGRESS_MAX_PERCENT, percent)); +}; + // biome-ignore lint/complexity/noExcessiveCognitiveComplexity: complex business logic export const RemoteMCPInspectorTab = () => { const { id } = routeApi.useParams(); @@ -159,13 +178,14 @@ export const RemoteMCPInspectorTab = () => { const abortControllerRef = useRef(null); const { data: mcpServerData } = useGetMCPServerQuery({ id: id || '' }, { enabled: !!id }); + const [streamProgress, setStreamProgress] = useState(null); const { data: serverToolResponse, mutate: callMCPServerTool, isPending: isServerToolPending, error: toolError, reset: resetMCPServerToolCall, - } = useCallMCPServerToolMutation(); + } = useStreamMCPServerToolMutation(); const { data: mcpServerTools, @@ -344,21 +364,18 @@ export const RemoteMCPInspectorTab = () => { const parameters = (toolParameters as Record) || {}; + setStreamProgress(null); + callMCPServerTool( { serverUrl: mcpServerData.mcpServer.url, toolName: selectedTool, parameters, signal: abortController.signal, + onProgress: (update) => setStreamProgress((prev) => ({ ...prev, ...update })), }, { - onError: (error) => { - if (error.message !== 'Request was cancelled') { - toast.error(error.message); - } - }, onSettled: () => { - // Clear the abort controller reference when request completes if (abortControllerRef.current === abortController) { abortControllerRef.current = null; } @@ -474,6 +491,8 @@ export const RemoteMCPInspectorTab = () => { const initialData = initializeFormData(tool.inputSchema as JSONSchemaType); setToolParameters(initialData); resetMCPServerToolCall(); + // Clear progress from the previous tool's in-flight stream + setStreamProgress(null); // Clear validation errors when switching tools setValidationErrors({}); }} @@ -606,6 +625,17 @@ export const RemoteMCPInspectorTab = () => { {Boolean(isServerToolPending) && (
+ {Boolean(streamProgress) && ( +
+ + + {streamProgress?.statusMessage ?? streamProgress?.status ?? 'Running tool...'} + +
+ )}
diff --git a/frontend/src/globals.css b/frontend/src/globals.css index 0f0c55eae..5d41922fa 100644 --- a/frontend/src/globals.css +++ b/frontend/src/globals.css @@ -1,9 +1,9 @@ -@source "../node_modules/streamdown/dist/index.js"; - @import "./components/redpanda-ui/style/theme.css"; @import "tailwindcss"; @import "tw-animate-css"; +@source "../node_modules/streamdown/dist/index.js"; + /* tw-animate-css defines .fade-in { opacity: 0; animation: .3s forwards fadeIn; } which collides with Monaco's internal .monaco-hover.fade-in class, forcing hover widgets to be invisible. Override to restore opacity. */ diff --git a/frontend/src/react-query/api/mcp-oauth-provider.test.ts b/frontend/src/react-query/api/mcp-oauth-provider.test.ts new file mode 100644 index 000000000..377b2c185 --- /dev/null +++ b/frontend/src/react-query/api/mcp-oauth-provider.test.ts @@ -0,0 +1,143 @@ +/** + * Copyright 2025 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file https://github.com/redpanda-data/redpanda/blob/dev/licenses/bsl.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +import { describe, expect, test } from 'vitest'; + +import { ConsoleJWTOAuthProvider } from './mcp-oauth-provider'; + +const INTERACTIVE_AUTHORIZATION_ERROR = /interactive authorization/; +const PKCE_ERROR = /PKCE/; +const BASE64_URL_PLUS = /\+/g; +const BASE64_URL_SLASH = /\//g; +const BASE64_URL_PADDING = /=+$/; + +const base64Url = (value: string): string => + btoa(value).replace(BASE64_URL_PLUS, '-').replace(BASE64_URL_SLASH, '_').replace(BASE64_URL_PADDING, ''); + +const makeJwt = (payload: Record): string => { + const header = base64Url(JSON.stringify({ alg: 'HS256', typ: 'JWT' })); + const body = base64Url(JSON.stringify(payload)); + // Signature is opaque to the provider — we only decode the payload. + return `${header}.${body}.sig`; +}; + +describe('ConsoleJWTOAuthProvider', () => { + test('returns the current JWT as a Bearer token every call', () => { + let jwt: string | undefined = 'jwt-1'; + const provider = new ConsoleJWTOAuthProvider({ getJwt: () => jwt }); + + expect(provider.tokens()).toEqual({ access_token: 'jwt-1', token_type: 'Bearer' }); + + jwt = 'jwt-2'; + expect(provider.tokens()).toEqual({ access_token: 'jwt-2', token_type: 'Bearer' }); + }); + + test('returns undefined when no JWT is present', () => { + const provider = new ConsoleJWTOAuthProvider({ getJwt: () => undefined }); + expect(provider.tokens()).toBeUndefined(); + }); + + test('exposes static client metadata with JWT-bearer grant', () => { + const provider = new ConsoleJWTOAuthProvider({ getJwt: () => 'x', clientName: 'custom-client' }); + + expect(provider.redirectUrl).toBeUndefined(); + expect(provider.clientMetadata.client_name).toBe('custom-client'); + expect(provider.clientMetadata.grant_types).toEqual(['urn:ietf:params:oauth:grant-type:jwt-bearer']); + expect(provider.clientInformation()).toEqual({ client_id: 'custom-client' }); + }); + + test('defaults clientName to redpanda-console when omitted', () => { + const provider = new ConsoleJWTOAuthProvider({ getJwt: () => 'x' }); + + expect(provider.clientMetadata.client_name).toBe('redpanda-console'); + expect(provider.clientInformation()).toEqual({ client_id: 'redpanda-console' }); + }); + + test('clientMetadata advertises no redirect URIs and public-client auth method', () => { + const provider = new ConsoleJWTOAuthProvider({ getJwt: () => 'x' }); + + expect(provider.clientMetadata.redirect_uris).toEqual([]); + expect(provider.clientMetadata.token_endpoint_auth_method).toBe('none'); + }); + + test('saveTokens is a no-op', () => { + const provider = new ConsoleJWTOAuthProvider({ getJwt: () => 'x' }); + expect(() => provider.saveTokens()).not.toThrow(); + }); + + test('interactive flow methods throw because the console never drives OAuth redirects', () => { + const provider = new ConsoleJWTOAuthProvider({ getJwt: () => 'x' }); + + expect(() => provider.redirectToAuthorization()).toThrow(INTERACTIVE_AUTHORIZATION_ERROR); + expect(() => provider.saveCodeVerifier()).toThrow(PKCE_ERROR); + expect(() => provider.codeVerifier()).toThrow(PKCE_ERROR); + }); + + test('returns undefined when the JWT exp has passed so SDK raises UnauthorizedError early', () => { + const expiredJwt = makeJwt({ exp: Math.floor(Date.now() / 1000) - 60 }); + const provider = new ConsoleJWTOAuthProvider({ getJwt: () => expiredJwt }); + + expect(provider.tokens()).toBeUndefined(); + }); + + test('returns the JWT when exp is in the future', () => { + const freshJwt = makeJwt({ exp: Math.floor(Date.now() / 1000) + 3600 }); + const provider = new ConsoleJWTOAuthProvider({ getJwt: () => freshJwt }); + + expect(provider.tokens()).toEqual({ access_token: freshJwt, token_type: 'Bearer' }); + }); + + test('returns the JWT when exp is absent — decode is advisory, not crypto', () => { + const noExpJwt = makeJwt({ sub: 'user' }); + const provider = new ConsoleJWTOAuthProvider({ getJwt: () => noExpJwt }); + + expect(provider.tokens()).toEqual({ access_token: noExpJwt, token_type: 'Bearer' }); + }); + + test('returns the JWT when the token is malformed — fail-open, not fail-closed', () => { + const malformed = 'this-is-not-a-valid-jwt'; + const provider = new ConsoleJWTOAuthProvider({ getJwt: () => malformed }); + + expect(provider.tokens()).toEqual({ access_token: malformed, token_type: 'Bearer' }); + }); + + test('returns the JWT when the middle segment is unparseable JSON', () => { + const jwt = `${base64Url('{"alg":"HS256"}')}.${base64Url('not-json')}.sig`; + const provider = new ConsoleJWTOAuthProvider({ getJwt: () => jwt }); + + expect(provider.tokens()).toEqual({ access_token: jwt, token_type: 'Bearer' }); + }); + + test('returns the JWT when exp is non-numeric', () => { + const jwt = makeJwt({ exp: 'not-a-number' }); + const provider = new ConsoleJWTOAuthProvider({ getJwt: () => jwt }); + + expect(provider.tokens()).toEqual({ access_token: jwt, token_type: 'Bearer' }); + }); +}); + +describe('ConsoleJWTOAuthProvider — instance isolation', () => { + test('two providers share no mutable state — their getJwt sources are independent', () => { + let jwtA = 'jwt-A'; + let jwtB = 'jwt-B'; + const providerA = new ConsoleJWTOAuthProvider({ getJwt: () => jwtA }); + const providerB = new ConsoleJWTOAuthProvider({ getJwt: () => jwtB }); + + expect(providerA.tokens()).toEqual({ access_token: 'jwt-A', token_type: 'Bearer' }); + expect(providerB.tokens()).toEqual({ access_token: 'jwt-B', token_type: 'Bearer' }); + + jwtA = 'jwt-A-rotated'; + jwtB = undefined as unknown as string; + + expect(providerA.tokens()).toEqual({ access_token: 'jwt-A-rotated', token_type: 'Bearer' }); + expect(providerB.tokens()).toBeUndefined(); + }); +}); diff --git a/frontend/src/react-query/api/mcp-oauth-provider.ts b/frontend/src/react-query/api/mcp-oauth-provider.ts new file mode 100644 index 000000000..8a61e27bd --- /dev/null +++ b/frontend/src/react-query/api/mcp-oauth-provider.ts @@ -0,0 +1,116 @@ +/** + * Copyright 2025 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file https://github.com/redpanda-data/redpanda/blob/dev/licenses/bsl.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +import type { OAuthClientProvider } from '@modelcontextprotocol/sdk/client/auth.js'; +import type { + OAuthClientInformation, + OAuthClientMetadata, + OAuthTokens, +} from '@modelcontextprotocol/sdk/shared/auth.js'; + +export type ConsoleJWTOAuthProviderOptions = { + getJwt: () => string | undefined; + clientName?: string; +}; + +const JWT_SEGMENT_COUNT = 3; + +/** + * Best-effort check that a JWT has not yet expired. + * + * Decodes the middle segment (base64url) and reads `exp`. Returns `true` only + * when we can confirm expiry; returns `false` on any parse failure, missing + * `exp`, or non-numeric `exp` so a well-formed token is never spuriously + * rejected. This is advisory, not cryptographic — signature validation still + * lives with the server. + */ +const isJwtExpired = (token: string): boolean => { + try { + const parts = token.split('.'); + if (parts.length !== JWT_SEGMENT_COUNT) { + return false; + } + const padded = parts[1].replace(/-/g, '+').replace(/_/g, '/'); + const payload = JSON.parse(atob(padded)) as { exp?: unknown }; + if (typeof payload.exp !== 'number') { + return false; + } + return payload.exp * 1000 <= Date.now(); + } catch { + return false; + } +}; + +/** + * OAuthClientProvider wrapper around the console's existing JWT. + * + * Console authenticates via an externally managed JWT held on the shared + * `config` object. This provider lets the MCP SDK plug into that flow: + * - `tokens()` returns the current JWT as an OAuth bearer token on every + * call, so the SDK always sees a fresh value without needing refresh. + * - Flow-dependent methods (PKCE, redirects, client registration) throw, + * because the console never drives an interactive OAuth flow — the + * caller has either already authenticated, or it hasn't. + */ +export class ConsoleJWTOAuthProvider implements OAuthClientProvider { + private readonly getJwt: () => string | undefined; + private readonly clientName: string; + + constructor({ getJwt, clientName = 'redpanda-console' }: ConsoleJWTOAuthProviderOptions) { + this.getJwt = getJwt; + this.clientName = clientName; + } + + readonly redirectUrl: string | URL | undefined; + + get clientMetadata(): OAuthClientMetadata { + return { + client_name: this.clientName, + redirect_uris: [], + grant_types: ['urn:ietf:params:oauth:grant-type:jwt-bearer'], + token_endpoint_auth_method: 'none', + }; + } + + clientInformation(): OAuthClientInformation | undefined { + return { client_id: this.clientName }; + } + + tokens(): OAuthTokens | undefined { + const jwt = this.getJwt(); + if (!jwt) { + return; + } + if (isJwtExpired(jwt)) { + return; + } + return { + access_token: jwt, + token_type: 'Bearer', + }; + } + + saveTokens(): void { + // Tokens are owned by the console auth layer, not this provider. + } + + redirectToAuthorization(): void { + throw new Error('ConsoleJWTOAuthProvider does not drive interactive authorization flows'); + } + + saveCodeVerifier(): void { + throw new Error('ConsoleJWTOAuthProvider does not use PKCE'); + } + + codeVerifier(): string { + throw new Error('ConsoleJWTOAuthProvider does not use PKCE'); + } +} diff --git a/frontend/src/react-query/api/remote-mcp.test.tsx b/frontend/src/react-query/api/remote-mcp.test.tsx index 09eb0f134..e45265c03 100644 --- a/frontend/src/react-query/api/remote-mcp.test.tsx +++ b/frontend/src/react-query/api/remote-mcp.test.tsx @@ -11,13 +11,212 @@ import { create } from '@bufbuild/protobuf'; import { createRouterTransport } from '@connectrpc/connect'; -import { renderHook, waitFor } from '@testing-library/react'; +import type { OAuthClientProvider } from '@modelcontextprotocol/sdk/client/auth.js'; +import { act, renderHook, waitFor } from '@testing-library/react'; import { ListMCPServersResponseSchema, MCPServerSchema } from 'protogen/redpanda/api/dataplane/v1/mcp_pb'; import { listMCPServers } from 'protogen/redpanda/api/dataplane/v1/mcp-MCPServerService_connectquery'; import { connectQueryWrapper } from 'test-utils'; -import { describe, expect, test } from 'vitest'; +import { beforeEach, describe, expect, test, vi } from 'vitest'; -import { useListMCPServersQuery } from './remote-mcp'; +import { ConsoleJWTOAuthProvider } from './mcp-oauth-provider'; +import type { MCPStreamProgress } from './remote-mcp'; +import { + createMCPClientWithSession, + listMCPServerTools, + useCallMCPServerToolMutation, + useListMCPServersQuery, + useStreamMCPServerToolMutation, +} from './remote-mcp'; + +const STREAM_TIMEOUT_50MS_REGEX = /MCP tool stream timed out after 50ms/; +const STREAM_TIMED_OUT_REGEX = /timed out/; +const STREAM_WATCHDOG_REGEX = /MCP tool stream ended without a terminal/; +const STREAM_ERROR_EMPTY_PAYLOAD_REGEX = /MCP stream yielded an error event with no payload/; + +vi.mock('config', () => ({ + config: { + jwt: 'test-jwt-token', + }, +})); + +const formatToastErrorMessageGRPCMock = vi.fn(() => 'formatted error'); +vi.mock('utils/toast.utils', async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + formatToastErrorMessageGRPC: (...args: Parameters) => + formatToastErrorMessageGRPCMock(...args), + }; +}); + +const toastErrorMock = vi.fn(); +vi.mock('sonner', () => ({ + toast: { + error: (...args: unknown[]) => toastErrorMock(...args), + success: vi.fn(), + }, +})); + +type StreamMessage = + | { type: 'taskCreated'; task: { taskId: string; status: string; statusMessage?: string } } + | { type: 'taskStatus'; task: { taskId: string; status: string; statusMessage?: string } } + | { type: 'result'; result: { content: Array<{ type: string; text?: string }> } } + | { type: 'error'; error?: Error }; + +type StreamOptions = { + signal?: AbortSignal; + onprogress?: (progress: { progress: number; total?: number }) => void; +}; + +type ServerCapabilitiesMock = { + tools?: { listChanged?: boolean }; + tasks?: { requests?: { tools?: { call?: unknown } } } | undefined; +} | null; + +let streamMessages: StreamMessage[] = []; +let streamYieldDelayMs = 0; +let streamHangForever = false; +let lastStreamOptions: StreamOptions | undefined; +let connectOrderLog: string[] = []; +let lastTransportOpts: { authProvider?: OAuthClientProvider; fetch?: typeof fetch } | undefined; +let lastClientInfo: { name: string; version: string } | undefined; +let nextConnectRejection: Error | undefined; +let nextListToolsRejection: Error | undefined; +let nextCallToolRejection: Error | undefined; +let streamConstructorSnapshots: number[] = []; +let serverCapabilitiesMock: ServerCapabilitiesMock = { + tools: { listChanged: false }, + tasks: { requests: { tools: { call: {} } } }, +}; +const createdClients: { close: ReturnType }[] = []; +const callToolInvocations: { name: string; arguments: Record }[] = []; +let fallbackCallToolHang = false; +let fallbackCallToolSignalCapture: AbortSignal | undefined; + +vi.mock('@modelcontextprotocol/sdk/client/index.js', () => { + class MockClient { + transport?: { sessionId?: string; onerror?: (error: Error) => void }; + close = vi.fn(() => Promise.resolve()); + constructor(clientInfo: { name: string; version: string }) { + lastClientInfo = clientInfo; + createdClients.push(this); + } + connect = vi.fn((transport: { sessionId?: string; onerror?: (error: Error) => void }) => { + if (nextConnectRejection) { + const err = nextConnectRejection; + nextConnectRejection = undefined; + return Promise.reject(err); + } + this.transport = transport; + connectOrderLog.push(`connect:onerror=${typeof transport.onerror}`); + return Promise.resolve(); + }); + getServerCapabilities = vi.fn(() => serverCapabilitiesMock ?? undefined); + listTools = vi.fn(() => { + if (nextListToolsRejection) { + const err = nextListToolsRejection; + nextListToolsRejection = undefined; + return Promise.reject(err); + } + return Promise.resolve({ tools: [] }); + }); + callTool = vi.fn( + ( + params: { name: string; arguments: Record }, + _schema: unknown, + opts?: { signal?: AbortSignal } + ) => { + callToolInvocations.push(params); + fallbackCallToolSignalCapture = opts?.signal; + if (nextCallToolRejection) { + const err = nextCallToolRejection; + nextCallToolRejection = undefined; + return Promise.reject(err); + } + if (fallbackCallToolHang) { + return new Promise<{ content: Array<{ type: string; text: string }> }>((_resolve, reject) => { + const onAbort = () => { + const err = new Error('aborted'); + err.name = 'AbortError'; + reject(err); + }; + if (opts?.signal?.aborted) { + onAbort(); + return; + } + opts?.signal?.addEventListener('abort', onAbort, { once: true }); + }); + } + return Promise.resolve({ content: [{ type: 'text', text: 'fallback-result' }] }); + } + ); + experimental = { + tasks: { + // biome-ignore lint/suspicious/useAwait: async generator with sync yields + async *callToolStream( + _toolArgs: { name: string; arguments: Record }, + _request: unknown, + opts?: StreamOptions + ) { + lastStreamOptions = opts; + streamConstructorSnapshots.push(streamMessages.length); + if (streamHangForever) { + await new Promise((resolve) => { + opts?.signal?.addEventListener('abort', () => resolve()); + }); + return; + } + for (const message of streamMessages) { + if (streamYieldDelayMs > 0) { + await new Promise((r) => setTimeout(r, streamYieldDelayMs)); + } + yield message; + } + }, + }, + }; + } + return { Client: MockClient }; +}); + +vi.mock('@modelcontextprotocol/sdk/client/streamableHttp.js', () => { + class MockStreamableHTTPClientTransport { + sessionId?: string; + onerror?: (error: Error) => void; + url: URL; + opts: { authProvider?: OAuthClientProvider; fetch?: typeof fetch }; + constructor(url: URL, opts: { authProvider?: OAuthClientProvider; fetch?: typeof fetch }) { + this.url = url; + this.opts = opts; + lastTransportOpts = opts; + } + } + return { StreamableHTTPClientTransport: MockStreamableHTTPClientTransport }; +}); + +beforeEach(() => { + streamMessages = []; + streamYieldDelayMs = 0; + streamHangForever = false; + lastStreamOptions = undefined; + connectOrderLog = []; + lastTransportOpts = undefined; + lastClientInfo = undefined; + nextConnectRejection = undefined; + nextListToolsRejection = undefined; + nextCallToolRejection = undefined; + streamConstructorSnapshots = []; + serverCapabilitiesMock = { + tools: { listChanged: false }, + tasks: { requests: { tools: { call: {} } } }, + }; + createdClients.length = 0; + callToolInvocations.length = 0; + fallbackCallToolHang = false; + fallbackCallToolSignalCapture = undefined; + formatToastErrorMessageGRPCMock.mockClear(); + toastErrorMock.mockClear(); +}); describe('useListMCPServersQuery', () => { test('fetches all pages and flattens servers into a single array', async () => { @@ -113,3 +312,771 @@ describe('useListMCPServersQuery', () => { expect(result.current.data.mcpServers).toHaveLength(0); }); }); + +describe('createMCPClientWithSession', () => { + test('wires transport.onerror to log transport-level failures', async () => { + const errorSpy = vi.spyOn(console, 'error').mockImplementation(() => undefined); + + const { client, transport } = await createMCPClientWithSession('https://example.test/mcp', 'redpanda-console'); + + expect(client).toBeDefined(); + expect(transport).toBeDefined(); + expect(typeof transport.onerror).toBe('function'); + + transport.onerror?.(new Error('boom')); + + expect(errorSpy).toHaveBeenCalledWith( + '[MCP] transport error', + expect.objectContaining({ serverUrl: 'https://example.test/mcp' }) + ); + + errorSpy.mockRestore(); + }); + + test('assigns transport.onerror before calling client.connect', async () => { + await createMCPClientWithSession('https://example.test/mcp', 'redpanda-console'); + + expect(connectOrderLog).toEqual(['connect:onerror=function']); + }); + + test('transport.onerror handler does not throw so the promise chain stays alive', async () => { + const errorSpy = vi.spyOn(console, 'error').mockImplementation(() => undefined); + const { transport } = await createMCPClientWithSession('https://example.test/mcp', 'redpanda-console'); + + expect(() => transport.onerror?.(new Error('boom'))).not.toThrow(); + + errorSpy.mockRestore(); + }); + + test('passes a ConsoleJWTOAuthProvider to the transport constructor', async () => { + await createMCPClientWithSession('https://example.test/mcp', 'redpanda-console'); + + expect(lastTransportOpts?.authProvider).toBeInstanceOf(ConsoleJWTOAuthProvider); + }); + + test('custom fetch injects Mcp-Session-Id header belt-and-suspenders with auth', async () => { + const fetchSpy = vi.spyOn(globalThis, 'fetch').mockResolvedValue(new Response(null, { status: 200 })); + + const { transport } = await createMCPClientWithSession('https://example.test/mcp', 'redpanda-console'); + const transportFetch = lastTransportOpts?.fetch; + expect(transportFetch).toBeDefined(); + + (transport as unknown as { sessionId?: string }).sessionId = 'sess-42'; + + await transportFetch?.('https://example.test/mcp', { method: 'POST' }); + + expect(fetchSpy).toHaveBeenCalledTimes(1); + const [, init] = fetchSpy.mock.calls[0] as [unknown, RequestInit]; + const headers = init.headers as Record; + expect(headers['Content-Type']).toBe('application/json'); + expect(headers.Authorization).toBe('Bearer test-jwt-token'); + expect(headers['Mcp-Session-Id']).toBe('sess-42'); + + fetchSpy.mockRestore(); + }); + + test('passes clientName through and pins SDK client version at 1.0.0', async () => { + await createMCPClientWithSession('https://example.test/mcp', 'some-client'); + + expect(lastClientInfo).toEqual({ name: 'some-client', version: '1.0.0' }); + }); + + test('returns the { client, transport } shape', async () => { + const result = await createMCPClientWithSession('https://example.test/mcp', 'redpanda-console'); + + expect(Object.keys(result).sort()).toEqual(['client', 'transport']); + expect(result.client).toBeDefined(); + expect(result.transport).toBeDefined(); + }); +}); + +describe('useStreamMCPServerToolMutation', () => { + test('emits progress updates and resolves with the final result', async () => { + streamMessages = [ + { type: 'taskCreated', task: { taskId: 't1', status: 'working' } }, + { type: 'taskStatus', task: { taskId: 't1', status: 'working', statusMessage: 'halfway' } }, + { type: 'result', result: { content: [{ type: 'text', text: 'done' }] } }, + ]; + + const { wrapper } = connectQueryWrapper({ defaultOptions: { queries: { retry: false } } }); + + const { result } = renderHook(() => useStreamMCPServerToolMutation(), { wrapper }); + + const progressUpdates: MCPStreamProgress[] = []; + + const value = await result.current.mutateAsync({ + serverUrl: 'https://example.test/mcp', + toolName: 'my-tool', + parameters: { foo: 'bar' }, + onProgress: (update) => progressUpdates.push(update), + }); + + expect(value).toEqual({ content: [{ type: 'text', text: 'done' }] }); + expect(progressUpdates).toHaveLength(2); + expect(progressUpdates[0]).toEqual({ taskId: 't1', status: 'working', statusMessage: undefined }); + expect(progressUpdates[1]).toEqual({ taskId: 't1', status: 'working', statusMessage: 'halfway' }); + }); + + test('forwards numeric progress from the SDK onprogress callback', async () => { + streamMessages = [{ type: 'result', result: { content: [] } }]; + + const { wrapper } = connectQueryWrapper({ defaultOptions: { queries: { retry: false } } }); + const { result } = renderHook(() => useStreamMCPServerToolMutation(), { wrapper }); + + const progressUpdates: MCPStreamProgress[] = []; + + const mutationPromise = result.current.mutateAsync({ + serverUrl: 'https://example.test/mcp', + toolName: 'my-tool', + parameters: {}, + onProgress: (update) => progressUpdates.push(update), + }); + + await act(async () => { + await new Promise((r) => setTimeout(r, 0)); + lastStreamOptions?.onprogress?.({ progress: 3, total: 10 }); + lastStreamOptions?.onprogress?.({ progress: 7, total: 10 }); + await mutationPromise; + }); + + expect(progressUpdates).toContainEqual({ progress: 3, total: 10 }); + expect(progressUpdates).toContainEqual({ progress: 7, total: 10 }); + }); + + test('passes the AbortSignal through to the SDK call', async () => { + streamMessages = [{ type: 'result', result: { content: [] } }]; + + const { wrapper } = connectQueryWrapper({ defaultOptions: { queries: { retry: false } } }); + const { result } = renderHook(() => useStreamMCPServerToolMutation(), { wrapper }); + + const controller = new AbortController(); + + await result.current.mutateAsync({ + serverUrl: 'https://example.test/mcp', + toolName: 'my-tool', + parameters: {}, + signal: controller.signal, + }); + + // The stream receives a composed signal (caller signal ⋃ internal timeout). + expect(lastStreamOptions?.signal).toBeDefined(); + }); + + test('pre-aborted caller signal short-circuits the composed signal', async () => { + streamMessages = [{ type: 'result', result: { content: [] } }]; + + const { wrapper } = connectQueryWrapper({ + defaultOptions: { queries: { retry: false }, mutations: { retry: false } }, + }); + const { result } = renderHook(() => useStreamMCPServerToolMutation(), { wrapper }); + + const controller = new AbortController(); + controller.abort(); + + await result.current.mutateAsync({ + serverUrl: 'https://example.test/mcp', + toolName: 'my-tool', + parameters: {}, + signal: controller.signal, + }); + + // The composed signal surfaced to the SDK fired synchronously because the + // caller's signal was already aborted at call time. + expect(lastStreamOptions?.signal?.aborted).toBe(true); + }); + + test('throws when the stream ends without a result', async () => { + streamMessages = [{ type: 'error', error: new Error('server died') }]; + + const { wrapper } = connectQueryWrapper({ defaultOptions: { queries: { retry: false } } }); + + const { result } = renderHook(() => useStreamMCPServerToolMutation(), { wrapper }); + + await expect( + result.current.mutateAsync({ + serverUrl: 'https://example.test/mcp', + toolName: 'my-tool', + parameters: {}, + }) + ).rejects.toThrow('server died'); + }); + + test('resolves with the terminal result message payload (non-streaming call compatibility)', async () => { + const terminalPayload = { content: [{ type: 'text', text: 'final' }] }; + streamMessages = [{ type: 'result', result: terminalPayload }]; + + const { wrapper } = connectQueryWrapper({ defaultOptions: { queries: { retry: false } } }); + const { result } = renderHook(() => useStreamMCPServerToolMutation(), { wrapper }); + + const value = await result.current.mutateAsync({ + serverUrl: 'https://example.test/mcp', + toolName: 'my-tool', + parameters: {}, + }); + + expect(value).toEqual(terminalPayload); + }); + + test('rejects when the stream yields an error event before a result', async () => { + streamMessages = [ + { type: 'taskCreated', task: { taskId: 't1', status: 'working' } }, + { type: 'error', error: new Error('upstream failed') }, + ]; + + const { wrapper } = connectQueryWrapper({ defaultOptions: { queries: { retry: false } } }); + const { result } = renderHook(() => useStreamMCPServerToolMutation(), { wrapper }); + + await expect( + result.current.mutateAsync({ + serverUrl: 'https://example.test/mcp', + toolName: 'my-tool', + parameters: {}, + }) + ).rejects.toThrow('upstream failed'); + }); + + test('rejects with a descriptive Error when the stream yields an error event without a payload', async () => { + streamMessages = [{ type: 'error' }]; + + const { wrapper } = connectQueryWrapper({ + defaultOptions: { queries: { retry: false }, mutations: { retry: false } }, + }); + const { result } = renderHook(() => useStreamMCPServerToolMutation(), { wrapper }); + + const rejection = await result.current + .mutateAsync({ + serverUrl: 'https://example.test/mcp', + toolName: 'my-tool', + parameters: {}, + }) + .then( + () => { + throw new Error('mutation should have rejected'); + }, + (err: unknown) => err + ); + + expect(rejection).toBeInstanceOf(Error); + if (rejection instanceof Error) { + expect(rejection.message).toMatch(STREAM_ERROR_EMPTY_PAYLOAD_REGEX); + } + }); + + test('routes non-abort errors through formatToastErrorMessageGRPC with action=call entity=MCP tool and fires exactly one toast', async () => { + streamMessages = [{ type: 'error', error: new Error('boom') }]; + + const { wrapper } = connectQueryWrapper({ + defaultOptions: { queries: { retry: false }, mutations: { retry: false } }, + }); + const { result } = renderHook(() => useStreamMCPServerToolMutation(), { wrapper }); + + await expect( + result.current.mutateAsync({ + serverUrl: 'https://example.test/mcp', + toolName: 'my-tool', + parameters: {}, + }) + ).rejects.toThrow('boom'); + + expect(formatToastErrorMessageGRPCMock).toHaveBeenCalledTimes(1); + expect(formatToastErrorMessageGRPCMock).toHaveBeenCalledWith( + expect.objectContaining({ action: 'call', entity: 'MCP tool' }) + ); + // Mutation owns the toast — it must actually fire one, not just format a string. + expect(toastErrorMock).toHaveBeenCalledTimes(1); + expect(toastErrorMock).toHaveBeenCalledWith('formatted error'); + }); + + test('does not surface a toast when the error is an AbortError', async () => { + const abortErr = new Error('aborted'); + abortErr.name = 'AbortError'; + streamMessages = [{ type: 'error', error: abortErr }]; + + const { wrapper } = connectQueryWrapper({ + defaultOptions: { queries: { retry: false }, mutations: { retry: false } }, + }); + const { result } = renderHook(() => useStreamMCPServerToolMutation(), { wrapper }); + + await expect( + result.current.mutateAsync({ + serverUrl: 'https://example.test/mcp', + toolName: 'my-tool', + parameters: {}, + }) + ).rejects.toBe(abortErr); + + expect(formatToastErrorMessageGRPCMock).not.toHaveBeenCalled(); + expect(toastErrorMock).not.toHaveBeenCalled(); + }); +}); + +describe('useStreamMCPServerToolMutation — capability fallback', () => { + test('falls back to non-streaming callTool when the server does not advertise tasks.requests.tools.call', async () => { + serverCapabilitiesMock = { tools: { listChanged: false } }; + + const { wrapper } = connectQueryWrapper({ defaultOptions: { queries: { retry: false } } }); + const { result } = renderHook(() => useStreamMCPServerToolMutation(), { wrapper }); + + const value = await result.current.mutateAsync({ + serverUrl: 'https://example.test/mcp', + toolName: 'my-tool', + parameters: { foo: 'bar' }, + }); + + expect(value).toEqual({ content: [{ type: 'text', text: 'fallback-result' }] }); + expect(callToolInvocations).toHaveLength(1); + expect(callToolInvocations[0]).toEqual({ name: 'my-tool', arguments: { foo: 'bar' } }); + // Stream path must not have been entered. + expect(streamConstructorSnapshots).toHaveLength(0); + }); + + test('uses the streaming path when the server advertises tasks capability', async () => { + serverCapabilitiesMock = { + tools: { listChanged: false }, + tasks: { requests: { tools: { call: {} } } }, + }; + streamMessages = [{ type: 'result', result: { content: [{ type: 'text', text: 'streamed' }] } }]; + + const { wrapper } = connectQueryWrapper({ defaultOptions: { queries: { retry: false } } }); + const { result } = renderHook(() => useStreamMCPServerToolMutation(), { wrapper }); + + const value = await result.current.mutateAsync({ + serverUrl: 'https://example.test/mcp', + toolName: 'my-tool', + parameters: {}, + }); + + expect(value).toEqual({ content: [{ type: 'text', text: 'streamed' }] }); + expect(callToolInvocations).toHaveLength(0); + expect(streamConstructorSnapshots).toHaveLength(1); + }); + + test('falls back when getServerCapabilities returns undefined entirely', async () => { + serverCapabilitiesMock = null; + + const { wrapper } = connectQueryWrapper({ defaultOptions: { queries: { retry: false } } }); + const { result } = renderHook(() => useStreamMCPServerToolMutation(), { wrapper }); + + await result.current.mutateAsync({ + serverUrl: 'https://example.test/mcp', + toolName: 'my-tool', + parameters: {}, + }); + + expect(callToolInvocations).toHaveLength(1); + }); + + test('streamTimeoutMs applies on the capability-fallback path — hung callTool rejects within the budget', async () => { + serverCapabilitiesMock = { tools: { listChanged: false } }; + fallbackCallToolHang = true; + + const { wrapper } = connectQueryWrapper({ + defaultOptions: { queries: { retry: false }, mutations: { retry: false } }, + }); + const { result } = renderHook(() => useStreamMCPServerToolMutation(), { wrapper }); + + const start = Date.now(); + await expect( + result.current.mutateAsync({ + serverUrl: 'https://example.test/mcp', + toolName: 'my-tool', + parameters: {}, + streamTimeoutMs: 50, + }) + ).rejects.toBeTruthy(); + const elapsed = Date.now() - start; + + expect(elapsed).toBeLessThan(500); + // The fallback callTool must have received the composed signal so its + // upstream fetch can be cancelled when the timeout fires. + expect(fallbackCallToolSignalCapture).toBeDefined(); + expect(fallbackCallToolSignalCapture?.aborted).toBe(true); + }); +}); + +describe('useStreamMCPServerToolMutation — timeout & watchdog', () => { + test('rejects with a descriptive error when the stream never produces a terminal message before the timeout', async () => { + streamHangForever = true; + + const { wrapper } = connectQueryWrapper({ + defaultOptions: { queries: { retry: false }, mutations: { retry: false } }, + }); + const { result } = renderHook(() => useStreamMCPServerToolMutation(), { wrapper }); + + await expect( + result.current.mutateAsync({ + serverUrl: 'https://example.test/mcp', + toolName: 'my-tool', + parameters: {}, + streamTimeoutMs: 50, + }) + ).rejects.toThrow(STREAM_TIMEOUT_50MS_REGEX); + }); + + test('timeout path aborts the SDK signal so upstream fetches are cancelled', async () => { + streamHangForever = true; + + const { wrapper } = connectQueryWrapper({ + defaultOptions: { queries: { retry: false }, mutations: { retry: false } }, + }); + const { result } = renderHook(() => useStreamMCPServerToolMutation(), { wrapper }); + + const abortStates: boolean[] = []; + const promise = result.current.mutateAsync({ + serverUrl: 'https://example.test/mcp', + toolName: 'my-tool', + parameters: {}, + streamTimeoutMs: 30, + }); + + await expect(promise).rejects.toThrow(STREAM_TIMED_OUT_REGEX); + abortStates.push(lastStreamOptions?.signal?.aborted ?? false); + expect(abortStates).toEqual([true]); + }); + + test('does not fire the timeout when a terminal result arrives first', async () => { + streamMessages = [{ type: 'result', result: { content: [{ type: 'text', text: 'ok' }] } }]; + + const { wrapper } = connectQueryWrapper({ defaultOptions: { queries: { retry: false } } }); + const { result } = renderHook(() => useStreamMCPServerToolMutation(), { wrapper }); + + const value = await result.current.mutateAsync({ + serverUrl: 'https://example.test/mcp', + toolName: 'my-tool', + parameters: {}, + streamTimeoutMs: 500, + }); + + expect(value).toEqual({ content: [{ type: 'text', text: 'ok' }] }); + }); + + test('rejects explicitly when the stream closes without any result or error message (watchdog)', async () => { + // No messages at all — generator returns immediately. Current code throws a generic "stream ended without a result". + streamMessages = []; + + const { wrapper } = connectQueryWrapper({ + defaultOptions: { queries: { retry: false }, mutations: { retry: false } }, + }); + const { result } = renderHook(() => useStreamMCPServerToolMutation(), { wrapper }); + + await expect( + result.current.mutateAsync({ + serverUrl: 'https://example.test/mcp', + toolName: 'my-tool', + parameters: {}, + }) + ).rejects.toThrow(STREAM_WATCHDOG_REGEX); + }); +}); + +describe('useStreamMCPServerToolMutation — concurrency', () => { + test('back-to-back calls each get a fresh client — no shared state leak', async () => { + streamMessages = [{ type: 'result', result: { content: [{ type: 'text', text: 'parallel-ok' }] } }]; + + const { wrapper } = connectQueryWrapper({ defaultOptions: { queries: { retry: false } } }); + const { result: resultA } = renderHook(() => useStreamMCPServerToolMutation(), { wrapper }); + const { result: resultB } = renderHook(() => useStreamMCPServerToolMutation(), { wrapper }); + + const controllerA = new AbortController(); + const controllerB = new AbortController(); + + const a = await resultA.current.mutateAsync({ + serverUrl: 'https://example.test/mcp', + toolName: 'tool-a', + parameters: {}, + signal: controllerA.signal, + }); + const b = await resultB.current.mutateAsync({ + serverUrl: 'https://example.test/mcp', + toolName: 'tool-b', + parameters: {}, + signal: controllerB.signal, + }); + + expect(a).toEqual({ content: [{ type: 'text', text: 'parallel-ok' }] }); + expect(b).toEqual({ content: [{ type: 'text', text: 'parallel-ok' }] }); + expect(controllerA.signal).not.toBe(controllerB.signal); + // Each call created a separate client — no shared state / singleton. + expect(createdClients.length).toBeGreaterThanOrEqual(2); + }); + + test('cancelling one parallel call does not cancel the other', async () => { + const { wrapper } = connectQueryWrapper({ + defaultOptions: { queries: { retry: false }, mutations: { retry: false } }, + }); + const { result: resultA } = renderHook(() => useStreamMCPServerToolMutation(), { wrapper }); + const { result: resultB } = renderHook(() => useStreamMCPServerToolMutation(), { wrapper }); + + const controllerA = new AbortController(); + const controllerB = new AbortController(); + + streamHangForever = true; + const aPromise = resultA.current.mutateAsync({ + serverUrl: 'https://example.test/mcp', + toolName: 'tool-a', + parameters: {}, + signal: controllerA.signal, + streamTimeoutMs: 10_000, + }); + // Let the first call enter the stream. + await new Promise((r) => setTimeout(r, 10)); + + streamHangForever = false; + streamMessages = [{ type: 'result', result: { content: [{ type: 'text', text: 'b-done' }] } }]; + const bPromise = resultB.current.mutateAsync({ + serverUrl: 'https://example.test/mcp', + toolName: 'tool-b', + parameters: {}, + signal: controllerB.signal, + }); + + const b = await bPromise; + expect(b).toEqual({ content: [{ type: 'text', text: 'b-done' }] }); + expect(controllerB.signal.aborted).toBe(false); + + controllerA.abort(); + await expect(aPromise).rejects.toBeTruthy(); + // B was never aborted by A's cancellation. + expect(controllerB.signal.aborted).toBe(false); + }); +}); + +describe('createMCPClientWithSession — transport error contract', () => { + test('transport.onerror does not route through formatToastErrorMessageGRPC — toast is owned by mutation onError', async () => { + const errorSpy = vi.spyOn(console, 'error').mockImplementation(() => undefined); + + const { transport } = await createMCPClientWithSession('https://example.test/mcp', 'redpanda-console'); + + transport.onerror?.(new Error('sse drop')); + + expect(formatToastErrorMessageGRPCMock).not.toHaveBeenCalled(); + expect(errorSpy).toHaveBeenCalledWith( + '[MCP] transport error', + expect.objectContaining({ serverUrl: 'https://example.test/mcp' }) + ); + + errorSpy.mockRestore(); + }); +}); + +describe('Integration — listTools then streaming callTool end-to-end', () => { + test('happy path: list tools, then stream a call with progress and a final result', async () => { + streamMessages = [ + { type: 'taskCreated', task: { taskId: 't-e2e', status: 'working' } }, + { type: 'taskStatus', task: { taskId: 't-e2e', status: 'working', statusMessage: '50%' } }, + { type: 'result', result: { content: [{ type: 'text', text: 'e2e-done' }] } }, + ]; + + // 1. List tools via the session factory. + const { client } = await createMCPClientWithSession('https://example.test/mcp', 'redpanda-console'); + const toolsRes = await client.listTools(); + expect(toolsRes.tools).toEqual([]); + + // 2. Stream a call through the mutation — progress + result arrive as expected. + const { wrapper } = connectQueryWrapper({ defaultOptions: { queries: { retry: false } } }); + const { result } = renderHook(() => useStreamMCPServerToolMutation(), { wrapper }); + + const progressUpdates: MCPStreamProgress[] = []; + const value = await result.current.mutateAsync({ + serverUrl: 'https://example.test/mcp', + toolName: 'e2e-tool', + parameters: { foo: 'bar' }, + onProgress: (u) => progressUpdates.push(u), + }); + + expect(value).toEqual({ content: [{ type: 'text', text: 'e2e-done' }] }); + expect(progressUpdates.map((u) => u.status)).toEqual(['working', 'working']); + expect(progressUpdates.map((u) => u.statusMessage)).toEqual([undefined, '50%']); + }); +}); + +describe('createMCPClientWithSession — isolation & error propagation', () => { + test('each call yields a fresh client — no hidden singleton', async () => { + const r1 = await createMCPClientWithSession('https://example.test/mcp', 'redpanda-console'); + const r2 = await createMCPClientWithSession('https://example.test/mcp', 'redpanda-console'); + + expect(r1.client).not.toBe(r2.client); + expect(r1.transport).not.toBe(r2.transport); + expect(createdClients).toHaveLength(2); + }); + + test('connect() failure propagates and leaves no half-initialized client usable', async () => { + nextConnectRejection = new Error('connect refused'); + + await expect(createMCPClientWithSession('https://example.test/mcp', 'redpanda-console')).rejects.toThrow( + 'connect refused' + ); + }); +}); + +describe('useStreamMCPServerToolMutation — client lifecycle (close in finally)', () => { + test('closes the client exactly once on a successful streaming call', async () => { + streamMessages = [{ type: 'result', result: { content: [{ type: 'text', text: 'ok' }] } }]; + + const { wrapper } = connectQueryWrapper({ defaultOptions: { queries: { retry: false } } }); + const { result } = renderHook(() => useStreamMCPServerToolMutation(), { wrapper }); + + await result.current.mutateAsync({ + serverUrl: 'https://example.test/mcp', + toolName: 'my-tool', + parameters: {}, + }); + + expect(createdClients).toHaveLength(1); + expect(createdClients[0].close).toHaveBeenCalledTimes(1); + }); + + test('closes the client exactly once when the stream rejects with an error', async () => { + streamMessages = [{ type: 'error', error: new Error('boom') }]; + + const { wrapper } = connectQueryWrapper({ + defaultOptions: { queries: { retry: false }, mutations: { retry: false } }, + }); + const { result } = renderHook(() => useStreamMCPServerToolMutation(), { wrapper }); + + await expect( + result.current.mutateAsync({ + serverUrl: 'https://example.test/mcp', + toolName: 'my-tool', + parameters: {}, + }) + ).rejects.toThrow('boom'); + + expect(createdClients[0].close).toHaveBeenCalledTimes(1); + }); + + test('closes the client exactly once when the caller aborts mid-stream', async () => { + streamHangForever = true; + + const { wrapper } = connectQueryWrapper({ + defaultOptions: { queries: { retry: false }, mutations: { retry: false } }, + }); + const { result } = renderHook(() => useStreamMCPServerToolMutation(), { wrapper }); + + const controller = new AbortController(); + const promise = result.current.mutateAsync({ + serverUrl: 'https://example.test/mcp', + toolName: 'my-tool', + parameters: {}, + signal: controller.signal, + streamTimeoutMs: 10_000, + }); + + // Let the mutation enter the streaming path before aborting. + await new Promise((r) => setTimeout(r, 10)); + controller.abort(); + + await expect(promise).rejects.toBeTruthy(); + + expect(createdClients[0].close).toHaveBeenCalledTimes(1); + expect(toastErrorMock).not.toHaveBeenCalled(); + }); + + test('closes the client exactly once on the capability-fallback (non-streaming) path', async () => { + serverCapabilitiesMock = { tools: { listChanged: false } }; + + const { wrapper } = connectQueryWrapper({ defaultOptions: { queries: { retry: false } } }); + const { result } = renderHook(() => useStreamMCPServerToolMutation(), { wrapper }); + + await result.current.mutateAsync({ + serverUrl: 'https://example.test/mcp', + toolName: 'my-tool', + parameters: {}, + }); + + expect(callToolInvocations).toHaveLength(1); + expect(createdClients[0].close).toHaveBeenCalledTimes(1); + }); +}); + +describe('listMCPServerTools — client lifecycle', () => { + test('closes the client exactly once on a successful listTools call', async () => { + await listMCPServerTools('https://example.test/mcp'); + + expect(createdClients).toHaveLength(1); + expect(createdClients[0].close).toHaveBeenCalledTimes(1); + }); + + test('closes the client exactly once when listTools rejects', async () => { + nextListToolsRejection = new Error('boom'); + + await expect(listMCPServerTools('https://example.test/mcp')).rejects.toThrow('boom'); + + expect(createdClients[0].close).toHaveBeenCalledTimes(1); + }); +}); + +describe('useCallMCPServerToolMutation — client lifecycle (close in finally)', () => { + test('closes the client exactly once on a successful callTool', async () => { + const { wrapper } = connectQueryWrapper({ defaultOptions: { queries: { retry: false } } }); + const { result } = renderHook(() => useCallMCPServerToolMutation(), { wrapper }); + + await result.current.mutateAsync({ + serverUrl: 'https://example.test/mcp', + toolName: 'my-tool', + parameters: {}, + }); + + expect(createdClients).toHaveLength(1); + expect(createdClients[0].close).toHaveBeenCalledTimes(1); + }); + + test('closes the client exactly once when callTool rejects', async () => { + nextCallToolRejection = new Error('boom'); + + const { wrapper } = connectQueryWrapper({ + defaultOptions: { queries: { retry: false }, mutations: { retry: false } }, + }); + const { result } = renderHook(() => useCallMCPServerToolMutation(), { wrapper }); + + await expect( + result.current.mutateAsync({ + serverUrl: 'https://example.test/mcp', + toolName: 'my-tool', + parameters: {}, + }) + ).rejects.toThrow('boom'); + + expect(createdClients[0].close).toHaveBeenCalledTimes(1); + }); + + test('does not fire a toast when callTool rejects with AbortError', async () => { + nextCallToolRejection = Object.assign(new Error('aborted'), { name: 'AbortError' }); + + const { wrapper } = connectQueryWrapper({ + defaultOptions: { queries: { retry: false }, mutations: { retry: false } }, + }); + const { result } = renderHook(() => useCallMCPServerToolMutation(), { wrapper }); + + await expect( + result.current.mutateAsync({ + serverUrl: 'https://example.test/mcp', + toolName: 'my-tool', + parameters: {}, + }) + ).rejects.toBeTruthy(); + + expect(createdClients[0].close).toHaveBeenCalledTimes(1); + expect(toastErrorMock).not.toHaveBeenCalled(); + }); +}); + +describe('createMCPClientWithSession — Mcp-Session-Id header', () => { + test('omits Mcp-Session-Id header when no session has been established yet', async () => { + const fetchSpy = vi.spyOn(globalThis, 'fetch').mockResolvedValue(new Response(null, { status: 200 })); + + await createMCPClientWithSession('https://example.test/mcp', 'redpanda-console'); + const transportFetch = lastTransportOpts?.fetch; + expect(transportFetch).toBeDefined(); + + await transportFetch?.('https://example.test/mcp', { method: 'POST' }); + + expect(fetchSpy).toHaveBeenCalledTimes(1); + const [, init] = fetchSpy.mock.calls[0] as [unknown, RequestInit]; + const headers = init.headers as Record; + expect(headers.Authorization).toBe('Bearer test-jwt-token'); + expect(headers).not.toHaveProperty('Mcp-Session-Id'); + + fetchSpy.mockRestore(); + }); +}); diff --git a/frontend/src/react-query/api/remote-mcp.tsx b/frontend/src/react-query/api/remote-mcp.tsx index dab03c9e0..af9d840bf 100644 --- a/frontend/src/react-query/api/remote-mcp.tsx +++ b/frontend/src/react-query/api/remote-mcp.tsx @@ -33,8 +33,10 @@ import { updateMCPServer, } from 'protogen/redpanda/api/dataplane/v1/mcp-MCPServerService_connectquery'; import { useMemo } from 'react'; +import { ConsoleJWTOAuthProvider } from 'react-query/api/mcp-oauth-provider'; import { MAX_PAGE_SIZE, type MessageInit, type QueryOptions } from 'react-query/react-query.utils'; import { useInfiniteQueryWithAllPages } from 'react-query/use-infinite-query-with-all-pages'; +import { toast } from 'sonner'; import { formatToastErrorMessageGRPC } from 'utils/toast.utils'; export { MCPServer_State, MCPServer_Tool_ComponentType }; @@ -274,7 +276,10 @@ export const createMCPClientWithSession = async ( ); // Create StreamableHTTP transport for HTTP endpoints + const authProvider = new ConsoleJWTOAuthProvider({ getJwt: () => config.jwt, clientName }); const transport = new StreamableHTTPClientTransport(new URL(serverUrl), { + authProvider, + // allow: direct-query MCP SDK transport requires a raw fetch to stream SSE — ConnectRPC does not apply here fetch: async (input, init) => { const response = await fetch(input, { ...init, @@ -282,13 +287,18 @@ export const createMCPClientWithSession = async ( ...init?.headers, 'Content-Type': 'application/json', ...(config.jwt && { Authorization: `Bearer ${config.jwt}` }), - 'Mcp-Session-Id': client?.transport?.sessionId ?? '', + ...(client?.transport?.sessionId && { 'Mcp-Session-Id': client.transport.sessionId }), }, }); return response; }, }); + transport.onerror = (error) => { + // biome-ignore lint/suspicious/noConsole: transport-level errors have no UI surface + console.error('[MCP] transport error', { serverUrl, error }); + }; + // Connect the client to the transport await client.connect(transport); @@ -298,7 +308,11 @@ export const createMCPClientWithSession = async ( export const listMCPServerTools = async (serverUrl: string) => { const { client } = await createMCPClientWithSession(serverUrl, 'redpanda-console'); - return client.listTools(); + try { + return await client.listTools(); + } finally { + await client.close?.(); + } }; export type CallMCPToolParams = { @@ -313,14 +327,18 @@ export const useCallMCPServerToolMutation = () => mutationFn: async ({ serverUrl, toolName, parameters, signal }: CallMCPToolParams) => { const { client } = await createMCPClientWithSession(serverUrl, 'redpanda-console'); - return client.callTool( - { - name: toolName, - arguments: parameters, - }, - undefined, - { signal } - ); + try { + return await client.callTool( + { + name: toolName, + arguments: parameters, + }, + undefined, + { signal } + ); + } finally { + await client.close?.(); + } }, onError: (error) => { if (error.name === 'AbortError' || error.message?.includes('aborted')) { @@ -329,11 +347,169 @@ export const useCallMCPServerToolMutation = () => const connectError = ConnectError.from(error); - return formatToastErrorMessageGRPC({ - error: connectError, - action: 'call', - entity: 'MCP tool', + toast.error( + formatToastErrorMessageGRPC({ + error: connectError, + action: 'call', + entity: 'MCP tool', + }) + ); + }, + }); + +export type MCPStreamTaskStatus = 'working' | 'input_required' | 'completed' | 'failed' | 'cancelled'; + +export type MCPStreamProgress = { + taskId?: string; + status?: MCPStreamTaskStatus; + statusMessage?: string; + progress?: number; + total?: number; +}; + +export type StreamMCPToolParams = CallMCPToolParams & { + onProgress?: (update: MCPStreamProgress) => void; + /** + * Maximum time to wait for the stream to produce a terminal (result/error) + * message. On timeout, the SDK signal is aborted and the mutation rejects + * with a descriptive error. Defaults to {@link DEFAULT_STREAM_TIMEOUT_MS}. + */ + streamTimeoutMs?: number; +}; + +type CallToolResult = Awaited< + ReturnType['callTool']> +>; + +type MCPClient = InstanceType; + +export const DEFAULT_STREAM_TIMEOUT_MS = 120_000; + +const serverSupportsToolTasks = (client: MCPClient): boolean => { + const capabilities = client.getServerCapabilities(); + return capabilities?.tasks?.requests?.tools?.call !== undefined; +}; + +// Compose a caller signal with a timeout so a hung stream rejects instead of +// blocking forever. Returned cleanup must run in a `finally`. +const buildStreamAbortControl = (signal: AbortSignal | undefined, timeoutMs: number) => { + const timeoutController = new AbortController(); + const timeoutHandle = setTimeout(() => timeoutController.abort(), timeoutMs); + const onUserAbort = () => timeoutController.abort(); + if (signal?.aborted) { + timeoutController.abort(); + } else { + signal?.addEventListener('abort', onUserAbort, { once: true }); + } + return { + composedSignal: timeoutController.signal, + cleanup: () => { + clearTimeout(timeoutHandle); + signal?.removeEventListener('abort', onUserAbort); + }, + }; +}; + +const drainMCPStream = async ( + stream: AsyncIterable<{ + type: 'taskCreated' | 'taskStatus' | 'result' | 'error'; + task?: { taskId: string; status: MCPStreamTaskStatus; statusMessage?: string }; + result?: T; + error?: Error; + }>, + onProgress?: (update: MCPStreamProgress) => void +): Promise => { + for await (const message of stream) { + if (message.type === 'taskCreated' || message.type === 'taskStatus') { + onProgress?.({ + taskId: message.task?.taskId, + status: message.task?.status, + statusMessage: message.task?.statusMessage, }); + continue; + } + if (message.type === 'result') { + return message.result; + } + throw message.error ?? new Error('MCP stream yielded an error event with no payload'); + } + return; +}; + +export const useStreamMCPServerToolMutation = () => + useTanstackMutation({ + mutationFn: async ({ + serverUrl, + toolName, + parameters, + signal, + onProgress, + streamTimeoutMs = DEFAULT_STREAM_TIMEOUT_MS, + }: StreamMCPToolParams): Promise => { + const { client } = await createMCPClientWithSession(serverUrl, 'redpanda-console'); + + try { + const { composedSignal, cleanup } = buildStreamAbortControl(signal, streamTimeoutMs); + + try { + // Older servers respond to callToolStream but never produce a terminal + // message, which would hang the mutation — fall back to non-streaming. + if (!serverSupportsToolTasks(client)) { + return (await client.callTool({ name: toolName, arguments: parameters }, undefined, { + signal: composedSignal, + })) as CallToolResult; + } + + const stream = client.experimental.tasks.callToolStream( + { name: toolName, arguments: parameters }, + undefined, + { + signal: composedSignal, + onprogress: (progress) => { + onProgress?.({ + progress: progress.progress, + total: progress.total, + }); + }, + } + ); + + const result = await drainMCPStream(stream, onProgress); + if (result !== undefined) { + return result as CallToolResult; + } + + // Stream closed without a terminal message. Surface timeout explicitly + // if that was the cause, user cancellation as an AbortError so the + // mutation's onError skips the toast, else a watchdog error. + if (composedSignal.aborted && !signal?.aborted) { + throw new Error(`MCP tool stream timed out after ${streamTimeoutMs}ms`); + } + if (signal?.aborted) { + throw Object.assign(new Error('Request was cancelled'), { name: 'AbortError' }); + } + throw new Error('MCP tool stream ended without a terminal result or error message'); + } finally { + cleanup(); + } + } finally { + await client.close?.(); + } + }, + onError: (error) => { + if (error.name === 'AbortError' || error.message?.includes('aborted')) { + return; + } + + const connectError = ConnectError.from(error); + + toast.error( + formatToastErrorMessageGRPC({ + error: connectError, + action: 'call', + entity: 'MCP tool', + }) + ); }, }); diff --git a/frontend/vitest.browser.setup.ts b/frontend/vitest.browser.setup.ts index 41a8f5992..9a56f4fef 100644 --- a/frontend/vitest.browser.setup.ts +++ b/frontend/vitest.browser.setup.ts @@ -1,9 +1,5 @@ import '@testing-library/jest-dom/vitest'; -// NOTE: not importing src/globals.css — Tailwind v4's @source directive -// interleaved with @imports in that file trips PostCSS' strict @import-first -// rule in the Vitest browser-mode pipeline. Deterministic screenshots are -// already guaranteed by the animation-kill stylesheet appended below plus -// the `reducedMotion:'reduce'` playwright context option in the config. +import './src/globals.css'; // Suppress the motion library's "Reduced Motion enabled" warning — we // intentionally set reducedMotion:'reduce' in vitest.config.browser.mts. diff --git a/frontend/yarn.lock b/frontend/yarn.lock index 442fee867..a2d6b2b19 100644 --- a/frontend/yarn.lock +++ b/frontend/yarn.lock @@ -2577,10 +2577,10 @@ "@milkdown/transformer" "7.18.0" nanoid "^5.0.9" -"@modelcontextprotocol/sdk@^1.26.0": - version "1.27.1" - resolved "https://registry.npmjs.org/@modelcontextprotocol/sdk/-/sdk-1.27.1.tgz" - integrity sha512-sr6GbP+4edBwFndLbM60gf07z0FQ79gaExpnsjMGePXqFcSSb7t6iscpjk9DhFhwd+mTEQrzNafGP8/iGGFYaA== +"@modelcontextprotocol/sdk@^1.29.0": + version "1.29.0" + resolved "https://registry.npmjs.org/@modelcontextprotocol/sdk/-/sdk-1.29.0.tgz" + integrity sha512-zo37mZA9hJWpULgkRpowewez1y6ML5GsXJPY8FI0tBBCd77HEvza4jDqRKOXgHNn867PVGCyTdzqpz0izu5ZjQ== dependencies: "@hono/node-server" "^1.19.9" ajv "^8.17.1"