feat(provider): implement streamer walking skeleton with end-to-end write path #3153
Conversation
Note: Reviews paused. It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review.
📝 Walkthrough

Adds typed stream/provider/inventory models, env schema (PORT, DISCOVERY_INTERVAL_MS), DI providers, JSONB bigint support, aggregation/projection utilities, polling/scheduling, streaming ingestion and DB writer, bootstrap/start-server wiring, and Vitest tests.

Changes: Provider Inventory Discovery & Streaming
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Codecov Report

❌ Patch coverage is

Additional details and impacted files:

@@ Coverage Diff @@
## main #3153 +/- ##
==========================================
- Coverage 62.82% 62.14% -0.69%
==========================================
Files 1066 1036 -30
Lines 25286 24461 -825
Branches 6234 6089 -145
==========================================
- Hits 15887 15202 -685
+ Misses 8211 8083 -128
+ Partials 1188 1176 -12
*This pull request uses carry forward flags.
Actionable comments posted: 8
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@apps/provider-inventory/src/config/env.config.ts`:
- Around line 9-10: Replace the incorrect z.number({ coerce: true }) usage with
z.coerce.number() for both PORT and DISCOVERY_INTERVAL_MS and add explicit
bounds checks: for PORT use .int().min(1).max(65535) and keep the default 3092;
for DISCOVERY_INTERVAL_MS use .int().min(1) (or a sensible lower bound like
1000) and keep the default 600_000; update the schema entries named PORT and
DISCOVERY_INTERVAL_MS accordingly so coercion and valid-range validation occur
before defaults are applied.
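The semantics the review asks for (coerce first, then enforce integer bounds, then fall back to a default) can be sketched without zod as a plain parser; the variable names and bounds below are the ones the comment proposes, and `coerceInt`/`parseEnv` are hypothetical helper names, not code from the PR:

```typescript
// Dependency-free sketch of the suggested validation. In the real
// env.config.ts this would be expressed with zod, roughly:
//   PORT: z.coerce.number().int().min(1).max(65535).default(3092)
// Absent values fall back to the default; present values are coerced
// and range-checked before use.
function coerceInt(raw: string | undefined, fallback: number, min: number, max: number): number {
  if (raw === undefined || raw === "") return fallback;
  const value = Number(raw);
  if (!Number.isInteger(value) || value < min || value > max) {
    throw new Error(`expected integer in [${min}, ${max}], got "${raw}"`);
  }
  return value;
}

function parseEnv(env: Record<string, string | undefined>) {
  return {
    PORT: coerceInt(env.PORT, 3092, 1, 65535),
    DISCOVERY_INTERVAL_MS: coerceInt(env.DISCOVERY_INTERVAL_MS, 600_000, 1000, Number.MAX_SAFE_INTEGER)
  };
}
```

The key point is ordering: `z.number({ coerce: true })` is not a valid zod call shape, while `z.coerce.number()` converts the raw string before the `.int()/.min()/.max()` checks run.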
In `@apps/provider-inventory/src/providers/provider-stream.provider.ts`:
- Around line 16-18: The current async generator openStatusStream in the
ProviderStreamFactory returns immediately and yields nothing; change it to fail
fast when no real stream client is configured by throwing a descriptive error
from openStatusStream (e.g., "Status stream not configured for provider:
<providerId>") instead of silently completing, so consumers will surface the
misconfiguration; update the implementation referenced as openStatusStream in
provider-stream.provider.ts to throw synchronously (or throw at generator start)
with clear context until a real streaming client is wired.
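A minimal sketch of that fail-fast behavior, assuming an illustrative `StatusMessage` shape (the real types in provider-stream.provider.ts will differ):

```typescript
// Instead of an async generator that completes immediately (silently
// yielding nothing), openStatusStream throws a descriptive error until
// a real streaming client is wired in.
interface StatusMessage {
  owner: string;
  status: string;
}

class ProviderStreamFactory {
  async *openStatusStream(providerId: string): AsyncGenerator<StatusMessage> {
    // Throwing at generator start surfaces the misconfiguration to the
    // first consumer that iterates, rather than completing silently.
    throw new Error(`Status stream not configured for provider: ${providerId}`);
  }
}
```

A consumer iterating with `for await` sees the error on the first `next()` call, so the misconfiguration cannot be mistaken for an empty-but-healthy stream.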
In
`@apps/provider-inventory/src/services/discovery-scheduler/discovery-scheduler.service.spec.ts`:
- Around line 10-16: Remove the top-level beforeEach/afterEach timer hooks and
replace them with a setup-scoped initializer: call setup in the spec to invoke
vi.useFakeTimers() during test setup and register a teardown from that setup to
call vi.useRealTimers(); update discovery-scheduler.service.spec.ts to remove
beforeEach/afterEach and use setup plus vi.useFakeTimers and vi.useRealTimers to
ensure timers are scoped to each test setup.
In
`@apps/provider-inventory/src/services/discovery-scheduler/discovery-scheduler.service.ts`:
- Around line 21-25: The tick loop can reschedule itself after a stop/start
because an in-flight tick doesn't detect that it belongs to a previous run;
modify the start()/stop()/tick() logic to use a run identifier: add a numeric
runId property incremented in start() (and cleared/changed in stop()), capture
the current runId at the top of tick() and before any setTimeout scheduling, and
only schedule the next timer if this.runId still matches the captured value and
this.running is true; update references to running/timer scheduling inside
tick() to gate on that runId check so stale ticks cannot reschedule a new loop.
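The runId gating described above can be sketched as follows; the class shape is illustrative (the real DiscoverySchedulerService's tick body and dependencies differ):

```typescript
// Sketch of runId gating: a stale in-flight tick from a previous run
// cannot reschedule the loop after stop()/start().
class DiscoveryScheduler {
  private running = false;
  private runId = 0;
  private timer?: ReturnType<typeof setTimeout>;

  constructor(
    private readonly intervalMs: number,
    private readonly onTick: () => Promise<void>
  ) {}

  start(): void {
    this.running = true;
    this.runId += 1; // new run: ticks captured earlier now mismatch
    void this.tick();
  }

  stop(): void {
    this.running = false;
    this.runId += 1; // invalidate any tick already in flight
    if (this.timer) clearTimeout(this.timer);
  }

  private async tick(): Promise<void> {
    const capturedRunId = this.runId; // capture before any await
    await this.onTick().catch(() => {});
    // Only reschedule if this tick still belongs to the current run.
    if (this.running && this.runId === capturedRunId) {
      this.timer = setTimeout(() => void this.tick(), this.intervalMs);
    }
  }
}
```

Without the capture-and-compare, a tick awaiting its work when `stop()` then `start()` is called would pass the plain `this.running` check and schedule a second, duplicate loop.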
In
`@apps/provider-inventory/src/services/provider-inventory-writer/provider-inventory-writer.service.ts`:
- Around line 48-49: The upsert currently overwrites isOnlineSince with
rawSql`now()` on every conflict; change the upsert so isOnlineSince is set only
on insert or, in the conflict UPDATE, preserved when an existing non-null value
exists (e.g., use COALESCE(existing.is_online_since, now()) or only include
isOnlineSince in the INSERT clause and omit it from the UPDATE clause). Update
the upsert logic where isOnlineSince: rawSql`now()` is used (and the analogous
place at the later occurrence) so existing non-null isOnlineSince is retained on
conflict instead of being reset.
- Around line 15-19: The field "db" is currently untyped (private readonly db;)
causing implicit any; update the declaration to an explicit type (e.g., private
readonly db: ReturnType<typeof drizzle> or a specific Drizzle DB type imported
from drizzle-orm) and ensure any required type import is added; keep the
constructor as-is (constructor(`@inject`(PG_CLIENT) sql: postgres.Sql) { this.db =
drizzle(sql); }) but adjust the field type to match the return type of drizzle
so all uses of db in this class (provider-inventory-writer.service.ts) are
strongly typed.
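The preserve-on-conflict rule for isOnlineSince can be illustrated over an in-memory map; this mirrors the `COALESCE(existing.is_online_since, now())` semantics the comment suggests, not the actual Drizzle upsert (the `ProviderRow` shape and `upsertProvider` signature here are illustrative):

```typescript
// Illustrates COALESCE(existing.is_online_since, now()) semantics over
// an in-memory map instead of the Postgres ON CONFLICT UPDATE clause.
interface ProviderRow {
  owner: string;
  hostUri: string;
  isOnlineSince: Date | null;
}

const table = new Map<string, ProviderRow>();

function upsertProvider(row: Omit<ProviderRow, "isOnlineSince">, now: Date): ProviderRow {
  const existing = table.get(row.owner);
  const next: ProviderRow = {
    ...row,
    // Preserve an existing non-null isOnlineSince; only set it when the
    // provider is first seen (or its previous value was null).
    isOnlineSince: existing?.isOnlineSince ?? now
  };
  table.set(row.owner, next);
  return next;
}
```

With the original `isOnlineSince: rawSql\`now()\`` in the UPDATE clause, every conflicting write would reset the online-since timestamp, making uptime appear to restart on each stream message.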
In
`@apps/provider-inventory/src/services/stream-lifecycle-manager/stream-lifecycle-manager.service.ts`:
- Around line 21-25: reconcile currently only starts streams for new providers
and never stops streams for providers that disappeared; update reconcile in
StreamLifecycleManagerService to compute the set of current provider.owner
values from the incoming providers array, then iterate this.activeStreams keys
and call the existing stopStream (or equivalent stream-teardown method) for any
owner not present in that set before/after starting new streams; ensure you
remove the stopped owner from this.activeStreams so stale streams are not kept.
- Around line 39-44: The loop currently lets a single upsert error break the
entire stream; wrap the per-message write in its own try/catch inside the
for-await loop (around the call to this.writer.upsertProvider(provider.owner,
provider, row, attributes)) so that when projectRow(message) produces row and
the upsert fails you catch the error, log it (using the service logger) with
context (provider id, message id/summary) and continue to the next message
without rethrowing; keep the outer try/catch for fatal stream-level errors only.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: edf106a6-70ec-4e97-8feb-5e97b2e88651
📒 Files selected for processing (19)
- apps/provider-inventory/src/config/env.config.ts
- apps/provider-inventory/src/index.ts
- apps/provider-inventory/src/lib/compute-rollups/compute-rollups.spec.ts
- apps/provider-inventory/src/lib/compute-rollups/compute-rollups.ts
- apps/provider-inventory/src/lib/project-row/project-row.spec.ts
- apps/provider-inventory/src/lib/project-row/project-row.ts
- apps/provider-inventory/src/lib/reduce-attributes/reduce-attributes.spec.ts
- apps/provider-inventory/src/lib/reduce-attributes/reduce-attributes.ts
- apps/provider-inventory/src/providers/chain-query.provider.ts
- apps/provider-inventory/src/providers/index.ts
- apps/provider-inventory/src/providers/provider-stream.provider.ts
- apps/provider-inventory/src/services/chain-provider-poller/chain-provider-poller.service.ts
- apps/provider-inventory/src/services/discovery-scheduler/discovery-scheduler.service.spec.ts
- apps/provider-inventory/src/services/discovery-scheduler/discovery-scheduler.service.ts
- apps/provider-inventory/src/services/provider-inventory-writer/provider-inventory-writer.service.ts
- apps/provider-inventory/src/services/stream-lifecycle-manager/stream-lifecycle-manager.service.ts
- apps/provider-inventory/src/types/chain-provider.ts
- apps/provider-inventory/src/types/inventory.ts
- apps/provider-inventory/src/types/stream-status.ts
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@apps/provider-inventory/src/lib/reduce-attributes/reduce-attributes.spec.ts`:
- Line 3: Replace the relative import of the module under test with the repo
path alias: update the import that currently pulls "./reduce-attributes" to use
the `@src/`* alias instead (importing the same export reduceAttributes from the
aliased module), so the test file reduce-attributes.spec.ts references the
source via `@src/`... and conforms to the apps/**/*.ts rule.
In `@apps/provider-inventory/src/server.ts`:
- Line 3: Replace the fire-and-forget call to bootstrap() with proper promise
handling: call bootstrap().catch(...) and in the catch handler log the error
(use LoggerService if available, otherwise console.error) and call
process.exit(1) to ensure startup failures don't leave unhandled rejections or a
hanging process; reference the bootstrap function name so the change wraps the
existing bootstrap invocation in a .catch that logs the thrown error and exits.
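The pattern this comment asks for can be sketched as below; the `bootstrap` body and the injectable `exit` hook are stand-ins for testability, not code from the PR:

```typescript
type ExitFn = (code: number) => void;

// Stand-in bootstrap that simulates a startup failure.
async function bootstrap(): Promise<void> {
  throw new Error("database unreachable");
}

// Wrap the fire-and-forget call: log the failure and exit non-zero so
// startup errors neither hang the process nor surface as unhandled
// rejections. The exit function is injectable for testing.
function main(exit: ExitFn = code => process.exit(code)): Promise<void> {
  return bootstrap().catch(error => {
    console.error("bootstrap failed:", error);
    exit(1);
  });
}
```

In server.ts itself the equivalent one-liner is `bootstrap().catch(...)` with `process.exit(1)` in the handler.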
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: a4781f56-72ce-4017-84ac-ca6945c6f9e2
📒 Files selected for processing (19)
- apps/provider-inventory/env/.env
- apps/provider-inventory/src/config/env.config.ts
- apps/provider-inventory/src/index.ts
- apps/provider-inventory/src/lib/compute-rollups/compute-rollups.spec.ts
- apps/provider-inventory/src/lib/compute-rollups/compute-rollups.ts
- apps/provider-inventory/src/lib/project-row/project-row.spec.ts
- apps/provider-inventory/src/lib/project-row/project-row.ts
- apps/provider-inventory/src/lib/reduce-attributes/reduce-attributes.spec.ts
- apps/provider-inventory/src/lib/reduce-attributes/reduce-attributes.ts
- apps/provider-inventory/src/providers/drizzle.provider.ts
- apps/provider-inventory/src/providers/index.ts
- apps/provider-inventory/src/providers/logger-factory.provider.ts
- apps/provider-inventory/src/server.ts
- apps/provider-inventory/src/services/chain-provider-poller/chain-provider-poller.service.ts
- apps/provider-inventory/src/services/discovery-scheduler/discovery-scheduler.service.spec.ts
- apps/provider-inventory/src/services/discovery-scheduler/discovery-scheduler.service.ts
- apps/provider-inventory/src/services/provider-inventory-writer/provider-inventory-writer.service.ts
- apps/provider-inventory/src/services/stream-lifecycle-manager/stream-lifecycle-manager.service.ts
- apps/provider-inventory/src/types/inventory.ts
💤 Files with no reviewable changes (1)
- apps/provider-inventory/env/.env
✅ Files skipped from review due to trivial changes (9)
- apps/provider-inventory/src/providers/index.ts
- apps/provider-inventory/src/lib/project-row/project-row.ts
- apps/provider-inventory/src/providers/drizzle.provider.ts
- apps/provider-inventory/src/providers/logger-factory.provider.ts
- apps/provider-inventory/src/services/chain-provider-poller/chain-provider-poller.service.ts
- apps/provider-inventory/src/services/discovery-scheduler/discovery-scheduler.service.ts
- apps/provider-inventory/src/lib/compute-rollups/compute-rollups.ts
- apps/provider-inventory/src/services/provider-inventory-writer/provider-inventory-writer.service.ts
- apps/provider-inventory/src/services/discovery-scheduler/discovery-scheduler.service.spec.ts
🚧 Files skipped from review as they are similar to previous changes (2)
- apps/provider-inventory/src/services/stream-lifecycle-manager/stream-lifecycle-manager.service.ts
- apps/provider-inventory/src/types/inventory.ts
Force-pushed from 7de5708 to 6fa8d2f
Actionable comments posted: 2
♻️ Duplicate comments (4)
apps/provider-inventory/src/services/stream-lifecycle-manager/stream-lifecycle-manager.service.ts (2)
48-53: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

A single write failure currently terminates the whole provider stream.

projectRow/upsertProvider errors should be isolated per message so transient failures do not kill the stream.

Suggested fix:

```diff
 for await (const message of stream) {
   if (signal.aborted) break;
-  const row = projectRow(message);
-  await this.#writer.upsertProvider(provider.owner, provider, row, attributes);
+  try {
+    const row = projectRow(message);
+    await this.#writer.upsertProvider(provider.owner, provider, row, attributes);
+  } catch (error) {
+    if (!signal.aborted) {
+      this.#logger.error({
+        event: "STREAM_WRITE_ERROR",
+        owner: provider.owner,
+        error
+      });
+    }
+  }
 }
```

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@apps/provider-inventory/src/services/stream-lifecycle-manager/stream-lifecycle-manager.service.ts` around lines 48 - 53, The loop currently lets any error from projectRow or this.#writer.upsertProvider abort the entire stream; wrap the per-message processing inside its own try/catch so failures are handled per message (e.g., try { const row = projectRow(message); await this.#writer.upsertProvider(provider.owner, provider, row, attributes); } catch (err) { /* log with context (provider, message id/offset) and continue */ }), ensuring you do not rethrow the error so the for-await loop continues; reference projectRow and this.#writer.upsertProvider in StreamLifecycleManagerService to locate where to add the per-message try/catch and contextual logging.
30-35: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

reconcile does not close streams for owners that disappeared.

This keeps stale streams alive indefinitely and can write outdated data until process shutdown.

Suggested fix:

```diff
 reconcile(providers: ChainProvider[]): void {
+  const nextOwners = new Set(providers.map(provider => provider.owner));
+
+  for (const [owner, controller] of this.#activeStreams) {
+    if (!nextOwners.has(owner)) {
+      controller.abort();
+      this.#activeStreams.delete(owner);
+      this.#logger.info({ event: "STREAM_CLOSED", owner });
+    }
+  }
+
   for (const provider of providers) {
     if (this.#activeStreams.has(provider.owner)) continue;
     this.#startStream(provider);
   }
 }
```

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@apps/provider-inventory/src/services/stream-lifecycle-manager/stream-lifecycle-manager.service.ts` around lines 30 - 35, The reconcile method only starts missing streams but never stops streams for owners no longer present; update reconcile (in stream-lifecycle-manager.service.ts) to compute the set of current provider owners from the providers array, then iterate this.#activeStreams keys and call the existing stop/close method (e.g., `#stopStream` or stopStream) for any owner not in that set before starting new ones; ensure you remove stopped owners from this.#activeStreams to avoid stale entries and avoid starting duplicates by keeping the existing has-check and `#startStream` call.

apps/provider-inventory/src/server.ts (1)
3-3: ⚠️ Potential issue | 🔴 Critical | ⚡ Quick win

Handle bootstrap() rejection instead of fire-and-forget invocation.

Line 3 discards the promise; if startup fails, the rejection is unhandled and failure signaling becomes unreliable. Please attach a .catch(...) path and terminate the process explicitly on fatal bootstrap errors.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@apps/provider-inventory/src/server.ts` at line 3, The bootstrap() promise is being ignored; change the fire-and-forget invocation of bootstrap() so its rejection is handled by attaching a .catch handler that logs the error (e.g., console.error or the app logger) and then calls process.exit(1) to signal a fatal startup failure; locate the bootstrap() call and replace the bare invocation with a promise chain (bootstrap().catch(...)) to ensure unhandled rejections terminate the process.

apps/provider-inventory/src/services/discovery-scheduler/discovery-scheduler.service.spec.ts (1)
11-17: 🛠️ Refactor suggestion | 🟠 Major | ⚡ Quick win

Move fake timer lifecycle into setup() instead of top-level hooks.

beforeEach/afterEach here still introduces shared lifecycle state in a service-level spec; please scope fake timers through setup() per test.

As per coding guidelines, **/*.spec.ts: "Check that tests use a setup() function (not beforeEach with shared mutable state)."

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@apps/provider-inventory/src/services/discovery-scheduler/discovery-scheduler.service.spec.ts` around lines 11 - 17, Remove the top-level beforeEach/afterEach that call vi.useFakeTimers()/vi.useRealTimers() and instead invoke vi.useFakeTimers() inside the per-test setup() function used by this spec, and restore timers with vi.useRealTimers() in the corresponding teardown (e.g., by calling vi.useRealTimers() at the end of setup or returning a cleanup function from setup that runs after each test); specifically replace the shared beforeEach/afterEach hooks with per-test calls to vi.useFakeTimers()/vi.useRealTimers() inside setup() to avoid shared mutable lifecycle state.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@apps/provider-inventory/src/server.ts`:
- Line 1: Replace the relative import of bootstrap in server.ts with the backend
path alias; specifically change the import of the symbol "bootstrap" that
currently comes from "./index" to use the `@src/`* alias (e.g. import { bootstrap
} from "@src/index" or the appropriate `@src` barrel) so the file follows the
apps/** coding guideline.
In
`@apps/provider-inventory/src/services/discovery-scheduler/discovery-scheduler.service.spec.ts`:
- Line 8: The test imports DiscoverySchedulerService via a relative path; change
the import to use the backend path alias instead (replace the relative import of
DiscoverySchedulerService in discovery-scheduler.service.spec.ts with the
appropriate `@src/`* alias form, e.g. import { DiscoverySchedulerService } from
"@src/services/discovery-scheduler/discovery-scheduler.service"), ensuring the
module path matches the service's location and the project's tsconfig path
mappings.
---
Duplicate comments:
In `@apps/provider-inventory/src/server.ts`:
- Line 3: The bootstrap() promise is being ignored; change the fire-and-forget
invocation of bootstrap() so its rejection is handled by attaching a .catch
handler that logs the error (e.g., console.error or the app logger) and then
calls process.exit(1) to signal a fatal startup failure; locate the bootstrap()
call and replace the bare invocation with a promise chain
(bootstrap().catch(...)) to ensure unhandled rejections terminate the process.
In
`@apps/provider-inventory/src/services/discovery-scheduler/discovery-scheduler.service.spec.ts`:
- Around line 11-17: Remove the top-level beforeEach/afterEach that call
vi.useFakeTimers()/vi.useRealTimers() and instead invoke vi.useFakeTimers()
inside the per-test setup() function used by this spec, and restore timers with
vi.useRealTimers() in the corresponding teardown (e.g., by calling
vi.useRealTimers() at the end of setup or returning a cleanup function from
setup that runs after each test); specifically replace the shared
beforeEach/afterEach hooks with per-test calls to
vi.useFakeTimers()/vi.useRealTimers() inside setup() to avoid shared mutable
lifecycle state.
In
`@apps/provider-inventory/src/services/stream-lifecycle-manager/stream-lifecycle-manager.service.ts`:
- Around line 48-53: The loop currently lets any error from projectRow or
this.#writer.upsertProvider abort the entire stream; wrap the per-message
processing inside its own try/catch so failures are handled per message (e.g.,
try { const row = projectRow(message); await
this.#writer.upsertProvider(provider.owner, provider, row, attributes); } catch
(err) { /* log with context (provider, message id/offset) and continue */ }),
ensuring you do not rethrow the error so the for-await loop continues; reference
projectRow and this.#writer.upsertProvider in StreamLifecycleManagerService to
locate where to add the per-message try/catch and contextual logging.
- Around line 30-35: The reconcile method only starts missing streams but never
stops streams for owners no longer present; update reconcile (in
stream-lifecycle-manager.service.ts) to compute the set of current provider
owners from the providers array, then iterate this.#activeStreams keys and call
the existing stop/close method (e.g., `#stopStream` or stopStream) for any owner
not in that set before starting new ones; ensure you remove stopped owners from
this.#activeStreams to avoid stale entries and avoid starting duplicates by
keeping the existing has-check and `#startStream` call.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 4149752d-5887-452f-a661-fdee1844668f
📒 Files selected for processing (19)
- apps/provider-inventory/env/.env
- apps/provider-inventory/src/config/env.config.ts
- apps/provider-inventory/src/index.ts
- apps/provider-inventory/src/lib/compute-rollups/compute-rollups.spec.ts
- apps/provider-inventory/src/lib/compute-rollups/compute-rollups.ts
- apps/provider-inventory/src/lib/project-row/project-row.spec.ts
- apps/provider-inventory/src/lib/project-row/project-row.ts
- apps/provider-inventory/src/lib/reduce-attributes/reduce-attributes.spec.ts
- apps/provider-inventory/src/lib/reduce-attributes/reduce-attributes.ts
- apps/provider-inventory/src/providers/drizzle.provider.ts
- apps/provider-inventory/src/providers/index.ts
- apps/provider-inventory/src/providers/logger-factory.provider.ts
- apps/provider-inventory/src/server.ts
- apps/provider-inventory/src/services/chain-provider-poller/chain-provider-poller.service.ts
- apps/provider-inventory/src/services/discovery-scheduler/discovery-scheduler.service.spec.ts
- apps/provider-inventory/src/services/discovery-scheduler/discovery-scheduler.service.ts
- apps/provider-inventory/src/services/provider-inventory-writer/provider-inventory-writer.service.ts
- apps/provider-inventory/src/services/stream-lifecycle-manager/stream-lifecycle-manager.service.ts
- apps/provider-inventory/src/types/inventory.ts
💤 Files with no reviewable changes (1)
- apps/provider-inventory/env/.env
✅ Files skipped from review due to trivial changes (13)
- apps/provider-inventory/src/providers/index.ts
- apps/provider-inventory/src/providers/drizzle.provider.ts
- apps/provider-inventory/src/lib/project-row/project-row.ts
- apps/provider-inventory/src/services/chain-provider-poller/chain-provider-poller.service.ts
- apps/provider-inventory/src/providers/logger-factory.provider.ts
- apps/provider-inventory/src/lib/reduce-attributes/reduce-attributes.ts
- apps/provider-inventory/src/lib/reduce-attributes/reduce-attributes.spec.ts
- apps/provider-inventory/src/config/env.config.ts
- apps/provider-inventory/src/services/discovery-scheduler/discovery-scheduler.service.ts
- apps/provider-inventory/src/lib/project-row/project-row.spec.ts
- apps/provider-inventory/src/services/provider-inventory-writer/provider-inventory-writer.service.ts
- apps/provider-inventory/src/lib/compute-rollups/compute-rollups.spec.ts
- apps/provider-inventory/src/types/inventory.ts
Force-pushed from 005d54c to 91e64a3
Actionable comments posted: 4
🧹 Nitpick comments (1)
apps/provider-inventory/src/services/start-server/start-server.ts (1)
10-10: ⚡ Quick win

Use @src/* alias for internal backend imports.

Line 10 uses a relative path in apps/**/*.ts; switch it to the configured source alias.

Suggested fix:

```diff
-import { shutdownServer } from "../shutdown-server/shutdown-server";
+import { shutdownServer } from "@src/services/shutdown-server/shutdown-server";
```

As per coding guidelines, apps/**/*.ts: Use path aliases @src/* for source files and @test/* for test files in backend applications.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@apps/provider-inventory/src/services/start-server/start-server.ts` at line 10, Replace the relative import of shutdownServer in start-server.ts with the configured source alias: locate the import line importing shutdownServer (import { shutdownServer } from "../shutdown-server/shutdown-server") and change it to use the `@src/`* alias (e.g., import { shutdownServer } from "@src/services/shutdown-server/shutdown-server") so the module resolution follows the backend apps guideline for using `@src/`* for internal imports.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@apps/provider-inventory/src/lib/compute-rollups/compute-rollups.ts`:
- Around line 62-64: clamp currently calls BigInt(value) directly and throws for
NaN/Infinity/fractional numbers; update the clamp function to validate and
normalize the numeric input: if value is not a finite number return 0n;
otherwise truncate the fractional part (e.g. Math.trunc(value)), then return 0n
for negative results or BigInt(truncated) for non-negative results. Apply this
change inside the clamp function so callers (e.g. where available: number is
passed) get a safe integer-to-BigInt conversion.
In `@apps/provider-inventory/src/lib/jsonb-bigint/jsonb-bigint.ts`:
- Around line 1-3: The current fixed sentinel causes silent collisions and
JSON.stringify can return undefined; update the implementation so
BIGINT_SENTINEL is generated at runtime (e.g. a per-process unpredictable token
such as a uuid or random nonce) and rebuild BIGINT_PATTERN from that escaped
token so legitimate user strings can't collide, and in serializeJsonb guard
against JSON.stringify returning undefined (if json === undefined return
undefined or handle appropriately) before calling .replace; adjust references to
BIGINT_SENTINEL, BIGINT_PATTERN and serializeJsonb accordingly.
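A sketch of the hardened serializer this comment describes, assuming the round-trip works by tagging bigints as strings and then unquoting them (the actual jsonb-bigint.ts implementation may differ in detail):

```typescript
import { randomUUID } from "node:crypto";

// Per-process unpredictable token: legitimate user strings cannot
// collide with it, unlike a fixed sentinel. A UUID contains no
// regex-special characters, so it is safe to embed in the pattern.
const BIGINT_SENTINEL = `__bigint_${randomUUID()}__`;
const BIGINT_PATTERN = new RegExp(`"${BIGINT_SENTINEL}(-?\\d+)"`, "g");

function serializeJsonb(value: unknown): string | undefined {
  const json = JSON.stringify(value, (_key, v) =>
    typeof v === "bigint" ? `${BIGINT_SENTINEL}${v.toString()}` : v
  );
  // JSON.stringify returns undefined for undefined, functions, symbols;
  // guard before calling .replace on it.
  if (json === undefined) return undefined;
  // Unquote the tagged bigints so they land as JSON numbers in jsonb.
  return json.replace(BIGINT_PATTERN, "$1");
}
```

The replacer runs before JSON.stringify would otherwise throw its usual `TypeError` on bigint values, which is what makes the tag-then-unquote approach work.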
In `@apps/provider-inventory/src/services/shutdown-server/shutdown-server.ts`:
- Around line 14-17: The current call Promise.resolve(onShutdown?.()) can throw
synchronously if onShutdown throws, preventing the promise chain and resolve
from running; change the invocation to start a resolved promise and call
onShutdown inside then (e.g. Promise.resolve().then(() => onShutdown?.())) so
any synchronous throw becomes a rejected promise that will be handled by
.catch(), and ensure you keep .finally() (or call resolve() in finally) to
always resolve the outer shutdown Promise; update the chain around onShutdown in
shutdown-server.ts accordingly.
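The safe invocation order described above can be sketched as follows; the signature is illustrative of the pattern, not the exact shutdown-server.ts API:

```typescript
// Starting from a resolved promise means a synchronous throw inside
// onShutdown becomes a rejection handled by .catch(), and .finally()
// guarantees the outer shutdown promise always resolves.
function shutdownServer(onShutdown?: () => void | Promise<void>): Promise<void> {
  return new Promise<void>(resolve => {
    Promise.resolve()
      .then(() => onShutdown?.())
      .catch(error => console.error("onShutdown failed:", error))
      .finally(() => resolve());
  });
}
```

By contrast, `Promise.resolve(onShutdown?.())` evaluates `onShutdown()` eagerly, so a synchronous throw escapes before any `.catch()` is attached.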
In `@apps/provider-inventory/src/services/start-server/start-server.ts`:
- Line 19: Replace the untyped Hono any usage: change the parameter type from
Hono<any> to Hono<AppEnv> in startServer (or the exported function that accepts
app) and add the AppEnv import used across the repo; specifically update the
function signature referencing Hono<any> to Hono<AppEnv> and import the AppEnv
type (same symbol used in apps/provider-inventory/src/index.ts) so the signature
and imports are consistent with the codebase’s type-safe pattern.
---
Nitpick comments:
In `@apps/provider-inventory/src/services/start-server/start-server.ts`:
- Line 10: Replace the relative import of shutdownServer in start-server.ts with
the configured source alias: locate the import line importing shutdownServer
(import { shutdownServer } from "../shutdown-server/shutdown-server") and change
it to use the `@src/`* alias (e.g., import { shutdownServer } from
"@src/services/shutdown-server/shutdown-server") so the module resolution
follows the backend apps guideline for using `@src/`* for internal imports.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 5f8cb24a-0fe4-4381-9ee1-b0d70c2cf098
📒 Files selected for processing (7)
- apps/provider-inventory/src/lib/compute-rollups/compute-rollups.ts
- apps/provider-inventory/src/lib/jsonb-bigint/jsonb-bigint.spec.ts
- apps/provider-inventory/src/lib/jsonb-bigint/jsonb-bigint.ts
- apps/provider-inventory/src/providers/postgres.provider.ts
- apps/provider-inventory/src/services/hono-error-handler/hono-error-handler.service.ts
- apps/provider-inventory/src/services/shutdown-server/shutdown-server.ts
- apps/provider-inventory/src/services/start-server/start-server.ts
✅ Files skipped from review due to trivial changes (1)
- apps/provider-inventory/src/lib/jsonb-bigint/jsonb-bigint.spec.ts
Force-pushed from 3ebad7f to 6886338
Actionable comments posted: 1
♻️ Duplicate comments (2)
apps/provider-inventory/src/lib/compute-rollups/compute-rollups.ts (1)
62-65: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Guard BigInt() from fractional numbers to prevent runtime throws.

Line 64 can still throw when value is finite/positive but non-integer (e.g., 1.5). That would crash rollup computation on malformed numeric input.

Suggested fix:

```diff
 function clamp(value: number): bigint {
   if (!Number.isFinite(value) || value <= 0) return 0n;
-  return BigInt(value);
+  const normalized = Math.trunc(value);
+  return normalized <= 0 ? 0n : BigInt(normalized);
 }
```

Verification script:

```bash
#!/bin/bash
set -euo pipefail
# Verify whether upstream schema guarantees integer `available` values.
fd -i 'inventory*.ts' apps/provider-inventory/src
rg -n -C3 '\bavailable\b|z\.number\(|z\.coerce\.number\(|\.int\(' apps/provider-inventory/src
# Reconfirm all BigInt conversion sites in rollups.
rg -n -C3 '\bclamp\s*\(|\bBigInt\s*\(' apps/provider-inventory/src/lib/compute-rollups
```

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@apps/provider-inventory/src/lib/compute-rollups/compute-rollups.ts` around lines 62 - 65, The clamp function currently calls BigInt(value) which will throw for finite positive non-integer inputs (e.g., 1.5); update clamp to guard against fractional numbers by checking Number.isFinite and Number.isInteger (or use Math.trunc/Math.floor to coerce safely) before converting to BigInt so malformed fractional inputs don't crash rollup computation; modify the BigInt(...) call inside clamp to either return 0n for non-integers or convert via a truncation step depending on the intended semantics.

apps/provider-inventory/src/services/stream-lifecycle-manager/stream-lifecycle-manager.service.ts (1)
58-64: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

projectRow failures still terminate the whole stream.

Line 60 is outside the per-message try/catch, so a malformed message can still break the outer stream loop instead of being isolated and skipped.

Suggested fix:

```diff
 for await (const message of stream) {
   if (signal.aborted) break;
-  const row = projectRow(message);
   try {
+    const row = projectRow(message);
     await this.#writer.upsertProvider(provider.owner, provider, row, attributes);
   } catch (error) {
-    this.#logger.error({ event: "STREAM_PROVIDER_WRITE_ERROR", owner: provider.owner, error });
+    if (!signal.aborted) {
+      this.#logger.error({ event: "STREAM_PROVIDER_WRITE_ERROR", owner: provider.owner, error });
+    }
   }
 }
```

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@apps/provider-inventory/src/services/stream-lifecycle-manager/stream-lifecycle-manager.service.ts` around lines 58 - 64, projectRow is being called outside the per-message try/catch so a malformed message can throw and break the outer for-await loop; wrap the projectRow(...) call in the same try/catch that surrounds this.#writer.upsertProvider(...) (or add a small try/catch just for projectRow) so any error from projectRow is caught, logged via this.#logger.error (include event: "STREAM_PROVIDER_PARSE_ERROR" and owner: provider.owner), and the loop continues (respecting signal.aborted) instead of terminating the stream.
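The behavior the prompt asks for can be sketched independently of the service. In this minimal sketch, `projectRow`, `fakeStream`, and `drainStream` are toy stand-ins for the service's real collaborators; the point is only that projection runs inside the per-message try/catch:

```typescript
// Hypothetical stand-in for the real projection: fails for one message.
function projectRow(message: string): string {
  if (message === "bad") throw new Error("malformed message");
  return message.toUpperCase();
}

// Hypothetical stand-in for the provider inventory stream.
async function* fakeStream(): AsyncGenerator<string> {
  yield "good";
  yield "bad"; // projection fails for this one
  yield "also good";
}

async function drainStream(stream: AsyncIterable<string>): Promise<string[]> {
  const written: string[] = [];
  for await (const message of stream) {
    try {
      // projectRow sits INSIDE the try, so a malformed message is logged and
      // skipped instead of tearing down the whole stream loop.
      const row = projectRow(message);
      written.push(row); // stands in for writer.upsertInventory(...)
    } catch {
      // would log STREAM_PROVIDER_PARSE_ERROR here and continue
    }
  }
  return written;
}
```

With this shape, one bad message costs one skipped row rather than the entire stream.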
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@apps/provider-inventory/src/services/stream-lifecycle-manager/stream-lifecycle-manager.service.ts`:
- Around lines 50-53: the cleanup in async method `#runStream` currently removes entries from `#activeStreams` by owner unconditionally, which can remove a newer controller if an older aborted stream finishes later; change the finalizer so it only deletes when the stored controller for that owner is the same controller instance being finalized (e.g., check `this.#activeStreams.get(owner) === controller` before calling delete) to avoid removing a newer active controller.
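The guarded finalizer described above can be sketched outside the class. Names mirror the review comment (`activeStreams`, `owner`, `controller`), but the class context and stream plumbing are elided:

```typescript
// A late-finishing old stream must not evict a newer controller that was
// registered under the same owner after the old one was aborted.
const activeStreams = new Map<string, AbortController>();

function register(owner: string, controller: AbortController): void {
  activeStreams.set(owner, controller);
}

// Called when a stream for `owner` finishes or aborts.
function finalize(owner: string, controller: AbortController): void {
  // Delete only if the map still points at THIS controller; otherwise a newer
  // stream for the same owner has already replaced the entry.
  if (activeStreams.get(owner) === controller) {
    activeStreams.delete(owner);
  }
}
```

An unconditional `activeStreams.delete(owner)` in `finalize` would pass the first half of this scenario and silently drop the replacement stream's controller.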
---
Duplicate comments:

In `@apps/provider-inventory/src/lib/compute-rollups/compute-rollups.ts`:
- Around lines 62-65: the clamp function currently calls BigInt(value), which will throw for finite positive non-integer inputs (e.g., 1.5); update clamp to guard against fractional numbers by checking Number.isFinite and Number.isInteger (or use Math.trunc/Math.floor to coerce safely) before converting to BigInt so malformed fractional inputs don't crash rollup computation; modify the BigInt conversion inside clamp to either return 0n for non-integers or convert via a truncation step, depending on the intended semantics.

In `@apps/provider-inventory/src/services/stream-lifecycle-manager/stream-lifecycle-manager.service.ts`:
- Around lines 58-64: projectRow is being called outside the per-message try/catch, so a malformed message can throw and break the outer for-await loop; wrap the projectRow(...) call in the same try/catch that surrounds this.#writer.upsertProvider(...) (or add a small try/catch just for projectRow) so any error from projectRow is caught, logged via this.#logger.error (include event: "STREAM_PROVIDER_PARSE_ERROR" and owner: provider.owner), and the loop continues (respecting signal.aborted) instead of terminating the stream.
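The clamp prompt above can be sanity-checked in isolation: `BigInt()` throws a RangeError for non-integer numbers, which is why truncation must precede conversion. A minimal sketch of the corrected function from the suggested fix:

```typescript
// Corrected clamp: truncate before converting, so finite positive
// non-integers no longer throw a RangeError inside rollup computation.
function clamp(value: number): bigint {
  if (!Number.isFinite(value) || value <= 0) return 0n;
  const normalized = Math.trunc(value);
  return normalized <= 0 ? 0n : BigInt(normalized);
}

// clamp(1.5) -> 1n  (previously: RangeError)
// clamp(0.4) -> 0n  (truncates to 0, then clamped)
// clamp(NaN) -> 0n
// clamp(-3)  -> 0n
```

Whether to truncate or reject non-integers is the "intended semantics" question the prompt leaves open; truncation is the more forgiving choice for a rollup that must never crash on malformed input.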
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: ae8cc2c0-4f0c-46db-adf6-6a80116a1b18
📒 Files selected for processing (4)
- apps/provider-inventory/src/lib/compute-rollups/compute-rollups.ts
- apps/provider-inventory/src/services/provider-inventory-writer/provider-inventory-writer.service.ts
- apps/provider-inventory/src/services/start-server/app-initializer.ts
- apps/provider-inventory/src/services/stream-lifecycle-manager/stream-lifecycle-manager.service.ts
🚧 Files skipped from review as they are similar to previous changes (1)
- apps/provider-inventory/src/services/provider-inventory-writer/provider-inventory-writer.service.ts
…rite path

Wire the first vertical cut of the streamer pipeline: chain poll → discovery scheduler → stream lifecycle manager → writer. Pure mappers (projectRow, computeRollups, reduceAttributes) are unit-tested; discovery loop uses recursive setTimeout with immediate first tick.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Switch to JS private fields, camelCase interface properties, injectable logger factory and Drizzle DB provider. Remove pointless attribute remap in reduceAttributes, DRY the upsert changeset, handle duplicate owners in audit records, and drop setup() wrappers from pure-function tests. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…load

Configure postgres.js with custom JSON type handlers that serialize bigint as raw numeric literals and parse integers back as bigint using Node 24's reviver context. Inventory types now use bigint for all resource quantities, eliminating precision loss for large values.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Keep Inventory types as number since all practical resource values fit within Number.MAX_SAFE_INTEGER. The JSONB parser only promotes to bigint when String(value) !== context.source, acting as a safety net for edge cases rather than converting every integer. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
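The promotion rule these commits describe can be sketched with `JSON.parse`'s reviver source-text context (the third reviver argument carrying the raw JSON text of the value, available in recent Node releases). The handler below is an illustrative stand-in, not the actual postgres.js configuration:

```typescript
// Promote an integer to bigint only when Number parsing lost precision,
// i.e. when String(value) !== context.source — the safety net the commit
// describes, rather than converting every integer.
const reviver = (
  _key: string,
  value: unknown,
  context?: { source?: string },
): unknown => {
  if (
    typeof value === "number" &&
    Number.isInteger(value) &&
    context &&
    context.source &&
    String(value) !== context.source
  ) {
    return BigInt(context.source);
  }
  return value;
};

// Cast needed: TypeScript's lib types do not yet declare the third
// reviver parameter.
const parsed = JSON.parse(
  '{"small": 42, "huge": 9007199254740993}',
  reviver as any,
);
// parsed.small stays a plain number; on Node versions with reviver source
// access, parsed.huge (> Number.MAX_SAFE_INTEGER) is promoted to bigint.
```

On runtimes without source access, `context` is undefined and every value stays a number, which matches the commit's framing of the parser as an edge-case safety net.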
reconcile now aborts active streams whose owner is absent from the latest provider list, preventing stale streams from living indefinitely. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
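A hypothetical sketch of the reconcile behavior described above; the `Map`/`Set` shapes and the `openStream` callback are stand-ins for the service's real collaborators:

```typescript
// Abort and drop streams whose owner no longer appears in the latest provider
// list, open streams for owners that are new, leave existing ones untouched.
function reconcile(
  active: Map<string, AbortController>,
  latestOwners: Set<string>,
  openStream: (owner: string) => AbortController,
): void {
  for (const [owner, controller] of active) {
    if (!latestOwners.has(owner)) {
      controller.abort(); // stale stream: owner left the provider list
      active.delete(owner);
    }
  }
  for (const owner of latestOwners) {
    if (!active.has(owner)) {
      active.set(owner, openStream(owner)); // new owner: open a stream
    }
  }
}
```

Deleting entries while iterating a `Map` is safe per the iteration protocol, so the stale-stream pass needs no intermediate copy.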
6886338 to 595cb93 (compare)
The function was only extracting unique auditors after the remap removal. Compute auditedBy directly in upsertProvider and pass selfAttributes / signedAttributes from ChainProvider, removing the lib, its tests, and the ReducedAttributes type. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…butes

Stream messages only carry inventory data — attributes come from the chain poll. Split the single upsertProvider into upsertInventory (called per stream message, writes inventory + rollups + online status) and upsertAttributes (called once per discovery tick, writes self/signed attributes + auditedBy). Removes the ReducedAttributes type.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Covers reconcile (open new, skip existing, stop removed), stream message processing (projectRow + upsertInventory per message), error handling (write errors logged and skipped, stream errors logged, AbortError suppressed), and shutdown (all streams aborted). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
220914e to 2eb2d69 (compare)
Why
Ref CON-303
What
Wire the first vertical cut of the streamer pipeline: chain poll → discovery scheduler → stream lifecycle manager → writer. Pure mappers (projectRow, computeRollups, reduceAttributes) are unit-tested; discovery loop uses recursive setTimeout with immediate first tick.
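The discovery loop described above can be sketched as follows; the function name and signature are illustrative, not the actual service API:

```typescript
// Recursive setTimeout with an immediate first tick. Unlike setInterval, the
// next delay is only scheduled after the current tick finishes, so a slow
// discovery tick can never overlap the next one.
function startDiscoveryLoop(
  tick: () => Promise<void>,
  intervalMs: number,
  signal: AbortSignal,
): void {
  const run = async (): Promise<void> => {
    if (signal.aborted) return;
    try {
      await tick();
    } catch {
      // a failed tick would be logged; the loop keeps going
    }
    if (!signal.aborted) setTimeout(run, intervalMs);
  };
  void run(); // first tick fires immediately, not after intervalMs
}
```

Aborting the signal lets the caller stop the loop cleanly: the pending timer's callback returns early and nothing further is scheduled.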
Summary by CodeRabbit
New Features
Configuration
Improvements