Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ require (
github.com/skratchdot/open-golang v0.0.0-20200116055534-eef842397966
github.com/spf13/cobra v1.10.2
github.com/spf13/pflag v1.0.9
go.etcd.io/etcd/etcdutl/v3 v3.6.8
go.etcd.io/etcd/server/v3 v3.6.8
go.uber.org/atomic v1.11.0
golang.org/x/mod v0.33.0
Expand Down Expand Up @@ -172,6 +173,7 @@ require (
github.com/vladimirvivien/gexe v0.4.1 // indirect
github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect
github.com/x448/float16 v0.8.4 // indirect
github.com/xiang90/probing v0.0.0-20221125231312-a49e3df8f510 // indirect
go.etcd.io/bbolt v1.4.3 // indirect
go.etcd.io/etcd/pkg/v3 v3.6.8 // indirect
go.etcd.io/raft/v3 v3.6.0 // indirect
Expand Down Expand Up @@ -259,7 +261,7 @@ require (
github.com/ulikunitz/xz v0.5.14 // indirect
github.com/xlab/treeprint v1.2.0 // indirect
go.etcd.io/etcd/api/v3 v3.6.8
go.etcd.io/etcd/client/pkg/v3 v3.6.8 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.6.8
go.etcd.io/etcd/client/v3 v3.6.8
go.mongodb.org/mongo-driver v1.14.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.65.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,8 @@ go.etcd.io/etcd/client/pkg/v3 v3.6.8 h1:Qs/5C0LNFiqXxYf2GU8MVjYUEXJ6sZaYOz0zEqQg
go.etcd.io/etcd/client/pkg/v3 v3.6.8/go.mod h1:GsiTRUZE2318PggZkAo6sWb6l8JLVrnckTNfbG8PWtw=
go.etcd.io/etcd/client/v3 v3.6.8 h1:B3G76t1UykqAOrbio7s/EPatixQDkQBevN8/mwiplrY=
go.etcd.io/etcd/client/v3 v3.6.8/go.mod h1:MVG4BpSIuumPi+ELF7wYtySETmoTWBHVcDoHdVupwt8=
go.etcd.io/etcd/etcdutl/v3 v3.6.8 h1:5YolVcLplhVwSR7IXemN7kBpx/L4qHAmyNc+iW+PL/k=
go.etcd.io/etcd/etcdutl/v3 v3.6.8/go.mod h1:HGfpMG6Sjo9S6KWeXctiYcN8LjLbbUBdAjCYb8V977w=
go.etcd.io/etcd/pkg/v3 v3.6.8 h1:Xe+LIL974spy8b4nEx3H0KMr1ofq3r0kh6FbU3aw4es=
go.etcd.io/etcd/pkg/v3 v3.6.8/go.mod h1:TRibVNe+FqJIe1abOAA1PsuQ4wqO87ZaOoprg09Tn8c=
go.etcd.io/etcd/server/v3 v3.6.8 h1:U2strdSEy1U8qcSzRIdkYpvOPtBy/9i/IfaaCI9flZ4=
Expand Down
5 changes: 5 additions & 0 deletions pkg/etcd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type Client interface {
DeletePrefix(ctx context.Context, prefix string) error
Compact(ctx context.Context, revision int64) error
Close() error
SnapshotWithVersion(ctx context.Context) (*clientv3.SnapshotResponse, error)
}

type client struct {
Expand Down Expand Up @@ -291,6 +292,10 @@ func (c *client) Close() error {
return c.c.Close()
}

func (c *client) SnapshotWithVersion(ctx context.Context) (*clientv3.SnapshotResponse, error) {
return c.c.SnapshotWithVersion(ctx)
}

func nextStartKey(key []byte) string {
b := make([]byte, len(key)+1)
copy(b, key)
Expand Down
175 changes: 165 additions & 10 deletions pkg/snapshot/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"os"

vclusterconfig "github.com/loft-sh/vcluster/config"
"github.com/loft-sh/vcluster/pkg/config"
"github.com/loft-sh/vcluster/pkg/constants"
"github.com/loft-sh/vcluster/pkg/etcd"
Expand All @@ -16,8 +18,16 @@ import (
"k8s.io/klog/v2"
)

type SnapshotKind string

const (
RequestStoreKey = "/vcluster/snapshot/request"
UnknownSnapshotKind SnapshotKind = "Unknown"
EtcdSnapshotKind SnapshotKind = "EtcdSnapshot"
KeyValueSnapshotKind SnapshotKind = "KeyValueSnapshot"

RequestStoreKey = "/vcluster/snapshot/request"
DBStoreKey = "/vcluster/snapshot/db"
SkipKeysStoreKey = "/vcluster/snapshot/skipkeys"
)

type Client struct {
Expand Down Expand Up @@ -51,6 +61,7 @@ func (c *Client) Run(ctx context.Context, vConfig *config.VirtualClusterConfig)
if err != nil {
return fmt.Errorf("failed to create etcd client: %w", err)
}
defer etcdClient.Close()

// create store
objectStore, err := CreateStore(ctx, &c.Options)
Expand All @@ -60,9 +71,17 @@ func (c *Client) Run(ctx context.Context, vConfig *config.VirtualClusterConfig)

// write the snapshot
klog.Infof("Start writing etcd snapshot %s...", objectStore.Target())
err = c.writeSnapshot(ctx, etcdClient, objectStore)
if err != nil {
return err

if vConfig.BackingStoreType() == vclusterconfig.StoreTypeEmbeddedEtcd {
err = c.writeEtcdSnapshot(ctx, etcdClient, objectStore)
if err != nil {
return err
}
} else {
err = c.writeKeyValueSnapshot(ctx, etcdClient, objectStore)
if err != nil {
return err
}
}

klog.Infof("Successfully wrote snapshot to %s", objectStore.Target())
Expand Down Expand Up @@ -110,7 +129,115 @@ func (c *Client) Delete(ctx context.Context) error {
return nil
}

func (c *Client) writeSnapshot(ctx context.Context, etcdClient etcd.Client, objectStore types.Storage) error {
// writeEtcdSnapshot pulls a point-in-time snapshot from etcd, wraps it into a tar.gz archive, and stores it in the object store.
func (c *Client) writeEtcdSnapshot(ctx context.Context, etcdClient etcd.Client, objectStore types.Storage) error {
log := klog.FromContext(ctx)

log.Info("Getting point-in-time etcd snapshot")
res, err := etcdClient.SnapshotWithVersion(ctx)
if err != nil {
return fmt.Errorf("snapshot request failed: %w", err)
}
defer res.Snapshot.Close()

dbPath, err := writeTempFile(res.Snapshot)
if err != nil {
return fmt.Errorf("failed to write snapshot to temp file: %w", err)
}
defer os.Remove(dbPath)

log.Info("Creating snapshot archive")
snapshotFileWrite, err := os.CreateTemp("", "snapshot-")
if err != nil {
return fmt.Errorf("failed to create temp file: %w", err)
}
defer snapshotFileWrite.Close()
defer os.Remove(snapshotFileWrite.Name())

// don't compress with default level as this can get quite cpu heavy
gzipWriter, err := gzip.NewWriterLevel(snapshotFileWrite, 3)
if err != nil {
return fmt.Errorf("failed to create gzip writer: %w", err)
}
defer gzipWriter.Close()

tarWriter := tar.NewWriter(gzipWriter)
defer tarWriter.Close()
Comment thread
cbalan marked this conversation as resolved.

if c.Options.Release != nil {
// The vcluster create with restore command expects the SnapshotReleaseKey key to be the first key in the tar archive
log.Info("Adding vCluster config to snapshot archive")
releaseBytes, err := json.Marshal(c.Options.Release)
if err != nil {
return fmt.Errorf("failed to marshal vCluster release: %w", err)
}

err = writeArchiveEntry(tarWriter, []byte(SnapshotReleaseKey), releaseBytes)
if err != nil {
return fmt.Errorf("failed to snapshot vCluster release: %w", err)
}
}

if c.Request != nil {
log.Info("Adding snapshot request to snapshot archive")
requestBytes, err := json.Marshal(c.Request)
if err != nil {
return fmt.Errorf("failed to marshal snapshot request: %w", err)
}
key := fmt.Sprintf("%s/%s", RequestStoreKey, APIVersion)
err = writeArchiveEntry(tarWriter, []byte(key), requestBytes)
if err != nil {
return fmt.Errorf("failed to snapshot request: %w", err)
}
}

// The presence of the DBStoreKey dictates the type of the snapshot archive. KeyValue vs EtcdSnapshot.
// Snapshot archive structure changes must be reflected in the getSnapshotArchiveKind func.
log.Info("Adding etcd snapshot to snapshot archive")
if err := writeArchiveFileEntry(tarWriter, DBStoreKey, dbPath); err != nil {
return fmt.Errorf("failed to write etcd snapshot to tar archive: %w", err)
}

if c.skipKeys != nil {
log.Info("Adding skipKeys to snapshot archive")
skipKeysBytes, err := json.Marshal(c.skipKeys)
if err != nil {
return fmt.Errorf("failed to marshal skipKeys: %w", err)
}
err = writeArchiveEntry(tarWriter, []byte(SkipKeysStoreKey), skipKeysBytes)
if err != nil {
return fmt.Errorf("failed to snapshot skipKeys: %w", err)
}
}

log.Info("Closing snapshot archive")
if err := tarWriter.Close(); err != nil {
return fmt.Errorf("failed to close tar writer: %w", err)
}

if err := gzipWriter.Close(); err != nil {
return fmt.Errorf("failed to close gzip writer: %w", err)
}

if err := snapshotFileWrite.Close(); err != nil {
return fmt.Errorf("failed to close snapshot archive writer: %w", err)
}

log.Info("Storing snapshot archive into the object store")
snapshotFileRead, err := os.Open(snapshotFileWrite.Name())
if err != nil {
return fmt.Errorf("failed to open snapshot file for reading: %w", err)
}
defer snapshotFileRead.Close()

if err := objectStore.PutObject(ctx, snapshotFileRead); err != nil {
return fmt.Errorf("failed to write snapshot to object store: %w", err)
}

return nil
}

func (c *Client) writeKeyValueSnapshot(ctx context.Context, etcdClient etcd.Client, objectStore types.Storage) error {
// now stream objects from etcd to object store
errChan := make(chan error)
reader, writer, err := os.Pipe()
Expand Down Expand Up @@ -141,7 +268,7 @@ func (c *Client) writeSnapshot(ctx context.Context, etcdClient etcd.Client, obje
return fmt.Errorf("failed to marshal vCluster release: %w", err)
}

err = writeKeyValue(tarWriter, []byte(SnapshotReleaseKey), releaseBytes)
err = writeArchiveEntry(tarWriter, []byte(SnapshotReleaseKey), releaseBytes)
if err != nil {
return fmt.Errorf("failed to snapshot vCluster release: %w", err)
}
Expand All @@ -154,9 +281,9 @@ func (c *Client) writeSnapshot(ctx context.Context, etcdClient etcd.Client, obje
return fmt.Errorf("failed to marshal snapshot request: %w", err)
}
key := fmt.Sprintf("%s/%s", RequestStoreKey, APIVersion)
err = writeKeyValue(tarWriter, []byte(key), requestBytes)
err = writeArchiveEntry(tarWriter, []byte(key), requestBytes)
if err != nil {
return fmt.Errorf("failed to snapshot snapshot request: %w", err)
return fmt.Errorf("failed to snapshot request: %w", err)
}
}

Expand Down Expand Up @@ -185,7 +312,7 @@ func (c *Client) writeSnapshot(ctx context.Context, etcdClient etcd.Client, obje
}
// write the object into the store
klog.V(1).Infof("Snapshot key %s", key)
err := writeKeyValue(tarWriter, obj.Value.Key, obj.Value.Data)
err := writeArchiveEntry(tarWriter, obj.Value.Key, obj.Value.Data)
if err != nil {
return fmt.Errorf("failed to snapshot key %s: %w", key, err)
}
Expand Down Expand Up @@ -216,7 +343,7 @@ func (c *Client) addResourceToSkip(kindPlural, namespacedName string) {
c.skipKeys[fmt.Sprintf("/registry/%s/%s", kindPlural, namespacedName)] = struct{}{}
}

func writeKeyValue(tarWriter *tar.Writer, key, value []byte) error {
func writeArchiveEntry(tarWriter *tar.Writer, key, value []byte) error {
err := tarWriter.WriteHeader(&tar.Header{
Typeflag: tar.TypeReg,
Name: string(key),
Expand All @@ -235,3 +362,31 @@ func writeKeyValue(tarWriter *tar.Writer, key, value []byte) error {

return nil
}

func writeArchiveFileEntry(tarWriter *tar.Writer, key, path string) error {
f, err := os.Open(path)
if err != nil {
return fmt.Errorf("failed to open file: %w", err)
}
defer f.Close()

stat, err := f.Stat()
if err != nil {
return fmt.Errorf("failed to stat file: %w", err)
}

if err := tarWriter.WriteHeader(&tar.Header{
Typeflag: tar.TypeReg,
Name: key,
Size: stat.Size(),
Mode: 0666,
}); err != nil {
return err
}

if _, err := io.Copy(tarWriter, f); err != nil {
return fmt.Errorf("failed to write file to tar archive: %w", err)
}

return nil
}
Loading
Loading