diff --git a/apps/provider-inventory/.eslintrc.js b/apps/provider-inventory/.eslintrc.js index 45bafd708..72640a937 100644 --- a/apps/provider-inventory/.eslintrc.js +++ b/apps/provider-inventory/.eslintrc.js @@ -1 +1,12 @@ -module.exports = require("@akashnetwork/dev-config/.eslintrc.ts"); +module.exports = { + extends: [require.resolve("@akashnetwork/dev-config/.eslintrc.ts")], + overrides: [ + { + files: ["*.ts", "*.tsx"], + parserOptions: { + emitDecoratorMetadata: true, + experimentalDecorators: true + } + } + ] +}; diff --git a/apps/provider-inventory/src/lib/compute-rollups/compute-rollups.spec.ts b/apps/provider-inventory/src/lib/compute-rollups/compute-rollups.spec.ts deleted file mode 100644 index 7b5409c33..000000000 --- a/apps/provider-inventory/src/lib/compute-rollups/compute-rollups.spec.ts +++ /dev/null @@ -1,268 +0,0 @@ -import { describe, expect, it } from "vitest"; - -import { computeRollups } from "./compute-rollups"; - -describe(computeRollups.name, () => { - it("returns all zeros for an empty cluster", () => { - const result = computeRollups({ nodes: [], storage: [] }); - - expect(result).toEqual({ - totalAvailableCpu: 0n, - totalAvailableMemory: 0n, - totalAvailableGpu: 0n, - totalAvailableEph: 0n, - totalAvailablePersistent: 0n, - maxNodeFreeCpu: 0n, - maxNodeFreeMemory: 0n, - maxNodeFreeGpu: 0n, - gpuModels: [], - storageClasses: [] - }); - }); - - it("computes rollups for a single node", () => { - const result = computeRollups({ - nodes: [ - { - name: "node-1", - cpu: { available: 4000 }, - memory: { available: 8_000_000_000 }, - gpu: [{ vendor: "nvidia", model: "a100", available: 2 }], - ephStorage: { available: 100_000_000_000 }, - persistentStorage: [{ class: "beta2", available: 500_000_000_000 }] - } - ], - storage: [] - }); - - expect(result.totalAvailableCpu).toBe(4000n); - expect(result.totalAvailableMemory).toBe(8_000_000_000n); - expect(result.totalAvailableGpu).toBe(2n); - expect(result.totalAvailableEph).toBe(100_000_000_000n); - expect(result.totalAvailablePersistent).toBe(500_000_000_000n); - expect(result.maxNodeFreeCpu).toBe(4000n); - expect(result.maxNodeFreeMemory).toBe(8_000_000_000n); - expect(result.maxNodeFreeGpu).toBe(2n); - expect(result.gpuModels).toEqual(["nvidia/a100"]); - expect(result.storageClasses).toEqual(["beta2"]); - }); - - it("sums totals across multiple nodes and tracks max-per-node", () => { - const result = computeRollups({ - nodes: [ - { - name: "node-1", - cpu: { available: 2000 }, - memory: { available: 4_000_000_000 }, - gpu: [{ vendor: "nvidia", model: "a100", available: 1 }], - ephStorage: { available: 50_000_000_000 }, - persistentStorage: [] - }, - { - name: "node-2", - cpu: { available: 8000 }, - memory: { available: 16_000_000_000 }, - gpu: [{ vendor: "nvidia", model: "a100", available: 4 }], - ephStorage: { available: 200_000_000_000 }, - persistentStorage: [{ class: "beta2", available: 1_000_000_000_000 }] - } - ], - storage: [] - }); - - expect(result.totalAvailableCpu).toBe(10_000n); - expect(result.totalAvailableMemory).toBe(20_000_000_000n); - expect(result.totalAvailableGpu).toBe(5n); - expect(result.totalAvailableEph).toBe(250_000_000_000n); - expect(result.totalAvailablePersistent).toBe(1_000_000_000_000n); - expect(result.maxNodeFreeCpu).toBe(8000n); - expect(result.maxNodeFreeMemory).toBe(16_000_000_000n); - expect(result.maxNodeFreeGpu).toBe(4n); - }); - - it("deduplicates GPU models across nodes", () => { - const result = computeRollups({ - nodes: [ - { - name: "node-1", - cpu: { available: 1000 }, - memory: { available: 1000 }, - gpu: [ - { vendor: "nvidia", model: "a100", available: 1 }, - { vendor: "amd", model: "mi300x", available: 1 } - ], - ephStorage: { available: 0 }, - persistentStorage: [] - }, - { - name: "node-2", - cpu: { available: 1000 }, - memory: { available: 1000 }, - gpu: [{ vendor: "nvidia", model: "a100", available: 2 }], - ephStorage: { available: 0 }, - persistentStorage: [] - } - ], - storage: [] - }); - - expect(result.gpuModels).toEqual(["amd/mi300x", "nvidia/a100"]); - }); - - it("handles ephemeral-only storage", () => { - const result = computeRollups({ - nodes: [ - { - name: "node-1", - cpu: { available: 1000 }, - memory: { available: 1000 }, - gpu: [], - ephStorage: { available: 500_000_000_000 }, - persistentStorage: [] - } - ], - storage: [] - }); - - expect(result.totalAvailableEph).toBe(500_000_000_000n); - expect(result.totalAvailablePersistent).toBe(0n); - expect(result.storageClasses).toEqual([]); - }); - - it("handles persistent-only storage with multiple classes", () => { - const result = computeRollups({ - nodes: [ - { - name: "node-1", - cpu: { available: 1000 }, - memory: { available: 1000 }, - gpu: [], - ephStorage: { available: 0 }, - persistentStorage: [ - { class: "beta2", available: 100_000_000_000 }, - { class: "beta3", available: 200_000_000_000 } - ] - } - ], - storage: [] - }); - - expect(result.totalAvailablePersistent).toBe(300_000_000_000n); - expect(result.storageClasses).toEqual(["beta2", "beta3"]); - }); - - it("collects storage classes from both nodes and cluster-level storage", () => { - const result = computeRollups({ - nodes: [ - { - name: "node-1", - cpu: { available: 0 }, - memory: { available: 0 }, - gpu: [], - ephStorage: { available: 0 }, - persistentStorage: [{ class: "beta2", available: 100 }] - } - ], - storage: [{ class: "beta3", available: 500 }] - }); - - expect(result.storageClasses).toEqual(["beta2", "beta3"]); - }); - - it("handles nodes with no GPUs", () => { - const result = computeRollups({ - nodes: [ - { - name: "node-1", - cpu: { available: 4000 }, - memory: { available: 8_000_000_000 }, - gpu: [], - ephStorage: { available: 100_000_000_000 }, - persistentStorage: [] - } - ], - storage: [] - }); - - expect(result.totalAvailableGpu).toBe(0n); - expect(result.maxNodeFreeGpu).toBe(0n); - expect(result.gpuModels).toEqual([]); - }); - - it("clamps negative values to zero (overcommit)", () => { - const result = computeRollups({ - nodes: [ - { - name: "overcommitted", - cpu: { available: -500 }, - memory: { available: -1_000_000 }, - gpu: [{ vendor: "nvidia", model: "a100", available: -1 }], - ephStorage: { available: -100 }, - persistentStorage: [{ class: "beta2", available: -200 }] - } - ], - storage: [] - }); - - expect(result.totalAvailableCpu).toBe(0n); - expect(result.totalAvailableMemory).toBe(0n); - expect(result.totalAvailableGpu).toBe(0n); - expect(result.totalAvailableEph).toBe(0n); - expect(result.totalAvailablePersistent).toBe(0n); - expect(result.maxNodeFreeCpu).toBe(0n); - expect(result.maxNodeFreeMemory).toBe(0n); - expect(result.maxNodeFreeGpu).toBe(0n); - }); - - it("handles all-zero capacity", () => { - const result = computeRollups({ - nodes: [ - { - name: "idle", - cpu: { available: 0 }, - memory: { available: 0 }, - gpu: [], - ephStorage: { available: 0 }, - persistentStorage: [] - } - ], - storage: [] - }); - - expect(result.totalAvailableCpu).toBe(0n); - expect(result.totalAvailableMemory).toBe(0n); - expect(result.maxNodeFreeCpu).toBe(0n); - expect(result.maxNodeFreeMemory).toBe(0n); - }); - - it("sums GPU count per node for max-node-free-gpu", () => { - const result = computeRollups({ - nodes: [ - { - name: "node-1", - cpu: { available: 0 }, - memory: { available: 0 }, - gpu: [ - { vendor: "nvidia", model: "a100", available: 2 }, - { vendor: "nvidia", model: "h100", available: 3 } - ], - ephStorage: { available: 0 }, - persistentStorage: [] - }, - { - name: "node-2", - cpu: { available: 0 }, - memory: { available: 0 }, - gpu: [{ vendor: "nvidia", model: "a100", available: 4 }], - ephStorage: { available: 0 }, - persistentStorage: [] - } - ], - storage: [] - }); - - expect(result.maxNodeFreeGpu).toBe(5n); - expect(result.totalAvailableGpu).toBe(9n); - expect(result.gpuModels).toEqual(["nvidia/a100", "nvidia/h100"]); - }); -}); diff --git a/apps/provider-inventory/src/lib/compute-rollups/compute-rollups.ts b/apps/provider-inventory/src/lib/compute-rollups/compute-rollups.ts deleted file mode 100644 index d619b7fab..000000000 --- a/apps/provider-inventory/src/lib/compute-rollups/compute-rollups.ts +++ /dev/null @@ -1,65 +0,0 @@ -import type { Inventory, InventoryRollups } from "@src/types/inventory"; - -export function computeRollups(inventory: Inventory): InventoryRollups { - let totalAvailableCpu = 0n; - let totalAvailableMemory = 0n; - let totalAvailableGpu = 0n; - let totalAvailableEph = 0n; - let totalAvailablePersistent = 0n; - let maxNodeFreeCpu = 0n; - let maxNodeFreeMemory = 0n; - let maxNodeFreeGpu = 0n; - const gpuModelSet = new Set(); - const storageClassSet = new Set(); - - for (const node of inventory.nodes) { - const nodeCpu = clamp(node.cpu.available); - const nodeMemory = clamp(node.memory.available); - const nodeEph = clamp(node.ephStorage.available); - - totalAvailableCpu += nodeCpu; - totalAvailableMemory += nodeMemory; - totalAvailableEph += nodeEph; - - if (nodeCpu > maxNodeFreeCpu) maxNodeFreeCpu = nodeCpu; - if (nodeMemory > maxNodeFreeMemory) maxNodeFreeMemory = nodeMemory; - - let nodeGpuTotal = 0n; - for (const gpu of node.gpu) { - const gpuCount = clamp(gpu.available); - nodeGpuTotal += gpuCount; - totalAvailableGpu += gpuCount; - if (gpu.vendor && gpu.model) { - gpuModelSet.add(`${gpu.vendor}/${gpu.model}`); - } - } - if (nodeGpuTotal > maxNodeFreeGpu) maxNodeFreeGpu = nodeGpuTotal; - - for (const ps of node.persistentStorage) { - totalAvailablePersistent += clamp(ps.available); - if (ps.class) storageClassSet.add(ps.class); - } - } - - for (const s of inventory.storage) { - if (s.class) storageClassSet.add(s.class); - } - - return { - totalAvailableCpu, - totalAvailableMemory, - totalAvailableGpu, - totalAvailableEph, - totalAvailablePersistent, - maxNodeFreeCpu, - maxNodeFreeMemory, - maxNodeFreeGpu, - gpuModels: [...gpuModelSet].sort(), - storageClasses: [...storageClassSet].sort() - }; -} - -function clamp(value: number): bigint { - if (!Number.isFinite(value) || value <= 0) return 0n; - return BigInt(value); -} diff --git a/apps/provider-inventory/src/lib/inventory-mapper/inventory-mapper.spec.ts b/apps/provider-inventory/src/lib/inventory-mapper/inventory-mapper.spec.ts deleted file mode 100644 index c6156381c..000000000 --- a/apps/provider-inventory/src/lib/inventory-mapper/inventory-mapper.spec.ts +++ /dev/null @@ -1,91 +0,0 @@ -import { describe, expect, it } from "vitest"; - -import type { ProviderWithSnapshot } from "../../types/provider"; -import { mapSnapshotToInventory } from "./inventory-mapper"; - -describe(mapSnapshotToInventory.name, () => { - it("converts node CPU resources to bigint ResourcePairState", () => { - const { inventory } = setup({}); - expect(inventory.nodes[0].cpu.toState().allocatable).toBe(8000n); - expect(inventory.nodes[0].cpu.toState().allocated).toBe(2000n); - }); - - it("converts node memory resources to bigint ResourcePairState", () => { - const { inventory } = setup({}); - expect(inventory.nodes[0].memory.toState().allocatable).toBe(17179869184n); - expect(inventory.nodes[0].memory.toState().allocated).toBe(4294967296n); - }); - - it("converts node ephemeral storage resources to bigint ResourcePairState", () => { - const { inventory } = setup({}); - expect(inventory.nodes[0].ephemeralStorage.toState().allocatable).toBe(107374182400n); - expect(inventory.nodes[0].ephemeralStorage.toState().allocated).toBe(0n); - }); - - it("maps GPU info from snapshot node GPUs", () => { - const { inventory } = setup({}); - expect(inventory.nodes[0].gpu.info).toEqual([{ vendor: "nvidia", name: "a100", modelId: "2235", interface: "PCIe", memorySize: "80Gi" }]); - expect(inventory.nodes[0].gpu.quantity.toState().allocatable).toBe(1n); - }); - - it("maps storage class capabilities from node booleans", () => { - const { inventory } = setup({}); - expect(inventory.nodes[0].storageClasses).toEqual(["beta2"]); - }); - - it("maps cluster storage pools", () => { - const { inventory } = setup({}); - expect(inventory.storage["beta2"].class).toBe("beta2"); - expect(inventory.storage["beta2"].quantity.toState()).toEqual({ allocatable: 536870912000n, allocated: 0n }); - }); - - it("maps CPU info from snapshot node CPUs", () => { - const { inventory } = setup({}); - expect(inventory.nodes[0].cpus).toEqual([{ vendor: "GenuineIntel", model: "Intel Xeon Platinum 8375C" }]); - }); - - it("handles empty node list", () => { - const { inventory } = setup({ emptyNodes: true }); - expect(inventory.nodes).toEqual([]); - }); - - it("handles nodes with all storage class capabilities", () => { - const { inventory } = setup({ allStorageClasses: true }); - expect(inventory.nodes[0].storageClasses).toEqual(["beta1", "beta2", "beta3"]); - }); - - function setup(input: { emptyNodes?: boolean; allStorageClasses?: boolean }) { - const provider: ProviderWithSnapshot = { - owner: "akash1abc", - hostUri: "https://provider.example.com:8443", - ipRegion: "us-east", - uptime7d: 0.998, - lastSuccessfulSnapshot: { - nodes: input.emptyNodes - ? [] - : [ - { - name: "node1", - cpuAllocatable: 8000, - cpuAllocated: 2000, - memoryAllocatable: 17179869184, - memoryAllocated: 4294967296, - ephemeralStorageAllocatable: 107374182400, - ephemeralStorageAllocated: 0, - gpuAllocatable: 1, - gpuAllocated: 0, - capabilitiesStorageHDD: input.allStorageClasses ?? false, - capabilitiesStorageSSD: true, - capabilitiesStorageNVME: input.allStorageClasses ?? false, - gpus: [{ vendor: "nvidia", name: "a100", modelId: "2235", interface: "PCIe", memorySize: "80Gi" }], - cpus: [{ vendor: "GenuineIntel", model: "Intel Xeon Platinum 8375C", vcores: 4 }] - } - ], - storage: [{ class: "beta2", allocatable: 536870912000, allocated: 0 }] - } - }; - - const inventory = mapSnapshotToInventory(provider); - return { inventory, provider }; - } -}); diff --git a/apps/provider-inventory/src/lib/inventory-mapper/inventory-mapper.ts b/apps/provider-inventory/src/lib/inventory-mapper/inventory-mapper.ts deleted file mode 100644 index 6d5e13960..000000000 --- a/apps/provider-inventory/src/lib/inventory-mapper/inventory-mapper.ts +++ /dev/null @@ -1,58 +0,0 @@ -import type { ClusterState, CpuInfo, GpuInfo } from "../../types/inventory.types"; -import type { ProviderWithSnapshot } from "../../types/provider"; -import { ResourcePair } from "../resource-pair/resource-pair"; - -const STORAGE_CLASS_MAP: Record = { - capabilitiesStorageHDD: "beta1", - capabilitiesStorageSSD: "beta2", - capabilitiesStorageNVME: "beta3" -}; - -export function mapSnapshotToInventory(provider: ProviderWithSnapshot): ClusterState { - const snapshot = provider.lastSuccessfulSnapshot; - - const nodes = snapshot.nodes.map(node => { - const storageClasses: string[] = []; - if (node.capabilitiesStorageHDD) storageClasses.push(STORAGE_CLASS_MAP.capabilitiesStorageHDD); - if (node.capabilitiesStorageSSD) storageClasses.push(STORAGE_CLASS_MAP.capabilitiesStorageSSD); - if (node.capabilitiesStorageNVME) storageClasses.push(STORAGE_CLASS_MAP.capabilitiesStorageNVME); - - const gpuInfo: GpuInfo[] = (node.gpus || []) - .map(gpu => ({ - vendor: gpu.vendor, - name: gpu.name, - modelId: gpu.modelId, - interface: gpu.interface, - memorySize: gpu.memorySize - })) - .sort((a, b) => a.vendor.localeCompare(b.vendor) || a.name.localeCompare(b.name) || a.memorySize.localeCompare(b.memorySize)); - - const cpuInfo: CpuInfo[] = (node.cpus || []).map(cpu => ({ - vendor: cpu.vendor, - model: cpu.model - })); - - return { - name: node.name, - cpu: new ResourcePair(BigInt(node.cpuAllocatable), BigInt(node.cpuAllocated)), - memory: new ResourcePair(BigInt(node.memoryAllocatable), BigInt(node.memoryAllocated)), - ephemeralStorage: new ResourcePair(BigInt(node.ephemeralStorageAllocatable), BigInt(node.ephemeralStorageAllocated)), - gpu: { - quantity: new ResourcePair(BigInt(node.gpuAllocatable), BigInt(node.gpuAllocated)), - info: gpuInfo - }, - storageClasses, - cpus: cpuInfo - }; - }); - - const storage = (snapshot.storage || []).reduce>((acc, pool) => { - acc[pool.class] = { - class: pool.class, - quantity: new ResourcePair(BigInt(pool.allocatable), BigInt(pool.allocated)) - }; - return acc; - }, Object.create(null)); - - return { nodes, storage }; -} diff --git a/apps/provider-inventory/src/lib/jsonb-bigint/jsonb-bigint.column.ts b/apps/provider-inventory/src/lib/jsonb-bigint/jsonb-bigint.column.ts new file mode 100644 index 000000000..17049c8bf --- /dev/null +++ b/apps/provider-inventory/src/lib/jsonb-bigint/jsonb-bigint.column.ts @@ -0,0 +1,15 @@ +import { customType } from "drizzle-orm/pg-core"; + +export const jsonbBigint = customType<{ data: unknown; driverData: unknown }>({ + dataType() { + return "jsonb"; + }, + toDriver(value) { + // pass value as is because then it's handled by postgres.js jsonb serializer + return value; + }, + fromDriver(value) { + // postgres.js returns parsed JSON + return value; + } +}); diff --git a/apps/provider-inventory/src/lib/project-row/project-row.spec.ts b/apps/provider-inventory/src/lib/project-row/project-row.spec.ts index c438d907f..61efb80af 100644 --- a/apps/provider-inventory/src/lib/project-row/project-row.spec.ts +++ b/apps/provider-inventory/src/lib/project-row/project-row.spec.ts @@ -1,80 +1,203 @@ import { describe, expect, it } from "vitest"; +import { ResourcePair } from "@src/lib/resource-pair/resource-pair"; +import type { ClusterState, GpuInfo, NodeState } from "@src/types/inventory.types"; import { projectRow } from "./project-row"; describe(projectRow.name, () => { it("projects an empty cluster", () => { - const result = projectRow({ nodes: [], storage: [] }); + const result = projectRow(buildCluster()); - expect(result.inventory).toEqual({ nodes: [], storage: [] }); - expect(result.totalAvailableCpu).toBe(0n); - expect(result.totalAvailableMemory).toBe(0n); - expect(result.totalAvailableGpu).toBe(0n); - expect(result.gpuModels).toEqual([]); - expect(result.storageClasses).toEqual([]); + expect(result).toEqual({ + cluster: { nodes: [], storage: {} }, + totalAvailableCpu: 0n, + totalAvailableMemory: 0n, + totalAvailableGpu: 0n, + totalAvailableEph: 0n, + totalAvailablePersistent: 0n, + maxNodeFreeCpu: 0n, + maxNodeFreeMemory: 0n, + maxNodeFreeGpu: 0n, + gpuModels: [], + storageClasses: [] + }); }); - it("normalizes a single node into inventory JSONB and computes rollups", () => { - const result = projectRow({ - nodes: [ - { - name: "node-1", - cpuAvailable: 8000, - memoryAvailable: 32_000_000_000, - gpus: [{ vendor: "nvidia", model: "rtx4090", available: 1 }], - ephStorageAvailable: 500_000_000_000, - persistentStorage: [{ class: "beta2", available: 1_000_000_000_000 }] - } - ], - storage: [{ class: "beta2", available: 2_000_000_000_000 }] + it("passes ClusterState through unchanged", () => { + const cluster = buildCluster({ + nodes: [buildNode({ cpu: pair(8000n) })], + storage: { beta2: { class: "beta2", quantity: pair(2_000_000_000_000n) } } }); - expect(result.inventory.nodes).toHaveLength(1); - expect(result.inventory.nodes[0]).toEqual({ - name: "node-1", - cpu: { available: 8000 }, - memory: { available: 32_000_000_000 }, - gpu: [{ vendor: "nvidia", model: "rtx4090", available: 1 }], - ephStorage: { available: 500_000_000_000 }, - persistentStorage: [{ class: "beta2", available: 1_000_000_000_000 }] - }); - expect(result.inventory.storage).toEqual([{ class: "beta2", available: 2_000_000_000_000 }]); + const result = projectRow(cluster); + + expect(result.cluster).toBe(cluster); + }); + + it("computes rollups for a single node", () => { + const result = projectRow( + buildCluster({ + nodes: [ + buildNode({ + cpu: pair(4000n), + memory: pair(8_000_000_000n), + ephemeralStorage: pair(100_000_000_000n), + gpu: { quantity: pair(2n), info: [gpu("nvidia", "a100")] }, + storageClasses: ["beta2"] + }) + ], + storage: { beta2: { class: "beta2", quantity: pair(500_000_000_000n) } } + }) + ); - expect(result.totalAvailableCpu).toBe(8000n); - expect(result.totalAvailableGpu).toBe(1n); - expect(result.gpuModels).toEqual(["nvidia/rtx4090"]); + expect(result.totalAvailableCpu).toBe(4000n); + expect(result.totalAvailableMemory).toBe(8_000_000_000n); + expect(result.totalAvailableGpu).toBe(2n); + expect(result.totalAvailableEph).toBe(100_000_000_000n); + expect(result.totalAvailablePersistent).toBe(500_000_000_000n); + expect(result.maxNodeFreeCpu).toBe(4000n); + expect(result.maxNodeFreeMemory).toBe(8_000_000_000n); + expect(result.maxNodeFreeGpu).toBe(2n); + expect(result.gpuModels).toEqual(["nvidia/a100"]); expect(result.storageClasses).toEqual(["beta2"]); }); - it("projects a multi-node cluster with correct max-per-node tracking", () => { - const result = projectRow({ - nodes: [ - { - name: "small", - cpuAvailable: 2000, - memoryAvailable: 4_000_000_000, - gpus: [], - ephStorageAvailable: 50_000_000_000, - persistentStorage: [] - }, - { - name: "large", - cpuAvailable: 16000, - memoryAvailable: 64_000_000_000, - gpus: [{ vendor: "nvidia", model: "a100", available: 8 }], - ephStorageAvailable: 1_000_000_000_000, - persistentStorage: [{ class: "beta3", available: 2_000_000_000_000 }] - } - ], - storage: [] - }); + it("sums totals across multiple nodes and tracks max-per-node", () => { + const result = projectRow( + buildCluster({ + nodes: [ + buildNode({ + cpu: pair(2000n), + memory: pair(4_000_000_000n), + ephemeralStorage: pair(50_000_000_000n), + gpu: { quantity: pair(1n), info: [gpu("nvidia", "a100")] } + }), + buildNode({ + cpu: pair(8000n), + memory: pair(16_000_000_000n), + ephemeralStorage: pair(200_000_000_000n), + gpu: { quantity: pair(4n), info: [gpu("nvidia", "a100")] } + }) + ] + }) + ); + + expect(result.totalAvailableCpu).toBe(10_000n); + expect(result.totalAvailableMemory).toBe(20_000_000_000n); + expect(result.totalAvailableGpu).toBe(5n); + expect(result.totalAvailableEph).toBe(250_000_000_000n); + expect(result.maxNodeFreeCpu).toBe(8000n); + expect(result.maxNodeFreeMemory).toBe(16_000_000_000n); + expect(result.maxNodeFreeGpu).toBe(4n); + }); + + it("deduplicates GPU models across nodes", () => { + const result = projectRow( + buildCluster({ + nodes: [ + buildNode({ gpu: { quantity: pair(2n), info: [gpu("nvidia", "a100"), gpu("amd", "mi300x")] } }), + buildNode({ gpu: { quantity: pair(2n), info: [gpu("nvidia", "a100")] } }) + ] + }) + ); + + expect(result.gpuModels).toEqual(["amd/mi300x", "nvidia/a100"]); + }); + + it("handles ephemeral-only storage", () => { + const result = projectRow(buildCluster({ nodes: [buildNode({ ephemeralStorage: pair(500_000_000_000n) })] })); - expect(result.inventory.nodes).toHaveLength(2); - expect(result.totalAvailableCpu).toBe(18_000n); - expect(result.maxNodeFreeCpu).toBe(16_000n); - expect(result.totalAvailableMemory).toBe(68_000_000_000n); - expect(result.maxNodeFreeMemory).toBe(64_000_000_000n); - expect(result.totalAvailableGpu).toBe(8n); - expect(result.maxNodeFreeGpu).toBe(8n); + expect(result.totalAvailableEph).toBe(500_000_000_000n); + expect(result.totalAvailablePersistent).toBe(0n); + expect(result.storageClasses).toEqual([]); + }); + + it("collects storage classes from both nodes and cluster-level storage", () => { + const result = projectRow( + buildCluster({ + nodes: [buildNode({ storageClasses: ["beta2"] })], + storage: { beta3: { class: "beta3", quantity: pair(500n) } } + }) + ); + + expect(result.storageClasses).toEqual(["beta2", "beta3"]); + }); + + it("handles nodes with no GPUs", () => { + const result = projectRow( + buildCluster({ + nodes: [buildNode({ cpu: pair(4000n), memory: pair(8_000_000_000n), ephemeralStorage: pair(100_000_000_000n) })] + }) + ); + + expect(result.totalAvailableGpu).toBe(0n); + expect(result.maxNodeFreeGpu).toBe(0n); + expect(result.gpuModels).toEqual([]); + }); + + it("treats over-allocated nodes as zero available (no negative leakage)", () => { + const result = projectRow( + buildCluster({ + nodes: [ + buildNode({ + cpu: new ResourcePair(500n, 1000n), + memory: new ResourcePair(1_000_000n, 2_000_000n), + gpu: { quantity: new ResourcePair(0n, 1n), info: [gpu("nvidia", "a100")] }, + ephemeralStorage: new ResourcePair(100n, 200n) + }) + ] + }) + ); + + expect(result.totalAvailableCpu).toBe(0n); + expect(result.totalAvailableMemory).toBe(0n); + expect(result.totalAvailableGpu).toBe(0n); + expect(result.totalAvailableEph).toBe(0n); + expect(result.maxNodeFreeCpu).toBe(0n); + expect(result.maxNodeFreeMemory).toBe(0n); + expect(result.maxNodeFreeGpu).toBe(0n); + }); + + it("sums GPU count per node for max-node-free-gpu", () => { + const result = projectRow( + buildCluster({ + nodes: [ + buildNode({ gpu: { quantity: pair(5n), info: [gpu("nvidia", "a100"), gpu("nvidia", "h100")] } }), + buildNode({ gpu: { quantity: pair(4n), info: [gpu("nvidia", "a100")] } }) + ] + }) + ); + + expect(result.maxNodeFreeGpu).toBe(5n); + expect(result.totalAvailableGpu).toBe(9n); + expect(result.gpuModels).toEqual(["nvidia/a100", "nvidia/h100"]); }); }); + +function pair(allocatable: bigint): ResourcePair { + return new ResourcePair(allocatable, 0n); +} + +function gpu(vendor: string, name: string): GpuInfo { + return { vendor, name, modelId: "", interface: "", memorySize: "" }; +} + +function buildNode(overrides?: Partial): NodeState { + return { + name: "node-1", + cpu: pair(0n), + memory: pair(0n), + ephemeralStorage: pair(0n), + gpu: { quantity: pair(0n), info: [] }, + storageClasses: [], + cpus: [], + ...overrides + }; +} + +function buildCluster(overrides?: Partial): ClusterState { + return { + nodes: overrides?.nodes ?? [], + storage: overrides?.storage ?? Object.create(null) + }; +} diff --git a/apps/provider-inventory/src/lib/project-row/project-row.ts b/apps/provider-inventory/src/lib/project-row/project-row.ts index f53ffd638..882e3f2d4 100644 --- a/apps/provider-inventory/src/lib/project-row/project-row.ts +++ b/apps/provider-inventory/src/lib/project-row/project-row.ts @@ -1,31 +1,60 @@ -import { computeRollups } from "@src/lib/compute-rollups/compute-rollups"; -import type { Inventory, ProjectedRow } from "@src/types/inventory"; -import type { StreamStatusMessage } from "@src/types/stream-status"; - -export function projectRow(message: StreamStatusMessage): ProjectedRow { - const inventory: Inventory = { - nodes: message.nodes.map(node => ({ - name: node.name, - cpu: { available: node.cpuAvailable }, - memory: { available: node.memoryAvailable }, - gpu: node.gpus.map(g => ({ - vendor: g.vendor, - model: g.model, - available: g.available - })), - ephStorage: { available: node.ephStorageAvailable }, - persistentStorage: node.persistentStorage.map(ps => ({ - class: ps.class, - available: ps.available - })) - })), - storage: message.storage.map(s => ({ - class: s.class, - available: s.available - })) - }; +import type { ProjectedRow } from "@src/types/inventory"; +import type { ClusterState } from "@src/types/inventory.types"; + +export function projectRow(cluster: ClusterState): ProjectedRow { + let totalAvailableCpu = 0n; + let totalAvailableMemory = 0n; + let totalAvailableGpu = 0n; + let totalAvailableEph = 0n; + let totalAvailablePersistent = 0n; + let maxNodeFreeCpu = 0n; + let maxNodeFreeMemory = 0n; + let maxNodeFreeGpu = 0n; + const gpuModelSet = new Set(); + const storageClassSet = new Set(); + + for (const node of cluster.nodes) { + const nodeCpu = node.cpu.available(); + const nodeMemory = node.memory.available(); + const nodeEph = node.ephemeralStorage.available(); + const nodeGpu = node.gpu.quantity.available(); + + totalAvailableCpu += nodeCpu; + totalAvailableMemory += nodeMemory; + totalAvailableEph += nodeEph; + totalAvailableGpu += nodeGpu; - const rollups = computeRollups(inventory); + if (nodeCpu > maxNodeFreeCpu) maxNodeFreeCpu = nodeCpu; + if (nodeMemory > maxNodeFreeMemory) maxNodeFreeMemory = nodeMemory; + if (nodeGpu > maxNodeFreeGpu) maxNodeFreeGpu = nodeGpu; - return { inventory, ...rollups }; + for (const gpu of node.gpu.info) { + if (gpu.vendor && gpu.name) { + gpuModelSet.add(`${gpu.vendor}/${gpu.name}`); + } + } + + for (const cls of node.storageClasses) { + if (cls) storageClassSet.add(cls); + } + } + + for (const pool of Object.values(cluster.storage)) { + totalAvailablePersistent += pool.quantity.available(); + if (pool.class) storageClassSet.add(pool.class); + } + + return { + cluster, + totalAvailableCpu, + totalAvailableMemory, + totalAvailableGpu, + totalAvailableEph, + totalAvailablePersistent, + maxNodeFreeCpu, + maxNodeFreeMemory, + maxNodeFreeGpu, + gpuModels: [...gpuModelSet].sort(), + storageClasses: [...storageClassSet].sort() + }; } diff --git a/apps/provider-inventory/src/lib/projected-row-equals/projected-row-equals.spec.ts b/apps/provider-inventory/src/lib/projected-row-equals/projected-row-equals.spec.ts index ee28ba2b2..ed30aed5d 100644 --- a/apps/provider-inventory/src/lib/projected-row-equals/projected-row-equals.spec.ts +++ b/apps/provider-inventory/src/lib/projected-row-equals/projected-row-equals.spec.ts @@ -1,6 +1,8 @@ import { describe, expect, it } from "vitest"; -import type { Inventory, InventoryClusterStorage, InventoryNode, InventoryRollups, ProjectedRow } from "@src/types/inventory"; +import { ResourcePair } from "@src/lib/resource-pair/resource-pair"; +import type { InventoryRollups, ProjectedRow } from "@src/types/inventory"; +import type { ClusterState, GpuInfo, NodeState } from "@src/types/inventory.types"; import { projectedRowsEqual } from "./projected-row-equals"; describe(projectedRowsEqual.name, () => { @@ -29,7 +31,7 @@ describe(projectedRowsEqual.name, () => { gpuModels: ["nvidia/a100"], storageClasses: ["beta2", "beta3"], nodes: [buildNode({ name: "node-1" }), buildNode({ name: "node-2" })], - storage: [{ class: "beta2", available: 100 }] + storage: storageMap([{ class: "beta2", allocatable: 100n }]) }); const b = buildRow({ totalAvailableCpu: 12_000n, @@ -43,7 +45,7 @@ describe(projectedRowsEqual.name, () => { gpuModels: ["nvidia/a100"], storageClasses: ["beta2", "beta3"], nodes: [buildNode({ name: "node-1" }), buildNode({ name: "node-2" })], - storage: [{ class: "beta2", available: 100 }] + storage: storageMap([{ class: "beta2", allocatable: 100n }]) }); expect(projectedRowsEqual(a, b)).toBe(true); @@ -51,7 +53,7 @@ describe(projectedRowsEqual.name, () => { }); describe("single-field differs per rollup column", () => { - it.each([ + it.each<[string, Partial]>([ ["totalAvailableCpu", { totalAvailableCpu: 999n }], ["totalAvailableMemory", { totalAvailableMemory: 999n }], ["totalAvailableGpu", { totalAvailableGpu: 999n }], @@ -62,29 +64,35 @@ describe(projectedRowsEqual.name, () => { ["maxNodeFreeGpu", { maxNodeFreeGpu: 999n }], ["gpuModels", { gpuModels: ["nvidia/h100"] }], ["storageClasses", { storageClasses: ["beta3"] }] - ] as const)("returns false when %s differs", (_field, override) => { + ])("returns false when %s differs", (_field, override) => { const a = buildRow(); const b = buildRow(override); expect(projectedRowsEqual(a, b)).toBe(false); }); }); - describe("JSONB-nested differs", () => { - it("returns false when a node's cpu.available differs", () => { - const a = buildRow({ nodes: [buildNode({ cpu: { available: 1000 } })] }); - const b = buildRow({ nodes: [buildNode({ cpu: { available: 2000 } })] }); + describe("ClusterState-nested differs", () => { + it("returns false when a node's cpu allocatable differs", () => { + const a = buildRow({ nodes: [buildNode({ cpu: new ResourcePair(1000n, 0n) })] }); + const b = buildRow({ nodes: [buildNode({ cpu: new ResourcePair(2000n, 0n) })] }); expect(projectedRowsEqual(a, b)).toBe(false); }); - it("returns false when a node's memory.available differs", () => { - const a = buildRow({ nodes: [buildNode({ memory: { available: 1000 } })] }); - const b = buildRow({ nodes: [buildNode({ memory: { available: 2000 } })] }); + it("returns false when a node's cpu allocated differs", () => { + const a = buildRow({ nodes: [buildNode({ cpu: new ResourcePair(1000n, 0n) })] }); + const b = buildRow({ nodes: [buildNode({ cpu: new ResourcePair(1000n, 500n) })] }); expect(projectedRowsEqual(a, b)).toBe(false); }); - it("returns false when a node's ephStorage.available differs", () => { - const a = buildRow({ nodes: [buildNode({ ephStorage: { available: 1000 } })] }); - const b = buildRow({ nodes: [buildNode({ ephStorage: { available: 2000 } })] }); + it("returns false when a node's memory differs", () => { + const a = buildRow({ nodes: [buildNode({ memory: new ResourcePair(1000n, 0n) })] }); + const b = buildRow({ nodes: [buildNode({ memory: new ResourcePair(2000n, 0n) })] }); + expect(projectedRowsEqual(a, b)).toBe(false); + }); + + it("returns false when a node's ephemeralStorage differs", () => { + const a = buildRow({ nodes: [buildNode({ ephemeralStorage: new ResourcePair(1000n, 0n) })] }); + const b = buildRow({ nodes: [buildNode({ ephemeralStorage: new ResourcePair(2000n, 0n) })] }); expect(projectedRowsEqual(a, b)).toBe(false); }); @@ -94,33 +102,33 @@ describe(projectedRowsEqual.name, () => { expect(projectedRowsEqual(a, b)).toBe(false); }); - it("returns false when a node's gpu available count differs", () => { - const a = buildRow({ nodes: [buildNode({ gpu: [{ vendor: "nvidia", model: "a100", available: 1 }] })] }); - const b = buildRow({ nodes: [buildNode({ gpu: [{ vendor: "nvidia", model: "a100", available: 2 }] })] }); + it("returns false when a node's gpu quantity differs", () => { + const a = buildRow({ nodes: [buildNode({ gpu: { quantity: new ResourcePair(1n, 0n), info: [gpu("nvidia", "a100")] } })] }); + const b = buildRow({ nodes: [buildNode({ gpu: { quantity: new ResourcePair(2n, 0n), info: [gpu("nvidia", "a100")] } })] }); expect(projectedRowsEqual(a, b)).toBe(false); }); - it("returns false when a node's gpu vendor/model differs", () => { - const a = buildRow({ nodes: [buildNode({ gpu: [{ vendor: "nvidia", model: "a100", available: 1 }] })] }); - const b = buildRow({ nodes: [buildNode({ gpu: [{ vendor: "amd", model: "mi300x", available: 1 }] })] }); + it("returns false when a node's gpu info differs", () => { + const a = buildRow({ nodes: [buildNode({ gpu: { quantity: new ResourcePair(1n, 0n), info: [gpu("nvidia", "a100")] } })] }); + const b = buildRow({ nodes: [buildNode({ gpu: { quantity: new ResourcePair(1n, 0n), info: [gpu("amd", "mi300x")] } })] }); expect(projectedRowsEqual(a, b)).toBe(false); }); - it("returns false when a node's persistentStorage class differs", () => { - const a = buildRow({ nodes: [buildNode({ persistentStorage: [{ class: "beta2", available: 100 }] })] }); - const b = buildRow({ nodes: [buildNode({ persistentStorage: [{ class: "beta3", available: 100 }] })] }); + it("returns false when a node's storageClasses differ", () => { + const a = buildRow({ nodes: [buildNode({ storageClasses: ["beta2"] })] }); + const b = buildRow({ nodes: [buildNode({ storageClasses: ["beta3"] })] }); expect(projectedRowsEqual(a, b)).toBe(false); }); - it("returns false when a node's persistentStorage available differs", () => { - const a = buildRow({ nodes: [buildNode({ persistentStorage: [{ class: "beta2", available: 100 }] })] }); - const b = buildRow({ nodes: [buildNode({ persistentStorage: [{ class: "beta2", available: 200 }] })] }); + it("returns false when cluster-level storage class differs", () => { + const a = buildRow({ storage: storageMap([{ class: "beta2", allocatable: 100n }]) }); + const b = buildRow({ storage: storageMap([{ class: "beta3", allocatable: 100n }]) }); expect(projectedRowsEqual(a, b)).toBe(false); }); - it("returns false when cluster-level storage available differs", () => { - const a = buildRow({ storage: [{ class: "beta2", available: 100 }] }); - const b = buildRow({ storage: [{ class: "beta2", available: 200 }] }); + it("returns false when cluster-level storage quantity differs", () => { + const a = buildRow({ storage: storageMap([{ class: "beta2", allocatable: 100n }]) }); + const b = buildRow({ storage: storageMap([{ class: "beta2", allocatable: 200n }]) }); expect(projectedRowsEqual(a, b)).toBe(false); }); @@ -138,66 +146,12 @@ describe(projectedRowsEqual.name, () => { expect(projectedRowsEqual(a, b)).toBe(true); }); - it("ignores order of gpus within a node", () => { - const a = buildRow({ - nodes: [ - buildNode({ - gpu: [ - { vendor: "nvidia", model: "a100", available: 1 }, - { vendor: "amd", model: "mi300x", available: 2 } - ] - }) - ] - }); - const b = buildRow({ - nodes: [ - buildNode({ - gpu: [ - { vendor: "amd", model: "mi300x", available: 2 }, - { vendor: "nvidia", model: "a100", available: 1 } - ] - }) - ] - }); - expect(projectedRowsEqual(a, b)).toBe(true); - }); - - it("ignores order of persistentStorage within a node", () => { - const a = buildRow({ - nodes: [ - buildNode({ - persistentStorage: [ - { class: "beta2", available: 100 }, - { class: "beta3", available: 200 } - ] - }) - ] - }); - const b = buildRow({ - nodes: [ - buildNode({ - persistentStorage: [ - { class: "beta3", available: 200 }, - { class: "beta2", available: 100 } - ] - }) - ] - }); - expect(projectedRowsEqual(a, b)).toBe(true); - }); - - it("ignores order of cluster-level storage", () => { + it("ignores order of gpu info within a node", () => { const a = buildRow({ - storage: [ - { class: "beta2", available: 100 }, - { class: "beta3", available: 200 } - ] + nodes: [buildNode({ gpu: { quantity: new ResourcePair(3n, 0n), info: [gpu("nvidia", "a100"), gpu("amd", "mi300x")] } })] }); const b = buildRow({ - storage: [ - { class: "beta3", available: 200 }, - { class: "beta2", available: 100 } - ] + nodes: [buildNode({ gpu: { quantity: new ResourcePair(3n, 0n), info: [gpu("amd", "mi300x"), gpu("nvidia", "a100")] } })] }); expect(projectedRowsEqual(a, b)).toBe(true); }); @@ -213,62 +167,26 @@ describe(projectedRowsEqual.name, () => { const b = buildRow({ storageClasses: ["beta3", "beta2"] }); expect(projectedRowsEqual(a, b)).toBe(true); }); - - it("ignores order across all reorderable arrays simultaneously", () => { - const a = buildRow({ - gpuModels: ["amd/mi300x", "nvidia/a100"], - storageClasses: ["beta2", "beta3"], - nodes: [ - buildNode({ - name: "node-1", - gpu: [ - { vendor: "nvidia", model: "a100", available: 1 }, - { vendor: "amd", model: "mi300x", available: 1 } - ], - persistentStorage: [ - { class: "beta2", available: 100 }, - { class: "beta3", available: 200 } - ] - }), - buildNode({ name: "node-2" }) - ], - storage: [ - { class: "beta2", available: 100 }, - { class: "beta3", available: 200 } - ] - }); - const b = buildRow({ - gpuModels: ["nvidia/a100", "amd/mi300x"], - storageClasses: ["beta3", "beta2"], - nodes: [ - buildNode({ name: "node-2" }), - buildNode({ - name: "node-1", - gpu: [ - { vendor: "amd", model: "mi300x", available: 1 }, - { vendor: "nvidia", model: "a100", available: 1 } - ], - persistentStorage: [ - { class: "beta3", available: 200 }, - { class: "beta2", available: 100 } - ] - }) - ], - storage: [ - { class: "beta3", available: 200 }, - { class: "beta2", available: 100 } - ] - }); - expect(projectedRowsEqual(a, b)).toBe(true); - }); }); }); -function buildRow(overrides: Partial & { nodes?: InventoryNode[]; storage?: InventoryClusterStorage[] } = {}): ProjectedRow { +function gpu(vendor: string, name: string): GpuInfo { + return { vendor, name, modelId: "", interface: "", memorySize: "" }; +} + +function storageMap(pools: { class: string; allocatable: bigint; allocated?: bigint }[]): ClusterState["storage"] { + const result: ClusterState["storage"] = Object.create(null); + for (const pool of pools) { + result[pool.class] = { class: pool.class, quantity: new ResourcePair(pool.allocatable, pool.allocated ?? 0n) }; + } + return result; +} + +function buildRow(overrides: Partial & { nodes?: NodeState[]; storage?: ClusterState["storage"] } = {}): ProjectedRow { const { nodes, storage, ...rollupOverrides } = overrides; - const inventory: Inventory = { + const cluster: ClusterState = { nodes: nodes ?? [], - storage: storage ?? [] + storage: storage ?? Object.create(null) }; return { totalAvailableCpu: 0n, @@ -282,18 +200,19 @@ function buildRow(overrides: Partial & { nodes?: InventoryNode gpuModels: [], storageClasses: [], ...rollupOverrides, - inventory + cluster }; } -function buildNode(overrides?: Partial): InventoryNode { +function buildNode(overrides?: Partial): NodeState { return { name: "node-1", - cpu: { available: 0 }, - memory: { available: 0 }, - gpu: [], - ephStorage: { available: 0 }, - persistentStorage: [], + cpu: new ResourcePair(0n, 0n), + memory: new ResourcePair(0n, 0n), + ephemeralStorage: new ResourcePair(0n, 0n), + gpu: { quantity: new ResourcePair(0n, 0n), info: [] }, + storageClasses: [], + cpus: [], ...overrides }; } diff --git a/apps/provider-inventory/src/lib/projected-row-equals/projected-row-equals.ts b/apps/provider-inventory/src/lib/projected-row-equals/projected-row-equals.ts index 608f61e1a..2e13c20f6 100644 --- a/apps/provider-inventory/src/lib/projected-row-equals/projected-row-equals.ts +++ b/apps/provider-inventory/src/lib/projected-row-equals/projected-row-equals.ts @@ -1,4 +1,6 @@ -import type { Inventory, InventoryNode, InventoryNodeGpu, InventoryNodeStorage, ProjectedRow } from "@src/types/inventory"; +import type { ResourcePair } from "@src/lib/resource-pair/resource-pair"; +import type { ProjectedRow } from "@src/types/inventory"; +import type { ClusterState, CpuInfo, GpuInfo, NodeState } from "@src/types/inventory.types"; export function projectedRowsEqual(a: ProjectedRow, b: ProjectedRow): boolean { if (a === b) return true; @@ -19,19 +21,19 @@ export function projectedRowsEqual(a: ProjectedRow, b: ProjectedRow): boolean { if (!stringArrayEqual(a.gpuModels, b.gpuModels)) return false; if (!stringArrayEqual(a.storageClasses, b.storageClasses)) return false; - return inventoryEqual(a.inventory, b.inventory); + return clusterEqual(a.cluster, b.cluster); } -function inventoryEqual(a: Inventory, b: Inventory): boolean { +function clusterEqual(a: ClusterState, b: ClusterState): boolean { if (a === b) return true; - return storageEqual(a.storage, b.storage) && nodesEqual(a.nodes, b.nodes); + return clusterStorageEqual(a.storage, b.storage) && nodesEqual(a.nodes, b.nodes); } -function nodesEqual(a: readonly InventoryNode[], b: readonly InventoryNode[]): boolean { +function nodesEqual(a: readonly NodeState[], b: readonly NodeState[]): boolean { if (a === b) return true; if (a.length !== b.length) return false; - const byName = new Map(); + const byName = new Map(); for (let i = 0; i < a.length; i++) { byName.set(a[i].name, a[i]); } @@ -42,35 +44,59 @@ function nodesEqual(a: readonly InventoryNode[], b: readonly InventoryNode[]): b return true; } -function nodeEqual(a: InventoryNode, b: InventoryNode): boolean { +function nodeEqual(a: NodeState, b: NodeState): boolean { if (a === b) return true; return ( - a.cpu.available === b.cpu.available && - a.memory.available === b.memory.available && - a.ephStorage.available === b.ephStorage.available && - gpuListEqual(a.gpu, b.gpu) && - storageEqual(a.persistentStorage, b.persistentStorage) + pairEqual(a.cpu, b.cpu) && + pairEqual(a.memory, b.memory) && + pairEqual(a.ephemeralStorage, b.ephemeralStorage) && + pairEqual(a.gpu.quantity, b.gpu.quantity) && + gpuInfoEqual(a.gpu.info, b.gpu.info) && + stringArrayEqual(a.storageClasses, b.storageClasses) && + cpuInfoEqual(a.cpus, b.cpus) ); } -function gpuListEqual(a: readonly InventoryNodeGpu[], b: readonly InventoryNodeGpu[]): boolean { +function pairEqual(a: ResourcePair, b: ResourcePair): boolean { + return a.allocatable === b.allocatable && a.allocated === b.allocated; +} + +function gpuInfoEqual(a: readonly GpuInfo[], b: readonly GpuInfo[]): boolean { if (a === b) return true; if (a.length !== b.length) return false; for (let i = 0; i < a.length; i++) { - const match = b.find(g => g.vendor === a[i].vendor && g.model === a[i].model); - if (!match || match.available !== a[i].available) return false; + const match = b.find( + g => g.vendor === a[i].vendor && g.name === a[i].name && g.modelId === a[i].modelId && g.interface === a[i].interface && g.memorySize === a[i].memorySize + ); + if (!match) return false; } return true; } -function storageEqual(a: readonly InventoryNodeStorage[], b: readonly InventoryNodeStorage[]): boolean { +function cpuInfoEqual(a: readonly CpuInfo[], b: readonly CpuInfo[]): boolean { if (a === b) return true; if (a.length !== b.length) return false; for (let i = 0; i < a.length; i++) { - const match = b.find(s => s.class === a[i].class); - if (!match || match.available !== a[i].available) return false; + const match = b.find(c => c.vendor === a[i].vendor && c.model === a[i].model); + if (!match) return false; + } + return true; +} + +function clusterStorageEqual(a: ClusterState["storage"], b: ClusterState["storage"]): boolean { + if (a === b) return true; + const aKeys = Object.keys(a); + const bKeys = Object.keys(b); + if (aKeys.length !== bKeys.length) return false; + + for (const key of aKeys) { + const poolA = a[key]; + const poolB = b[key]; + if (!poolB) return false; + if (poolA.class !== poolB.class) return false; + if (!pairEqual(poolA.quantity, poolB.quantity)) return false; } return true; } diff --git a/apps/provider-inventory/src/lib/resource-pair/resource-pair.ts b/apps/provider-inventory/src/lib/resource-pair/resource-pair.ts index 1d9c15adb..a86317bee 100644 --- a/apps/provider-inventory/src/lib/resource-pair/resource-pair.ts +++ b/apps/provider-inventory/src/lib/resource-pair/resource-pair.ts @@ -1,45 +1,49 @@ -import type { ResourcePairState } from "../../types/inventory.types"; - const UNLIMITED = -1n; export const MAX_INT64 = 9223372036854775807n; export class ResourcePair { - private allocatable: bigint; - private allocated: bigint; + #allocatable: bigint; + #allocated: bigint; constructor(allocatable: bigint, allocated: bigint) { - this.allocatable = allocatable; - this.allocated = allocated; + this.#allocatable = allocatable; + this.#allocated = allocated; } available(): bigint { - if (this.allocatable === UNLIMITED) return MAX_INT64; - const diff = this.allocatable - this.allocated; + if (this.#allocatable === UNLIMITED) return MAX_INT64; + const diff = this.#allocatable - this.#allocated; return diff > 0n ? diff : 0n; } canAllocate(val: bigint): boolean { - return this.allocatable === UNLIMITED || this.allocatable - this.allocated >= val; + return this.#allocatable === UNLIMITED || this.#allocatable - this.#allocated >= val; } allocate(val: bigint): boolean { if (!this.canAllocate(val)) return false; - this.allocated += val; + this.#allocated += val; return true; } - toState(): ResourcePairState { - return { - allocatable: this.allocatable, - allocated: this.allocated - }; + get allocatable(): bigint { + return this.#allocatable; + } + + get allocated(): bigint { + return this.#allocated; } clone(): ResourcePair { - return new ResourcePair(this.allocatable, this.allocated); + return new ResourcePair(this.#allocatable, this.#allocated); } - static fromState(state: ResourcePairState): ResourcePair { - return new ResourcePair(state.allocatable, state.allocated); + toJSON(): ResourcePairState { + return { allocatable: this.#allocatable, allocated: this.#allocated }; } } + +export interface ResourcePairState { + allocatable: bigint; + allocated: bigint; +} diff --git a/apps/provider-inventory/src/lib/stream-status-mapper/stream-status-mapper.spec.ts b/apps/provider-inventory/src/lib/stream-status-mapper/stream-status-mapper.spec.ts new file mode 100644 index 000000000..de9d02a66 --- /dev/null +++ b/apps/provider-inventory/src/lib/stream-status-mapper/stream-status-mapper.spec.ts @@ -0,0 +1,128 @@ +import { describe, expect, it } from "vitest"; + +import { parseQuantity } from "./stream-status-mapper"; + +describe(parseQuantity.name, () => { + describe("empty / unparseable input", () => { + it("returns 0n when the quantity is undefined", () => { + expect(parseQuantity(undefined)).toBe(0n); + }); + + it("returns 0n when the string is empty", () => { + expect(parseQuantity({ string: "" })).toBe(0n); + }); + + it("returns 0n when the string is whitespace", () => { + expect(parseQuantity({ string: " " })).toBe(0n); + }); + + it("returns 0n on garbage input", () => { + expect(parseQuantity({ string: "abc" })).toBe(0n); + }); + }); + + describe("integer mantissa with binary SI suffix", () => { + it("parses 1Ki as 2^10", () => { + expect(parseQuantity({ string: "1Ki" })).toBe(1024n); + }); + + it("parses 32Gi as 32 * 2^30", () => { + expect(parseQuantity({ string: "32Gi" })).toBe(34359738368n); + }); + + it("preserves precision for 1Ei (2^60 exceeds Number.MAX_SAFE_INTEGER)", () => { + expect(parseQuantity({ string: "1Ei" })).toBe(1n << 60n); + }); + + it("preserves precision for 1000Ei", () => { + expect(parseQuantity({ string: "1000Ei" })).toBe(1000n * (1n << 60n)); + }); + + it("preserves precision for 1Pi (2^50 already exceeds safe integer scale)", () => { + expect(parseQuantity({ string: "1Pi" })).toBe(1n << 50n); + }); + }); + + describe("fractional mantissa with binary SI suffix", () => { + it("parses 1.5Gi exactly", () => { + expect(parseQuantity({ string: "1.5Gi" })).toBe(1610612736n); + }); + + it("truncates fractional results toward zero", () => { + expect(parseQuantity({ string: "1.5Ki" })).toBe(1536n); + }); + + it("preserves precision for 1.5Ei", () => { + expect(parseQuantity({ string: "1.5Ei" })).toBe((15n * (1n << 60n)) / 10n); + }); + }); + + describe("integer mantissa with decimal SI suffix", () => { + it("parses '8' (no suffix) as 8n", () => { + expect(parseQuantity({ string: "8" })).toBe(8n); + }); + + it("parses 1k as 1000", () => { + expect(parseQuantity({ string: "1k" })).toBe(1000n); + }); + + it("parses 1M as 1e6", () => { + expect(parseQuantity({ string: "1M" })).toBe(1_000_000n); + }); + + it("parses 1E as 1e18 (exceeds Number.MAX_SAFE_INTEGER)", () => { + expect(parseQuantity({ string: "1E" })).toBe(10n ** 18n); + }); + + it("preserves precision for 1000E (1e21)", () => { + expect(parseQuantity({ string: "1000E" })).toBe(10n ** 21n); + }); + + it("parses 8000m as 8 (8000 * 1e-3, truncated)", () => { + expect(parseQuantity({ string: "8000m" })).toBe(8n); + }); + + it("truncates 500m to 0", () => { + expect(parseQuantity({ string: "500m" })).toBe(0n); + }); + }); + + describe("decimal-exponent form", () => { + it("parses 1e9 as 1000000000", () => { + expect(parseQuantity({ string: "1e9" })).toBe(1_000_000_000n); + }); + + it("parses 1.5e3 as 1500", () => { + expect(parseQuantity({ string: "1.5e3" })).toBe(1500n); + }); + + it("preserves precision for 1e21", () => { + expect(parseQuantity({ string: "1e21" })).toBe(10n ** 21n); + }); + + it("truncates a negative exponent toward zero", () => { + expect(parseQuantity({ string: "5e-3" })).toBe(0n); + }); + }); + + describe("signed values", () => { + it("parses negative integers", () => { + expect(parseQuantity({ string: "-100" })).toBe(-100n); + }); + + it("parses negative binary SI", () => { + expect(parseQuantity({ string: "-1Ki" })).toBe(-1024n); + }); + + it("parses negative fractional binary SI exactly", () => { + expect(parseQuantity({ string: "-1.5Gi" })).toBe(-1610612736n); + }); + }); + + describe("plain integer mantissa (no suffix)", () => { + it("parses unsafe integers exactly", () => { + const unsafe = 9007199254740993n; + expect(parseQuantity({ string: unsafe.toString() })).toBe(unsafe); + }); + }); +}); diff --git a/apps/provider-inventory/src/lib/stream-status-mapper/stream-status-mapper.ts b/apps/provider-inventory/src/lib/stream-status-mapper/stream-status-mapper.ts new file mode 100644 index 000000000..061e0ec27 --- /dev/null +++ b/apps/provider-inventory/src/lib/stream-status-mapper/stream-status-mapper.ts @@ -0,0 +1,117 @@ +import type { + Cluster, + CPUInfo, + GPUInfo, + Node as SdkNode, + ResourcePair as SdkResourcePair, + Storage as SdkStorage +} from "@akashnetwork/chain-sdk/private-types/provider.akash.v1"; + +import { ResourcePair } from "@src/lib/resource-pair/resource-pair"; +import type { ClusterState, CpuInfo, GpuInfo, NodeState } from "@src/types/inventory.types"; + +interface Quantity { + string?: string | undefined; +} + +const BINARY_POW2: Record = { Ki: 10, Mi: 20, Gi: 30, Ti: 40, Pi: 50, Ei: 60 }; +const DECIMAL_POW10: Record = { n: -9, u: -6, m: -3, "": 0, k: 3, M: 6, G: 9, T: 12, P: 15, E: 18 }; + +export function parseQuantity(q: Quantity | undefined): bigint { + const raw = q?.string?.trim(); + if (!raw) return 0n; + + const binary = /^(-?\d+(?:\.\d+)?)([KMGTPE]i)$/.exec(raw); + if (binary) return scaleByPow2(binary[1], BINARY_POW2[binary[2]]); + + const exponent = /^(-?\d+(?:\.\d+)?)[eE](-?\d+)$/.exec(raw); + if (exponent) return scaleByPow10(exponent[1], Number(exponent[2])); + + const decimal = /^(-?\d+(?:\.\d+)?)([numkMGTPE]?)$/.exec(raw); + if (decimal) return scaleByPow10(decimal[1], DECIMAL_POW10[decimal[2]]); + + return 0n; +} + +function scaleByPow2(mantissaStr: string, pow2: number): bigint { + const { negative, digits, fracLen } = parseMantissa(mantissaStr); + const scaled = digits * (1n << BigInt(pow2)); + const truncated = fracLen === 0 ? scaled : scaled / 10n ** BigInt(fracLen); + return negative ? -truncated : truncated; +} + +function scaleByPow10(mantissaStr: string, pow10: number): bigint { + const { negative, digits, fracLen } = parseMantissa(mantissaStr); + const netExp = pow10 - fracLen; + const scaled = netExp >= 0 ? digits * 10n ** BigInt(netExp) : digits / 10n ** BigInt(-netExp); + return negative ? -scaled : scaled; +} + +function parseMantissa(s: string): { negative: boolean; digits: bigint; fracLen: number } { + let i = 0; + let negative = false; + if (s[0] === "-") { + negative = true; + i = 1; + } else if (s[0] === "+") { + i = 1; + } + const body = s.slice(i); + const dot = body.indexOf("."); + if (dot === -1) { + return { negative, digits: BigInt(body || "0"), fracLen: 0 }; + } + const intPart = body.slice(0, dot); + const fracPart = body.slice(dot + 1); + return { negative, digits: BigInt((intPart || "0") + fracPart || "0"), fracLen: fracPart.length }; +} + +function pairFromSdk(pair: SdkResourcePair | undefined): ResourcePair { + return new ResourcePair(parseQuantity(pair?.allocatable), parseQuantity(pair?.allocated)); +} + +function mapGpuInfo(info: GPUInfo): GpuInfo { + return { + vendor: info.vendor, + name: info.name, + modelId: info.modelid, + interface: info.interface, + memorySize: info.memorySize + }; +} + +function mapCpuInfo(info: CPUInfo): CpuInfo { + return { vendor: info.vendor, model: info.model }; +} + +function mapNode(node: SdkNode): NodeState { + const resources = node.resources; + return { + name: node.name, + cpu: pairFromSdk(resources?.cpu?.quantity), + memory: pairFromSdk(resources?.memory?.quantity), + ephemeralStorage: pairFromSdk(resources?.ephemeralStorage), + gpu: { + quantity: pairFromSdk(resources?.gpu?.quantity), + info: (resources?.gpu?.info ?? []).map(mapGpuInfo) + }, + storageClasses: node.capabilities?.storageClasses ?? [], + cpus: (resources?.cpu?.info ?? []).map(mapCpuInfo) + }; +} + +function mapClusterStorage(storage: SdkStorage[]): ClusterState["storage"] { + const result: ClusterState["storage"] = Object.create(null); + for (const pool of storage) { + const cls = pool.info?.class ?? ""; + result[cls] = { class: cls, quantity: pairFromSdk(pool.quantity) }; + } + return result; +} + +export function mapClusterToStreamStatus(cluster: Cluster): ClusterState { + return { + nodes: cluster.nodes.map(mapNode), + storage: mapClusterStorage(cluster.storage) + }; +} diff --git a/apps/provider-inventory/src/model-schemas/provider-inventory/provider-inventory.schema.ts b/apps/provider-inventory/src/model-schemas/provider-inventory/provider-inventory.schema.ts index fa29d3f73..c007cf905 100644 --- a/apps/provider-inventory/src/model-schemas/provider-inventory/provider-inventory.schema.ts +++ b/apps/provider-inventory/src/model-schemas/provider-inventory/provider-inventory.schema.ts @@ -1,5 +1,7 @@ import { sql } from "drizzle-orm"; -import { bigint, boolean, doublePrecision, index, jsonb, pgTable, text, timestamp } from "drizzle-orm/pg-core"; +import { bigint, boolean, doublePrecision, index, pgTable, text, timestamp } from "drizzle-orm/pg-core"; + +import { jsonbBigint } from "@src/lib/jsonb-bigint/jsonb-bigint.column"; export const providerInventory = pgTable( "provider_inventory", @@ -12,7 +14,7 @@ export const providerInventory = pgTable( isOnline: boolean("is_online").notNull().default(false), isOnlineSince: timestamp("is_online_since", { withTimezone: true }), - inventory: jsonb("inventory").notNull().default({}), + inventory: jsonbBigint("inventory").notNull().default({}), totalAvailableCpu: bigint("total_available_cpu", { mode: "bigint" }).notNull().default(BigInt(0)), totalAvailableMemory: bigint("total_available_memory", { mode: "bigint" }).notNull().default(BigInt(0)), @@ -26,8 +28,8 @@ export const providerInventory = pgTable( gpuModels: text("gpu_models").array().notNull().default([]), storageClasses: text("storage_classes").array().notNull().default([]), - selfAttributes: jsonb("self_attributes").notNull().default([]), - signedAttributes: jsonb("signed_attributes").notNull().default([]), + selfAttributes: jsonbBigint("self_attributes").notNull().default([]), + signedAttributes: jsonbBigint("signed_attributes").notNull().default([]), auditedBy: text("audited_by").array().notNull().default([]), updatedAt: timestamp("updated_at", { withTimezone: true }).notNull().defaultNow() diff --git a/apps/provider-inventory/src/providers/index.ts b/apps/provider-inventory/src/providers/index.ts index 56490ebeb..887107752 100644 --- a/apps/provider-inventory/src/providers/index.ts +++ b/apps/provider-inventory/src/providers/index.ts @@ -5,5 +5,4 @@ export * from "./postgres.provider"; export * from "./drizzle.provider"; export * from "./logger-factory.provider"; export * from "./chain-query.provider"; -export * from "./provider-stream.provider"; export * from "./stream-boostrap.provider"; diff --git a/apps/provider-inventory/src/providers/provider-stream.provider.ts b/apps/provider-inventory/src/providers/provider-stream.provider.ts deleted file mode 100644 index 6e6d4100f..000000000 --- a/apps/provider-inventory/src/providers/provider-stream.provider.ts +++ /dev/null @@ -1,20 +0,0 @@ -import type { InjectionToken } from "tsyringe"; -import { container, instancePerContainerCachingFactory } from "tsyringe"; - -import type { StreamStatusMessage } from "@src/types/stream-status"; - -export interface ProviderStreamFactory { - openStatusStream(hostUri: string, signal: AbortSignal): AsyncIterable; -} - -export const PROVIDER_STREAM_FACTORY: InjectionToken = Symbol("PROVIDER_STREAM_FACTORY"); - -container.register(PROVIDER_STREAM_FACTORY, { - useFactory: instancePerContainerCachingFactory( - () => - ({ - // eslint-disable-next-line @typescript-eslint/no-empty-function - async *openStatusStream() {} - }) satisfies ProviderStreamFactory - ) -}); diff --git a/apps/provider-inventory/src/repositories/provider-inventory/provider-inventory.repository.spec.ts b/apps/provider-inventory/src/repositories/provider-inventory/provider-inventory.repository.spec.ts index b0bb98720..518002583 100644 --- a/apps/provider-inventory/src/repositories/provider-inventory/provider-inventory.repository.spec.ts +++ b/apps/provider-inventory/src/repositories/provider-inventory/provider-inventory.repository.spec.ts @@ -2,9 +2,10 @@ import type { PostgresJsDatabase } from "drizzle-orm/postgres-js"; import { describe, expect, it, vi } from "vitest"; import { mock } from "vitest-mock-extended"; +import { ResourcePair } from "@src/lib/resource-pair/resource-pair"; import type { LoggerFactory } from "@src/providers/logger-factory.provider"; import type { ChainProvider } from "@src/types/chain-provider"; -import type { Inventory, ProjectedRow } from "@src/types/inventory"; +import type { ProjectedRow } from "@src/types/inventory"; import { ProviderInventoryRepository } from "./provider-inventory.repository"; describe(ProviderInventoryRepository.name, () => { @@ -57,10 +58,19 @@ describe(ProviderInventoryRepository.name, () => { expect(db.update).toHaveBeenCalledTimes(1); expect(db._updateSet).toHaveBeenCalledWith(expect.objectContaining({ totalAvailableCpu: 1000n, isOnline: true })); }); + + it("writes the ClusterState as inventory JSONB", async () => { + const { writer, db } = setup(); + const cluster = createProjectedRow({ cpu: 8000n }).cluster; + + await writer.updateInventory(createProvider({ owner: "a" }), { ...createProjectedRow(), cluster }); + + expect(db._updateSet).toHaveBeenCalledWith(expect.objectContaining({ inventory: cluster })); + }); }); - describe("getOnlineProvidersWithSnapshots", () => { - it("maps node available capacity into allocatable with allocated=0", async () => { + describe("getOnlineProviders", () => { + it("hydrates persisted ClusterState back into ResourcePair instances", async () => { const { writer } = setup({ selectRows: [ { @@ -68,44 +78,40 @@ describe(ProviderInventoryRepository.name, () => { hostUri: "https://h:8443", ipRegion: "us-east", uptime7d: 0.998, - inventory: createInventory({ + inventory: { nodes: [ { name: "node1", - cpu: { available: 8000 }, - memory: { available: 17179869184 }, - gpu: [], - ephStorage: { available: 107374182400 }, - persistentStorage: [] + cpu: { allocatable: 8000, allocated: 2000 }, + memory: { allocatable: 17179869184, allocated: 4294967296 }, + ephemeralStorage: { allocatable: 107374182400, allocated: 0 }, + gpu: { quantity: { allocatable: 0, allocated: 0 }, info: [] }, + storageClasses: ["beta2"], + cpus: [] } - ] - }) + ], + storage: {} + } } ] }); - const result = await writer.getOnlineProvidersWithSnapshots(); + const result = await writer.getOnlineProviders(); expect(result).toHaveLength(1); expect(result[0]).toMatchObject({ owner: "akash1abc", hostUri: "https://h:8443", ipRegion: "us-east", uptime7d: 0.998 }); - const node = result[0].lastSuccessfulSnapshot.nodes[0]; - expect(node).toMatchObject({ - name: "node1", - cpuAllocatable: 8000, - cpuAllocated: 0, - memoryAllocatable: 17179869184, - memoryAllocated: 0, - ephemeralStorageAllocatable: 107374182400, - ephemeralStorageAllocated: 0, - gpuAllocatable: 0, - gpuAllocated: 0, - capabilitiesStorageHDD: false, - capabilitiesStorageSSD: false, - capabilitiesStorageNVME: false - }); + const node = result[0].cluster.nodes[0]; + expect(node.name).toBe("node1"); + expect(node.cpu.allocatable).toBe(8000n); + expect(node.cpu.allocated).toBe(2000n); + expect(node.memory.allocatable).toBe(17179869184n); + expect(node.memory.allocated).toBe(4294967296n); + expect(node.ephemeralStorage.allocatable).toBe(107374182400n); + expect(node.ephemeralStorage.allocated).toBe(0n); + expect(node.storageClasses).toEqual(["beta2"]); }); - it("derives storage class capabilities from persistentStorage entries", async () => { + it("hydrates cluster storage pools as ResourcePair entries keyed by class", async () => { const { writer } = setup({ selectRows: [ { @@ -113,34 +119,24 @@ describe(ProviderInventoryRepository.name, () => { hostUri: "https://h:8443", ipRegion: null, uptime7d: null, - inventory: createInventory({ - nodes: [ - { - name: "node1", - cpu: { available: 0 }, - memory: { available: 0 }, - gpu: [], - ephStorage: { available: 0 }, - persistentStorage: [ - { class: "beta2", available: 0 }, - { class: "beta3", available: 0 } - ] - } - ] - }) + inventory: { + nodes: [], + storage: { beta2: { class: "beta2", quantity: { allocatable: 536870912000, allocated: 0 } } } + } } ] }); - const result = await writer.getOnlineProvidersWithSnapshots(); + const result = await writer.getOnlineProviders(); - const node = result[0].lastSuccessfulSnapshot.nodes[0]; - expect(node.capabilitiesStorageHDD).toBe(false); - expect(node.capabilitiesStorageSSD).toBe(true); - expect(node.capabilitiesStorageNVME).toBe(true); + const beta2 = result[0].cluster.storage["beta2"]; + expect(beta2.class).toBe("beta2"); + expect(beta2.quantity.allocatable).toBe(536870912000n); + expect(beta2.quantity.allocated).toBe(0n); }); - it("aggregates per-gpu available into a single gpuAllocatable", async () => { + it("preserves bigint magnitudes that exceed Number.MAX_SAFE_INTEGER", async () => { + const unsafe = 9007199254740993n; const { writer } = setup({ selectRows: [ { @@ -148,52 +144,32 @@ describe(ProviderInventoryRepository.name, () => { hostUri: "https://h:8443", ipRegion: null, uptime7d: null, - inventory: createInventory({ + inventory: { nodes: [ { name: "node1", - cpu: { available: 0 }, - memory: { available: 0 }, - gpu: [ - { vendor: "nvidia", model: "a100", available: 2 }, - { vendor: "nvidia", model: "h100", available: 1 } - ], - ephStorage: { available: 0 }, - persistentStorage: [] + cpu: { allocatable: unsafe, allocated: 0 }, + memory: { allocatable: 0, allocated: 0 }, + ephemeralStorage: { allocatable: 0, allocated: 0 }, + gpu: { quantity: { allocatable: 0, allocated: 0 }, info: [] }, + storageClasses: [], + cpus: [] } - ] - }) + ], + storage: {} + } } ] }); - const result = await writer.getOnlineProvidersWithSnapshots(); + const result = await writer.getOnlineProviders(); - expect(result[0].lastSuccessfulSnapshot.nodes[0].gpuAllocatable).toBe(3); - expect(result[0].lastSuccessfulSnapshot.nodes[0].gpus).toHaveLength(2); - }); - - it("maps cluster storage pools with allocated=0", async () => { - const { writer } = setup({ - selectRows: [ - { - owner: "akash1abc", - hostUri: "https://h:8443", - ipRegion: null, - uptime7d: null, - inventory: createInventory({ storage: [{ class: "beta2", available: 536870912000 }] }) - } - ] - }); - - const result = await writer.getOnlineProvidersWithSnapshots(); - - expect(result[0].lastSuccessfulSnapshot.storage).toEqual([{ class: "beta2", allocatable: 536870912000, allocated: 0 }]); + expect(result[0].cluster.nodes[0].cpu.allocatable).toBe(unsafe); }); it("returns empty array when no rows match", async () => { const { writer } = setup({ selectRows: [] }); - expect(await writer.getOnlineProvidersWithSnapshots()).toEqual([]); + expect(await writer.getOnlineProviders()).toEqual([]); }); }); @@ -259,8 +235,17 @@ function createProvider(overrides?: Partial): ChainProvider { } function createProjectedRow(overrides?: { cpu?: bigint }): ProjectedRow { + const node = { + name: "node-1", + cpu: new ResourcePair(overrides?.cpu ?? 0n, 0n), + memory: new ResourcePair(0n, 0n), + ephemeralStorage: new ResourcePair(0n, 0n), + gpu: { quantity: new ResourcePair(0n, 0n), info: [] }, + storageClasses: [], + cpus: [] + }; return { - inventory: { nodes: [], storage: [] }, + cluster: { nodes: [node], storage: Object.create(null) }, totalAvailableCpu: overrides?.cpu ?? 0n, totalAvailableMemory: 0n, totalAvailableGpu: 0n, @@ -273,10 +258,3 @@ function createProjectedRow(overrides?: { cpu?: bigint }): ProjectedRow { storageClasses: [] }; } - -function createInventory(overrides?: Partial): Inventory { - return { - nodes: overrides?.nodes ?? [], - storage: overrides?.storage ?? [] - }; -} diff --git a/apps/provider-inventory/src/repositories/provider-inventory/provider-inventory.repository.ts b/apps/provider-inventory/src/repositories/provider-inventory/provider-inventory.repository.ts index 5887a69cf..54a676eda 100644 --- a/apps/provider-inventory/src/repositories/provider-inventory/provider-inventory.repository.ts +++ b/apps/provider-inventory/src/repositories/provider-inventory/provider-inventory.repository.ts @@ -3,17 +3,15 @@ import { and, arrayOverlaps, eq, sql as rawSql } from "drizzle-orm"; import type { PostgresJsDatabase } from "drizzle-orm/postgres-js"; import { inject, singleton } from "tsyringe"; +import { ResourcePair } from "@src/lib/resource-pair/resource-pair"; import { providerInventory } from "@src/model-schemas/provider-inventory/provider-inventory.schema"; import { DRIZZLE_DB } from "@src/providers/drizzle.provider"; import type { LoggerFactory } from "@src/providers/logger-factory.provider"; import { LOGGER_FACTORY } from "@src/providers/logger-factory.provider"; import type { ChainProvider } from "@src/types/chain-provider"; -import type { Inventory, ProjectedRow } from "@src/types/inventory"; -import type { ProviderWithSnapshot } from "@src/types/provider"; - -const STORAGE_CLASS_HDD = "beta1"; -const STORAGE_CLASS_SSD = "beta2"; -const STORAGE_CLASS_NVME = "beta3"; +import type { ProjectedRow } from "@src/types/inventory"; +import type { ClusterState, NodeState } from "@src/types/inventory.types"; +import type { ProviderWithClusterState } from "@src/types/provider"; @singleton() export class ProviderInventoryRepository { @@ -47,7 +45,7 @@ export class ProviderInventoryRepository { await this.#db .update(providerInventory) .set({ - inventory: row.inventory, + inventory: row.cluster, totalAvailableCpu: row.totalAvailableCpu, totalAvailableMemory: row.totalAvailableMemory, totalAvailableGpu: row.totalAvailableGpu, @@ -86,7 +84,7 @@ export class ProviderInventoryRepository { this.#logger.debug({ event: "PROVIDER_ATTRIBUTES_UPSERTED", owner: provider.owner }); } - async getOnlineProvidersWithSnapshots(): Promise { + async getOnlineProviders(): Promise { const rows = await this.#db .select({ owner: providerInventory.owner, @@ -103,7 +101,7 @@ export class ProviderInventoryRepository { hostUri: row.hostUri, ipRegion: row.ipRegion, uptime7d: row.uptime7d, - lastSuccessfulSnapshot: inventoryToSnapshot(row.inventory as Inventory) + cluster: hydrateClusterState(row.inventory) })); } @@ -121,38 +119,43 @@ export class ProviderInventoryRepository { } } -function inventoryToSnapshot(inventory: Inventory): ProviderWithSnapshot["lastSuccessfulSnapshot"] { +type RawPair = { allocatable: number | bigint; allocated: number | bigint }; +type RawNode = Omit & { + cpu: RawPair; + memory: RawPair; + ephemeralStorage: RawPair; + gpu: { quantity: RawPair; info: NodeState["gpu"]["info"] }; +}; +type RawCluster = { + nodes?: RawNode[]; + storage?: Record; +}; + +export function hydrateClusterState(raw: unknown): ClusterState { + const cluster = (raw ?? {}) as RawCluster; + const nodes = (cluster.nodes ?? []).map(hydrateNode); + const storage: ClusterState["storage"] = Object.create(null); + for (const [key, pool] of Object.entries(cluster.storage ?? {})) { + storage[key] = { class: pool.class, quantity: hydratePair(pool.quantity) }; + } + return { nodes, storage }; +} + +function hydrateNode(node: RawNode): NodeState { return { - nodes: inventory.nodes.map(node => { - const classes = new Set(node.persistentStorage.map(p => p.class)); - const gpuTotal = node.gpu.reduce((acc, g) => acc + g.available, 0); - return { - name: node.name, - cpuAllocatable: node.cpu.available, - cpuAllocated: 0, - memoryAllocatable: node.memory.available, - memoryAllocated: 0, - ephemeralStorageAllocatable: node.ephStorage.available, - ephemeralStorageAllocated: 0, - gpuAllocatable: gpuTotal, - gpuAllocated: 0, - capabilitiesStorageHDD: classes.has(STORAGE_CLASS_HDD), - capabilitiesStorageSSD: classes.has(STORAGE_CLASS_SSD), - capabilitiesStorageNVME: classes.has(STORAGE_CLASS_NVME), - gpus: node.gpu.map(g => ({ - vendor: g.vendor, - name: g.model, - modelId: "", - interface: "", - memorySize: "" - })), - cpus: [] - }; - }), - storage: inventory.storage.map(s => ({ - class: s.class, - allocatable: s.available, - allocated: 0 - })) + name: node.name, + cpu: hydratePair(node.cpu), + memory: hydratePair(node.memory), + ephemeralStorage: hydratePair(node.ephemeralStorage), + gpu: { quantity: hydratePair(node.gpu.quantity), info: node.gpu.info ?? [] }, + storageClasses: node.storageClasses ?? [], + cpus: node.cpus ?? [] }; } + +function hydratePair(pair: RawPair): ResourcePair { + const allocatable = typeof pair.allocatable === "bigint" ? pair.allocatable : BigInt(pair.allocatable || 0); + const allocated = typeof pair.allocated === "bigint" ? pair.allocated : BigInt(pair.allocated || 0); + + return new ResourcePair(allocatable, allocated); +} diff --git a/apps/provider-inventory/src/services/bid-screening/bid-screening.service.spec.ts b/apps/provider-inventory/src/services/bid-screening/bid-screening.service.spec.ts index 2101bff79..353cc7315 100644 --- a/apps/provider-inventory/src/services/bid-screening/bid-screening.service.spec.ts +++ b/apps/provider-inventory/src/services/bid-screening/bid-screening.service.spec.ts @@ -3,8 +3,9 @@ import { describe, expect, it } from "vitest"; import { mock } from "vitest-mock-extended"; import type { GroupSpecJSON } from "@src/lib/groupspec-mapper/groupspec-mapper"; +import { ResourcePair } from "@src/lib/resource-pair/resource-pair"; import type { ProviderInventoryRepository } from "@src/repositories/provider-inventory/provider-inventory.repository"; -import type { ProviderWithSnapshot } from "../../types/provider"; +import type { ProviderWithClusterState } from "../../types/provider"; import type { ClusterInventoryMatcherService } from "../cluster-inventory-matcher/cluster-inventory-matcher.service"; import { BidScreeningService } from "./bid-screening.service"; @@ -12,7 +13,7 @@ describe(BidScreeningService.name, () => { describe("findMatchingProviders", () => { it("returns passing providers with metadata", async () => { const { service, repository, matcher } = setup(); - repository.getOnlineProvidersWithSnapshots.mockResolvedValue([makeProvider("akash1abc")]); + repository.getOnlineProviders.mockResolvedValue([makeProvider("akash1abc")]); repository.getAuditedProviderAddresses.mockResolvedValue(new Set()); matcher.match.mockReturnValue({ matched: true }); @@ -31,7 +32,7 @@ describe(BidScreeningService.name, () => { it("filters out providers that fail matching", async () => { const { service, repository, matcher } = setup(); - repository.getOnlineProvidersWithSnapshots.mockResolvedValue([makeProvider("akash1abc"), makeProvider("akash1def")]); + repository.getOnlineProviders.mockResolvedValue([makeProvider("akash1abc"), makeProvider("akash1def")]); repository.getAuditedProviderAddresses.mockResolvedValue(new Set()); matcher.match.mockReturnValueOnce({ matched: true }).mockReturnValueOnce({ matched: false, error: "INSUFFICIENT_CAPACITY" }); @@ -41,9 +42,21 @@ describe(BidScreeningService.name, () => { expect(results[0].owner).toBe("akash1abc"); }); + it("passes the provider's ClusterState directly to the matcher", async () => { + const { service, repository, matcher } = setup(); + const provider = makeProvider("akash1abc"); + repository.getOnlineProviders.mockResolvedValue([provider]); + repository.getAuditedProviderAddresses.mockResolvedValue(new Set()); + matcher.match.mockReturnValue({ matched: true }); + + await service.findMatchingProviders(makeRequest()); + + expect(matcher.match).toHaveBeenCalledWith(provider.cluster, expect.any(Array)); + }); + it("returns empty array when no providers are online", async () => { const { service, repository } = setup(); - repository.getOnlineProvidersWithSnapshots.mockResolvedValue([]); + repository.getOnlineProviders.mockResolvedValue([]); repository.getAuditedProviderAddresses.mockResolvedValue(new Set()); const results = await service.findMatchingProviders(makeRequest()); @@ -53,7 +66,7 @@ describe(BidScreeningService.name, () => { it("returns empty array when all providers fail matching", async () => { const { service, repository, matcher } = setup(); - repository.getOnlineProvidersWithSnapshots.mockResolvedValue([makeProvider("akash1abc")]); + repository.getOnlineProviders.mockResolvedValue([makeProvider("akash1abc")]); repository.getAuditedProviderAddresses.mockResolvedValue(new Set()); matcher.match.mockReturnValue({ matched: false, error: "INSUFFICIENT_CAPACITY" }); @@ -66,19 +79,19 @@ describe(BidScreeningService.name, () => { describe("filtering", () => { it("loads providers and audited owners in parallel after pre-filter", async () => { const { service, repository, matcher } = setup(); - repository.getOnlineProvidersWithSnapshots.mockResolvedValue([makeProvider("akash1abc")]); + repository.getOnlineProviders.mockResolvedValue([makeProvider("akash1abc")]); repository.getAuditedProviderAddresses.mockResolvedValue(new Set()); matcher.match.mockReturnValue({ matched: true }); await service.findMatchingProviders(makeRequest()); - expect(repository.getOnlineProvidersWithSnapshots).toHaveBeenCalledTimes(1); + expect(repository.getOnlineProviders).toHaveBeenCalledTimes(1); expect(repository.getAuditedProviderAddresses).toHaveBeenCalledTimes(1); }); it("enriches isAudited=true for providers with matching auditor signatures", async () => { const { service, repository, matcher } = setup(); - repository.getOnlineProvidersWithSnapshots.mockResolvedValue([makeProvider("akash1abc"), makeProvider("akash1def")]); + repository.getOnlineProviders.mockResolvedValue([makeProvider("akash1abc"), makeProvider("akash1def")]); repository.getAuditedProviderAddresses.mockResolvedValue(new Set(["akash1abc"])); matcher.match.mockReturnValue({ matched: true }); @@ -99,32 +112,25 @@ describe(BidScreeningService.name, () => { } }); -function makeProvider(owner: string, hostUri = "https://provider.example.com:8443"): ProviderWithSnapshot { +function makeProvider(owner: string, hostUri = "https://provider.example.com:8443"): ProviderWithClusterState { return { owner, hostUri, ipRegion: "us-east", uptime7d: 0.998, - lastSuccessfulSnapshot: { + cluster: { nodes: [ { name: "node1", - cpuAllocatable: 8000, - cpuAllocated: 0, - memoryAllocatable: 17179869184, - memoryAllocated: 0, - ephemeralStorageAllocatable: 107374182400, - ephemeralStorageAllocated: 0, - gpuAllocatable: 0, - gpuAllocated: 0, - capabilitiesStorageHDD: false, - capabilitiesStorageSSD: true, - capabilitiesStorageNVME: false, - gpus: [], + cpu: new ResourcePair(8000n, 0n), + memory: new ResourcePair(17179869184n, 0n), + ephemeralStorage: new ResourcePair(107374182400n, 0n), + gpu: { quantity: new ResourcePair(0n, 0n), info: [] }, + storageClasses: ["beta2"], cpus: [] } ], - storage: [] + storage: Object.create(null) } }; } diff --git a/apps/provider-inventory/src/services/bid-screening/bid-screening.service.ts b/apps/provider-inventory/src/services/bid-screening/bid-screening.service.ts index 3d9d22d58..e6a0fad09 100644 --- a/apps/provider-inventory/src/services/bid-screening/bid-screening.service.ts +++ b/apps/provider-inventory/src/services/bid-screening/bid-screening.service.ts @@ -6,7 +6,7 @@ import { ProviderInventoryRepository } from "@src/repositories/provider-inventor import type { GroupSpecJSON } from "../../lib/groupspec-mapper/groupspec-mapper"; import { mapGroupSpecToResourceUnits } from "../../lib/groupspec-mapper/groupspec-mapper"; import type { BidScreeningResult } from "../../types/inventory.types"; -import type { ProviderWithSnapshot } from "../../types/provider"; +import type { ProviderWithClusterState } from "../../types/provider"; import { ClusterInventoryMatcherService } from "../cluster-inventory-matcher/cluster-inventory-matcher.service"; const AUDITOR = "akash1365yvmc4s7awdyj3n2sav7xfx76adc6dnmlx63"; @@ -36,15 +36,12 @@ export class BidScreeningService { this.#logger.info({ event: "BID_SCREENING_START", resourceGroupCount: resourceUnits.length }); - const [providers, auditedOwners] = await Promise.all([ - this.#repository.getOnlineProvidersWithSnapshots(), - this.#repository.getAuditedProviderAddresses([AUDITOR]) - ]); + const [providers, auditedOwners] = await Promise.all([this.#repository.getOnlineProviders(), this.#repository.getAuditedProviderAddresses([AUDITOR])]); const results: BidScreeningResult[] = []; for (const provider of providers) { - const matchResult = this.#matcher.match(provider, resourceUnits); + const matchResult = this.#matcher.match(provider.cluster, resourceUnits); if (matchResult.matched) { results.push(this.#toResult(provider, auditedOwners.has(provider.owner))); @@ -82,7 +79,7 @@ export class BidScreeningService { } } - #toResult(provider: ProviderWithSnapshot, isAudited: boolean): BidScreeningResult { + #toResult(provider: ProviderWithClusterState, isAudited: boolean): BidScreeningResult { return { owner: provider.owner, hostUri: provider.hostUri, diff --git a/apps/provider-inventory/src/services/cluster-inventory-matcher/cluster-inventory-matcher.service.spec.ts b/apps/provider-inventory/src/services/cluster-inventory-matcher/cluster-inventory-matcher.service.spec.ts index e43b946a1..683325954 100644 --- a/apps/provider-inventory/src/services/cluster-inventory-matcher/cluster-inventory-matcher.service.spec.ts +++ b/apps/provider-inventory/src/services/cluster-inventory-matcher/cluster-inventory-matcher.service.spec.ts @@ -1,57 +1,57 @@ import { describe, expect, it } from "vitest"; -import type { ProviderWithSnapshot } from "@src/types/provider"; -import type { GpuInfo, RequestedResourceUnit } from "../../types/inventory.types"; +import { ResourcePair } from "@src/lib/resource-pair/resource-pair"; +import type { ClusterState, CpuInfo, GpuInfo, NodeState, RequestedResourceUnit } from "../../types/inventory.types"; import { ClusterInventoryMatcherService } from "./cluster-inventory-matcher.service"; describe(ClusterInventoryMatcherService.name, () => { describe("basic resource matching (US1)", () => { it("matches when single node has sufficient resources", () => { - const { service, provider, resourceUnits } = setup({}); - const result = service.match(provider, resourceUnits); + const { service, cluster, resourceUnits } = setup({}); + const result = service.match(cluster, resourceUnits); expect(result.matched).toBe(true); }); it("fails when single node has insufficient CPU", () => { - const { service, provider, resourceUnits } = setup({ requestedCpu: 20000n }); - const result = service.match(provider, resourceUnits); + const { service, cluster, resourceUnits } = setup({ requestedCpu: 20000n }); + const result = service.match(cluster, resourceUnits); expect(result.matched).toBe(false); expect(result.error).toBe("INSUFFICIENT_CAPACITY"); }); it("fails when single node has insufficient memory", () => { - const { service, provider, resourceUnits } = setup({ requestedMemory: 999999999999n }); - const result = service.match(provider, resourceUnits); + const { service, cluster, resourceUnits } = setup({ requestedMemory: 999999999999n }); + const result = service.match(cluster, resourceUnits); expect(result.matched).toBe(false); }); it("spreads replicas across multiple nodes when single node is insufficient", () => { const { service } = setup({}); - const provider = makeProvider([ - { cpu: 4000, memory: 8589934592, ephemeral: 10737418240 }, - { cpu: 4000, memory: 8589934592, ephemeral: 10737418240 } + const cluster = makeCluster([ + { cpu: 4000n, memory: 8589934592n, ephemeral: 10737418240n }, + { cpu: 4000n, memory: 8589934592n, ephemeral: 10737418240n } ]); const units = makeResourceUnits({ cpu: 3000n, memory: 4294967296n, ephemeral: 5368709120n, count: 2 }); - const result = service.match(provider, units); + const result = service.match(cluster, units); expect(result.matched).toBe(true); }); it("places all replicas on one node when possible", () => { const { service } = setup({}); - const provider = makeProvider([{ cpu: 16000, memory: 34359738368, ephemeral: 107374182400 }]); + const cluster = makeCluster([{ cpu: 16000n, memory: 34359738368n, ephemeral: 107374182400n }]); const units = makeResourceUnits({ cpu: 1000n, memory: 1073741824n, ephemeral: 5368709120n, count: 4 }); - const result = service.match(provider, units); + const result = service.match(cluster, units); expect(result.matched).toBe(true); }); - it("fails when provider has zero nodes", () => { + it("fails when cluster has zero nodes", () => { const { service } = setup({}); - const provider = makeProvider([]); + const cluster = makeCluster([]); const units = makeResourceUnits({ cpu: 1000n, memory: 1073741824n, ephemeral: 5368709120n, count: 1 }); - const result = service.match(provider, units); + const result = service.match(cluster, units); expect(result.matched).toBe(false); }); }); @@ -59,15 +59,15 @@ describe(ClusterInventoryMatcherService.name, () => { describe("storage matching (US3)", () => { it("deducts ephemeral storage from node", () => { const service = new ClusterInventoryMatcherService(); - const provider = makeProvider([{ cpu: 8000, memory: 17179869184, ephemeral: 10737418240 }]); + const cluster = makeCluster([{ cpu: 8000n, memory: 17179869184n, ephemeral: 10737418240n }]); const units = makeResourceUnits({ cpu: 1000n, memory: 1073741824n, ephemeral: 5368709120n, count: 1 }); - expect(service.match(provider, units).matched).toBe(true); + expect(service.match(cluster, units).matched).toBe(true); }); it("deducts RAM-backed storage from node memory", () => { const service = new ClusterInventoryMatcherService(); - const provider = makeProvider([{ cpu: 8000, memory: 17179869184, ephemeral: 107374182400 }]); + const cluster = makeCluster([{ cpu: 8000n, memory: 17179869184n, ephemeral: 107374182400n }]); const units = makeResourceUnits({ cpu: 1000n, memory: 8589934592n, @@ -85,12 +85,12 @@ describe(ClusterInventoryMatcherService.name, () => { ] }); - expect(service.match(provider, units).matched).toBe(true); + expect(service.match(cluster, units).matched).toBe(true); }); it("fails RAM-backed storage when combined with memory exceeds node capacity", () => { const service = new ClusterInventoryMatcherService(); - const provider = makeProvider([{ cpu: 8000, memory: 10737418240, ephemeral: 107374182400 }]); + const cluster = makeCluster([{ cpu: 8000n, memory: 10737418240n, ephemeral: 107374182400n }]); const units = makeResourceUnits({ cpu: 1000n, memory: 8589934592n, @@ -108,14 +108,14 @@ describe(ClusterInventoryMatcherService.name, () => { ] }); - expect(service.match(provider, units).matched).toBe(false); + expect(service.match(cluster, units).matched).toBe(false); }); it("deducts persistent storage from cluster pool", () => { const service = new ClusterInventoryMatcherService(); - const provider = makeProvider( - [{ cpu: 8000, memory: 17179869184, ephemeral: 107374182400, storageClasses: ["beta2"] }], - [{ class: "beta2", allocatable: 536870912000, allocated: 0 }] + const cluster = makeCluster( + [{ cpu: 8000n, memory: 17179869184n, ephemeral: 107374182400n, storageClasses: ["beta2"] }], + [{ class: "beta2", allocatable: 536870912000n, allocated: 0n }] ); const units = makeResourceUnits({ cpu: 1000n, @@ -134,14 +134,14 @@ describe(ClusterInventoryMatcherService.name, () => { ] }); - expect(service.match(provider, units).matched).toBe(true); + expect(service.match(cluster, units).matched).toBe(true); }); it("fails when node lacks storage class capability", () => { const service = new ClusterInventoryMatcherService(); - const provider = makeProvider( - [{ cpu: 8000, memory: 17179869184, ephemeral: 107374182400, storageClasses: ["beta1"] }], - [{ class: "beta2", allocatable: 536870912000, allocated: 0 }] + const cluster = makeCluster( + [{ cpu: 8000n, memory: 17179869184n, ephemeral: 107374182400n, storageClasses: ["beta1"] }], + [{ class: "beta2", allocatable: 536870912000n, allocated: 0n }] ); const units = makeResourceUnits({ cpu: 1000n, @@ -160,14 +160,14 @@ describe(ClusterInventoryMatcherService.name, () => { ] }); - expect(service.match(provider, units).matched).toBe(false); + expect(service.match(cluster, units).matched).toBe(false); }); it("fails when cluster pool is exhausted", () => { const service = new ClusterInventoryMatcherService(); - const provider = makeProvider( - [{ cpu: 8000, memory: 17179869184, ephemeral: 107374182400, storageClasses: ["beta2"] }], - [{ class: "beta2", allocatable: 1073741824, allocated: 0 }] + const cluster = makeCluster( + [{ cpu: 8000n, memory: 17179869184n, ephemeral: 107374182400n, storageClasses: ["beta2"] }], + [{ class: "beta2", allocatable: 1073741824n, allocated: 0n }] ); const units = makeResourceUnits({ cpu: 1000n, @@ -186,19 +186,19 @@ describe(ClusterInventoryMatcherService.name, () => { ] }); - expect(service.match(provider, units).matched).toBe(false); + expect(service.match(cluster, units).matched).toBe(false); }); }); describe("GPU matching (US2)", () => { it("matches exact GPU vendor and model", () => { const service = new ClusterInventoryMatcherService(); - const provider = makeProvider([ + const cluster = makeCluster([ { - cpu: 8000, - memory: 17179869184, - ephemeral: 107374182400, - gpuCount: 1, + cpu: 8000n, + memory: 17179869184n, + ephemeral: 107374182400n, + gpuCount: 1n, gpuInfo: [{ vendor: "nvidia", name: "a100", modelId: "2235", interface: "PCIe", memorySize: "80Gi" }] } ]); @@ -211,17 +211,17 @@ describe(ClusterInventoryMatcherService.name, () => { gpuAttributes: [{ key: "vendor/nvidia/model/a100", value: "true" }] }); - expect(service.match(provider, units).matched).toBe(true); + expect(service.match(cluster, units).matched).toBe(true); }); it("matches wildcard model against any GPU model", () => { const service = new ClusterInventoryMatcherService(); - const provider = makeProvider([ + const cluster = makeCluster([ { - cpu: 8000, - memory: 17179869184, - ephemeral: 107374182400, - gpuCount: 1, + cpu: 8000n, + memory: 17179869184n, + ephemeral: 107374182400n, + gpuCount: 1n, gpuInfo: [{ vendor: "nvidia", name: "rtx4090", modelId: "2684", interface: "PCIe", memorySize: "24Gi" }] } ]); @@ -234,17 +234,17 @@ describe(ClusterInventoryMatcherService.name, () => { gpuAttributes: [{ key: "vendor/nvidia", value: "true" }] }); - expect(service.match(provider, units).matched).toBe(true); + expect(service.match(cluster, units).matched).toBe(true); }); it("filters by RAM size", () => { const service = new ClusterInventoryMatcherService(); - const provider = makeProvider([ + const cluster = makeCluster([ { - cpu: 8000, - memory: 17179869184, - ephemeral: 107374182400, - gpuCount: 1, + cpu: 8000n, + memory: 17179869184n, + ephemeral: 107374182400n, + gpuCount: 1n, gpuInfo: [{ vendor: "nvidia", name: "a100", modelId: "2235", interface: "PCIe", memorySize: "40Gi" }] } ]); @@ -257,17 +257,17 @@ describe(ClusterInventoryMatcherService.name, () => { gpuAttributes: [{ key: "vendor/nvidia/model/a100/ram/80Gi", value: "true" }] }); - expect(service.match(provider, units).matched).toBe(false); + expect(service.match(cluster, units).matched).toBe(false); }); it("filters by interface with sxm normalization", () => { const service = new ClusterInventoryMatcherService(); - const provider = makeProvider([ + const cluster = makeCluster([ { - cpu: 8000, - memory: 17179869184, - ephemeral: 107374182400, - gpuCount: 1, + cpu: 8000n, + memory: 17179869184n, + ephemeral: 107374182400n, + gpuCount: 1n, gpuInfo: [{ vendor: "nvidia", name: "a100", modelId: "2235", interface: "SXM4", memorySize: "80Gi" }] } ]); @@ -280,17 +280,17 @@ describe(ClusterInventoryMatcherService.name, () => { gpuAttributes: [{ key: "vendor/nvidia/model/a100/interface/sxm2", value: "true" }] }); - expect(service.match(provider, units).matched).toBe(true); + expect(service.match(cluster, units).matched).toBe(true); }); it("fails when no GPU matches on node", () => { const service = new ClusterInventoryMatcherService(); - const provider = makeProvider([ + const cluster = makeCluster([ { - cpu: 8000, - memory: 17179869184, - ephemeral: 107374182400, - gpuCount: 1, + cpu: 8000n, + memory: 17179869184n, + ephemeral: 107374182400n, + gpuCount: 1n, gpuInfo: [{ vendor: "amd", name: "mi300x", modelId: "740f", interface: "PCIe", memorySize: "192Gi" }] } ]); @@ -303,25 +303,25 @@ describe(ClusterInventoryMatcherService.name, () => { gpuAttributes: [{ key: "vendor/nvidia/model/a100", value: "true" }] }); - expect(service.match(provider, units).matched).toBe(false); + expect(service.match(cluster, units).matched).toBe(false); }); it("passes when zero GPUs are requested", () => { const service = new ClusterInventoryMatcherService(); - const provider = makeProvider([{ cpu: 8000, memory: 17179869184, ephemeral: 107374182400 }]); + const cluster = makeCluster([{ cpu: 8000n, memory: 17179869184n, ephemeral: 107374182400n }]); const units = makeResourceUnits({ cpu: 1000n, memory: 1073741824n, ephemeral: 5368709120n, count: 1, gpuUnits: 0n }); - expect(service.match(provider, units).matched).toBe(true); + expect(service.match(cluster, units).matched).toBe(true); }); it("fails when requesting 2 A100s but node has 1 A100 + 1 V100", () => { const service = new ClusterInventoryMatcherService(); - const provider = makeProvider([ + const cluster = makeCluster([ { - cpu: 16000, - memory: 34359738368, - ephemeral: 107374182400, - gpuCount: 2, + cpu: 16000n, + memory: 34359738368n, + ephemeral: 107374182400n, + gpuCount: 2n, gpuInfo: [ { vendor: "nvidia", name: "a100", modelId: "2235", interface: "PCIe", memorySize: "80Gi" }, { vendor: "nvidia", name: "v100", modelId: "1db4", interface: "PCIe", memorySize: "32Gi" } @@ -337,17 +337,17 @@ describe(ClusterInventoryMatcherService.name, () => { gpuAttributes: [{ key: "vendor/nvidia/model/a100", value: "true" }] }); - expect(service.match(provider, units).matched).toBe(false); + expect(service.match(cluster, units).matched).toBe(false); }); it("pins to first GPU type when no GPU specs provided", () => { const service = new ClusterInventoryMatcherService(); - const provider = makeProvider([ + const cluster = makeCluster([ { - cpu: 8000, - memory: 17179869184, - ephemeral: 107374182400, - gpuCount: 1, + cpu: 8000n, + memory: 17179869184n, + ephemeral: 107374182400n, + gpuCount: 1n, gpuInfo: [{ vendor: "nvidia", name: "a100", modelId: "2235", interface: "PCIe", memorySize: "80Gi" }] } ]); @@ -360,17 +360,17 @@ describe(ClusterInventoryMatcherService.name, () => { gpuAttributes: [] }); - expect(service.match(provider, units).matched).toBe(true); + expect(service.match(cluster, units).matched).toBe(true); }); it("fails GPU request when node has no GPU info", () => { const service = new ClusterInventoryMatcherService(); - const provider = makeProvider([ + const cluster = makeCluster([ { - cpu: 8000, - memory: 17179869184, - ephemeral: 107374182400, - gpuCount: 1, + cpu: 8000n, + memory: 17179869184n, + ephemeral: 107374182400n, + gpuCount: 1n, gpuInfo: [] } ]); @@ -383,17 +383,17 @@ describe(ClusterInventoryMatcherService.name, () => { gpuAttributes: [{ key: "vendor/nvidia/model/a100", value: "true" }] }); - expect(service.match(provider, units).matched).toBe(false); + expect(service.match(cluster, units).matched).toBe(false); }); it("rejects wildcard request for 2 GPUs when node has mixed RAM variants of the same model", () => { const service = new ClusterInventoryMatcherService(); - const provider = makeProvider([ + const cluster = makeCluster([ { - cpu: 16000, - memory: 34359738368, - ephemeral: 107374182400, - gpuCount: 2, + cpu: 16000n, + memory: 34359738368n, + ephemeral: 107374182400n, + gpuCount: 2n, gpuInfo: [ { vendor: "nvidia", name: "a100", modelId: "2235", interface: "PCIe", memorySize: "40Gi" }, { vendor: "nvidia", name: "a100", modelId: "2235", interface: "PCIe", memorySize: "80Gi" } @@ -409,17 +409,17 @@ describe(ClusterInventoryMatcherService.name, () => { gpuAttributes: [{ key: "vendor/nvidia/model/a100", value: "true" }] }); - expect(service.match(provider, units).matched).toBe(false); + expect(service.match(cluster, units).matched).toBe(false); }); it("rejects wildcard request for 2 GPUs when node has mixed interfaces of the same model", () => { const service = new ClusterInventoryMatcherService(); - const provider = makeProvider([ + const cluster = makeCluster([ { - cpu: 16000, - memory: 34359738368, - ephemeral: 107374182400, - gpuCount: 2, + cpu: 16000n, + memory: 34359738368n, + ephemeral: 107374182400n, + gpuCount: 2n, gpuInfo: [ { vendor: "nvidia", name: "a100", modelId: "2235", interface: "PCIe", memorySize: "80Gi" }, { vendor: "nvidia", name: "a100", modelId: "2235", interface: "SXM4", memorySize: "80Gi" } @@ -435,17 +435,17 @@ describe(ClusterInventoryMatcherService.name, () => { gpuAttributes: [{ key: "vendor/nvidia/model/a100", value: "true" }] }); - expect(service.match(provider, units).matched).toBe(false); + expect(service.match(cluster, units).matched).toBe(false); }); it("matches wildcard request for 2 GPUs when node has identical SKUs", () => { const service = new ClusterInventoryMatcherService(); - const provider = makeProvider([ + const cluster = makeCluster([ { - cpu: 16000, - memory: 34359738368, - ephemeral: 107374182400, - gpuCount: 2, + cpu: 16000n, + memory: 34359738368n, + ephemeral: 107374182400n, + gpuCount: 2n, gpuInfo: [ { vendor: "nvidia", name: "a100", modelId: "2235", interface: "PCIe", memorySize: "80Gi" }, { vendor: "nvidia", name: "a100", modelId: "2235", interface: "PCIe", memorySize: "80Gi" } @@ -461,16 +461,16 @@ describe(ClusterInventoryMatcherService.name, () => { gpuAttributes: [{ key: "vendor/nvidia/model/a100", value: "true" }] }); - expect(service.match(provider, units).matched).toBe(true); + expect(service.match(cluster, units).matched).toBe(true); }); }); describe("multi-group matching (US5)", () => { it("matches two groups spread across nodes", () => { const service = new ClusterInventoryMatcherService(); - const provider = makeProvider([ - { cpu: 8000, memory: 17179869184, ephemeral: 107374182400 }, - { cpu: 8000, memory: 17179869184, ephemeral: 107374182400 } + const cluster = makeCluster([ + { cpu: 8000n, memory: 17179869184n, ephemeral: 107374182400n }, + { cpu: 8000n, memory: 17179869184n, ephemeral: 107374182400n } ]); const cpuGroup: RequestedResourceUnit = { id: 1, @@ -493,24 +493,24 @@ describe(ClusterInventoryMatcherService.name, () => { count: 1 }; - const result = service.match(provider, [cpuGroup, gpuGroup]); + const result = service.match(cluster, [cpuGroup, gpuGroup]); expect(result.matched).toBe(false); }); it("places mixed CPU+GPU groups on appropriate nodes", () => { const service = new ClusterInventoryMatcherService(); - const provider = makeProvider([ + const cluster = makeCluster([ { - cpu: 16000, - memory: 34359738368, - ephemeral: 107374182400, - gpuCount: 2, + cpu: 16000n, + memory: 34359738368n, + ephemeral: 107374182400n, + gpuCount: 2n, gpuInfo: [ { vendor: "nvidia", name: "a100", modelId: "2235", interface: "PCIe", memorySize: "80Gi" }, { vendor: "nvidia", name: "a100", modelId: "2235", interface: "PCIe", memorySize: "80Gi" } ] }, - { cpu: 16000, memory: 34359738368, ephemeral: 107374182400 } + { cpu: 16000n, memory: 34359738368n, ephemeral: 107374182400n } ]); const cpuGroup: RequestedResourceUnit = { @@ -524,13 +524,13 @@ describe(ClusterInventoryMatcherService.name, () => { count: 4 }; - const result = service.match(provider, [cpuGroup]); + const result = service.match(cluster, [cpuGroup]); expect(result.matched).toBe(true); }); - it("fails entire provider when one group has insufficient capacity", () => { + it("fails entire cluster when one group has insufficient capacity", () => { const service = new ClusterInventoryMatcherService(); - const provider = makeProvider([{ cpu: 4000, memory: 8589934592, ephemeral: 107374182400 }]); + const cluster = makeCluster([{ cpu: 4000n, memory: 8589934592n, ephemeral: 107374182400n }]); const smallGroup: RequestedResourceUnit = { id: 1, @@ -553,13 +553,13 @@ describe(ClusterInventoryMatcherService.name, () => { count: 1 }; - const result = service.match(provider, [smallGroup, largeGroup]); + const result = service.match(cluster, [smallGroup, largeGroup]); expect(result.matched).toBe(false); }); it("handles all groups placed successfully on a large node", () => { const service = new ClusterInventoryMatcherService(); - const provider = makeProvider([{ cpu: 32000, memory: 68719476736, ephemeral: 536870912000 }]); + const cluster = makeCluster([{ cpu: 32000n, memory: 68719476736n, ephemeral: 536870912000n }]); const group1: RequestedResourceUnit = { id: 1, @@ -582,7 +582,7 @@ describe(ClusterInventoryMatcherService.name, () => { count: 3 }; - const result = service.match(provider, [group1, group2]); + const result = service.match(cluster, [group1, group2]); expect(result.matched).toBe(true); }); }); @@ -590,22 +590,22 @@ describe(ClusterInventoryMatcherService.name, () => { describe("replica GPU consistency invariant (US5)", () => { it("passes when all replicas with wildcard model land on nodes with the same GPU model", () => { const service = new ClusterInventoryMatcherService(); - const provider = makeProvider([ + const cluster = makeCluster([ { - cpu: 16000, - memory: 34359738368, - ephemeral: 107374182400, - gpuCount: 2, + cpu: 16000n, + memory: 34359738368n, + ephemeral: 107374182400n, + gpuCount: 2n, gpuInfo: [ { vendor: "nvidia", name: "a100", modelId: "2235", interface: "PCIe", memorySize: "80Gi" }, { vendor: "nvidia", name: "a100", modelId: "2235", interface: "PCIe", memorySize: "80Gi" } ] }, { - cpu: 16000, - memory: 34359738368, - ephemeral: 107374182400, - gpuCount: 1, + cpu: 16000n, + memory: 34359738368n, + ephemeral: 107374182400n, + gpuCount: 1n, gpuInfo: [{ vendor: "nvidia", name: "a100", modelId: "2235", interface: "PCIe", memorySize: "80Gi" }] } ]); @@ -618,27 +618,27 @@ describe(ClusterInventoryMatcherService.name, () => { gpuAttributes: [{ key: "vendor/nvidia", value: "true" }] }); - expect(service.match(provider, units).matched).toBe(true); + expect(service.match(cluster, units).matched).toBe(true); }); it("fails when a later replica would resolve to a different GPU model than the first (wildcard)", () => { const service = new ClusterInventoryMatcherService(); - const provider = makeProvider([ + const cluster = makeCluster([ { - cpu: 16000, - memory: 34359738368, - ephemeral: 107374182400, - gpuCount: 2, + cpu: 16000n, + memory: 34359738368n, + ephemeral: 107374182400n, + gpuCount: 2n, gpuInfo: [ { vendor: "nvidia", name: "a100", modelId: "2235", interface: "PCIe", memorySize: "80Gi" }, { vendor: "nvidia", name: "a100", modelId: "2235", interface: "PCIe", memorySize: "80Gi" } ] }, { - cpu: 16000, - memory: 34359738368, - ephemeral: 107374182400, - gpuCount: 1, + cpu: 16000n, + memory: 34359738368n, + ephemeral: 107374182400n, + gpuCount: 1n, gpuInfo: [{ vendor: "nvidia", name: "v100", modelId: "1db4", interface: "PCIe", memorySize: "32Gi" }] } ]); @@ -651,24 +651,24 @@ describe(ClusterInventoryMatcherService.name, () => { gpuAttributes: [{ key: "vendor/nvidia", value: "true" }] }); - expect(service.match(provider, units).matched).toBe(false); + expect(service.match(cluster, units).matched).toBe(false); }); it("fails when a later replica on another node has a different RAM size than the first (wildcard)", () => { const service = new ClusterInventoryMatcherService(); - const provider = makeProvider([ + const cluster = makeCluster([ { - cpu: 16000, - memory: 34359738368, - ephemeral: 107374182400, - gpuCount: 1, + cpu: 16000n, + memory: 34359738368n, + ephemeral: 107374182400n, + gpuCount: 1n, gpuInfo: [{ vendor: "nvidia", name: "a100", modelId: "2235", interface: "PCIe", memorySize: "80Gi" }] }, { - cpu: 16000, - memory: 34359738368, - ephemeral: 107374182400, - gpuCount: 1, + cpu: 16000n, + memory: 34359738368n, + ephemeral: 107374182400n, + gpuCount: 1n, gpuInfo: [{ vendor: "nvidia", name: "a100", modelId: "2235", interface: "PCIe", memorySize: "40Gi" }] } ]); @@ -681,71 +681,71 @@ describe(ClusterInventoryMatcherService.name, () => { gpuAttributes: [{ key: "vendor/nvidia/model/a100", value: "true" }] }); - expect(service.match(provider, units).matched).toBe(false); + expect(service.match(cluster, units).matched).toBe(false); }); }); describe("nodes with existing allocations", () => { it("matches when remaining capacity after allocation is sufficient", () => { const service = new ClusterInventoryMatcherService(); - const provider = makeProvider([{ cpu: 8000, cpuAllocated: 3000, memory: 17179869184, memoryAllocated: 4294967296, ephemeral: 107374182400 }]); + const cluster = makeCluster([{ cpu: 8000n, cpuAllocated: 3000n, memory: 17179869184n, memoryAllocated: 4294967296n, ephemeral: 107374182400n }]); const units = makeResourceUnits({ cpu: 4000n, memory: 8589934592n, ephemeral: 5368709120n, count: 1 }); - expect(service.match(provider, units).matched).toBe(true); + expect(service.match(cluster, units).matched).toBe(true); }); it("fails when CPU remaining after allocation is insufficient", () => { const service = new ClusterInventoryMatcherService(); - const provider = makeProvider([{ cpu: 8000, cpuAllocated: 5000, memory: 17179869184, ephemeral: 107374182400 }]); + const cluster = makeCluster([{ cpu: 8000n, cpuAllocated: 5000n, memory: 17179869184n, ephemeral: 107374182400n }]); const units = makeResourceUnits({ cpu: 4000n, memory: 1073741824n, ephemeral: 5368709120n, count: 1 }); - expect(service.match(provider, units).matched).toBe(false); + expect(service.match(cluster, units).matched).toBe(false); }); it("fails when memory remaining after allocation is insufficient", () => { const service = new ClusterInventoryMatcherService(); - const provider = makeProvider([{ cpu: 8000, memory: 17179869184, memoryAllocated: 14000000000, ephemeral: 107374182400 }]); + const cluster = makeCluster([{ cpu: 8000n, memory: 17179869184n, memoryAllocated: 14000000000n, ephemeral: 107374182400n }]); const units = makeResourceUnits({ cpu: 1000n, memory: 8589934592n, ephemeral: 5368709120n, count: 1 }); - expect(service.match(provider, units).matched).toBe(false); + expect(service.match(cluster, units).matched).toBe(false); }); it("fails when ephemeral storage remaining after allocation is insufficient", () => { const service = new ClusterInventoryMatcherService(); - const provider = makeProvider([{ cpu: 8000, memory: 17179869184, ephemeral: 107374182400, ephemeralAllocated: 105000000000 }]); + const cluster = makeCluster([{ cpu: 8000n, memory: 17179869184n, ephemeral: 107374182400n, ephemeralAllocated: 105000000000n }]); const units = makeResourceUnits({ cpu: 1000n, memory: 1073741824n, ephemeral: 5368709120n, count: 1 }); - expect(service.match(provider, units).matched).toBe(false); + expect(service.match(cluster, units).matched).toBe(false); }); it("matches at exact remaining boundary", () => { const service = new ClusterInventoryMatcherService(); - const provider = makeProvider([{ cpu: 8000, cpuAllocated: 4000, memory: 17179869184, ephemeral: 107374182400 }]); + const cluster = makeCluster([{ cpu: 8000n, cpuAllocated: 4000n, memory: 17179869184n, ephemeral: 107374182400n }]); const units = makeResourceUnits({ cpu: 4000n, memory: 1073741824n, ephemeral: 5368709120n, count: 1 }); - expect(service.match(provider, units).matched).toBe(true); + expect(service.match(cluster, units).matched).toBe(true); }); it("spreads replicas across nodes with varying allocations", () => { const service = new ClusterInventoryMatcherService(); - const provider = makeProvider([ - { cpu: 8000, cpuAllocated: 4000, memory: 17179869184, ephemeral: 107374182400 }, - { cpu: 8000, cpuAllocated: 4000, memory: 17179869184, ephemeral: 107374182400 } + const cluster = makeCluster([ + { cpu: 8000n, cpuAllocated: 4000n, memory: 17179869184n, ephemeral: 107374182400n }, + { cpu: 8000n, cpuAllocated: 4000n, memory: 17179869184n, ephemeral: 107374182400n } ]); const units = makeResourceUnits({ cpu: 3000n, memory: 1073741824n, ephemeral: 5368709120n, count: 2 }); - expect(service.match(provider, units).matched).toBe(true); + expect(service.match(cluster, units).matched).toBe(true); }); it("accounts for GPU allocation on nodes", () => { const service = new ClusterInventoryMatcherService(); - const provider = makeProvider([ + const cluster = makeCluster([ { - cpu: 16000, - memory: 34359738368, - ephemeral: 107374182400, - gpuCount: 2, - gpuAllocated: 1, + cpu: 16000n, + memory: 34359738368n, + ephemeral: 107374182400n, + gpuCount: 2n, + gpuAllocated: 1n, gpuInfo: [ { vendor: "nvidia", name: "a100", modelId: "2235", interface: "PCIe", memorySize: "80Gi" }, { vendor: "nvidia", name: "a100", modelId: "2235", interface: "PCIe", memorySize: "80Gi" } @@ -761,57 +761,52 @@ describe(ClusterInventoryMatcherService.name, () => { gpuAttributes: [{ key: "vendor/nvidia/model/a100", value: "true" }] }); - expect(service.match(provider, units).matched).toBe(false); + expect(service.match(cluster, units).matched).toBe(false); }); }); describe("boundary-value precision (4-node topology)", () => { - // Mirrors Go multipleReplicasGenNodes: - // node0: avail CPU 68780 (119800 - 51020), no GPU - // node1: avail CPU 68800 (119800 - 51000), 2x nvidia a100 - // node2: avail CPU 119525 (119800 - 275), no GPU - // node3: avail CPU 119495 (119800 - 305), no GPU const MEM_16GI = 17179869184n; const STORAGE_8GI = 8589934592n; - function makeFourNodeProvider() { - return makeProvider([ + function makeFourNodeCluster() { + return makeCluster([ { - cpu: 119800, - cpuAllocated: 51020, - memory: 457317732352, - memoryAllocated: 17495527424, - ephemeral: 7760751097705, - ephemeralAllocated: 8589934592 + cpu: 119800n, + cpuAllocated: 51020n, + memory: 457317732352n, + memoryAllocated: 17495527424n, + ephemeral: 7760751097705n, + ephemeralAllocated: 8589934592n }, { - cpu: 119800, - cpuAllocated: 51000, - memory: 457317732352, - memoryAllocated: 17495527424, - ephemeral: 7760751097705, - ephemeralAllocated: 8589934592, - gpuCount: 2, + cpu: 119800n, + cpuAllocated: 51000n, + memory: 457317732352n, + memoryAllocated: 17495527424n, + ephemeral: 7760751097705n, + ephemeralAllocated: 8589934592n, + gpuCount: 2n, gpuInfo: [ { vendor: "nvidia", name: "a100", modelId: "20b5", interface: "PCIe", memorySize: "80Gi" }, { vendor: "nvidia", name: "a100", modelId: "20b5", interface: "PCIe", memorySize: "80Gi" } ] }, { - cpu: 119800, - cpuAllocated: 275, - memory: 457317732352, - memoryAllocated: 17495527424, - ephemeral: 7760751097705, - ephemeralAllocated: 0 + cpu: 119800n, + cpuAllocated: 275n, + memory: 457317732352n, + memoryAllocated: 17495527424n, + ephemeral: 7760751097705n, + ephemeralAllocated: 0n }, { - cpu: 119800, - cpuAllocated: 305, - memory: 457317732352, - memoryAllocated: 17495527424, - ephemeral: 7760751097705, - ephemeralAllocated: 0 + cpu: 119800n, + cpuAllocated: 305n, + memory: 457317732352n, + memoryAllocated: 17495527424n, + ephemeral: 7760751097705n, + ephemeralAllocated: 0n } ]); } @@ -829,32 +824,32 @@ describe(ClusterInventoryMatcherService.name, () => { it("places 100000 CPU x 2 replicas on the two largest nodes", () => { const service = new ClusterInventoryMatcherService(); - expect(service.match(makeFourNodeProvider(), makeFourNodeUnits(100000n, 0n, 2)).matched).toBe(true); + expect(service.match(makeFourNodeCluster(), makeFourNodeUnits(100000n, 0n, 2)).matched).toBe(true); }); it("places 68780 CPU x 4 replicas across all four nodes", () => { const service = new ClusterInventoryMatcherService(); - expect(service.match(makeFourNodeProvider(), makeFourNodeUnits(68780n, 0n, 4)).matched).toBe(true); + expect(service.match(makeFourNodeCluster(), makeFourNodeUnits(68780n, 0n, 4)).matched).toBe(true); }); it("places 68800 CPU x 3 replicas skipping node 0", () => { const service = new ClusterInventoryMatcherService(); - expect(service.match(makeFourNodeProvider(), makeFourNodeUnits(68800n, 0n, 3)).matched).toBe(true); + expect(service.match(makeFourNodeCluster(), makeFourNodeUnits(68800n, 0n, 3)).matched).toBe(true); }); it("places 119495 CPU x 2 replicas on nodes 2 and 3", () => { const service = new ClusterInventoryMatcherService(); - expect(service.match(makeFourNodeProvider(), makeFourNodeUnits(119495n, 0n, 2)).matched).toBe(true); + expect(service.match(makeFourNodeCluster(), makeFourNodeUnits(119495n, 0n, 2)).matched).toBe(true); }); it("places 68780 CPU x 1 replica on node 0 at exact boundary", () => { const service = new ClusterInventoryMatcherService(); - expect(service.match(makeFourNodeProvider(), makeFourNodeUnits(68780n, 0n, 1)).matched).toBe(true); + expect(service.match(makeFourNodeCluster(), makeFourNodeUnits(68780n, 0n, 1)).matched).toBe(true); }); it("places 68780 CPU + 1 GPU x 1 replica on GPU node", () => { const service = new ClusterInventoryMatcherService(); - expect(service.match(makeFourNodeProvider(), makeFourNodeUnits(68780n, 1n, 1)).matched).toBe(true); + expect(service.match(makeFourNodeCluster(), makeFourNodeUnits(68780n, 1n, 1)).matched).toBe(true); }); it("places multi-group reservation (CPU-only + GPU) across nodes", () => { @@ -898,26 +893,26 @@ describe(ClusterInventoryMatcherService.name, () => { count: 1 }; - expect(service.match(makeFourNodeProvider(), [cpuGroup, gpuGroup]).matched).toBe(true); + expect(service.match(makeFourNodeCluster(), [cpuGroup, gpuGroup]).matched).toBe(true); }); it("rejects 70000 CPU x 4 replicas (only 2 nodes large enough)", () => { const service = new ClusterInventoryMatcherService(); - const result = service.match(makeFourNodeProvider(), makeFourNodeUnits(70000n, 0n, 4)); + const result = service.match(makeFourNodeCluster(), makeFourNodeUnits(70000n, 0n, 4)); expect(result.matched).toBe(false); expect(result.error).toBe("INSUFFICIENT_CAPACITY"); }); it("rejects 100000 CPU x 3 replicas (only 2 nodes large enough)", () => { const service = new ClusterInventoryMatcherService(); - const result = service.match(makeFourNodeProvider(), makeFourNodeUnits(100000n, 0n, 3)); + const result = service.match(makeFourNodeCluster(), makeFourNodeUnits(100000n, 0n, 3)); expect(result.matched).toBe(false); expect(result.error).toBe("INSUFFICIENT_CAPACITY"); }); it("rejects 119525 CPU x 2 replicas (only 1 node large enough)", () => { const service = new ClusterInventoryMatcherService(); - const result = service.match(makeFourNodeProvider(), makeFourNodeUnits(119525n, 0n, 2)); + const result = service.match(makeFourNodeCluster(), makeFourNodeUnits(119525n, 0n, 2)); expect(result.matched).toBe(false); expect(result.error).toBe("INSUFFICIENT_CAPACITY"); }); @@ -925,64 +920,52 @@ describe(ClusterInventoryMatcherService.name, () => { function setup(input: { requestedCpu?: bigint; requestedMemory?: bigint }) { const service = new ClusterInventoryMatcherService(); - const provider = makeProvider([{ cpu: 8000, memory: 17179869184, ephemeral: 107374182400 }]); + const cluster = makeCluster([{ cpu: 8000n, memory: 17179869184n, ephemeral: 107374182400n }]); const resourceUnits = makeResourceUnits({ cpu: input.requestedCpu ?? 1000n, memory: input.requestedMemory ?? 1073741824n, ephemeral: 5368709120n, count: 1 }); - return { service, provider, resourceUnits }; + return { service, cluster, resourceUnits }; } }); -function makeProvider( +function makeCluster( nodes: { - cpu: number; - memory: number; - ephemeral: number; - cpuAllocated?: number; - memoryAllocated?: number; - ephemeralAllocated?: number; - gpuCount?: number; - gpuAllocated?: number; + cpu: bigint; + memory: bigint; + ephemeral: bigint; + cpuAllocated?: bigint; + memoryAllocated?: bigint; + ephemeralAllocated?: bigint; + gpuCount?: bigint; + gpuAllocated?: bigint; gpuInfo?: GpuInfo[]; storageClasses?: string[]; - cpus?: { vendor: string; model: string; vcores: number }[]; + cpus?: CpuInfo[]; }[], - storage?: { class: string; allocatable: number; allocated: number }[] -): ProviderWithSnapshot { - return { - owner: "akash1test", - hostUri: "https://test.example.com:8443", - ipRegion: null, - uptime7d: null, - lastSuccessfulSnapshot: { - nodes: nodes.map((n, i) => ({ - name: `node${i}`, - cpuAllocatable: n.cpu, - cpuAllocated: n.cpuAllocated ?? 0, - memoryAllocatable: n.memory, - memoryAllocated: n.memoryAllocated ?? 0, - ephemeralStorageAllocatable: n.ephemeral, - ephemeralStorageAllocated: n.ephemeralAllocated ?? 0, - gpuAllocatable: n.gpuCount ?? 0, - gpuAllocated: n.gpuAllocated ?? 0, - capabilitiesStorageHDD: n.storageClasses?.includes("beta1") ?? false, - capabilitiesStorageSSD: n.storageClasses?.includes("beta2") ?? false, - capabilitiesStorageNVME: n.storageClasses?.includes("beta3") ?? false, - gpus: (n.gpuInfo ?? []).map(g => ({ - vendor: g.vendor, - name: g.name, - modelId: g.modelId, - interface: g.interface, - memorySize: g.memorySize - })), - cpus: n.cpus ?? [] - })), - storage: storage ?? [] - } - }; + storage?: { class: string; allocatable: bigint; allocated: bigint }[] +): ClusterState { + const storageMap: ClusterState["storage"] = Object.create(null); + for (const pool of storage ?? []) { + storageMap[pool.class] = { class: pool.class, quantity: new ResourcePair(pool.allocatable, pool.allocated) }; + } + + const stateNodes: NodeState[] = nodes.map((n, i) => ({ + name: `node${i}`, + cpu: new ResourcePair(n.cpu, n.cpuAllocated ?? 0n), + memory: new ResourcePair(n.memory, n.memoryAllocated ?? 0n), + ephemeralStorage: new ResourcePair(n.ephemeral, n.ephemeralAllocated ?? 0n), + gpu: { + quantity: new ResourcePair(n.gpuCount ?? 0n, n.gpuAllocated ?? 0n), + info: n.gpuInfo ?? [] + }, + storageClasses: n.storageClasses ?? [], + cpus: n.cpus ?? [] + })); + + return { nodes: stateNodes, storage: storageMap }; } function makeResourceUnits(input: { diff --git a/apps/provider-inventory/src/services/cluster-inventory-matcher/cluster-inventory-matcher.service.ts b/apps/provider-inventory/src/services/cluster-inventory-matcher/cluster-inventory-matcher.service.ts index a5f515fd6..a3ca9feda 100644 --- a/apps/provider-inventory/src/services/cluster-inventory-matcher/cluster-inventory-matcher.service.ts +++ b/apps/provider-inventory/src/services/cluster-inventory-matcher/cluster-inventory-matcher.service.ts @@ -1,7 +1,5 @@ import { singleton } from "tsyringe"; -import { mapSnapshotToInventory } from "@src/lib/inventory-mapper/inventory-mapper"; -import type { ProviderWithSnapshot } from "@src/types/provider"; import { matchesGPU, type ParsedGPUAttributes, parseGPUAttributes } from "../../lib/gpu-attribute-parser/gpu-attribute-parser"; import { parseStorageAttributes } from "../../lib/storage-attribute-parser/storage-attribute-parser"; import type { ClusterState, MatchResult, NodeState, RequestedResourceUnit, ResourceAttribute } from "../../types/inventory.types"; @@ -19,9 +17,8 @@ const GROUP_MISMATCH = Object.freeze({ matched: false, error: "GROUP_RESOURCE_MI @singleton() export class ClusterInventoryMatcherService { - match(provider: ProviderWithSnapshot, resourceUnits: RequestedResourceUnit[]): MatchResult { - const inventory = mapSnapshotToInventory(provider); - return this.#adjust(inventory, resourceUnits); + match(cluster: ClusterState, resourceUnits: RequestedResourceUnit[]): MatchResult { + return this.#adjust(cloneCluster(cluster), resourceUnits); } #adjust(cluster: ClusterState, resourceUnits: RequestedResourceUnit[]): MatchResult { @@ -190,6 +187,16 @@ export class ClusterInventoryMatcherService { } } +function cloneCluster(cluster: ClusterState): ClusterState { + return { + nodes: cluster.nodes.map(copyNode), + storage: Object.values(cluster.storage).reduce((acc, pool) => { + acc[pool.class] = { class: pool.class, quantity: pool.quantity.clone() }; + return acc; + }, Object.create(null)) + }; +} + function copyNode(node: NodeState): NodeState { return { name: node.name, diff --git a/apps/provider-inventory/src/services/discovery-scheduler/discovery-scheduler.service.spec.ts b/apps/provider-inventory/src/services/discovery-scheduler/discovery-scheduler.service.spec.ts index 700f09182..296ed01f4 100644 --- a/apps/provider-inventory/src/services/discovery-scheduler/discovery-scheduler.service.spec.ts +++ b/apps/provider-inventory/src/services/discovery-scheduler/discovery-scheduler.service.spec.ts @@ -149,7 +149,11 @@ describe(DiscoverySchedulerService.name, () => { LOG_LEVEL: "info", STD_OUT_LOG_FORMAT: "json", PORT: 3092, - DISCOVERY_INTERVAL_MS: 600_000 + DISCOVERY_INTERVAL_MS: 600_000, + STREAM_RECONNECT_INITIAL_DELAY_MS: 1_000, + STREAM_RECONNECT_MAX_DELAY_MS: 300_000, + STREAM_FIRST_MESSAGE_TIMEOUT_MS: 10_000, + REST_API_NODE_URL: "http://localhost:1317" }; if (input?.pollError) { diff --git a/apps/provider-inventory/src/services/provider-stream-factory/provider-stream-factory.sevice.ts b/apps/provider-inventory/src/services/provider-stream-factory/provider-stream-factory.sevice.ts new file mode 100644 index 000000000..e4fe82253 --- /dev/null +++ b/apps/provider-inventory/src/services/provider-stream-factory/provider-stream-factory.sevice.ts @@ -0,0 +1,25 @@ +import { createProviderSDK } from "@akashnetwork/chain-sdk"; +import { singleton } from "tsyringe"; + +import { mapClusterToStreamStatus } from "@src/lib/stream-status-mapper/stream-status-mapper"; +import { ChainProvider } from "@src/types/chain-provider"; +import type { ClusterState } from "@src/types/inventory.types"; + +@singleton() +export class ProviderStreamFactory { + async *openStatusStream(provider: ChainProvider, signal: AbortSignal): AsyncIterable { + const url = new URL(provider.hostUri); + url.port = "8444"; + const sdk = createProviderSDK({ + baseUrl: url.toString() + }); + + const stream = await sdk.akash.provider.v1.streamStatus({}, { signal }); + for await (const status of stream) { + const inventory = status.cluster?.inventory?.cluster; + if (inventory) { + yield mapClusterToStreamStatus(inventory); + } + } + } +} diff --git a/apps/provider-inventory/src/services/stream-lifecycle-manager/stream-lifecycle-manager.service.spec.ts b/apps/provider-inventory/src/services/stream-lifecycle-manager/stream-lifecycle-manager.service.spec.ts index 096b8067d..e99c2c417 100644 --- a/apps/provider-inventory/src/services/stream-lifecycle-manager/stream-lifecycle-manager.service.spec.ts +++ b/apps/provider-inventory/src/services/stream-lifecycle-manager/stream-lifecycle-manager.service.spec.ts @@ -3,12 +3,13 @@ import { setTimeout as delay } from "node:timers/promises"; import { describe, expect, it, vi } from "vitest"; import { mock, type MockProxy } from "vitest-mock-extended"; +import { ResourcePair } from "@src/lib/resource-pair/resource-pair"; import type { EnvConfig } from "@src/providers/app-config.provider"; import type { LoggerFactory } from "@src/providers/logger-factory.provider"; -import type { ProviderStreamFactory } from "@src/providers/provider-stream.provider"; import type { ProviderInventoryRepository } from "@src/repositories/provider-inventory/provider-inventory.repository"; import type { ChainProvider } from "@src/types/chain-provider"; -import type { StreamStatusMessage } from "@src/types/stream-status"; +import type { ClusterState, NodeState } from "@src/types/inventory.types"; +import type { ProviderStreamFactory } from "../provider-stream-factory/provider-stream-factory.sevice"; import { StreamLifecycleManagerService } from "./stream-lifecycle-manager.service"; describe(StreamLifecycleManagerService.name, () => { @@ -19,12 +20,12 @@ describe(StreamLifecycleManagerService.name, () => { manager.start(provider); - expect(streamFactory.openStatusStream).toHaveBeenCalledWith(provider.hostUri, expect.any(AbortSignal)); + expect(streamFactory.openStatusStream).toHaveBeenCalledWith(provider, expect.any(AbortSignal)); }); it("calls updateInventory for each unique stream message", async () => { const provider = createProvider(); - const messages = [createMessage({ cpu: 1000 }), createMessage({ cpu: 2000 })]; + const messages = [createMessage({ cpu: 1000n }), createMessage({ cpu: 2000n })]; const { manager, writer } = setup({ streams: { "https://p1:8443": msgsThenHang(messages) } }); manager.start(provider); @@ -70,7 +71,7 @@ describe(StreamLifecycleManagerService.name, () => { manager.restart(updated); expect(oldSignal.aborted).toBe(true); - expect(streamFactory.openStatusStream).toHaveBeenLastCalledWith("https://new:8443", expect.any(AbortSignal)); + expect(streamFactory.openStatusStream).toHaveBeenLastCalledWith(updated, expect.any(AbortSignal)); expect(writer.deleteByOwner).not.toHaveBeenCalled(); }); }); @@ -93,9 +94,8 @@ describe(StreamLifecycleManagerService.name, () => { describe("diff cache", () => { it("performs a single write when two consecutive identical messages arrive", async () => { - const message = createMessage({ cpu: 1000 }); const { manager, writer, logger } = setup({ - streams: { "https://p1:8443": msgsThenHang([message, structuredClone(message)]) } + streams: { "https://p1:8443": msgsThenHang([createMessage({ cpu: 1000n }), createMessage({ cpu: 1000n })]) } }); manager.start(createProvider()); @@ -107,7 +107,7 @@ describe(StreamLifecycleManagerService.name, () => { it("writes again when any meaningful field differs", async () => { const { manager, writer } = setup({ streams: { - "https://p1:8443": msgsThenHang([createMessage({ cpu: 1000 }), createMessage({ cpu: 1001 })]) + "https://p1:8443": msgsThenHang([createMessage({ cpu: 1000n }), createMessage({ cpu: 1001n })]) } }); manager.start(createProvider()); @@ -115,48 +115,11 @@ describe(StreamLifecycleManagerService.name, () => { await vi.waitFor(() => expect(writer.updateInventory).toHaveBeenCalledTimes(2)); }); - it("considers messages with reordered nodes/gpus/storage equal", async () => { - const first: StreamStatusMessage = { - nodes: [ - { - name: "node-1", - cpuAvailable: 4000, - memoryAvailable: 8_000_000_000, - gpus: [ - { vendor: "nvidia", model: "a100", available: 1 }, - { vendor: "amd", model: "mi300x", available: 2 } - ], - ephStorageAvailable: 100_000_000_000, - persistentStorage: [ - { class: "beta2", available: 100 }, - { class: "beta3", available: 200 } - ] - }, - { - name: "node-2", - cpuAvailable: 2000, - memoryAvailable: 4_000_000_000, - gpus: [], - ephStorageAvailable: 50_000_000_000, - persistentStorage: [] - } - ], - storage: [ - { class: "beta2", available: 500 }, - { class: "beta3", available: 600 } - ] - }; - const reordered: StreamStatusMessage = { - nodes: [ - first.nodes[1], - { - ...first.nodes[0], - gpus: [...first.nodes[0].gpus].reverse(), - persistentStorage: [...first.nodes[0].persistentStorage].reverse() - } - ], - storage: [...first.storage].reverse() - }; + it("considers messages with reordered nodes equal", async () => { + const nodeA = buildNode({ name: "node-a", cpu: new ResourcePair(4000n, 0n) }); + const nodeB = buildNode({ name: "node-b", cpu: new ResourcePair(2000n, 0n) }); + const first: ClusterState = { nodes: [nodeA, nodeB], storage: Object.create(null) }; + const reordered: ClusterState = { nodes: [nodeB, nodeA], storage: Object.create(null) }; const { manager, writer, logger } = setup({ streams: { "https://p1:8443": msgsThenHang([first, reordered]) } @@ -179,9 +142,8 @@ describe(StreamLifecycleManagerService.name, () => { }); it("does not cache a row when the write fails — next identical message retries", async () => { - const message = createMessage(); const { manager, writer } = setup({ - streams: { "https://p1:8443": msgsThenHang([message, structuredClone(message)]) } + streams: { "https://p1:8443": msgsThenHang([createMessage(), createMessage()]) } }); writer.updateInventory.mockRejectedValueOnce(new Error("DB down")); manager.start(createProvider()); @@ -192,12 +154,10 @@ describe(StreamLifecycleManagerService.name, () => { it("isolates caches across providers", async () => { const providerA = createProvider({ owner: "a", hostUri: "https://a:8443" }); const providerB = createProvider({ owner: "b", hostUri: "https://b:8443" }); - const messageA = createMessage({ cpu: 1000 }); - const messageB = createMessage({ cpu: 1000 }); const { manager, writer, logger } = setup({ streams: { - "https://a:8443": msgsThenHang([messageA, structuredClone(messageA)]), - "https://b:8443": msgsThenHang([messageB]) + "https://a:8443": msgsThenHang([createMessage({ cpu: 1000n }), createMessage({ cpu: 1000n })]), + "https://b:8443": msgsThenHang([createMessage({ cpu: 1000n })]) } }); manager.start(providerA); @@ -214,7 +174,7 @@ describe(StreamLifecycleManagerService.name, () => { describe("error handling", () => { it("logs and continues when updateInventory throws", async () => { - const messages = [createMessage({ cpu: 1000 }), createMessage({ cpu: 2000 })]; + const messages = [createMessage({ cpu: 1000n }), createMessage({ cpu: 2000n })]; const { manager, writer, logger } = setup({ streams: { "https://p1:8443": msgsThenHang(messages) } }); writer.updateInventory.mockRejectedValueOnce(new Error("DB down")); manager.start(createProvider()); @@ -242,7 +202,7 @@ describe(StreamLifecycleManagerService.name, () => { describe("stale finalizer", () => { it("does not evict a replacement stream's registry entry when the old stream finishes", async () => { let resolveOldStream!: () => void; - const oldStreamPromise = new Promise>(r => { + const oldStreamPromise = new Promise>(r => { resolveOldStream = () => r({ done: true, value: undefined }); }); const provider = createProvider(); @@ -301,21 +261,20 @@ describe(StreamLifecycleManagerService.name, () => { }); it("clears diff cache on shutdown", async () => { - const stable = createMessage({ cpu: 1000 }); - const { manager, streamFactory, writer } = setup({ streams: { "https://p1:8443": msgsThenHang([stable]) } }); + const { manager, streamFactory, writer } = setup({ streams: { "https://p1:8443": msgsThenHang([createMessage({ cpu: 1000n })]) } }); manager.start(createProvider()); await vi.waitFor(() => expect(writer.updateInventory).toHaveBeenCalledTimes(1)); manager.shutdown(); - streamFactory.openStatusStream.mockReturnValue(msgsThenHang([stable])); + streamFactory.openStatusStream.mockReturnValue(msgsThenHang([createMessage({ cpu: 1000n })])); manager.start(createProvider()); await vi.waitFor(() => expect(writer.updateInventory).toHaveBeenCalledTimes(2)); }); }); - function setup(input?: { streams?: Record | "hang">; streamFactory?: MockProxy }) { + function setup(input?: { streams?: Record | "hang">; streamFactory?: MockProxy }) { const streamFactory = input?.streamFactory ?? mock(); const writer = mock(); writer.deleteByOwner.mockResolvedValue(); @@ -331,8 +290,8 @@ describe(StreamLifecycleManagerService.name, () => { if (!input?.streamFactory) { const streams = input?.streams ?? {}; - streamFactory.openStatusStream.mockImplementation((hostUri: string, signal: AbortSignal) => { - const value = streams[hostUri]; + streamFactory.openStatusStream.mockImplementation((provider: ChainProvider, signal: AbortSignal) => { + const value = streams[provider.hostUri]; if (value === "hang") return hangingStream(signal); if (value) return value; return hangingStream(signal); @@ -356,38 +315,49 @@ function createProvider(overrides?: Partial): ChainProvider { }; } -function createMessage(overrides?: { cpu?: number }): StreamStatusMessage { +function createMessage(overrides?: { cpu?: bigint }): ClusterState { return { nodes: [ - { + buildNode({ name: "node-1", - cpuAvailable: overrides?.cpu ?? 4000, - memoryAvailable: 8_000_000_000, - gpus: [], - ephStorageAvailable: 100_000_000_000, - persistentStorage: [] - } + cpu: new ResourcePair(overrides?.cpu ?? 4000n, 0n), + memory: new ResourcePair(8_000_000_000n, 0n), + ephemeralStorage: new ResourcePair(100_000_000_000n, 0n) + }) ], - storage: [] + storage: Object.create(null) + }; +} + +function buildNode(overrides?: Partial): NodeState { + return { + name: "node-1", + cpu: new ResourcePair(0n, 0n), + memory: new ResourcePair(0n, 0n), + ephemeralStorage: new ResourcePair(0n, 0n), + gpu: { quantity: new ResourcePair(0n, 0n), info: [] }, + storageClasses: [], + cpus: [], + ...overrides }; } -async function* msgsThenHang(messages: StreamStatusMessage[]): AsyncGenerator { +async function* msgsThenHang(messages: ClusterState[]): AsyncGenerator { for (const msg of messages) yield msg; await new Promise(() => undefined); } -async function* throwingStream(error: Error): AsyncGenerator { +async function* throwingStream(error: Error): AsyncGenerator { yield* []; throw error; } -function hangingStream(signal?: AbortSignal): AsyncIterable { +function hangingStream(signal?: AbortSignal): AsyncIterable { return { [Symbol.asyncIterator]() { return { next: () => - new Promise>((_, reject) => { + new Promise>((_, reject) => { if (!signal) return; if (signal.aborted) reject(new DOMException("aborted", "AbortError")); signal.addEventListener("abort", () => reject(new DOMException("aborted", "AbortError")), { once: true }); @@ -397,7 +367,7 @@ function hangingStream(signal?: AbortSignal): AsyncIterable }; } -function captureSignal(streamFactory: { openStatusStream: { mock: { calls: Array<[string, AbortSignal]> } } }, callIndex?: number): AbortSignal { +function captureSignal(streamFactory: { openStatusStream: { mock: { calls: Array<[ChainProvider, AbortSignal]> } } }, callIndex?: number): AbortSignal { const idx = callIndex ?? streamFactory.openStatusStream.mock.calls.length - 1; return streamFactory.openStatusStream.mock.calls[idx][1]; } diff --git a/apps/provider-inventory/src/services/stream-lifecycle-manager/stream-lifecycle-manager.service.ts b/apps/provider-inventory/src/services/stream-lifecycle-manager/stream-lifecycle-manager.service.ts index f2a4d1547..2a814a921 100644 --- a/apps/provider-inventory/src/services/stream-lifecycle-manager/stream-lifecycle-manager.service.ts +++ b/apps/provider-inventory/src/services/stream-lifecycle-manager/stream-lifecycle-manager.service.ts @@ -10,11 +10,11 @@ import type { EnvConfig } from "@src/providers/app-config.provider"; import { APP_CONFIG } from "@src/providers/app-config.provider"; import type { LoggerFactory } from "@src/providers/logger-factory.provider"; import { LOGGER_FACTORY } from "@src/providers/logger-factory.provider"; -import type { ProviderStreamFactory } from "@src/providers/provider-stream.provider"; -import { PROVIDER_STREAM_FACTORY } from "@src/providers/provider-stream.provider"; import { ProviderInventoryRepository } from "@src/repositories/provider-inventory/provider-inventory.repository"; import type { ChainProvider } from "@src/types/chain-provider"; import type { ProjectedRow } from "@src/types/inventory"; +import type { ClusterState } from "@src/types/inventory.types"; +import { ProviderStreamFactory } from "../provider-stream-factory/provider-stream-factory.sevice"; @singleton() export class StreamLifecycleManagerService { @@ -33,7 +33,7 @@ export class StreamLifecycleManagerService { readonly #retryStreamPolicy: RetryPolicy; constructor( - @inject(PROVIDER_STREAM_FACTORY) streamFactory: ProviderStreamFactory, + streamFactory: ProviderStreamFactory, @inject(ProviderInventoryRepository) writer: ProviderInventoryRepository, @inject(LOGGER_FACTORY) loggerFactory: LoggerFactory, @inject(APP_CONFIG) config: EnvConfig @@ -125,7 +125,7 @@ export class StreamLifecycleManagerService { }, this.#config.STREAM_FIRST_MESSAGE_TIMEOUT_MS); try { - const stream = this.#streamFactory.openStatusStream(provider.hostUri, composite); + const stream = this.#streamFactory.openStatusStream(provider, composite); for await (const message of stream) { if (outerSignal.aborted) return; @@ -154,7 +154,7 @@ export class StreamLifecycleManagerService { } } - async #applyMessage(provider: ChainProvider, message: Parameters[0]): Promise { + async #applyMessage(provider: ChainProvider, message: ClusterState): Promise { const row = projectRow(message); const cached = this.#lastInventoryPerProvider.get(provider.owner); diff --git a/apps/provider-inventory/src/types/inventory.ts b/apps/provider-inventory/src/types/inventory.ts index 0e7198d61..841db9b8c 100644 --- a/apps/provider-inventory/src/types/inventory.ts +++ b/apps/provider-inventory/src/types/inventory.ts @@ -1,32 +1,4 @@ -export interface InventoryNodeGpu { - vendor: string; - model: string; - available: number; -} - -export interface InventoryNodeStorage { - class: string; - available: number; -} - -export interface InventoryNode { - name: string; - cpu: { available: number }; - memory: { available: number }; - gpu: InventoryNodeGpu[]; - ephStorage: { available: number }; - persistentStorage: InventoryNodeStorage[]; -} - -export interface InventoryClusterStorage { - class: string; - available: number; -} - -export interface Inventory { - nodes: InventoryNode[]; - storage: InventoryClusterStorage[]; -} +import type { ClusterState } from "./inventory.types"; export interface InventoryRollups { totalAvailableCpu: bigint; @@ -42,5 +14,5 @@ export interface InventoryRollups { } export interface ProjectedRow extends InventoryRollups { - inventory: Inventory; + cluster: ClusterState; } diff --git a/apps/provider-inventory/src/types/inventory.types.ts b/apps/provider-inventory/src/types/inventory.types.ts index 51f1813c2..671680388 100644 --- a/apps/provider-inventory/src/types/inventory.types.ts +++ b/apps/provider-inventory/src/types/inventory.types.ts @@ -13,11 +13,6 @@ export interface CpuInfo { model: string; } -export interface ResourcePairState { - allocatable: bigint; - allocated: bigint; -} - export interface NodeState { name: string; cpu: ResourcePair; diff --git a/apps/provider-inventory/src/types/provider.ts b/apps/provider-inventory/src/types/provider.ts index 16005f55e..07497a7f7 100644 --- a/apps/provider-inventory/src/types/provider.ts +++ b/apps/provider-inventory/src/types/provider.ts @@ -1,39 +1,9 @@ -export interface ProviderWithSnapshot { +import type { ClusterState } from "./inventory.types"; + +export interface ProviderWithClusterState { owner: string; hostUri: string; ipRegion?: string | null; uptime7d?: number | null; - lastSuccessfulSnapshot: { - nodes: { - name: string; - cpuAllocatable: number; - cpuAllocated: number; - memoryAllocatable: number; - memoryAllocated: number; - ephemeralStorageAllocatable: number; - ephemeralStorageAllocated: number; - gpuAllocatable: number; - gpuAllocated: number; - capabilitiesStorageHDD: boolean; - capabilitiesStorageSSD: boolean; - capabilitiesStorageNVME: boolean; - gpus: { - vendor: string; - name: string; - modelId: string; - interface: string; - memorySize: string; - }[]; - cpus: { - vendor: string; - model: string; - vcores: number; - }[]; - }[]; - storage: { - class: string; - allocatable: number; - allocated: number; - }[]; - }; + cluster: ClusterState; } diff --git a/apps/provider-inventory/src/types/stream-status.ts b/apps/provider-inventory/src/types/stream-status.ts deleted file mode 100644 index c8f0c96ff..000000000 --- a/apps/provider-inventory/src/types/stream-status.ts +++ /dev/null @@ -1,29 +0,0 @@ -export interface StreamStatusNodeGpu { - vendor: string; - model: string; - available: number; -} - -export interface StreamStatusNodeStorage { - class: string; - available: number; -} - -export interface StreamStatusNode { - name: string; - cpuAvailable: number; - memoryAvailable: number; - gpus: StreamStatusNodeGpu[]; - ephStorageAvailable: number; - persistentStorage: StreamStatusNodeStorage[]; -} - -export interface StreamStatusClusterStorage { - class: string; - available: number; -} - -export interface StreamStatusMessage { - nodes: StreamStatusNode[]; - storage: StreamStatusClusterStorage[]; -} diff --git a/apps/provider-inventory/test/functional/discovery-pipeline.spec.ts b/apps/provider-inventory/test/functional/discovery-pipeline.spec.ts index 6f4817f5b..f65b6dde9 100644 --- a/apps/provider-inventory/test/functional/discovery-pipeline.spec.ts +++ b/apps/provider-inventory/test/functional/discovery-pipeline.spec.ts @@ -2,17 +2,17 @@ import { container, Lifecycle } from "tsyringe"; import { beforeEach, describe, expect, it, vi } from "vitest"; import { mock } from "vitest-mock-extended"; +import { ResourcePair } from "@src/lib/resource-pair/resource-pair"; import type { ChainQueryClient } from "@src/providers/chain-query.provider"; import { CHAIN_QUERY_CLIENT } from "@src/providers/chain-query.provider"; import { PG_CLIENT } from "@src/providers/postgres.provider"; -import type { ProviderStreamFactory } from "@src/providers/provider-stream.provider"; -import { PROVIDER_STREAM_FACTORY } from "@src/providers/provider-stream.provider"; import { ProviderInventoryRepository } from "@src/repositories/provider-inventory/provider-inventory.repository"; import { ChainProviderPollerService } from "@src/services/chain-provider-poller/chain-provider-poller.service"; import { DiscoverySchedulerService } from "@src/services/discovery-scheduler/discovery-scheduler.service"; +import { ProviderStreamFactory } from "@src/services/provider-stream-factory/provider-stream-factory.sevice"; import { StreamLifecycleManagerService } from "@src/services/stream-lifecycle-manager/stream-lifecycle-manager.service"; import type { ChainProvider } from "@src/types/chain-provider"; -import type { StreamStatusMessage } from "@src/types/stream-status"; +import type { ClusterState, NodeState } from "@src/types/inventory.types"; import { testDb } from "../setup-functional-tests"; describe("DiscoveryScheduler pipeline (functional)", () => { @@ -66,16 +66,14 @@ describe("DiscoveryScheduler pipeline (functional)", () => { queueStreamMessages("https://a:8443", [ { nodes: [ - { + buildNode({ name: "n1", - cpuAvailable: 4000, - memoryAvailable: 8_000_000_000, - gpus: [], - ephStorageAvailable: 100_000_000_000, - persistentStorage: [] - } + cpu: new ResourcePair(4000n, 0n), + memory: new ResourcePair(8_000_000_000n, 0n), + ephemeralStorage: new ResourcePair(100_000_000_000n, 0n) + }) ], - storage: [] + storage: Object.create(null) } ]); @@ -106,7 +104,7 @@ describe("DiscoveryScheduler pipeline (functional)", () => { const openedHosts: string[] = []; const abortedHosts: string[] = []; - const queuedMessages = new Map(); + const queuedMessages = new Map(); let pollError: Error | null = null; let providers: ChainProvider[] = input.providers ?? []; @@ -125,16 +123,16 @@ describe("DiscoveryScheduler pipeline (functional)", () => { chainClient.getAllProvidersAttributes.mockImplementation(() => Promise.resolve(providers.map(p => ({ owner: p.owner, attributes: p.signedAttributes })))); const streamFactory = mock(); - streamFactory.openStatusStream.mockImplementation((hostUri: string, signal: AbortSignal) => { - openedHosts.push(hostUri); - signal.addEventListener("abort", () => abortedHosts.push(hostUri), { once: true }); - const messages = queuedMessages.get(hostUri) ?? []; - queuedMessages.delete(hostUri); + streamFactory.openStatusStream.mockImplementation((provider: ChainProvider, signal: AbortSignal) => { + openedHosts.push(provider.hostUri); + signal.addEventListener("abort", () => abortedHosts.push(provider.hostUri), { once: true }); + const messages = queuedMessages.get(provider.hostUri) ?? []; + queuedMessages.delete(provider.hostUri); return makeStream(messages, signal); }); testContainer.register(CHAIN_QUERY_CLIENT, { useValue: chainClient }); - testContainer.register(PROVIDER_STREAM_FACTORY, { useValue: streamFactory }); + testContainer.register(ProviderStreamFactory, { useValue: streamFactory }); testContainer.register(ProviderInventoryRepository, { useClass: ProviderInventoryRepository }, { lifecycle: Lifecycle.ContainerScoped }); testContainer.register(ChainProviderPollerService, { useClass: ChainProviderPollerService }, { lifecycle: Lifecycle.ContainerScoped }); testContainer.register(StreamLifecycleManagerService, { useClass: StreamLifecycleManagerService }, { lifecycle: Lifecycle.ContainerScoped }); @@ -155,7 +153,7 @@ describe("DiscoveryScheduler pipeline (functional)", () => { const [row] = await pg`SELECT * FROM provider_inventory WHERE owner = ${owner}`; return row; }, - queueStreamMessages(hostUri: string, messages: StreamStatusMessage[]) { + queueStreamMessages(hostUri: string, messages: ClusterState[]) { queuedMessages.set(hostUri, messages); }, setPollError(error: Error | null) { @@ -177,17 +175,30 @@ function createProvider(overrides: Partial & Pick { +function buildNode(overrides?: Partial): NodeState { + return { + name: "node-1", + cpu: new ResourcePair(0n, 0n), + memory: new ResourcePair(0n, 0n), + ephemeralStorage: new ResourcePair(0n, 0n), + gpu: { quantity: new ResourcePair(0n, 0n), info: [] }, + storageClasses: [], + cpus: [], + ...overrides + }; +} + +function makeStream(messages: ClusterState[], signal: AbortSignal): AsyncIterable { return { [Symbol.asyncIterator]() { let index = 0; return { - async next(): Promise> { + async next(): Promise> { if (signal.aborted) return { done: true, value: undefined }; if (index < messages.length) { return { done: false, value: messages[index++] }; } - return new Promise>(resolve => { + return new Promise>(resolve => { const onAbort = () => resolve({ done: true, value: undefined }); signal.addEventListener("abort", onAbort, { once: true }); });