Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
174 changes: 155 additions & 19 deletions app/internal/k8s/app_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (
"crypto/sha256"
"encoding/hex"
"fmt"
"math"
"strings"
"time"

"google.golang.org/protobuf/proto"
timestamppb "google.golang.org/protobuf/types/known/timestamppb"
Expand Down Expand Up @@ -171,51 +173,185 @@ func (c *AppK8sClient) Delete(ctx context.Context, appID *flyteapp.Identifier) e
return nil
}

// watchBackoff controls the reconnect timing for Watch. Declared as vars so
// tests can override them without sleeping.
var (
	watchBackoffInitial = 1 * time.Second
	watchBackoffMax     = 30 * time.Second
	watchBackoffFactor  = 2.0
)

// watchState holds mutable reconnect state for a single Watch call.
// It is goroutine-local — no mutex needed.
type watchState struct {
	// lastResourceVersion is the most recent RV seen from any event or Bookmark.
	// Passed to openWatch on reconnect so K8s resumes from exactly where we left off.
	lastResourceVersion string
	backoff             time.Duration
	consecutiveErrors   int
}

// nextBackoff returns the delay to use for the current reconnect attempt and
// grows the stored delay geometrically (capped at watchBackoffMax) for the
// next one. A zero stored delay is treated as watchBackoffInitial.
func (s *watchState) nextBackoff() time.Duration {
	wait := s.backoff
	if wait == 0 {
		wait = watchBackoffInitial
	}
	grown := math.Min(float64(wait)*watchBackoffFactor, float64(watchBackoffMax))
	s.backoff = time.Duration(grown)
	return wait
}

// resetBackoff restores the initial reconnect delay and clears the error
// streak; called whenever the watch proves healthy again.
func (s *watchState) resetBackoff() {
	s.consecutiveErrors = 0
	s.backoff = watchBackoffInitial
}

// Watch returns a channel of WatchResponse events for KServices in the given
// project/domain scope. If appName is non-empty, only events for that specific
// app are returned. The channel is closed when ctx is cancelled or the
// underlying watch terminates.
// app are returned. The channel is closed only when ctx is cancelled.
//
// The goroutine reconnects transparently when the underlying K8s watch closes
// unexpectedly, tracking resourceVersion to resume without gaps or replays.
func (c *AppK8sClient) Watch(ctx context.Context, project, domain, appName string) (<-chan *flyteapp.WatchResponse, error) {
ns := appNamespace(project, domain)

labels := map[string]string{labelAppManaged: "true"}
if appName != "" {
labels[labelAppName] = strings.ToLower(appName)
}

watcher, err := c.k8sClient.Watch(ctx, &servingv1.ServiceList{},
// Open the first watcher eagerly so initial errors (RBAC, missing CRD) are
// returned synchronously before spawning the goroutine.
watcher, err := c.openWatch(ctx, ns, labels, "")
if err != nil {
return nil, err
}

ch := make(chan *flyteapp.WatchResponse, 64)
go c.watchLoop(ctx, ns, labels, watcher, ch)
return ch, nil
}

// openWatch starts a K8s watch from resourceVersion (empty = watch from now).
func (c *AppK8sClient) openWatch(ctx context.Context, ns string, labels map[string]string, resourceVersion string) (k8swatch.Interface, error) {
opts := []client.ListOption{
client.InNamespace(ns),
client.MatchingLabels(labels),
)
}
if resourceVersion != "" {
opts = append(opts, &client.ListOptions{
Raw: &metav1.ListOptions{
ResourceVersion: resourceVersion,
AllowWatchBookmarks: true,
},
})
Comment on lines +239 to +245
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we still need to bookmark at first call?

}
watcher, err := c.k8sClient.Watch(ctx, &servingv1.ServiceList{}, opts...)
if err != nil {
return nil, fmt.Errorf("failed to start KService watch in namespace %s: %w", ns, err)
}
return watcher, nil
}

ch := make(chan *flyteapp.WatchResponse, 64)
go func() {
defer close(ch)
defer watcher.Stop()
for {
select {
case <-ctx.Done():
return
case event, ok := <-watcher.ResultChan():
if !ok {
return
// watchLoop is the reconnect loop. It drains watcher until it closes, then
// reopens with exponential backoff. Closes ch only when ctx is cancelled.
func (c *AppK8sClient) watchLoop(
ctx context.Context,
ns string,
labels map[string]string,
watcher k8swatch.Interface,
ch chan<- *flyteapp.WatchResponse,
) {
defer close(ch)
defer watcher.Stop()

state := &watchState{backoff: watchBackoffInitial}

for {
reconnect := c.drainWatcher(ctx, watcher, ch, state)
if !reconnect {
return // ctx cancelled
}

watcher.Stop()
state.consecutiveErrors++
delay := state.nextBackoff()
logger.Warnf(ctx, "KService watch in namespace %s closed unexpectedly (attempt %d); reconnecting in %v",
ns, state.consecutiveErrors, delay)

select {
case <-ctx.Done():
return
case <-time.After(delay):
Comment on lines +276 to +283
Copy link
Copy Markdown
Member

@pingsutw pingsutw Apr 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need backoff for Kubernetes watch timeouts? After the app service has been running for 1 hour, it always waits for 30 seconds (max(30, 2^60/5)) every 5 minutes. I think we only need backoff for other errors

}

newWatcher, err := c.openWatch(ctx, ns, labels, state.lastResourceVersion)
if err != nil {
logger.Errorf(ctx, "Failed to reopen KService watch in namespace %s: %v", ns, err)
// Use an immediately-closed watcher so the loop retries with further backoff.
watcher = k8swatch.NewEmptyWatch()
continue
}
watcher = newWatcher
}
}

// drainWatcher processes events from watcher until the channel closes or ctx is done.
// Returns true if reconnect is needed, false if ctx was cancelled.
func (c *AppK8sClient) drainWatcher(
ctx context.Context,
watcher k8swatch.Interface,
ch chan<- *flyteapp.WatchResponse,
state *watchState,
) bool {
for {
select {
case <-ctx.Done():
return false
case event, ok := <-watcher.ResultChan():
if !ok {
return true
}

c.updateResourceVersion(event, state)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we update the resource version after we successfully send the response? so the drainWatcher will process the same item in the next loop if it failed to send the event in the previous loop.


switch event.Type {
case k8swatch.Error:
if status, ok := event.Object.(*metav1.Status); ok {
logger.Warnf(ctx, "KService watch received error event (code=%d reason=%s): %s; will reconnect",
status.Code, status.Reason, status.Message)
} else {
logger.Warnf(ctx, "KService watch received error event (type %T); will reconnect", event.Object)
}
return true
case k8swatch.Bookmark:
// resourceVersion already updated — nothing to forward.
state.resetBackoff()
default:
resp := c.kserviceEventToWatchResponse(ctx, event)
if resp == nil {
continue
}
state.resetBackoff()
select {
case ch <- resp:
case <-ctx.Done():
return
return false
}
}
}
}()
return ch, nil
}
}

// updateResourceVersion extracts and stores the latest resourceVersion from a watch event.
// Called before event type dispatch so both normal events and Bookmarks checkpoint the position.
func (c *AppK8sClient) updateResourceVersion(event k8swatch.Event, state *watchState) {
	switch event.Type {
	case k8swatch.Added, k8swatch.Modified, k8swatch.Deleted, k8swatch.Bookmark:
		// These event types carry a Service object; fall through to extract.
	default:
		// Error events carry a *metav1.Status, not a Service — no RV to take.
		return
	}
	ksvc, ok := event.Object.(*servingv1.Service)
	if !ok {
		return
	}
	if rv := ksvc.GetResourceVersion(); rv != "" {
		state.lastResourceVersion = rv
	}
}

// kserviceEventToWatchResponse maps a K8s watch event to a flyteapp.WatchResponse.
Expand Down
Loading
Loading