Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 49 additions & 32 deletions rivetkit-typescript/packages/rivetkit/tests/driver/actor-db.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// @ts-nocheck

import { describe, expect, test, vi } from "vitest";
import { describe, expect, test } from "vitest";
import {
describeDriverMatrix,
SQLITE_DRIVER_MATRIX_OPTIONS,
Expand All @@ -13,6 +13,7 @@ const CHUNK_SIZE = 4096;
const LARGE_PAYLOAD_SIZE = 32768;
const HIGH_VOLUME_COUNT = 1000;
const SLEEP_WAIT_MS = 150;
const WASM_REMOTE_SLEEP_RECOVERY_TIMEOUT_MS = 30_000;
const LIFECYCLE_POLL_INTERVAL_MS = 25;
const LIFECYCLE_POLL_ATTEMPTS = 40;
const REAL_TIMER_DB_TIMEOUT_MS = 180_000;
Expand Down Expand Up @@ -53,50 +54,50 @@ function isActorStoppingDbError(error: unknown): boolean {
);
}

async function retryActorStoppingDbError<T>(
driverTestConfig: DriverTestConfig,
fn: () => Promise<T>,
timeoutMs: number,
): Promise<T> {
const deadline = Date.now() + timeoutMs;
for (;;) {
try {
return await fn();
} catch (error) {
if (!isActorStoppingDbError(error) || Date.now() >= deadline) {
throw error;
}
await waitFor(driverTestConfig, 100);
}
}
}

async function runWithActorStoppingRetry(
_driverTestConfig: DriverTestConfig,
driverTestConfig: DriverTestConfig,
fn: () => Promise<void>,
): Promise<void> {
// Wait for the actor to leave the `stopping` window. The driver does not
// surface a "ready again" signal, so we poll the user function and only
// retry on the specific `actor stopping: database accessed` error. Any
// other failure short-circuits.
await vi.waitFor(
async () => {
try {
await fn();
} catch (error) {
if (isActorStoppingDbError(error)) {
throw error;
}
throw new (class extends Error {
override name = "AbortRetry";
})(error instanceof Error ? error.message : String(error));
}
},
{ timeout: SLEEP_WAIT_MS * 4, interval: 100 },
await retryActorStoppingDbError(
driverTestConfig,
fn,
SLEEP_WAIT_MS * 4,
);
}

async function expectIntegrityCheckOk(
_driverTestConfig: DriverTestConfig,
driverTestConfig: DriverTestConfig,
integrityCheck: () => Promise<string>,
): Promise<void> {
// Same lifecycle window as `runWithActorStoppingRetry`: the integrity
// check is read-only against the SQLite db, so polling it does not hold
// the actor awake.
await vi.waitFor(
await retryActorStoppingDbError(
driverTestConfig,
async () => {
try {
expect((await integrityCheck()).toLowerCase()).toBe("ok");
} catch (error) {
if (isActorStoppingDbError(error)) {
throw error;
}
throw error;
}
expect((await integrityCheck()).toLowerCase()).toBe("ok");
},
{ timeout: SLEEP_WAIT_MS * 8, interval: 100 },
driverTestConfig.runtime === "wasm"
? WASM_REMOTE_SLEEP_RECOVERY_TIMEOUT_MS
: SLEEP_WAIT_MS * 8,
);
}

Expand Down Expand Up @@ -276,6 +277,7 @@ describeDriverMatrix("Actor Db", (driverTestConfig) => {
expect(
sleepEvent.timestamp - sleepRequestedAt,
).toBeLessThanOrEqual(SLEEP_OBSERVER_TIMEOUT_MS);
await waitFor(driverTestConfig, SLEEP_WAIT_MS);

const countAfterWake = await actor.getCount();
expect(countAfterWake).toBe(baselineCount);
Expand Down Expand Up @@ -508,11 +510,18 @@ describeDriverMatrix("Actor Db", (driverTestConfig) => {
c,
driverTestConfig,
);
const actorKey = `db-${variant}-integrity-${crypto.randomUUID()}`;
const observerKey = `${actorKey}-observer`;
const observer = client.lifecycleObserver.getOrCreate([
observerKey,
]);
await observer.clearEvents();
const actor = getDbActor(client, variant).getOrCreate([
`db-${variant}-integrity-${crypto.randomUUID()}`,
actorKey,
]);

await actor.reset();
await actor.configureLifecycleObserver(observerKey);
await runWithActorStoppingRetry(
driverTestConfig,
async () =>
Expand All @@ -526,7 +535,15 @@ describeDriverMatrix("Actor Db", (driverTestConfig) => {
async () => await actor.integrityCheck(),
);

const sleepRequestedAt = Date.now();
await actor.triggerSleep();
await waitForDbLifecycleEvent(
driverTestConfig,
observer,
actorKey,
"sleep",
sleepRequestedAt,
);
await expectIntegrityCheckOk(
driverTestConfig,
async () => await actor.integrityCheck(),
Expand Down
Loading