diff --git a/.changeset/scheduled-publishing-driver.md b/.changeset/scheduled-publishing-driver.md new file mode 100644 index 000000000..6a4d02c5a --- /dev/null +++ b/.changeset/scheduled-publishing-driver.md @@ -0,0 +1,31 @@ +--- +"emdash": minor +"@emdash-cms/cloudflare": minor +--- + +Drive scheduled publishing from a real heartbeat instead of request side effects (#1303). + +Content scheduled via the admin now actually transitions to `published` when its time arrives. Previously nothing promoted the row — `status` stayed `scheduled` and `published_at` stayed null forever. + +A new sweep (`publishDueContent`) promotes due content and runs alongside the existing cron tick and system cleanup: + +- **Node / single-process:** the timer-based scheduler already drives it — no action needed. +- **Cloudflare Workers:** a `scheduled()` handler driven by a Cron Trigger now runs the sweep. The request-driven `PiggybackScheduler` is gone, so there are no maintenance side effects on visitor requests. + +`@emdash-cms/cloudflare` ships a Worker entry that wraps Astro's handler with the `scheduled()` handler (`@emdash-cms/cloudflare/worker`, plus `createScheduledHandler()` for hand-assembled Workers). When a cache provider is configured, the handler also purges edge-cache tags for whatever it published, so stale snapshots produced before the scheduled time are evicted. + +**Migration for existing Cloudflare sites.** New sites get this from the templates. Existing deployments must update two files: + +```ts +// src/worker.ts +export { default, PluginBridge } from "@emdash-cms/cloudflare/worker"; +``` + +```jsonc +// wrangler.jsonc +"triggers": { "crons": ["* * * * *"] } +``` + +Without the Cron Trigger, scheduled publishing and plugin cron do not run on Workers. + +Scheduled publishing matches manual publishing exactly: it fires `content:afterPublish` hooks (search indexing, webhooks, syndication), and records the _scheduled_ time as `published_at` on first publication rather than the (later) sweep time. The sweep claims each row atomically before promoting it, so an entry unscheduled or rescheduled just before its time is never published, and overlapping sweeps can't double-publish. Local `astro dev` keeps running the timer-driven sweep even under the Cloudflare adapter (where production relies on the Cron Trigger). diff --git a/packages/cloudflare/package.json b/packages/cloudflare/package.json index 9a09ec5f4..9cbf125ce 100644 --- a/packages/cloudflare/package.json +++ b/packages/cloudflare/package.json @@ -41,6 +41,10 @@ "types": "./dist/sandbox/index.d.mts", "default": "./dist/sandbox/index.mjs" }, + "./worker": { + "types": "./dist/worker.d.mts", + "default": "./dist/worker.mjs" + }, "./plugins": { "types": "./dist/plugins/index.d.mts", "default": "./dist/plugins/index.mjs" @@ -77,12 +81,14 @@ "ulidx": "^2.4.1" }, "peerDependencies": { + "@astrojs/cloudflare": ">=12.0.0", "@cloudflare/workers-types": ">=4.0.0", "astro": ">=6.0.0-beta.0", "kysely": ">=0.28.17" }, "devDependencies": { "@arethetypeswrong/cli": "catalog:", + "@astrojs/cloudflare": "catalog:", "@cloudflare/workers-types": "catalog:", "publint": "catalog:", "tsdown": "catalog:", diff --git a/packages/cloudflare/src/worker.ts b/packages/cloudflare/src/worker.ts new file mode 100644 index 000000000..4afe92926 --- /dev/null +++ b/packages/cloudflare/src/worker.ts @@ -0,0 +1,80 @@ +/** + * Cloudflare Worker entry for EmDash sites. + * + * Wraps the Astro Cloudflare server handler with a `scheduled()` handler so a + * Cron Trigger drives scheduled publishing, plugin cron, and system cleanup + * without any request side effects. Re-exports the `PluginBridge` Durable + * Object so the sandbox binding resolves against the entry module. + * + * Templates use this as their entire `src/worker.ts`: + * + * export { default, PluginBridge } from "@emdash-cms/cloudflare/worker"; + * + * and add a Cron Trigger to wrangler.jsonc: + * + * "triggers": { "crons": ["* * * * *"] } + * + * The `@astrojs/cloudflare/entrypoints/server` import is resolved by the + * consuming app's Astro build (it pulls the build-time `virtual:astro:app` + * module), so this package keeps the adapter external. + */ + +// @ts-ignore - resolved against the consuming app's Astro build +import astroHandler from "@astrojs/cloudflare/entrypoints/server"; +import { createApp } from "astro/app/entrypoint"; +import { runScheduledTasks } from "emdash/middleware"; + +export { PluginBridge } from "./sandbox/index.js"; + +// The Astro App wraps the build manifest; reuse one per isolate so each tick +// doesn't re-resolve the cache provider. +let app: ReturnType | null = null; + +/** + * Purge edge-cache tags for content the sweep just published. Without a + * request there's no `locals.cache`, so we reach the configured cache provider + * through the Astro App pipeline — the same provider routes invalidate against. + * A no-op when no cache provider is configured. + */ +async function invalidatePublishedTags( + published: ReadonlyArray<{ collection: string; id: string }>, +): Promise { + if (published.length === 0) return; + app ??= createApp(); + const provider = await app.pipeline.getCacheProvider(); + if (!provider) return; + const tags = [...new Set(published.flatMap((ref) => [ref.collection, ref.id]))]; + await provider.invalidate({ tags }); +} + +/** + * Build a Worker `scheduled()` handler that runs EmDash's scheduled + * maintenance batch and purges edge-cache tags for anything it published. + * Exported for sites that assemble their own Worker object; most sites get it + * via this module's default export. + */ +export function createScheduledHandler(): ExportedHandlerScheduledHandler { + return (_controller, _env, ctx) => { + ctx.waitUntil( + runScheduledTasks() + .then(async ({ published }) => { + await invalidatePublishedTags(published); + if (published.length > 0) { + console.log(`[scheduled] Published ${published.length} scheduled item(s)`); + } + return undefined; + }) + .catch((error: unknown) => { + console.error("[scheduled] runScheduledTasks failed:", error); + }), + ); + }; +} + +// eslint-disable-next-line typescript/no-unsafe-type-assertion -- astroHandler is the adapter's { fetch } worker object; resolved at app-build time +const handler = astroHandler as ExportedHandler; + +export default { + ...handler, + scheduled: createScheduledHandler(), +} satisfies ExportedHandler; diff --git a/packages/cloudflare/tsdown.config.ts b/packages/cloudflare/tsdown.config.ts index 2e524c7b4..a067e41fe 100644 --- a/packages/cloudflare/tsdown.config.ts +++ b/packages/cloudflare/tsdown.config.ts @@ -10,6 +10,7 @@ export default defineConfig({ "src/storage/r2.ts", "src/auth/index.ts", "src/sandbox/index.ts", + "src/worker.ts", "src/plugins/index.ts", // Media provider runtimes "src/media/images-runtime.ts", @@ -21,5 +22,9 @@ export default defineConfig({ format: ["esm"], dts: true, clean: true, - external: ["cloudflare:workers", "cloudflare:email"], + // @astrojs/cloudflare's server entrypoint and `astro/app/entrypoint` both + // resolve the build-time `virtual:astro:app` module — only available in the + // consuming app's Astro build, never here. Keep them external so the bare + // imports survive to be resolved downstream. + external: ["cloudflare:workers", "cloudflare:email", /^@astrojs\/cloudflare/, /^astro($|\/)/], }); diff --git a/packages/core/src/api/handlers/content.ts b/packages/core/src/api/handlers/content.ts index 2e0a3897e..346e1b8c4 100644 --- a/packages/core/src/api/handlers/content.ts +++ b/packages/core/src/api/handlers/content.ts @@ -14,6 +14,7 @@ import { RevisionRepository } from "../../database/repositories/revision.js"; import { SeoRepository } from "../../database/repositories/seo.js"; import { EmDashValidationError, + ScheduledNotDueError, InvalidCursorError, type BylineSummary, type ContentBylineCredit, @@ -1267,13 +1268,13 @@ export async function handleContentPublish( db: Kysely, collection: string, id: string, - options: { publishedAt?: string } = {}, + options: { publishedAt?: string; requireScheduledDue?: boolean } = {}, ): Promise> { try { const item = await withTransaction(db, async (trx) => { const repo = new ContentRepository(trx); const resolvedId = (await resolveId(repo, collection, id)) ?? id; - return repo.publish(collection, resolvedId, options.publishedAt); + return repo.publish(collection, resolvedId, options.publishedAt, options.requireScheduledDue); }); const hasSeo = await collectionHasSeo(db, collection); @@ -1284,6 +1285,17 @@ export async function handleContentPublish( data: { item }, }; } catch (error) { + // The scheduled sweep gates publish on the row still being due; a row + // unscheduled in the meantime is a silent skip, not a failure. + if (error instanceof ScheduledNotDueError) { + return { + success: false, + error: { + code: "NOT_DUE", + message: error.message, + }, + }; + } if (error instanceof EmDashValidationError) { return { success: false, diff --git a/packages/core/src/astro/integration/virtual-modules.ts b/packages/core/src/astro/integration/virtual-modules.ts index f130c2309..812d393be 100644 --- a/packages/core/src/astro/integration/virtual-modules.ts +++ b/packages/core/src/astro/integration/virtual-modules.ts @@ -63,6 +63,9 @@ export const RESOLVED_VIRTUAL_SEED_ID = "\0" + VIRTUAL_SEED_ID; export const VIRTUAL_WAIT_UNTIL_ID = "virtual:emdash/wait-until"; export const RESOLVED_VIRTUAL_WAIT_UNTIL_ID = "\0" + VIRTUAL_WAIT_UNTIL_ID; +export const VIRTUAL_SCHEDULER_ID = "virtual:emdash/scheduler"; +export const RESOLVED_VIRTUAL_SCHEDULER_ID = "\0" + VIRTUAL_SCHEDULER_ID; + /** * Generates the config virtual module. */ @@ -413,6 +416,42 @@ export function generateWaitUntilModule(adapterName: string | undefined): string return `export const waitUntil = undefined;`; } +/** + * Generates the scheduler virtual module. + * + * Decides — at build time, from the Astro adapter — whether the runtime gets a + * long-lived timer heartbeat. A *production* Cloudflare build has no persistent + * timers, so the Worker's `scheduled()` handler (a Cron Trigger) drives + * `runScheduledTasks()` instead and this exports `null`. Every other case — any + * other adapter (Node, Bun), and crucially local `astro dev` even under the + * Cloudflare adapter (no Cron Trigger fires in dev) — gets a `NodeCronScheduler` + * factory so plugin cron, scheduled publishing, and cleanup still run. + * + * Keeping the adapter check here — rather than in core's runtime — means the + * runtime has no Cloudflare-specific code path; it just calls `createScheduler` + * if one was injected. Mirrors the wait-until module's approach. + */ +export function generateSchedulerModule( + adapterName: string | undefined, + command: "build" | "serve" | undefined, +): string { + // Only suppress the timer for an actual Cloudflare *build* — that artifact + // runs in workerd where a Cron Trigger drives scheduled work. In `serve` + // (local dev) nothing fires the Cron Trigger, so fall through to the timer. + if (adapterName === "@astrojs/cloudflare" && command !== "serve") { + return `// Serverless build: an external Cron Trigger drives scheduled work. +export const createScheduler = null; +`; + } + return `// Long-lived runtime (or local dev): drive scheduled work from an in-process timer. +import { NodeCronScheduler } from "emdash"; + +export function createScheduler(executor) { + return new NodeCronScheduler(executor); +} +`; +} + /** * Generates the seed virtual module. * Reads the user's seed file at build time (in Node context) and embeds it, diff --git a/packages/core/src/astro/integration/vite-config.ts b/packages/core/src/astro/integration/vite-config.ts index 669de9076..5c10e721f 100644 --- a/packages/core/src/astro/integration/vite-config.ts +++ b/packages/core/src/astro/integration/vite-config.ts @@ -42,8 +42,11 @@ import { RESOLVED_VIRTUAL_SEED_ID, VIRTUAL_WAIT_UNTIL_ID, RESOLVED_VIRTUAL_WAIT_UNTIL_ID, + VIRTUAL_SCHEDULER_ID, + RESOLVED_VIRTUAL_SCHEDULER_ID, generateSeedModule, generateWaitUntilModule, + generateSchedulerModule, generateConfigModule, generateDialectModule, generateStorageModule, @@ -203,6 +206,9 @@ export function createVirtualModulesPlugin(options: VitePluginOptions): Plugin { if (id === VIRTUAL_WAIT_UNTIL_ID) { return RESOLVED_VIRTUAL_WAIT_UNTIL_ID; } + if (id === VIRTUAL_SCHEDULER_ID) { + return RESOLVED_VIRTUAL_SCHEDULER_ID; + } }, load(id: string) { if (id === RESOLVED_VIRTUAL_CONFIG_ID) { @@ -271,6 +277,12 @@ export function createVirtualModulesPlugin(options: VitePluginOptions): Plugin { if (id === RESOLVED_VIRTUAL_WAIT_UNTIL_ID) { return generateWaitUntilModule(astroConfig.adapter?.name); } + // Generate scheduler module — a NodeCronScheduler factory on + // long-lived runtimes, or null under the Cloudflare adapter where + // a Cron Trigger drives scheduled work instead. + if (id === RESOLVED_VIRTUAL_SCHEDULER_ID) { + return generateSchedulerModule(astroConfig.adapter?.name, viteCommand); + } }, }; } diff --git a/packages/core/src/astro/middleware.ts b/packages/core/src/astro/middleware.ts index 0ec49008d..105611a21 100644 --- a/packages/core/src/astro/middleware.ts +++ b/packages/core/src/astro/middleware.ts @@ -25,6 +25,8 @@ import * as virtualSandboxRunnerModule from "virtual:emdash/sandbox-runner"; // @ts-ignore - virtual module import { sandboxedPlugins as virtualSandboxedPlugins } from "virtual:emdash/sandboxed-plugins"; // @ts-ignore - virtual module +import { createScheduler as virtualCreateScheduler } from "virtual:emdash/scheduler"; +// @ts-ignore - virtual module import { createStorage as virtualCreateStorage } from "virtual:emdash/storage"; import { @@ -37,6 +39,7 @@ import { type RuntimeDependencies, type SandboxedPluginEntry, type MediaProviderEntry, + type CreateSchedulerFn, } from "../emdash-runtime.js"; import { setI18nConfig } from "../i18n/config.js"; import type { Database, Storage } from "../index.js"; @@ -50,6 +53,7 @@ import { type RequestMetrics, runWithContext, } from "../request-context.js"; +import type { PublishedRef } from "../scheduled-publish.js"; import type { EmDashConfig } from "./integration/runtime.js"; import { createPublicPluginApiRouteHandler } from "./public-plugin-api-routes.js"; import type { EmDashHandlers } from "./types.js"; @@ -126,6 +130,7 @@ function buildDependencies(config: EmDashConfig): RuntimeDependencies { plugins: getPlugins(), createDialect: virtualCreateDialect as (config: Record) => unknown, createStorage: virtualCreateStorage as ((config: Record) => Storage) | null, + createScheduler: virtualCreateScheduler as CreateSchedulerFn | null, sandboxEnabled: sandboxModule.sandboxEnabled as boolean, sandboxBypassed: (sandboxModule.sandboxBypassed as boolean) ?? false, sandboxedPluginEntries: (virtualSandboxedPlugins as SandboxedPluginEntry[]) || [], @@ -184,6 +189,25 @@ async function getRuntime( } } +/** + * Run scheduled maintenance (cron tasks, scheduled publishing, system cleanup) + * outside any request. Resolves the runtime from the build-time virtual config + * and the cached singleton — the same instance request handlers use. + * + * Wired into a platform heartbeat that is not a request: the Cloudflare Worker's + * `scheduled()` handler (Cron Trigger) calls this. On Node the runtime's own + * timer-based scheduler already drives the same work, so this isn't needed there. + * + * Returns the content promoted by the publishing sweep so the caller can purge + * edge-cache tags for it. + */ +export async function runScheduledTasks(): Promise<{ published: PublishedRef[] }> { + const config = getConfig(); + if (!config) return { published: [] }; + const runtime = await getRuntime(config); + return runtime.runScheduledTasks(); +} + /** * Astro attaches AstroCookies to outgoing responses via a well-known global * symbol. Cloning a Response (`new Response(body, init)`) drops non-header diff --git a/packages/core/src/astro/types.ts b/packages/core/src/astro/types.ts index 3a2f51a8d..c1f152dbb 100644 --- a/packages/core/src/astro/types.ts +++ b/packages/core/src/astro/types.ts @@ -312,7 +312,7 @@ export interface EmDashHandlers { handleContentPublish: ( collection: string, id: string, - options?: { publishedAt?: string }, + options?: { publishedAt?: string; requireScheduledDue?: boolean }, ) => Promise; handleContentUnpublish: (collection: string, id: string) => Promise; diff --git a/packages/core/src/database/repositories/content.ts b/packages/core/src/database/repositories/content.ts index abb69af82..77240377b 100644 --- a/packages/core/src/database/repositories/content.ts +++ b/packages/core/src/database/repositories/content.ts @@ -12,7 +12,12 @@ import type { FindManyResult, ContentItem, } from "./types.js"; -import { EmDashValidationError, encodeCursor, decodeCursor } from "./types.js"; +import { + EmDashValidationError, + ScheduledNotDueError, + encodeCursor, + decodeCursor, +} from "./types.js"; // Regex pattern for ULID validation const ULID_PATTERN = /^[0-9A-Z]{26}$/; @@ -978,8 +983,21 @@ export class ContentRepository { * original date) and falls back to the current time on first publish. Pass * an explicit value to backdate a publish (e.g. when migrating content from * another CMS). + * + * `requireDue` (optional) gates the publish on the row still being due: + * `scheduled_at` non-null and in the past. Used by the scheduled-publishing + * sweep to avoid publishing content an editor unscheduled or rescheduled + * between selection and publish. It claims the row with a single conditional + * UPDATE (clearing `scheduled_at`) before any other write, so it is atomic + * even on D1 (no multi-statement transactions) and serialises against + * `unschedule()` and concurrent sweeps — no TOCTOU and no double publish. */ - async publish(type: string, id: string, publishedAt?: string): Promise { + async publish( + type: string, + id: string, + publishedAt?: string, + requireDue = false, + ): Promise { const tableName = getTableName(type); const now = new Date().toISOString(); @@ -988,71 +1006,133 @@ export class ContentRepository { throw new EmDashValidationError("Content item not found"); } - const revisionRepo = new RevisionRepository(this.db); - let revisionToPublish = existing.draftRevisionId || existing.liveRevisionId; - - if (!revisionToPublish) { - // No revision exists - create one from current data - const revision = await revisionRepo.create({ - collection: type, - entryId: id, - data: existing.data, - }); - revisionToPublish = revision.id; + // Scheduled sweep: atomically claim the row before any other write. A + // single conditional UPDATE is atomic per-statement on every dialect + // (it doesn't depend on a wrapping transaction, which D1 lacks). If the + // schedule was cleared or pushed to the future (unschedule/reschedule) + // or another sweep already claimed it, this affects 0 rows and we bail + // before promoting any revision — so the row can't be double-published. + let claimedScheduledAt: string | null = null; + if (requireDue) { + const claim = await sql` + UPDATE ${sql.ref(tableName)} + SET scheduled_at = NULL, + updated_at = ${now} + WHERE id = ${id} + AND scheduled_at IS NOT NULL + AND scheduled_at <= ${now} + AND deleted_at IS NULL + `.execute(this.db); + if ((claim.numAffectedRows ?? 0n) === 0n) { + throw new ScheduledNotDueError(); + } + // Remember what we cleared so we can put it back if the publish work + // below fails on a driver without transactions (see catch). + claimedScheduledAt = existing.scheduledAt; } - // Sync the revision's data into the content table columns - // so the content table always holds the published version - const revision = await revisionRepo.findById(revisionToPublish); - if (revision) { - await this.syncDataColumns(type, id, revision.data); + // Track whether the final publish write committed. On D1 the claim above + // is already durable (withTransaction is a no-op there), so if a later + // step throws we must restore the schedule — otherwise the row is left + // `scheduled` with `scheduled_at = NULL` and no sweep ever retries it. + let publishCommitted = false; + try { + const revisionRepo = new RevisionRepository(this.db); + let revisionToPublish = existing.draftRevisionId || existing.liveRevisionId; + + if (!revisionToPublish) { + // No revision exists - create one from current data + const revision = await revisionRepo.create({ + collection: type, + entryId: id, + data: existing.data, + }); + revisionToPublish = revision.id; + } + + // Sync the revision's data into the content table columns + // so the content table always holds the published version + const revision = await revisionRepo.findById(revisionToPublish); + if (revision) { + await this.syncDataColumns(type, id, revision.data); + + // Sync slug from revision if stored there + if (typeof revision.data._slug === "string") { + await sql` + UPDATE ${sql.ref(tableName)} + SET slug = ${revision.data._slug} + WHERE id = ${id} + `.execute(this.db); + } + } - // Sync slug from revision if stored there - if (typeof revision.data._slug === "string") { + if (publishedAt !== undefined) { + // Caller supplied an explicit timestamp, so we overwrite published_at + // directly (used to backdate a publish, e.g. for content migrations). await sql` UPDATE ${sql.ref(tableName)} - SET slug = ${revision.data._slug} + SET live_revision_id = ${revisionToPublish}, + draft_revision_id = NULL, + status = 'published', + scheduled_at = NULL, + published_at = ${publishedAt}, + updated_at = ${now} WHERE id = ${id} + AND deleted_at IS NULL + `.execute(this.db); + } else { + // No timestamp supplied — preserve existing published_at on + // idempotent re-publish, fall back to `now` on first publish. + await sql` + UPDATE ${sql.ref(tableName)} + SET live_revision_id = ${revisionToPublish}, + draft_revision_id = NULL, + status = 'published', + scheduled_at = NULL, + published_at = COALESCE(published_at, ${now}), + updated_at = ${now} + WHERE id = ${id} + AND deleted_at IS NULL `.execute(this.db); } - } + publishCommitted = true; - if (publishedAt !== undefined) { - // Caller supplied an explicit timestamp, so we overwrite published_at - // directly (used to backdate a publish, e.g. for content migrations). - await sql` - UPDATE ${sql.ref(tableName)} - SET live_revision_id = ${revisionToPublish}, - draft_revision_id = NULL, - status = 'published', - scheduled_at = NULL, - published_at = ${publishedAt}, - updated_at = ${now} - WHERE id = ${id} - AND deleted_at IS NULL - `.execute(this.db); - } else { - // No timestamp supplied — preserve existing published_at on - // idempotent re-publish, fall back to `now` on first publish. - await sql` - UPDATE ${sql.ref(tableName)} - SET live_revision_id = ${revisionToPublish}, - draft_revision_id = NULL, - status = 'published', - scheduled_at = NULL, - published_at = COALESCE(published_at, ${now}), - updated_at = ${now} - WHERE id = ${id} - AND deleted_at IS NULL - `.execute(this.db); - } + const updated = await this.findById(type, id); + if (!updated) { + throw new Error("Content not found"); + } - const updated = await this.findById(type, id); - if (!updated) { - throw new Error("Content not found"); + return updated; + } catch (error) { + // Best-effort schedule restore for the no-transaction (D1) case so a + // failed publish stays retryable. Skipped when the publish actually + // committed (the failure was afterwards). On SQLite/Postgres the + // enclosing transaction rolls the claim back, so this restore also + // rolls back — a harmless no-op. Never mask the original error. + if (requireDue && claimedScheduledAt && !publishCommitted) { + try { + // Only restore if the row still has pending work: either it's not + // published, or it's a published row that still has a draft change + // queued. This avoids re-adding a stale schedule (and triggering a + // redundant republish) when another actor fully published the row + // in the failure window — that publish clears draft_revision_id. + await sql` + UPDATE ${sql.ref(tableName)} + SET scheduled_at = ${claimedScheduledAt} + WHERE id = ${id} + AND scheduled_at IS NULL + AND deleted_at IS NULL + AND (status != 'published' OR draft_revision_id IS NOT NULL) + `.execute(this.db); + } catch (restoreError) { + console.error( + `[content] Failed to restore schedule for ${type}/${id} after publish failure:`, + restoreError, + ); + } + } + throw error; } - - return updated; } /** diff --git a/packages/core/src/database/repositories/types.ts b/packages/core/src/database/repositories/types.ts index fe50001da..6624ec2d1 100644 --- a/packages/core/src/database/repositories/types.ts +++ b/packages/core/src/database/repositories/types.ts @@ -227,3 +227,16 @@ export class EmDashValidationError extends Error { this.name = "EmDashValidationError"; } } + +/** + * Thrown by `publish()` when called with `requireDue` for a row that is no + * longer due (its `scheduled_at` was cleared or pushed into the future between + * selection and publish — e.g. an editor unscheduled it). Lets the scheduled + * sweep skip the row silently rather than treating it as a publish failure. + */ +export class ScheduledNotDueError extends Error { + constructor(message = "Content is no longer scheduled to publish") { + super(message); + this.name = "ScheduledNotDueError"; + } +} diff --git a/packages/core/src/emdash-runtime.ts b/packages/core/src/emdash-runtime.ts index aa095357e..b8c06d7fc 100644 --- a/packages/core/src/emdash-runtime.ts +++ b/packages/core/src/emdash-runtime.ts @@ -157,13 +157,12 @@ import { import { normalizeManifestRoute } from "./plugins/manifest-schema.js"; import { extractRequestMeta, sanitizeHeadersForSandbox } from "./plugins/request-meta.js"; import { PluginRouteRegistry, type RouteMeta } from "./plugins/routes.js"; -import { NodeCronScheduler } from "./plugins/scheduler/node.js"; -import { PiggybackScheduler } from "./plugins/scheduler/piggyback.js"; import type { CronScheduler } from "./plugins/scheduler/types.js"; import { PluginStateRepository } from "./plugins/state.js"; import { normalizeRegistryConfig } from "./registry/config.js"; import { requestCached } from "./request-cache.js"; import { getRequestContext } from "./request-context.js"; +import { publishDueContent, type PublishedRef } from "./scheduled-publish.js"; import { FTSManager } from "./search/fts-manager.js"; import { invalidateSiteSettingsCache } from "./settings/index.js"; @@ -236,6 +235,13 @@ export interface MediaProviderContext { storage: Storage | null; } +/** + * Builds the timer-based scheduler that drives cron ticks and maintenance. + * Injected via `virtual:emdash/scheduler` so the platform — not core — decides + * whether a long-lived heartbeat exists. + */ +export type CreateSchedulerFn = (executor: CronExecutor) => CronScheduler; + /** * Dependencies injected from virtual modules (middleware reads these) */ @@ -249,6 +255,16 @@ export interface RuntimeDependencies { sandboxEnabled: boolean; /** sandbox: false escape hatch - load sandboxed plugins in-process */ sandboxBypassed?: boolean; + /** + * Factory for the timer-based cron/maintenance heartbeat. Supplied by the + * generated `virtual:emdash/scheduler` module: a `NodeCronScheduler` factory + * on long-lived runtimes (Node/Bun), or `null` on serverless adapters where + * an external driver (e.g. the Cloudflare Worker's `scheduled()` Cron + * Trigger) calls `runScheduledTasks()` instead. When absent or null, the + * runtime starts no scheduler. Keeping the platform decision in the + * integration means core has no adapter-specific runtime checks. + */ + createScheduler?: CreateSchedulerFn | null; /** Media provider entries from virtual module */ mediaProviderEntries?: MediaProviderEntry[]; sandboxedPluginEntries: SandboxedPluginEntry[]; @@ -449,14 +465,55 @@ export class EmDashRuntime { } /** - * Tick the cron system from request context (piggyback mode). - * Call this from middleware on each request to ensure cron tasks - * execute even when no dedicated scheduler is available. + * Publish any content whose scheduled time has passed. + * Returns the items promoted so callers can invalidate their cache tags. + */ + async publishScheduled(): Promise { + return publishDueContent(this.db, (collection, id, options) => + this.handleContentPublish(collection, id, options), + ); + } + + /** + * Run the full scheduled-maintenance batch: cron tasks, scheduled + * publishing, and system cleanup. For request-less drivers — the + * Cloudflare `scheduled()` handler invokes this from a Cron Trigger. + * (On Node the timer-based scheduler drives the same work itself.) + * + * Each step is independent and non-fatal. Returns the content promoted + * by the publishing sweep so the caller can purge edge-cache tags. */ - tickCron(): void { - if (this.cronScheduler instanceof PiggybackScheduler) { - this.cronScheduler.onRequest(); + async runScheduledTasks(): Promise<{ published: PublishedRef[] }> { + if (this.cronExecutor) { + try { + await this.cronExecutor.tick(); + } catch (error) { + console.error("[cron] Tick failed:", error); + } + try { + await this.cronExecutor.recoverStaleLocks(); + } catch (error) { + console.error("[cron] Stale lock recovery failed:", error); + } + } + + let published: PublishedRef[] = []; + try { + // Route through the runtime wrapper so content:afterPublish hooks fire. + published = await publishDueContent(this.db, (collection, id, options) => + this.handleContentPublish(collection, id, options), + ); + } catch (error) { + console.error("[scheduled-publish] Sweep failed:", error); } + + try { + await runSystemCleanup(this.db, this.storage ?? undefined); + } catch (error) { + console.error("[cleanup] System cleanup failed:", error); + } + + return { published }; } /** @@ -1126,6 +1183,11 @@ export class EmDashRuntime { let cronExecutor: CronExecutor | null = null; let cronScheduler: CronScheduler | null = null; + // Populated with the constructed runtime just before this method returns, + // so the timer scheduler's cleanup can route scheduled publishing through + // the runtime wrapper (firing content:afterPublish hooks). The first tick + // is ≥1s out, well after the synchronous assignment below. + const runtimeRef: { current: EmDashRuntime | null } = { current: null }; await phase("rt.cron", "Cron init (recovery deferred post-response)", async () => { try { @@ -1151,46 +1213,58 @@ export class EmDashRuntime { } }); - // Detect platform and create appropriate scheduler. - // On Cloudflare Workers, setTimeout is available but unreliable for - // long durations — use PiggybackScheduler as default. - // In Node/Bun, use NodeCronScheduler with real timers. - const isWorkersRuntime = - typeof globalThis.navigator !== "undefined" && - globalThis.navigator.userAgent === "Cloudflare-Workers"; - - if (isWorkersRuntime) { - cronScheduler = new PiggybackScheduler(cronExecutor); - } else { - cronScheduler = new NodeCronScheduler(cronExecutor); - } - - // Register system cleanup to run alongside each scheduler tick. - // Pass storage so cleanupPendingUploads can delete orphaned files. - cronScheduler.setSystemCleanup(async () => { - try { - await runSystemCleanup(db, storage ?? undefined); - } catch (error) { - // Non-fatal -- individual cleanup failures are already logged - // by runSystemCleanup. This catches unexpected errors. - console.error("[cleanup] System cleanup failed:", error); - } - }); + // The platform decides whether a long-lived timer heartbeat exists. + // `createScheduler` is injected by the generated virtual:emdash/scheduler + // module: a NodeCronScheduler factory on Node/Bun, or null on serverless + // adapters (e.g. Cloudflare) where the Worker's `scheduled()` handler + // drives runScheduledTasks() instead. No adapter check lives here. + if (deps.createScheduler) { + const scheduler = deps.createScheduler(cronExecutor); + cronScheduler = scheduler; + + // Run scheduled publishing and system cleanup alongside each tick. + // Pass storage so cleanupPendingUploads can delete orphaned files. + scheduler.setSystemCleanup(async () => { + try { + // Route through the runtime so content:afterPublish hooks fire. + // Falls back to the raw handler if (improbably) the tick beats + // the post-construction ref assignment. + const runtime = runtimeRef.current; + await publishDueContent( + db, + runtime + ? (collection, id, options) => + runtime.handleContentPublish(collection, id, options) + : undefined, + ); + } catch (error) { + console.error("[scheduled-publish] Sweep failed:", error); + } + try { + await runSystemCleanup(db, storage ?? undefined); + } catch (error) { + // Non-fatal -- individual cleanup failures are already logged + // by runSystemCleanup. This catches unexpected errors. + console.error("[cleanup] System cleanup failed:", error); + } + }); - // Add cron reschedule callback (merges with existing factory options) - pipeline.setContextFactory({ - cronReschedule: () => cronScheduler?.reschedule(), - }); + // Add cron reschedule callback (merges with existing factory options) + pipeline.setContextFactory({ + cronReschedule: () => cronScheduler?.reschedule(), + }); - // Start the scheduler - await cronScheduler.start(); + // start() is void on the timer scheduler but the interface + // allows a promise (alarm-backed schedulers); we don't block on it. + void scheduler.start(); + } } catch (error) { console.warn("[cron] Failed to initialize cron system:", error); // Non-fatal — CMS works without cron } }); - return new EmDashRuntime({ + const runtime = new EmDashRuntime({ db, storage, // Include bypassed sandboxed plugins in configuredPlugins so route @@ -1213,6 +1287,10 @@ export class EmDashRuntime { runtimeDeps: deps, pipelineRef, }); + // Hand the constructed instance to the scheduler-cleanup closure so the + // timer-driven sweep can fire publish hooks (see runtimeRef above). + runtimeRef.current = runtime; + return runtime; } /** @@ -2501,7 +2579,7 @@ export class EmDashRuntime { async handleContentPublish( collection: string, id: string, - options: { publishedAt?: string } = {}, + options: { publishedAt?: string; requireScheduledDue?: boolean } = {}, ) { const result = await handleContentPublish(this.db, collection, id, options); diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index a64261226..3fd73dad5 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -201,6 +201,8 @@ export { PluginManager, createPluginManager, PluginRouteError, + // Scheduler (Node timer heartbeat — used by virtual:emdash/scheduler) + NodeCronScheduler, // Sandbox NoopSandboxRunner, SandboxNotAvailableError, @@ -253,6 +255,10 @@ export type { CollectionCommentSettings, StoredComment, + // Scheduler types + CronScheduler, + SystemCleanupFn, + // Sandbox runtime types SandboxRunner, SandboxedPluginInstance, diff --git a/packages/core/src/plugins/cron.ts b/packages/core/src/plugins/cron.ts index 851a852d1..c44b40726 100644 --- a/packages/core/src/plugins/cron.ts +++ b/packages/core/src/plugins/cron.ts @@ -34,8 +34,9 @@ export type RescheduleFn = () => void; /** * Executes overdue cron tasks. * - * Called by platform-specific schedulers (NodeCronScheduler, EmDashScheduler DO, - * PiggybackScheduler). Stateless — all state lives in the database. + * Called by the platform driver: the NodeCronScheduler timer on Node, or the + * Worker's `scheduled()` handler (via runScheduledTasks) on Cloudflare. + * Stateless — all state lives in the database. */ export class CronExecutor { constructor( diff --git a/packages/core/src/plugins/index.ts b/packages/core/src/plugins/index.ts index 51cb6e31c..a9e22d49a 100644 --- a/packages/core/src/plugins/index.ts +++ b/packages/core/src/plugins/index.ts @@ -63,6 +63,11 @@ export type { RouteResult, InvokeRouteOptions } from "./routes.js"; export { PluginManager, createPluginManager } from "./manager.js"; export type { PluginManagerOptions, PluginState } from "./manager.js"; +// Scheduler (Node timer-based heartbeat; consumed by the generated +// virtual:emdash/scheduler module on non-serverless adapters) +export { NodeCronScheduler } from "./scheduler/node.js"; +export type { CronScheduler, SystemCleanupFn } from "./scheduler/types.js"; + // Sandbox export { NoopSandboxRunner, diff --git a/packages/core/src/plugins/scheduler/piggyback.ts b/packages/core/src/plugins/scheduler/piggyback.ts deleted file mode 100644 index edc3bae68..000000000 --- a/packages/core/src/plugins/scheduler/piggyback.ts +++ /dev/null @@ -1,71 +0,0 @@ -/** - * Piggyback cron scheduler — request-driven fallback. - * - * Checks for overdue tasks on each incoming request, debounced to at most - * once per 60 seconds. Fire-and-forget (does not block the request). - * - * Used on Cloudflare when no Durable Object binding is available, or - * during development when DO bindings aren't configured. - * - */ - -import type { CronExecutor } from "../cron.js"; -import type { CronScheduler, SystemCleanupFn } from "./types.js"; - -/** Minimum interval between tick attempts (ms) */ -const DEBOUNCE_MS = 60 * 1000; - -export class PiggybackScheduler implements CronScheduler { - private lastTickAt = 0; - private running = false; - private systemCleanup: SystemCleanupFn | null = null; - - constructor(private executor: CronExecutor) {} - - setSystemCleanup(fn: SystemCleanupFn): void { - this.systemCleanup = fn; - } - - start(): void { - this.running = true; - } - - stop(): void { - this.running = false; - } - - /** - * No-op for piggyback — tick happens on next request. - */ - reschedule(): void { - // Nothing to do — next request will check - } - - /** - * Call this from middleware on each request. - * Debounced: only actually ticks if enough time has passed. - */ - onRequest(): void { - if (!this.running) return; - - const now = Date.now(); - if (now - this.lastTickAt < DEBOUNCE_MS) return; - - this.lastTickAt = now; - - // Fire-and-forget — don't block the request - const tasks: Promise[] = [this.executor.tick(), this.executor.recoverStaleLocks()]; - if (this.systemCleanup) { - tasks.push(this.systemCleanup()); - } - - void Promise.allSettled(tasks).then((results) => { - for (const r of results) { - if (r.status === "rejected") { - console.error("[cron:piggyback] Tick task failed:", r.reason); - } - } - return undefined; - }); - } -} diff --git a/packages/core/src/scheduled-publish.ts b/packages/core/src/scheduled-publish.ts new file mode 100644 index 000000000..b3d489338 --- /dev/null +++ b/packages/core/src/scheduled-publish.ts @@ -0,0 +1,101 @@ +/** + * Scheduled publishing sweep + * + * Promotes content whose scheduled publish time has passed. Driven by the + * platform scheduler alongside cron ticks and system cleanup — never by a + * request. On Node the cron scheduler's maintenance pass calls it; on + * Cloudflare the Worker's `scheduled()` handler does. + * + * Like `runSystemCleanup`, each collection sweep is independent and non-fatal: + * one collection failing must not stop the rest. + */ + +import type { Kysely } from "kysely"; + +import { handleContentPublish } from "./api/handlers/content.js"; +import { ContentRepository } from "./database/repositories/content.js"; +import type { Database } from "./database/types.js"; +import { SchemaRegistry } from "./schema/registry.js"; + +/** A content item that was promoted to published by a sweep. */ +export interface PublishedRef { + collection: string; + id: string; +} + +/** + * Publishes a single content item. Mirrors the relevant subset of + * `handleContentPublish`'s return shape. Production callers pass + * `EmDashRuntime.handleContentPublish` so `content:afterPublish` hooks fire + * (search indexing, webhooks, syndication); the default falls back to the raw + * handler (no hooks) for callers that have only a `db`. + */ +export type ScheduledPublishFn = ( + collection: string, + id: string, + options: { publishedAt?: string; requireScheduledDue?: boolean }, +) => Promise<{ success: boolean; error?: { code?: string } }>; + +/** + * Publish every content item whose `scheduled_at` is in the past. + * + * Iterates all collections, finds due items (`findReadyToPublish` returns both + * scheduled drafts and published entries with pending scheduled changes), and + * publishes each. `publish()` clears `scheduled_at`, so a second sweep is a + * no-op — safe to run on every tick. + * + * Pass `publish` (the runtime's `handleContentPublish`) so publish hooks fire; + * without it the sweep falls back to the raw DB handler and hooks are skipped. + * + * Returns the items it promoted so request-less callers (the Cloudflare + * `scheduled()` handler) can invalidate edge-cache tags for them. + */ +export async function publishDueContent( + db: Kysely, + publish?: ScheduledPublishFn, +): Promise { + const published: PublishedRef[] = []; + + let collections; + try { + collections = await new SchemaRegistry(db).listCollections(); + } catch (error) { + console.error("[scheduled-publish] Failed to list collections:", error); + return published; + } + + const repo = new ContentRepository(db); + const doPublish: ScheduledPublishFn = + publish ?? ((collection, id, options) => handleContentPublish(db, collection, id, options)); + + for (const collection of collections) { + try { + const due = await repo.findReadyToPublish(collection.slug); + for (const item of due) { + // First publication of a scheduled draft should record the intended + // scheduled time, not the (later) sweep time. Items already published + // with pending draft changes keep their original published_at. + const publishedAt = item.publishedAt == null ? (item.scheduledAt ?? undefined) : undefined; + const result = await doPublish(collection.slug, item.id, { + publishedAt, + requireScheduledDue: true, + }); + if (result.success) { + published.push({ collection: collection.slug, id: item.id }); + } else if (result.error?.code === "NOT_DUE") { + // Unscheduled or rescheduled between selection and publish — the + // editor changed their mind; skip quietly, not a failure. + } else { + console.error( + `[scheduled-publish] Failed to publish ${collection.slug}/${item.id}:`, + result.error, + ); + } + } + } catch (error) { + console.error(`[scheduled-publish] Sweep failed for "${collection.slug}":`, error); + } + } + + return published; +} diff --git a/packages/core/src/virtual-modules.d.ts b/packages/core/src/virtual-modules.d.ts index fc1429668..9f71aaea0 100644 --- a/packages/core/src/virtual-modules.d.ts +++ b/packages/core/src/virtual-modules.d.ts @@ -132,6 +132,17 @@ declare module "virtual:emdash/wait-until" { export const waitUntil: ((promise: Promise) => void) | undefined; } +declare module "virtual:emdash/scheduler" { + import type { CreateSchedulerFn } from "./emdash-runtime.js"; + /** + * Factory for the timer-based cron/maintenance heartbeat. A + * `NodeCronScheduler` factory on long-lived runtimes (Node/Bun); `null` + * under serverless adapters (e.g. Cloudflare) where an external Cron + * Trigger drives scheduled work instead. + */ + export const createScheduler: CreateSchedulerFn | null; +} + declare module "virtual:emdash/admin-registry" { /** * Plugin admin module registry. diff --git a/packages/core/tests/unit/astro/integration/virtual-modules.test.ts b/packages/core/tests/unit/astro/integration/virtual-modules.test.ts index eaf8ef993..9130c7087 100644 --- a/packages/core/tests/unit/astro/integration/virtual-modules.test.ts +++ b/packages/core/tests/unit/astro/integration/virtual-modules.test.ts @@ -7,6 +7,7 @@ import { afterEach, beforeEach, describe, expect, it } from "vitest"; import { generateConfigModule, generateDialectModule, + generateSchedulerModule, generateSeedModule, } from "../../../../src/astro/integration/virtual-modules.js"; @@ -67,6 +68,36 @@ describe("generateDialectModule", () => { }); }); +describe("generateSchedulerModule", () => { + it("disables the timer for a Cloudflare production build (Cron Trigger drives it)", () => { + const out = generateSchedulerModule("@astrojs/cloudflare", "build"); + expect(out).toContain("export const createScheduler = null"); + expect(out).not.toContain("NodeCronScheduler"); + }); + + it("keeps the Node timer in local dev even under the Cloudflare adapter", () => { + // No Cron Trigger fires in `astro dev`, so scheduled publishing/cron + // must still run via the in-process timer. + const out = generateSchedulerModule("@astrojs/cloudflare", "serve"); + expect(out).toContain('import { NodeCronScheduler } from "emdash"'); + expect(out).toContain("export function createScheduler(executor)"); + expect(out).not.toContain("createScheduler = null"); + }); + + it("emits a NodeCronScheduler factory for non-Cloudflare adapters", () => { + for (const cmd of ["build", "serve", undefined] as const) { + const out = generateSchedulerModule("@astrojs/node", cmd); + expect(out).toContain('import { NodeCronScheduler } from "emdash"'); + expect(out).not.toContain("createScheduler = null"); + } + }); + + it("emits a NodeCronScheduler factory when no adapter is configured", () => { + const out = generateSchedulerModule(undefined, "build"); + expect(out).toContain("export function createScheduler(executor)"); + }); +}); + describe("generateSeedModule", () => { let projectRoot: string; diff --git a/packages/core/tests/unit/astro/middleware-prerender.test.ts b/packages/core/tests/unit/astro/middleware-prerender.test.ts index 292505c8a..46171d5be 100644 --- a/packages/core/tests/unit/astro/middleware-prerender.test.ts +++ b/packages/core/tests/unit/astro/middleware-prerender.test.ts @@ -118,6 +118,7 @@ vi.mock( vi.mock("virtual:emdash/sandboxed-plugins", () => ({ sandboxedPlugins: [] }), { virtual: true }); vi.mock("virtual:emdash/storage", () => ({ createStorage: null }), { virtual: true }); vi.mock("virtual:emdash/wait-until", () => ({ waitUntil: undefined }), { virtual: true }); +vi.mock("virtual:emdash/scheduler", () => ({ createScheduler: null }), { virtual: true }); vi.mock("../../../src/emdash-runtime.js", () => ({ EmDashRuntime: { diff --git a/packages/core/tests/unit/scheduled-publish.test.ts b/packages/core/tests/unit/scheduled-publish.test.ts new file mode 100644 index 000000000..9a96c08df --- /dev/null +++ b/packages/core/tests/unit/scheduled-publish.test.ts @@ -0,0 +1,307 @@ +import type { Kysely } from "kysely"; +import { describe, it, expect, beforeEach, afterEach, vi } from "vitest"; + +import { handleContentPublish } from "../../src/api/handlers/content.js"; +import type { EmDashConfig } from "../../src/astro/integration/runtime.js"; +import { ContentRepository } from "../../src/database/repositories/content.js"; +import { RevisionRepository } from "../../src/database/repositories/revision.js"; +import { ScheduledNotDueError } from "../../src/database/repositories/types.js"; +import type { Database } from "../../src/database/types.js"; +import { EmDashRuntime } from "../../src/emdash-runtime.js"; +import { createHookPipeline } from "../../src/plugins/hooks.js"; +import { publishDueContent, type ScheduledPublishFn } from "../../src/scheduled-publish.js"; +import { createPostFixture, createPageFixture } from "../utils/fixtures.js"; +import { setupTestDatabaseWithCollections, teardownTestDatabase } from "../utils/test-db.js"; + +function buildRuntime(db: Kysely): EmDashRuntime { + const config: EmDashConfig = {}; + const pipelineFactoryOptions = { db } as const; + const hooks = createHookPipeline([], pipelineFactoryOptions); + const runtimeDeps = { + config, + plugins: [], + // eslint-disable-next-line typescript/no-explicit-any -- match RuntimeDependencies signature + createDialect: (() => { + throw new Error("createDialect not used in this test"); + }) as any, + createStorage: null, + sandboxEnabled: false, + sandboxedPluginEntries: [], + createSandboxRunner: null, + }; + + return new EmDashRuntime({ + db, + storage: null, + configuredPlugins: [], + sandboxedPlugins: new Map(), + sandboxedPluginEntries: [], + hooks, + enabledPlugins: new Set(), + pluginStates: new Map(), + config, + mediaProviders: new Map(), + mediaProviderEntries: [], + cronExecutor: null, + cronScheduler: null, + emailPipeline: null, + allPipelinePlugins: [], + pipelineFactoryOptions, + runtimeDeps, + pipelineRef: { current: hooks }, + }); +} + +describe("publishDueContent()", () => { + let db: Kysely; + let repo: ContentRepository; + + beforeEach(async () => { + db = await setupTestDatabaseWithCollections(); + repo = new ContentRepository(db); + }); + + afterEach(async () => { + await teardownTestDatabase(db); + }); + + it("promotes a scheduled draft whose time has passed", async () => { + const post = await repo.create(createPostFixture()); + // schedule() rejects past dates, so set the past schedule directly — + // this is the state a post reaches once its future schedule arrives. + const past = new Date(Date.now() - 60_000).toISOString(); + await repo.update("post", post.id, { status: "scheduled", scheduledAt: past }); + + const published = await publishDueContent(db); + + expect(published).toEqual([{ collection: "post", id: post.id }]); + + const updated = await repo.findById("post", post.id); + expect(updated?.status).toBe("published"); + expect(updated?.publishedAt).toBeTruthy(); + expect(updated?.scheduledAt).toBeNull(); + }); + + it("leaves future-scheduled content untouched", async () => { + const post = await repo.create(createPostFixture()); + const future = new Date(Date.now() + 86_400_000).toISOString(); + await repo.schedule("post", post.id, future); + + const published = await publishDueContent(db); + + expect(published).toEqual([]); + const updated = await repo.findById("post", post.id); + expect(updated?.status).toBe("scheduled"); + }); + + it("records the scheduled time as published_at, not the (later) sweep time", async () => { + const post = await repo.create(createPostFixture()); + // Scheduled for the past; the sweep runs "now", which is later. + const scheduledFor = new Date(Date.now() - 60_000).toISOString(); + await repo.update("post", post.id, { status: "scheduled", scheduledAt: scheduledFor }); + + await publishDueContent(db); + + const updated = await repo.findById("post", post.id); + // First publication should preserve the intended publish time. + expect(updated?.publishedAt).toBe(scheduledFor); + }); + + it("routes each publish through the provided callback with requireScheduledDue", async () => { + const post = await repo.create(createPostFixture()); + const scheduledFor = new Date(Date.now() - 60_000).toISOString(); + await repo.update("post", post.id, { status: "scheduled", scheduledAt: scheduledFor }); + + const calls: Array<{ collection: string; id: string; options: unknown }> = []; + const spy: ScheduledPublishFn = (collection, id, options) => { + calls.push({ collection, id, options }); + return handleContentPublish(db, collection, id, options); + }; + + const published = await publishDueContent(db, spy); + + expect(published).toEqual([{ collection: "post", id: post.id }]); + expect(calls).toHaveLength(1); + expect(calls[0]?.options).toEqual({ + publishedAt: scheduledFor, + requireScheduledDue: true, + }); + }); + + it("skips (without failing) items the publish callback reports as NOT_DUE", async () => { + const post = await repo.create(createPostFixture()); + const past = new Date(Date.now() - 60_000).toISOString(); + await repo.update("post", post.id, { status: "scheduled", scheduledAt: past }); + + // Simulate the unschedule-during-sweep race: the callback reports the + // item is no longer due. The sweep must treat this as a quiet skip. + const published = await publishDueContent(db, async () => ({ + success: false, + error: { code: "NOT_DUE" }, + })); + + expect(published).toEqual([]); + }); + + it("sweeps every collection and is idempotent across runs", async () => { + const post = await repo.create(createPostFixture()); + const page = await repo.create(createPageFixture()); + const past = new Date(Date.now() - 60_000).toISOString(); + await repo.update("post", post.id, { status: "scheduled", scheduledAt: past }); + await repo.update("page", page.id, { status: "scheduled", scheduledAt: past }); + + const first = await publishDueContent(db); + expect(first).toHaveLength(2); + expect(first.map((r) => r.collection).toSorted()).toEqual(["page", "post"]); + + // A second sweep finds nothing — publish cleared scheduled_at. + const second = await publishDueContent(db); + expect(second).toEqual([]); + }); +}); + +describe("EmDashRuntime.runScheduledTasks()", () => { + let db: Kysely; + let repo: ContentRepository; + + beforeEach(async () => { + db = await setupTestDatabaseWithCollections(); + repo = new ContentRepository(db); + }); + + afterEach(async () => { + await teardownTestDatabase(db); + }); + + // This is the exact method the Cloudflare scheduled() handler invokes via + // runScheduledTasks(). It must promote due content and report it. + it("promotes due content and returns it for cache invalidation", async () => { + const post = await repo.create(createPostFixture()); + const past = new Date(Date.now() - 60_000).toISOString(); + await repo.update("post", post.id, { status: "scheduled", scheduledAt: past }); + + const runtime = buildRuntime(db); + const { published } = await runtime.runScheduledTasks(); + + expect(published).toEqual([{ collection: "post", id: post.id }]); + const updated = await repo.findById("post", post.id); + expect(updated?.status).toBe("published"); + }); +}); + +describe("ContentRepository.publish() requireDue gate", () => { + let db: Kysely; + let repo: ContentRepository; + + beforeEach(async () => { + db = await setupTestDatabaseWithCollections(); + repo = new ContentRepository(db); + }); + + afterEach(async () => { + await teardownTestDatabase(db); + }); + + it("publishes a still-due item", async () => { + const post = await repo.create(createPostFixture()); + const past = new Date(Date.now() - 60_000).toISOString(); + await repo.update("post", post.id, { status: "scheduled", scheduledAt: past }); + + const result = await repo.publish("post", post.id, undefined, true); + expect(result.status).toBe("published"); + }); + + it("refuses to publish an item that was unscheduled (race guard)", async () => { + const post = await repo.create(createPostFixture()); + const past = new Date(Date.now() - 60_000).toISOString(); + await repo.update("post", post.id, { status: "scheduled", scheduledAt: past }); + // Editor unschedules between selection and publish. + await repo.unschedule("post", post.id); + + await expect(repo.publish("post", post.id, undefined, true)).rejects.toBeInstanceOf( + ScheduledNotDueError, + ); + + const updated = await repo.findById("post", post.id); + expect(updated?.status).toBe("draft"); + }); + + it("claims the schedule so a second (overlapping) publish bails — no double publish", async () => { + const post = await repo.create(createPostFixture()); + const past = new Date(Date.now() - 60_000).toISOString(); + await repo.update("post", post.id, { status: "scheduled", scheduledAt: past }); + + // First claim wins and publishes. + const first = await repo.publish("post", post.id, undefined, true); + expect(first.status).toBe("published"); + + // A concurrent/duplicate sweep that already selected this row before the + // first claim cleared scheduled_at must now affect 0 rows and bail. + await expect(repo.publish("post", post.id, undefined, true)).rejects.toBeInstanceOf( + ScheduledNotDueError, + ); + }); + + it("refuses to publish an item rescheduled into the future", async () => { + const post = await repo.create(createPostFixture()); + const past = new Date(Date.now() - 60_000).toISOString(); + await repo.update("post", post.id, { status: "scheduled", scheduledAt: past }); + // Pushed out to the future before the sweep reaches it. + const future = new Date(Date.now() + 86_400_000).toISOString(); + await repo.update("post", post.id, { scheduledAt: future }); + + await expect(repo.publish("post", post.id, undefined, true)).rejects.toBeInstanceOf( + ScheduledNotDueError, + ); + }); + + it("restores the schedule if publish work fails after the claim (no-transaction path)", async () => { + const post = await repo.create(createPostFixture()); + const past = new Date(Date.now() - 60_000).toISOString(); + await repo.update("post", post.id, { status: "scheduled", scheduledAt: past }); + + // Force a failure AFTER the atomic claim has cleared scheduled_at. This + // repo is unwrapped (no withTransaction), mimicking D1 where the claim is + // already durable when later work throws. + const spy = vi + .spyOn(RevisionRepository.prototype, "findById") + .mockRejectedValueOnce(new Error("boom")); + + await expect(repo.publish("post", post.id, undefined, true)).rejects.toThrow("boom"); + spy.mockRestore(); + + const after = await repo.findById("post", post.id); + // Schedule put back so a later sweep retries — not silently dropped. + expect(after?.scheduledAt).toBe(past); + expect(after?.status).toBe("scheduled"); + }); + + it("does not re-add a schedule when the row was fully published in the failure window", async () => { + const post = await repo.create(createPostFixture()); + // Publish it: status=published, draft_revision_id cleared. This is the + // state a concurrent manual publish would leave the row in. + await repo.publish("post", post.id); + // A stray schedule on an already-published row with no pending draft. + const past = new Date(Date.now() - 60_000).toISOString(); + await repo.update("post", post.id, { scheduledAt: past }); + + const spy = vi + .spyOn(RevisionRepository.prototype, "findById") + .mockRejectedValueOnce(new Error("boom")); + + await expect(repo.publish("post", post.id, undefined, true)).rejects.toThrow("boom"); + spy.mockRestore(); + + const after = await repo.findById("post", post.id); + // Restore is suppressed — no redundant republish next sweep. + expect(after?.scheduledAt).toBeNull(); + expect(after?.status).toBe("published"); + }); + + it("ignores the gate when requireDue is false (manual publish path)", async () => { + const post = await repo.create(createPostFixture()); + // Plain draft, never scheduled. + const result = await repo.publish("post", post.id); + expect(result.status).toBe("published"); + }); +}); diff --git a/packages/core/vitest.config.ts b/packages/core/vitest.config.ts index 1fb9c0c69..0f536c4f5 100644 --- a/packages/core/vitest.config.ts +++ b/packages/core/vitest.config.ts @@ -7,6 +7,8 @@ import { defineConfig } from "vitest/config"; // Astro integration's vite plugin uses at build time. const virtualStubs: Record = { "virtual:emdash/wait-until": "export const waitUntil = undefined;", + // No timer heartbeat under test — like the Cloudflare adapter's output. + "virtual:emdash/scheduler": "export const createScheduler = null;", // Default-export an empty config so modules that read top-level fields // (e.g. `virtualConfig?.i18n?.defaultLocale`) don't blow up on import. // Tests that need real config still `vi.mock(...)` their own. diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 8656fd7bb..bbc7ee781 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1365,6 +1365,9 @@ importers: '@arethetypeswrong/cli': specifier: 'catalog:' version: 0.18.2 + '@astrojs/cloudflare': + specifier: 'catalog:' + version: 13.5.3(@types/node@25.9.1)(astro@6.0.1(@types/node@25.9.1)(aws4fetch@1.0.20)(jiti@2.7.0)(lightningcss@1.32.0)(rollup@4.55.2)(typescript@6.0.3)(yaml@2.9.0))(jiti@2.7.0)(lightningcss@1.32.0)(wrangler@4.95.0(@cloudflare/workers-types@4.20260305.1))(yaml@2.9.0) '@cloudflare/workers-types': specifier: 'catalog:' version: 4.20260305.1 @@ -11730,6 +11733,32 @@ snapshots: - workerd - yaml + '@astrojs/cloudflare@13.5.3(@types/node@25.9.1)(astro@6.0.1(@types/node@25.9.1)(aws4fetch@1.0.20)(jiti@2.7.0)(lightningcss@1.32.0)(rollup@4.55.2)(typescript@6.0.3)(yaml@2.9.0))(jiti@2.7.0)(lightningcss@1.32.0)(wrangler@4.95.0(@cloudflare/workers-types@4.20260305.1))(yaml@2.9.0)': + dependencies: + '@astrojs/internal-helpers': 0.9.1 + '@astrojs/underscore-redirects': 1.0.3 + '@cloudflare/vite-plugin': 1.36.3(vite@7.3.3(@types/node@25.9.1)(jiti@2.7.0)(lightningcss@1.32.0)(yaml@2.9.0))(wrangler@4.95.0(@cloudflare/workers-types@4.20260305.1)) + astro: 6.0.1(@types/node@25.9.1)(aws4fetch@1.0.20)(jiti@2.7.0)(lightningcss@1.32.0)(rollup@4.55.2)(typescript@6.0.3)(yaml@2.9.0) + piccolore: 0.1.3 + tinyglobby: 0.2.16 + vite: 7.3.3(@types/node@25.9.1)(jiti@2.7.0)(lightningcss@1.32.0)(yaml@2.9.0) + wrangler: 4.95.0(@cloudflare/workers-types@4.20260305.1) + transitivePeerDependencies: + - '@types/node' + - bufferutil + - jiti + - less + - lightningcss + - sass + - sass-embedded + - stylus + - sugarss + - terser + - tsx + - utf-8-validate + - workerd + - yaml + '@astrojs/cloudflare@13.5.3(@types/node@25.9.1)(astro@6.3.5(@types/node@25.9.1)(aws4fetch@1.0.20)(jiti@2.7.0)(lightningcss@1.32.0)(rollup@4.55.2)(yaml@2.9.0))(jiti@2.7.0)(lightningcss@1.32.0)(wrangler@4.95.0(@cloudflare/workers-types@4.20260305.1))(yaml@2.9.0)': dependencies: '@astrojs/internal-helpers': 0.9.1 diff --git a/templates/blog-cloudflare/src/worker.ts b/templates/blog-cloudflare/src/worker.ts index c0f1ddc47..df373a019 100644 --- a/templates/blog-cloudflare/src/worker.ts +++ b/templates/blog-cloudflare/src/worker.ts @@ -1,5 +1,4 @@ -import handler from "@astrojs/cloudflare/entrypoints/server"; - -export { PluginBridge } from "@emdash-cms/cloudflare/sandbox"; - -export default handler; +// Worker entry: Astro's fetch handler plus EmDash's scheduled() handler, which +// the Cron Trigger in wrangler.jsonc drives. PluginBridge is the sandbox +// Durable Object, re-exported here so its binding resolves. +export { default, PluginBridge } from "@emdash-cms/cloudflare/worker"; diff --git a/templates/blog-cloudflare/wrangler.jsonc b/templates/blog-cloudflare/wrangler.jsonc index e2018ec99..f6baeeca7 100644 --- a/templates/blog-cloudflare/wrangler.jsonc +++ b/templates/blog-cloudflare/wrangler.jsonc @@ -22,4 +22,8 @@ "binding": "LOADER", }, ], + // Drives scheduled publishing, plugin cron, and maintenance (see src/worker.ts) + "triggers": { + "crons": ["* * * * *"], + }, } diff --git a/templates/marketing-cloudflare/src/worker.ts b/templates/marketing-cloudflare/src/worker.ts index c0f1ddc47..df373a019 100644 --- a/templates/marketing-cloudflare/src/worker.ts +++ b/templates/marketing-cloudflare/src/worker.ts @@ -1,5 +1,4 @@ -import handler from "@astrojs/cloudflare/entrypoints/server"; - -export { PluginBridge } from "@emdash-cms/cloudflare/sandbox"; - -export default handler; +// Worker entry: Astro's fetch handler plus EmDash's scheduled() handler, which +// the Cron Trigger in wrangler.jsonc drives. PluginBridge is the sandbox +// Durable Object, re-exported here so its binding resolves. +export { default, PluginBridge } from "@emdash-cms/cloudflare/worker"; diff --git a/templates/marketing-cloudflare/wrangler.jsonc b/templates/marketing-cloudflare/wrangler.jsonc index 213ccee6d..c020df079 100644 --- a/templates/marketing-cloudflare/wrangler.jsonc +++ b/templates/marketing-cloudflare/wrangler.jsonc @@ -22,4 +22,8 @@ "binding": "LOADER", }, ], + // Drives scheduled publishing, plugin cron, and maintenance (see src/worker.ts) + "triggers": { + "crons": ["* * * * *"], + }, } diff --git a/templates/portfolio-cloudflare/src/worker.ts b/templates/portfolio-cloudflare/src/worker.ts index c0f1ddc47..df373a019 100644 --- a/templates/portfolio-cloudflare/src/worker.ts +++ b/templates/portfolio-cloudflare/src/worker.ts @@ -1,5 +1,4 @@ -import handler from "@astrojs/cloudflare/entrypoints/server"; - -export { PluginBridge } from "@emdash-cms/cloudflare/sandbox"; - -export default handler; +// Worker entry: Astro's fetch handler plus EmDash's scheduled() handler, which +// the Cron Trigger in wrangler.jsonc drives. PluginBridge is the sandbox +// Durable Object, re-exported here so its binding resolves. +export { default, PluginBridge } from "@emdash-cms/cloudflare/worker"; diff --git a/templates/portfolio-cloudflare/wrangler.jsonc b/templates/portfolio-cloudflare/wrangler.jsonc index 91ad1992e..62ee430e6 100644 --- a/templates/portfolio-cloudflare/wrangler.jsonc +++ b/templates/portfolio-cloudflare/wrangler.jsonc @@ -22,4 +22,8 @@ "binding": "LOADER", }, ], + // Drives scheduled publishing, plugin cron, and maintenance (see src/worker.ts) + "triggers": { + "crons": ["* * * * *"], + }, } diff --git a/templates/starter-cloudflare/src/worker.ts b/templates/starter-cloudflare/src/worker.ts index c0f1ddc47..df373a019 100644 --- a/templates/starter-cloudflare/src/worker.ts +++ b/templates/starter-cloudflare/src/worker.ts @@ -1,5 +1,4 @@ -import handler from "@astrojs/cloudflare/entrypoints/server"; - -export { PluginBridge } from "@emdash-cms/cloudflare/sandbox"; - -export default handler; +// Worker entry: Astro's fetch handler plus EmDash's scheduled() handler, which +// the Cron Trigger in wrangler.jsonc drives. PluginBridge is the sandbox +// Durable Object, re-exported here so its binding resolves. +export { default, PluginBridge } from "@emdash-cms/cloudflare/worker"; diff --git a/templates/starter-cloudflare/wrangler.jsonc b/templates/starter-cloudflare/wrangler.jsonc index e2018ec99..f6baeeca7 100644 --- a/templates/starter-cloudflare/wrangler.jsonc +++ b/templates/starter-cloudflare/wrangler.jsonc @@ -22,4 +22,8 @@ "binding": "LOADER", }, ], + // Drives scheduled publishing, plugin cron, and maintenance (see src/worker.ts) + "triggers": { + "crons": ["* * * * *"], + }, }