[FLINK-36753][runtime] Adaptive Scheduler actively triggers a Checkpoint after all resources are ready #27921
Samrat002 wants to merge 3 commits into apache:master
Conversation
@1996fanrui PTAL whenever you have time.
pnowojski
left a comment
Thanks for the contribution. I've left a couple of comments; however, I don't have the context to review whether this is properly integrated with AdaptiveScheduler and DefaultStateTransitionManager. It would be great for someone else to take a look as well.
public static final ConfigOption<Boolean> SCHEDULER_RESCALE_TRIGGER_ACTIVE_CHECKPOINT_ENABLED =
        key("jobmanager.adaptive-scheduler.rescale-trigger.active-checkpoint.enabled")
                .booleanType()
                .defaultValue(false)
Is there a downside to using this option? If we expect this to be a generally positive change, and you disable it by default only as a precaution/for backward compatibility, I would actually be fine with setting it to true by default.
Earlier, I chose a defensive approach, but there is no compelling reason to keep it false.
Updated the default value to true.
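For reference, a hypothetical flink-conf.yaml fragment for this option (the key is taken from the diff above; with the default now flipped to true, setting it explicitly is only needed to opt out):

```yaml
# Opt out of active checkpoint triggering on rescale
# (after this PR the option defaults to true)
jobmanager.adaptive-scheduler.rescale-trigger.active-checkpoint.enabled: false
```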
waitForRunningTasks(restClusterClient, jobId, AFTER_RESCALE_PARALLELISM);
final int expectedFreeSlotCount = NUMBER_OF_SLOTS - AFTER_RESCALE_PARALLELISM;
LOG.info(
        "Waiting for {} slot(s) to become available after scale down.",
        expectedFreeSlotCount);
waitForAvailableSlots(restClusterClient, expectedFreeSlotCount);
If I understand your test correctly, it would still pass after 1h, once the regular periodic checkpoint is triggered, even with your new option disabled, right?
I think you should make sure that the timeouts in waitForRunningTasks/waitForAvailableSlots (or the CI timeout of 4h) are shorter than env.enableCheckpointing(Duration.ofHours(1).toMillis());. So either decrease the waiting timeout to < 30 minutes, or increase the checkpointing interval to 24h (CI will be killed after 4h, AFAIR).
Updated the checkpointing interval to 24 hours, and also added assertions for min-pause.
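The timing constraint discussed above can be made explicit with plain Duration arithmetic. The values come from this thread; the class and method names are illustrative, not from the PR:

```java
import java.time.Duration;

public class RescaleTestTimingSketch {
    // Periodic checkpointing interval chosen so it can never fire during the test.
    static final Duration CHECKPOINT_INTERVAL = Duration.ofHours(24);
    // Upper bound on how long waitForRunningTasks/waitForAvailableSlots may block.
    static final Duration WAIT_TIMEOUT = Duration.ofMinutes(30);
    // CI jobs are killed after roughly 4 hours.
    static final Duration CI_TIMEOUT = Duration.ofHours(4);

    // The test only proves the active trigger worked if the job cannot be
    // "rescued" by a periodic checkpoint before the waits (or CI) time out.
    static boolean timingConstraintsHold() {
        return WAIT_TIMEOUT.compareTo(CHECKPOINT_INTERVAL) < 0
                && CI_TIMEOUT.compareTo(CHECKPOINT_INTERVAL) < 0;
    }

    public static void main(String[] args) {
        System.out.println(timingConstraintsHold()); // prints true
    }
}
```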
+ "rather than waiting for the next periodic checkpoint. "
+ "This reduces rescaling latency, especially when checkpoint intervals are large. "
+ "The active trigger respects %s and will not trigger if a checkpoint is already in progress.",
text("execution.checkpointing.min-pause"))
This should probably be:
code(CheckpointingOptions.MIN_PAUSE_BETWEEN_CHECKPOINTS.key()))
resourceStabilizationTimeout,
firstChangeEventTimestamp,
maxTriggerDelay));
transitionContext.requestActiveCheckpointTrigger();
Why is this call needed here? Isn't it enough to call it in org.apache.flink.runtime.scheduler.adaptive.DefaultStateTransitionManager.Stabilizing#onChange?
private void progressToStabilized(Temporal firstChangeEventTimestamp) {
    progressToPhase(new Stabilized(clock, this, firstChangeEventTimestamp, maxTriggerDelay));
    transitionContext.requestActiveCheckpointTrigger();
Now it looks like the method is called in many places. I wonder if we could control where it is called only from within the Phases, e.g. by moving this into the Stabilized phase?
"When enabled, the Adaptive Scheduler actively triggers a checkpoint when resources change and rescaling is desired, "
        + "rather than waiting for the next periodic checkpoint. "
        + "This reduces rescaling latency, especially when checkpoint intervals are large. "
        + "The active trigger respects %s and will not trigger if a checkpoint is already in progress.",
Does it really respect the min-pause? From what I see, it only ensures that a new checkpoint is not triggered while another is in progress.
Ah, the check for min-pause was missing. PTAL.
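A minimal, self-contained sketch of the kind of min-pause guard being discussed (the class, method, and field names are illustrative; in Flink the real check lives in the CheckpointCoordinator):

```java
/** Illustrative min-pause guard; names are hypothetical, not the PR's actual code. */
public class MinPauseGuard {
    private final long minPauseBetweenCheckpointsMillis;
    private volatile long lastCheckpointCompletionNanos;

    public MinPauseGuard(long minPauseBetweenCheckpointsMillis) {
        this.minPauseBetweenCheckpointsMillis = minPauseBetweenCheckpointsMillis;
    }

    /** Records the completion time of the latest checkpoint. */
    public void onCheckpointCompleted(long completionNanos) {
        this.lastCheckpointCompletionNanos = completionNanos;
    }

    /** True once at least min-pause has elapsed since the last completed checkpoint. */
    public boolean isMinPauseSatisfied(long nowNanos) {
        long elapsedMillis = (nowNanos - lastCheckpointCompletionNanos) / 1_000_000L;
        return elapsedMillis >= minPauseBetweenCheckpointsMillis;
    }

    public static void main(String[] args) {
        MinPauseGuard guard = new MinPauseGuard(1000); // min-pause of 1s
        guard.onCheckpointCompleted(0);
        System.out.println(guard.isMinPauseSatisfied(500_000_000L));   // 500 ms elapsed: false
        System.out.println(guard.isMinPauseSatisfied(1_500_000_000L)); // 1500 ms elapsed: true
    }
}
```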
/**
 * Requests the context to actively trigger a checkpoint to expedite rescaling. Called when
 * the {@link DefaultStateTransitionManager} enters a phase that is ready to accept {@link
Is that true? I see that the method is called in more places: entering Stabilizing, entering Stabilized, and on each onChange event while in Stabilizing.
I have fixed the problem now. PTAL at the revised version.
 * <li>No checkpoint must be currently in progress or being triggered
 * </ul>
 */
private void triggerCheckpointForRescale() {
We need to cover all possible paths by tests.
@flinkbot run azure
@ztison @pnowojski PTAL. I have addressed the review comments: added unit tests, made the IT case more robust, and ensured min-pause is respected.
Thanks for incorporating our improvements. I was on vacation for the last few days, so I haven't responded. I am back and will check the PR today or tomorrow.
ztison
left a comment
I see some issues with the retry logic.
 * satisfy the configured {@code minPauseBetweenCheckpoints}. This can be used by callers that
 * trigger non-periodic checkpoints but still wish to respect the min-pause constraint.
 */
public boolean isMinPauseBetweenCheckpointsSatisfied() {
    return;
}

if (!checkpointCoordinator.isMinPauseBetweenCheckpointsSatisfied()) {
If you return the remaining time to the next checkpoint instead of a boolean, then you can use it directly as the delay in the following call, context.runIfState(this, this::requestActiveCheckpointTrigger, remainingTimeToSatisfyMinPause), and you can get rid of the hardcoded ACTIVE_CHECKPOINT_RETRY_DELAY.
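The suggestion can be sketched as a helper that returns the remaining pause rather than a boolean (a hypothetical standalone method, not the PR's code; in the PR this would live on the CheckpointCoordinator):

```java
import java.time.Duration;

/** Sketch of the reviewer's suggestion: return the remaining pause instead of a boolean. */
public class RemainingPauseSketch {
    /**
     * Returns Duration.ZERO when the min-pause is already satisfied, otherwise the time
     * left until it is, usable directly as the delay for context.runIfState(...).
     */
    static Duration remainingTimeToSatisfyMinPause(
            long lastCompletionMillis, long nowMillis, long minPauseMillis) {
        long remaining = minPauseMillis - (nowMillis - lastCompletionMillis);
        return remaining <= 0 ? Duration.ZERO : Duration.ofMillis(remaining);
    }

    public static void main(String[] args) {
        // 3s of a 10s min-pause elapsed -> 7s remaining.
        System.out.println(remainingTimeToSatisfyMinPause(0, 3_000, 10_000)); // PT7S
        // min-pause already satisfied -> zero delay, can trigger immediately.
        System.out.println(remainingTimeToSatisfyMinPause(0, 12_000, 10_000)); // PT0S
    }
}
```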
.warn(
        "Active checkpoint trigger for rescale failed, scheduling retry.",
        throwable);
context.runIfState(
Do we really want to introduce this endless cycle? I feel we should just log it and give up. If the checkpoint is failing, there is probably a different issue with the job, and we shouldn't retry again and again without any retry cap.
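A bounded-retry variant along the lines the reviewer suggests (illustrative; the cap, class, and method names are not from the PR):

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Sketch of capping retries instead of retrying forever. */
public class BoundedRetrySketch {
    static final int MAX_TRIGGER_ATTEMPTS = 3; // hypothetical cap

    private final AtomicInteger attempts = new AtomicInteger();

    /** Returns true if another retry may be scheduled, false once the cap is hit. */
    boolean onTriggerFailure(Throwable t) {
        int n = attempts.incrementAndGet();
        if (n >= MAX_TRIGGER_ATTEMPTS) {
            // Give up: a repeatedly failing checkpoint likely signals a deeper job issue.
            System.out.println("giving up after " + n + " failed attempts: " + t.getMessage());
            return false;
        }
        System.out.println("attempt " + n + " failed, scheduling retry");
        return true;
    }

    public static void main(String[] args) {
        BoundedRetrySketch retry = new BoundedRetrySketch();
        Throwable failure = new RuntimeException("checkpoint declined");
        System.out.println(retry.onTriggerFailure(failure)); // true
        System.out.println(retry.onTriggerFailure(failure)); // true
        System.out.println(retry.onTriggerFailure(failure)); // false: cap reached
    }
}
```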
getLogger()
        .debug(
                "Skipping active checkpoint trigger for rescale: min pause between checkpoints not satisfied, scheduling retry.");
context.runIfState(
If we get e.g. 10 onChange events, then we will schedule this method 10 times. We should have some kind of deduplication.
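The deduplication being asked for can be sketched with a single in-flight flag (hypothetical names; in the PR the actual scheduling goes through context.runIfState):

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Sketch: collapse N onChange events into at most one scheduled trigger request. */
public class TriggerDeduplicationSketch {
    private final AtomicBoolean triggerScheduled = new AtomicBoolean(false);
    int scheduledCount = 0; // stand-in for counting real context.runIfState(...) calls

    /** Called on every onChange event; schedules at most one pending trigger. */
    void requestActiveCheckpointTrigger() {
        // compareAndSet ensures only the first caller actually schedules.
        if (triggerScheduled.compareAndSet(false, true)) {
            scheduledCount++;
        }
    }

    /** Called when the scheduled trigger actually runs; allows a future reschedule. */
    void onTriggerExecuted() {
        triggerScheduled.set(false);
    }

    public static void main(String[] args) {
        TriggerDeduplicationSketch s = new TriggerDeduplicationSketch();
        for (int i = 0; i < 10; i++) {
            s.requestActiveCheckpointTrigger(); // ten onChange events...
        }
        System.out.println(s.scheduledCount); // ...at most one scheduled trigger: prints 1
    }
}
```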
What is the purpose of the change
FLIP-461 introduced checkpoint-synchronized rescaling where the Adaptive Scheduler waits for a checkpoint to complete before rescaling. However, it passively waits for the next periodic checkpoint, which can delay rescaling significantly when checkpoint intervals are large (e.g., 10 minutes).
This PR makes the Adaptive Scheduler actively trigger a checkpoint when resources change and rescaling is desired. The trigger fires at the right time, i.e., when the DefaultStateTransitionManager enters the Stabilizing or Stabilized phase (that is, when the resource gate is open and the scheduler is waiting for the checkpoint gate). The feature is controlled by a new configuration option jobmanager.adaptive-scheduler.rescale-trigger.active-checkpoint.enabled (default: true). The feature respects execution.checkpointing.min-pause, skips if a checkpoint is already in progress, and only fires when parallelism has actually changed.
Brief change log
Verifying this change
This change added tests and can be verified as follows:
RescaleOnCheckpointITCase#testRescaleWithActiveCheckpointTrigger, which starts a job with a checkpointing interval of 24 hours, maxTriggerDelay set to infinity, and no manual triggerCheckpoint() call.
Does this pull request potentially affect one of the following parts:
Documentation