Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
"""Add schedule retry and timeout fields

Revision ID: 4d6f8f8a9c21
Revises: 18b9095bc772
Create Date: 2026-05-22 03:10:00.000000

"""

from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision: str = "4d6f8f8a9c21"
down_revision: Union[str, Sequence[str], None] = "18b9095bc772"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    """Add the timeout and retry-count columns to the ``schedules`` table.

    The two ``*_timeout_seconds`` columns are nullable integers. The two
    ``*_retry_count`` columns are non-nullable integers declared with a
    server default of ``"0"``.
    """
    new_columns = (
        sa.Column("backup_timeout_seconds", sa.Integer(), nullable=True),
        sa.Column(
            "backup_retry_count", sa.Integer(), nullable=False, server_default="0"
        ),
        sa.Column("cloud_sync_timeout_seconds", sa.Integer(), nullable=True),
        sa.Column(
            "cloud_sync_retry_count",
            sa.Integer(),
            nullable=False,
            server_default="0",
        ),
    )

    # All four columns are added inside a single batch operation on the table.
    with op.batch_alter_table("schedules", schema=None) as batch_op:
        for column in new_columns:
            batch_op.add_column(column)


def downgrade() -> None:
    """Drop the timeout and retry-count columns from ``schedules``.

    Columns are removed in the reverse of the order ``upgrade()`` adds them.
    """
    columns_to_drop = (
        "cloud_sync_retry_count",
        "cloud_sync_timeout_seconds",
        "backup_retry_count",
        "backup_timeout_seconds",
    )

    with op.batch_alter_table("schedules", schema=None) as batch_op:
        for column_name in columns_to_drop:
            batch_op.drop_column(column_name)
19 changes: 12 additions & 7 deletions src/borgitory/api/schedules.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,15 @@ async def create_schedule(
source_path=schedule.source_path or "",
cloud_sync_config_id=schedule.cloud_sync_config_id,
prune_config_id=schedule.prune_config_id,
check_config_id=schedule.check_config_id,
notification_config_id=schedule.notification_config_id,
pre_job_hooks=schedule.pre_job_hooks,
post_job_hooks=schedule.post_job_hooks,
patterns=schedule.patterns,
backup_timeout_seconds=schedule.backup_timeout_seconds,
backup_retry_count=schedule.backup_retry_count,
cloud_sync_timeout_seconds=schedule.cloud_sync_timeout_seconds,
cloud_sync_retry_count=schedule.cloud_sync_retry_count,
)

if result.is_error or not result.schedule:
Expand Down Expand Up @@ -489,7 +494,7 @@ async def move_hook(
{"hook_type": hook_type, "hooks": current_hooks},
)

except ValueError, TypeError, KeyError:
except (ValueError, TypeError, KeyError):
return HTMLResponse(content='<div class="space-y-4"></div>')


Expand Down Expand Up @@ -519,7 +524,7 @@ async def remove_hook_field(
{"hook_type": hook_type, "hooks": current_hooks},
)

except ValueError, TypeError, KeyError:
except (ValueError, TypeError, KeyError):
return HTMLResponse(content='<div class="space-y-4"></div>')


Expand All @@ -537,7 +542,7 @@ async def get_hooks_modal(
# Get data from the actual form field names
pre_hooks_json = str(json_data.get("pre_job_hooks", "[]"))
post_hooks_json = str(json_data.get("post_job_hooks", "[]"))
except ValueError, TypeError, KeyError:
except (ValueError, TypeError, KeyError):
pre_hooks_json = "[]"
post_hooks_json = "[]"

Expand Down Expand Up @@ -580,7 +585,7 @@ async def save_hooks(
try:
pre_count = len(json.loads(pre_hooks_json)) if pre_hooks_json else 0
post_count = len(json.loads(post_hooks_json)) if post_hooks_json else 0
except json.JSONDecodeError, TypeError:
except (json.JSONDecodeError, TypeError):
pre_count = 0
post_count = 0

Expand Down Expand Up @@ -663,7 +668,7 @@ async def move_pattern(
{"patterns": current_patterns},
)

except ValueError, TypeError, KeyError:
except (ValueError, TypeError, KeyError):
return HTMLResponse(content='<div class="space-y-4"></div>')


Expand All @@ -690,7 +695,7 @@ async def remove_pattern_field(
{"patterns": current_patterns},
)

except ValueError, TypeError, KeyError:
except (ValueError, TypeError, KeyError):
return HTMLResponse(content='<div class="space-y-4"></div>')


Expand Down Expand Up @@ -741,7 +746,7 @@ async def save_patterns(

try:
total_count = len(json.loads(patterns_json)) if patterns_json else 0
except json.JSONDecodeError, TypeError:
except (json.JSONDecodeError, TypeError):
total_count = 0

return templates.TemplateResponse(
Expand Down
8 changes: 8 additions & 0 deletions src/borgitory/models/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,14 @@ class Schedule(Base):
pre_job_hooks: Mapped[str | None] = mapped_column(Text, nullable=True)
post_job_hooks: Mapped[str | None] = mapped_column(Text, nullable=True)
patterns: Mapped[str | None] = mapped_column(Text, nullable=True)
backup_timeout_seconds: Mapped[int | None] = mapped_column(Integer, nullable=True)
backup_retry_count: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
cloud_sync_timeout_seconds: Mapped[int | None] = mapped_column(
Integer, nullable=True
)
cloud_sync_retry_count: Mapped[int] = mapped_column(
Integer, nullable=False, default=0
)

repository: Mapped["Repository"] = relationship(
"Repository", back_populates="schedules"
Expand Down
61 changes: 61 additions & 0 deletions src/borgitory/models/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,10 @@ class ScheduleCreate(ScheduleBase):
pre_job_hooks: Optional[str] = None
post_job_hooks: Optional[str] = None
patterns: Optional[str] = None
backup_timeout_seconds: Optional[int] = Field(None, gt=0)
backup_retry_count: int = Field(default=0, ge=0)
cloud_sync_timeout_seconds: Optional[int] = Field(None, gt=0)
cloud_sync_retry_count: int = Field(default=0, ge=0)

@field_validator("cloud_sync_config_id", mode="before")
@classmethod
Expand Down Expand Up @@ -383,6 +387,21 @@ def validate_patterns(cls, v: Union[str, None]) -> Optional[str]:
raise ValueError(f"Invalid patterns configuration: {error_msg}")
return v.strip()

@field_validator(
    "backup_timeout_seconds",
    "backup_retry_count",
    "cloud_sync_timeout_seconds",
    "cloud_sync_retry_count",
    mode="before",
)
@classmethod
def validate_optional_numeric_fields(
    cls, v: Union[str, int, None]
) -> Optional[int]:
    """Coerce a raw timeout/retry value to ``int``, treating blanks as unset.

    Args:
        v: The raw incoming value: an int, a numeric string, an empty
            string, or ``None``.

    Returns:
        ``int(v)`` for any non-blank value.

    Raises:
        PydanticUseDefault: When ``v`` is ``""`` or ``None``, so that each
            field falls back to its own declared default.
        ValueError: Propagated from ``int()`` when ``v`` is not numeric.
    """
    if v == "" or v is None:
        # Returning None here would be rejected for the retry-count fields,
        # which are declared as plain ``int`` with ``default=0``. Asking
        # Pydantic to use the declared default gives None for the timeout
        # fields and 0 for the retry counts.
        from pydantic_core import PydanticUseDefault

        raise PydanticUseDefault()
    return int(v)


class ScheduleUpdate(BaseModel):
name: Optional[str] = Field(None, min_length=1, max_length=128)
Expand All @@ -397,6 +416,10 @@ class ScheduleUpdate(BaseModel):
pre_job_hooks: Optional[str] = None
post_job_hooks: Optional[str] = None
patterns: Optional[str] = None
backup_timeout_seconds: Optional[int] = Field(None, gt=0)
backup_retry_count: Optional[int] = Field(None, ge=0)
cloud_sync_timeout_seconds: Optional[int] = Field(None, gt=0)
cloud_sync_retry_count: Optional[int] = Field(None, ge=0)

@field_validator("pre_job_hooks", mode="before")
@classmethod
Expand Down Expand Up @@ -482,6 +505,21 @@ def validate_notification_config_id(cls, v: Union[str, int, None]) -> Optional[i
return None
return int(v)

@field_validator(
    "backup_timeout_seconds",
    "backup_retry_count",
    "cloud_sync_timeout_seconds",
    "cloud_sync_retry_count",
    mode="before",
)
@classmethod
def validate_optional_numeric_fields(
    cls, v: Union[str, int, None]
) -> Optional[int]:
    """Normalize a raw timeout/retry value before field validation.

    An empty string or ``None`` is returned as ``None``; any other value is
    passed through ``int()``, which raises ``ValueError`` for non-numeric
    input.
    """
    is_blank = v == "" or v is None
    return None if is_blank else int(v)


class Schedule(ScheduleBase):
id: int = Field(gt=0)
Expand All @@ -493,6 +531,10 @@ class Schedule(ScheduleBase):
created_at: datetime
cloud_sync_config_id: Optional[int] = Field(None, gt=0)
prune_config_id: Optional[int] = Field(None, gt=0)
backup_timeout_seconds: Optional[int] = Field(None, gt=0)
backup_retry_count: int = Field(default=0, ge=0)
cloud_sync_timeout_seconds: Optional[int] = Field(None, gt=0)
cloud_sync_retry_count: int = Field(default=0, ge=0)

model_config = {
"from_attributes": True,
Expand Down Expand Up @@ -619,6 +661,10 @@ class BackupRequest(BaseModel):
pre_job_hooks: Optional[str] = None
post_job_hooks: Optional[str] = None
patterns: Optional[str] = None
backup_timeout_seconds: Optional[int] = Field(None, gt=0)
backup_retry_count: Optional[int] = Field(None, ge=0)
cloud_sync_timeout_seconds: Optional[int] = Field(None, gt=0)
cloud_sync_retry_count: Optional[int] = Field(None, ge=0)

@field_validator("dry_run", mode="before")
@classmethod
Expand Down Expand Up @@ -674,6 +720,21 @@ def validate_patterns(cls, v: Union[str, None]) -> Optional[str]:
raise ValueError(f"Invalid patterns configuration: {error_msg}")
return v.strip()

@field_validator(
    "backup_timeout_seconds",
    "backup_retry_count",
    "cloud_sync_timeout_seconds",
    "cloud_sync_retry_count",
    mode="before",
)
@classmethod
def validate_optional_numeric_fields(
    cls, v: Union[str, int, None]
) -> Optional[int]:
    """Convert a raw timeout/retry value to an integer, or ``None`` if blank.

    Runs in "before" mode, so it receives the unvalidated input. ``""`` and
    ``None`` both yield ``None``; everything else is converted with
    ``int()``.
    """
    if not (v == "" or v is None):
        return int(v)
    return None


class CloudSyncConfigBase(BaseModel):
name: str = Field(
Expand Down
4 changes: 4 additions & 0 deletions src/borgitory/services/jobs/job_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,14 @@ async def create_backup_job(
repository_name=repository.name,
include_backup=True,
backup_params=backup_params,
backup_timeout_seconds=backup_request.backup_timeout_seconds,
backup_retry_count=backup_request.backup_retry_count,
prune_config_id=backup_request.prune_config_id,
check_config_id=backup_request.check_config_id,
include_cloud_sync=backup_request.cloud_sync_config_id is not None,
cloud_sync_config_id=backup_request.cloud_sync_config_id,
cloud_sync_timeout_seconds=backup_request.cloud_sync_timeout_seconds,
cloud_sync_retry_count=backup_request.cloud_sync_retry_count,
notification_config_id=backup_request.notification_config_id,
pre_job_hooks=backup_request.pre_job_hooks,
post_job_hooks=backup_request.post_job_hooks,
Expand Down
66 changes: 59 additions & 7 deletions src/borgitory/services/jobs/task_executors/backup_task_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
JobEventBroadcasterProtocol,
)
from borgitory.protocols.command_protocols import ProcessExecutorProtocol
from borgitory.protocols.command_protocols import ProcessResult
from borgitory.protocols.job_output_manager_protocol import JobOutputManagerProtocol
from borgitory.protocols.job_database_manager_protocol import JobDatabaseManagerProtocol
from borgitory.services.jobs.broadcaster.event_type import EventType
Expand Down Expand Up @@ -141,14 +142,65 @@ def task_output_callback(line: str) -> None:
additional_args=additional_args,
environment_overrides=env_overrides,
)
process = await self.job_executor.start_process(
borg_command.command, borg_command.environment
)
timeout_value = params.get("timeout")
timeout_seconds = int(str(timeout_value)) if timeout_value else None
retry_count_value = params.get("retry_count")
retry_count = int(str(retry_count_value)) if retry_count_value else 0
total_attempts = max(1, retry_count + 1)

result = None
for attempt in range(1, total_attempts + 1):
if total_attempts > 1:
task_output_callback(
f"Backup attempt {attempt} of {total_attempts} started"
)

# Monitor the process (outside context manager since it's long-running)
result = await self.job_executor.monitor_process_output(
process, output_callback=task_output_callback
)
process = await self.job_executor.start_process(
borg_command.command, borg_command.environment
)

try:
if timeout_seconds:
result = await asyncio.wait_for(
self.job_executor.monitor_process_output(
process, output_callback=task_output_callback
),
timeout=float(timeout_seconds),
)
else:
result = await self.job_executor.monitor_process_output(
process, output_callback=task_output_callback
)
except asyncio.TimeoutError:
await self.job_executor.terminate_process(process)
timeout_error = (
f"Backup timed out after {timeout_seconds}s "
f"(attempt {attempt}/{total_attempts})"
)
task_output_callback(timeout_error)
logger.warning(timeout_error)
if attempt < total_attempts:
continue
result = None

if result and result.return_code == 0:
break

if attempt < total_attempts:
task_output_callback(
f"Backup attempt {attempt} failed, retrying..."
)

if result is None:
result = ProcessResult(
return_code=124,
stdout=b"",
stderr=b"",
error=(
f"Backup timed out after {timeout_seconds}s "
f"for all {total_attempts} attempt(s)"
),
)

logger.info(
f"Backup process completed with return code: {result.return_code}"
Expand Down
Loading