Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,27 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

⚠️ Version 0.36.0 contains a new database migration, version 7, but running it is **only necessary if you're using SQLite**. It converts all uses of `json` to `jsonb`. If using Postgres, version 7 is a no-op. You can run it now or wait for another migration in the future and run the two together.

See [documentation on running River migrations](https://riverqueue.com/docs/migrations). If migrating with the CLI, make sure to update it to its latest version:

```shell
go install github.com/riverqueue/river/cmd/river@latest
river migrate-up --database-url "$DATABASE_URL"
```

If not using River's internal migration system, the raw SQL can alternatively be dumped with:

```shell
go install github.com/riverqueue/river/cmd/river@latest
river migrate-get --database-url sqlite:// --version 7 --up > river7.up.sql
river migrate-get --database-url sqlite:// --version 7 --down > river7.down.sql
```

### Changed

- Convert SQLite JSON columns to JSONB (including migration). [PR #1224](https://github.com/riverqueue/river/pull/1224).

## [0.35.0] - 2026-04-18

### Changed
Expand Down
2 changes: 1 addition & 1 deletion riverdriver/river_driver_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -883,7 +883,7 @@ func MigrationLineMainTruncateTables(version int) []string {
return []string{"river_job", "river_leader"}
case 4:
return []string{"river_job", "river_leader", "river_queue"}
case 0, 5, 6:
case 0, 5, 6, 7:
return []string{"river_job", "river_leader", "river_queue", "river_client", "river_client_queue"}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- No-op migration to keep version numbers in sync across all drivers.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- No-op migration to keep version numbers in sync across all drivers.
-- The SQLite driver uses this version for a JSONB conversion.
2 changes: 2 additions & 0 deletions riverdriver/riverdrivertest/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ func exerciseMigration[TTx any](ctx context.Context, t *testing.T,
driver.GetMigrationTruncateTables(riverdriver.MigrationLineMain, 5))
require.Equal(t, []string{"river_job", "river_leader", "river_queue", "river_client", "river_client_queue"},
driver.GetMigrationTruncateTables(riverdriver.MigrationLineMain, 6))
require.Equal(t, []string{"river_job", "river_leader", "river_queue", "river_client", "river_client_queue"},
driver.GetMigrationTruncateTables(riverdriver.MigrationLineMain, 7))
require.Equal(t, []string{"river_job", "river_leader", "river_queue", "river_client", "river_client_queue"},
driver.GetMigrationTruncateTables(riverdriver.MigrationLineMain, 0))
})
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- No-op migration to keep version numbers in sync across all drivers.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- No-op migration to keep version numbers in sync across all drivers.
-- The SQLite driver uses this version for a JSONB conversion.
2 changes: 1 addition & 1 deletion riverdriver/riversqlite/internal/dbsqlc/river_client.sql
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
CREATE TABLE river_client (
id text PRIMARY KEY NOT NULL,
created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
metadata blob NOT NULL DEFAULT (json('{}')),
metadata jsonb NOT NULL DEFAULT (jsonb('{}')),
paused_at timestamp,
updated_at timestamp NOT NULL,
CONSTRAINT name_length CHECK (length(id) > 0 AND length(id) < 128)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ CREATE TABLE river_client_queue (
name text NOT NULL,
created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
max_workers integer NOT NULL DEFAULT 0,
metadata blob NOT NULL DEFAULT (json('{}')),
metadata jsonb NOT NULL DEFAULT (jsonb('{}')),
num_jobs_completed integer NOT NULL DEFAULT 0,
num_jobs_running integer NOT NULL DEFAULT 0,
updated_at timestamp NOT NULL,
Expand Down
42 changes: 21 additions & 21 deletions riverdriver/riversqlite/internal/dbsqlc/river_job.sql
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
CREATE TABLE river_job (
id integer PRIMARY KEY, -- SQLite makes this autoincrementing automatically
args blob NOT NULL DEFAULT '{}',
args jsonb NOT NULL DEFAULT (jsonb('{}')),
attempt integer NOT NULL DEFAULT 0,
attempted_at timestamp,
attempted_by blob, -- JSON array of strings
attempted_by jsonb, -- JSON array of strings
created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
errors blob, -- JSON array of error objects
errors jsonb, -- JSON array of error objects
finalized_at timestamp,
kind text NOT NULL,
max_attempts integer NOT NULL,
metadata blob NOT NULL DEFAULT (json('{}')),
metadata jsonb NOT NULL DEFAULT (jsonb('{}')),
priority integer NOT NULL DEFAULT 1,
queue text NOT NULL DEFAULT 'default',
state text NOT NULL DEFAULT 'available',
scheduled_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
tags blob NOT NULL DEFAULT (json('[]')), -- JSON array of strings
tags jsonb NOT NULL DEFAULT (jsonb('[]')), -- JSON array of strings
unique_key blob,
unique_states integer,
CONSTRAINT finalized_or_finalized_at_null CHECK (
Expand Down Expand Up @@ -44,7 +44,7 @@ SET
finalized_at = CASE WHEN state = 'running' THEN finalized_at ELSE coalesce(cast(sqlc.narg('now') AS text), datetime('now', 'subsec')) END,
-- Mark the job as cancelled by query so that the rescuer knows not to
-- rescue it, even if it gets stuck in the running state:
metadata = json_set(metadata, '$.cancel_attempted_at', cast(@cancel_attempted_at AS text))
metadata = jsonb_set(metadata, '$.cancel_attempted_at', cast(@cancel_attempted_at AS text))
WHERE id = @id
AND state NOT IN ('cancelled', 'completed', 'discarded')
AND finalized_at IS NULL
Expand Down Expand Up @@ -219,12 +219,12 @@ INSERT INTO /* TEMPLATE: schema */river_job(
coalesce(cast(sqlc.narg('created_at') AS text), datetime('now', 'subsec')),
@kind,
@max_attempts,
json(cast(@metadata AS blob)),
jsonb(@metadata),
@priority,
@queue,
coalesce(cast(sqlc.narg('scheduled_at') AS text), datetime('now', 'subsec')),
@state,
json(cast(@tags AS blob)),
jsonb(@tags),
CASE WHEN length(cast(@unique_key AS blob)) = 0 THEN NULL ELSE @unique_key END,
@unique_states
)
Expand Down Expand Up @@ -265,12 +265,12 @@ INSERT INTO /* TEMPLATE: schema */river_job(
coalesce(cast(sqlc.narg('created_at') AS text), datetime('now', 'subsec')),
@kind,
@max_attempts,
json(cast(@metadata AS blob)),
jsonb(@metadata),
@priority,
@queue,
coalesce(cast(sqlc.narg('scheduled_at') AS text), datetime('now', 'subsec')),
@state,
json(cast(@tags AS blob)),
jsonb(@tags),
CASE WHEN length(cast(@unique_key AS blob)) = 0 THEN NULL ELSE @unique_key END,
@unique_states
)
Expand Down Expand Up @@ -313,18 +313,18 @@ INSERT INTO /* TEMPLATE: schema */river_job(
@args,
@attempt,
cast(sqlc.narg('attempted_at') as text),
CASE WHEN length(cast(@attempted_by AS blob)) = 0 THEN NULL ELSE json(@attempted_by) END,
CASE WHEN length(cast(@attempted_by AS blob)) = 0 THEN NULL ELSE jsonb(@attempted_by) END,
coalesce(cast(sqlc.narg('created_at') AS text), datetime('now', 'subsec')),
CASE WHEN length(cast(@errors AS blob)) = 0 THEN NULL ELSE @errors END,
cast(sqlc.narg('finalized_at') as text),
@kind,
@max_attempts,
json(cast(@metadata AS blob)),
jsonb(@metadata),
@priority,
@queue,
coalesce(cast(sqlc.narg('scheduled_at') AS text), datetime('now', 'subsec')),
@state,
json(cast(@tags AS blob)),
jsonb(@tags),
CASE WHEN length(cast(@unique_key AS blob)) = 0 THEN NULL ELSE @unique_key END,
@unique_states
) RETURNING *;
Expand Down Expand Up @@ -358,10 +358,10 @@ LIMIT @max;
-- name: JobRescue :exec
UPDATE /* TEMPLATE: schema */river_job
SET
errors = json_insert(coalesce(errors, json('[]')), '$[#]', json(cast(@error AS blob))),
errors = jsonb_insert(coalesce(errors, jsonb('[]')), '$[#]', jsonb(@error)),
finalized_at = cast(sqlc.narg('finalized_at') as text),
scheduled_at = @scheduled_at,
metadata = json_set(
metadata = jsonb_set(
metadata,
'$."river:rescue_count"',
coalesce(
Expand Down Expand Up @@ -444,7 +444,7 @@ RETURNING *;

-- name: JobScheduleSetDiscarded :many
UPDATE /* TEMPLATE: schema */river_job
SET metadata = json_patch(metadata, json('{"unique_key_conflict": "scheduler_discarded"}')),
SET metadata = jsonb_patch(metadata, jsonb('{"unique_key_conflict": "scheduler_discarded"}')),
finalized_at = coalesce(cast(sqlc.narg('now') AS text), datetime('now', 'subsec')),
state = 'discarded'
WHERE id IN (sqlc.slice('id'))
Expand All @@ -454,7 +454,7 @@ RETURNING *;
-- for JobSetStateIfRunning to use when falling back to non-running jobs.
-- name: JobSetMetadataIfNotRunning :one
UPDATE /* TEMPLATE: schema */river_job
SET metadata = json_patch(metadata, json(cast(@metadata_updates AS blob)))
SET metadata = jsonb_patch(metadata, jsonb(@metadata_updates))
WHERE id = @id
AND state != 'running'
RETURNING *;
Expand All @@ -472,15 +472,15 @@ SET
THEN @attempt
ELSE attempt END,
errors = CASE WHEN cast(@errors_do_update AS boolean)
THEN json_insert(coalesce(errors, json('[]')), '$[#]', json(cast(@error AS blob)))
THEN jsonb_insert(coalesce(errors, jsonb('[]')), '$[#]', jsonb(@error))
ELSE errors END,
finalized_at = CASE WHEN /* should_cancel */((@state = 'retryable' OR @state = 'scheduled') AND (metadata -> 'cancel_attempted_at') iS NOT NULL)
THEN coalesce(cast(sqlc.narg('now') AS text), datetime('now', 'subsec'))
WHEN cast(@finalized_at_do_update AS boolean)
THEN @finalized_at
ELSE finalized_at END,
metadata = CASE WHEN cast(@metadata_do_merge AS boolean)
THEN json_patch(metadata, json(cast(@metadata_updates AS blob)))
THEN jsonb_patch(metadata, jsonb(@metadata_updates))
ELSE metadata END,
scheduled_at = CASE WHEN /* NOT should_cancel */(cast(@state AS text) <> 'retryable' AND @state <> 'scheduled' OR (metadata -> 'cancel_attempted_at') IS NULL) AND cast(@scheduled_at_do_update AS boolean)
THEN @scheduled_at
Expand All @@ -495,7 +495,7 @@ RETURNING *;
-- name: JobUpdate :one
UPDATE /* TEMPLATE: schema */river_job
SET
metadata = CASE WHEN cast(@metadata_do_merge AS boolean) THEN json_patch(metadata, json(cast(@metadata AS blob))) ELSE metadata END
metadata = CASE WHEN cast(@metadata_do_merge AS boolean) THEN jsonb_patch(metadata, jsonb(@metadata)) ELSE metadata END
WHERE id = @id
RETURNING *;

Expand All @@ -510,7 +510,7 @@ SET
errors = CASE WHEN cast(@errors_do_update AS boolean) THEN @errors ELSE errors END,
finalized_at = CASE WHEN cast(@finalized_at_do_update AS boolean) THEN @finalized_at ELSE finalized_at END,
max_attempts = CASE WHEN cast(@max_attempts_do_update AS boolean) THEN @max_attempts ELSE max_attempts END,
metadata = CASE WHEN cast(@metadata_do_update AS boolean) THEN json(cast(@metadata AS blob)) ELSE metadata END,
metadata = CASE WHEN cast(@metadata_do_update AS boolean) THEN jsonb(@metadata) ELSE metadata END,
state = CASE WHEN cast(@state_do_update AS boolean) THEN @state ELSE state END
WHERE id = @id
RETURNING *;
Loading
Loading