182 changes: 94 additions & 88 deletions osidb/sync_manager.py
@@ -8,6 +8,7 @@
from celery.utils.log import get_task_logger
from django.conf import settings
from django.db import models, transaction
from django.db.models import F, Q
from django.utils import timezone
from rhubarb.tasks import LockableTaskWithArgs

@@ -324,97 +325,102 @@ def check_for_reschedules(cls):
This method needs to be called occasionally to check if any of the existing sync managers
need to re-schedule tasks for any reason (like previous failure).
"""
for sync_manager in SyncManager.objects.filter(name=cls.__name__):
# TODO: Find a cause and remove this workaround OSIDB-3131
# TODO: Should be fixed, check from time to time to see if this problem is logged
if (
sync_manager.last_scheduled_dt is None
and sync_manager.last_started_dt is not None
):
logger.info(
f"{sync_manager.__class__.__name__} {sync_manager.sync_id}: "
f"Started but not scheduled, this should NEVER happen"
)
continue
sync_managers = SyncManager.objects.filter(name=cls.__name__)
Contributor

The move from Python-level filtering to DB queries is a solid improvement, but I think there's still a potential bottleneck: if many sync managers match the reschedule conditions, we're still calling reschedule() for each one individually (multiple DB queries + a broker call per sync manager). On stage, where we're seeing OOM kills due to a large volume of failed sync managers, this could still be problematic.

Would it make sense to cap each reschedule check to a reasonable batch size? Since most collectors run every minute, we'd eventually process all pending reschedules across successive runs.

To prevent starvation (the same rows getting picked every time while others wait indefinitely), we could order candidates by last_scheduled_dt ascending: the oldest-waiting items get rescheduled first, and once rescheduled, their refreshed timestamp moves them to the back of the queue.

Contributor Author

That sounds like a good idea. Scheduling in batches makes sense. Will convert the rescheduler into a capped queue-like system.

Contributor Author

Added a cap to each check and sorted each queryset by its respective datetime. Thoughts on the limit/sort order?
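
For illustration, a minimal sketch of the capped, oldest-first selection discussed in this thread, assuming a hypothetical BATCH_SIZE constant (the actual cap and ordering chosen in the final commits may differ):

# Hypothetical sketch: order candidates oldest-first so no row starves,
# then take at most BATCH_SIZE per check. Rescheduling refreshes
# last_scheduled_dt, which moves the row to the back of the queue.
BATCH_SIZE = 100  # assumed value, not taken from the PR

scheduled_not_started = sync_managers.filter(
    Q(last_started_dt__isnull=True)
    | Q(last_started_dt__lt=F("last_scheduled_dt")),
    last_scheduled_dt__isnull=False,
    last_scheduled_dt__lt=timezone.now() - cls.MAX_SCHEDULE_DELAY,
).order_by("last_scheduled_dt").values("sync_id")[:BATCH_SIZE]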

processed = set()

# SCHEDULED, DID NOT START
# 1) Scheduled at least once before
# 2) Scheduled for more than MAX_SCHEDULE_DELAY
# 3) Not started after scheduled (or ever)
#
# | MAX_SCHEDULE_DELAY |
# |-------------------------------|---//-------?
# Scheduled NOW Started
#
if (
sync_manager.last_scheduled_dt is not None
and timezone.now() - sync_manager.last_scheduled_dt
> cls.MAX_SCHEDULE_DELAY
and (
sync_manager.last_started_dt is None
or sync_manager.last_started_dt < sync_manager.last_scheduled_dt
)
):
cls.reschedule(
sync_manager.sync_id, "Sync did not start after MAX_SCHEDULE_DELAY"
)
continue
def reschedule(sm_fields, msg=None, msg_fn=None):
for sm in sm_fields:
sync_id = sm["sync_id"]
if sync_id in processed:
continue

# STARTED, DID NOT FINISH
# 1) Started at least once before
# 2) Running for more than MAX_RUN_LENGTH
# 3) Not finished after it started (or ever)
# 4) Not failed after it started (or ever)
# 5) Not scheduled after started
#
# | MAX_RUN_LENGTH |
# |-------------------------------|---//-------------?----------------?
# Started NOW Success / Failure Scheduled
#

if (
sync_manager.last_started_dt is not None
and timezone.now() - sync_manager.last_started_dt > cls.MAX_RUN_LENGTH
and (
sync_manager.last_finished_dt is None
or sync_manager.last_finished_dt < sync_manager.last_started_dt
)
and (
sync_manager.last_failed_dt is None
or sync_manager.last_failed_dt < sync_manager.last_started_dt
)
and sync_manager.last_scheduled_dt < sync_manager.last_started_dt
):
cls.reschedule(
sync_manager.sync_id, "Sync did not finish after MAX_RUN_LENGTH"
)
continue
reason = msg_fn(sm) if msg_fn is not None else msg
cls.reschedule(sync_id, reason)
processed.add(sync_id)
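
Note that the processed set makes the helper idempotent across the three condition queries below: a sync manager matching more than one condition is rescheduled only once per check.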

# STARTED, FAILED, NOT PERMANENTLY
# 1) Started at least once before
# 2) Failed recently
# 3) Not permanent failure
# 3) Failed more than FAIL_RESCHEDULE_DELAY ago
# 4) Was not scheduled after last failure
#
# | FAIL_RESCHEDULE_DELAY |
# |-----------|------------------------------|---//----------?
# Started Fail (not permanent?) NOW Scheduled
#

if (
sync_manager.last_started_dt is not None
and 0 < sync_manager.last_consecutive_failures
and not sync_manager.permanently_failed
and timezone.now() - sync_manager.last_failed_dt
> cls.FAIL_RESCHEDULE_DELAY
and sync_manager.last_scheduled_dt < sync_manager.last_failed_dt
):
cls.reschedule(
sync_manager.sync_id,
f"Failed {sync_manager.last_consecutive_failures} times",
)
continue
# TODO: Find a cause and remove this workaround OSIDB-3131
# TODO: Should be fixed, check from time to time to see if this problem is logged
started_not_scheduled_ids = sync_managers.filter(
last_scheduled_dt__isnull=True, last_started_dt__isnull=False
).values_list("sync_id", flat=True)

for sync_id in started_not_scheduled_ids:
logger.info(
f"{cls.__name__} {sync_id}: "
f"Started but not scheduled, this should NEVER happen"
)

processed.add(sync_id)

# SCHEDULED, DID NOT START
# 1) Scheduled at least once before
# 2) Scheduled for more than MAX_SCHEDULE_DELAY
# 3) Not started after scheduled (or ever)
#
# | MAX_SCHEDULE_DELAY |
# |-------------------------------|---//-------?
# Scheduled NOW Started
#
scheduled_not_started = sync_managers.filter(
Q(last_started_dt__isnull=True)
| Q(last_started_dt__lt=F("last_scheduled_dt")),
last_scheduled_dt__isnull=False,
last_scheduled_dt__lt=timezone.now() - cls.MAX_SCHEDULE_DELAY,
).values("sync_id")

reschedule(
scheduled_not_started,
"Sync did not start after MAX_SCHEDULE_DELAY",
)
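
For readers less familiar with combining Q and F, a per-row Python equivalent of the scheduled_not_started filter above (illustration only; the real evaluation happens in a single SQL query):

def is_scheduled_not_started(sm, now, max_schedule_delay):
    # Mirrors the queryset: scheduled at least once, waiting longer than
    # MAX_SCHEDULE_DELAY, and never started (or not since last scheduled).
    return (
        sm.last_scheduled_dt is not None
        and now - sm.last_scheduled_dt > max_schedule_delay
        and (sm.last_started_dt is None or sm.last_started_dt < sm.last_scheduled_dt)
    )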

# STARTED, DID NOT FINISH
# 1) Started at least once before
# 2) Running for more than MAX_RUN_LENGTH
# 3) Not finished after it started (or ever)
# 4) Not failed after it started (or ever)
# 5) Not scheduled after started
#
# | MAX_RUN_LENGTH |
# |-------------------------------|---//-------------?----------------?
# Started NOW Success / Failure Scheduled
#
started_not_finished = sync_managers.filter(
Q(last_finished_dt__isnull=True)
| Q(last_finished_dt__lt=F("last_started_dt")),
Q(last_failed_dt__isnull=True) | Q(last_failed_dt__lt=F("last_started_dt")),
last_started_dt__isnull=False,
last_started_dt__lt=timezone.now() - cls.MAX_RUN_LENGTH,
last_scheduled_dt__lt=F("last_started_dt"),
).values("sync_id")

reschedule(
started_not_finished,
"Sync did not finish after MAX_RUN_LENGTH",
)

# STARTED, FAILED, NOT PERMANENTLY
# 1) Started at least once before
# 2) Failed recently
# 3) Not permanent failure
# 4) Failed more than FAIL_RESCHEDULE_DELAY ago
# 5) Was not scheduled after last failure
#
# | FAIL_RESCHEDULE_DELAY |
# |-----------|------------------------------|---//----------?
# Started Fail (not permanent?) NOW Scheduled
#
started_failed_not_permanently = sync_managers.filter(
last_started_dt__isnull=False,
last_consecutive_failures__gt=0,
permanently_failed=False,
last_failed_dt__lt=timezone.now() - cls.FAIL_RESCHEDULE_DELAY,
last_scheduled_dt__lt=F("last_failed_dt"),
).values("sync_id", "last_consecutive_failures")

reschedule(
started_failed_not_permanently,
msg_fn=lambda sm: f"Failed {sm['last_consecutive_failures']} times",
)
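
The docstring above says check_for_reschedules "needs to be called occasionally"; a hypothetical periodic caller might look like the sketch below (an assumption for illustration; how OSIDB actually triggers the check is not shown in this diff):

from celery import shared_task

@shared_task
def check_all_reschedules():
    # Hypothetical sweep over the direct SyncManager subclasses.
    for manager_cls in SyncManager.__subclasses__():
        manager_cls.check_for_reschedules()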

@classmethod
def is_in_progress(cls, sync_id):