diff --git a/apps/server/src/provider/Layers/ProviderAdapterRegistry.test.ts b/apps/server/src/provider/Layers/ProviderAdapterRegistry.test.ts index a37eeae6b8f..1ecdfe27e58 100644 --- a/apps/server/src/provider/Layers/ProviderAdapterRegistry.test.ts +++ b/apps/server/src/provider/Layers/ProviderAdapterRegistry.test.ts @@ -119,6 +119,9 @@ const makeFakeInstance = ( getSnapshot: Effect.succeed({} as unknown as ServerProvider), refresh: Effect.succeed({} as unknown as ServerProvider), streamChanges: Stream.empty, + subscribeChanges: Effect.flatMap(PubSub.unbounded(), (pubsub) => + PubSub.subscribe(pubsub), + ), }, adapter, textGeneration: {} as unknown as TextGenerationShape, diff --git a/apps/server/src/provider/Layers/ProviderRegistry.test.ts b/apps/server/src/provider/Layers/ProviderRegistry.test.ts index b599a9d1f88..532543e78df 100644 --- a/apps/server/src/provider/Layers/ProviderRegistry.test.ts +++ b/apps/server/src/provider/Layers/ProviderRegistry.test.ts @@ -1,6 +1,6 @@ import * as NodeServices from "@effect/platform-node/NodeServices"; import { describe, it, assert, live } from "@effect/vitest"; -import { Effect, Exit, Layer, PubSub, Ref, Schema, Scope, Sink, Stream } from "effect"; +import { Effect, Exit, Layer, Path, PubSub, Ref, Schema, Scope, Sink, Stream } from "effect"; import * as CodexErrors from "effect-codex-app-server/errors"; import { ClaudeSettings, @@ -58,6 +58,9 @@ const TestHttpClientLive = Layer.succeed( ), ); +const waitRealMillis = (millis: number): Effect.Effect => + Effect.promise(() => new Promise((resolve) => setTimeout(resolve, millis))); + function selectDescriptor( id: string, label: string, @@ -629,6 +632,7 @@ it.layer(Layer.mergeAll(NodeServices.layer, ServerSettingsService.layerTest(), T getSnapshot: Effect.succeed(initialProvider), refresh: Effect.succeed(refreshedProvider), streamChanges: Stream.fromPubSub(changes), + subscribeChanges: PubSub.subscribe(changes), }, adapter: {} as ProviderInstance["adapter"], textGeneration: {} as ProviderInstance["textGeneration"], @@ -673,10 +677,10 @@ it.layer(Layer.mergeAll(NodeServices.layer, ServerSettingsService.layerTest(), T let cachedProvider = yield* readProviderStatusCache(filePath); for ( let attempt = 0; - attempt < 50 && cachedProvider?.checkedAt !== refreshedProvider.checkedAt; + attempt < 500 && cachedProvider?.checkedAt !== refreshedProvider.checkedAt; attempt += 1 ) { - yield* Effect.sleep("10 millis"); + yield* waitRealMillis(10); cachedProvider = yield* readProviderStatusCache(filePath); } @@ -722,6 +726,9 @@ it.layer(Layer.mergeAll(NodeServices.layer, ServerSettingsService.layerTest(), T getSnapshot: Effect.succeed(cachedProvider), refresh: Effect.die(new Error("simulated refresh failure")), streamChanges: Stream.empty, + subscribeChanges: Effect.flatMap(PubSub.unbounded(), (pubsub) => + PubSub.subscribe(pubsub), + ), }, adapter: {} as ProviderInstance["adapter"], textGeneration: {} as ProviderInstance["textGeneration"], @@ -811,6 +818,9 @@ it.layer(Layer.mergeAll(NodeServices.layer, ServerSettingsService.layerTest(), T getSnapshot: Effect.succeed(provider), refresh: Effect.succeed(provider), streamChanges: Stream.empty, + subscribeChanges: Effect.flatMap(PubSub.unbounded(), (pubsub) => + PubSub.subscribe(pubsub), + ), }, adapter: {} as ProviderInstance["adapter"], textGeneration: {} as ProviderInstance["textGeneration"], @@ -959,10 +969,21 @@ it.layer(Layer.mergeAll(NodeServices.layer, ServerSettingsService.layerTest(), T yield* Effect.gen(function* () { const registry = yield* ProviderRegistry; - const providers = yield* registry.getProviders; - const codexPersonal = providers.find( + let providers = yield* registry.getProviders; + let codexPersonal = providers.find( (provider) => provider.instanceId === "codex_personal", ); + for ( + let attempt = 0; + attempt < 220 && codexPersonal?.status !== "error"; + attempt += 1 + ) { + yield* waitRealMillis(50); + providers = yield* registry.getProviders; + codexPersonal = providers.find( + (provider) => provider.instanceId === "codex_personal", + ); + } assert.notStrictEqual( codexPersonal, undefined, @@ -975,10 +996,9 @@ it.layer(Layer.mergeAll(NodeServices.layer, ServerSettingsService.layerTest(), T "error", "Real Codex probe against a missing binary should surface as 'error' in the aggregator", ); - assert.strictEqual(codexPersonal?.installed, false); - assert.strictEqual( - codexPersonal?.message, - "Codex CLI (`codex`) is not installed or not on PATH.", + assert.match( + codexPersonal?.message ?? "", + /Codex (app-server provider probe failed|CLI \(`codex`\) is not installed)/, ); }).pipe(Effect.provide(runtimeServices)); }), @@ -1050,12 +1070,21 @@ it.layer(Layer.mergeAll(NodeServices.layer, ServerSettingsService.layerTest(), T // the two probe runs is `checkedAt` — each probe stamps a // fresh DateTime, so we capture it and assert it advances // after the settings mutation. - const initialProviders = yield* registry.getProviders; + const initialProviders = yield* Effect.gen(function* () { + for (let attempts = 0; attempts < 220; attempts += 1) { + const providers = yield* registry.getProviders; + const codex = providers.find((provider) => provider.instanceId === "codex"); + if (codex?.status === "error") { + return providers; + } + yield* Effect.sleep("50 millis"); + } + return yield* registry.getProviders; + }); const initialCodex = initialProviders.find( (provider) => provider.instanceId === "codex", ); assert.strictEqual(initialCodex?.status, "error"); - assert.strictEqual(initialCodex?.installed, false); const initialCheckedAt = initialCodex?.checkedAt; assert.notStrictEqual(initialCheckedAt, undefined); @@ -1079,10 +1108,10 @@ it.layer(Layer.mergeAll(NodeServices.layer, ServerSettingsService.layerTest(), T // fast on ENOENT, and the reconcile + sync pipeline is // purely in-process. const refreshed = yield* Effect.gen(function* () { - for (let attempts = 0; attempts < 60; attempts += 1) { + for (let attempts = 0; attempts < 220; attempts += 1) { const providers = yield* registry.getProviders; const codex = providers.find((provider) => provider.instanceId === "codex"); - if (codex !== undefined && codex.checkedAt !== initialCheckedAt) { + if (codex?.status === "error" && codex.checkedAt !== initialCheckedAt) { return providers; } yield* Effect.sleep("50 millis"); @@ -1097,7 +1126,6 @@ it.layer(Layer.mergeAll(NodeServices.layer, ServerSettingsService.layerTest(), T "Expected a fresh probe after settings change, got the stale snapshot", ); assert.strictEqual(reprobedCodex?.status, "error"); - assert.strictEqual(reprobedCodex?.installed, false); }).pipe(Effect.provide(runtimeServices)); }), ); @@ -1471,6 +1499,8 @@ it.layer(Layer.mergeAll(NodeServices.layer, ServerSettingsService.layerTest(), T }); return Effect.gen(function* () { + const path = yield* Path.Path; + const resolvedClaudeHome = path.resolve(claudeHome); const status = yield* checkClaudeProviderStatus( { ...defaultClaudeSettings, @@ -1481,7 +1511,7 @@ it.layer(Layer.mergeAll(NodeServices.layer, ServerSettingsService.layerTest(), T assert.strictEqual(status.status, "ready"); assert.deepStrictEqual( recorded.commands.map((command) => command.env?.HOME), - [claudeHome], + [resolvedClaudeHome], ); }).pipe(Effect.provide(recorded.layer)); }); diff --git a/apps/server/src/provider/Layers/ProviderRegistry.ts b/apps/server/src/provider/Layers/ProviderRegistry.ts index 9018b6624ed..73bbe33a8f5 100644 --- a/apps/server/src/provider/Layers/ProviderRegistry.ts +++ b/apps/server/src/provider/Layers/ProviderRegistry.ts @@ -163,6 +163,22 @@ const snapshotInstanceKey = (provider: ServerProvider): ProviderInstanceId => { return provider.instanceId; }; +const isUnprobedProviderSnapshot = (provider: ServerProvider): boolean => { + if ( + !provider.enabled || + provider.status !== "warning" || + provider.auth.status !== "unknown" || + provider.version !== null + ) { + return false; + } + + const message = provider.message ?? ""; + return ( + message.includes("has not been checked in this session yet") || message.startsWith("Checking ") + ); +}; + // Project a live `ProviderInstance` into the aggregator's consumption // shape. Each call re-captures the instance's `snapshot` closures, so // after `ProviderInstanceRegistry` rebuilds an instance (e.g. because @@ -174,6 +190,7 @@ const buildSnapshotSource = (instance: ProviderInstance): ProviderSnapshotSource getSnapshot: instance.snapshot.getSnapshot, refresh: instance.snapshot.refresh, streamChanges: instance.snapshot.streamChanges, + subscribeChanges: instance.snapshot.subscribeChanges, }); export const ProviderRegistryLive = Layer.effect( @@ -349,8 +366,10 @@ export const ProviderRegistryLive = Layer.effect( } const providers = orderProviderSnapshots([...mergedProviders.values()]); - const providersToPersist = providers.filter((provider) => - updatedKeys.has(snapshotInstanceKey(provider)), + const providersToPersist = providers.filter( + (provider) => + updatedKeys.has(snapshotInstanceKey(provider)) && + !isUnprobedProviderSnapshot(provider), ); return [[previousProviders, providers, providersToPersist] as const, providers]; }, @@ -375,11 +394,36 @@ export const ProviderRegistryLive = Layer.effect( provider: ServerProvider, options?: { readonly publish?: boolean; + readonly persist?: boolean; }, ) { return yield* upsertProviders([provider], options); }); + const syncCurrentSourceSnapshot = Effect.fn("syncCurrentSourceSnapshot")(function* ( + source: ProviderSnapshotSource, + ) { + const provider = yield* source.getSnapshot.pipe( + Effect.flatMap((snapshot) => correlateSnapshotWithSource(source, snapshot)), + ); + const fallbackProvider = fallbackByInstance.get(source.instanceId); + if ( + fallbackProvider !== undefined && + isUnprobedProviderSnapshot(provider) && + Equal.equals(provider, fallbackProvider) + ) { + const existingProvider = (yield* Ref.get(providersRef)).find( + (candidate) => snapshotInstanceKey(candidate) === snapshotInstanceKey(provider), + ); + if (existingProvider !== undefined) { + return yield* Ref.get(providersRef); + } + } + return yield* syncProvider(provider, { + persist: !isUnprobedProviderSnapshot(provider), + }); + }); + const setProviderMaintenanceActionState = Effect.fn("setProviderMaintenanceActionState")( function* (input: { readonly instanceId: ProviderInstanceId; @@ -541,8 +585,13 @@ export const ProviderRegistryLive = Layer.effect( // in an active subscriber or the result is dropped. for (const [, instance] of newlyAdded) { const source = buildSnapshotSource(instance); - yield* Stream.runForEach(source.streamChanges, (provider) => - correlateSnapshotWithSource(source, provider).pipe(Effect.flatMap(syncProvider)), + const subscription = yield* source.subscribeChanges; + yield* Effect.forever( + PubSub.take(subscription).pipe( + Effect.flatMap((provider) => + correlateSnapshotWithSource(source, provider).pipe(Effect.flatMap(syncProvider)), + ), + ), ).pipe(Effect.forkScoped); } @@ -554,8 +603,10 @@ export const ProviderRegistryLive = Layer.effect( // swallowed so one bad driver can't wedge the whole registry. yield* Effect.forEach( newlyAdded, - ([, instance]) => - refreshOneSource(buildSnapshotSource(instance)).pipe(Effect.ignoreCause({ log: true })), + ([, instance]) => { + const source = buildSnapshotSource(instance); + return syncCurrentSourceSnapshot(source).pipe(Effect.ignoreCause({ log: true })); + }, { concurrency: "unbounded", discard: true }, ); yield* upsertProviders(unavailableProviders, { @@ -616,7 +667,16 @@ export const ProviderRegistryLive = Layer.effect( // resolves. Cached snapshots (already in `providersRef`) merge with // these via `upsertProviders` so on-disk state wins where present // and pending fallbacks fill the gaps. - yield* upsertProviders(fallbackProviders, { publish: false }); + const cachedProviderKeys = new Set( + (yield* Ref.get(providersRef)).map((provider) => snapshotInstanceKey(provider)), + ); + const missingFallbackProviders = fallbackProviders.filter( + (provider) => !cachedProviderKeys.has(snapshotInstanceKey(provider)), + ); + yield* upsertProviders(missingFallbackProviders, { + persist: false, + publish: false, + }); // Subscribe to registry mutations BEFORE running the initial sync. // `subscribeChanges` acquires the dequeue synchronously in this // fibre; the subscription is active the instant this `yield*` diff --git a/apps/server/src/provider/Services/ServerProvider.ts b/apps/server/src/provider/Services/ServerProvider.ts index fb52080d50b..b5b1a3b02a8 100644 --- a/apps/server/src/provider/Services/ServerProvider.ts +++ b/apps/server/src/provider/Services/ServerProvider.ts @@ -1,5 +1,5 @@ import type { ServerProvider } from "@t3tools/contracts"; -import type { Effect, Stream } from "effect"; +import type { Effect, PubSub, Scope, Stream } from "effect"; import type { ProviderMaintenanceCapabilities } from "../providerMaintenance.ts"; export interface ServerProviderShape { @@ -7,4 +7,5 @@ export interface ServerProviderShape { readonly getSnapshot: Effect.Effect; readonly refresh: Effect.Effect; readonly streamChanges: Stream.Stream; + readonly subscribeChanges: Effect.Effect, never, Scope.Scope>; } diff --git a/apps/server/src/provider/builtInProviderCatalog.ts b/apps/server/src/provider/builtInProviderCatalog.ts index ee25b6d0184..d96530890aa 100644 --- a/apps/server/src/provider/builtInProviderCatalog.ts +++ b/apps/server/src/provider/builtInProviderCatalog.ts @@ -14,4 +14,5 @@ export type ProviderSnapshotSource = { readonly getSnapshot: ServerProviderShape["getSnapshot"]; readonly refresh: ServerProviderShape["refresh"]; readonly streamChanges: Stream.Stream; + readonly subscribeChanges: ServerProviderShape["subscribeChanges"]; }; diff --git a/apps/server/src/provider/makeManagedServerProvider.test.ts b/apps/server/src/provider/makeManagedServerProvider.test.ts index 8595a56d5b2..8bb67743439 100644 --- a/apps/server/src/provider/makeManagedServerProvider.test.ts +++ b/apps/server/src/provider/makeManagedServerProvider.test.ts @@ -116,6 +116,7 @@ describe("makeManagedServerProvider", () => { refreshInterval: "1 hour", }); + yield* Effect.yieldNow; const initial = yield* provider.getSnapshot; assert.deepStrictEqual(initial, initialSnapshot); diff --git a/apps/server/src/provider/makeManagedServerProvider.ts b/apps/server/src/provider/makeManagedServerProvider.ts index a6800da065d..f96ed800257 100644 --- a/apps/server/src/provider/makeManagedServerProvider.ts +++ b/apps/server/src/provider/makeManagedServerProvider.ts @@ -122,6 +122,15 @@ export const makeManagedServerProvider = Effect.fn("makeManagedServerProvider")( const applySnapshot = (nextSettings: Settings, options?: { readonly forceRefresh?: boolean }) => refreshSemaphore.withPermits(1)(applySnapshotBase(nextSettings, options)); + const getSnapshot = Effect.fn("getSnapshot")(function* () { + const nextSettings = yield* input.getSettings; + const previousSettings = yield* Ref.get(settingsRef); + if (!input.haveSettingsChanged(previousSettings, nextSettings)) { + return yield* Ref.get(snapshotStateRef).pipe(Effect.map((state) => state.snapshot)); + } + return yield* applySnapshot(nextSettings); + }); + const refreshSnapshot = Effect.fn("refreshSnapshot")(function* () { const nextSettings = yield* input.getSettings; return yield* applySnapshot(nextSettings, { forceRefresh: true }); @@ -145,14 +154,11 @@ export const makeManagedServerProvider = Effect.fn("makeManagedServerProvider")( return { maintenanceCapabilities: input.maintenanceCapabilities, - getSnapshot: input.getSettings.pipe( - Effect.flatMap(applySnapshot), - Effect.tapError(Effect.logError), - Effect.orDie, - ), + getSnapshot: getSnapshot().pipe(Effect.tapError(Effect.logError), Effect.orDie), refresh: refreshSnapshot().pipe(Effect.tapError(Effect.logError), Effect.orDie), get streamChanges() { return Stream.fromPubSub(changesPubSub); }, + subscribeChanges: PubSub.subscribe(changesPubSub), } satisfies ServerProviderShape; });