Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,13 @@ jobs:
- name: Installing dependencies (Python)
run: uv sync --all-extras

- name: Running tests
- name: Running tests (parallel)
run: |
uv run pytest -s -v --cov=chancy --cov-report=xml
uv run pytest -n auto -v --cov=chancy --cov-report=xml

- name: Running Django ORM tests (sequential)
run: |
uv run pytest tests/contrib/django/test_models.py -v --cov=chancy --cov-report=xml --cov-append

- name: Uploading coverage
uses: codecov/codecov-action@v4
Expand Down
4 changes: 2 additions & 2 deletions chancy/contrib/django/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
Chancy documentation for details.
"""

import asyncio
import inspect
from datetime import datetime, timezone
from functools import cached_property
from typing import TYPE_CHECKING, Any
Expand Down Expand Up @@ -192,7 +192,7 @@ async def aenqueue(

wrapper_func = (
ASYNC_WRAPPER_FUNC
if asyncio.iscoroutinefunction(task.func)
if inspect.iscoroutinefunction(task.func)
else WRAPPER_FUNC
)

Expand Down
6 changes: 5 additions & 1 deletion chancy/executors/asyncex.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import inspect
from asyncio import CancelledError

from chancy import Reference
Expand Down Expand Up @@ -47,7 +48,7 @@ def __len__(self):
async def _job_wrapper(self, job: QueuedJob):
try:
func, kwargs = Executor.get_function_and_kwargs(job)
if not asyncio.iscoroutinefunction(func):
if not inspect.iscoroutinefunction(func):
raise ValueError(
f"Function {job.func!r} is not an async function, which is"
f" required for the AsyncExecutor. Please use the"
Expand Down Expand Up @@ -75,6 +76,9 @@ async def cancel(self, ref: Reference):
task.cancel()
return

def get_running_jobs(self) -> list["QueuedJob"]:
return list(self.jobs.values())

async def stop(self):
"""
Stop the executor, giving it a chance to clean up any resources it
Expand Down
32 changes: 32 additions & 0 deletions chancy/executors/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,35 @@ async def cancel(self, ref: Reference):
:param ref: The reference to the job to cancel.
"""

def get_running_job(self, ref: Reference) -> QueuedJob | None:
"""
Get a running job by its reference.

:param ref: The reference to the job to find.
:return: The job if found, None otherwise.
"""
for job in self.get_running_jobs():
if job.id == ref.identifier:
return job
return None

@abc.abstractmethod
def get_running_jobs(self) -> list[QueuedJob]:
"""
Get all jobs currently running in this executor.

:return: A list of running jobs.
"""

def is_job_running(self, ref: Reference) -> bool:
"""
Check if a job is currently running in this executor.

:param ref: The reference to the job to check.
:return: True if the job is running, False otherwise.
"""
return self.get_running_job(ref) is not None

@abc.abstractmethod
def get_default_concurrency(self) -> int:
"""
Expand Down Expand Up @@ -244,5 +273,8 @@ async def cancel(self, ref: Reference):
future.cancel()
return

def get_running_jobs(self) -> list[QueuedJob]:
return list(self.jobs.values())

def __len__(self):
return len(self.jobs)
6 changes: 4 additions & 2 deletions chancy/executors/process.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
import functools
import inspect
import multiprocessing
import os
import warnings
Expand All @@ -15,6 +16,7 @@
from asyncio import Future, CancelledError
from concurrent.futures import ProcessPoolExecutor
from typing import Callable, Any
from uuid import UUID

from chancy import Reference
from chancy.executors.base import ConcurrentExecutor
Expand Down Expand Up @@ -138,7 +140,7 @@ async def push(self, job: QueuedJob) -> Future:

return future

async def _handle_timeout(self, job_id: str, time_limit: int):
async def _handle_timeout(self, job_id: UUID, time_limit: int):
try:
await asyncio.sleep(time_limit)
pid = self.pids_for_job.get(job_id)
Expand Down Expand Up @@ -190,7 +192,7 @@ def job_wrapper(cls, job: QueuedJob, pids_for_job) -> tuple[QueuedJob, Any]:
)
)

if asyncio.iscoroutinefunction(func):
if inspect.iscoroutinefunction(func):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
Expand Down
5 changes: 3 additions & 2 deletions chancy/executors/sub.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
import asyncio
import inspect
import os
import threading
import functools
from concurrent.futures import Future
Expand Down Expand Up @@ -122,7 +123,7 @@ def job_wrapper(cls, job: QueuedJob) -> tuple[QueuedJob, Any]:
timer.start()

try:
if asyncio.iscoroutinefunction(func):
if inspect.iscoroutinefunction(func):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
Expand Down
3 changes: 2 additions & 1 deletion chancy/executors/thread.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import inspect
import os
import threading
from concurrent.futures import ThreadPoolExecutor, Future
Expand Down Expand Up @@ -77,7 +78,7 @@ def job_wrapper(self, job: QueuedJob) -> tuple[QueuedJob, Any]:
timer.start()

try:
if asyncio.iscoroutinefunction(func):
if inspect.iscoroutinefunction(func):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
Expand Down
5 changes: 3 additions & 2 deletions chancy/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ class State(enum.Enum):
SUCCEEDED = "succeeded"

#: The unique identifier for this job instance.
id: str
id: UUID
#: The time at which this job was created.
created_at: datetime
#: The time at which this job was started, if it has been started.
Expand All @@ -237,8 +237,9 @@ class State(enum.Enum):

@classmethod
def unpack(cls, data: dict) -> "QueuedJob":
id_ = data["id"]
return cls(
id=str(data["id"]),
id=id_ if isinstance(id_, UUID) else UUID(id_),
func=data["func"],
kwargs=data["kwargs"],
priority=data["priority"],
Expand Down
9 changes: 7 additions & 2 deletions chancy/migrations/v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,15 @@ async def up(self, migrator: Migrator, cursor: AsyncCursor[DictRow]):
expires_at TIMESTAMPTZ NOT NULL
);
ALTER TABLE {leader}
ADD CONSTRAINT leader_worker_id_unique UNIQUE
ADD CONSTRAINT {leader_worker_id_unique} UNIQUE
(worker_id);
"""
).format(leader=sql.Identifier(f"{migrator.prefix}leader"))
).format(
leader=sql.Identifier(f"{migrator.prefix}leader"),
leader_worker_id_unique=sql.Identifier(
f"{migrator.prefix}leader_worker_id_unique"
),
)
)

await cursor.execute(
Expand Down
62 changes: 62 additions & 0 deletions chancy/migrations/v7.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
from psycopg import AsyncCursor, sql
from psycopg.rows import DictRow

from chancy.migrate import Migration, Migrator


class V7Migration(Migration):
async def up(self, migrator: Migrator, cursor: AsyncCursor[DictRow]):
"""
Rename the leader_worker_id_unique constraint to use the prefix.

This fixes an oversight where the constraint name was hardcoded
instead of using the configurable prefix, which prevented running
multiple Chancy instances with different prefixes in the same database.
"""
old_name = "leader_worker_id_unique"
new_name = f"{migrator.prefix}leader_worker_id_unique"

# Check if the old unprefixed constraint exists
await cursor.execute(
"""
SELECT 1 FROM pg_constraint
WHERE conname = %s
""",
(old_name,),
)
if await cursor.fetchone():
await cursor.execute(
sql.SQL(
"ALTER TABLE {table} RENAME CONSTRAINT {old} TO {new}"
).format(
table=sql.Identifier(f"{migrator.prefix}leader"),
old=sql.Identifier(old_name),
new=sql.Identifier(new_name),
)
)

async def down(self, migrator: Migrator, cursor: AsyncCursor[DictRow]):
"""
Rename the constraint back to the unprefixed name.
"""
old_name = f"{migrator.prefix}leader_worker_id_unique"
new_name = "leader_worker_id_unique"

# Check if the prefixed constraint exists
await cursor.execute(
"""
SELECT 1 FROM pg_constraint
WHERE conname = %s
""",
(old_name,),
)
if await cursor.fetchone():
await cursor.execute(
sql.SQL(
"ALTER TABLE {table} RENAME CONSTRAINT {old} TO {new}"
).format(
table=sql.Identifier(f"{migrator.prefix}leader"),
old=sql.Identifier(old_name),
new=sql.Identifier(new_name),
)
)
6 changes: 3 additions & 3 deletions chancy/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,20 +121,20 @@ def import_string(name):
return getattr(module, func_name)


def chancy_uuid() -> str:
def chancy_uuid() -> uuid.UUID:
"""
Generate a UUID suitable for use as a job ID.

.. note::

It's UUID7, kinda, since the draft keeps changing.

:return: str
:return: UUID
"""
t = (time.time_ns() // 100) & 0xFFFFFFFFFFFFFF
rand = secrets.randbits(62)
uuid7 = (t << 68) | (7 << 64) | (2 << 62) | rand
return f"{uuid7:032x}"
return uuid.UUID(f"{uuid7:032x}")


def json_dumps(obj, **kwargs):
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ dev = [
"pytest-django>=4.8.0",
"pre-commit>=4.3.0",
"ruff>=0.14.2",
"pytest-xdist>=3.8.0",
]

[project.scripts]
Expand Down
48 changes: 45 additions & 3 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import os
from typing import AsyncIterator

import pytest
Expand All @@ -8,6 +9,36 @@
from chancy import Chancy, Worker


@pytest.fixture(scope="session")
def xdist_worker() -> str | None:
"""Returns the xdist worker ID (e.g., 'gw0') or None if not running parallel."""
return os.environ.get("PYTEST_XDIST_WORKER")


@pytest.fixture(scope="session")
def test_prefix(xdist_worker) -> str:
"""
Unique table prefix for parallel test isolation.

Returns 'chancy_' normally, or 'chancy_gw0_' etc. when running with xdist.
"""
if xdist_worker is None:
return "chancy_"
return f"chancy_{xdist_worker}_"


@pytest.fixture(scope="session")
def test_suffix(xdist_worker) -> str:
"""
Unique suffix for test resources (tables, schemas) that need isolation.

Returns '' normally, or '_gw0' etc. when running with xdist.
"""
if xdist_worker is None:
return ""
return f"_{xdist_worker}"


@pytest.fixture(scope="session")
def event_loop_policy():
# Since psycopg's asyncio implementation cannot use the default
Expand All @@ -18,28 +49,39 @@ def event_loop_policy():


@pytest_asyncio.fixture()
async def chancy(request):
async def chancy(request, test_prefix):
"""
Provides a Chancy application instance with an open connection pool
to the test database.

When running with pytest-xdist, each worker gets a unique table prefix
to enable parallel test execution without conflicts.
"""
params = getattr(request, "param", {})
# Allow tests to override prefix, but default to worker-specific prefix
if "prefix" not in params:
params = {**params, "prefix": test_prefix}

async with Chancy(
"postgresql://postgres:localtest@localhost:8190/postgres",
**getattr(request, "param", {}),
**params,
) as chancy:
await chancy.migrate()
yield chancy
await chancy.migrate(to_version=0)


@pytest.fixture
def chancy_just_app():
def chancy_just_app(test_prefix):
"""
Provides just a configured chancy instance with no open connection pool
or migrations.

When running with pytest-xdist, uses a unique table prefix for isolation.
"""
return Chancy(
"postgresql://postgres:localtest@localhost:8190/postgres",
prefix=test_prefix,
)


Expand Down
4 changes: 2 additions & 2 deletions tests/contrib/django/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

@pytest.mark.django_db
@pytest.mark.asyncio
async def test_django_style_connection(settings):
async with Chancy(settings.DATABASES["default"]) as app:
async def test_django_style_connection(settings, test_prefix):
async with Chancy(settings.DATABASES["default"], prefix=test_prefix) as app:
await app.migrate()
await app.migrate(to_version=0)
Loading
Loading