Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions apps/provider-inventory/src/bootstrap-entry.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
// !!! WARNING: we need this file to ensure that providers and model schemas are loaded before the entry point.
// To preserve the order of app related side-effects, use dynamic imports. But ideally, just get rid of them.
import "reflect-metadata";
import "@akashnetwork/env-loader";

export async function bootstrapEntry<T>(loadEntry: () => Promise<T>): Promise<T> {
await import("./providers/index.ts");
Comment thread
stalniy marked this conversation as resolved.
return await loadEntry();
}
37 changes: 37 additions & 0 deletions apps/provider-inventory/src/providers-sync-app.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import { HttpLoggerInterceptor } from "@akashnetwork/logging/hono";
import { createOtelLogger } from "@akashnetwork/logging/otel";
import { otel } from "@hono/otel";
import { Hono } from "hono";
import { container } from "tsyringe";

import { APP_CONFIG } from "@src/providers/app-config.provider";
import { ProviderInventoryRepository } from "@src/repositories/provider-inventory/provider-inventory.repository";
import { healthzRouter } from "@src/routes";
import { DiscoverySchedulerService } from "@src/services/discovery-scheduler/discovery-scheduler.service";
import { HonoErrorHandlerService } from "@src/services/hono-error-handler/hono-error-handler.service";
import { startServer } from "@src/services/start-server/start-server";
import type { AppEnv } from "@src/types/app-context";

export async function bootstrap(): Promise<void> {
const app = new Hono<AppEnv>();
app.use("*", otel({ captureRequestHeaders: ["baggage"] }));
app.use(container.resolve(HttpLoggerInterceptor).intercept());
app.route("/", healthzRouter);
app.onError(container.resolve(HonoErrorHandlerService).handle);

const server = await startServer(app, createOtelLogger({ context: "PROVIDERS_SYNC" }), process, {
port: container.resolve(APP_CONFIG).PORT,
beforeStart: async () => {
await container.resolve(ProviderInventoryRepository).resetOnlineSince();
}
});

if (server) {
try {
container.resolve(DiscoverySchedulerService).start();
} catch (error) {
server.close();
throw error;
}
}
Comment thread
stalniy marked this conversation as resolved.
}
7 changes: 6 additions & 1 deletion apps/provider-inventory/src/providers/drizzle.provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,14 @@ import type { InjectionToken } from "tsyringe";
import { container, instancePerContainerCachingFactory } from "tsyringe";

import { PG_CLIENT } from "@src/providers/postgres.provider";
import * as modelsSchema from "../model-schemas";
Comment thread
stalniy marked this conversation as resolved.

export const DRIZZLE_DB: InjectionToken<PostgresJsDatabase> = Symbol("DRIZZLE_DB");

container.register(DRIZZLE_DB, {
useFactory: instancePerContainerCachingFactory(c => drizzle(c.resolve(PG_CLIENT)))
useFactory: instancePerContainerCachingFactory(c =>
drizzle(c.resolve(PG_CLIENT), {
schema: modelsSchema
})
)
});
1 change: 0 additions & 1 deletion apps/provider-inventory/src/providers/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,3 @@ export * from "./logging.provider";
export * from "./postgres.provider";
export * from "./drizzle.provider";
export * from "./logger-factory.provider";
export * from "./stream-boostrap.provider";

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
import "./providers";
import "./model-schemas";

import { HttpLoggerInterceptor } from "@akashnetwork/logging/hono";
import { createOtelLogger } from "@akashnetwork/logging/otel";
import { otel } from "@hono/otel";
Expand All @@ -13,21 +10,15 @@ import { HonoErrorHandlerService } from "@src/services/hono-error-handler/hono-e
import { startServer } from "@src/services/start-server/start-server";
import type { AppEnv } from "@src/types/app-context";

export function createApp(): Hono<AppEnv> {
export async function bootstrap(): Promise<void> {
const app = new Hono<AppEnv>();
app.use("*", otel({ captureRequestHeaders: ["baggage"] }));
app.use(container.resolve(HttpLoggerInterceptor).intercept());
app.route("/", healthzRouter);
app.route("/", bidScreeningRouter);
app.onError(container.resolve(HonoErrorHandlerService).handle);

return app;
}

export async function bootstrap(): Promise<void> {
const app = createApp();

await startServer(app, createOtelLogger({ context: "APP" }), process, {
await startServer(app, createOtelLogger({ context: "REST" }), process, {
port: container.resolve(APP_CONFIG).PORT
});
}
94 changes: 92 additions & 2 deletions apps/provider-inventory/src/server.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,93 @@
import { bootstrap } from "./index";
import { parentPort, Worker } from "node:worker_threads";

void bootstrap();
import type { RawAppConfig } from "./providers/raw-app-config.provider";
import { bootstrapEntry } from "./bootstrap-entry";

const SUPPORTED_INTERFACES = ["rest", "providers-sync"];

bootstrap(process.env as RawAppConfig);

async function bootstrap(rawAppConfig: RawAppConfig): Promise<void> {
const INTERFACE = rawAppConfig.INTERFACE || "all";
const port = parseInt(rawAppConfig.PORT?.toString() || "3092", 10) || 3092;

if (INTERFACE === "all") {
const workers = SUPPORTED_INTERFACES.map((interfaceName, index) => bootstrapInWorker({ PORT: String(port + index), INTERFACE: interfaceName }));
Comment thread
stalniy marked this conversation as resolved.
await Promise.all(workers.map(w => w.ready));

const forwardSignal = (signal: string) => {
workers.forEach(w => w.ref.postMessage(signal));
};
process.on("SIGTERM", () => forwardSignal("SIGTERM"));
process.on("SIGINT", () => forwardSignal("SIGINT"));

await Promise.all(workers.map(w => w.exited));
return;
}

if (parentPort) {
parentPort.on("message", (code: string) => {
if (code === "SIGTERM" || code === "SIGINT") {
parentPort?.unref();
process.emit(code, code);
}
});
}

let appModule: { bootstrap: () => Promise<void> };
switch (INTERFACE) {
case "rest":
appModule = await bootstrapEntry(() => import("./rest-app.ts"));
break;
case "providers-sync":
appModule = await bootstrapEntry(() => import("./providers-sync-app.ts"));
break;
default:
throw new Error(`Received invalid interface: ${INTERFACE}. Valid values: ${SUPPORTED_INTERFACES.join(", ")}`);
}

await appModule.bootstrap();
parentPort?.postMessage("ready");
}

function bootstrapInWorker({ PORT, INTERFACE }: RawAppConfig) {
const ref = new Worker(__filename, {
env: {
...process.env,
PORT: String(PORT),
INTERFACE: String(INTERFACE)
}
});

const ready = new Promise<void>((resolve, reject) => {
const onMessage = (m: unknown) => {
if (m === "ready") {
cleanup();
resolve();
}
};
const onError = (err: Error) => {
cleanup();
reject(err);
};
const onExit = (code: number) => {
cleanup();
reject(new Error(`[${INTERFACE}] exited with code ${code} before ready`));
};
const cleanup = () => {
ref.off("message", onMessage);
ref.off("error", onError);
ref.off("exit", onExit);
};
ref.on("message", onMessage);
ref.once("error", onError);
ref.once("exit", onExit);
});

const exited = new Promise<void>(resolve => {
ref.once("exit", resolve);
ref.once("error", resolve);
});
Comment thread
stalniy marked this conversation as resolved.

return { ref, ready, exited };
}
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ describe("startServer", () => {
const logger = mock<LoggerService>();
const processEvents = new EventEmitter();
const container = mock<DependencyContainer>({
isRegistered: vi.fn().mockReturnValue(true),
resolveAll: vi.fn().mockReturnValue(input?.initializers ?? [])
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ export async function startServer(
});
try {
await options.beforeStart?.();
await Promise.all(container.resolveAll(APP_INITIALIZER).map(initializer => initializer[ON_APP_START]()));
if (container.isRegistered(APP_INITIALIZER, true)) {
await Promise.all(container.resolveAll(APP_INITIALIZER).map(initializer => initializer[ON_APP_START]()));
}

logger.info({ event: "SERVER_STARTING", url: `http://localhost:${options.port}`, NODE_OPTIONS: process.env.NODE_OPTIONS });
server = serve({
Expand Down
4 changes: 3 additions & 1 deletion apps/provider-inventory/tsup.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@ export default defineConfig(async overrideOptions =>
prependEffectsToEntries: ["reflect-metadata", "@akashnetwork/env-loader"],
entry: {
server: "./src/server.ts",
"rest-app": "./src/rest-app.ts",
"providers-sync-app": "./src/providers-sync-app.ts",
instrumentation: fileURLToPath(import.meta.resolve("@akashnetwork/instrumentation/register"))
},
target: tsconfig.compilerOptions.target,
tsconfig: "tsconfig.build.json",
external: ["pino-pretty"],
dts: false,
onSuccess: overrideOptions.watch && !isProduction ? "npm run prod" : undefined,
onSuccess: overrideOptions.watch && !isProduction ? "NODE_OPTIONS='--allow-worker' npm run prod" : undefined,
Comment thread
stalniy marked this conversation as resolved.
...overrideOptions
})
);
Loading