From dcffb50ba1738180bc034e31418cdfa160298383 Mon Sep 17 00:00:00 2001 From: sssciel Date: Mon, 27 Apr 2026 13:10:19 +0300 Subject: [PATCH] cluster: add DataCollector/Publisher go-storage implementation With tt's move to external libraries, including go-storage, type implementations were added on top of go-storage drivers. Closes TNTP-7385 --- go.mod | 4 +- go.sum | 10 +- lib/cluster/errors.go | 10 + lib/cluster/etcd_gs.go | 253 +++++++++ lib/cluster/etcd_gs_test.go | 824 ++++++++++++++++++++++++++++ lib/cluster/go.mod | 58 +- lib/cluster/go.sum | 22 +- lib/cluster/tarantool_gs.go | 223 ++++++++ lib/cluster/tarantool_gs_test.go | 87 +++ test/integration/aeon/server/go.mod | 2 +- test/integration/aeon/server/go.sum | 4 +- 11 files changed, 1455 insertions(+), 42 deletions(-) create mode 100644 lib/cluster/etcd_gs.go create mode 100644 lib/cluster/etcd_gs_test.go create mode 100644 lib/cluster/tarantool_gs.go create mode 100644 lib/cluster/tarantool_gs_test.go diff --git a/go.mod b/go.mod index b42240f7b..e1495cbb2 100644 --- a/go.mod +++ b/go.mod @@ -39,7 +39,7 @@ require ( go.etcd.io/etcd/tests/v3 v3.6.8 golang.org/x/crypto v0.49.0 golang.org/x/exp v0.0.0-20260218203240-3dfff04db8fa - golang.org/x/sys v0.42.0 + golang.org/x/sys v0.43.0 golang.org/x/term v0.41.0 google.golang.org/grpc v1.79.3 google.golang.org/protobuf v1.36.11 @@ -115,6 +115,8 @@ require ( github.com/spf13/pflag v1.0.10 // indirect github.com/tarantool/go-iproto v1.1.0 // indirect github.com/tarantool/go-openssl v1.2.2 // indirect + github.com/tarantool/go-option v1.0.0 // indirect + github.com/tarantool/go-storage v1.1.3-0.20260421111102-abe1dd9a9ca8 // indirect github.com/tarantool/go-tlsdialer v1.0.2 // indirect github.com/tmc/grpc-websocket-proxy v0.0.0-20220101234140-673ab2c3ae75 // indirect github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect diff --git a/go.sum b/go.sum index 72b71a5ef..21098e2f4 100644 --- a/go.sum +++ b/go.sum @@ -102,6 +102,8 @@ github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/gojuno/minimock/v3 v3.4.7 h1:vhE5zpniyPDRT0DXd5s3DbtZJVlcbmC5k80izYtj9lY= +github.com/gojuno/minimock/v3 v3.4.7/go.mod h1:QxJk4mdPrVyYUmEZGc2yD2NONpqM/j4dWhsy9twjFHg= github.com/golang-jwt/jwt/v5 v5.3.0 h1:pv4AsKCKKZuqlgs5sUmn4x8UlGa0kEVt/puTpKx9vvo= github.com/golang-jwt/jwt/v5 v5.3.0/go.mod h1:fxCRLWMO43lRc8nhHWY6LGqRcf+1gQWArsqaEUEa5bE= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= @@ -288,8 +290,12 @@ github.com/tarantool/go-iproto v1.1.0/go.mod h1:LNCtdyZxojUed8SbOiYHoc3v9NvaZTB7 github.com/tarantool/go-openssl v0.0.8-0.20230307065445-720eeb389195/go.mod h1:M7H4xYSbzqpW/ZRBMyH0eyqQBsnhAMfsYk5mv0yid7A= github.com/tarantool/go-openssl v1.2.2 h1:GaGMNsa68ZqoNgrMF7Ke4s9+tXGfc5ulfCUm3/Jb7/k= github.com/tarantool/go-openssl v1.2.2/go.mod h1:EwX1pKIGypLxkY49vKIIR4LTT+94DhKiunCqU2gEzLQ= +github.com/tarantool/go-option v1.0.0 h1:+Etw0i3TjsXvADTo5rfZNCfsXe3BfHOs+iVfIrl0Nlo= +github.com/tarantool/go-option v1.0.0/go.mod h1:lXzzeZtL+rPUtLOCDP6ny3FemFBjruG9aHKzNN2bS08= github.com/tarantool/go-prompt v1.0.1 h1:88Yer6gCFylqGRrdWwikNFVbklRQsqKF7mycvGdDcj0= github.com/tarantool/go-prompt v1.0.1/go.mod h1:9Vuvi60Bk+3yaXqgYaXNTpLbwPPaaEOeaUgpFW1jqTU= +github.com/tarantool/go-storage v1.1.3-0.20260421111102-abe1dd9a9ca8 h1:seeJOwJeWBtnouVHgAptoj+5qpHoMZV/Xv4m/pGjWJg= +github.com/tarantool/go-storage v1.1.3-0.20260421111102-abe1dd9a9ca8/go.mod h1:lM/UPkuzeggynwtmIHD5OCqdz5H2RHsXX6HaOzYBCzk= github.com/tarantool/go-tarantool v1.12.3 h1:GXabowmrTSW225xFEjX4t+8PlccVDCeGB5OM1VLbBXE= github.com/tarantool/go-tarantool v1.12.3/go.mod h1:QRiXv0jnxwgxHtr9ZmifSr/eRba76gTUBgp69pDMX1U= github.com/tarantool/go-tarantool/v2 v2.4.2 h1:rkzYtFhLJLA9RDIhjzN93MJBN5PBxHW4+soq+RB90gE= @@ -445,8 +451,8 @@ golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo= -golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= +golang.org/x/sys v0.43.0 h1:Rlag2XtaFTxp19wS8MXlJwTvoh8ArU6ezoyFsMyCTNI= +golang.org/x/sys v0.43.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.41.0 h1:QCgPso/Q3RTJx2Th4bDLqML4W6iJiaXFq2/ftQF13YU= diff --git a/lib/cluster/errors.go b/lib/cluster/errors.go index e1972b2bc..a372b847f 100644 --- a/lib/cluster/errors.go +++ b/lib/cluster/errors.go @@ -1,9 +1,19 @@ package cluster import ( + "errors" "fmt" ) +var ( + errDataMissing = errors.New("data does not exist") + errWrongRevision = errors.New("wrong revision") + + errFetchData = errors.New("failed to fetch data") + errPutData = errors.New("failed to put data") + errPublishData = errors.New("failed to publish data") +) + // NotExistError error type for non-existing path. type NotExistError struct { path []string diff --git a/lib/cluster/etcd_gs.go b/lib/cluster/etcd_gs.go new file mode 100644 index 000000000..7da4471af --- /dev/null +++ b/lib/cluster/etcd_gs.go @@ -0,0 +1,253 @@ +package cluster + +import ( + "context" + "fmt" + "time" + + "github.com/tarantool/go-storage" + "github.com/tarantool/go-storage/operation" + "github.com/tarantool/go-storage/predicate" +) + +// GSEtcdAllCollector collects data from etcd via go-storage for a whole prefix. +type GSEtcdAllCollector struct { + store storage.Storage + prefix string + timeout time.Duration +} + +// NewGSEtcdAllCollector creates a new go-storage-based collector for etcd from the whole prefix. +func NewGSEtcdAllCollector( + store storage.Storage, + prefix string, + timeout time.Duration, +) GSEtcdAllCollector { + return GSEtcdAllCollector{ + store: store, + prefix: prefix, + timeout: timeout, + } +} + +// Collect collects a configuration from the specified prefix with the specified timeout. +func (collector GSEtcdAllCollector) Collect() ([]Data, error) { + prefix := getConfigPrefix(collector.prefix) + ctx := context.Background() + if collector.timeout != 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, collector.timeout) + defer cancel() + } + + kvs, err := collector.store.Range(ctx, storage.WithPrefix(prefix)) + if err != nil { + return nil, fmt.Errorf("%w from etcd: %w", errFetchData, err) + } + + if len(kvs) == 0 { + return nil, CollectEmptyError{"etcd", prefix} + } + + data := make([]Data, 0, len(kvs)) + for _, kv := range kvs { + data = append(data, Data{ + Source: string(kv.Key), + Value: kv.Value, + Revision: kv.ModRevision, + }) + } + + return data, nil +} + +// GSEtcdKeyCollector collects data from etcd via go-storage for a specific key. +type GSEtcdKeyCollector struct { + store storage.Storage + prefix string + key string + timeout time.Duration +} + +// NewGSEtcdKeyCollector creates a new go-storage-based collector for etcd from a key from a prefix. +func NewGSEtcdKeyCollector( + store storage.Storage, + prefix, key string, + timeout time.Duration, +) GSEtcdKeyCollector { + return GSEtcdKeyCollector{ + store: store, + prefix: prefix, + key: key, + timeout: timeout, + } +} + +// Collect collects a configuration from the specified path with the specified timeout. +func (collector GSEtcdKeyCollector) Collect() ([]Data, error) { + key := getConfigKey(collector.prefix, collector.key) + ctx := context.Background() + if collector.timeout != 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, collector.timeout) + defer cancel() + } + + resp, err := collector.store.Tx(ctx).Then(operation.Get([]byte(key))).Commit() + if err != nil { + return nil, fmt.Errorf("%w from etcd: %w", errFetchData, err) + } + + data := make([]Data, 0, len(resp.Results)) + for _, result := range resp.Results { + for _, kv := range result.Values { + data = append(data, Data{ + Source: string(kv.Key), + Value: kv.Value, + Revision: kv.ModRevision, + }) + } + } + + switch len(data) { + case 0: + return nil, fmt.Errorf("a configuration data not found in etcd for key %q", key) + case 1: + return []Data{data[0]}, nil + default: + return nil, fmt.Errorf("too many responses (%v) from etcd for key %q", data, key) + } +} + +// GSEtcdAllDataPublisher publishes data into etcd via go-storage to a prefix. +type GSEtcdAllDataPublisher struct { + store storage.Storage + prefix string + timeout time.Duration +} + +// NewGSEtcdAllDataPublisher creates a new go-storage-based etcd publisher to a prefix. +func NewGSEtcdAllDataPublisher( + store storage.Storage, + prefix string, + timeout time.Duration, +) GSEtcdAllDataPublisher { + return GSEtcdAllDataPublisher{ + store: store, + prefix: prefix, + timeout: timeout, + } +} + +// Publish publishes the configuration into etcd to the given prefix. +func (publisher GSEtcdAllDataPublisher) Publish(revision int64, data []byte) error { + if revision != 0 { + return fmt.Errorf("%w into etcd: target revision %d is not supported", + errPublishData, revision) + } + if data == nil { + return fmt.Errorf("%w into etcd: %w", errPublishData, errDataMissing) + } + + prefix := getConfigPrefix(publisher.prefix) + key := prefix + "all" + ctx := context.Background() + if publisher.timeout != 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, publisher.timeout) + defer cancel() + } + + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + kvs, err := publisher.store.Range(ctx, storage.WithPrefix(prefix)) + if err != nil { + return fmt.Errorf("%w from etcd: %w", errFetchData, err) + } + + predicates := make([]predicate.Predicate, 0, len(kvs)) + ops := make([]operation.Operation, 0, len(kvs)+1) + for _, kv := range kvs { + if string(kv.Key) == key { + continue + } + + predicates = append(predicates, predicate.VersionEqual(kv.Key, kv.ModRevision)) + ops = append(ops, operation.Delete(kv.Key)) + } + ops = append(ops, operation.Put([]byte(key), data)) + + txn := publisher.store.Tx(ctx) + if len(predicates) > 0 { + txn = txn.If(predicates...) + } + resp, err := txn.Then(ops...).Commit() + if err != nil { + return fmt.Errorf("%w into etcd: %w", errPutData, err) + } + if resp.Succeeded { + return nil + } + } +} + +// GSEtcdKeyDataPublisher publishes data into etcd via go-storage for a prefix and key. +type GSEtcdKeyDataPublisher struct { + store storage.Storage + prefix string + key string + timeout time.Duration +} + +// NewGSEtcdKeyDataPublisher creates a new go-storage-based etcd publisher for a prefix and key. +func NewGSEtcdKeyDataPublisher( + store storage.Storage, + prefix, key string, + timeout time.Duration, +) GSEtcdKeyDataPublisher { + return GSEtcdKeyDataPublisher{ + store: store, + prefix: prefix, + key: key, + timeout: timeout, + } +} + +// Publish publishes the configuration into etcd to the given prefix and key. +func (publisher GSEtcdKeyDataPublisher) Publish(revision int64, data []byte) error { + if data == nil { + return fmt.Errorf("%w into etcd: %w", errPublishData, errDataMissing) + } + + key := getConfigKey(publisher.prefix, publisher.key) + ctx := context.Background() + if publisher.timeout != 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, publisher.timeout) + defer cancel() + } + + var predicates []predicate.Predicate + if revision != 0 { + predicates = append(predicates, predicate.VersionEqual([]byte(key), revision)) + } + + txn := publisher.store.Tx(ctx) + if predicates != nil { + txn = txn.If(predicates...) + } + resp, err := txn.Then(operation.Put([]byte(key), data)).Commit() + if err != nil { + return fmt.Errorf("%w into etcd: %w", errPutData, err) + } + if !resp.Succeeded { + return fmt.Errorf("%w into etcd: %w", errPutData, errWrongRevision) + } + + return nil +} diff --git a/lib/cluster/etcd_gs_test.go b/lib/cluster/etcd_gs_test.go new file mode 100644 index 000000000..af91f4323 --- /dev/null +++ b/lib/cluster/etcd_gs_test.go @@ -0,0 +1,824 @@ +package cluster_test + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/tarantool/go-storage" + "github.com/tarantool/go-storage/kv" + "github.com/tarantool/go-storage/operation" + "github.com/tarantool/go-storage/predicate" + "github.com/tarantool/go-storage/tx" + "github.com/tarantool/go-storage/watch" + + "github.com/tarantool/tt/lib/cluster" + "github.com/tarantool/tt/lib/connect" +) + +type MockGSDriver struct { + Ctxs []context.Context + PredicatesCalls [][]predicate.Predicate + ThenOpsCalls [][]operation.Operation + ElseOpsCalls [][]operation.Operation + Responses []tx.Response + Errors []error + ExecuteFunc func( + ctx context.Context, + predicates []predicate.Predicate, + thenOps []operation.Operation, + elseOps []operation.Operation, + ) (tx.Response, error) +} + +func (d *MockGSDriver) Execute( + ctx context.Context, + predicates []predicate.Predicate, + thenOps []operation.Operation, + elseOps []operation.Operation, +) (tx.Response, error) { + d.Ctxs = append(d.Ctxs, ctx) + d.PredicatesCalls = append(d.PredicatesCalls, predicates) + d.ThenOpsCalls = append(d.ThenOpsCalls, thenOps) + d.ElseOpsCalls = append(d.ElseOpsCalls, elseOps) + + if d.ExecuteFunc != nil { + return d.ExecuteFunc(ctx, predicates, thenOps, elseOps) + } + + callID := len(d.Ctxs) - 1 + resp := tx.Response{Succeeded: true} + if callID < len(d.Responses) { + resp = d.Responses[callID] + } + + var err error + if callID < len(d.Errors) { + err = d.Errors[callID] + } + + return resp, err +} + +func (d *MockGSDriver) Tx(ctx context.Context) tx.Tx { + return storage.NewStorage(mockGSDriverAdapter{driver: d}).Tx(ctx) +} + +func (d *MockGSDriver) Range( + ctx context.Context, + opts ...storage.RangeOption, +) ([]kv.KeyValue, error) { + return storage.NewStorage(mockGSDriverAdapter{driver: d}).Range(ctx, opts...) +} + +func (d *MockGSDriver) Watch( + context.Context, + []byte, + ...watch.Option, +) <-chan watch.Event { + ch := make(chan watch.Event) + close(ch) + + return ch +} + +type mockGSDriverAdapter struct { + driver *MockGSDriver +} + +func (a mockGSDriverAdapter) Execute( + ctx context.Context, + predicates []predicate.Predicate, + thenOps []operation.Operation, + elseOps []operation.Operation, +) (tx.Response, error) { + return a.driver.Execute(ctx, predicates, thenOps, elseOps) +} + +func (a mockGSDriverAdapter) Watch( + context.Context, + []byte, + ...watch.Option, +) (<-chan watch.Event, func(), error) { + return nil, func() {}, nil +} + +func newGetResponse(values []kv.KeyValue) tx.Response { + return tx.Response{ + Succeeded: true, + Results: []tx.RequestResponse{ + {Values: values}, + }, + } +} + +func assertDeadline(t *testing.T, ctx context.Context, timeout time.Duration) { + t.Helper() + + expected := time.Now().Add(timeout) + deadline, ok := ctx.Deadline() + if timeout == 0 { + assert.False(t, ok) + return + } + + assert.True(t, ok) + assert.InDelta(t, expected.Unix(), deadline.Unix(), 1) +} + +func TestNewGSEtcdAllCollector(t *testing.T) { + var collector cluster.DataCollector + + collector = cluster.NewGSEtcdAllCollector(&MockGSDriver{}, "", 0) + + assert.NotNil(t, collector) +} + +func TestGSEtcdAllCollector_Collect_driver_inputs(t *testing.T) { + cases := []struct { + Prefix string + Key string + }{ + {"", "/config/"}, + {"////", "/config/"}, + {"foo", "foo/config/"}, + {"/foo/bar", "/foo/bar/config/"}, + {"/foo/bar////", "/foo/bar/config/"}, + } + for _, tc := range cases { + t.Run(tc.Prefix, func(t *testing.T) { + mock := &MockGSDriver{ + Errors: []error{fmt.Errorf("any")}, + } + cluster.NewGSEtcdAllCollector(mock, tc.Prefix, 0).Collect() + + require.Len(t, mock.Ctxs, 1) + require.Len(t, mock.ThenOpsCalls, 1) + require.Empty(t, mock.PredicatesCalls[0]) + require.Empty(t, mock.ElseOpsCalls[0]) + require.Len(t, mock.ThenOpsCalls[0], 1) + + op := mock.ThenOpsCalls[0][0] + assert.Equal(t, operation.TypeGet, op.Type()) + assert.Equal(t, []byte(tc.Key), op.Key()) + assert.True(t, op.IsPrefix()) + }) + } +} + +func TestGSEtcdCollectors_Collect_timeout(t *testing.T) { + cases := []time.Duration{0, 60 * time.Second} + + for _, tc := range cases { + collectors := []struct { + Name string + Collector cluster.DataCollector + Mock *MockGSDriver + }{ + { + Name: "all", + Collector: cluster.NewGSEtcdAllCollector(&MockGSDriver{Errors: []error{fmt.Errorf("any")}}, "/foo", tc), + Mock: &MockGSDriver{Errors: []error{fmt.Errorf("any")}}, + }, + { + Name: "key", + Collector: cluster.NewGSEtcdKeyCollector(&MockGSDriver{Errors: []error{fmt.Errorf("any")}}, "/foo", "key", tc), + Mock: &MockGSDriver{Errors: []error{fmt.Errorf("any")}}, + }, + } + for i := range collectors { + collectors[i].Collector = map[string]cluster.DataCollector{ + "all": cluster.NewGSEtcdAllCollector(collectors[i].Mock, "/foo", tc), + "key": cluster.NewGSEtcdKeyCollector(collectors[i].Mock, "/foo", "key", tc), + }[collectors[i].Name] + } + for _, c := range collectors { + t.Run(c.Name+fmt.Sprint(tc), func(t *testing.T) { + c.Collector.Collect() + + require.Len(t, c.Mock.Ctxs, 1) + assertDeadline(t, c.Mock.Ctxs[0], tc) + }) + } + } +} + +func TestGSEtcdAllCollector_Collect_merge(t *testing.T) { + cases := []struct { + Kvs []kv.KeyValue + Expected []cluster.Data + }{ + { + Kvs: []kv.KeyValue{ + { + Key: []byte("k"), + Value: []byte("f: a\nb: a\n"), + ModRevision: 1, + }, + }, + Expected: []cluster.Data{{ + Source: "k", + Value: []byte("f: a\nb: a\n"), + Revision: 1, + }}, + }, + { + Kvs: []kv.KeyValue{ + { + Key: []byte("k"), + Value: []byte("f: a\nb: a\n"), + ModRevision: 1, + }, + { + Key: []byte("k"), + Value: []byte("f: b\nb: b\nc: b\n"), + ModRevision: 2, + }, + }, + Expected: []cluster.Data{ + { + Source: "k", + Value: []byte("f: a\nb: a\n"), + Revision: 1, + }, + { + Source: "k", + Value: []byte("f: b\nb: b\nc: b\n"), + Revision: 2, + }, + }, + }, + } + + for i, tc := range cases { + t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { + mock := &MockGSDriver{ + Responses: []tx.Response{newGetResponse(tc.Kvs)}, + } + config, err := cluster.NewGSEtcdAllCollector(mock, "foo", 0).Collect() + + assert.NoError(t, err) + require.NotNil(t, config) + assert.Equal(t, tc.Expected, config) + }) + } +} + +func TestGSEtcdCollectors_Collect_error(t *testing.T) { + cases := []struct { + Name string + Collector cluster.DataCollector + }{ + {"all", cluster.NewGSEtcdAllCollector(&MockGSDriver{Errors: []error{fmt.Errorf("any")}}, "/foo", 0)}, + {"key", cluster.NewGSEtcdKeyCollector(&MockGSDriver{Errors: []error{fmt.Errorf("any")}}, "/foo", "key", 0)}, + } + for _, tc := range cases { + t.Run(tc.Name, func(t *testing.T) { + config, err := tc.Collector.Collect() + + assert.ErrorContains(t, err, "failed to fetch data from etcd:") + assert.ErrorContains(t, err, "any") + assert.Nil(t, config) + }) + } +} + +func TestGSEtcdCollectors_Collect_empty(t *testing.T) { + mock := &MockGSDriver{ + Responses: []tx.Response{newGetResponse(nil)}, + } + cases := []struct { + Name string + Collector cluster.DataCollector + }{ + {"all", cluster.NewGSEtcdAllCollector(mock, "/foo", 0)}, + {"key", cluster.NewGSEtcdKeyCollector(mock, "/foo", "key", 0)}, + } + for _, tc := range cases { + t.Run(tc.Name, func(t *testing.T) { + config, err := tc.Collector.Collect() + assert.Error(t, err) + assert.Nil(t, config) + }) + } +} + +func TestNewGSEtcdKeyCollector(t *testing.T) { + var collector cluster.DataCollector + + collector = cluster.NewGSEtcdKeyCollector(&MockGSDriver{}, "", "", 0) + + assert.NotNil(t, collector) +} + +func TestGSEtcdKeyCollector_Collect_driver_inputs(t *testing.T) { + cases := []struct { + Prefix string + Key string + Expected string + IsPrefix bool + }{ + {"", "", "/config/", true}, + {"////", "//", "/config///", true}, + {"foo", "foo", "foo/config/foo", false}, + {"/foo/bar", "/foo", "/foo/bar/config//foo", false}, + {"/foo/bar////", "//foo//", "/foo/bar/config///foo//", true}, + } + for _, tc := range cases { + t.Run(tc.Expected, func(t *testing.T) { + mock := &MockGSDriver{ + Errors: []error{fmt.Errorf("any")}, + } + cluster.NewGSEtcdKeyCollector(mock, tc.Prefix, tc.Key, 0).Collect() + + require.Len(t, mock.Ctxs, 1) + require.Len(t, mock.ThenOpsCalls, 1) + require.Empty(t, mock.PredicatesCalls[0]) + require.Empty(t, mock.ElseOpsCalls[0]) + require.Len(t, mock.ThenOpsCalls[0], 1) + + op := mock.ThenOpsCalls[0][0] + assert.Equal(t, operation.TypeGet, op.Type()) + assert.Equal(t, []byte(tc.Expected), op.Key()) + assert.Equal(t, tc.IsPrefix, op.IsPrefix()) + }) + } +} + +func TestGSEtcdKeyCollector_Collect_key(t *testing.T) { + mock := &MockGSDriver{ + Responses: []tx.Response{newGetResponse([]kv.KeyValue{ + { + Key: []byte("k"), + Value: []byte("f: a\nb: a\n"), + ModRevision: 1, + }, + })}, + } + expected := []cluster.Data{{ + Source: "k", + Value: []byte("f: a\nb: a\n"), + Revision: 1, + }} + + config, err := cluster.NewGSEtcdKeyCollector(mock, "foo", "key", 0).Collect() + + assert.NoError(t, err) + require.NotNil(t, config) + assert.Equal(t, expected, config) +} + +func TestGSEtcdKeyCollector_Collect_too_many(t *testing.T) { + mock := &MockGSDriver{ + Responses: []tx.Response{newGetResponse([]kv.KeyValue{ + { + Key: []byte("k"), + Value: []byte("f: a\nb: a\n"), + ModRevision: 1, + }, + { + Key: []byte("k"), + Value: []byte("f: b\nb: b\nc: b\n"), + ModRevision: 2, + }, + })}, + } + + config, err := cluster.NewGSEtcdKeyCollector(mock, "foo", "key", 0).Collect() + + assert.ErrorContains(t, err, "too many responses") + require.Nil(t, config) +} + +func TestNewGSEtcdAllDataPublisher(t *testing.T) { + var publisher cluster.DataPublisher + + publisher = cluster.NewGSEtcdAllDataPublisher(nil, "", 0) + + assert.NotNil(t, publisher) +} + +func TestGSEtcdAllDataPublisher_Publish_get_inputs(t *testing.T) { + cases := []struct { + Prefix string + Key string + }{ + {"", "/config/"}, + {"////", "/config/"}, + {"foo", "foo/config/"}, + {"/foo/bar", "/foo/bar/config/"}, + {"/foo/bar////", "/foo/bar/config/"}, + } + data := []byte("foo bar") + + for _, tc := range cases { + t.Run(tc.Prefix, func(t *testing.T) { + mock := &MockGSDriver{} + cluster.NewGSEtcdAllDataPublisher(mock, tc.Prefix, 0).Publish(0, data) + + require.Len(t, mock.Ctxs, 2) + require.Len(t, mock.ThenOpsCalls, 2) + require.Len(t, mock.ThenOpsCalls[0], 1) + assert.Equal(t, operation.TypeGet, mock.ThenOpsCalls[0][0].Type()) + assert.Equal(t, []byte(tc.Key), mock.ThenOpsCalls[0][0].Key()) + assert.True(t, mock.ThenOpsCalls[0][0].IsPrefix()) + }) + } +} + +func TestGSEtcdAllDataPublisher_Publish_txn_inputs(t *testing.T) { + cases := []struct { + Name string + Mock *MockGSDriver + IfLen int + ThenLen int + }{ + { + Name: "no get keys", + Mock: &MockGSDriver{}, + IfLen: 0, + ThenLen: 1, + }, + { + Name: "get keys", + Mock: &MockGSDriver{ + Responses: []tx.Response{newGetResponse([]kv.KeyValue{ + {Key: []byte("foo"), ModRevision: 1}, + {Key: []byte("bar"), ModRevision: 2}, + {Key: []byte("baz"), ModRevision: 3}, + })}, + }, + IfLen: 3, + ThenLen: 4, + }, + { + Name: "get keys with target", + Mock: &MockGSDriver{ + Responses: []tx.Response{newGetResponse([]kv.KeyValue{ + {Key: []byte("foo"), ModRevision: 1}, + {Key: []byte("bar"), ModRevision: 2}, + {Key: []byte("/foo/config/all"), ModRevision: 3}, + })}, + }, + IfLen: 2, + ThenLen: 3, + }, + } + + for _, tc := range cases { + t.Run(tc.Name, func(t *testing.T) { + publisher := cluster.NewGSEtcdAllDataPublisher(tc.Mock, "/foo", 0) + err := publisher.Publish(0, []byte{}) + + require.NoError(t, err) + require.Len(t, tc.Mock.PredicatesCalls, 2) + require.Len(t, tc.Mock.ThenOpsCalls, 2) + assert.Len(t, tc.Mock.PredicatesCalls[1], tc.IfLen) + assert.Len(t, tc.Mock.ThenOpsCalls[1], tc.ThenLen) + assert.Len(t, tc.Mock.ElseOpsCalls[1], 0) + + for i, op := range tc.Mock.ThenOpsCalls[1] { + if i == len(tc.Mock.ThenOpsCalls[1])-1 { + assert.Equal(t, operation.TypePut, op.Type()) + assert.Equal(t, []byte("/foo/config/all"), op.Key()) + } else { + assert.Equal(t, operation.TypeDelete, op.Type()) + } + } + }) + } +} + +func TestGSEtcdDataPublishers_Publish_data_nil(t *testing.T) { + cases := []struct { + Name string + Publisher cluster.DataPublisher + }{ + {"all", cluster.NewGSEtcdAllDataPublisher(nil, "", 0)}, + {"key", cluster.NewGSEtcdKeyDataPublisher(nil, "", "", 0)}, + } + + for _, tc := range cases { + t.Run(tc.Name, func(t *testing.T) { + err := tc.Publisher.Publish(0, nil) + + assert.EqualError(t, err, + "failed to publish data into etcd: data does not exist") + }) + } +} + +func TestGSEtcdDataPublishers_Publish_publisher_nil(t *testing.T) { + cases := []struct { + Name string + Publisher cluster.DataPublisher + }{ + {"all", cluster.NewGSEtcdAllDataPublisher(nil, "", 0)}, + {"key", cluster.NewGSEtcdKeyDataPublisher(nil, "", "", 0)}, + } + + for _, tc := range cases { + t.Run(tc.Name, func(t *testing.T) { + assert.Panics(t, func() { + tc.Publisher.Publish(0, []byte{}) + }) + }) + } +} + +func TestGSEtcdAllDataPublisher_Publish_errors(t *testing.T) { + cases := []struct { + Name string + Mock storage.Storage + Expected string + }{ + { + Name: "no error", + Mock: &MockGSDriver{}, + Expected: "", + }, + { + Name: "get error", + Mock: &MockGSDriver{ + Errors: []error{fmt.Errorf("get")}, + }, + Expected: "failed to fetch data from etcd: failed to execute ops: get", + }, + { + Name: "execute error", + Mock: &MockGSDriver{ + ExecuteFunc: func( + _ context.Context, + _ []predicate.Predicate, + thenOps []operation.Operation, + _ []operation.Operation, + ) (tx.Response, error) { + if len(thenOps) == 1 && thenOps[0].Type() == operation.TypeGet { + return newGetResponse(nil), nil + } + + return tx.Response{}, fmt.Errorf("execute") + }, + }, + Expected: "failed to put data into etcd: tx execute failed: execute", + }, + } + + for _, tc := range cases { + t.Run(tc.Name, func(t *testing.T) { + publisher := cluster.NewGSEtcdAllDataPublisher(tc.Mock, "prefix", 0) + err := publisher.Publish(0, []byte{}) + if tc.Expected != "" { + assert.EqualError(t, err, tc.Expected) + } else { + assert.NoError(t, err) + } + }) + } +} + +func TestGSEtcdAllDataPublisher_Publish_revision(t *testing.T) { + mock := &MockGSDriver{} + publisher := cluster.NewGSEtcdAllDataPublisher(mock, "prefix", 0) + err := publisher.Publish(1, []byte{}) + assert.EqualError(t, err, + "failed to publish data into etcd: target revision 1 is not supported") +} + +func TestGSEtcdAllDataPublisher_Publish_timeout(t *testing.T) { + cases := []time.Duration{0, 60 * time.Second} + + for _, tc := range cases { + t.Run(fmt.Sprint(tc), func(t *testing.T) { + mock := &MockGSDriver{} + publisher := cluster.NewGSEtcdAllDataPublisher(mock, "prefix", tc) + err := publisher.Publish(0, []byte{}) + + require.NoError(t, err) + require.Len(t, mock.Ctxs, 2) + assert.Equal(t, mock.Ctxs[0], mock.Ctxs[1]) + assertDeadline(t, mock.Ctxs[0], tc) + assertDeadline(t, mock.Ctxs[1], tc) + }) + } +} + +func TestGSEtcdAllDataPublisher_Publish_timeout_exit(t *testing.T) { + mock := &MockGSDriver{ + ExecuteFunc: func( + _ context.Context, + _ []predicate.Predicate, + thenOps []operation.Operation, + _ []operation.Operation, + ) (tx.Response, error) { + if len(thenOps) == 1 && thenOps[0].Type() == operation.TypeGet { + return newGetResponse(nil), nil + } + + return tx.Response{Succeeded: false}, nil + }, + } + + before := time.Now() + timeout := 100 * time.Millisecond + delta := 30 * time.Millisecond + publisher := cluster.NewGSEtcdAllDataPublisher(mock, "prefix", timeout) + err := publisher.Publish(0, []byte{}) + assert.EqualError(t, err, "context deadline exceeded") + assert.InDelta(t, timeout, time.Since(before), float64(delta)) +} + +func TestNewGSEtcdKeyDataPublisher(t *testing.T) { + var publisher cluster.DataPublisher + + publisher = cluster.NewGSEtcdKeyDataPublisher(nil, "", "", 0) + + assert.NotNil(t, publisher) +} + +func TestGSEtcdKeyDataPublisher_Publish_inputs(t *testing.T) { + cases := []struct { + Prefix string + Key string + Expected string + }{ + {"", "foo", "/config/foo"}, + {"////", "foo", "/config/foo"}, + {"foo", "foo", "foo/config/foo"}, + {"/foo/bar", "foo", "/foo/bar/config/foo"}, + {"/foo/bar////", "foo", "/foo/bar/config/foo"}, + {"/foo/bar////", "", "/foo/bar/config/"}, + {"/foo/bar////", "//foo//", "/foo/bar/config///foo//"}, + } + data := []byte("foo bar") + + for _, tc := range cases { + t.Run(tc.Expected, func(t *testing.T) { + mock := &MockGSDriver{} + publisher := cluster.NewGSEtcdKeyDataPublisher(mock, tc.Prefix, tc.Key, 0) + err := publisher.Publish(0, data) + + require.NoError(t, err) + require.Len(t, mock.Ctxs, 1) + require.Len(t, mock.ThenOpsCalls, 1) + assert.Equal(t, + []operation.Operation{operation.Put([]byte(tc.Expected), data)}, + mock.ThenOpsCalls[0]) + assert.Nil(t, mock.PredicatesCalls[0]) + assert.Nil(t, mock.ElseOpsCalls[0]) + }) + } +} + +func TestGSEtcdKeyDataPublisher_Publish_modRevision(t *testing.T) { + prefix := "/foo" + key := "key" + modRevision := int64(5) + data := []byte("foo bar") + expected := "/foo/config/key" + mock := &MockGSDriver{} + publisher := cluster.NewGSEtcdKeyDataPublisher(mock, prefix, key, 0) + + err := publisher.Publish(modRevision, data) + require.NoError(t, err) + require.Len(t, mock.Ctxs, 1) + assert.Equal(t, + []operation.Operation{operation.Put([]byte(expected), data)}, + mock.ThenOpsCalls[0]) + require.Len(t, mock.PredicatesCalls[0], 1) + assert.Equal(t, []byte(expected), mock.PredicatesCalls[0][0].Key()) + assert.Equal(t, predicate.OpEqual, mock.PredicatesCalls[0][0].Operation()) + assert.Equal(t, predicate.TargetVersion, mock.PredicatesCalls[0][0].Target()) + assert.Equal(t, modRevision, mock.PredicatesCalls[0][0].Value()) + assert.Nil(t, mock.ElseOpsCalls[0]) +} + +func TestGSEtcdKeyDataPublisher_Publish_error(t *testing.T) { + mock := &MockGSDriver{ + Errors: []error{fmt.Errorf("foo")}, + } + publisher := cluster.NewGSEtcdKeyDataPublisher(mock, "", "", 0) + err := publisher.Publish(0, []byte{}) + + assert.EqualError(t, err, "failed to put data into etcd: tx execute failed: foo") +} + +func TestGSEtcdKeyDataPublisher_Publish_wrong_revision(t *testing.T) { + mock := &MockGSDriver{ + Responses: []tx.Response{{Succeeded: false}}, + } + publisher := cluster.NewGSEtcdKeyDataPublisher(mock, "", "", 0) + err := publisher.Publish(1, []byte{}) + + assert.EqualError(t, err, "failed to put data into etcd: wrong revision") +} + +func TestGSEtcdKeyDataPublisher_Publish_timeout(t *testing.T) { + cases := []time.Duration{0, 60 * time.Second} + mock := &MockGSDriver{ + Errors: []error{fmt.Errorf("foo")}, + } + + for _, tc := range cases { + t.Run(fmt.Sprint(tc), func(t *testing.T) { + mock.Ctxs = nil + mock.PredicatesCalls = nil + mock.ThenOpsCalls = nil + mock.ElseOpsCalls = nil + + publisher := cluster.NewGSEtcdKeyDataPublisher(mock, "", "", tc) + publisher.Publish(0, []byte{}) + + require.Len(t, mock.Ctxs, 1) + assertDeadline(t, mock.Ctxs[0], tc) + }) + } +} + +func TestGSMakeEtcdOptsFromUriOpts(t *testing.T) { + cases := []struct { + Name string + UriOpts connect.UriOpts + Expected cluster.EtcdOpts + }{ + { + Name: "empty", + UriOpts: connect.UriOpts{}, + Expected: cluster.EtcdOpts{}, + }, + { + Name: "ignored", + UriOpts: connect.UriOpts{ + Host: "foo", + Prefix: "foo", + Params: map[string]string{ + "key": "bar", + "name": "zoo", + }, + Ciphers: "foo:bar:ciphers", + }, + Expected: cluster.EtcdOpts{}, + }, + { + Name: "skip_host_verify", + UriOpts: connect.UriOpts{ + SkipHostVerify: true, + }, + Expected: cluster.EtcdOpts{ + SkipHostVerify: true, + }, + }, + { + Name: "skip_peer_verify", + UriOpts: connect.UriOpts{ + SkipPeerVerify: true, + }, + Expected: cluster.EtcdOpts{ + SkipHostVerify: true, + }, + }, + { + Name: "full", + UriOpts: connect.UriOpts{ + Endpoint: "foo", + Host: "host", + Prefix: "prefix", + Params: map[string]string{ + "key": "key", + "name": "instance", + }, + Username: "username", + Password: "password", + KeyFile: "key_file", + CertFile: "cert_file", + CaPath: "ca_path", + CaFile: "ca_file", + SkipHostVerify: true, + SkipPeerVerify: true, + Timeout: 2 * time.Second, + }, + Expected: cluster.EtcdOpts{ + Endpoints: []string{"foo"}, + Username: "username", + Password: "password", + KeyFile: "key_file", + CertFile: "cert_file", + CaPath: "ca_path", + CaFile: "ca_file", + SkipHostVerify: true, + Timeout: 2 * time.Second, + }, + }, + } + + for _, tc := range cases { + t.Run(tc.Name, func(t *testing.T) { + etcdOpts := cluster.MakeEtcdOptsFromUriOpts(tc.UriOpts) + + assert.Equal(t, tc.Expected, etcdOpts) + }) + } +} diff --git a/lib/cluster/go.mod b/lib/cluster/go.mod index 7387d8025..1def7e4b2 100644 --- a/lib/cluster/go.mod +++ b/lib/cluster/go.mod @@ -5,17 +5,29 @@ go 1.25.7 require ( github.com/mitchellh/mapstructure v1.5.0 github.com/stretchr/testify v1.11.1 + github.com/tarantool/go-storage v1.1.3-0.20260421111102-abe1dd9a9ca8 github.com/tarantool/go-tarantool/v2 v2.4.2 github.com/tarantool/tt/lib/connect v0.0.0-0 github.com/tarantool/tt/lib/dial v0.0.0-0 go.etcd.io/etcd/api/v3 v3.6.8 go.etcd.io/etcd/client/pkg/v3 v3.6.8 go.etcd.io/etcd/client/v3 v3.6.8 + go.etcd.io/etcd/tests/v3 v3.6.5 go.uber.org/zap v1.27.1 google.golang.org/grpc v1.79.1 gopkg.in/yaml.v3 v3.0.1 ) +require ( + github.com/grpc-ecosystem/grpc-gateway/v2 v2.28.0 // indirect + github.com/mattn/go-pointer v0.0.1 // indirect + github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572 // indirect + github.com/tarantool/go-iproto v1.1.0 // indirect + github.com/tarantool/go-openssl v1.2.1 // indirect + github.com/tarantool/tt v1.3.1 // indirect + golang.org/x/term v0.40.0 // indirect +) + require ( github.com/VividCortex/ewma v1.2.0 // indirect github.com/beorn7/perks v1.0.1 // indirect @@ -23,12 +35,17 @@ require ( github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/cheggaaa/pb/v3 v3.1.6 // indirect + github.com/coreos/go-semver v0.3.1 // indirect + github.com/coreos/go-systemd/v22 v22.7.0 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect github.com/dustin/go-humanize v1.0.1 // indirect github.com/fatih/color v1.18.0 // indirect github.com/go-logr/logr v1.4.3 // indirect github.com/go-logr/stdr v1.2.2 // indirect + github.com/gogo/protobuf v1.3.2 // indirect github.com/golang-jwt/jwt/v5 v5.2.2 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect + github.com/golang/protobuf v1.5.4 // indirect github.com/google/btree v1.1.3 // indirect github.com/google/go-cmp v0.7.0 // indirect github.com/google/uuid v1.6.0 // indirect @@ -36,16 +53,15 @@ require ( github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.1 // indirect github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0 // indirect - github.com/grpc-ecosystem/grpc-gateway/v2 v2.28.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/jonboulle/clockwork v0.5.0 // indirect github.com/klauspost/compress v1.17.9 // indirect github.com/mattn/go-colorable v0.1.14 // indirect github.com/mattn/go-isatty v0.0.20 // indirect - github.com/mattn/go-pointer v0.0.1 // indirect github.com/mattn/go-runewidth v0.0.16 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/olekukonko/tablewriter v0.0.5 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_golang v1.20.5 // indirect github.com/prometheus/client_model v0.6.1 // indirect github.com/prometheus/common v0.62.0 // indirect @@ -53,18 +69,18 @@ require ( github.com/rivo/uniseg v0.4.7 // indirect github.com/sirupsen/logrus v1.9.3 // indirect github.com/soheilhy/cmux v0.1.5 // indirect - github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572 // indirect github.com/spf13/cobra v1.9.1 // indirect github.com/spf13/pflag v1.0.6 // indirect - github.com/tarantool/go-iproto v1.1.0 // indirect - github.com/tarantool/go-openssl v1.2.1 // indirect - github.com/tarantool/tt v1.3.1 // indirect + github.com/tarantool/go-option v1.0.0 // indirect + github.com/tarantool/go-tlsdialer v1.0.2 // indirect github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 // indirect + github.com/vmihailenco/msgpack/v5 v5.4.1 // indirect + github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect go.etcd.io/bbolt v1.4.3 // indirect - go.etcd.io/etcd/etcdctl/v3 v3.6.8 // indirect - go.etcd.io/etcd/pkg/v3 v3.6.8 // indirect - go.etcd.io/etcd/server/v3 v3.6.8 // indirect + go.etcd.io/etcd/etcdctl/v3 v3.6.5 // indirect + go.etcd.io/etcd/pkg/v3 v3.6.5 // indirect + go.etcd.io/etcd/server/v3 v3.6.5 // indirect go.etcd.io/gofail v0.2.0 // indirect go.etcd.io/raft/v3 v3.6.0 // indirect go.opentelemetry.io/auto/sdk v1.2.1 // indirect @@ -76,32 +92,18 @@ require ( go.opentelemetry.io/otel/sdk v1.39.0 // indirect go.opentelemetry.io/otel/trace v1.39.0 // indirect go.opentelemetry.io/proto/otlp v1.5.0 // indirect - golang.org/x/crypto v0.48.0 // indirect - golang.org/x/term v0.40.0 // indirect - golang.org/x/time v0.9.0 // indirect - gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect - sigs.k8s.io/json v0.0.0-20211020170558-c049b76a60c6 // indirect - sigs.k8s.io/yaml v1.4.0 // indirect -) - -require ( - github.com/coreos/go-semver v0.3.1 // indirect - github.com/coreos/go-systemd/v22 v22.7.0 // indirect - github.com/davecgh/go-spew v1.1.1 // indirect - github.com/gogo/protobuf v1.3.2 // indirect - github.com/golang/protobuf v1.5.4 // indirect - github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/tarantool/go-tlsdialer v1.0.2 // indirect - github.com/vmihailenco/msgpack/v5 v5.4.1 // indirect - github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect - go.etcd.io/etcd/tests/v3 v3.6.8 go.uber.org/multierr v1.11.0 // indirect + golang.org/x/crypto v0.48.0 // indirect golang.org/x/net v0.50.0 // indirect golang.org/x/sys v0.41.0 // indirect golang.org/x/text v0.34.0 // indirect + golang.org/x/time v0.9.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20260223185530-2f722ef697dc // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20260223185530-2f722ef697dc // indirect google.golang.org/protobuf v1.36.11 // indirect + gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect + sigs.k8s.io/json v0.0.0-20211020170558-c049b76a60c6 // indirect + sigs.k8s.io/yaml v1.4.0 // indirect ) replace github.com/tarantool/tt/lib/connect => ../connect diff --git a/lib/cluster/go.sum b/lib/cluster/go.sum index 759bc0e8d..c830c8a28 100644 --- a/lib/cluster/go.sum +++ b/lib/cluster/go.sum @@ -43,6 +43,8 @@ github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/gojuno/minimock/v3 v3.4.7 h1:vhE5zpniyPDRT0DXd5s3DbtZJVlcbmC5k80izYtj9lY= +github.com/gojuno/minimock/v3 v3.4.7/go.mod h1:QxJk4mdPrVyYUmEZGc2yD2NONpqM/j4dWhsy9twjFHg= github.com/golang-jwt/jwt/v5 v5.2.2 h1:Rl4B7itRWVtYIHFrSNd7vhTiz9UpLdi6gZhZ3wEeDy8= github.com/golang-jwt/jwt/v5 v5.2.2/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= @@ -151,6 +153,10 @@ github.com/tarantool/go-iproto v1.1.0 h1:HULVOIHsiehI+FnHfM7wMDntuzUddO09DKqu2Wn github.com/tarantool/go-iproto v1.1.0/go.mod h1:LNCtdyZxojUed8SbOiYHoc3v9NvaZTB7p96hUySMlIo= github.com/tarantool/go-openssl v1.2.1 h1:WUVTeEPuBAXbrBjvJZ3ynzk/Sv4DL47V/ehWea9czjA= github.com/tarantool/go-openssl v1.2.1/go.mod h1:EwX1pKIGypLxkY49vKIIR4LTT+94DhKiunCqU2gEzLQ= +github.com/tarantool/go-option v1.0.0 h1:+Etw0i3TjsXvADTo5rfZNCfsXe3BfHOs+iVfIrl0Nlo= +github.com/tarantool/go-option v1.0.0/go.mod h1:lXzzeZtL+rPUtLOCDP6ny3FemFBjruG9aHKzNN2bS08= +github.com/tarantool/go-storage v1.1.3-0.20260421111102-abe1dd9a9ca8 h1:seeJOwJeWBtnouVHgAptoj+5qpHoMZV/Xv4m/pGjWJg= +github.com/tarantool/go-storage v1.1.3-0.20260421111102-abe1dd9a9ca8/go.mod h1:lM/UPkuzeggynwtmIHD5OCqdz5H2RHsXX6HaOzYBCzk= github.com/tarantool/go-tarantool/v2 v2.4.2 h1:rkzYtFhLJLA9RDIhjzN93MJBN5PBxHW4+soq+RB90gE= github.com/tarantool/go-tarantool/v2 v2.4.2/go.mod h1:MTbhdjFc3Jl63Lgi/UJr5D+QbT+QegqOzsNJGmaw7VM= github.com/tarantool/go-tlsdialer v1.0.2 h1:TiOkihvC2ufLbOqJcJLuQ9I7W5bsZtmnT7KHF/t8n4s= @@ -175,14 +181,14 @@ go.etcd.io/etcd/client/pkg/v3 v3.6.8 h1:Qs/5C0LNFiqXxYf2GU8MVjYUEXJ6sZaYOz0zEqQg go.etcd.io/etcd/client/pkg/v3 v3.6.8/go.mod h1:GsiTRUZE2318PggZkAo6sWb6l8JLVrnckTNfbG8PWtw= go.etcd.io/etcd/client/v3 v3.6.8 h1:B3G76t1UykqAOrbio7s/EPatixQDkQBevN8/mwiplrY= go.etcd.io/etcd/client/v3 v3.6.8/go.mod h1:MVG4BpSIuumPi+ELF7wYtySETmoTWBHVcDoHdVupwt8= -go.etcd.io/etcd/etcdctl/v3 v3.6.8 h1:iO3zX9xuY/X2Y+fnt6mUR7o/18iBGYpd/lhs/4FqHCc= -go.etcd.io/etcd/etcdctl/v3 v3.6.8/go.mod h1:8X8SvxOc5kPQ0e+jbSx3RgKzTNQ3O8rBuQEoDKuQFX0= -go.etcd.io/etcd/pkg/v3 v3.6.8 h1:Xe+LIL974spy8b4nEx3H0KMr1ofq3r0kh6FbU3aw4es= -go.etcd.io/etcd/pkg/v3 v3.6.8/go.mod h1:TRibVNe+FqJIe1abOAA1PsuQ4wqO87ZaOoprg09Tn8c= -go.etcd.io/etcd/server/v3 v3.6.8 h1:U2strdSEy1U8qcSzRIdkYpvOPtBy/9i/IfaaCI9flZ4= -go.etcd.io/etcd/server/v3 v3.6.8/go.mod h1:88dCtwUnSirkUoJbflQxxWXqtBSZa6lSG0Kuej+dois= -go.etcd.io/etcd/tests/v3 v3.6.8 h1:FUiUGXUhLkU1WZMebKWdwtM7KtzAGuUSQEUV8Sq2+QA= -go.etcd.io/etcd/tests/v3 v3.6.8/go.mod h1:U1ioDy7TXzz2UXhSQfbJ3++PsryNwiniHtdbXZPprX0= +go.etcd.io/etcd/etcdctl/v3 v3.6.5 h1:p1uG0/xYNpOogy6cR1rqvZoQhcQseh552DVAAbICVR8= +go.etcd.io/etcd/etcdctl/v3 v3.6.5/go.mod h1:h3m9lUyEsevVvIyphoDBS3z0VL+c6zVJXwsL4vZ9g+E= +go.etcd.io/etcd/pkg/v3 v3.6.5 h1:byxWB4AqIKI4SBmquZUG1WGtvMfMaorXFoCcFbVeoxM= +go.etcd.io/etcd/pkg/v3 v3.6.5/go.mod h1:uqrXrzmMIJDEy5j00bCqhVLzR5jEJIwDp5wTlLwPGOU= +go.etcd.io/etcd/server/v3 v3.6.5 h1:4RbUb1Bd4y1WkBHmuF+cZII83JNQMuNXzyjwigQ06y0= +go.etcd.io/etcd/server/v3 v3.6.5/go.mod h1:PLuhyVXz8WWRhzXDsl3A3zv/+aK9e4A9lpQkqawIaH0= +go.etcd.io/etcd/tests/v3 v3.6.5 h1:4GcDq8+3FJfEEml52jqevuBbhZgkybbfT/JZtBFMHmA= +go.etcd.io/etcd/tests/v3 v3.6.5/go.mod h1:AMKnnWNuEYZWUNvHv8e+JlcQTDNtg7TTG2I8d15CT5I= go.etcd.io/gofail v0.2.0 h1:p19drv16FKK345a09a1iubchlw/vmRuksmRzgBIGjcA= go.etcd.io/gofail v0.2.0/go.mod h1:nL3ILMGfkXTekKI3clMBNazKnjUZjYLKmBHzsVAnC1o= go.etcd.io/raft/v3 v3.6.0 h1:5NtvbDVYpnfZWcIHgGRk9DyzkBIXOi8j+DDp1IcnUWQ= diff --git a/lib/cluster/tarantool_gs.go b/lib/cluster/tarantool_gs.go new file mode 100644 index 000000000..50e92f891 --- /dev/null +++ b/lib/cluster/tarantool_gs.go @@ -0,0 +1,223 @@ +package cluster + +import ( + "context" + "fmt" + "time" + + "github.com/tarantool/go-storage" + "github.com/tarantool/go-storage/operation" + "github.com/tarantool/go-storage/predicate" +) + +// GSTarantoolAllCollector collects data from tarantool via go-storage for a whole prefix. +type GSTarantoolAllCollector struct { + store storage.Storage + prefix string + timeout time.Duration +} + +// NewGSTarantoolAllCollector creates a new go-storage-based collector for tarantool from the whole prefix. +func NewGSTarantoolAllCollector( + store storage.Storage, + prefix string, + timeout time.Duration, +) GSTarantoolAllCollector { + return GSTarantoolAllCollector{ + store: store, + prefix: prefix, + timeout: timeout, + } +} + +// Collect collects a configuration from the specified prefix with the specified timeout. +func (collector GSTarantoolAllCollector) Collect() ([]Data, error) { + prefix := getConfigPrefix(collector.prefix) + ctx := context.Background() + if collector.timeout != 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, collector.timeout) + defer cancel() + } + + kvs, err := collector.store.Range(ctx, storage.WithPrefix(prefix)) + if err != nil { + return nil, fmt.Errorf("%w from tarantool: %w", errFetchData, err) + } + + if len(kvs) == 0 { + return nil, CollectEmptyError{"tarantool", prefix} + } + + data := make([]Data, 0, len(kvs)) + for _, kv := range kvs { + data = append(data, Data{ + Source: string(kv.Key), + Value: kv.Value, + Revision: kv.ModRevision, + }) + } + + return data, nil +} + +// GSTarantoolKeyCollector collects data from tarantool via go-storage for a specific key. +type GSTarantoolKeyCollector struct { + store storage.Storage + prefix string + key string + timeout time.Duration +} + +// NewGSTarantoolKeyCollector creates a new go-storage-based collector for tarantool from a key from a prefix. +func NewGSTarantoolKeyCollector( + store storage.Storage, + prefix, key string, + timeout time.Duration, +) GSTarantoolKeyCollector { + return GSTarantoolKeyCollector{ + store: store, + prefix: prefix, + key: key, + timeout: timeout, + } +} + +// Collect collects a configuration from the specified path with the specified timeout. +func (collector GSTarantoolKeyCollector) Collect() ([]Data, error) { + key := getConfigKey(collector.prefix, collector.key) + ctx := context.Background() + if collector.timeout != 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, collector.timeout) + defer cancel() + } + + resp, err := collector.store.Tx(ctx).Then(operation.Get([]byte(key))).Commit() + if err != nil { + return nil, fmt.Errorf("%w from tarantool: %w", errFetchData, err) + } + + data := make([]Data, 0, len(resp.Results)) + for _, result := range resp.Results { + for _, kv := range result.Values { + data = append(data, Data{ + Source: string(kv.Key), + Value: kv.Value, + Revision: kv.ModRevision, + }) + } + } + + switch len(data) { + case 0: + return nil, fmt.Errorf("a configuration data not found in tarantool for key %q", key) + case 1: + return []Data{data[0]}, nil + default: + return nil, fmt.Errorf("too many responses (%v) from tarantool for key %q", data, key) + } +} + +// GSTarantoolAllDataPublisher publishes data into tarantool via go-storage to a prefix. +type GSTarantoolAllDataPublisher struct { + store storage.Storage + prefix string + timeout time.Duration +} + +// NewGSTarantoolAllDataPublisher creates a new go-storage-based tarantool publisher to a prefix. +func NewGSTarantoolAllDataPublisher( + store storage.Storage, + prefix string, + timeout time.Duration, +) GSTarantoolAllDataPublisher { + return GSTarantoolAllDataPublisher{ + store: store, + prefix: prefix, + timeout: timeout, + } +} + +// Publish publishes the configuration into tarantool to the given prefix. +func (publisher GSTarantoolAllDataPublisher) Publish(revision int64, data []byte) error { + if revision != 0 { + return fmt.Errorf("%w into tarantool: target revision %d is not supported", + errPublishData, revision) + } + if data == nil { + return fmt.Errorf("%w into tarantool: %w", errPublishData, errDataMissing) + } + + prefix := getConfigPrefix(publisher.prefix) + key := getConfigKey(publisher.prefix, "all") + ctx := context.Background() + if publisher.timeout != 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, publisher.timeout) + defer cancel() + } + + _, err := publisher.store.Tx(ctx).Then( + operation.Delete([]byte(prefix)), + operation.Put([]byte(key), data), + ).Commit() + if err != nil { + return fmt.Errorf("%w into tarantool: %w", errPutData, err) + } + + return nil +} + +// GSTarantoolKeyDataPublisher publishes data into tarantool via go-storage for a prefix and key. +type GSTarantoolKeyDataPublisher struct { + store storage.Storage + prefix string + key string + timeout time.Duration +} + +// NewGSTarantoolKeyDataPublisher creates a new go-storage-based tarantool publisher for a prefix and key. +func NewGSTarantoolKeyDataPublisher( + store storage.Storage, + prefix, key string, + timeout time.Duration, +) GSTarantoolKeyDataPublisher { + return GSTarantoolKeyDataPublisher{ + store: store, + prefix: prefix, + key: key, + timeout: timeout, + } +} + +// Publish publishes the configuration into tarantool to the given prefix and key. +func (publisher GSTarantoolKeyDataPublisher) Publish(revision int64, data []byte) error { + if data == nil { + return fmt.Errorf("%w into tarantool: %w", errPublishData, errDataMissing) + } + + key := getConfigKey(publisher.prefix, publisher.key) + ctx := context.Background() + if publisher.timeout != 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, publisher.timeout) + defer cancel() + } + + var predicates []predicate.Predicate + if revision != 0 { + predicates = append(predicates, predicate.VersionEqual([]byte(key), revision)) + } + + txn := publisher.store.Tx(ctx) + if predicates != nil { + txn = txn.If(predicates...) + } + _, err := txn.Then(operation.Put([]byte(key), data)).Commit() + if err != nil { + return fmt.Errorf("%w into tarantool: %w", errPutData, err) + } + + return nil +} diff --git a/lib/cluster/tarantool_gs_test.go b/lib/cluster/tarantool_gs_test.go new file mode 100644 index 000000000..b17bb3a5e --- /dev/null +++ b/lib/cluster/tarantool_gs_test.go @@ -0,0 +1,87 @@ +package cluster_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/tarantool/tt/lib/cluster" +) + +func TestNewGSTarantoolCollectors_Collect_nil_evaler(t *testing.T) { + cases := []struct { + Name string + Collector cluster.DataCollector + }{ + {"any", cluster.NewGSTarantoolAllCollector(nil, "", 0)}, + {"key", cluster.NewGSTarantoolKeyCollector(nil, "", "", 0)}, + } + + for _, tc := range cases { + t.Run(tc.Name, func(t *testing.T) { + assert.Panics(t, func() { + tc.Collector.Collect() + }) + }) + } +} + +func TestNewGSTarantoolDataPublishers(t *testing.T) { + cases := []struct { + Name string + Publisher cluster.DataPublisher + }{ + {"all", cluster.NewGSTarantoolAllDataPublisher(nil, "", 0)}, + {"key", cluster.NewGSTarantoolKeyDataPublisher(nil, "", "", 0)}, + } + + for _, tc := range cases { + t.Run(tc.Name, func(t *testing.T) { + assert.NotNil(t, tc.Publisher) + }) + } +} + +func TestAllGSTarantoolDataPublisher_Publish_revision(t *testing.T) { + publisher := cluster.NewGSTarantoolAllDataPublisher(nil, "", 0) + err := publisher.Publish(1, []byte{}) + assert.EqualError( + t, err, "failed to publish data into tarantool: target revision 1 is not supported") +} + +func TestGSTarantoolDataPublishers_Publish_data_nil(t *testing.T) { + cases := []struct { + Name string + Publisher cluster.DataPublisher + }{ + {"all", cluster.NewGSTarantoolAllDataPublisher(nil, "", 0)}, + {"key", cluster.NewGSTarantoolKeyDataPublisher(nil, "", "", 0)}, + } + + for _, tc := range cases { + t.Run(tc.Name, func(t *testing.T) { + err := tc.Publisher.Publish(0, nil) + + assert.EqualError(t, err, + "failed to publish data into tarantool: data does not exist") + }) + } +} + +func TestNewGSTarantoolDataPublishers_Publish_nil_evaler(t *testing.T) { + cases := []struct { + Name string + Publisher cluster.DataPublisher + }{ + {"all", cluster.NewGSTarantoolAllDataPublisher(nil, "", 0)}, + {"key", cluster.NewGSTarantoolKeyDataPublisher(nil, "", "", 0)}, + } + + for _, tc := range cases { + t.Run(tc.Name, func(t *testing.T) { + assert.Panics(t, func() { + tc.Publisher.Publish(0, []byte{}) + }) + }) + } +} diff --git a/test/integration/aeon/server/go.mod b/test/integration/aeon/server/go.mod index c91f4feef..86121a8e9 100644 --- a/test/integration/aeon/server/go.mod +++ b/test/integration/aeon/server/go.mod @@ -9,7 +9,7 @@ require ( require ( golang.org/x/net v0.52.0 // indirect - golang.org/x/sys v0.42.0 // indirect + golang.org/x/sys v0.43.0 // indirect golang.org/x/text v0.35.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20260316180232-0b37fe3546d5 // indirect google.golang.org/protobuf v1.36.11 // indirect diff --git a/test/integration/aeon/server/go.sum b/test/integration/aeon/server/go.sum index 6116cc970..5f353eb1c 100644 --- a/test/integration/aeon/server/go.sum +++ b/test/integration/aeon/server/go.sum @@ -24,8 +24,8 @@ go.opentelemetry.io/otel/trace v1.40.0 h1:WA4etStDttCSYuhwvEa8OP8I5EWu24lkOzp+ZY go.opentelemetry.io/otel/trace v1.40.0/go.mod h1:zeAhriXecNGP/s2SEG3+Y8X9ujcJOTqQ5RgdEJcawiA= golang.org/x/net v0.52.0 h1:He/TN1l0e4mmR3QqHMT2Xab3Aj3L9qjbhRm78/6jrW0= golang.org/x/net v0.52.0/go.mod h1:R1MAz7uMZxVMualyPXb+VaqGSa3LIaUqk0eEt3w36Sw= -golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo= -golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= +golang.org/x/sys v0.43.0 h1:Rlag2XtaFTxp19wS8MXlJwTvoh8ArU6ezoyFsMyCTNI= +golang.org/x/sys v0.43.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= golang.org/x/text v0.35.0 h1:JOVx6vVDFokkpaq1AEptVzLTpDe9KGpj5tR4/X+ybL8= golang.org/x/text v0.35.0/go.mod h1:khi/HExzZJ2pGnjenulevKNX1W67CUy0AsXcNubPGCA= gonum.org/v1/gonum v0.17.0 h1:VbpOemQlsSMrYmn7T2OUvQ4dqxQXU+ouZFQsZOx50z4=