diff --git a/apps/provider-inventory/src/config/env.config.ts b/apps/provider-inventory/src/config/env.config.ts index 5e8b5b053c..29914fa689 100644 --- a/apps/provider-inventory/src/config/env.config.ts +++ b/apps/provider-inventory/src/config/env.config.ts @@ -2,11 +2,12 @@ import { z } from "zod"; export const envSchema = z.object({ PROVIDER_INVENTORY_POSTGRES_URL: z.string(), - DRIZZLE_MIGRATIONS_FOLDER: z.string().optional().default("./drizzle"), - LOG_LEVEL: z.enum(["fatal", "error", "warn", "info", "debug", "trace"]).optional().default("info"), - STD_OUT_LOG_FORMAT: z.enum(["json", "pretty"]).optional().default("json"), - NODE_ENV: z.enum(["development", "production", "test"]).optional().default("development"), - PORT: z.number({ coerce: true }).optional().default(3092) + DRIZZLE_MIGRATIONS_FOLDER: z.string().default("./drizzle"), + LOG_LEVEL: z.enum(["fatal", "error", "warn", "info", "debug", "trace"]).default("info"), + STD_OUT_LOG_FORMAT: z.enum(["json", "pretty"]).default("json"), + NODE_ENV: z.enum(["development", "production", "test"]).default("development"), + PORT: z.number({ coerce: true }).default(3092), + DISCOVERY_INTERVAL_MS: z.number({ coerce: true }).default(10 * 60 * 1000) // 10 minutes }); export type EnvConfig = z.infer<typeof envSchema>; diff --git a/apps/provider-inventory/src/index.ts b/apps/provider-inventory/src/index.ts index 4cb776e06b..1717dc9bdb 100644 --- a/apps/provider-inventory/src/index.ts +++ b/apps/provider-inventory/src/index.ts @@ -10,7 +10,9 @@ import { container } from "tsyringe"; import { APP_CONFIG } from "@src/providers/app-config.provider"; import { healthzRouter } from "@src/routes"; +import { DiscoverySchedulerService } from "@src/services/discovery-scheduler/discovery-scheduler.service"; import { HonoErrorHandlerService } from "@src/services/hono-error-handler/hono-error-handler.service"; +import { ProviderInventoryWriterService } from "@src/services/provider-inventory-writer/provider-inventory-writer.service"; import { startServer } from "@src/services/start-server/start-server"; import type { AppEnv } from 
"@src/types/app-context"; @@ -28,6 +30,10 @@ export async function bootstrap(): Promise { const app = createApp(); await startServer(app, createOtelLogger({ context: "APP" }), process, { - port: container.resolve(APP_CONFIG).PORT + port: container.resolve(APP_CONFIG).PORT, + beforeStart: async () => { + await container.resolve(ProviderInventoryWriterService).resetOnlineSince(); + container.resolve(DiscoverySchedulerService).start(); + } }); } diff --git a/apps/provider-inventory/src/lib/compute-rollups/compute-rollups.spec.ts b/apps/provider-inventory/src/lib/compute-rollups/compute-rollups.spec.ts new file mode 100644 index 0000000000..7b5409c338 --- /dev/null +++ b/apps/provider-inventory/src/lib/compute-rollups/compute-rollups.spec.ts @@ -0,0 +1,268 @@ +import { describe, expect, it } from "vitest"; + +import { computeRollups } from "./compute-rollups"; + +describe(computeRollups.name, () => { + it("returns all zeros for an empty cluster", () => { + const result = computeRollups({ nodes: [], storage: [] }); + + expect(result).toEqual({ + totalAvailableCpu: 0n, + totalAvailableMemory: 0n, + totalAvailableGpu: 0n, + totalAvailableEph: 0n, + totalAvailablePersistent: 0n, + maxNodeFreeCpu: 0n, + maxNodeFreeMemory: 0n, + maxNodeFreeGpu: 0n, + gpuModels: [], + storageClasses: [] + }); + }); + + it("computes rollups for a single node", () => { + const result = computeRollups({ + nodes: [ + { + name: "node-1", + cpu: { available: 4000 }, + memory: { available: 8_000_000_000 }, + gpu: [{ vendor: "nvidia", model: "a100", available: 2 }], + ephStorage: { available: 100_000_000_000 }, + persistentStorage: [{ class: "beta2", available: 500_000_000_000 }] + } + ], + storage: [] + }); + + expect(result.totalAvailableCpu).toBe(4000n); + expect(result.totalAvailableMemory).toBe(8_000_000_000n); + expect(result.totalAvailableGpu).toBe(2n); + expect(result.totalAvailableEph).toBe(100_000_000_000n); + expect(result.totalAvailablePersistent).toBe(500_000_000_000n); + 
expect(result.maxNodeFreeCpu).toBe(4000n); + expect(result.maxNodeFreeMemory).toBe(8_000_000_000n); + expect(result.maxNodeFreeGpu).toBe(2n); + expect(result.gpuModels).toEqual(["nvidia/a100"]); + expect(result.storageClasses).toEqual(["beta2"]); + }); + + it("sums totals across multiple nodes and tracks max-per-node", () => { + const result = computeRollups({ + nodes: [ + { + name: "node-1", + cpu: { available: 2000 }, + memory: { available: 4_000_000_000 }, + gpu: [{ vendor: "nvidia", model: "a100", available: 1 }], + ephStorage: { available: 50_000_000_000 }, + persistentStorage: [] + }, + { + name: "node-2", + cpu: { available: 8000 }, + memory: { available: 16_000_000_000 }, + gpu: [{ vendor: "nvidia", model: "a100", available: 4 }], + ephStorage: { available: 200_000_000_000 }, + persistentStorage: [{ class: "beta2", available: 1_000_000_000_000 }] + } + ], + storage: [] + }); + + expect(result.totalAvailableCpu).toBe(10_000n); + expect(result.totalAvailableMemory).toBe(20_000_000_000n); + expect(result.totalAvailableGpu).toBe(5n); + expect(result.totalAvailableEph).toBe(250_000_000_000n); + expect(result.totalAvailablePersistent).toBe(1_000_000_000_000n); + expect(result.maxNodeFreeCpu).toBe(8000n); + expect(result.maxNodeFreeMemory).toBe(16_000_000_000n); + expect(result.maxNodeFreeGpu).toBe(4n); + }); + + it("deduplicates GPU models across nodes", () => { + const result = computeRollups({ + nodes: [ + { + name: "node-1", + cpu: { available: 1000 }, + memory: { available: 1000 }, + gpu: [ + { vendor: "nvidia", model: "a100", available: 1 }, + { vendor: "amd", model: "mi300x", available: 1 } + ], + ephStorage: { available: 0 }, + persistentStorage: [] + }, + { + name: "node-2", + cpu: { available: 1000 }, + memory: { available: 1000 }, + gpu: [{ vendor: "nvidia", model: "a100", available: 2 }], + ephStorage: { available: 0 }, + persistentStorage: [] + } + ], + storage: [] + }); + + expect(result.gpuModels).toEqual(["amd/mi300x", "nvidia/a100"]); + }); + + 
it("handles ephemeral-only storage", () => { + const result = computeRollups({ + nodes: [ + { + name: "node-1", + cpu: { available: 1000 }, + memory: { available: 1000 }, + gpu: [], + ephStorage: { available: 500_000_000_000 }, + persistentStorage: [] + } + ], + storage: [] + }); + + expect(result.totalAvailableEph).toBe(500_000_000_000n); + expect(result.totalAvailablePersistent).toBe(0n); + expect(result.storageClasses).toEqual([]); + }); + + it("handles persistent-only storage with multiple classes", () => { + const result = computeRollups({ + nodes: [ + { + name: "node-1", + cpu: { available: 1000 }, + memory: { available: 1000 }, + gpu: [], + ephStorage: { available: 0 }, + persistentStorage: [ + { class: "beta2", available: 100_000_000_000 }, + { class: "beta3", available: 200_000_000_000 } + ] + } + ], + storage: [] + }); + + expect(result.totalAvailablePersistent).toBe(300_000_000_000n); + expect(result.storageClasses).toEqual(["beta2", "beta3"]); + }); + + it("collects storage classes from both nodes and cluster-level storage", () => { + const result = computeRollups({ + nodes: [ + { + name: "node-1", + cpu: { available: 0 }, + memory: { available: 0 }, + gpu: [], + ephStorage: { available: 0 }, + persistentStorage: [{ class: "beta2", available: 100 }] + } + ], + storage: [{ class: "beta3", available: 500 }] + }); + + expect(result.storageClasses).toEqual(["beta2", "beta3"]); + }); + + it("handles nodes with no GPUs", () => { + const result = computeRollups({ + nodes: [ + { + name: "node-1", + cpu: { available: 4000 }, + memory: { available: 8_000_000_000 }, + gpu: [], + ephStorage: { available: 100_000_000_000 }, + persistentStorage: [] + } + ], + storage: [] + }); + + expect(result.totalAvailableGpu).toBe(0n); + expect(result.maxNodeFreeGpu).toBe(0n); + expect(result.gpuModels).toEqual([]); + }); + + it("clamps negative values to zero (overcommit)", () => { + const result = computeRollups({ + nodes: [ + { + name: "overcommitted", + cpu: { available: -500 
}, + memory: { available: -1_000_000 }, + gpu: [{ vendor: "nvidia", model: "a100", available: -1 }], + ephStorage: { available: -100 }, + persistentStorage: [{ class: "beta2", available: -200 }] + } + ], + storage: [] + }); + + expect(result.totalAvailableCpu).toBe(0n); + expect(result.totalAvailableMemory).toBe(0n); + expect(result.totalAvailableGpu).toBe(0n); + expect(result.totalAvailableEph).toBe(0n); + expect(result.totalAvailablePersistent).toBe(0n); + expect(result.maxNodeFreeCpu).toBe(0n); + expect(result.maxNodeFreeMemory).toBe(0n); + expect(result.maxNodeFreeGpu).toBe(0n); + }); + + it("handles all-zero capacity", () => { + const result = computeRollups({ + nodes: [ + { + name: "idle", + cpu: { available: 0 }, + memory: { available: 0 }, + gpu: [], + ephStorage: { available: 0 }, + persistentStorage: [] + } + ], + storage: [] + }); + + expect(result.totalAvailableCpu).toBe(0n); + expect(result.totalAvailableMemory).toBe(0n); + expect(result.maxNodeFreeCpu).toBe(0n); + expect(result.maxNodeFreeMemory).toBe(0n); + }); + + it("sums GPU count per node for max-node-free-gpu", () => { + const result = computeRollups({ + nodes: [ + { + name: "node-1", + cpu: { available: 0 }, + memory: { available: 0 }, + gpu: [ + { vendor: "nvidia", model: "a100", available: 2 }, + { vendor: "nvidia", model: "h100", available: 3 } + ], + ephStorage: { available: 0 }, + persistentStorage: [] + }, + { + name: "node-2", + cpu: { available: 0 }, + memory: { available: 0 }, + gpu: [{ vendor: "nvidia", model: "a100", available: 4 }], + ephStorage: { available: 0 }, + persistentStorage: [] + } + ], + storage: [] + }); + + expect(result.maxNodeFreeGpu).toBe(5n); + expect(result.totalAvailableGpu).toBe(9n); + expect(result.gpuModels).toEqual(["nvidia/a100", "nvidia/h100"]); + }); +}); diff --git a/apps/provider-inventory/src/lib/compute-rollups/compute-rollups.ts b/apps/provider-inventory/src/lib/compute-rollups/compute-rollups.ts new file mode 100644 index 0000000000..d619b7fabc --- 
/dev/null +++ b/apps/provider-inventory/src/lib/compute-rollups/compute-rollups.ts @@ -0,0 +1,65 @@ +import type { Inventory, InventoryRollups } from "@src/types/inventory"; + +export function computeRollups(inventory: Inventory): InventoryRollups { + let totalAvailableCpu = 0n; + let totalAvailableMemory = 0n; + let totalAvailableGpu = 0n; + let totalAvailableEph = 0n; + let totalAvailablePersistent = 0n; + let maxNodeFreeCpu = 0n; + let maxNodeFreeMemory = 0n; + let maxNodeFreeGpu = 0n; + const gpuModelSet = new Set<string>(); + const storageClassSet = new Set<string>(); + + for (const node of inventory.nodes) { + const nodeCpu = clamp(node.cpu.available); + const nodeMemory = clamp(node.memory.available); + const nodeEph = clamp(node.ephStorage.available); + + totalAvailableCpu += nodeCpu; + totalAvailableMemory += nodeMemory; + totalAvailableEph += nodeEph; + + if (nodeCpu > maxNodeFreeCpu) maxNodeFreeCpu = nodeCpu; + if (nodeMemory > maxNodeFreeMemory) maxNodeFreeMemory = nodeMemory; + + let nodeGpuTotal = 0n; + for (const gpu of node.gpu) { + const gpuCount = clamp(gpu.available); + nodeGpuTotal += gpuCount; + totalAvailableGpu += gpuCount; + if (gpu.vendor && gpu.model) { + gpuModelSet.add(`${gpu.vendor}/${gpu.model}`); + } + } + if (nodeGpuTotal > maxNodeFreeGpu) maxNodeFreeGpu = nodeGpuTotal; + + for (const ps of node.persistentStorage) { + totalAvailablePersistent += clamp(ps.available); + if (ps.class) storageClassSet.add(ps.class); + } + } + + for (const s of inventory.storage) { + if (s.class) storageClassSet.add(s.class); + } + + return { + totalAvailableCpu, + totalAvailableMemory, + totalAvailableGpu, + totalAvailableEph, + totalAvailablePersistent, + maxNodeFreeCpu, + maxNodeFreeMemory, + maxNodeFreeGpu, + gpuModels: [...gpuModelSet].sort(), + storageClasses: [...storageClassSet].sort() + }; +} + +function clamp(value: number): bigint { + if (!Number.isFinite(value) || value <= 0) return 0n; + return BigInt(value); +} diff --git 
a/apps/provider-inventory/src/lib/project-row/project-row.spec.ts b/apps/provider-inventory/src/lib/project-row/project-row.spec.ts new file mode 100644 index 0000000000..c438d907f5 --- /dev/null +++ b/apps/provider-inventory/src/lib/project-row/project-row.spec.ts @@ -0,0 +1,80 @@ +import { describe, expect, it } from "vitest"; + +import { projectRow } from "./project-row"; + +describe(projectRow.name, () => { + it("projects an empty cluster", () => { + const result = projectRow({ nodes: [], storage: [] }); + + expect(result.inventory).toEqual({ nodes: [], storage: [] }); + expect(result.totalAvailableCpu).toBe(0n); + expect(result.totalAvailableMemory).toBe(0n); + expect(result.totalAvailableGpu).toBe(0n); + expect(result.gpuModels).toEqual([]); + expect(result.storageClasses).toEqual([]); + }); + + it("normalizes a single node into inventory JSONB and computes rollups", () => { + const result = projectRow({ + nodes: [ + { + name: "node-1", + cpuAvailable: 8000, + memoryAvailable: 32_000_000_000, + gpus: [{ vendor: "nvidia", model: "rtx4090", available: 1 }], + ephStorageAvailable: 500_000_000_000, + persistentStorage: [{ class: "beta2", available: 1_000_000_000_000 }] + } + ], + storage: [{ class: "beta2", available: 2_000_000_000_000 }] + }); + + expect(result.inventory.nodes).toHaveLength(1); + expect(result.inventory.nodes[0]).toEqual({ + name: "node-1", + cpu: { available: 8000 }, + memory: { available: 32_000_000_000 }, + gpu: [{ vendor: "nvidia", model: "rtx4090", available: 1 }], + ephStorage: { available: 500_000_000_000 }, + persistentStorage: [{ class: "beta2", available: 1_000_000_000_000 }] + }); + expect(result.inventory.storage).toEqual([{ class: "beta2", available: 2_000_000_000_000 }]); + + expect(result.totalAvailableCpu).toBe(8000n); + expect(result.totalAvailableGpu).toBe(1n); + expect(result.gpuModels).toEqual(["nvidia/rtx4090"]); + expect(result.storageClasses).toEqual(["beta2"]); + }); + + it("projects a multi-node cluster with correct 
max-per-node tracking", () => { + const result = projectRow({ + nodes: [ + { + name: "small", + cpuAvailable: 2000, + memoryAvailable: 4_000_000_000, + gpus: [], + ephStorageAvailable: 50_000_000_000, + persistentStorage: [] + }, + { + name: "large", + cpuAvailable: 16000, + memoryAvailable: 64_000_000_000, + gpus: [{ vendor: "nvidia", model: "a100", available: 8 }], + ephStorageAvailable: 1_000_000_000_000, + persistentStorage: [{ class: "beta3", available: 2_000_000_000_000 }] + } + ], + storage: [] + }); + + expect(result.inventory.nodes).toHaveLength(2); + expect(result.totalAvailableCpu).toBe(18_000n); + expect(result.maxNodeFreeCpu).toBe(16_000n); + expect(result.totalAvailableMemory).toBe(68_000_000_000n); + expect(result.maxNodeFreeMemory).toBe(64_000_000_000n); + expect(result.totalAvailableGpu).toBe(8n); + expect(result.maxNodeFreeGpu).toBe(8n); + }); +}); diff --git a/apps/provider-inventory/src/lib/project-row/project-row.ts b/apps/provider-inventory/src/lib/project-row/project-row.ts new file mode 100644 index 0000000000..f53ffd6381 --- /dev/null +++ b/apps/provider-inventory/src/lib/project-row/project-row.ts @@ -0,0 +1,31 @@ +import { computeRollups } from "@src/lib/compute-rollups/compute-rollups"; +import type { Inventory, ProjectedRow } from "@src/types/inventory"; +import type { StreamStatusMessage } from "@src/types/stream-status"; + +export function projectRow(message: StreamStatusMessage): ProjectedRow { + const inventory: Inventory = { + nodes: message.nodes.map(node => ({ + name: node.name, + cpu: { available: node.cpuAvailable }, + memory: { available: node.memoryAvailable }, + gpu: node.gpus.map(g => ({ + vendor: g.vendor, + model: g.model, + available: g.available + })), + ephStorage: { available: node.ephStorageAvailable }, + persistentStorage: node.persistentStorage.map(ps => ({ + class: ps.class, + available: ps.available + })) + })), + storage: message.storage.map(s => ({ + class: s.class, + available: s.available + })) + }; + + const 
rollups = computeRollups(inventory); + + return { inventory, ...rollups }; +} diff --git a/apps/provider-inventory/src/providers/chain-query.provider.ts b/apps/provider-inventory/src/providers/chain-query.provider.ts new file mode 100644 index 0000000000..192fff3b4f --- /dev/null +++ b/apps/provider-inventory/src/providers/chain-query.provider.ts @@ -0,0 +1,37 @@ +import type { InjectionToken } from "tsyringe"; +import { container, instancePerContainerCachingFactory } from "tsyringe"; + +import type { SelfAttribute } from "@src/types/chain-provider"; + +export interface ChainProviderRecord { + owner: string; + hostUri: string; + createdHeight: bigint; + attributes: SelfAttribute[]; +} + +export interface ChainAuditRecord { + owner: string; + attributes: Array<{ key: string; value: string; auditor: string }>; +} + +export interface ChainQueryClient { + getProviders(): Promise<ChainProviderRecord[]>; + getAllProvidersAttributes(): Promise<ChainAuditRecord[]>; +} + +export const CHAIN_QUERY_CLIENT: InjectionToken<ChainQueryClient> = Symbol("CHAIN_QUERY_CLIENT"); + +container.register(CHAIN_QUERY_CLIENT, { + useFactory: instancePerContainerCachingFactory( + () => + ({ + async getProviders() { + return []; + }, + async getAllProvidersAttributes() { + return []; + } + }) satisfies ChainQueryClient + ) +}); diff --git a/apps/provider-inventory/src/providers/drizzle.provider.ts b/apps/provider-inventory/src/providers/drizzle.provider.ts new file mode 100644 index 0000000000..752511ad87 --- /dev/null +++ b/apps/provider-inventory/src/providers/drizzle.provider.ts @@ -0,0 +1,12 @@ +import type { PostgresJsDatabase } from "drizzle-orm/postgres-js"; +import { drizzle } from "drizzle-orm/postgres-js"; +import type { InjectionToken } from "tsyringe"; +import { container, instancePerContainerCachingFactory } from "tsyringe"; + +import { PG_CLIENT } from "@src/providers/postgres.provider"; + +export const DRIZZLE_DB: InjectionToken<PostgresJsDatabase> = Symbol("DRIZZLE_DB"); + +container.register(DRIZZLE_DB, { + useFactory: instancePerContainerCachingFactory(c 
=> drizzle(c.resolve(PG_CLIENT))) +}); diff --git a/apps/provider-inventory/src/providers/index.ts b/apps/provider-inventory/src/providers/index.ts index 5ae579efd5..2f6f851dc3 100644 --- a/apps/provider-inventory/src/providers/index.ts +++ b/apps/provider-inventory/src/providers/index.ts @@ -2,3 +2,7 @@ export * from "./raw-app-config.provider"; export * from "./app-config.provider"; export * from "./logging.provider"; export * from "./postgres.provider"; +export * from "./drizzle.provider"; +export * from "./logger-factory.provider"; +export * from "./chain-query.provider"; +export * from "./provider-stream.provider"; diff --git a/apps/provider-inventory/src/providers/logger-factory.provider.ts b/apps/provider-inventory/src/providers/logger-factory.provider.ts new file mode 100644 index 0000000000..070cffcdfd --- /dev/null +++ b/apps/provider-inventory/src/providers/logger-factory.provider.ts @@ -0,0 +1,10 @@ +import type { LoggerService } from "@akashnetwork/logging"; +import { createOtelLogger } from "@akashnetwork/logging/otel"; +import type { InjectionToken } from "tsyringe"; +import { container } from "tsyringe"; + +export type LoggerFactory = (options: { context: string }) => LoggerService; + +export const LOGGER_FACTORY: InjectionToken<LoggerFactory> = Symbol("LOGGER_FACTORY"); + +container.register(LOGGER_FACTORY, { useValue: createOtelLogger }); diff --git a/apps/provider-inventory/src/providers/provider-stream.provider.ts b/apps/provider-inventory/src/providers/provider-stream.provider.ts new file mode 100644 index 0000000000..6e6d4100fb --- /dev/null +++ b/apps/provider-inventory/src/providers/provider-stream.provider.ts @@ -0,0 +1,20 @@ +import type { InjectionToken } from "tsyringe"; +import { container, instancePerContainerCachingFactory } from "tsyringe"; + +import type { StreamStatusMessage } from "@src/types/stream-status"; + +export interface ProviderStreamFactory { + openStatusStream(hostUri: string, signal: AbortSignal): AsyncIterable<StreamStatusMessage>; +} + +export const 
PROVIDER_STREAM_FACTORY: InjectionToken<ProviderStreamFactory> = Symbol("PROVIDER_STREAM_FACTORY"); + +container.register(PROVIDER_STREAM_FACTORY, { + useFactory: instancePerContainerCachingFactory( + () => + ({ + // eslint-disable-next-line @typescript-eslint/no-empty-function + async *openStatusStream() {} + }) satisfies ProviderStreamFactory + ) +}); diff --git a/apps/provider-inventory/src/services/chain-provider-poller/chain-provider-poller.service.ts b/apps/provider-inventory/src/services/chain-provider-poller/chain-provider-poller.service.ts new file mode 100644 index 0000000000..1bfbd83654 --- /dev/null +++ b/apps/provider-inventory/src/services/chain-provider-poller/chain-provider-poller.service.ts @@ -0,0 +1,46 @@ +import type { LoggerService } from "@akashnetwork/logging"; +import { inject, singleton } from "tsyringe"; + +import type { ChainQueryClient } from "@src/providers/chain-query.provider"; +import { CHAIN_QUERY_CLIENT } from "@src/providers/chain-query.provider"; +import type { LoggerFactory } from "@src/providers/logger-factory.provider"; +import { LOGGER_FACTORY } from "@src/providers/logger-factory.provider"; +import type { ChainProvider } from "@src/types/chain-provider"; + +@singleton() +export class ChainProviderPollerService { + readonly #logger: LoggerService; + readonly #chainClient: ChainQueryClient; + + constructor(@inject(CHAIN_QUERY_CLIENT) chainClient: ChainQueryClient, @inject(LOGGER_FACTORY) loggerFactory: LoggerFactory) { + this.#chainClient = chainClient; + this.#logger = loggerFactory({ context: "ChainProviderPoller" }); + } + + async poll(): Promise<ChainProvider[]> { + this.#logger.info({ event: "CHAIN_POLL_START" }); + + const [providers, auditRecords] = await Promise.all([this.#chainClient.getProviders(), this.#chainClient.getAllProvidersAttributes()]); + + const signedByOwner = new Map<string, ChainProvider["signedAttributes"]>(); + for (const record of auditRecords) { + const existing = signedByOwner.get(record.owner); + if (existing) { + existing.push(...record.attributes); + } else { + 
signedByOwner.set(record.owner, [...record.attributes]); + } + } + + const result: ChainProvider[] = providers.map(p => ({ + owner: p.owner, + hostUri: p.hostUri, + createdHeight: p.createdHeight, + selfAttributes: p.attributes, + signedAttributes: signedByOwner.get(p.owner) ?? [] + })); + + this.#logger.info({ event: "CHAIN_POLL_COMPLETE", providerCount: result.length }); + return result; + } +} diff --git a/apps/provider-inventory/src/services/discovery-scheduler/discovery-scheduler.service.spec.ts b/apps/provider-inventory/src/services/discovery-scheduler/discovery-scheduler.service.spec.ts new file mode 100644 index 0000000000..b6b2e32935 --- /dev/null +++ b/apps/provider-inventory/src/services/discovery-scheduler/discovery-scheduler.service.spec.ts @@ -0,0 +1,153 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { mock } from "vitest-mock-extended"; + +import type { EnvConfig } from "@src/providers/app-config.provider"; +import type { LoggerFactory } from "@src/providers/logger-factory.provider"; +import type { ChainProviderPollerService } from "@src/services/chain-provider-poller/chain-provider-poller.service"; +import type { ProviderInventoryWriterService } from "@src/services/provider-inventory-writer/provider-inventory-writer.service"; +import type { StreamLifecycleManagerService } from "@src/services/stream-lifecycle-manager/stream-lifecycle-manager.service"; +import { DiscoverySchedulerService } from "./discovery-scheduler.service"; + +describe(DiscoverySchedulerService.name, () => { + beforeEach(() => { + vi.useFakeTimers(); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + it("fires the first tick immediately on start", async () => { + const { poller } = setup(); + + await vi.advanceTimersByTimeAsync(0); + + expect(poller.poll).toHaveBeenCalledTimes(1); + }); + + it("schedules subsequent ticks via recursive setTimeout after each tick completes", async () => { + const { poller, config } = setup(); + + await 
vi.advanceTimersByTimeAsync(0); + expect(poller.poll).toHaveBeenCalledTimes(1); + + await vi.advanceTimersByTimeAsync(config.DISCOVERY_INTERVAL_MS); + expect(poller.poll).toHaveBeenCalledTimes(2); + + await vi.advanceTimersByTimeAsync(config.DISCOVERY_INTERVAL_MS); + expect(poller.poll).toHaveBeenCalledTimes(3); + }); + + it("never uses setInterval", async () => { + const setIntervalSpy = vi.spyOn(globalThis, "setInterval"); + + setup(); + await vi.advanceTimersByTimeAsync(0); + + expect(setIntervalSpy).not.toHaveBeenCalled(); + setIntervalSpy.mockRestore(); + }); + + it("does not overlap ticks when a tick is slow", async () => { + const pollDuration = 1_200_000; + const { poller, config } = setup({ + pollDelay: () => pollDuration + }); + + await vi.advanceTimersByTimeAsync(0); + expect(poller.poll).toHaveBeenCalledTimes(1); + + // Mid-poll: no second tick despite interval elapsed + await vi.advanceTimersByTimeAsync(config.DISCOVERY_INTERVAL_MS); + expect(poller.poll).toHaveBeenCalledTimes(1); + + // Poll completes at pollDuration; next tick is scheduled DISCOVERY_INTERVAL_MS later + await vi.advanceTimersByTimeAsync(pollDuration - config.DISCOVERY_INTERVAL_MS); + expect(poller.poll).toHaveBeenCalledTimes(1); + + // Advance past the scheduled interval after tick completion + await vi.advanceTimersByTimeAsync(config.DISCOVERY_INTERVAL_MS); + expect(poller.poll).toHaveBeenCalledTimes(2); + }); + + it("writes attributes and reconciles streams on each tick", async () => { + const fakeProviders = [{ owner: "akash1abc", hostUri: "https://provider.example.com:8443", createdHeight: 100n, selfAttributes: [], signedAttributes: [] }]; + const { writer, lifecycle } = setup({ providers: fakeProviders }); + + await vi.advanceTimersByTimeAsync(0); + + expect(writer.upsertAttributes).toHaveBeenCalledWith(fakeProviders[0]); + expect(lifecycle.reconcile).toHaveBeenCalledWith(fakeProviders); + }); + + it("continues scheduling after a poll error", async () => { + const { poller, 
config } = setup({ pollError: new Error("chain RPC unavailable") }); + + await vi.advanceTimersByTimeAsync(0); + expect(poller.poll).toHaveBeenCalledTimes(1); + + await vi.advanceTimersByTimeAsync(config.DISCOVERY_INTERVAL_MS); + expect(poller.poll).toHaveBeenCalledTimes(2); + }); + + it("stops scheduling after stop is called", async () => { + const { scheduler, poller, config } = setup(); + + await vi.advanceTimersByTimeAsync(0); + expect(poller.poll).toHaveBeenCalledTimes(1); + + scheduler.stop(); + + await vi.advanceTimersByTimeAsync(config.DISCOVERY_INTERVAL_MS * 10); + expect(poller.poll).toHaveBeenCalledTimes(1); + }); + + it("is idempotent on multiple start calls", async () => { + const { scheduler, poller } = setup(); + + scheduler.start(); + scheduler.start(); + await vi.advanceTimersByTimeAsync(0); + + expect(poller.poll).toHaveBeenCalledTimes(1); + }); + + function setup(input?: { + providers?: Array<{ owner: string; hostUri: string; createdHeight: bigint; selfAttributes: never[]; signedAttributes: never[] }>; + pollError?: Error; + pollDelay?: (config: EnvConfig) => number; + }) { + const poller = mock<ChainProviderPollerService>(); + const writer = mock<ProviderInventoryWriterService>(); + const lifecycle = mock<StreamLifecycleManagerService>(); + const loggerFactory: LoggerFactory = () => mock<ReturnType<LoggerFactory>>(); + const config: EnvConfig = { + PROVIDER_INVENTORY_POSTGRES_URL: "postgres://localhost/test", + DRIZZLE_MIGRATIONS_FOLDER: "./drizzle", + LOG_LEVEL: "info", + STD_OUT_LOG_FORMAT: "json", + NODE_ENV: "test", + PORT: 3092, + DISCOVERY_INTERVAL_MS: 600_000 + }; + + if (input?.pollError) { + poller.poll.mockRejectedValue(input.pollError); + } else if (input?.pollDelay) { + const delay = input.pollDelay(config); + poller.poll.mockImplementation( + () => + new Promise(resolve => { + setTimeout(() => resolve(input?.providers ?? []), delay); + }) + ); + } else { + poller.poll.mockResolvedValue(input?.providers ?? 
[]); + } + + const scheduler = new DiscoverySchedulerService(poller, writer, lifecycle, config, loggerFactory); + scheduler.start(); + + return { scheduler, poller, writer, lifecycle, config }; + } +}); diff --git a/apps/provider-inventory/src/services/discovery-scheduler/discovery-scheduler.service.ts b/apps/provider-inventory/src/services/discovery-scheduler/discovery-scheduler.service.ts new file mode 100644 index 0000000000..e15d0f0c56 --- /dev/null +++ b/apps/provider-inventory/src/services/discovery-scheduler/discovery-scheduler.service.ts @@ -0,0 +1,71 @@ +import type { LoggerService } from "@akashnetwork/logging"; +import { inject, singleton } from "tsyringe"; + +import type { EnvConfig } from "@src/providers/app-config.provider"; +import { APP_CONFIG } from "@src/providers/app-config.provider"; +import type { LoggerFactory } from "@src/providers/logger-factory.provider"; +import { LOGGER_FACTORY } from "@src/providers/logger-factory.provider"; +import { ChainProviderPollerService } from "@src/services/chain-provider-poller/chain-provider-poller.service"; +import { ProviderInventoryWriterService } from "@src/services/provider-inventory-writer/provider-inventory-writer.service"; +import { StreamLifecycleManagerService } from "@src/services/stream-lifecycle-manager/stream-lifecycle-manager.service"; + +@singleton() +export class DiscoverySchedulerService { + readonly #logger: LoggerService; + readonly #poller: ChainProviderPollerService; + readonly #writer: ProviderInventoryWriterService; + readonly #lifecycle: StreamLifecycleManagerService; + readonly #config: EnvConfig; + #timer: ReturnType<typeof setTimeout> | null = null; + #running = false; + + constructor( + @inject(ChainProviderPollerService) poller: ChainProviderPollerService, + @inject(ProviderInventoryWriterService) writer: ProviderInventoryWriterService, + @inject(StreamLifecycleManagerService) lifecycle: StreamLifecycleManagerService, + @inject(APP_CONFIG) config: EnvConfig, + @inject(LOGGER_FACTORY) loggerFactory: 
LoggerFactory + ) { + this.#poller = poller; + this.#writer = writer; + this.#lifecycle = lifecycle; + this.#config = config; + this.#logger = loggerFactory({ context: "DiscoveryScheduler" }); + } + + start(): void { + if (this.#running) return; + this.#running = true; + void this.#tick(); + } + + stop(): void { + this.#running = false; + if (this.#timer) { + clearTimeout(this.#timer); + this.#timer = null; + } + } + + dispose(): void { + this.stop(); + } + + async #tick(): Promise<void> { + if (!this.#running) return; + + try { + this.#logger.info({ event: "DISCOVERY_TICK_START" }); + const providers = await this.#poller.poll(); + await Promise.all(providers.map(p => this.#writer.upsertAttributes(p))); + this.#lifecycle.reconcile(providers); + this.#logger.info({ event: "DISCOVERY_TICK_COMPLETE", providerCount: providers.length }); + } catch (error) { + this.#logger.error({ event: "DISCOVERY_TICK_ERROR", error }); + } + + if (this.#running) { + this.#timer = setTimeout(() => void this.#tick(), this.#config.DISCOVERY_INTERVAL_MS); + } + } +} diff --git a/apps/provider-inventory/src/services/provider-inventory-writer/provider-inventory-writer.service.ts b/apps/provider-inventory/src/services/provider-inventory-writer/provider-inventory-writer.service.ts new file mode 100644 index 0000000000..e9ff6d323a --- /dev/null +++ b/apps/provider-inventory/src/services/provider-inventory-writer/provider-inventory-writer.service.ts @@ -0,0 +1,75 @@ +import type { LoggerService } from "@akashnetwork/logging"; +import { sql as rawSql } from "drizzle-orm"; +import type { PostgresJsDatabase } from "drizzle-orm/postgres-js"; +import { inject, singleton } from "tsyringe"; + +import { providerInventory } from "@src/model-schemas/provider-inventory/provider-inventory.schema"; +import { DRIZZLE_DB } from "@src/providers/drizzle.provider"; +import type { LoggerFactory } from "@src/providers/logger-factory.provider"; +import { LOGGER_FACTORY } from "@src/providers/logger-factory.provider"; 
+import type { ChainProvider } from "@src/types/chain-provider"; +import type { ProjectedRow } from "@src/types/inventory"; + +@singleton() +export class ProviderInventoryWriterService { + readonly #logger: LoggerService; + readonly #db: PostgresJsDatabase; + + constructor(@inject(DRIZZLE_DB) db: PostgresJsDatabase, @inject(LOGGER_FACTORY) loggerFactory: LoggerFactory) { + this.#db = db; + this.#logger = loggerFactory({ context: "ProviderInventoryWriter" }); + } + + async resetOnlineSince(): Promise<void> { + await this.#db.update(providerInventory).set({ isOnlineSince: null }); + this.#logger.info({ event: "ONLINE_SINCE_RESET" }); + } + + async upsertInventory(provider: ChainProvider, row: ProjectedRow): Promise<void> { + const set = { + hostUri: provider.hostUri, + createdHeight: provider.createdHeight, + inventory: row.inventory, + totalAvailableCpu: row.totalAvailableCpu, + totalAvailableMemory: row.totalAvailableMemory, + totalAvailableGpu: row.totalAvailableGpu, + totalAvailableEph: row.totalAvailableEph, + totalAvailablePersistent: row.totalAvailablePersistent, + maxNodeFreeCpu: row.maxNodeFreeCpu, + maxNodeFreeMemory: row.maxNodeFreeMemory, + maxNodeFreeGpu: row.maxNodeFreeGpu, + gpuModels: row.gpuModels, + storageClasses: row.storageClasses, + isOnline: true as const, + isOnlineSince: rawSql`coalesce(${providerInventory.isOnlineSince}, now())`, + updatedAt: rawSql`now()` + }; + + await this.#db + .insert(providerInventory) + .values({ owner: provider.owner, ...set }) + .onConflictDoUpdate({ target: providerInventory.owner, set }); + + this.#logger.debug({ event: "PROVIDER_INVENTORY_UPSERTED", owner: provider.owner }); + } + + async upsertAttributes(provider: ChainProvider): Promise<void> { + const auditedBy = [...new Set(provider.signedAttributes.map(a => a.auditor))].sort(); + + const set = { + hostUri: provider.hostUri, + createdHeight: provider.createdHeight, + selfAttributes: provider.selfAttributes, + signedAttributes: provider.signedAttributes, + auditedBy, + 
updatedAt: rawSql`now()` + }; + + await this.#db + .insert(providerInventory) + .values({ owner: provider.owner, ...set }) + .onConflictDoUpdate({ target: providerInventory.owner, set }); + + this.#logger.debug({ event: "PROVIDER_ATTRIBUTES_UPSERTED", owner: provider.owner }); + } +} diff --git a/apps/provider-inventory/src/services/stream-lifecycle-manager/stream-lifecycle-manager.service.spec.ts b/apps/provider-inventory/src/services/stream-lifecycle-manager/stream-lifecycle-manager.service.spec.ts new file mode 100644 index 0000000000..99a6e357b3 --- /dev/null +++ b/apps/provider-inventory/src/services/stream-lifecycle-manager/stream-lifecycle-manager.service.spec.ts @@ -0,0 +1,260 @@ +import { describe, expect, it } from "vitest"; +import { mock } from "vitest-mock-extended"; + +import type { LoggerFactory } from "@src/providers/logger-factory.provider"; +import type { ProviderStreamFactory } from "@src/providers/provider-stream.provider"; +import type { ProviderInventoryWriterService } from "@src/services/provider-inventory-writer/provider-inventory-writer.service"; +import type { ChainProvider } from "@src/types/chain-provider"; +import type { StreamStatusMessage } from "@src/types/stream-status"; +import { StreamLifecycleManagerService } from "./stream-lifecycle-manager.service"; + +describe(StreamLifecycleManagerService.name, () => { + describe("reconcile", () => { + it("opens a stream for a new provider", async () => { + const message = createMessage(); + const { streamFactory } = setup({ + streams: { "https://p1:8443": [message] } + }); + + expect(streamFactory.openStatusStream).toHaveBeenCalledWith("https://p1:8443", expect.any(AbortSignal)); + }); + + it("calls upsertInventory for each stream message", async () => { + const messages = [createMessage({ cpu: 1000 }), createMessage({ cpu: 2000 })]; + const { writer, providers } = setup({ + streams: { "https://p1:8443": messages } + }); + + await flush(); + + 
expect(writer.upsertInventory).toHaveBeenCalledTimes(2); + expect(writer.upsertInventory).toHaveBeenCalledWith(providers[0], expect.objectContaining({ totalAvailableCpu: 1000n })); + expect(writer.upsertInventory).toHaveBeenCalledWith(providers[0], expect.objectContaining({ totalAvailableCpu: 2000n })); + }); + + it("skips providers that already have an active stream", async () => { + const { manager, streamFactory, providers } = setup({ + streams: { "https://p1:8443": [createMessage()] } + }); + + manager.reconcile(providers); + + expect(streamFactory.openStatusStream).toHaveBeenCalledTimes(1); + }); + + it("stops streams for providers absent from the new list", async () => { + const signals: AbortSignal[] = []; + const { manager, streamFactory } = setup({ + streams: { "https://p1:8443": "hang" } + }); + signals.push(captureSignal(streamFactory)); + + manager.reconcile([]); + + expect(signals[0].aborted).toBe(true); + }); + + it("opens streams for new providers while stopping removed ones", async () => { + const signals: AbortSignal[] = []; + const providerA = createProvider({ owner: "a", hostUri: "https://a:8443" }); + const providerB = createProvider({ owner: "b", hostUri: "https://b:8443" }); + const { manager, streamFactory } = setup({ + providers: [providerA], + streams: { "https://a:8443": "hang", "https://b:8443": [createMessage()] } + }); + signals.push(captureSignal(streamFactory)); + + manager.reconcile([providerB]); + + expect(signals[0].aborted).toBe(true); + expect(streamFactory.openStatusStream).toHaveBeenCalledWith("https://b:8443", expect.any(AbortSignal)); + }); + }); + + describe("error handling", () => { + it("logs and continues when upsertInventory throws", async () => { + const messages = [createMessage({ cpu: 1000 }), createMessage({ cpu: 2000 })]; + const { writer, logger } = setup({ + streams: { "https://p1:8443": messages } + }); + writer.upsertInventory.mockRejectedValueOnce(new Error("DB down")); + + await flush(); + + 
expect(logger.error).toHaveBeenCalledWith(expect.objectContaining({ event: "STREAM_PROVIDER_WRITE_ERROR", owner: "akash1owner" })); + expect(writer.upsertInventory).toHaveBeenCalledTimes(2); + }); + + it("logs stream-level errors", async () => { + const { logger } = setup({ + streams: { "https://p1:8443": new Error("connection lost") } + }); + + await flush(); + + expect(logger.error).toHaveBeenCalledWith(expect.objectContaining({ event: "STREAM_ERROR", owner: "akash1owner" })); + }); + + it("does not log AbortError", async () => { + const abortError = new Error("aborted"); + abortError.name = "AbortError"; + const { logger } = setup({ + streams: { "https://p1:8443": abortError } + }); + + await flush(); + + expect(logger.error).not.toHaveBeenCalled(); + }); + }); + + describe("stale finalizer", () => { + it("does not remove a replacement stream when the old one finishes", async () => { + let resolveOldStream!: () => void; + const oldStreamPromise = new Promise>(r => { + resolveOldStream = () => r({ done: true, value: undefined }); + }); + const provider = createProvider(); + const streamFactory = mock(); + const writer = mock(); + const logger = mock>(); + const loggerFactory: LoggerFactory = () => logger; + + let callCount = 0; + streamFactory.openStatusStream.mockImplementation(() => { + callCount++; + if (callCount === 1) { + return { + [Symbol.asyncIterator]() { + return { next: () => oldStreamPromise }; + } + }; + } + return hangingStream(); + }); + + const manager = new StreamLifecycleManagerService(streamFactory, writer, loggerFactory); + manager.reconcile([provider]); + + manager.reconcile([]); + manager.reconcile([provider]); + + expect(streamFactory.openStatusStream).toHaveBeenCalledTimes(2); + + resolveOldStream(); + await flush(); + + manager.reconcile([provider]); + expect(streamFactory.openStatusStream).toHaveBeenCalledTimes(2); + }); + }); + + describe("shutdown", () => { + it("aborts all active streams", () => { + const signals: AbortSignal[] = []; 
+ const providerA = createProvider({ owner: "a", hostUri: "https://a:8443" }); + const providerB = createProvider({ owner: "b", hostUri: "https://b:8443" }); + const { manager, streamFactory } = setup({ + providers: [providerA, providerB], + streams: { "https://a:8443": "hang", "https://b:8443": "hang" } + }); + signals.push(captureSignal(streamFactory, 0), captureSignal(streamFactory, 1)); + + manager.shutdown(); + + expect(signals[0].aborted).toBe(true); + expect(signals[1].aborted).toBe(true); + }); + + it("allows new streams after shutdown and re-reconcile", async () => { + const { manager, streamFactory } = setup({ + streams: { "https://p1:8443": "hang" } + }); + + manager.shutdown(); + streamFactory.openStatusStream.mockReturnValue(hangingStream()); + + manager.reconcile([createProvider()]); + + expect(streamFactory.openStatusStream).toHaveBeenCalledTimes(2); + }); + }); + + function setup(input?: { providers?: ChainProvider[]; streams?: Record }) { + const streamFactory = mock(); + const writer = mock(); + const logger = mock>(); + const loggerFactory: LoggerFactory = () => logger; + const providers = input?.providers ?? [createProvider()]; + + const streams = input?.streams ?? 
{}; + streamFactory.openStatusStream.mockImplementation((hostUri: string) => { + const value = streams[hostUri]; + if (value === "hang") return hangingStream(); + if (value instanceof Error) return throwingStream(value); + if (Array.isArray(value)) return arrayStream(value); + return arrayStream([]); + }); + + const manager = new StreamLifecycleManagerService(streamFactory, writer, loggerFactory); + manager.reconcile(providers); + + return { manager, streamFactory, writer, logger, providers }; + } +}); + +function createProvider(overrides?: Partial): ChainProvider { + return { + owner: "akash1owner", + hostUri: "https://p1:8443", + createdHeight: 100n, + selfAttributes: [], + signedAttributes: [], + ...overrides + }; +} + +function createMessage(overrides?: { cpu?: number }): StreamStatusMessage { + return { + nodes: [ + { + name: "node-1", + cpuAvailable: overrides?.cpu ?? 4000, + memoryAvailable: 8_000_000_000, + gpus: [], + ephStorageAvailable: 100_000_000_000, + persistentStorage: [] + } + ], + storage: [] + }; +} + +async function* arrayStream(messages: StreamStatusMessage[]) { + for (const msg of messages) { + yield msg; + } +} + +async function* throwingStream(error: Error): AsyncGenerator { + yield* []; // satisfy require-yield + throw error; +} + +function hangingStream(): AsyncIterable { + return { + [Symbol.asyncIterator]() { + return { next: () => new Promise>(() => {}) }; + } + }; +} + +function captureSignal(streamFactory: { openStatusStream: { mock: { calls: Array<[string, AbortSignal]> } } }, callIndex?: number): AbortSignal { + const idx = callIndex ?? 
streamFactory.openStatusStream.mock.calls.length - 1; + return streamFactory.openStatusStream.mock.calls[idx][1]; +} + +async function flush() { + // setTimeout(0) fires after all pending microtasks (Promise callbacks, async generator yields) have drained + await new Promise(resolve => setTimeout(resolve, 0)); +} diff --git a/apps/provider-inventory/src/services/stream-lifecycle-manager/stream-lifecycle-manager.service.ts b/apps/provider-inventory/src/services/stream-lifecycle-manager/stream-lifecycle-manager.service.ts new file mode 100644 index 0000000000..ad63ed034e --- /dev/null +++ b/apps/provider-inventory/src/services/stream-lifecycle-manager/stream-lifecycle-manager.service.ts @@ -0,0 +1,87 @@ +import type { LoggerService } from "@akashnetwork/logging"; +import { inject, singleton } from "tsyringe"; + +import { projectRow } from "@src/lib/project-row/project-row"; +import type { LoggerFactory } from "@src/providers/logger-factory.provider"; +import { LOGGER_FACTORY } from "@src/providers/logger-factory.provider"; +import type { ProviderStreamFactory } from "@src/providers/provider-stream.provider"; +import { PROVIDER_STREAM_FACTORY } from "@src/providers/provider-stream.provider"; +import { ProviderInventoryWriterService } from "@src/services/provider-inventory-writer/provider-inventory-writer.service"; +import type { ChainProvider } from "@src/types/chain-provider"; + +@singleton() +export class StreamLifecycleManagerService { + readonly #logger: LoggerService; + readonly #streamFactory: ProviderStreamFactory; + readonly #writer: ProviderInventoryWriterService; + readonly #activeStreams = new Map(); + + constructor( + @inject(PROVIDER_STREAM_FACTORY) streamFactory: ProviderStreamFactory, + @inject(ProviderInventoryWriterService) writer: ProviderInventoryWriterService, + @inject(LOGGER_FACTORY) loggerFactory: LoggerFactory + ) { + this.#streamFactory = streamFactory; + this.#writer = writer; + this.#logger = loggerFactory({ context: "StreamLifecycleManager" 
}); + } + + reconcile(providers: ChainProvider[]): void { + const currentOwners = new Set(providers.map(p => p.owner)); + + for (const [owner, controller] of this.#activeStreams) { + if (!currentOwners.has(owner)) { + controller.abort(); + this.#activeStreams.delete(owner); + this.#logger.info({ event: "STREAM_STOPPED_PROVIDER_GONE", owner }); + } + } + + for (const provider of providers) { + if (this.#activeStreams.has(provider.owner)) continue; + this.#startStream(provider); + } + } + + #startStream(provider: ChainProvider): void { + const controller = new AbortController(); + this.#activeStreams.set(provider.owner, controller); + void this.#runStream(provider, controller.signal); + } + + async #runStream(provider: ChainProvider, signal: AbortSignal): Promise { + try { + const stream = this.#streamFactory.openStatusStream(provider.hostUri, signal); + + for await (const message of stream) { + if (signal.aborted) break; + const row = projectRow(message); + try { + await this.#writer.upsertInventory(provider, row); + } catch (error) { + this.#logger.error({ event: "STREAM_PROVIDER_WRITE_ERROR", owner: provider.owner, error }); + } + } + } catch (error) { + if (error instanceof Error && error.name !== "AbortError") { + this.#logger.error({ event: "STREAM_ERROR", owner: provider.owner, error }); + } + } finally { + if (this.#activeStreams.get(provider.owner)?.signal === signal) { + this.#activeStreams.delete(provider.owner); + } + } + } + + shutdown(): void { + for (const [owner, controller] of this.#activeStreams) { + controller.abort(); + this.#logger.info({ event: "STREAM_CLOSED", owner }); + } + this.#activeStreams.clear(); + } + + dispose(): void { + this.shutdown(); + } +} diff --git a/apps/provider-inventory/src/types/chain-provider.ts b/apps/provider-inventory/src/types/chain-provider.ts new file mode 100644 index 0000000000..c7c9b3cb33 --- /dev/null +++ b/apps/provider-inventory/src/types/chain-provider.ts @@ -0,0 +1,18 @@ +export interface SelfAttribute { + 
key: string; + value: string; +} + +export interface SignedAttribute { + key: string; + value: string; + auditor: string; +} + +export interface ChainProvider { + owner: string; + hostUri: string; + createdHeight: bigint; + selfAttributes: SelfAttribute[]; + signedAttributes: SignedAttribute[]; +} diff --git a/apps/provider-inventory/src/types/inventory.ts b/apps/provider-inventory/src/types/inventory.ts new file mode 100644 index 0000000000..0e7198d611 --- /dev/null +++ b/apps/provider-inventory/src/types/inventory.ts @@ -0,0 +1,46 @@ +export interface InventoryNodeGpu { + vendor: string; + model: string; + available: number; +} + +export interface InventoryNodeStorage { + class: string; + available: number; +} + +export interface InventoryNode { + name: string; + cpu: { available: number }; + memory: { available: number }; + gpu: InventoryNodeGpu[]; + ephStorage: { available: number }; + persistentStorage: InventoryNodeStorage[]; +} + +export interface InventoryClusterStorage { + class: string; + available: number; +} + +export interface Inventory { + nodes: InventoryNode[]; + storage: InventoryClusterStorage[]; +} + +export interface InventoryRollups { + totalAvailableCpu: bigint; + totalAvailableMemory: bigint; + totalAvailableGpu: bigint; + totalAvailableEph: bigint; + totalAvailablePersistent: bigint; + maxNodeFreeCpu: bigint; + maxNodeFreeMemory: bigint; + maxNodeFreeGpu: bigint; + gpuModels: string[]; + storageClasses: string[]; +} + +export interface ProjectedRow extends InventoryRollups { + inventory: Inventory; +} diff --git a/apps/provider-inventory/src/types/stream-status.ts b/apps/provider-inventory/src/types/stream-status.ts new file mode 100644 index 0000000000..c8f0c96ff0 --- /dev/null +++ b/apps/provider-inventory/src/types/stream-status.ts @@ -0,0 +1,29 @@ +export interface StreamStatusNodeGpu { + vendor: string; + model: string; + available: number; +} + +export interface StreamStatusNodeStorage { + class: string; + available: number; +} + 
+export interface StreamStatusNode { + name: string; + cpuAvailable: number; + memoryAvailable: number; + gpus: StreamStatusNodeGpu[]; + ephStorageAvailable: number; + persistentStorage: StreamStatusNodeStorage[]; +} + +export interface StreamStatusClusterStorage { + class: string; + available: number; +} + +export interface StreamStatusMessage { + nodes: StreamStatusNode[]; + storage: StreamStatusClusterStorage[]; +}