diff --git a/apps/api/src/bid-screening/http-schemas/bid-screening.schema.ts b/apps/api/src/bid-screening/http-schemas/bid-screening.schema.ts index df9428f2e..e550abbe8 100644 --- a/apps/api/src/bid-screening/http-schemas/bid-screening.schema.ts +++ b/apps/api/src/bid-screening/http-schemas/bid-screening.schema.ts @@ -1,3 +1,4 @@ +/* v8 ignore start */ import { z } from "@hono/zod-openapi"; const UIntStringSchema = z.string().regex(/^\d+$/, "Must be an unsigned integer string"); @@ -6,16 +7,37 @@ const ResourceValueSchema = z.object({ val: UIntStringSchema.openapi({ description: "String-encoded integer value", example: "1000" }) }); +// Mirrors AttributeNameRegexpStringWildcard in akash-network/chain-sdk +// (go/node/types/v1beta3/attribute.go) — only trailing `*` is a permitted glob metachar. +const SDL_ATTRIBUTE_KEY_REGEX = /^([a-zA-Z][\w/.-]{1,126}[\w*]?)$/; const AttributeSchema = z.object({ - key: z.string().openapi({ description: "Attribute key", example: "persistent" }), + key: z + .string() + .min(1) + .max(128) + .regex(SDL_ATTRIBUTE_KEY_REGEX, "Invalid attribute key format") + .openapi({ description: "Attribute key", example: "persistent" }), value: z.string().openapi({ description: "Attribute value", example: "false" }) }); -const StorageResourceSchema = z.object({ - name: z.string().openapi({ description: "Storage volume name", example: "default" }), - quantity: ResourceValueSchema, - attributes: z.array(AttributeSchema) -}); +const StorageResourceSchema = z + .object({ + name: z.string().openapi({ description: "Storage volume name", example: "default" }), + quantity: ResourceValueSchema, + attributes: z.array(AttributeSchema) + }) + .superRefine((vol, ctx) => { + const isPersistent = vol.attributes.some(a => a.key === "persistent" && a.value === "true"); + if (!isPersistent) return; + const storageClass = vol.attributes.find(a => a.key === "class")?.value; + if (!storageClass || storageClass === "ram") { + ctx.addIssue({ + code: z.ZodIssueCode.custom, + message: `Persistent storage volume "${vol.name}" must specify a valid storage class (not "${storageClass || "empty"}")`, + path: ["attributes"] + }); + } + }); const ResourceSchema = z.object({ id: z.number().int().openapi({ description: "Resource unit ID", example: 1 }), diff --git a/apps/api/test/functional/__snapshots__/docs.spec.ts.snap b/apps/api/test/functional/__snapshots__/docs.spec.ts.snap index 141a37d46..22356f4ba 100644 --- a/apps/api/test/functional/__snapshots__/docs.spec.ts.snap +++ b/apps/api/test/functional/__snapshots__/docs.spec.ts.snap @@ -2759,6 +2759,9 @@ exports[`API Docs > GET /v1/doc > returns docs with all routes expected 1`] = ` "key": { "description": "Attribute key", "example": "persistent", + "maxLength": 128, + "minLength": 1, + "pattern": "^([a-zA-Z][\\w/.-]{1,126}[\\w*]?)$", "type": "string", }, "value": { @@ -2834,6 +2837,9 @@ exports[`API Docs > GET /v1/doc > returns docs with all routes expected 1`] = ` "key": { "description": "Attribute key", "example": "persistent", + "maxLength": 128, + "minLength": 1, + "pattern": "^([a-zA-Z][\\w/.-]{1,126}[\\w*]?)$", "type": "string", }, "value": { @@ -2885,6 +2891,9 @@ exports[`API Docs > GET /v1/doc > returns docs with all routes expected 1`] = ` "key": { "description": "Attribute key", "example": "persistent", + "maxLength": 128, + "minLength": 1, + "pattern": "^([a-zA-Z][\\w/.-]{1,126}[\\w*]?)$", "type": "string", }, "value": { @@ -2935,6 +2944,9 @@ exports[`API Docs > GET /v1/doc > returns docs with all routes expected 1`] = ` "key": { "description": "Attribute key", "example": "persistent", + "maxLength": 128, + "minLength": 1, + "pattern": "^([a-zA-Z][\\w/.-]{1,126}[\\w*]?)$", "type": "string", }, "value": { @@ -2981,6 +2993,9 @@ exports[`API Docs > GET /v1/doc > returns docs with all routes expected 1`] = ` "key": { "description": "Attribute key", "example": "persistent", + "maxLength": 128, + "minLength": 1, + "pattern": "^([a-zA-Z][\\w/.-]{1,126}[\\w*]?)$", "type": "string", }, "value": { diff --git a/apps/provider-inventory/src/http-schemas/bid-screening.schema.ts b/apps/provider-inventory/src/http-schemas/bid-screening.schema.ts index df9428f2e..05ea09749 100644 --- a/apps/provider-inventory/src/http-schemas/bid-screening.schema.ts +++ b/apps/provider-inventory/src/http-schemas/bid-screening.schema.ts @@ -6,16 +6,37 @@ const ResourceValueSchema = z.object({ val: UIntStringSchema.openapi({ description: "String-encoded integer value", example: "1000" }) }); +// Mirrors AttributeNameRegexpStringWildcard in akash-network/chain-sdk +// (go/node/types/v1beta3/attribute.go) — only trailing `*` is a permitted glob metachar. +const SDL_ATTRIBUTE_KEY_REGEX = /^([a-zA-Z][\w/.-]{1,126}[\w*]?)$/; const AttributeSchema = z.object({ - key: z.string().openapi({ description: "Attribute key", example: "persistent" }), + key: z + .string() + .min(1) + .max(128) + .regex(SDL_ATTRIBUTE_KEY_REGEX, "Invalid attribute key format") + .openapi({ description: "Attribute key", example: "persistent" }), value: z.string().openapi({ description: "Attribute value", example: "false" }) }); -const StorageResourceSchema = z.object({ - name: z.string().openapi({ description: "Storage volume name", example: "default" }), - quantity: ResourceValueSchema, - attributes: z.array(AttributeSchema) -}); +const StorageResourceSchema = z + .object({ + name: z.string().openapi({ description: "Storage volume name", example: "default" }), + quantity: ResourceValueSchema, + attributes: z.array(AttributeSchema) + }) + .superRefine((vol, ctx) => { + const isPersistent = vol.attributes.some(a => a.key === "persistent" && a.value === "true"); + if (!isPersistent) return; + const storageClass = vol.attributes.find(a => a.key === "class")?.value; + if (!storageClass || storageClass === "ram") { + ctx.addIssue({ + code: z.ZodIssueCode.custom, + message: `Persistent storage volume "${vol.name}" must specify a valid storage class (not "${storageClass || "empty"}")`, + path: ["attributes"] + }); + } + }); const ResourceSchema = z.object({ id: z.number().int().openapi({ description: "Resource unit ID", example: 1 }), diff --git a/apps/provider-inventory/src/lib/hydrate-cluster-state/hydrate-cluster-state.spec.ts b/apps/provider-inventory/src/lib/hydrate-cluster-state/hydrate-cluster-state.spec.ts new file mode 100644 index 000000000..22cf22b81 --- /dev/null +++ b/apps/provider-inventory/src/lib/hydrate-cluster-state/hydrate-cluster-state.spec.ts @@ -0,0 +1,70 @@ +import { describe, expect, it } from "vitest"; + +import { hydrateClusterState } from "./hydrate-cluster-state"; + +describe(hydrateClusterState.name, () => { + it("rebuilds nodes with ResourcePair instances from persisted plain objects", () => { + const cluster = hydrateClusterState({ + nodes: [ + { + name: "node1", + cpu: { allocatable: 8000, allocated: 2000 }, + memory: { allocatable: 17179869184, allocated: 4294967296 }, + ephemeralStorage: { allocatable: 107374182400, allocated: 0 }, + gpu: { quantity: { allocatable: 0, allocated: 0 }, info: [] }, + storageClasses: ["beta2"], + cpus: [] + } + ], + storage: {} + }); + + const node = cluster.nodes[0]; + expect(node.name).toBe("node1"); + expect(node.cpu.allocatable).toBe(8000n); + expect(node.cpu.allocated).toBe(2000n); + expect(node.memory.allocatable).toBe(17179869184n); + expect(node.memory.allocated).toBe(4294967296n); + expect(node.ephemeralStorage.allocatable).toBe(107374182400n); + expect(node.ephemeralStorage.allocated).toBe(0n); + expect(node.storageClasses).toEqual(["beta2"]); + }); + + it("hydrates cluster storage pools as ResourcePair entries keyed by class", () => { + const cluster = hydrateClusterState({ + nodes: [], + storage: { beta2: { class: "beta2", quantity: { allocatable: 536870912000, allocated: 0 } } } + }); + + const beta2 = cluster.storage["beta2"]; + expect(beta2.class).toBe("beta2"); + expect(beta2.quantity.allocatable).toBe(536870912000n); + expect(beta2.quantity.allocated).toBe(0n); + }); + + it("preserves bigint magnitudes that exceed Number.MAX_SAFE_INTEGER", () => { + const unsafe = 9007199254740993n; + const cluster = hydrateClusterState({ + nodes: [ + { + name: "node1", + cpu: { allocatable: unsafe, allocated: 0 }, + memory: { allocatable: 0, allocated: 0 }, + ephemeralStorage: { allocatable: 0, allocated: 0 }, + gpu: { quantity: { allocatable: 0, allocated: 0 }, info: [] }, + storageClasses: [], + cpus: [] + } + ], + storage: {} + }); + + expect(cluster.nodes[0].cpu.allocatable).toBe(unsafe); + }); + + it("returns empty cluster for nullish input", () => { + const cluster = hydrateClusterState(undefined); + expect(cluster.nodes).toEqual([]); + expect(cluster.storage).toEqual({}); + }); +}); diff --git a/apps/provider-inventory/src/lib/hydrate-cluster-state/hydrate-cluster-state.ts b/apps/provider-inventory/src/lib/hydrate-cluster-state/hydrate-cluster-state.ts new file mode 100644 index 000000000..56a61a00d --- /dev/null +++ b/apps/provider-inventory/src/lib/hydrate-cluster-state/hydrate-cluster-state.ts @@ -0,0 +1,43 @@ +import { ResourcePair } from "@src/lib/resource-pair/resource-pair"; +import type { ClusterState, NodeState } from "@src/types/inventory.types"; + +type RawPair = { allocatable: number | bigint; allocated: number | bigint }; +type RawNode = Omit & { + cpu: RawPair; + memory: RawPair; + ephemeralStorage: RawPair; + gpu: { quantity: RawPair; info: NodeState["gpu"]["info"] }; +}; +type RawCluster = { + nodes?: RawNode[]; + storage?: Record; +}; + +export function hydrateClusterState(raw: unknown): ClusterState { + const cluster = (raw ?? {}) as RawCluster; + const nodes = (cluster.nodes ?? []).map(hydrateNode); + const storage: ClusterState["storage"] = Object.create(null); + for (const [key, pool] of Object.entries(cluster.storage ?? {})) { + storage[key] = { class: pool.class, quantity: hydratePair(pool.quantity) }; + } + return { nodes, storage }; +} + +function hydrateNode(node: RawNode): NodeState { + return { + name: node.name, + cpu: hydratePair(node.cpu), + memory: hydratePair(node.memory), + ephemeralStorage: hydratePair(node.ephemeralStorage), + gpu: { quantity: hydratePair(node.gpu.quantity), info: node.gpu.info ?? [] }, + storageClasses: node.storageClasses ?? [], + cpus: node.cpus ?? [] + }; +} + +function hydratePair(pair: RawPair): ResourcePair { + const allocatable = typeof pair.allocatable === "bigint" ? pair.allocatable : BigInt(pair.allocatable || 0); + const allocated = typeof pair.allocated === "bigint" ? pair.allocated : BigInt(pair.allocated || 0); + + return new ResourcePair(allocatable, allocated); +} diff --git a/apps/provider-inventory/src/repositories/bid-screening/bid-screening.aggregator.spec.ts b/apps/provider-inventory/src/repositories/bid-screening/bid-screening.aggregator.spec.ts new file mode 100644 index 000000000..3d8f10a80 --- /dev/null +++ b/apps/provider-inventory/src/repositories/bid-screening/bid-screening.aggregator.spec.ts @@ -0,0 +1,176 @@ +import { describe, expect, it } from "vitest"; + +import type { GroupSpecJSON } from "@src/lib/groupspec-mapper/groupspec-mapper"; +import type { RequestedResourceUnit, RequestedStorage } from "@src/types/inventory.types"; +import { aggregateCriteria } from "./bid-screening.aggregator"; + +describe(aggregateCriteria.name, () => { + describe("totals", () => { + it("multiplies single-unit cpu/memory/gpu by count", () => { + const c = aggregateCriteria([makeUnit({ cpu: 1000n, memory: 2_000_000n, gpu: 1n, count: 3 })], makeRequirements()); + expect(c.totalCpu).toBe(3000n); + expect(c.totalMemory).toBe(6_000_000n); + expect(c.totalGpu).toBe(3n); + }); + + it("sums across multiple units", () => { + const c = aggregateCriteria( + [makeUnit({ cpu: 1000n, memory: 1_000_000n, gpu: 0n, count: 2 }), makeUnit({ cpu: 500n, memory: 2_000_000n, gpu: 1n, count: 4 })], + makeRequirements() + ); + expect(c.totalCpu).toBe(1000n * 2n + 500n * 4n); + expect(c.totalMemory).toBe(1_000_000n * 2n + 2_000_000n * 4n); + expect(c.totalGpu).toBe(0n + 4n); + }); + + it("count=0 contributes nothing to totals", () => { + const c = aggregateCriteria([makeUnit({ cpu: 1000n, memory: 1n, gpu: 1n, count: 0 })], makeRequirements()); + expect(c.totalCpu).toBe(0n); + expect(c.totalMemory).toBe(0n); + expect(c.totalGpu).toBe(0n); + }); + + it("routes persistent and ephemeral storage to separate totals", () => { + const c = aggregateCriteria( + [ + makeUnit({ + count: 2, + storage: [ + { + name: "data", + quantity: 1000n, + attributes: [ + { key: "persistent", value: "true" }, + { key: "class", value: "beta2" } + ] + }, + { name: "scratch", quantity: 500n, attributes: [] } + ] + }) + ], + makeRequirements() + ); + expect(c.totalPersistentStorage).toBe(2000n); + expect(c.totalEphemeralStorage).toBe(1000n); + }); + + it("skips ram-class storage from totalMemory (issue 4 territory)", () => { + const c = aggregateCriteria( + [ + makeUnit({ + memory: 100n, + count: 1, + storage: [{ name: "shm", quantity: 99_999n, attributes: [{ key: "class", value: "ram" }] }] + }) + ], + makeRequirements() + ); + expect(c.totalMemory).toBe(100n); + expect(c.totalEphemeralStorage).toBe(0n); + expect(c.totalPersistentStorage).toBe(0n); + }); + }); + + describe("maxPerReplica", () => { + it("picks max of unit.cpu — not max of unit.cpu * count", () => { + const c = aggregateCriteria( + [ + makeUnit({ cpu: 1000n, count: 5 }), // would be 5000 if multiplied + makeUnit({ cpu: 4000n, count: 1 }) + ], + makeRequirements() + ); + expect(c.maxPerReplicaCpu).toBe(4000n); + }); + + it("picks max of unit.memory across units", () => { + const c = aggregateCriteria([makeUnit({ memory: 1n, count: 100 }), makeUnit({ memory: 50n, count: 1 })], makeRequirements()); + expect(c.maxPerReplicaMemory).toBe(50n); + }); + + it("picks max of unit.gpu across units", () => { + const c = aggregateCriteria([makeUnit({ gpu: 1n, count: 8 }), makeUnit({ gpu: 4n, count: 1 })], makeRequirements()); + expect(c.maxPerReplicaGpu).toBe(4n); + }); + }); + + describe("attribute partition", () => { + it("routes exact keys to attributes and trailing-* keys to globAttributes", () => { + const c = aggregateCriteria( + [makeUnit({})], + makeRequirements({ + attributes: [ + { key: "region", value: "us-east" }, + { key: "host/*", value: "true" } + ] + }) + ); + expect(c.attributes).toEqual([{ key: "region", value: "us-east" }]); + expect(c.globAttributes).toHaveLength(1); + expect(c.globAttributes[0].value).toBe("true"); + }); + + it("escapes regex specials in glob prefix (dot)", () => { + const c = aggregateCriteria([makeUnit({})], makeRequirements({ attributes: [{ key: "host.gpu/*", value: "true" }] })); + const re = new RegExp(c.globAttributes[0].keyPattern); + expect(re.test("host.gpu/a")).toBe(true); + expect(re.test("host.gpu/a/b")).toBe(false); + expect(re.test("hostXgpu/a")).toBe(false); + }); + + it("uses [^/]* so glob does not cross path separators", () => { + const c = aggregateCriteria([makeUnit({})], makeRequirements({ attributes: [{ key: "host/*", value: "true" }] })); + const re = new RegExp(c.globAttributes[0].keyPattern); + expect(re.test("host/gpu")).toBe(true); + expect(re.test("host/gpu/foo")).toBe(false); + }); + + it("empty requirements.attributes produces empty arrays, not undefined", () => { + const c = aggregateCriteria([makeUnit({})], makeRequirements()); + expect(c.attributes).toEqual([]); + expect(c.globAttributes).toEqual([]); + }); + }); + + describe("signedBy", () => { + it("passes allOf and anyOf through unchanged", () => { + const c = aggregateCriteria([makeUnit({})], makeRequirements({ signedBy: { allOf: ["a", "b"], anyOf: ["c"] } })); + expect(c.signedBy).toEqual({ allOf: ["a", "b"], anyOf: ["c"] }); + }); + + it("empty signedBy arrays survive as empty arrays", () => { + const c = aggregateCriteria([makeUnit({})], makeRequirements()); + expect(c.signedBy).toEqual({ allOf: [], anyOf: [] }); + }); + }); + + describe("units dimension", () => { + it("emits an empty per-unit filter slot for each unit in this slice (issues 2/3 populate it)", () => { + const c = aggregateCriteria([makeUnit({}), makeUnit({})], makeRequirements()); + expect(c.units).toEqual([ + { gpuTokens: [], persistentClasses: [] }, + { gpuTokens: [], persistentClasses: [] } + ]); + }); + }); +}); + +function makeUnit(input: { cpu?: bigint; memory?: bigint; gpu?: bigint; count?: number; storage?: RequestedStorage[] }): RequestedResourceUnit { + return { + id: 1, + count: input.count ?? 1, + resources: { + cpu: { units: input.cpu ?? 0n, attributes: [] }, + memory: { quantity: input.memory ?? 0n, attributes: [] }, + gpu: { units: input.gpu ?? 0n, attributes: [] }, + storage: input.storage ?? [] + } + }; +} + +function makeRequirements(input?: Partial): GroupSpecJSON["requirements"] { + return { + signedBy: input?.signedBy ?? { allOf: [], anyOf: [] }, + attributes: input?.attributes ?? [] + }; +} diff --git a/apps/provider-inventory/src/repositories/bid-screening/bid-screening.aggregator.ts b/apps/provider-inventory/src/repositories/bid-screening/bid-screening.aggregator.ts new file mode 100644 index 000000000..84a66d333 --- /dev/null +++ b/apps/provider-inventory/src/repositories/bid-screening/bid-screening.aggregator.ts @@ -0,0 +1,98 @@ +import type { GroupSpecJSON } from "@src/lib/groupspec-mapper/groupspec-mapper"; +import { parseStorageAttributes } from "@src/lib/storage-attribute-parser/storage-attribute-parser"; +import type { RequestedResourceUnit } from "@src/types/inventory.types"; + +interface UnitFilters { + gpuTokens: string[]; + persistentClasses: string[]; +} + +export interface BidScreeningCriteria { + totalCpu: bigint; + totalMemory: bigint; + totalGpu: bigint; + totalEphemeralStorage: bigint; + totalPersistentStorage: bigint; + maxPerReplicaCpu: bigint; + maxPerReplicaMemory: bigint; + maxPerReplicaGpu: bigint; + attributes: { key: string; value: string }[]; + globAttributes: { keyPattern: string; value: string }[]; + signedBy: { allOf: string[]; anyOf: string[] }; + units: UnitFilters[]; +} + +export function aggregateCriteria(resourceUnits: RequestedResourceUnit[], requirements: GroupSpecJSON["requirements"]): BidScreeningCriteria { + let totalCpu = 0n; + let totalMemory = 0n; + let totalGpu = 0n; + let totalEphemeralStorage = 0n; + let totalPersistentStorage = 0n; + let maxPerReplicaCpu = 0n; + let maxPerReplicaMemory = 0n; + let maxPerReplicaGpu = 0n; + const units: UnitFilters[] = []; + + for (const unit of resourceUnits) { + const count = BigInt(unit.count); + const cpu = unit.resources.cpu.units; + const memory = unit.resources.memory.quantity; + const gpu = unit.resources.gpu.units; + + totalCpu += cpu * count; + totalMemory += memory * count; + totalGpu += gpu * count; + + if (cpu > maxPerReplicaCpu) maxPerReplicaCpu = cpu; + if (memory > maxPerReplicaMemory) maxPerReplicaMemory = memory; + if (gpu > maxPerReplicaGpu) maxPerReplicaGpu = gpu; + + for (const vol of unit.resources.storage) { + const parsed = parseStorageAttributes(vol.attributes); + if (parsed.classification === "persistent") { + totalPersistentStorage += vol.quantity * count; + } else if (parsed.classification === "ephemeral") { + totalEphemeralStorage += vol.quantity * count; + } + // ram volumes intentionally skipped — issue 4 will add them to totalMemory + } + + units.push({ gpuTokens: [], persistentClasses: [] }); + } + + const attributes: BidScreeningCriteria["attributes"] = []; + const globAttributes: BidScreeningCriteria["globAttributes"] = []; + + for (const attr of requirements.attributes) { + if (attr.key.endsWith("*")) { + const prefix = attr.key.slice(0, -1); + globAttributes.push({ keyPattern: `^${escapeRegex(prefix)}[^/]*$`, value: attr.value }); + } else { + attributes.push({ key: attr.key, value: attr.value }); + } + } + + return { + totalCpu, + totalMemory, + totalGpu, + totalEphemeralStorage, + totalPersistentStorage, + maxPerReplicaCpu, + maxPerReplicaMemory, + maxPerReplicaGpu, + attributes, + globAttributes, + signedBy: { + allOf: requirements.signedBy.allOf, + anyOf: requirements.signedBy.anyOf + }, + units + }; +} + +// The SDL attribute key regex (enforced upstream in BidScreeningService) admits only +// [a-zA-Z][\w\/\.\-]*[\w\*]?, so the only regex special that can appear in the prefix is `.`. +function escapeRegex(input: string): string { + return input.replace(/[\\.^$*+?()[\]{}|]/g, "\\$&"); +} diff --git a/apps/provider-inventory/src/repositories/bid-screening/bid-screening.repository.integration.ts b/apps/provider-inventory/src/repositories/bid-screening/bid-screening.repository.integration.ts new file mode 100644 index 000000000..c6b6d8977 --- /dev/null +++ b/apps/provider-inventory/src/repositories/bid-screening/bid-screening.repository.integration.ts @@ -0,0 +1,260 @@ +import "@src/providers"; + +import type { PostgresJsDatabase } from "drizzle-orm/postgres-js"; +import { container } from "tsyringe"; +import { beforeEach, describe, expect, it } from "vitest"; + +import type { GroupSpecJSON } from "@src/lib/groupspec-mapper/groupspec-mapper"; +import { ResourcePair } from "@src/lib/resource-pair/resource-pair"; +import { providerInventory } from "@src/model-schemas/provider-inventory/provider-inventory.schema"; +import { DRIZZLE_DB } from "@src/providers/drizzle.provider"; +import type { RequestedResourceUnit } from "@src/types/inventory.types"; +import { AUDITOR, BidScreeningRepository } from "./bid-screening.repository"; + +describe(BidScreeningRepository.name, () => { + let repository: BidScreeningRepository; + let db: PostgresJsDatabase; + + beforeEach(() => { + repository = container.resolve(BidScreeningRepository); + db = container.resolve(DRIZZLE_DB); + }); + + describe("capacity filters", () => { + it("excludes providers whose total available capacity falls short", async () => { + await seed({ owner: "akash1small", totalAvailableCpu: 500n, maxNodeFreeCpu: 500n }); + await seed({ owner: "akash1big", totalAvailableCpu: 2000n, maxNodeFreeCpu: 2000n }); + + const rows = await repository.findCandidates([unit({ cpu: 1000n, count: 1 })], requirements()); + + expect(owners(rows)).toEqual(["akash1big"]); + }); + + it("excludes providers whose largest node cannot fit a single replica", async () => { + await seed({ owner: "akash1fragmented", totalAvailableCpu: 4000n, maxNodeFreeCpu: 100n }); + await seed({ owner: "akash1roomy", totalAvailableCpu: 4000n, maxNodeFreeCpu: 400n }); + + const rows = await repository.findCandidates([unit({ cpu: 200n, count: 1 })], requirements()); + + expect(owners(rows)).toEqual(["akash1roomy"]); + }); + }); + + describe("signedBy", () => { + it("requires every auditor in allOf via @>", async () => { + await seed({ owner: "akash1a", auditedBy: ["aud-a"] }); + await seed({ owner: "akash1ab", auditedBy: ["aud-a", "aud-b"] }); + + const rows = await repository.findCandidates([unit({})], requirements({ signedBy: { allOf: ["aud-a", "aud-b"], anyOf: [] } })); + + expect(owners(rows)).toEqual(["akash1ab"]); + }); + + it("matches any auditor in anyOf via &&", async () => { + await seed({ owner: "akash1a", auditedBy: ["aud-a"] }); + await seed({ owner: "akash1c", auditedBy: ["aud-c"] }); + await seed({ owner: "akash1none", auditedBy: [] }); + + const rows = await repository.findCandidates([unit({})], requirements({ signedBy: { allOf: [], anyOf: ["aud-a", "aud-b"] } })); + + expect(owners(rows)).toEqual(["akash1a"]); + }); + + it("omits the clause entirely when signedBy is empty", async () => { + await seed({ owner: "akash1audited", auditedBy: ["aud-a"] }); + await seed({ owner: "akash1unaudited", auditedBy: [] }); + + const rows = await repository.findCandidates([unit({})], requirements()); + + expect(owners(rows).sort()).toEqual(["akash1audited", "akash1unaudited"]); + }); + }); + + describe("self-attribute filters", () => { + it("matches exact-key attributes via jsonb @>", async () => { + await seed({ owner: "akash1east", selfAttributes: [{ key: "region", value: "us-east" }] }); + await seed({ owner: "akash1west", selfAttributes: [{ key: "region", value: "us-west" }] }); + + const rows = await repository.findCandidates([unit({})], requirements({ attributes: [{ key: "region", value: "us-east" }] })); + + expect(owners(rows)).toEqual(["akash1east"]); + }); + + it("trailing-* glob does not cross path separators", async () => { + await seed({ owner: "akash1leaf", selfAttributes: [{ key: "host/gpu", value: "true" }] }); + await seed({ owner: "akash1nested", selfAttributes: [{ key: "host/gpu/foo", value: "true" }] }); + + const rows = await repository.findCandidates([unit({})], requirements({ attributes: [{ key: "host/*", value: "true" }] })); + + expect(owners(rows)).toEqual(["akash1leaf"]); + }); + + it("ANDs exact and glob clauses", async () => { + await seed({ + owner: "akash1both", + selfAttributes: [ + { key: "region", value: "us-east" }, + { key: "host/gpu", value: "true" } + ] + }); + await seed({ + owner: "akash1regionOnly", + selfAttributes: [{ key: "region", value: "us-east" }] + }); + await seed({ + owner: "akash1hostOnly", + selfAttributes: [{ key: "host/gpu", value: "true" }] + }); + + const rows = await repository.findCandidates( + [unit({})], + requirements({ + attributes: [ + { key: "region", value: "us-east" }, + { key: "host/*", value: "true" } + ] + }) + ); + + expect(owners(rows)).toEqual(["akash1both"]); + }); + + it("compares values case-sensitively", async () => { + await seed({ owner: "akash1lower", selfAttributes: [{ key: "region", value: "us-east" }] }); + + const rows = await repository.findCandidates([unit({})], requirements({ attributes: [{ key: "region", value: "US-EAST" }] })); + + expect(rows).toEqual([]); + }); + }); + + describe("isAudited projection", () => { + it("returns isAudited=true when auditedBy contains the canonical AUDITOR", async () => { + await seed({ owner: "akash1audited", auditedBy: [AUDITOR] }); + + const rows = await repository.findCandidates([unit({})], requirements()); + + expect(rows[0]).toMatchObject({ owner: "akash1audited", isAudited: true }); + }); + + it("returns isAudited=false when signedBy matches a different auditor", async () => { + await seed({ owner: "akash1other", auditedBy: ["aud-x"] }); + + const rows = await repository.findCandidates([unit({})], requirements({ signedBy: { allOf: ["aud-x"], anyOf: [] } })); + + expect(rows[0]).toMatchObject({ owner: "akash1other", isAudited: false }); + }); + }); + + describe("online filter", () => { + it("excludes rows where is_online is false", async () => { + await seed({ owner: "akash1up" }); + await seed({ owner: "akash1down", isOnline: false }); + + const rows = await repository.findCandidates([unit({})], requirements()); + + expect(owners(rows)).toEqual(["akash1up"]); + }); + + it("excludes rows where is_online_since is null", async () => { + await seed({ owner: "akash1stable" }); + await seed({ owner: "akash1ghost", isOnlineSince: null }); + + const rows = await repository.findCandidates([unit({})], requirements()); + + expect(owners(rows)).toEqual(["akash1stable"]); + }); + }); + + describe("ClusterState hydration", () => { + it("rebuilds ResourcePair instances from persisted inventory JSONB", async () => { + await seed({ + owner: "akash1full", + inventory: { + nodes: [ + { + name: "node1", + cpu: { allocatable: 8000, allocated: 2000 }, + memory: { allocatable: 17179869184, allocated: 0 }, + ephemeralStorage: { allocatable: 0, allocated: 0 }, + gpu: { quantity: { allocatable: 0, allocated: 0 }, info: [] }, + storageClasses: ["beta2"], + cpus: [] + } + ], + storage: {} + } + }); + + const [row] = await repository.findCandidates([unit({})], requirements()); + + const node = row.cluster.nodes[0]; + expect(node.cpu).toBeInstanceOf(ResourcePair); + expect(node.cpu.allocatable).toBe(8000n); + expect(node.cpu.allocated).toBe(2000n); + expect(node.memory.allocatable).toBe(17179869184n); + }); + }); + + interface SeedInput { + owner: string; + hostUri?: string; + isOnline?: boolean; + isOnlineSince?: Date | null; + totalAvailableCpu?: bigint; + totalAvailableMemory?: bigint; + totalAvailableGpu?: bigint; + totalAvailableEph?: bigint; + totalAvailablePersistent?: bigint; + maxNodeFreeCpu?: bigint; + maxNodeFreeMemory?: bigint; + maxNodeFreeGpu?: bigint; + selfAttributes?: { key: string; value: string }[]; + auditedBy?: string[]; + inventory?: unknown; + } + + async function seed(input: SeedInput): Promise { + await db.insert(providerInventory).values({ + owner: input.owner, + hostUri: input.hostUri ?? `https://${input.owner}:8443`, + isOnline: input.isOnline ?? true, + isOnlineSince: input.isOnlineSince === undefined ? new Date() : input.isOnlineSince, + totalAvailableCpu: input.totalAvailableCpu ?? 1_000_000n, + totalAvailableMemory: input.totalAvailableMemory ?? 1_000_000_000n, + totalAvailableGpu: input.totalAvailableGpu ?? 0n, + totalAvailableEph: input.totalAvailableEph ?? 1_000_000_000n, + totalAvailablePersistent: input.totalAvailablePersistent ?? 0n, + maxNodeFreeCpu: input.maxNodeFreeCpu ?? 1_000_000n, + maxNodeFreeMemory: input.maxNodeFreeMemory ?? 1_000_000_000n, + maxNodeFreeGpu: input.maxNodeFreeGpu ?? 0n, + selfAttributes: input.selfAttributes ?? [], + auditedBy: input.auditedBy ?? [], + inventory: input.inventory ?? { nodes: [], storage: {} } + }); + } +}); + +function unit(input: { cpu?: bigint; memory?: bigint; gpu?: bigint; count?: number }): RequestedResourceUnit { + return { + id: 1, + count: input.count ?? 1, + resources: { + cpu: { units: input.cpu ?? 0n, attributes: [] }, + memory: { quantity: input.memory ?? 0n, attributes: [] }, + gpu: { units: input.gpu ?? 0n, attributes: [] }, + storage: [] + } + }; +} + +function requirements(input?: Partial): GroupSpecJSON["requirements"] { + return { + signedBy: input?.signedBy ?? { allOf: [], anyOf: [] }, + attributes: input?.attributes ?? [] + }; +} + +function owners(rows: { owner: string }[]): string[] { + return rows.map(r => r.owner).sort(); +} diff --git a/apps/provider-inventory/src/repositories/bid-screening/bid-screening.repository.ts b/apps/provider-inventory/src/repositories/bid-screening/bid-screening.repository.ts new file mode 100644 index 000000000..62ec161f7 --- /dev/null +++ b/apps/provider-inventory/src/repositories/bid-screening/bid-screening.repository.ts @@ -0,0 +1,89 @@ +import { and, arrayContains, arrayOverlaps, eq, gte, isNotNull, type SQL, sql } from "drizzle-orm"; +import type { PostgresJsDatabase } from "drizzle-orm/postgres-js"; +import { inject, singleton } from "tsyringe"; + +import type { GroupSpecJSON } from "@src/lib/groupspec-mapper/groupspec-mapper"; +import { hydrateClusterState } from "@src/lib/hydrate-cluster-state/hydrate-cluster-state"; +import { providerInventory } from "@src/model-schemas/provider-inventory/provider-inventory.schema"; +import { DRIZZLE_DB } from "@src/providers/drizzle.provider"; +import type { RequestedResourceUnit } from "@src/types/inventory.types"; +import type { ProviderWithClusterState } from "@src/types/provider"; +import { aggregateCriteria, type BidScreeningCriteria } from "./bid-screening.aggregator"; + +// TODO(Issue 5): move auditor allowlist into configuration and accept it as a request input. +export const AUDITOR = "akash1365yvmc4s7awdyj3n2sav7xfx76adc6dnmlx63"; + +export type BidScreeningCandidate = ProviderWithClusterState & { isAudited: boolean }; + +@singleton() +export class BidScreeningRepository { + readonly #db: PostgresJsDatabase; + + constructor(@inject(DRIZZLE_DB) db: PostgresJsDatabase) { + this.#db = db; + } + + async findCandidates(resourceUnits: RequestedResourceUnit[], requirements: GroupSpecJSON["requirements"]): Promise { + const criteria = aggregateCriteria(resourceUnits, requirements); + const where = and(...this.#buildWhere(criteria)); + + const rows = await this.#db + .select({ + owner: providerInventory.owner, + hostUri: providerInventory.hostUri, + ipRegion: providerInventory.ipRegion, + uptime7d: providerInventory.uptime7d, + inventory: providerInventory.inventory, + isAudited: sql`${providerInventory.auditedBy} @> ARRAY[${AUDITOR}]::text[]`.as("isAudited") + }) + .from(providerInventory) + .where(where); + + return rows.map(row => ({ + owner: row.owner, + hostUri: row.hostUri, + ipRegion: row.ipRegion, + uptime7d: row.uptime7d, + cluster: hydrateClusterState(row.inventory), + isAudited: row.isAudited + })); + } + + #buildWhere(criteria: BidScreeningCriteria): SQL[] { + /** + * Ensure that total* are enough to cover resource request (may have false positives), + * and that the largest service can be placed on at least one node (prevents false positives due to fragmentation). + */ + const conditions: SQL[] = [ + eq(providerInventory.isOnline, true), + isNotNull(providerInventory.isOnlineSince), + gte(providerInventory.totalAvailableCpu, criteria.totalCpu), + gte(providerInventory.totalAvailableMemory, criteria.totalMemory), + gte(providerInventory.totalAvailableGpu, criteria.totalGpu), + gte(providerInventory.totalAvailableEph, criteria.totalEphemeralStorage), + gte(providerInventory.totalAvailablePersistent, criteria.totalPersistentStorage), + gte(providerInventory.maxNodeFreeCpu, criteria.maxPerReplicaCpu), + gte(providerInventory.maxNodeFreeMemory, criteria.maxPerReplicaMemory), + gte(providerInventory.maxNodeFreeGpu, criteria.maxPerReplicaGpu) + ]; + + if (criteria.attributes.length > 0) { + conditions.push(sql`${providerInventory.selfAttributes} @> ${sql.param(criteria.attributes)}::jsonb`); + } + + for (const glob of criteria.globAttributes) { + conditions.push( + sql`EXISTS (SELECT 1 FROM jsonb_array_elements(${providerInventory.selfAttributes}) AS sa WHERE sa->>'key' ~* ${glob.keyPattern} AND sa->>'value' = ${glob.value})` + ); + } + + if (criteria.signedBy.allOf.length > 0) { + conditions.push(arrayContains(providerInventory.auditedBy, criteria.signedBy.allOf)); + } + if (criteria.signedBy.anyOf.length > 0) { + conditions.push(arrayOverlaps(providerInventory.auditedBy, criteria.signedBy.anyOf)); + } + + return conditions; + } +} diff --git a/apps/provider-inventory/src/repositories/provider-inventory/provider-inventory.repository.integration.ts b/apps/provider-inventory/src/repositories/provider-inventory/provider-inventory.repository.integration.ts deleted file mode 100644 index d6786bc98..000000000 --- a/apps/provider-inventory/src/repositories/provider-inventory/provider-inventory.repository.integration.ts +++ /dev/null @@ -1,50 +0,0 @@ -import "@src/providers"; - -import { container } from "tsyringe"; -import { describe, expect, it } from "vitest"; - -import { ProviderInventoryRepository } from "@src/repositories/provider-inventory/provider-inventory.repository"; -import type { ChainProvider } from "@src/types/chain-provider"; - -describe(ProviderInventoryRepository.name, () => { - it("upserts attributes and resolves audited providers by overlapping auditors", async () => { - const { repository } = setup(); - - await repository.upsertAttributes( - createProvider({ - owner: "akash1audited", - hostUri: "https://audited:8443", - signedAttributes: [ - { key: "region", value: "us-east", auditor: "akash1auditor-a" }, - { key: "tier", value: "gold", auditor: "akash1auditor-b" } - ] - }) - ); - await repository.upsertAttributes( - createProvider({ - owner: "akash1unaudited", - hostUri: "https://unaudited:8443" - }) - ); - - const audited = await repository.getAuditedProviderAddresses(["akash1auditor-a"]); - const noneMatch = await repository.getAuditedProviderAddresses(["akash1auditor-unknown"]); - const emptyQuery = await repository.getAuditedProviderAddresses([]); - - expect(audited).toEqual(new Set(["akash1audited"])); - expect(noneMatch).toEqual(new Set()); - expect(emptyQuery).toEqual(new Set()); - }); - - function setup() { - return { repository: container.resolve(ProviderInventoryRepository) }; - } -}); - -function createProvider(overrides: Partial & Pick): ChainProvider { - return { - selfAttributes: [], - signedAttributes: [], - ...overrides - }; -} diff --git a/apps/provider-inventory/src/repositories/provider-inventory/provider-inventory.repository.spec.ts b/apps/provider-inventory/src/repositories/provider-inventory/provider-inventory.repository.spec.ts index b4712eec4..dee0af137 100644 --- a/apps/provider-inventory/src/repositories/provider-inventory/provider-inventory.repository.spec.ts +++ b/apps/provider-inventory/src/repositories/provider-inventory/provider-inventory.repository.spec.ts @@ -69,129 +69,6 @@ describe(ProviderInventoryRepository.name, () => { }); }); - describe("getOnlineProviders", () => { - it("hydrates persisted ClusterState back into ResourcePair instances", async () => { - const { writer } = setup({ - selectRows: [ - { - owner: "akash1abc", - hostUri: "https://h:8443", - ipRegion: "us-east", - uptime7d: 0.998, - inventory: { - nodes: [ - { - name: "node1", - cpu: { allocatable: 8000, allocated: 2000 }, - memory: { allocatable: 17179869184, allocated: 4294967296 }, - ephemeralStorage: { allocatable: 107374182400, allocated: 0 }, - gpu: { quantity: { allocatable: 0, allocated: 0 }, info: [] }, - storageClasses: ["beta2"], - cpus: [] - } - ], - storage: {} - } - } - ] - }); - - const result = await writer.getOnlineProviders(); - - expect(result).toHaveLength(1); - expect(result[0]).toMatchObject({ owner: "akash1abc", hostUri: "https://h:8443", ipRegion: "us-east", uptime7d: 0.998 }); - const node = result[0].cluster.nodes[0]; - expect(node.name).toBe("node1"); - expect(node.cpu.allocatable).toBe(8000n); - expect(node.cpu.allocated).toBe(2000n); - expect(node.memory.allocatable).toBe(17179869184n); - expect(node.memory.allocated).toBe(4294967296n); - expect(node.ephemeralStorage.allocatable).toBe(107374182400n); - expect(node.ephemeralStorage.allocated).toBe(0n); - expect(node.storageClasses).toEqual(["beta2"]); - }); - - it("hydrates cluster storage pools as ResourcePair entries keyed by class", async () => { - const { writer } = setup({ - selectRows: [ - { - owner: "akash1abc", - hostUri: "https://h:8443", - ipRegion: null, - uptime7d: null, - inventory: { - nodes: [], - storage: { beta2: { class: "beta2", quantity: { allocatable: 536870912000, allocated: 0 } } } - } - } - ] - }); - - const result = await writer.getOnlineProviders(); - - const beta2 = result[0].cluster.storage["beta2"]; - expect(beta2.class).toBe("beta2"); - expect(beta2.quantity.allocatable).toBe(536870912000n); - expect(beta2.quantity.allocated).toBe(0n); - }); - - it("preserves bigint magnitudes that exceed Number.MAX_SAFE_INTEGER", async () => { - const unsafe = 9007199254740993n; - const { writer } = setup({ - selectRows: [ - { - owner: "akash1abc", - hostUri: "https://h:8443", - ipRegion: null, - uptime7d: null, - inventory: { - nodes: [ - { - name: "node1", - cpu: { allocatable: unsafe, allocated: 0 }, - memory: { allocatable: 0, allocated: 0 }, - ephemeralStorage: { allocatable: 0, allocated: 0 }, - gpu: { quantity: { allocatable: 0, allocated: 0 }, info: [] }, - storageClasses: [], - cpus: [] - } - ], - storage: {} - } - } - ] - }); - - const result = await writer.getOnlineProviders(); - - expect(result[0].cluster.nodes[0].cpu.allocatable).toBe(unsafe); - }); - - it("returns empty array when no rows match", async () => { - const { writer } = setup({ selectRows: [] }); - expect(await writer.getOnlineProviders()).toEqual([]); - }); - }); - - describe("getAuditedProviderAddresses", () => { - it("returns empty set when no auditors are given without hitting the DB", async () => { - const { writer, db } = setup(); - - const result = await writer.getAuditedProviderAddresses([]); - - expect(result).toEqual(new Set()); - expect(db.select).not.toHaveBeenCalled(); - }); - - it("returns the set of owners whose auditedBy overlaps the requested auditors", async () => { - const { writer } = setup({ selectRows: [{ owner: "akash1abc" }, { owner: "akash1def" }] }); - - const result = await writer.getAuditedProviderAddresses(["auditor-a"]); - - expect(result).toEqual(new Set(["akash1abc", "akash1def"])); - }); - }); - function setup(input?: { selectRows?: unknown[] }) { const deleteWhere = vi.fn().mockResolvedValue(undefined); const insertOnConflict = vi.fn().mockResolvedValue(undefined); diff --git a/apps/provider-inventory/src/repositories/provider-inventory/provider-inventory.repository.ts b/apps/provider-inventory/src/repositories/provider-inventory/provider-inventory.repository.ts index 10244e2f9..87e06bcc2 100644 --- a/apps/provider-inventory/src/repositories/provider-inventory/provider-inventory.repository.ts +++ b/apps/provider-inventory/src/repositories/provider-inventory/provider-inventory.repository.ts @@ -1,17 +1,14 @@ import type { LoggerService } from "@akashnetwork/logging"; -import { and, arrayOverlaps, eq, sql as rawSql } from "drizzle-orm"; +import { eq, sql as rawSql } from "drizzle-orm"; import type { PostgresJsDatabase } from "drizzle-orm/postgres-js"; import { inject, singleton } from "tsyringe"; -import { ResourcePair } from "@src/lib/resource-pair/resource-pair"; import { providerInventory } from "@src/model-schemas/provider-inventory/provider-inventory.schema"; import { DRIZZLE_DB } from "@src/providers/drizzle.provider"; import type { LoggerFactory } from "@src/providers/logger-factory.provider"; import { LOGGER_FACTORY } from "@src/providers/logger-factory.provider"; import type { ChainProvider } from "@src/types/chain-provider"; import type { ProjectedRow } from "@src/types/inventory"; -import type { ClusterState, NodeState } from "@src/types/inventory.types"; -import type { ProviderWithClusterState } from "@src/types/provider"; @singleton() export class ProviderInventoryRepository { @@ -82,79 +79,4 @@ export class ProviderInventoryRepository { this.#logger.debug({ event: "PROVIDER_ATTRIBUTES_UPSERTED", owner: provider.owner }); } - - async getOnlineProviders(): Promise { - const rows = await this.#db - .select({ - owner: providerInventory.owner, - hostUri: providerInventory.hostUri, - ipRegion: providerInventory.ipRegion, - uptime7d: providerInventory.uptime7d, - inventory: providerInventory.inventory - }) - .from(providerInventory) - .where(and(eq(providerInventory.isOnline, true))); - - return rows.map(row => ({ - owner: row.owner, - hostUri: row.hostUri, - ipRegion: row.ipRegion, - uptime7d: row.uptime7d, - cluster: hydrateClusterState(row.inventory) - })); - } - - async getAuditedProviderAddresses(auditorAddresses: string[]): Promise> { - if (auditorAddresses.length === 0) { - return new Set(); - } - - const rows = await this.#db - .select({ owner: providerInventory.owner }) - .from(providerInventory) - .where(arrayOverlaps(providerInventory.auditedBy, auditorAddresses)); - - return new Set(rows.map(r => r.owner)); - } -} - -type RawPair = { allocatable: number | bigint; allocated: number | bigint }; -type RawNode = Omit & { - cpu: RawPair; - memory: RawPair; - ephemeralStorage: RawPair; - gpu: { quantity: RawPair; info: NodeState["gpu"]["info"] }; -}; -type RawCluster = { - nodes?: RawNode[]; - storage?: Record; -}; - -export function hydrateClusterState(raw: unknown): ClusterState { - const cluster = (raw ?? {}) as RawCluster; - const nodes = (cluster.nodes ?? []).map(hydrateNode); - const storage: ClusterState["storage"] = Object.create(null); - for (const [key, pool] of Object.entries(cluster.storage ?? {})) { - storage[key] = { class: pool.class, quantity: hydratePair(pool.quantity) }; - } - return { nodes, storage }; -} - -function hydrateNode(node: RawNode): NodeState { - return { - name: node.name, - cpu: hydratePair(node.cpu), - memory: hydratePair(node.memory), - ephemeralStorage: hydratePair(node.ephemeralStorage), - gpu: { quantity: hydratePair(node.gpu.quantity), info: node.gpu.info ?? [] }, - storageClasses: node.storageClasses ?? [], - cpus: node.cpus ?? [] - }; -} - -function hydratePair(pair: RawPair): ResourcePair { - const allocatable = typeof pair.allocatable === "bigint" ? pair.allocatable : BigInt(pair.allocatable || 0); - const allocated = typeof pair.allocated === "bigint" ? pair.allocated : BigInt(pair.allocated || 0); - - return new ResourcePair(allocatable, allocated); } diff --git a/apps/provider-inventory/src/services/bid-screening/bid-screening.service.spec.ts b/apps/provider-inventory/src/services/bid-screening/bid-screening.service.spec.ts index 353cc7315..f8c7fc15f 100644 --- a/apps/provider-inventory/src/services/bid-screening/bid-screening.service.spec.ts +++ b/apps/provider-inventory/src/services/bid-screening/bid-screening.service.spec.ts @@ -4,17 +4,15 @@ import { mock } from "vitest-mock-extended"; import type { GroupSpecJSON } from "@src/lib/groupspec-mapper/groupspec-mapper"; import { ResourcePair } from "@src/lib/resource-pair/resource-pair"; -import type { ProviderInventoryRepository } from "@src/repositories/provider-inventory/provider-inventory.repository"; -import type { ProviderWithClusterState } from "../../types/provider"; +import type { BidScreeningCandidate, BidScreeningRepository } from "@src/repositories/bid-screening/bid-screening.repository"; import type { ClusterInventoryMatcherService } from "../cluster-inventory-matcher/cluster-inventory-matcher.service"; import { BidScreeningService } from "./bid-screening.service"; describe(BidScreeningService.name, () => { describe("findMatchingProviders", () => { - it("returns passing providers with metadata", async () => { + it("returns passing candidates with metadata", async () => { const { service, repository, matcher } = setup(); - repository.getOnlineProviders.mockResolvedValue([makeProvider("akash1abc")]); - repository.getAuditedProviderAddresses.mockResolvedValue(new Set()); + repository.findCandidates.mockResolvedValue([makeCandidate("akash1abc")]); matcher.match.mockReturnValue({ matched: true }); const results = await service.findMatchingProviders(makeRequest()); @@ -30,10 +28,9 @@ describe(BidScreeningService.name, () => { ]); }); - it("filters out providers that fail matching", async () => { + it("filters out candidates that fail matching", async () => { const { service, repository, matcher } = setup(); - repository.getOnlineProviders.mockResolvedValue([makeProvider("akash1abc"), makeProvider("akash1def")]); - repository.getAuditedProviderAddresses.mockResolvedValue(new Set()); + repository.findCandidates.mockResolvedValue([makeCandidate("akash1abc"), makeCandidate("akash1def")]); matcher.match.mockReturnValueOnce({ matched: true }).mockReturnValueOnce({ matched: false, error: "INSUFFICIENT_CAPACITY" }); const results = await service.findMatchingProviders(makeRequest()); @@ -42,69 +39,62 @@ describe(BidScreeningService.name, () => { expect(results[0].owner).toBe("akash1abc"); }); - it("passes the provider's ClusterState directly to the matcher", async () => { + it("passes the candidate's ClusterState directly to the matcher", async () => { const { service, repository, matcher } = setup(); - const provider = makeProvider("akash1abc"); - repository.getOnlineProviders.mockResolvedValue([provider]); - repository.getAuditedProviderAddresses.mockResolvedValue(new Set()); + const candidate = makeCandidate("akash1abc"); + repository.findCandidates.mockResolvedValue([candidate]); matcher.match.mockReturnValue({ matched: true }); await service.findMatchingProviders(makeRequest()); - expect(matcher.match).toHaveBeenCalledWith(provider.cluster, expect.any(Array)); + expect(matcher.match).toHaveBeenCalledWith(candidate.cluster, expect.any(Array)); }); - it("returns empty array when no providers are online", async () => { + it("returns empty array when no candidates are returned", async () => { const { service, repository } = setup(); - repository.getOnlineProviders.mockResolvedValue([]); - repository.getAuditedProviderAddresses.mockResolvedValue(new Set()); + repository.findCandidates.mockResolvedValue([]); const results = await service.findMatchingProviders(makeRequest()); expect(results).toEqual([]); }); - it("returns empty array when all providers fail matching", async () => { + it("returns empty array when all candidates fail matching", async () => { const { service, repository, matcher } = setup(); - repository.getOnlineProviders.mockResolvedValue([makeProvider("akash1abc")]); - repository.getAuditedProviderAddresses.mockResolvedValue(new Set()); + repository.findCandidates.mockResolvedValue([makeCandidate("akash1abc")]); matcher.match.mockReturnValue({ matched: false, error: "INSUFFICIENT_CAPACITY" }); const results = await service.findMatchingProviders(makeRequest()); expect(results).toEqual([]); }); - }); - describe("filtering", () => { - it("loads providers and audited owners in parallel after pre-filter", async () => { + it("calls findCandidates exactly once with parsed resource units and requirements", async () => { const { service, repository, matcher } = setup(); - repository.getOnlineProviders.mockResolvedValue([makeProvider("akash1abc")]); - repository.getAuditedProviderAddresses.mockResolvedValue(new Set()); + repository.findCandidates.mockResolvedValue([]); matcher.match.mockReturnValue({ matched: true }); + const request = makeRequest({ signedBy: { allOf: ["aud-a"], anyOf: [] } }); - await service.findMatchingProviders(makeRequest()); + await service.findMatchingProviders(request); - expect(repository.getOnlineProviders).toHaveBeenCalledTimes(1); - expect(repository.getAuditedProviderAddresses).toHaveBeenCalledTimes(1); + expect(repository.findCandidates).toHaveBeenCalledTimes(1); + expect(repository.findCandidates).toHaveBeenCalledWith(expect.any(Array), request.requirements); }); - it("enriches isAudited=true for providers with matching auditor signatures", async () => { + it("threads candidate.isAudited through to the result", async () => { const { service, repository, matcher } = setup(); - repository.getOnlineProviders.mockResolvedValue([makeProvider("akash1abc"), makeProvider("akash1def")]); - repository.getAuditedProviderAddresses.mockResolvedValue(new Set(["akash1abc"])); + repository.findCandidates.mockResolvedValue([makeCandidate("akash1abc", { isAudited: true }), makeCandidate("akash1def", { isAudited: false })]); matcher.match.mockReturnValue({ matched: true }); const results = await service.findMatchingProviders(makeRequest()); - expect(results).toHaveLength(2); expect(results.find(r => r.owner === "akash1abc")?.isAudited).toBe(true); expect(results.find(r => r.owner === "akash1def")?.isAudited).toBe(false); }); }); function setup() { - const repository = mock(); + const repository = mock(); const matcher = mock(); const logger = mock(); const service = new BidScreeningService(repository, matcher, logger); @@ -112,12 +102,13 @@ describe(BidScreeningService.name, () => { } }); -function makeProvider(owner: string, hostUri = "https://provider.example.com:8443"): ProviderWithClusterState { +function makeCandidate(owner: string, overrides?: { isAudited?: boolean }): BidScreeningCandidate { return { owner, - hostUri, + hostUri: "https://provider.example.com:8443", ipRegion: "us-east", uptime7d: 0.998, + isAudited: overrides?.isAudited ?? false, cluster: { nodes: [ { diff --git a/apps/provider-inventory/src/services/bid-screening/bid-screening.service.ts b/apps/provider-inventory/src/services/bid-screening/bid-screening.service.ts index e6a0fad09..4248c5a7e 100644 --- a/apps/provider-inventory/src/services/bid-screening/bid-screening.service.ts +++ b/apps/provider-inventory/src/services/bid-screening/bid-screening.service.ts @@ -1,27 +1,19 @@ -import { BadRequest } from "http-errors"; -import { inject, singleton } from "tsyringe"; +import { singleton } from "tsyringe"; import { LoggerService } from "@src/providers/logging.provider"; -import { ProviderInventoryRepository } from "@src/repositories/provider-inventory/provider-inventory.repository"; +import { type BidScreeningCandidate, BidScreeningRepository } from "@src/repositories/bid-screening/bid-screening.repository"; import type { GroupSpecJSON } from "../../lib/groupspec-mapper/groupspec-mapper"; import { mapGroupSpecToResourceUnits } from "../../lib/groupspec-mapper/groupspec-mapper"; import type { BidScreeningResult } from "../../types/inventory.types"; -import type { ProviderWithClusterState } from "../../types/provider"; import { ClusterInventoryMatcherService } from "../cluster-inventory-matcher/cluster-inventory-matcher.service"; -const AUDITOR = "akash1365yvmc4s7awdyj3n2sav7xfx76adc6dnmlx63"; - @singleton() export class BidScreeningService { - readonly #repository: ProviderInventoryRepository; + readonly #repository: BidScreeningRepository; readonly #matcher: ClusterInventoryMatcherService; readonly #logger: LoggerService; - constructor( - @inject(ProviderInventoryRepository) repository: ProviderInventoryRepository, - @inject(ClusterInventoryMatcherService) matcher: ClusterInventoryMatcherService, - @inject(LoggerService) logger: LoggerService - ) { + constructor(repository: BidScreeningRepository, matcher: ClusterInventoryMatcherService, logger: LoggerService) { this.#repository = repository; this.#matcher = matcher; this.#logger = logger; @@ -29,28 +21,26 @@ export class BidScreeningService { async findMatchingProviders(request: GroupSpecJSON): Promise { const startTime = Date.now(); - - this.#validateRequest(request); - const resourceUnits = mapGroupSpecToResourceUnits(request); this.#logger.info({ event: "BID_SCREENING_START", resourceGroupCount: resourceUnits.length }); - const [providers, auditedOwners] = await Promise.all([this.#repository.getOnlineProviders(), this.#repository.getAuditedProviderAddresses([AUDITOR])]); + const candidates = await this.#repository.findCandidates(resourceUnits, request.requirements); + this.#logger.debug({ event: "BID_SCREENING_CANDIDATES_FETCHED", count: candidates.length }); const results: BidScreeningResult[] = []; - for (const provider of providers) { - const matchResult = this.#matcher.match(provider.cluster, resourceUnits); + for (const candidate of candidates) { + const matchResult = this.#matcher.match(candidate.cluster, resourceUnits); if (matchResult.matched) { - results.push(this.#toResult(provider, auditedOwners.has(provider.owner))); + results.push(this.#toResult(candidate)); } } this.#logger.info({ event: "BID_SCREENING_COMPLETE", - providerCount: providers.length, + providerCount: candidates.length, matchedCount: results.length, durationMs: Date.now() - startTime }); @@ -58,34 +48,13 @@ export class BidScreeningService { return results; } - #validateRequest(request: GroupSpecJSON): void { - if (request.resources.length === 0) { - throw new BadRequest("GroupSpec must contain at least one resource unit"); - } - - for (let i = 0; i < request.resources.length; i++) { - const resource = request.resources[i].resource; - - for (let j = 0; j < resource.storage.length; j++) { - const vol = resource.storage[j]; - const isPersistent = vol.attributes.some(a => a.key === "persistent" && a.value === "true"); - const storageClass = vol.attributes.find(a => a.key === "class")?.value; - const isRam = storageClass === "ram"; - - if (isPersistent && (!storageClass || isRam)) { - throw new BadRequest(`Persistent storage volume "${vol.name}" must specify a valid storage class (not "${storageClass || "empty"}")`); - } - } - } - } - - #toResult(provider: ProviderWithClusterState, isAudited: boolean): BidScreeningResult { + #toResult(candidate: BidScreeningCandidate): BidScreeningResult { return { - owner: provider.owner, - hostUri: provider.hostUri, - region: provider.ipRegion ?? null, - uptime7d: provider.uptime7d ?? null, - isAudited + owner: candidate.owner, + hostUri: candidate.hostUri, + region: candidate.ipRegion ?? null, + uptime7d: candidate.uptime7d ?? null, + isAudited: candidate.isAudited }; } }