
feat: resilient Watch RPC with reconnect and resourceVersion tracking#7188

Open
AdilFayyaz wants to merge 1 commit into adil/apps-app-service from adil/apps-watch-reconnect

Conversation

@AdilFayyaz commented Apr 9, 2026

Tracking issue

Depends on: #7176, #7175, #7166

Why are the changes needed?

K8s watches time out every ~5 minutes by default. The previous implementation closed the client's stream silently on disconnect with no reconnect, making Watch unreliable for long-lived connections.

What changes were proposed in this pull request?

  • Replaced the single-shot watch goroutine in AppK8sClient.Watch() with a reconnect loop (watchLoop + drainWatcher) that transparently reopens the K8s watch on unexpected closes or Error events
  • Added resourceVersion tracking — extracted from every Added/Modified/Deleted/Bookmark event and passed to the next watch call, ensuring no events are missed or replayed across reconnects
  • Added exponential backoff (1s → 2s → 4s → … capped at 30s) between reconnect attempts; backoff resets on any successful event or Bookmark
  • K8s Error events are now logged with code/reason/message instead of being silently dropped

How was this patch tested?

  • go test ./app/internal/k8s/... -run TestWatch — 6 new tests covering:
    channel close reconnect, Error event reconnect, Bookmark RV propagation to next watch call, exponential backoff timing, ctx cancel stops the goroutine, initial watch error surfaces synchronously
  • go test ./app/... — full suite passes with no regressions

Setup process

Screenshots

Check all the applicable boxes

  • I updated the documentation accordingly.
  • All new and existing tests passed.
  • All commits are signed-off.

Related PRs

Docs link

Signed-off-by: M. Adil Fayyaz <62440954+AdilFayyaz@users.noreply.github.com>
Comment on lines +239 to +245
if resourceVersion != "" {
    opts = append(opts, &client.ListOptions{
        Raw: &metav1.ListOptions{
            ResourceVersion:     resourceVersion,
            AllowWatchBookmarks: true,
        },
    })
Member
Don't we still need to bookmark at first call?
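One way to address this question is to request bookmarks unconditionally and only pin the resourceVersion once one has been observed. The sketch below models only the two relevant `metav1.ListOptions` fields with a local struct; it is a suggestion in the spirit of the comment, not the PR's actual fix:

```go
package main

import "fmt"

// listOptions stands in for metav1.ListOptions; only the two fields
// relevant to this review thread are modeled.
type listOptions struct {
	ResourceVersion     string
	AllowWatchBookmarks bool
}

// buildWatchOptions always asks the server for Bookmark events — the
// first call included — and only pins ResourceVersion once one exists.
func buildWatchOptions(resourceVersion string) listOptions {
	opts := listOptions{AllowWatchBookmarks: true} // needed on the first call too
	if resourceVersion != "" {
		opts.ResourceVersion = resourceVersion
	}
	return opts
}

func main() {
	fmt.Printf("%+v\n", buildWatchOptions(""))
	fmt.Printf("%+v\n", buildWatchOptions("12345"))
}
```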

Comment on lines +276 to +283
delay := state.nextBackoff()
logger.Warnf(ctx, "KService watch in namespace %s closed unexpectedly (attempt %d); reconnecting in %v",
    ns, state.consecutiveErrors, delay)

select {
case <-ctx.Done():
    return
case <-time.After(delay):
Member
@pingsutw pingsutw Apr 9, 2026

Do we need backoff for Kubernetes watch timeouts? After the app service has been running for 1 hour, it always waits the full 30 seconds (min(30, 2^(60/5)) s) every 5 minutes. I think we only need backoff for other errors

    return true
}

c.updateResourceVersion(event, state)
Member
Should we update the resource version only after we successfully send the response? That way drainWatcher will reprocess the same item in the next loop if it failed to send the event in the previous loop.
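The ordering this comment proposes — commit the resourceVersion only once the event is actually delivered downstream — can be sketched with a plain channel. `event`, `watchState`, and `forward` are stand-ins for the watch.Event forwarding path, not the PR's code:

```go
package main

import "fmt"

// event is a stand-in for watch.Event; only the resourceVersion matters here.
type event struct{ resourceVersion string }

// watchState tracks the last resourceVersion that was delivered.
type watchState struct{ lastRV string }

// forward tries a non-blocking send and commits the RV only on success,
// so a failed send leaves lastRV at the previous delivered event and a
// reconnect from lastRV replays the undelivered one.
func forward(ev event, out chan<- event, state *watchState) bool {
	select {
	case out <- ev:
		state.lastRV = ev.resourceVersion // commit only after successful send
		return true
	default:
		return false // client not ready; RV unchanged, event will be replayed
	}
}

func main() {
	out := make(chan event, 1)
	state := &watchState{}
	fmt.Println(forward(event{"100"}, out, state), state.lastRV) // true 100
	fmt.Println(forward(event{"101"}, out, state), state.lastRV) // false 100
}
```

(The real drainWatcher presumably blocks on send with a ctx cancel case rather than dropping events; the non-blocking send here just keeps the ordering point visible.)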


Labels

flyte2

2 participants