Skip to content
Open

cleanup #1671

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ require (
github.com/docker/go-units v0.5.0
github.com/dustin/go-humanize v1.0.1
github.com/exaring/otelpgx v0.6.2
github.com/fsnotify/fsevents v0.1.1
github.com/fsnotify/fsnotify v1.9.0
github.com/getsentry/sentry-go v0.16.0
github.com/go-errors/errors v1.4.2
github.com/go-redis/redis/extra/redisotel/v8 v8.11.5
Expand Down Expand Up @@ -142,7 +140,6 @@ require (
k8s.io/apimachinery v0.34.1
k8s.io/client-go v0.34.1
k8s.io/utils v0.0.0-20250604170112-4c0f3b243397
namespacelabs.dev/go-filenotify v0.0.0-20220511192020-53ea11be7eaa
namespacelabs.dev/go-ids v0.0.0-20221124082625-9fc72ee06af7
namespacelabs.dev/integrations v0.0.9-0.20251121143544-f44c8d144644
sigs.k8s.io/yaml v1.6.0
Expand Down Expand Up @@ -215,6 +212,7 @@ require (
github.com/exponent-io/jsonpath v0.0.0-20210407135951-1de76d718b3f // indirect
github.com/fatih/color v1.18.0 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/fsnotify/fsnotify v1.9.0 // indirect
github.com/fvbommel/sortorder v1.1.0 // indirect
github.com/fxamacker/cbor/v2 v2.9.0 // indirect
github.com/go-gorp/gorp/v3 v3.1.0 // indirect
Expand Down
6 changes: 0 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -345,11 +345,8 @@ github.com/foxcpp/go-mockdns v1.1.0 h1:jI0rD8M0wuYAxL7r/ynTrCQQq0BVqfB99Vgk7Dlme
github.com/foxcpp/go-mockdns v1.1.0/go.mod h1:IhLeSFGed3mJIAXPH2aiRQB+kqz7oqu8ld2qVbOu7Wk=
github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
github.com/fsnotify/fsevents v0.1.1 h1:/125uxJvvoSDDBPen6yUZbil8J9ydKZnnl3TWWmvnkw=
github.com/fsnotify/fsevents v0.1.1/go.mod h1:+d+hS27T6k5J8CRaPLKFgwKYcpS7GwW3Ule9+SC2ZRc=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/fsnotify/fsnotify v1.5.4/go.mod h1:OVB6XrOHzAwXMpEM7uPOzcehqUV2UqJxmVXmkdnm1bU=
github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k=
github.com/fsnotify/fsnotify v1.9.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0=
github.com/fvbommel/sortorder v1.1.0 h1:fUmoe+HLsBTctBDoaBwpQo5N+nrCp8g/BjKb/6ZQmYw=
Expand Down Expand Up @@ -1206,7 +1203,6 @@ golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220204135822-1c1b9b1eba6a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220209214540-3681064d5158/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220408201424-a24fb2fb8a0f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
Expand Down Expand Up @@ -1476,8 +1472,6 @@ k8s.io/kubectl v0.33.2/go.mod h1:8rC67FB8tVTYraovAGNi/idWIK90z2CHFNMmGJZJ3KI=
k8s.io/utils v0.0.0-20200729134348-d5654de09c73/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
k8s.io/utils v0.0.0-20250604170112-4c0f3b243397 h1:hwvWFiBzdWw1FhfY1FooPn3kzWuJ8tmbZBHi4zVsl1Y=
k8s.io/utils v0.0.0-20250604170112-4c0f3b243397/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
namespacelabs.dev/go-filenotify v0.0.0-20220511192020-53ea11be7eaa h1:jj2kjs0Hvufj40wuhMAzoZUOwrwMDFg1gHZ49RiIv9w=
namespacelabs.dev/go-filenotify v0.0.0-20220511192020-53ea11be7eaa/go.mod h1:e8NJRaInXRRm1+KPA6EkGEzdLJAgEvVSIKiLzpP97nI=
namespacelabs.dev/go-ids v0.0.0-20221124082625-9fc72ee06af7 h1:8NlnfPlzDSJr8TYV/qarIWwhjLd1gOXf3Jme0M/oGBM=
namespacelabs.dev/go-ids v0.0.0-20221124082625-9fc72ee06af7/go.mod h1:J+Sd+ngeffnCsaO/M7zgs2bR8Klq/ZBhS0+bbnDEH2M=
namespacelabs.dev/integrations v0.0.9-0.20251121143544-f44c8d144644 h1:bAyWDRD6/5n6Pmqp7jUOPCpmFes9ve/CN9VqwBBZhrQ=
Expand Down
5 changes: 0 additions & 5 deletions internal/cli/fncobra/ns/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"namespacelabs.dev/foundation/internal/compute"
"namespacelabs.dev/foundation/internal/console"
"namespacelabs.dev/foundation/internal/environment"
"namespacelabs.dev/foundation/internal/filewatcher"
"namespacelabs.dev/foundation/internal/fnapi"
"namespacelabs.dev/foundation/internal/frontend/cuefrontend/entity"
integrationparsing "namespacelabs.dev/foundation/internal/frontend/cuefrontend/integration/api"
Expand Down Expand Up @@ -67,7 +66,6 @@ func DoMain(name string, autoUpdate bool, registerCommands func(*cobra.Command))

fncobra.PushPreParse(rootCmd, func(ctx context.Context, args []string) error {
module.WireModuleLoader()
filewatcher.SetupFileWatcher()

binary.BuildGo = golang.GoBuilder
binary.BuildLLBGen = genbinary.LLBBinary
Expand Down Expand Up @@ -198,8 +196,6 @@ func DoMain(name string, autoUpdate bool, registerCommands func(*cobra.Command))
"Slack channel to send deployment notifications to.")
rootCmd.PersistentFlags().BoolVar(&gcloud.UseHostGCloudBinary, "gcloud_use_host_binary", gcloud.UseHostGCloudBinary,
"If set to true, uses a gcloud binary that is available at the host, rather than ns's builtin.")
rootCmd.PersistentFlags().BoolVar(&filewatcher.FileWatcherUsePolling, "filewatcher_use_polling",
filewatcher.FileWatcherUsePolling, "If set to true, uses polling to observe file system events.")
rootCmd.PersistentFlags().BoolVar(&k3d.IgnoreVersionCheck, "k3d_ignore_docker_version", k3d.IgnoreVersionCheck,
"If set to true, does not validate Docker's verison.")
rootCmd.PersistentFlags().BoolVar(&kubeops.ForceApply, "kubernetes_force_apply", kubeops.ForceApply, "Whether to force-apply an Apply.")
Expand Down Expand Up @@ -237,7 +233,6 @@ func DoMain(name string, autoUpdate bool, registerCommands func(*cobra.Command))
"use_head_orchestrator",
"update_orchestrator",
"gcloud_use_host_binary",
"filewatcher_use_polling",
"k3d_ignore_docker_version",
"kubernetes_force_apply",
"slack_token",
Expand Down
53 changes: 4 additions & 49 deletions internal/compute/continuous.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,12 @@ import (
// returned as an error by any function.
var ErrDoneSinking = errors.New("done sinking")

type ObserveNote string

const (
ObserveContinuing ObserveNote = "observe.continuing"
ObserveDone ObserveNote = "observe.done"
)

type Sinkable interface {
Inputs() *In
Updated(context.Context, Resolved) error
Cleanup(context.Context) error
}

type Versioned interface {
Observe(context.Context, func(ResultWithTimestamp[any], ObserveNote)) (func(), error)
}

type continuousKey string

const _continuousKey = continuousKey("fn.compute.continuous")
Expand Down Expand Up @@ -319,11 +308,10 @@ type observable struct {
inv *sinkInvocation
computable computeInstance

mu sync.Mutex
observers []onResult
revision uint64
latest ResultWithTimestamp[any]
listenerCancel func() // The value is `Versioned`, and we're listening to new versions.
mu sync.Mutex
observers []onResult
revision uint64
latest ResultWithTimestamp[any]
}

type observableUpdate struct {
Expand Down Expand Up @@ -413,40 +401,12 @@ func (o *observable) Loop(ctx context.Context) error {

func (o *observable) newValue(ctx context.Context, latest ResultWithTimestamp[any]) {
o.mu.Lock()

if versioned, ok := latest.Value.(Versioned); ok {
if o.listenerCancel != nil {
o.listenerCancel()
o.listenerCancel = nil
}
newListener, err := versioned.Observe(ctx, o.newVersion)
if err == nil {
// XXX report errors back
o.listenerCancel = newListener
} else {
fmt.Fprintln(console.Stderr(ctx), "failed to observe changes to value",
reflect.TypeOf(versioned).String(), latest.Digest.String(), err)
}
}

broadcast := o.doUpdate(latest)
o.mu.Unlock()

broadcast()
}

func (o *observable) newVersion(result ResultWithTimestamp[any], node ObserveNote) {
o.mu.Lock()
// XXX new versions are not cached.
broadcast := o.doUpdate(result)
if node == ObserveDone {
o.listenerCancel = nil
}
o.mu.Unlock()

broadcast()
}

func (o *observable) doUpdate(result ResultWithTimestamp[any]) func() {
o.revision++
result.revision = o.revision
Expand All @@ -466,11 +426,6 @@ func (o *observable) doUpdate(result ResultWithTimestamp[any]) func() {
if len(handledObservers) != len(observers) {
o.mu.Lock()
o.observers = handledObservers
if len(o.observers) == 0 && o.listenerCancel != nil {
// No more observers, cancel the listener.
o.listenerCancel()
o.listenerCancel = nil
}
o.mu.Unlock()
}
}
Expand Down
109 changes: 0 additions & 109 deletions internal/compute/continuous_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"fmt"
"os"
"testing"
"time"

"namespacelabs.dev/foundation/std/tasks"
"namespacelabs.dev/foundation/std/tasks/simplelog"
Expand All @@ -30,35 +29,6 @@ func TestSink(t *testing.T) {
}
}

func TestVersionedSink(t *testing.T) {
logLevel := 0
ctx := tasks.WithSink(context.Background(), simplelog.NewSink(os.Stdout, logLevel))

ch := make(chan int, 1)
x := &versionedStream{ch: ch}

ch <- 1

if err := Do(ctx, func(ctx context.Context) error {
return Continuously(ctx, &streamSinkable{
c: &testComputable{intStream: x},
t: t,
expected: 101,
onResult: func(got int) (int, bool) {
if got == 110 {
return 0, false
}

ch <- got - 100 + 1

return got + 1, true
},
}, nil)
}); err != nil {
t.Fatal(err)
}
}

type simpleSinkable struct {
c Computable[int]

Expand All @@ -80,39 +50,6 @@ func (ts simpleSinkable) Updated(ctx context.Context, deps Resolved) error {
}
func (ts simpleSinkable) Cleanup(context.Context) error { return nil }

type streamSinkable struct {
c Computable[int]

t *testing.T
expected int
onResult func(int) (int, bool)
}

func (ts *streamSinkable) Inputs() *In { return Inputs().Computable("c", ts.c) }
func (ts *streamSinkable) Updated(ctx context.Context, deps Resolved) error {
ts.t.Logf("started Updated w/ expected=%d", ts.expected)
defer ts.t.Log("left Updated")

vint := MustGetDepValue(deps, ts.c, "c")

ts.t.Logf("got value: %v", vint)

if vint != ts.expected {
return fmt.Errorf("expected %d, got %d", ts.expected, vint)
}

expected, shouldContinue := ts.onResult(vint)
if !shouldContinue {
return ErrDoneSinking
}

ts.expected = expected
ts.t.Logf("reset expected=%d", expected)

return nil
}
func (ts *streamSinkable) Cleanup(context.Context) error { return nil }

type testComputable struct {
intStream Computable[hasInt]

Expand Down Expand Up @@ -149,49 +86,3 @@ func (tc *testStream) Compute(context.Context, Resolved) (hasInt, error) {
type intWrapper int

func (i intWrapper) Int() int { return int(i) }

type versionedStream struct {
ch chan int
DoScoped[hasInt]
}

func (tc *versionedStream) Action() *tasks.ActionEvent { return tasks.Action("teststream") }
func (tc *versionedStream) Inputs() *In { return Inputs() }
func (tc *versionedStream) Output() Output {
return Output{NonDeterministic: true}
}
func (tc *versionedStream) Compute(context.Context, Resolved) (hasInt, error) {
return testValue{ch: tc.ch, v: <-tc.ch}, nil
}

type testValue struct {
ch chan int

v int
}

var _ Versioned = testValue{}

func (av testValue) Int() int { return av.v }

func (av testValue) Observe(ctx context.Context, f func(ResultWithTimestamp[any], ObserveNote)) (func(), error) {
cancel := make(chan struct{})

go func() {
for {
select {
case <-cancel:
return
case v := <-av.ch:
var rwt ResultWithTimestamp[any]
rwt.Value = testValue{v: v}
rwt.Completed = time.Now()
f(rwt, ObserveContinuing)
}
}
}()

return func() {
close(cancel)
}, nil
}
49 changes: 0 additions & 49 deletions internal/filewatcher/filewatcher.go

This file was deleted.

Loading
Loading