Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ require (
go.etcd.io/etcd/tests/v3 v3.6.8
golang.org/x/crypto v0.49.0
golang.org/x/exp v0.0.0-20260218203240-3dfff04db8fa
golang.org/x/sys v0.42.0
golang.org/x/sys v0.43.0
golang.org/x/term v0.41.0
google.golang.org/grpc v1.79.3
google.golang.org/protobuf v1.36.11
Expand Down Expand Up @@ -115,6 +115,8 @@ require (
github.com/spf13/pflag v1.0.10 // indirect
github.com/tarantool/go-iproto v1.1.0 // indirect
github.com/tarantool/go-openssl v1.2.2 // indirect
github.com/tarantool/go-option v1.0.0 // indirect
github.com/tarantool/go-storage v1.1.3-0.20260421111102-abe1dd9a9ca8 // indirect
github.com/tarantool/go-tlsdialer v1.0.2 // indirect
github.com/tmc/grpc-websocket-proxy v0.0.0-20220101234140-673ab2c3ae75 // indirect
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
Expand Down
10 changes: 8 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/gojuno/minimock/v3 v3.4.7 h1:vhE5zpniyPDRT0DXd5s3DbtZJVlcbmC5k80izYtj9lY=
github.com/gojuno/minimock/v3 v3.4.7/go.mod h1:QxJk4mdPrVyYUmEZGc2yD2NONpqM/j4dWhsy9twjFHg=
github.com/golang-jwt/jwt/v5 v5.3.0 h1:pv4AsKCKKZuqlgs5sUmn4x8UlGa0kEVt/puTpKx9vvo=
github.com/golang-jwt/jwt/v5 v5.3.0/go.mod h1:fxCRLWMO43lRc8nhHWY6LGqRcf+1gQWArsqaEUEa5bE=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
Expand Down Expand Up @@ -288,8 +290,12 @@ github.com/tarantool/go-iproto v1.1.0/go.mod h1:LNCtdyZxojUed8SbOiYHoc3v9NvaZTB7
github.com/tarantool/go-openssl v0.0.8-0.20230307065445-720eeb389195/go.mod h1:M7H4xYSbzqpW/ZRBMyH0eyqQBsnhAMfsYk5mv0yid7A=
github.com/tarantool/go-openssl v1.2.2 h1:GaGMNsa68ZqoNgrMF7Ke4s9+tXGfc5ulfCUm3/Jb7/k=
github.com/tarantool/go-openssl v1.2.2/go.mod h1:EwX1pKIGypLxkY49vKIIR4LTT+94DhKiunCqU2gEzLQ=
github.com/tarantool/go-option v1.0.0 h1:+Etw0i3TjsXvADTo5rfZNCfsXe3BfHOs+iVfIrl0Nlo=
github.com/tarantool/go-option v1.0.0/go.mod h1:lXzzeZtL+rPUtLOCDP6ny3FemFBjruG9aHKzNN2bS08=
github.com/tarantool/go-prompt v1.0.1 h1:88Yer6gCFylqGRrdWwikNFVbklRQsqKF7mycvGdDcj0=
github.com/tarantool/go-prompt v1.0.1/go.mod h1:9Vuvi60Bk+3yaXqgYaXNTpLbwPPaaEOeaUgpFW1jqTU=
github.com/tarantool/go-storage v1.1.3-0.20260421111102-abe1dd9a9ca8 h1:seeJOwJeWBtnouVHgAptoj+5qpHoMZV/Xv4m/pGjWJg=
github.com/tarantool/go-storage v1.1.3-0.20260421111102-abe1dd9a9ca8/go.mod h1:lM/UPkuzeggynwtmIHD5OCqdz5H2RHsXX6HaOzYBCzk=
github.com/tarantool/go-tarantool v1.12.3 h1:GXabowmrTSW225xFEjX4t+8PlccVDCeGB5OM1VLbBXE=
github.com/tarantool/go-tarantool v1.12.3/go.mod h1:QRiXv0jnxwgxHtr9ZmifSr/eRba76gTUBgp69pDMX1U=
github.com/tarantool/go-tarantool/v2 v2.4.2 h1:rkzYtFhLJLA9RDIhjzN93MJBN5PBxHW4+soq+RB90gE=
Expand Down Expand Up @@ -445,8 +451,8 @@ golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo=
golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
golang.org/x/sys v0.43.0 h1:Rlag2XtaFTxp19wS8MXlJwTvoh8ArU6ezoyFsMyCTNI=
golang.org/x/sys v0.43.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.41.0 h1:QCgPso/Q3RTJx2Th4bDLqML4W6iJiaXFq2/ftQF13YU=
Expand Down
10 changes: 10 additions & 0 deletions lib/cluster/errors.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,19 @@
package cluster

import (
"errors"
"fmt"
)

var (
errDataMissing = errors.New("data does not exist")
errWrongRevision = errors.New("wrong revision")

errFetchData = errors.New("failed to fetch data")
errPutData = errors.New("failed to put data")
errPublishData = errors.New("failed to publish data")
)

// NotExistError error type for non-existing path.
type NotExistError struct {
path []string
Expand Down
253 changes: 253 additions & 0 deletions lib/cluster/etcd_gs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,253 @@
package cluster

import (
"context"
"fmt"
"time"

"github.com/tarantool/go-storage"
"github.com/tarantool/go-storage/operation"
"github.com/tarantool/go-storage/predicate"
)

// GSEtcdAllCollector collects data from etcd via go-storage for a whole prefix.
type GSEtcdAllCollector struct {
store storage.Storage
prefix string
timeout time.Duration
}

// NewGSEtcdAllCollector creates a new go-storage-based collector for etcd from the whole prefix.
func NewGSEtcdAllCollector(
store storage.Storage,
prefix string,
timeout time.Duration,
) GSEtcdAllCollector {
return GSEtcdAllCollector{
store: store,
prefix: prefix,
timeout: timeout,
}
}

// Collect collects a configuration from the specified prefix with the specified timeout.
func (collector GSEtcdAllCollector) Collect() ([]Data, error) {
prefix := getConfigPrefix(collector.prefix)
ctx := context.Background()
if collector.timeout != 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, collector.timeout)
defer cancel()
}

kvs, err := collector.store.Range(ctx, storage.WithPrefix(prefix))
if err != nil {
return nil, fmt.Errorf("%w from etcd: %w", errFetchData, err)
}

if len(kvs) == 0 {
return nil, CollectEmptyError{"etcd", prefix}
}

data := make([]Data, 0, len(kvs))
for _, kv := range kvs {
data = append(data, Data{
Source: string(kv.Key),
Value: kv.Value,
Revision: kv.ModRevision,
})
}

return data, nil
}

// GSEtcdKeyCollector collects data from etcd via go-storage for a specific key.
type GSEtcdKeyCollector struct {
store storage.Storage
prefix string
key string
timeout time.Duration
}

// NewGSEtcdKeyCollector creates a new go-storage-based collector for etcd from a key from a prefix.
func NewGSEtcdKeyCollector(
store storage.Storage,
prefix, key string,
timeout time.Duration,
) GSEtcdKeyCollector {
return GSEtcdKeyCollector{
store: store,
prefix: prefix,
key: key,
timeout: timeout,
}
}

// Collect collects a configuration from the specified path with the specified timeout.
func (collector GSEtcdKeyCollector) Collect() ([]Data, error) {
key := getConfigKey(collector.prefix, collector.key)
ctx := context.Background()
if collector.timeout != 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, collector.timeout)
defer cancel()
}

resp, err := collector.store.Tx(ctx).Then(operation.Get([]byte(key))).Commit()
if err != nil {
return nil, fmt.Errorf("%w from etcd: %w", errFetchData, err)
}

data := make([]Data, 0, len(resp.Results))
for _, result := range resp.Results {
for _, kv := range result.Values {
data = append(data, Data{
Source: string(kv.Key),
Value: kv.Value,
Revision: kv.ModRevision,
})
}
}

switch len(data) {
case 0:
return nil, fmt.Errorf("a configuration data not found in etcd for key %q", key)
case 1:
return []Data{data[0]}, nil
default:
return nil, fmt.Errorf("too many responses (%v) from etcd for key %q", data, key)
}
}

// GSEtcdAllDataPublisher publishes data into etcd via go-storage to a prefix.
type GSEtcdAllDataPublisher struct {
store storage.Storage
prefix string
timeout time.Duration
}

// NewGSEtcdAllDataPublisher creates a new go-storage-based etcd publisher to a prefix.
func NewGSEtcdAllDataPublisher(
store storage.Storage,
prefix string,
timeout time.Duration,
) GSEtcdAllDataPublisher {
return GSEtcdAllDataPublisher{
store: store,
prefix: prefix,
timeout: timeout,
}
}

// Publish publishes the configuration into etcd to the given prefix.
func (publisher GSEtcdAllDataPublisher) Publish(revision int64, data []byte) error {
if revision != 0 {
return fmt.Errorf("%w into etcd: target revision %d is not supported",
errPublishData, revision)
}
if data == nil {
return fmt.Errorf("%w into etcd: %w", errPublishData, errDataMissing)
}

prefix := getConfigPrefix(publisher.prefix)
key := prefix + "all"
ctx := context.Background()
if publisher.timeout != 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, publisher.timeout)
defer cancel()
}

for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}

kvs, err := publisher.store.Range(ctx, storage.WithPrefix(prefix))
if err != nil {
return fmt.Errorf("%w from etcd: %w", errFetchData, err)
}

predicates := make([]predicate.Predicate, 0, len(kvs))
ops := make([]operation.Operation, 0, len(kvs)+1)
for _, kv := range kvs {
if string(kv.Key) == key {
continue
}

predicates = append(predicates, predicate.VersionEqual(kv.Key, kv.ModRevision))
ops = append(ops, operation.Delete(kv.Key))
}
ops = append(ops, operation.Put([]byte(key), data))

txn := publisher.store.Tx(ctx)
if len(predicates) > 0 {
txn = txn.If(predicates...)
}
resp, err := txn.Then(ops...).Commit()
if err != nil {
return fmt.Errorf("%w into etcd: %w", errPutData, err)
}
if resp.Succeeded {
return nil
}
}
}

// GSEtcdKeyDataPublisher publishes data into etcd via go-storage for a prefix and key.
type GSEtcdKeyDataPublisher struct {
store storage.Storage
prefix string
key string
timeout time.Duration
}

// NewGSEtcdKeyDataPublisher creates a new go-storage-based etcd publisher for a prefix and key.
func NewGSEtcdKeyDataPublisher(
store storage.Storage,
prefix, key string,
timeout time.Duration,
) GSEtcdKeyDataPublisher {
return GSEtcdKeyDataPublisher{
store: store,
prefix: prefix,
key: key,
timeout: timeout,
}
}

// Publish publishes the configuration into etcd to the given prefix and key.
func (publisher GSEtcdKeyDataPublisher) Publish(revision int64, data []byte) error {
if data == nil {
return fmt.Errorf("%w into etcd: %w", errPublishData, errDataMissing)
}

key := getConfigKey(publisher.prefix, publisher.key)
ctx := context.Background()
if publisher.timeout != 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, publisher.timeout)
defer cancel()
}

var predicates []predicate.Predicate
if revision != 0 {
predicates = append(predicates, predicate.VersionEqual([]byte(key), revision))
}

txn := publisher.store.Tx(ctx)
if predicates != nil {
txn = txn.If(predicates...)
}
resp, err := txn.Then(operation.Put([]byte(key), data)).Commit()
if err != nil {
return fmt.Errorf("%w into etcd: %w", errPutData, err)
}
if !resp.Succeeded {
return fmt.Errorf("%w into etcd: %w", errPutData, errWrongRevision)
}

return nil
}
Loading
Loading