Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions apps/mesh/src/cli/config-view.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const SECRET_KEYS = new Set([
"ENCRYPTION_KEY",
"MESH_JWT_SECRET",
"STUDIO_PROVISION_SECRET_KEY",
"CLICKHOUSE_KEY_SECRET",
]);

const URL_KEYS = new Set(["DATABASE_URL", "CLICKHOUSE_URL", "NATS_URL"]);
Expand Down Expand Up @@ -141,6 +142,9 @@ function getConfigSections(e: Settings): ConfigSection[] {
title: "Observability",
entries: [
{ key: "CLICKHOUSE_URL", value: e.clickhouseUrl },
{ key: "CLICKHOUSE_SERVICE_ID", value: e.clickhouseServiceId },
{ key: "CLICKHOUSE_KEY_ID", value: e.clickhouseKeyId },
{ key: "CLICKHOUSE_KEY_SECRET", value: e.clickhouseKeySecret },
{ key: "OTEL_SERVICE_NAME", value: e.otelServiceName },
],
},
Expand Down
19 changes: 14 additions & 5 deletions apps/mesh/src/core/context-factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import {
import {
createMonitoringEngine,
ClickHouseClientEngine,
ClickHouseQueryApiEngine,
resolveClickHouseQueryApiConfig,
} from "../monitoring/query-engine";
import type { QueryEngine } from "../monitoring/query-engine";
import { getLogsDir, getMetricsDir } from "../monitoring/schema";
Expand Down Expand Up @@ -980,8 +982,12 @@ export async function createMeshContextFactory(
const vault = new CredentialVault(config.encryption.key);

// Create monitoring engines (shared across requests)
const clickhouseUrl = getSettings().clickhouseUrl;
const isClickHouse = !!clickhouseUrl;
const settings = getSettings();
const clickhouseUrl = settings.clickhouseUrl;
// Query API (OpenAPI key) takes precedence when configured; the url-based
// engine remains the default so existing deployments are unaffected.
const queryApiConfig = resolveClickHouseQueryApiConfig(settings);
const isClickHouse = !!clickhouseUrl || !!queryApiConfig;
const dialect: SqlDialect = config.monitoringEngines
? "duckdb"
: isClickHouse
Expand All @@ -997,9 +1003,12 @@ export async function createMeshContextFactory(
// crash). See MeshContextConfig.monitoringEngines for the why.
monitoringEngine = config.monitoringEngines.monitoringEngine;
metricEngine = config.monitoringEngines.metricEngine;
} else if (isClickHouse) {
monitoringEngine = new ClickHouseClientEngine(clickhouseUrl!);
metricEngine = new ClickHouseClientEngine(clickhouseUrl!);
} else if (queryApiConfig) {
monitoringEngine = new ClickHouseQueryApiEngine(queryApiConfig);
metricEngine = new ClickHouseQueryApiEngine(queryApiConfig);
} else if (clickhouseUrl) {
monitoringEngine = new ClickHouseClientEngine(clickhouseUrl);
metricEngine = new ClickHouseClientEngine(clickhouseUrl);
} else {
const { engine: me } = await createMonitoringEngine({
basePath: getLogsDir(),
Expand Down
31 changes: 27 additions & 4 deletions apps/mesh/src/monitoring/clickhouse-schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,15 @@
* eliminate per-query full-table scans on monitoring_metrics.
*
* Uses its own @clickhouse/client instance with client.command() for DDL,
* keeping the QueryEngine interface read-only.
* keeping the QueryEngine interface read-only. When a ClickHouse Cloud Query
* API config is supplied instead of a url, DDL is sent over the Query API.
*/

import {
runClickHouseQueryApi,
type ClickHouseQueryApiConfig,
} from "./query-engine";

const ROLLUP_TABLE_DDL = `
CREATE TABLE IF NOT EXISTS monitoring_metrics_rollup_1m (
bucket DateTime,
Expand Down Expand Up @@ -54,19 +60,36 @@ FROM monitoring_metrics
GROUP BY organization_id, name, bucket, connection_id, tool_name, status
`;

export type ClickHouseRollupTarget =
| { url: string }
| { queryApi: ClickHouseQueryApiConfig };

/**
* Run ClickHouse DDL to create the rollup table and materialized view.
*
* Logs errors but does not throw — queries detect the rollup table's
* existence at query time and fall back to the raw table automatically.
* (Creating these objects requires a credential with DDL rights; a
* read-only Query API key will land on the raw-table fallback.)
*/
export async function ensureClickHouseRollup(
clickhouseUrl: string,
target: ClickHouseRollupTarget,
): Promise<void> {
try {
const { createClient } = await import("@clickhouse/client");
const client = createClient({ url: clickhouseUrl });
if ("queryApi" in target) {
await runClickHouseQueryApi(target.queryApi, ROLLUP_TABLE_DDL);
console.log(
"[clickhouse-schema] monitoring_metrics_rollup_1m table ready",
);
await runClickHouseQueryApi(target.queryApi, MATERIALIZED_VIEW_DDL);
console.log(
"[clickhouse-schema] monitoring_metrics_rollup_1m_mv view ready",
);
return;
}

const { createClient } = await import("@clickhouse/client");
const client = createClient({ url: target.url });
try {
await client.command({ query: ROLLUP_TABLE_DDL });
console.log(
Expand Down
109 changes: 109 additions & 0 deletions apps/mesh/src/monitoring/query-engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,115 @@ export class ClickHouseClientEngine implements QueryEngine {
}
}

/**
* Config for the ClickHouse Cloud Query API engine. Authenticates with an
* OpenAPI key (key id + secret over HTTP Basic) instead of a database
* user/password embedded in a connection URL.
*/
export interface ClickHouseQueryApiConfig {
serviceId: string;
keyId: string;
keySecret: string;
maxMemoryUsage?: string;
maxExecutionTime?: number;
}

/**
* Build a ClickHouseQueryApiConfig from settings, or undefined when the
* three required values are not all present. Used by callers to decide
* whether to opt into the Query API path over the url-based engine.
*/
export function resolveClickHouseQueryApiConfig(settings: {
clickhouseServiceId?: string;
clickhouseKeyId?: string;
clickhouseKeySecret?: string;
}): ClickHouseQueryApiConfig | undefined {
const { clickhouseServiceId, clickhouseKeyId, clickhouseKeySecret } =
settings;
if (clickhouseServiceId && clickhouseKeyId && clickhouseKeySecret) {
return {
serviceId: clickhouseServiceId,
keyId: clickhouseKeyId,
keySecret: clickhouseKeySecret,
};
}
return undefined;
}

/**
* Execute one SQL statement against the ClickHouse Cloud query gateway and
* return parsed JSONEachRow rows. Shared by ClickHouseQueryApiEngine (read
* SELECTs) and the rollup DDL path (CREATE TABLE / MATERIALIZED VIEW).
*/
export async function runClickHouseQueryApi(
config: ClickHouseQueryApiConfig,
sql: string,
): Promise<Record<string, unknown>[]> {
const auth = Buffer.from(`${config.keyId}:${config.keySecret}`).toString(
"base64",
);
const response = await fetch(
`https://queries.clickhouse.cloud/service/${config.serviceId}/run?format=JSONEachRow`,
{
method: "POST",
headers: {
"Content-Type": "application/json",
Authorization: `Basic ${auth}`,
},
body: JSON.stringify({ sql }),
},
);
if (!response.ok) {
const errorText = await response.text();
throw new Error(
`ClickHouse Query API failed: ${response.status} ${response.statusText} - ${errorText.slice(0, 500)}`,
);
}
const text = await response.text();
if (!text.trim()) return [];
const rows: Record<string, unknown>[] = [];
for (const line of text.trim().split("\n")) {
if (!line.trim()) continue;
try {
rows.push(JSON.parse(line));
} catch {
console.error(
`[clickhouse-query-api] skipped malformed row: ${line.slice(0, 100)}...`,
);
}
}
return rows;
}

/**
* ClickHouse Cloud Query API engine for production monitoring queries.
* Runs SQL over `queries.clickhouse.cloud/service/{id}/run` with an OpenAPI
* key. Opt-in alternative to ClickHouseClientEngine, which stays the default
* so existing url-based deployments are unaffected.
*/
export class ClickHouseQueryApiEngine implements QueryEngine {
private maxMemoryUsage: string;
private maxExecutionTime: number;

constructor(private config: ClickHouseQueryApiConfig) {
this.maxMemoryUsage = config.maxMemoryUsage ?? "200000000";
this.maxExecutionTime = config.maxExecutionTime ?? 30;
}

async query(sql: string): Promise<Record<string, unknown>[]> {
return runClickHouseQueryApi(this.config, this.withSettings(sql));
}

// Preserve the memory/time guardrails ClickHouseClientEngine sets via
// clickhouse_settings. The gateway takes raw SQL, so the limits are
// appended as a SETTINGS clause; skipped if the SQL already declares one.
private withSettings(sql: string): string {
if (/\bsettings\b/i.test(sql)) return sql;
const half = Math.floor(Number(this.maxMemoryUsage) / 2);
return `${sql}\nSETTINGS max_memory_usage=${this.maxMemoryUsage}, max_execution_time=${this.maxExecutionTime}, max_bytes_before_external_group_by=${half}, max_bytes_before_external_sort=${half}`;
}
}

const DEFAULT_TABLE_NAME = "monitoring_logs";

export interface MonitoringEngineConfig {
Expand Down
15 changes: 13 additions & 2 deletions apps/mesh/src/settings/pipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,22 @@ export async function buildSettings(flags: CliFlags): Promise<BuildResult> {
}

// 4b. ClickHouse rollup DDL (non-blocking — queries fall back to raw table)
if (config.settings.clickhouseUrl) {
if (
config.settings.clickhouseUrl ||
(config.settings.clickhouseServiceId &&
config.settings.clickhouseKeyId &&
config.settings.clickhouseKeySecret)
) {
const { ensureClickHouseRollup } = await import(
"../monitoring/clickhouse-schema"
);
await ensureClickHouseRollup(config.settings.clickhouseUrl);
const { resolveClickHouseQueryApiConfig } = await import(
"../monitoring/query-engine"
);
const queryApi = resolveClickHouseQueryApiConfig(config.settings);
await ensureClickHouseRollup(
queryApi ? { queryApi } : { url: config.settings.clickhouseUrl! },
);
}

// 5. Assemble and freeze
Expand Down
3 changes: 3 additions & 0 deletions apps/mesh/src/settings/resolve-config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ export function resolveConfig(

// Observability
clickhouseUrl: envVars.CLICKHOUSE_URL,
clickhouseServiceId: envVars.CLICKHOUSE_SERVICE_ID,
clickhouseKeyId: envVars.CLICKHOUSE_KEY_ID,
clickhouseKeySecret: envVars.CLICKHOUSE_KEY_SECRET,
otelServiceName: envVars.OTEL_SERVICE_NAME || "studio",

// Config files
Expand Down
3 changes: 3 additions & 0 deletions apps/mesh/src/settings/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ export interface Settings {

// Observability
clickhouseUrl: string | undefined;
clickhouseServiceId: string | undefined;
clickhouseKeyId: string | undefined;
clickhouseKeySecret: string | undefined;
otelServiceName: string;

// Event Bus & Networking
Expand Down
Loading