diff --git a/apps/provider-inventory/src/bootstrap-entry.ts b/apps/provider-inventory/src/bootstrap-entry.ts new file mode 100644 index 000000000..c90c72523 --- /dev/null +++ b/apps/provider-inventory/src/bootstrap-entry.ts @@ -0,0 +1,9 @@ +// !!! WARNING: we need this file to ensure that providers and model schemas are loaded before the entry point. +// To preserve the order of app related side-effects, use dynamic imports. But ideally, just get rid of them. +import "reflect-metadata"; +import "@akashnetwork/env-loader"; + +export async function bootstrapEntry(loadEntry: () => Promise): Promise { + await import("./providers/index.ts"); + return await loadEntry(); +} diff --git a/apps/provider-inventory/src/providers-sync-app.ts b/apps/provider-inventory/src/providers-sync-app.ts new file mode 100644 index 000000000..2a58af06f --- /dev/null +++ b/apps/provider-inventory/src/providers-sync-app.ts @@ -0,0 +1,37 @@ +import { HttpLoggerInterceptor } from "@akashnetwork/logging/hono"; +import { createOtelLogger } from "@akashnetwork/logging/otel"; +import { otel } from "@hono/otel"; +import { Hono } from "hono"; +import { container } from "tsyringe"; + +import { APP_CONFIG } from "@src/providers/app-config.provider"; +import { ProviderInventoryRepository } from "@src/repositories/provider-inventory/provider-inventory.repository"; +import { healthzRouter } from "@src/routes"; +import { DiscoverySchedulerService } from "@src/services/discovery-scheduler/discovery-scheduler.service"; +import { HonoErrorHandlerService } from "@src/services/hono-error-handler/hono-error-handler.service"; +import { startServer } from "@src/services/start-server/start-server"; +import type { AppEnv } from "@src/types/app-context"; + +export async function bootstrap(): Promise { + const app = new Hono(); + app.use("*", otel({ captureRequestHeaders: ["baggage"] })); + app.use(container.resolve(HttpLoggerInterceptor).intercept()); + app.route("/", healthzRouter); + app.onError(container.resolve(HonoErrorHandlerService).handle); + + const server = await startServer(app, createOtelLogger({ context: "PROVIDERS_SYNC" }), process, { + port: container.resolve(APP_CONFIG).PORT, + beforeStart: async () => { + await container.resolve(ProviderInventoryRepository).resetOnlineSince(); + } + }); + + if (server) { + try { + container.resolve(DiscoverySchedulerService).start(); + } catch (error) { + server.close(); + throw error; + } + } +} diff --git a/apps/provider-inventory/src/providers/drizzle.provider.ts b/apps/provider-inventory/src/providers/drizzle.provider.ts index 752511ad8..10f2d4fce 100644 --- a/apps/provider-inventory/src/providers/drizzle.provider.ts +++ b/apps/provider-inventory/src/providers/drizzle.provider.ts @@ -4,9 +4,14 @@ import type { InjectionToken } from "tsyringe"; import { container, instancePerContainerCachingFactory } from "tsyringe"; import { PG_CLIENT } from "@src/providers/postgres.provider"; +import * as modelsSchema from "../model-schemas"; export const DRIZZLE_DB: InjectionToken = Symbol("DRIZZLE_DB"); container.register(DRIZZLE_DB, { - useFactory: instancePerContainerCachingFactory(c => drizzle(c.resolve(PG_CLIENT))) + useFactory: instancePerContainerCachingFactory(c => + drizzle(c.resolve(PG_CLIENT), { + schema: modelsSchema + }) + ) }); diff --git a/apps/provider-inventory/src/providers/index.ts b/apps/provider-inventory/src/providers/index.ts index 091a81d87..09939ea9b 100644 --- a/apps/provider-inventory/src/providers/index.ts +++ b/apps/provider-inventory/src/providers/index.ts @@ -4,4 +4,3 @@ export * from "./logging.provider"; export * from "./postgres.provider"; export * from "./drizzle.provider"; export * from "./logger-factory.provider"; -export * from "./stream-boostrap.provider"; diff --git a/apps/provider-inventory/src/providers/stream-boostrap.provider.ts b/apps/provider-inventory/src/providers/stream-boostrap.provider.ts deleted file mode 100644 index 1e51258da..000000000 --- a/apps/provider-inventory/src/providers/stream-boostrap.provider.ts +++ /dev/null @@ -1,16 +0,0 @@ -import { container, instancePerContainerCachingFactory } from "tsyringe"; - -import { ProviderInventoryRepository } from "@src/repositories/provider-inventory/provider-inventory.repository"; -import { DiscoverySchedulerService } from "@src/services/discovery-scheduler/discovery-scheduler.service"; -import { APP_INITIALIZER, ON_APP_START } from "@src/services/start-server/app-initializer"; - -container.register(APP_INITIALIZER, { - useFactory: instancePerContainerCachingFactory(c => { - return { - async [ON_APP_START]() { - await c.resolve(ProviderInventoryRepository).resetOnlineSince(); - c.resolve(DiscoverySchedulerService).start(); - } - }; - }) -}); diff --git a/apps/provider-inventory/src/index.ts b/apps/provider-inventory/src/rest-app.ts similarity index 82% rename from apps/provider-inventory/src/index.ts rename to apps/provider-inventory/src/rest-app.ts index 4d96cb05d..5199e95c7 100644 --- a/apps/provider-inventory/src/index.ts +++ b/apps/provider-inventory/src/rest-app.ts @@ -1,6 +1,3 @@ -import "./providers"; -import "./model-schemas"; - import { HttpLoggerInterceptor } from "@akashnetwork/logging/hono"; import { createOtelLogger } from "@akashnetwork/logging/otel"; import { otel } from "@hono/otel"; @@ -13,7 +10,7 @@ import { HonoErrorHandlerService } from "@src/services/hono-error-handler/hono-e import { startServer } from "@src/services/start-server/start-server"; import type { AppEnv } from "@src/types/app-context"; -export function createApp(): Hono { +export async function bootstrap(): Promise { const app = new Hono(); app.use("*", otel({ captureRequestHeaders: ["baggage"] })); app.use(container.resolve(HttpLoggerInterceptor).intercept()); @@ -21,13 +18,7 @@ export function createApp(): Hono { app.route("/", bidScreeningRouter); app.onError(container.resolve(HonoErrorHandlerService).handle); - return app; -} - -export async function bootstrap(): Promise { - const app = createApp(); - - await startServer(app, createOtelLogger({ context: "APP" }), process, { + await startServer(app, createOtelLogger({ context: "REST" }), process, { port: container.resolve(APP_CONFIG).PORT }); } diff --git a/apps/provider-inventory/src/server.ts b/apps/provider-inventory/src/server.ts index 2affa90f1..4ed14218c 100644 --- a/apps/provider-inventory/src/server.ts +++ b/apps/provider-inventory/src/server.ts @@ -1,3 +1,93 @@ -import { bootstrap } from "./index"; +import { parentPort, Worker } from "node:worker_threads"; -void bootstrap(); +import type { RawAppConfig } from "./providers/raw-app-config.provider"; +import { bootstrapEntry } from "./bootstrap-entry"; + +const SUPPORTED_INTERFACES = ["rest", "providers-sync"]; + +bootstrap(process.env as RawAppConfig); + +async function bootstrap(rawAppConfig: RawAppConfig): Promise { + const INTERFACE = rawAppConfig.INTERFACE || "all"; + const port = parseInt(rawAppConfig.PORT?.toString() || "3092", 10) || 3092; + + if (INTERFACE === "all") { + const workers = SUPPORTED_INTERFACES.map((interfaceName, index) => bootstrapInWorker({ PORT: String(port + index), INTERFACE: interfaceName })); + await Promise.all(workers.map(w => w.ready)); + + const forwardSignal = (signal: string) => { + workers.forEach(w => w.ref.postMessage(signal)); + }; + process.on("SIGTERM", () => forwardSignal("SIGTERM")); + process.on("SIGINT", () => forwardSignal("SIGINT")); + + await Promise.all(workers.map(w => w.exited)); + return; + } + + if (parentPort) { + parentPort.on("message", (code: string) => { + if (code === "SIGTERM" || code === "SIGINT") { + parentPort?.unref(); + process.emit(code, code); + } + }); + } + + let appModule: { bootstrap: () => Promise }; + switch (INTERFACE) { + case "rest": + appModule = await bootstrapEntry(() => import("./rest-app.ts")); + break; + case "providers-sync": + appModule = await bootstrapEntry(() => import("./providers-sync-app.ts")); + break; + default: + throw new Error(`Received invalid interface: ${INTERFACE}. Valid values: ${SUPPORTED_INTERFACES.join(", ")}`); + } + + await appModule.bootstrap(); + parentPort?.postMessage("ready"); +} + +function bootstrapInWorker({ PORT, INTERFACE }: RawAppConfig) { + const ref = new Worker(__filename, { + env: { + ...process.env, + PORT: String(PORT), + INTERFACE: String(INTERFACE) + } + }); + + const ready = new Promise((resolve, reject) => { + const onMessage = (m: unknown) => { + if (m === "ready") { + cleanup(); + resolve(); + } + }; + const onError = (err: Error) => { + cleanup(); + reject(err); + }; + const onExit = (code: number) => { + cleanup(); + reject(new Error(`[${INTERFACE}] exited with code ${code} before ready`)); + }; + const cleanup = () => { + ref.off("message", onMessage); + ref.off("error", onError); + ref.off("exit", onExit); + }; + ref.on("message", onMessage); + ref.once("error", onError); + ref.once("exit", onExit); + }); + + const exited = new Promise(resolve => { + ref.once("exit", resolve); + ref.once("error", resolve); + }); + + return { ref, ready, exited }; +} diff --git a/apps/provider-inventory/src/services/start-server/start-server.spec.ts b/apps/provider-inventory/src/services/start-server/start-server.spec.ts index a85deebfe..56853bb3a 100644 --- a/apps/provider-inventory/src/services/start-server/start-server.spec.ts +++ b/apps/provider-inventory/src/services/start-server/start-server.spec.ts @@ -167,6 +167,7 @@ describe("startServer", () => { const logger = mock(); const processEvents = new EventEmitter(); const container = mock({ + isRegistered: vi.fn().mockReturnValue(true), resolveAll: vi.fn().mockReturnValue(input?.initializers ?? []) }); diff --git a/apps/provider-inventory/src/services/start-server/start-server.ts b/apps/provider-inventory/src/services/start-server/start-server.ts index 10b347624..bb823e100 100644 --- a/apps/provider-inventory/src/services/start-server/start-server.ts +++ b/apps/provider-inventory/src/services/start-server/start-server.ts @@ -44,7 +44,9 @@ export async function startServer( }); try { await options.beforeStart?.(); - await Promise.all(container.resolveAll(APP_INITIALIZER).map(initializer => initializer[ON_APP_START]())); + if (container.isRegistered(APP_INITIALIZER, true)) { + await Promise.all(container.resolveAll(APP_INITIALIZER).map(initializer => initializer[ON_APP_START]())); + } logger.info({ event: "SERVER_STARTING", url: `http://localhost:${options.port}`, NODE_OPTIONS: process.env.NODE_OPTIONS }); server = serve({ diff --git a/apps/provider-inventory/tsup.config.ts b/apps/provider-inventory/tsup.config.ts index 555305fbd..92ac87337 100644 --- a/apps/provider-inventory/tsup.config.ts +++ b/apps/provider-inventory/tsup.config.ts @@ -13,13 +13,15 @@ export default defineConfig(async overrideOptions => prependEffectsToEntries: ["reflect-metadata", "@akashnetwork/env-loader"], entry: { server: "./src/server.ts", + "rest-app": "./src/rest-app.ts", + "providers-sync-app": "./src/providers-sync-app.ts", instrumentation: fileURLToPath(import.meta.resolve("@akashnetwork/instrumentation/register")) }, target: tsconfig.compilerOptions.target, tsconfig: "tsconfig.build.json", external: ["pino-pretty"], dts: false, - onSuccess: overrideOptions.watch && !isProduction ? "npm run prod" : undefined, + onSuccess: overrideOptions.watch && !isProduction ? "NODE_OPTIONS='--allow-worker' npm run prod" : undefined, ...overrideOptions }) );