diff --git a/glusterd2/commands/peers/peer-rpc-svc.go b/glusterd2/commands/peers/peer-rpc-svc.go index d37f3b734..40a9f50ca 100644 --- a/glusterd2/commands/peers/peer-rpc-svc.go +++ b/glusterd2/commands/peers/peer-rpc-svc.go @@ -8,8 +8,8 @@ import ( "github.com/gluster/glusterd2/glusterd2/peer" "github.com/gluster/glusterd2/glusterd2/servers/peerrpc" "github.com/gluster/glusterd2/glusterd2/store" - "github.com/gluster/glusterd2/glusterd2/transactionv2" - "github.com/gluster/glusterd2/glusterd2/transactionv2/cleanuphandler" + "github.com/gluster/glusterd2/glusterd2/transaction" + "github.com/gluster/glusterd2/glusterd2/transaction/cleanuphandler" "github.com/gluster/glusterd2/glusterd2/volume" "github.com/gluster/glusterd2/pkg/utils" diff --git a/glusterd2/commands/volumes/volume-create.go b/glusterd2/commands/volumes/volume-create.go index 25d420789..5f51c7a95 100644 --- a/glusterd2/commands/volumes/volume-create.go +++ b/glusterd2/commands/volumes/volume-create.go @@ -12,7 +12,6 @@ import ( "github.com/gluster/glusterd2/glusterd2/gdctx" restutils "github.com/gluster/glusterd2/glusterd2/servers/rest/utils" "github.com/gluster/glusterd2/glusterd2/transaction" - transactionv2 "github.com/gluster/glusterd2/glusterd2/transactionv2" "github.com/gluster/glusterd2/glusterd2/volume" "github.com/gluster/glusterd2/pkg/api" gderrors "github.com/gluster/glusterd2/pkg/errors" @@ -199,7 +198,7 @@ func CreateVolume(ctx context.Context, req api.VolCreateReq) (status int, err er return http.StatusBadRequest, err } - txn, err := transactionv2.NewTxnWithLocks(ctx, req.Name) + txn, err := transaction.NewTxnWithLocks(ctx, req.Name) if err != nil { return restutils.ErrToStatusCode(err) } diff --git a/glusterd2/commands/volumes/volume-delete.go b/glusterd2/commands/volumes/volume-delete.go index 59b079050..ad62a820e 100644 --- a/glusterd2/commands/volumes/volume-delete.go +++ b/glusterd2/commands/volumes/volume-delete.go @@ -8,7 +8,6 @@ import ( "github.com/gluster/glusterd2/glusterd2/gdctx" restutils "github.com/gluster/glusterd2/glusterd2/servers/rest/utils" "github.com/gluster/glusterd2/glusterd2/transaction" - transactionv2 "github.com/gluster/glusterd2/glusterd2/transactionv2" "github.com/gluster/glusterd2/glusterd2/volume" "github.com/gorilla/mux" @@ -45,7 +44,7 @@ func volumeDeleteHandler(w http.ResponseWriter, r *http.Request) { ctx, span := trace.StartSpan(ctx, "/volumeDeleteHandler") defer span.End() - txn, err := transactionv2.NewTxnWithLocks(ctx, volname) + txn, err := transaction.NewTxnWithLocks(ctx, volname) if err != nil { status, err := restutils.ErrToStatusCode(err) restutils.SendHTTPError(ctx, w, status, err) diff --git a/glusterd2/commands/volumes/volume-start.go b/glusterd2/commands/volumes/volume-start.go index a195bb2da..e289eb3d9 100644 --- a/glusterd2/commands/volumes/volume-start.go +++ b/glusterd2/commands/volumes/volume-start.go @@ -10,7 +10,6 @@ import ( "github.com/gluster/glusterd2/glusterd2/gdctx" restutils "github.com/gluster/glusterd2/glusterd2/servers/rest/utils" "github.com/gluster/glusterd2/glusterd2/transaction" - transactionv2 "github.com/gluster/glusterd2/glusterd2/transactionv2" "github.com/gluster/glusterd2/glusterd2/volgen" "github.com/gluster/glusterd2/glusterd2/volume" "github.com/gluster/glusterd2/pkg/api" @@ -156,7 +155,7 @@ func StartVolume(ctx context.Context, volname string, req api.VolumeStartReq) (v ctx, span := trace.StartSpan(ctx, "/volumeStartHandler") defer span.End() - txn, err := transactionv2.NewTxnWithLocks(ctx, volname) + txn, err := transaction.NewTxnWithLocks(ctx, volname) if err != nil { status, err := restutils.ErrToStatusCode(err) return nil, status, err diff --git a/glusterd2/commands/volumes/volume-stop.go b/glusterd2/commands/volumes/volume-stop.go index f1c721fde..99cf160fa 100644 --- a/glusterd2/commands/volumes/volume-stop.go +++ b/glusterd2/commands/volumes/volume-stop.go @@ -11,7 +11,6 @@ import ( "github.com/gluster/glusterd2/glusterd2/gdctx" restutils "github.com/gluster/glusterd2/glusterd2/servers/rest/utils" "github.com/gluster/glusterd2/glusterd2/transaction" - transactionv2 "github.com/gluster/glusterd2/glusterd2/transactionv2" "github.com/gluster/glusterd2/glusterd2/volgen" "github.com/gluster/glusterd2/glusterd2/volume" "github.com/gluster/glusterd2/pkg/api" @@ -124,7 +123,7 @@ func volumeStopHandler(w http.ResponseWriter, r *http.Request) { logger := gdctx.GetReqLogger(ctx) volname := mux.Vars(r)["volname"] - txn, err := transactionv2.NewTxnWithLocks(ctx, volname) + txn, err := transaction.NewTxnWithLocks(ctx, volname) if err != nil { status, err := restutils.ErrToStatusCode(err) restutils.SendHTTPError(ctx, w, status, err) diff --git a/glusterd2/main.go b/glusterd2/main.go index 60f97e4af..931e6e5bd 100644 --- a/glusterd2/main.go +++ b/glusterd2/main.go @@ -17,8 +17,8 @@ import ( "github.com/gluster/glusterd2/glusterd2/pmap" "github.com/gluster/glusterd2/glusterd2/servers" "github.com/gluster/glusterd2/glusterd2/store" - "github.com/gluster/glusterd2/glusterd2/transactionv2" - "github.com/gluster/glusterd2/glusterd2/transactionv2/cleanuphandler" + "github.com/gluster/glusterd2/glusterd2/transaction" + "github.com/gluster/glusterd2/glusterd2/transaction/cleanuphandler" gdutils "github.com/gluster/glusterd2/glusterd2/utils" "github.com/gluster/glusterd2/glusterd2/volgen" "github.com/gluster/glusterd2/glusterd2/xlator" diff --git a/glusterd2/transactionv2/cleanuphandler/cleanup_handler.go b/glusterd2/transaction/cleanuphandler/cleanup_handler.go similarity index 98% rename from glusterd2/transactionv2/cleanuphandler/cleanup_handler.go rename to glusterd2/transaction/cleanuphandler/cleanup_handler.go index 56799245c..5863bc697 100644 --- a/glusterd2/transactionv2/cleanuphandler/cleanup_handler.go +++ b/glusterd2/transaction/cleanuphandler/cleanup_handler.go @@ -8,7 +8,7 @@ import ( "github.com/gluster/glusterd2/glusterd2/gdctx" "github.com/gluster/glusterd2/glusterd2/store" - "github.com/gluster/glusterd2/glusterd2/transactionv2" + "github.com/gluster/glusterd2/glusterd2/transaction" "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/clientv3/concurrency" diff --git a/glusterd2/transaction/context.go b/glusterd2/transaction/context.go index 40d4c1077..cde79dc00 100644 --- a/glusterd2/transaction/context.go +++ b/glusterd2/transaction/context.go @@ -5,7 +5,6 @@ import ( "encoding/json" "errors" "reflect" - "time" "github.com/gluster/glusterd2/glusterd2/store" @@ -14,8 +13,6 @@ import ( log "github.com/sirupsen/logrus" ) -const etcdTxnTimeout = 10 - // TxnCtx is used to carry contextual information across the lifetime of a transaction type TxnCtx interface { // Set attaches the given key with value to the context. It updates value if key exists already. @@ -120,7 +117,7 @@ func (c *Tctx) Commit() error { putOps = append(putOps, clientv3.OpPut(key, value)) } - ctx, cancel := context.WithTimeout(context.Background(), etcdTxnTimeout*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), etcdTxnTimeout) txn, err := store.Txn(ctx). If(). Then(putOps...). diff --git a/glusterd2/transaction/context_mock.go b/glusterd2/transaction/context_mock.go deleted file mode 100644 index cb5b60db7..000000000 --- a/glusterd2/transaction/context_mock.go +++ /dev/null @@ -1,62 +0,0 @@ -package transaction - -import ( - "errors" - "github.com/pborman/uuid" - log "github.com/sirupsen/logrus" -) - -// MockTctx implements a dummy context type that can be used in tests -type MockTctx struct { - data map[string]interface{} -} - -// NewMockCtx returns a new instance of MockTctx -func NewMockCtx() *MockTctx { - return &MockTctx{ - data: make(map[string]interface{}), - } -} - -// Set attaches the given key with value to the context. It updates value if key exists already. -func (m *MockTctx) Set(key string, value interface{}) error { - m.data[key] = value - return nil -} - -// SetNodeResult is similar to Set but prefixes the key with the node UUID specified. -func (m *MockTctx) SetNodeResult(peerID uuid.UUID, key string, value interface{}) error { - storeKey := peerID.String() + "/" + key - return m.Set(storeKey, value) -} - -// Get gets the value for the given key. Returns an error if the key is not present -func (m *MockTctx) Get(key string, value interface{}) error { - _, ok := m.data[key] - if !ok { - return errors.New("key not present") - } - return nil -} - -// GetNodeResult is similar to Get but prefixes the key with node UUID specified. -func (m *MockTctx) GetNodeResult(peerID uuid.UUID, key string, value interface{}) error { - storeKey := peerID.String() + "/" + key - return m.Get(storeKey, value) -} - -// Delete deletes the key and value -func (m *MockTctx) Delete(key string) error { - delete(m.data, key) - return nil -} - -// Logger returns a dummy logger -func (m *MockTctx) Logger() log.FieldLogger { - return log.New() -} - -// Prefix returns the prefix to be used for storing values -func (m MockTctx) Prefix() string { - return "mock" -} diff --git a/glusterd2/transactionv2/engine.go b/glusterd2/transaction/engine.go similarity index 100% rename from glusterd2/transactionv2/engine.go rename to glusterd2/transaction/engine.go diff --git a/glusterd2/transactionv2/errors.go b/glusterd2/transaction/errors.go similarity index 100% rename from glusterd2/transactionv2/errors.go rename to glusterd2/transaction/errors.go diff --git a/glusterd2/transactionv2/executor.go b/glusterd2/transaction/executor.go similarity index 100% rename from glusterd2/transactionv2/executor.go rename to glusterd2/transaction/executor.go diff --git a/glusterd2/transaction/lock.go b/glusterd2/transaction/lock.go index 45720bdf1..85c60c1ad 100644 --- a/glusterd2/transaction/lock.go +++ b/glusterd2/transaction/lock.go @@ -5,11 +5,9 @@ import ( "errors" "time" - "github.com/gluster/glusterd2/glusterd2/gdctx" "github.com/gluster/glusterd2/glusterd2/store" "github.com/coreos/etcd/clientv3/concurrency" - "github.com/pborman/uuid" log "github.com/sirupsen/logrus" ) @@ -27,129 +25,6 @@ var ( ErrLockExists = errors.New("existing lock found for given lock ID") ) -// createLockStepFunc returns the registry IDs of StepFuncs which lock/unlock the given key. -// If existing StepFuncs are not found, new funcs are created and registered. -func createLockStepFunc(key string) (string, string, error) { - lockFuncID := key + ".Lock" - unlockFuncID := key + ".Unlock" - - _, lockFuncFound := getStepFunc(lockFuncID) - _, unlockFuncFound := getStepFunc(unlockFuncID) - - if lockFuncFound && unlockFuncFound { - return lockFuncID, unlockFuncID, nil - } - - key = lockPrefix + key - locker := concurrency.NewMutex(store.Store.Session, key) - - lockFunc := func(c TxnCtx) error { - - ctx, cancel := context.WithTimeout(context.Background(), lockObtainTimeout) - defer cancel() - - c.Logger().WithField("key", key).Debug("attempting to lock") - err := locker.Lock(ctx) - switch err { - case nil: - c.Logger().WithField("key", key).Debug("lock obtained") - case context.DeadlineExceeded: - // Propagate this all the way back to the client as a HTTP 409 response - c.Logger().WithField("key", key).Debug("timeout: failed to obtain lock") - err = ErrLockTimeout - } - - return err - } - RegisterStepFunc(lockFunc, lockFuncID) - - unlockFunc := func(c TxnCtx) error { - - c.Logger().WithField("key", key).Debug("attempting to unlock") - err := locker.Unlock(context.Background()) - if err == nil { - c.Logger().WithField("key", key).Debug("lock unlocked") - } - - return err - } - RegisterStepFunc(unlockFunc, unlockFuncID) - - return lockFuncID, unlockFuncID, nil -} - -// CreateLockSteps returns a lock and an unlock Step which lock/unlock the given key -// TODO: Remove this function -func CreateLockSteps(key string) (*Step, *Step, error) { - lockFunc, unlockFunc, err := createLockStepFunc(key) - if err != nil { - return nil, nil, err - } - - lockStep := &Step{DoFunc: lockFunc, UndoFunc: unlockFunc, Nodes: []uuid.UUID{gdctx.MyUUID}, Skip: false} - unlockStep := &Step{DoFunc: unlockFunc, UndoFunc: "", Nodes: []uuid.UUID{gdctx.MyUUID}, Skip: false} - - return lockStep, unlockStep, nil -} - -// LockUnlockFunc is signature of functions used for distributed locking -// and unlocking. -type LockUnlockFunc func(ctx context.Context) error - -// CreateLockFuncs creates and returns functions for distributed lock and -// unlock. This is similar to CreateLockSteps() but returns normal functions. -// TODO: Remove this function -func CreateLockFuncs(key string) (LockUnlockFunc, LockUnlockFunc) { - - key = lockPrefix + key - locker := concurrency.NewMutex(store.Store.Session, key) - - // TODO: There is an opportunity for refactor here to re-use code - // between CreateLockFunc and CreateLockSteps. This variant doesn't - // have registry either. - - lockFunc := func(ctx context.Context) error { - logger := gdctx.GetReqLogger(ctx) - if logger == nil { - logger = log.StandardLogger() - } - - ctx, cancel := context.WithTimeout(ctx, lockObtainTimeout) - defer cancel() - - logger.WithField("key", key).Debug("attempting to lock") - err := locker.Lock(ctx) - switch err { - case nil: - logger.WithField("key", key).Debug("lock obtained") - case context.DeadlineExceeded: - // Propagate this all the way back to the client as a HTTP 409 response - logger.WithField("key", key).Debug("timeout: failed to obtain lock") - err = ErrLockTimeout - } - - return err - } - - unlockFunc := func(ctx context.Context) error { - logger := gdctx.GetReqLogger(ctx) - if logger == nil { - logger = log.StandardLogger() - } - - logger.WithField("key", key).Debug("attempting to unlock") - if err := locker.Unlock(context.Background()); err != nil { - logger.WithField("key", key).WithError(err).Error("unlock failed") - return err - } - - logger.WithField("key", key).Debug("lock unlocked") - return nil - } - - return lockFunc, unlockFunc -} - // Locks are the collection of cluster wide transaction lock type Locks map[string]*concurrency.Mutex diff --git a/glusterd2/transaction/rpc-client.go b/glusterd2/transaction/rpc-client.go deleted file mode 100644 index f51e7843b..000000000 --- a/glusterd2/transaction/rpc-client.go +++ /dev/null @@ -1,78 +0,0 @@ -package transaction - -import ( - "context" - "encoding/json" - "errors" - - "github.com/gluster/glusterd2/glusterd2/peer" - "github.com/gluster/glusterd2/pkg/utils" - - "github.com/pborman/uuid" - log "github.com/sirupsen/logrus" - "go.opencensus.io/plugin/ocgrpc" - "google.golang.org/grpc" -) - -// runStepOn will run the step on the specified node -func runStepOn(origCtx context.Context, step string, node uuid.UUID, c TxnCtx) error { - // TODO: I'm creating connections on demand. This should be changed so that - // we have long term connections. - p, err := peer.GetPeerF(node.String()) - if err != nil { - c.Logger().WithError(err).WithField("peerid", node.String()).Error("peer not found") - return err - } - - logger := c.Logger().WithField("remotepeer", p.ID.String()+"("+p.Name+")") - - var conn *grpc.ClientConn - - remote, err := utils.FormRemotePeerAddress(p.PeerAddresses[0]) - if err != nil { - return err - } - - conn, err = grpc.Dial(remote, - grpc.WithStatsHandler(&ocgrpc.ClientHandler{}), - grpc.WithInsecure(), - ) - if err == nil && conn != nil { - logger.WithFields(log.Fields{ - "remote": remote, - }).Debug("connected to remote") - } - - if conn == nil { - logger.WithError(err).WithField("remote", p.PeerAddresses[0]).Error("failed to grpc.Dial remote") - return err - } - defer conn.Close() - - client := NewTxnSvcClient(conn) - - req := &TxnStepReq{ - StepFunc: step, - } - data, err := json.Marshal(c) - if err != nil { - logger.WithError(err).Error("failed to JSON marshal transaction context") - return err - } - req.Context = data - - var rsp *TxnStepResp - - rsp, err = client.RunStep(origCtx, req) - if err != nil { - logger.WithError(err).WithField("rpc", "TxnSvc.RunStep").Error("failed RPC call") - return err - } - - if rsp.Error != "" { - logger.WithError(errors.New(rsp.Error)).Error("TxnSvc.Runstep failed on peer") - return errors.New(rsp.Error) - } - - return nil -} diff --git a/glusterd2/transaction/rpc-service.go b/glusterd2/transaction/rpc-service.go deleted file mode 100644 index 43ca372aa..000000000 --- a/glusterd2/transaction/rpc-service.go +++ /dev/null @@ -1,79 +0,0 @@ -package transaction - -import ( - "context" - "encoding/json" - "errors" - - "github.com/gluster/glusterd2/glusterd2/servers/peerrpc" - - log "github.com/sirupsen/logrus" - "go.opencensus.io/trace" - "google.golang.org/grpc" -) - -type txnSvc int - -func init() { - peerrpc.Register(new(txnSvc)) -} - -// RunStep handles the incoming request. It executes the requested step and returns the results -func (p *txnSvc) RunStep(rpcCtx context.Context, req *TxnStepReq) (*TxnStepResp, error) { - - var ( - resp TxnStepResp - f StepFunc - err error - ok bool - logger log.FieldLogger - ) - - var ctx Tctx - if err = json.Unmarshal(req.Context, &ctx); err != nil { - log.WithError(err).Error("failed to Unmarshal transaction context") - goto End - } - - logger = ctx.Logger().WithField("stepfunc", req.StepFunc) - logger.Debug("RunStep request received") - - if rpcCtx != nil { - _, span := trace.StartSpan(rpcCtx, req.StepFunc) - reqID := ctx.GetTxnReqID() - span.AddAttributes( - trace.StringAttribute("reqID", reqID), - ) - defer span.End() - } - - f, ok = getStepFunc(req.StepFunc) - if !ok { - err = errors.New("step function not found in registry") - goto End - } - - logger.Debug("executing step function") - if err = f(&ctx); err != nil { - logger.WithError(err).Error("step function failed") - goto End - } - - if err = ctx.Commit(); err != nil { - logger.WithError(err).Error("failed to commit txn context to store") - } - -End: - // Ensure RPC will always send a success reply. Error is stored in - // body of response. - if err != nil { - resp.Error = err.Error() - } - - return &resp, nil -} - -// RegisterService registers txnSvc with the given grpc.Server -func (p *txnSvc) RegisterService(s *grpc.Server) { - RegisterTxnSvcServer(s, p) -} diff --git a/glusterd2/transaction/step.go b/glusterd2/transaction/step.go index 525d8bd4d..a1e2f5de9 100644 --- a/glusterd2/transaction/step.go +++ b/glusterd2/transaction/step.go @@ -3,15 +3,8 @@ package transaction import ( "context" "errors" - "fmt" - "net/http" - - "github.com/gluster/glusterd2/glusterd2/gdctx" - "github.com/gluster/glusterd2/pkg/api" "github.com/pborman/uuid" - log "github.com/sirupsen/logrus" - "go.opencensus.io/trace" ) // StepFunc is the function that is supposed to be run during a transaction step @@ -35,134 +28,6 @@ var ( ErrStepFuncNotFound = errors.New("stepFunc was not found") ) -// do runs the DoFunc on the nodes -func (s *Step) do(origCtx context.Context, ctx TxnCtx) error { - return runStepFuncOnNodes(origCtx, s.DoFunc, ctx, s.Nodes) -} - -// undo runs the UndoFunc on the nodes -func (s *Step) undo(ctx TxnCtx) error { - if s.UndoFunc != "" { - return runStepFuncOnNodes(context.TODO(), s.UndoFunc, ctx, s.Nodes) - } - return nil -} - -// stepPeerResp is response from a single peer that runs a step -type stepPeerResp struct { - PeerID uuid.UUID - Error error -} - -// stepResp contains response from multiple peers that run a step and the type -// implements the `api.ErrorResponse` interface -type stepResp struct { - Step string - Resps []stepPeerResp - errCount int -} - -func (r stepResp) Error() string { - return fmt.Sprintf("Step %s failed on %d nodes", r.Step, r.errCount) -} - -func (r stepResp) Response() api.ErrorResp { - - var apiResp api.ErrorResp - for _, resp := range r.Resps { - if resp.Error == nil { - continue - } - - apiResp.Errors = append(apiResp.Errors, api.HTTPError{ - Code: int(api.ErrTxnStepFailed), - Message: api.ErrorCodeMap[api.ErrTxnStepFailed], - Fields: map[string]string{ - "peer-id": resp.PeerID.String(), - "step": r.Step, - "error": resp.Error.Error()}, - }) - } - - return apiResp -} - -func (r stepResp) Status() int { - return http.StatusInternalServerError -} - -func runStepFuncOnNodes(origCtx context.Context, stepName string, ctx TxnCtx, nodes []uuid.UUID) error { - - respCh := make(chan stepPeerResp, len(nodes)) - defer close(respCh) - - for _, node := range nodes { - go runStepFuncOnNode(origCtx, stepName, ctx, node, respCh) - } - - // Ideally, we have to cancel the pending go-routines on first error - // response received from any of the nodes. But that's really tricky - // to do. Serializing sequentially is the easiest fix but we lose - // concurrency. Instead, we let the do() function run on all nodes. - - resp := stepResp{ - Step: stepName, - Resps: make([]stepPeerResp, len(nodes)), - } - - var peerResp stepPeerResp - for range nodes { - peerResp = <-respCh - if peerResp.Error != nil { - resp.errCount++ - ctx.Logger().WithError(peerResp.Error).WithFields(log.Fields{ - "step": stepName, "node": peerResp.PeerID, - }).Error("Step failed on node.") - } - resp.Resps = append(resp.Resps, peerResp) - } - - if resp.errCount != 0 { - return resp - } - - return nil -} - -func runStepFuncOnNode(origCtx context.Context, stepName string, ctx TxnCtx, node uuid.UUID, respCh chan<- stepPeerResp) { - - ctx.Logger().WithFields(log.Fields{ - "step": stepName, "node": node, - }).Debug("Running step on node.") - - var err error - if uuid.Equal(node, gdctx.MyUUID) { - err = traceStep(RunStepFuncLocally)(origCtx, stepName, ctx) - } else { - // remote node - err = runStepOn(origCtx, stepName, node, ctx) - } - - respCh <- stepPeerResp{node, err} -} - -type runFunc func(origCtx context.Context, stepName string, ctx TxnCtx) error - -func traceStep(f runFunc) runFunc { - return func(origCtx context.Context, stepName string, ctx TxnCtx) error { - if origCtx != nil { - _, span := trace.StartSpan(origCtx, stepName) - reqID := ctx.GetTxnReqID() - span.AddAttributes( - trace.StringAttribute("reqID", reqID), - ) - defer span.End() - } - - return f(origCtx, stepName, ctx) - } -} - // RunStepFuncLocally runs a step func on local node func RunStepFuncLocally(origCtx context.Context, stepName string, ctx TxnCtx) error { stepFunc, ok := getStepFunc(stepName) diff --git a/glusterd2/transactionv2/steprunner.go b/glusterd2/transaction/steprunner.go similarity index 80% rename from glusterd2/transactionv2/steprunner.go rename to glusterd2/transaction/steprunner.go index a0c2f3f00..b68d54ee0 100644 --- a/glusterd2/transactionv2/steprunner.go +++ b/glusterd2/transaction/steprunner.go @@ -6,15 +6,14 @@ import ( "github.com/gluster/glusterd2/glusterd2/gdctx" "github.com/gluster/glusterd2/glusterd2/store" - "github.com/gluster/glusterd2/glusterd2/transaction" "github.com/pborman/uuid" ) // StepManager is an interface for running a step and also rollback step on local node type StepManager interface { - RunStep(ctx context.Context, step *transaction.Step, txnCtx transaction.TxnCtx) error - RollBackStep(ctx context.Context, step *transaction.Step, txnCtx transaction.TxnCtx) error + RunStep(ctx context.Context, step *Step, txnCtx TxnCtx) error + RollBackStep(ctx context.Context, step *Step, txnCtx TxnCtx) error SyncStep(ctx context.Context, stepIndex int, txn *Txn) error } @@ -30,7 +29,7 @@ func newStepManager() StepManager { } } -func (sm *stepManager) shouldRunStep(step *transaction.Step) bool { +func (sm *stepManager) shouldRunStep(step *Step) bool { if step.Skip { return false } @@ -46,9 +45,9 @@ func (sm *stepManager) shouldRunStep(step *transaction.Step) bool { // runStep synchronises the locally cached keys and values from the store // before running the step function on node -func (sm *stepManager) runStep(ctx context.Context, stepName string, txnCtx transaction.TxnCtx) error { +func (sm *stepManager) runStep(ctx context.Context, stepName string, txnCtx TxnCtx) error { txnCtx.SyncCache() - return transaction.RunStepFuncLocally(ctx, stepName, txnCtx) + return RunStepFuncLocally(ctx, stepName, txnCtx) } // isPrevStepsExecutedOnNode reports that all previous steps @@ -92,7 +91,7 @@ func (sm *stepManager) SyncStep(ctx context.Context, syncStepIndex int, txn *Txn } // RollBackStep will rollback a given step on local node -func (sm *stepManager) RollBackStep(ctx context.Context, step *transaction.Step, txnCtx transaction.TxnCtx) error { +func (sm *stepManager) RollBackStep(ctx context.Context, step *Step, txnCtx TxnCtx) error { if !sm.shouldRunStep(step) { txnCtx.Logger().WithField("step", step.UndoFunc).Debug("peer is excluded in running this step") return nil @@ -106,7 +105,7 @@ func (sm *stepManager) RollBackStep(ctx context.Context, step *transaction.Step, } // RunStepRunStep will execute the step on local node -func (sm *stepManager) RunStep(ctx context.Context, step *transaction.Step, txnCtx transaction.TxnCtx) error { +func (sm *stepManager) RunStep(ctx context.Context, step *Step, txnCtx TxnCtx) error { if !sm.shouldRunStep(step) { txnCtx.Logger().WithField("step", step.DoFunc).Debug("peer is excluded in running this step") return nil diff --git a/glusterd2/transactionv2/steptracing.go b/glusterd2/transaction/steptracing.go similarity index 88% rename from glusterd2/transactionv2/steptracing.go rename to glusterd2/transaction/steptracing.go index f198d20e4..d1a356326 100644 --- a/glusterd2/transactionv2/steptracing.go +++ b/glusterd2/transaction/steptracing.go @@ -4,8 +4,6 @@ import ( "context" "fmt" - "github.com/gluster/glusterd2/glusterd2/transaction" - "go.opencensus.io/trace" ) @@ -18,7 +16,7 @@ type tracingManager struct { } // RunStep is a middleware which creates tracing span for step.DoFunc -func (t *tracingManager) RunStep(ctx context.Context, step *transaction.Step, txnCtx transaction.TxnCtx) (err error) { +func (t *tracingManager) RunStep(ctx context.Context, step *Step, txnCtx TxnCtx) (err error) { spanName := fmt.Sprintf("RunStep/%s", step.DoFunc) ctx, span := trace.StartSpan(ctx, spanName) defer span.End() @@ -37,7 +35,7 @@ func (t *tracingManager) RunStep(ctx context.Context, step *transaction.Step, tx } // RollBackStep is a middleware which creates tracing span for step.UndoFunc -func (t *tracingManager) RollBackStep(ctx context.Context, step *transaction.Step, txnCtx transaction.TxnCtx) (err error) { +func (t *tracingManager) RollBackStep(ctx context.Context, step *Step, txnCtx TxnCtx) (err error) { spanName := fmt.Sprintf("RollBackStep/%s", step.UndoFunc) ctx, span := trace.StartSpan(ctx, spanName) defer span.End() diff --git a/glusterd2/transactionv2/tracingexecutor.go b/glusterd2/transaction/tracingexecutor.go similarity index 100% rename from glusterd2/transactionv2/tracingexecutor.go rename to glusterd2/transaction/tracingexecutor.go diff --git a/glusterd2/transaction/transaction-rpc.pb.go b/glusterd2/transaction/transaction-rpc.pb.go deleted file mode 100644 index 2e1ff4928..000000000 --- a/glusterd2/transaction/transaction-rpc.pb.go +++ /dev/null @@ -1,205 +0,0 @@ -// Code generated by protoc-gen-go. DO NOT EDIT. -// source: glusterd2/transaction/transaction-rpc.proto - -package transaction - -import ( - fmt "fmt" - proto "github.com/golang/protobuf/proto" - context "golang.org/x/net/context" - grpc "google.golang.org/grpc" - math "math" -) - -// Reference imports to suppress errors if they are not otherwise used. -var _ = proto.Marshal -var _ = fmt.Errorf -var _ = math.Inf - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the proto package it is being compiled against. -// A compilation error at this line likely means your copy of the -// proto package needs to be updated. -const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package - -type TxnStepReq struct { - StepFunc string `protobuf:"bytes,1,opt,name=StepFunc,proto3" json:"StepFunc,omitempty"` - Context []byte `protobuf:"bytes,2,opt,name=Context,proto3" json:"Context,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *TxnStepReq) Reset() { *m = TxnStepReq{} } -func (m *TxnStepReq) String() string { return proto.CompactTextString(m) } -func (*TxnStepReq) ProtoMessage() {} -func (*TxnStepReq) Descriptor() ([]byte, []int) { - return fileDescriptor_8c3c13f7a3182c1a, []int{0} -} - -func (m *TxnStepReq) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_TxnStepReq.Unmarshal(m, b) -} -func (m *TxnStepReq) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_TxnStepReq.Marshal(b, m, deterministic) -} -func (m *TxnStepReq) XXX_Merge(src proto.Message) { - xxx_messageInfo_TxnStepReq.Merge(m, src) -} -func (m *TxnStepReq) XXX_Size() int { - return xxx_messageInfo_TxnStepReq.Size(m) -} -func (m *TxnStepReq) XXX_DiscardUnknown() { - xxx_messageInfo_TxnStepReq.DiscardUnknown(m) -} - -var xxx_messageInfo_TxnStepReq proto.InternalMessageInfo - -func (m *TxnStepReq) GetStepFunc() string { - if m != nil { - return m.StepFunc - } - return "" -} - -func (m *TxnStepReq) GetContext() []byte { - if m != nil { - return m.Context - } - return nil -} - -type TxnStepResp struct { - Error string `protobuf:"bytes,1,opt,name=Error,proto3" json:"Error,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *TxnStepResp) Reset() { *m = TxnStepResp{} } -func (m *TxnStepResp) String() string { return proto.CompactTextString(m) } -func (*TxnStepResp) ProtoMessage() {} -func (*TxnStepResp) Descriptor() ([]byte, []int) { - return fileDescriptor_8c3c13f7a3182c1a, []int{1} -} - -func (m *TxnStepResp) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_TxnStepResp.Unmarshal(m, b) -} -func (m *TxnStepResp) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_TxnStepResp.Marshal(b, m, deterministic) -} -func (m *TxnStepResp) XXX_Merge(src proto.Message) { - xxx_messageInfo_TxnStepResp.Merge(m, src) -} -func (m *TxnStepResp) XXX_Size() int { - return xxx_messageInfo_TxnStepResp.Size(m) -} -func (m *TxnStepResp) XXX_DiscardUnknown() { - xxx_messageInfo_TxnStepResp.DiscardUnknown(m) -} - -var xxx_messageInfo_TxnStepResp proto.InternalMessageInfo - -func (m *TxnStepResp) GetError() string { - if m != nil { - return m.Error - } - return "" -} - -func init() { - proto.RegisterType((*TxnStepReq)(nil), "transaction.TxnStepReq") - proto.RegisterType((*TxnStepResp)(nil), "transaction.TxnStepResp") -} - -func init() { - proto.RegisterFile("glusterd2/transaction/transaction-rpc.proto", fileDescriptor_8c3c13f7a3182c1a) -} - -var fileDescriptor_8c3c13f7a3182c1a = []byte{ - // 174 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xd2, 0x4e, 0xcf, 0x29, 0x2d, - 0x2e, 0x49, 0x2d, 0x4a, 0x31, 0xd2, 0x2f, 0x29, 0x4a, 0xcc, 0x2b, 0x4e, 0x4c, 0x2e, 0xc9, 0xcc, - 0xcf, 0x43, 0x66, 0xeb, 0x16, 0x15, 0x24, 0xeb, 0x15, 0x14, 0xe5, 0x97, 0xe4, 0x0b, 0x71, 0x23, - 0x09, 0x2b, 0x39, 0x71, 0x71, 0x85, 0x54, 0xe4, 0x05, 0x97, 0xa4, 0x16, 0x04, 0xa5, 0x16, 0x0a, - 0x49, 0x71, 0x71, 0x80, 0x98, 0x6e, 0xa5, 0x79, 0xc9, 0x12, 0x8c, 0x0a, 0x8c, 0x1a, 0x9c, 0x41, - 0x70, 0xbe, 0x90, 0x04, 0x17, 0xbb, 0x73, 0x7e, 0x5e, 0x49, 0x6a, 0x45, 0x89, 0x04, 0x93, 0x02, - 0xa3, 0x06, 0x4f, 0x10, 0x8c, 0xab, 0xa4, 0xcc, 0xc5, 0x0d, 0x37, 0xa3, 0xb8, 0x40, 0x48, 0x84, - 0x8b, 0xd5, 0xb5, 0xa8, 0x28, 0xbf, 0x08, 0x6a, 0x02, 0x84, 0x63, 0xe4, 0xc1, 0xc5, 0x06, 0x52, - 0x54, 0x96, 0x2c, 0x64, 0xc7, 0xc5, 0x1e, 0x54, 0x0a, 0x56, 0x2e, 0x24, 0xae, 0x87, 0xe4, 0x16, - 0x3d, 0x84, 0x43, 0xa4, 0x24, 0xb0, 0x4b, 0x14, 0x17, 0x28, 0x31, 0x24, 0xb1, 0x81, 0xbd, 0x61, - 0x0c, 0x08, 0x00, 0x00, 0xff, 0xff, 0x69, 0x42, 0xfa, 0xe6, 0xf5, 0x00, 0x00, 0x00, -} - -// Reference imports to suppress errors if they are not otherwise used. -var _ context.Context -var _ grpc.ClientConn - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the grpc package it is being compiled against. -const _ = grpc.SupportPackageIsVersion4 - -// TxnSvcClient is the client API for TxnSvc service. -// -// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. -type TxnSvcClient interface { - RunStep(ctx context.Context, in *TxnStepReq, opts ...grpc.CallOption) (*TxnStepResp, error) -} - -type txnSvcClient struct { - cc *grpc.ClientConn -} - -func NewTxnSvcClient(cc *grpc.ClientConn) TxnSvcClient { - return &txnSvcClient{cc} -} - -func (c *txnSvcClient) RunStep(ctx context.Context, in *TxnStepReq, opts ...grpc.CallOption) (*TxnStepResp, error) { - out := new(TxnStepResp) - err := c.cc.Invoke(ctx, "/transaction.TxnSvc/RunStep", in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -// TxnSvcServer is the server API for TxnSvc service. -type TxnSvcServer interface { - RunStep(context.Context, *TxnStepReq) (*TxnStepResp, error) -} - -func RegisterTxnSvcServer(s *grpc.Server, srv TxnSvcServer) { - s.RegisterService(&_TxnSvc_serviceDesc, srv) -} - -func _TxnSvc_RunStep_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(TxnStepReq) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(TxnSvcServer).RunStep(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/transaction.TxnSvc/RunStep", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(TxnSvcServer).RunStep(ctx, req.(*TxnStepReq)) - } - return interceptor(ctx, in, info, handler) -} - -var _TxnSvc_serviceDesc = grpc.ServiceDesc{ - ServiceName: "transaction.TxnSvc", - HandlerType: (*TxnSvcServer)(nil), - Methods: []grpc.MethodDesc{ - { - MethodName: "RunStep", - Handler: _TxnSvc_RunStep_Handler, - }, - }, - Streams: []grpc.StreamDesc{}, - Metadata: "glusterd2/transaction/transaction-rpc.proto", -} diff --git a/glusterd2/transaction/transaction-rpc.proto b/glusterd2/transaction/transaction-rpc.proto deleted file mode 100644 index c84fafff2..000000000 --- a/glusterd2/transaction/transaction-rpc.proto +++ /dev/null @@ -1,16 +0,0 @@ -syntax = "proto3"; - -package transaction; - -message TxnStepReq { - string StepFunc = 1; - bytes Context = 2; // Context JSON encoded TxnCtx -} - -message TxnStepResp { - string Error = 1; -} - -service TxnSvc { - rpc RunStep(TxnStepReq) returns(TxnStepResp) {} -} diff --git a/glusterd2/transaction/transaction.go b/glusterd2/transaction/transaction.go index deca9501d..fbd211bc8 100644 --- a/glusterd2/transaction/transaction.go +++ b/glusterd2/transaction/transaction.go @@ -3,62 +3,71 @@ package transaction import ( "context" + "errors" "expvar" "fmt" + "time" "github.com/gluster/glusterd2/glusterd2/gdctx" "github.com/gluster/glusterd2/glusterd2/store" "github.com/coreos/etcd/clientv3" - "github.com/coreos/etcd/clientv3/concurrency" "github.com/pborman/uuid" log "github.com/sirupsen/logrus" - "google.golang.org/grpc" - "google.golang.org/grpc/codes" + "go.opencensus.io/trace" ) const ( - txnPrefix = "transaction/" + txnPrefix = "transaction/" + txnTimeOut = time.Minute * 3 ) -var expTxn = expvar.NewMap("txn") +var ( + expTxn *expvar.Map +) // Txn is a set of steps type Txn struct { - id uuid.UUID - locks Locks - reqID uuid.UUID - storePrefix string - - Ctx TxnCtx - Steps []*Step - DontCheckAlive bool - DisableRollback bool + locks Locks + // Nodes is the union of the all the TxnStep.Nodes and is implicitly // set in Txn.Do(). This list is used to determine liveness of the // nodes before running the transaction steps. - Nodes []uuid.UUID - OrigCtx context.Context + Nodes []uuid.UUID `json:"nodes"` + StorePrefix string `json:"store_prefix"` + ID uuid.UUID `json:"id"` + ReqID uuid.UUID `json:"req_id"` + Ctx TxnCtx `json:"ctx"` + Steps []*Step `json:"steps"` + DontCheckAlive bool `json:"dont_check_alive"` + DisableRollback bool `json:"disable_rollback"` + StartTime time.Time `json:"start_time"` + TxnSpanCtx trace.SpanContext `json:"txn_span_ctx"` + + success chan struct{} + error chan error + stop chan struct{} + succeeded bool } // NewTxn returns an initialized Txn without any steps func NewTxn(ctx context.Context) *Txn { t := new(Txn) - t.id = uuid.NewRandom() - t.reqID = gdctx.GetReqID(ctx) - t.locks = make(map[string]*concurrency.Mutex) - t.storePrefix = txnPrefix + t.id.String() + "/" + t.ID = uuid.NewRandom() + t.ReqID = gdctx.GetReqID(ctx) + t.locks = Locks{} + t.StorePrefix = txnPrefix + t.ID.String() + "/" config := &TxnCtxConfig{ LogFields: log.Fields{ - "txnid": t.id.String(), - "reqid": t.reqID.String(), + "txnid": t.ID.String(), + "reqid": t.ReqID.String(), }, - StorePrefix: t.storePrefix, + StorePrefix: t.StorePrefix, } - t.Ctx = newCtx(config) - - t.OrigCtx = ctx + t.Ctx = NewCtx(config) + spanCtx := trace.FromContext(ctx) + t.TxnSpanCtx = spanCtx.SpanContext() t.Ctx.Logger().Debug("new transaction created") return t } @@ -66,118 +75,189 @@ func NewTxn(ctx context.Context) *Txn { // NewTxnWithLocks returns an empty Txn with locks obtained on given lockIDs func NewTxnWithLocks(ctx context.Context, lockIDs ...string) (*Txn, error) { t := NewTxn(ctx) + t.locks = Locks{} + err := t.acquireClusterLocks(lockIDs...) + return t, err +} + +func (t *Txn) acquireClusterLocks(lockIDs ...string) error { + if t.locks == nil { + t.locks = Locks{} + } for _, id := range lockIDs { logger := t.Ctx.Logger().WithField("lockID", id) - logger.Debug("attempting to obtain lock") - + logger.Debug("txn attempts to acquire cluster lock") if err := t.locks.Lock(id); err != nil { logger.WithError(err).Error("failed to obtain lock") - t.Done() - return nil, err + t.releaseLocks() + return err } - - logger.Debug("lock obtained") + logger.Debug("cluster lock acquired") } - return t, nil + return nil +} + +func (t *Txn) releaseLocks() { + t.locks.UnLock(context.Background()) } // Done releases any obtained locks and cleans up the transaction namespace // Done must be called after a transaction ends func (t *Txn) Done() { - // Release obtained locks - for _, locker := range t.locks { - locker.Unlock(context.Background()) + defer t.releaseLocks() + if !t.succeeded { + return } - // Wipe txn namespace - if _, err := store.Delete(context.TODO(), t.storePrefix, clientv3.WithPrefix()); err != nil { + t.Ctx.Logger().Info("transaction succeeded on all nodes") + t.removeContextData() + + if err := GlobalTxnManager.RemoveTransaction(t.ID); err != nil { + t.Ctx.Logger().WithError(err).Error("failed to remove txn data from pending-transaction namespace") + } +} + +func (t *Txn) removeContextData() { + if _, err := store.Delete(context.TODO(), t.StorePrefix, clientv3.WithPrefix()); err != nil { t.Ctx.Logger().WithError(err).WithField("key", - t.storePrefix).Error("Failed to remove transaction namespace from store") + t.StorePrefix).Error("Failed to remove transaction namespace from store") } - expTxn.Add("initiated_txn_in_progress", -1) } func (t *Txn) checkAlive() error { - - if len(t.Nodes) == 0 { - for _, s := range t.Steps { - t.Nodes = append(t.Nodes, s.Nodes...) - } - } - t.Nodes = nodesUnion(t.Nodes) - for _, node := range t.Nodes { // TODO: Using prefixed query, get all alive nodes in a single etcd query if _, online := store.Store.IsNodeAlive(node); !online { return fmt.Errorf("node %s is probably down", node.String()) } } - return nil } // Do runs the transaction on the cluster func (t *Txn) Do() error { - if !t.DontCheckAlive { - if err := t.checkAlive(); err != nil { - return err - } + var ( + timer = time.NewTimer(txnTimeOut) + ) + { + t.success = make(chan struct{}) + t.stop = make(chan struct{}) + t.error = make(chan error) + t.StartTime = time.Now() } + defer timer.Stop() + defer close(t.stop) t.Ctx.Logger().Debug("Starting transaction") - expTxn.Add("initiated_txn_in_progress", 1) + go t.waitForCompletion() - // commit txn.Ctx.Set()s done in REST handlers to the store - if err := t.Ctx.Commit(); err != nil { + if err := t.prepare(); err != nil { + t.Ctx.Logger().WithError(err).Error("failed in preparing transaction") return err } - for i, s := range t.Steps { - if s.Skip { - continue + t.Ctx.Logger().Debug("waiting for completion of transaction") + + select { + case <-t.success: + t.succeeded = true + case err := <-t.error: + t.onFailure(err) + return err + case <-timer.C: + t.onFailure(errTxnTimeout) + return errTxnTimeout + } + + return nil +} + +func (t *Txn) onFailure(err error) error { + t.Ctx.Logger().WithError(err).Error("error in executing txn, marking as failure") + txnStatus := TxnStatus{State: txnFailed, TxnID: t.ID, Reason: err.Error()} + return GlobalTxnManager.UpDateTxnStatus(txnStatus, t.ID, t.Nodes...) +} + +// prepare prepares a transaction before adding it to store +func (t *Txn) prepare() error { + if len(t.Nodes) == 0 { + for _, s := range t.Steps { + t.Nodes = append(t.Nodes, s.Nodes...) } + } + t.Nodes = nodesUnion(t.Nodes) - if err := s.do(t.OrigCtx, t.Ctx); err != nil { - if t.DontCheckAlive && isNodeUnreachable(err) { - continue - } - expTxn.Add("initiated_txn_failure", 1) - if !t.DisableRollback { - t.Ctx.Logger().WithError(err).Error("Transaction failed, rolling back changes") - t.undo(i) - } + if !t.DontCheckAlive { + if err := t.checkAlive(); err != nil { return err } } - expTxn.Add("initiated_txn_success", 1) - return nil + if err := GlobalTxnManager.UpDateTxnStatus(TxnStatus{State: txnPending, TxnID: t.ID}, t.ID, t.Nodes...); err != nil { + return err + } + + if err := GlobalTxnManager.UpdateLastExecutedStep(-1, t.ID, t.Nodes...); err != nil { + return err + } + + // commit txn.Ctx.Set()s done in REST handlers to the store + if err := t.Ctx.Commit(); err != nil { + return err + } + + t.Ctx.Logger().Debug("adding txn to store") + return GlobalTxnManager.AddTxn(t) } -func isNodeUnreachable(err error) bool { - unreachable := true - if s, ok := err.(*stepResp); ok { - for _, e := range s.Resps { - if grpc.Code(e.Error) != codes.Unavailable { - unreachable = false +// notifyState will send a notification on `success` chan if txn got marked as succeeded on given +// nodeID. In case txn got failed on the given nodeID then it will send a notification on Txn.error +// chan. +func (t *Txn) notifyState(nodeID uuid.UUID, success chan<- struct{}) { + txnStatusChan := GlobalTxnManager.WatchTxnStatus(t.stop, t.ID, nodeID) + + for { + select { + case <-t.stop: + return + case status := <-txnStatusChan: + log.WithFields(log.Fields{ + "nodeId": nodeID.String(), + "status": fmt.Sprintf("%+v", status), + }).Debug("state received") + + if status.State == txnSucceeded { + success <- struct{}{} + return + } else if status.State == txnFailed { + t.error <- errors.New(status.Reason) + return } } } - return unreachable } -// undo undoes a transaction and will be automatically called by Perform if any step fails. -// The Steps are undone in the reverse order, from the failed step. -func (t *Txn) undo(n int) { - for i := n; i >= 0; i-- { - if t.Steps[i].Skip { - continue +// waitForCompletion will wait for transaction to complete on all nodes. +// If txn got marked as succeeded on all nodes then it will send a notification +// on Txn.success chan. +func (t *Txn) waitForCompletion() { + var successChan = make(chan struct{}) + + for _, nodeID := range t.Nodes { + go t.notifyState(nodeID, successChan) + } + + for range t.Nodes { + select { + case <-t.stop: + return + case <-successChan: } - t.Steps[i].undo(t.Ctx) } + t.success <- struct{}{} } // nodesUnion removes duplicate nodes @@ -192,3 +272,25 @@ func nodesUnion(nodes []uuid.UUID) []uuid.UUID { } return nodes } + +// FilterNonFailedTxn will return txns which are not marked as failed +func FilterNonFailedTxn(txns []*Txn) []*Txn { + var nonFailedTxns []*Txn + for _, txn := range txns { + txnStatus, err := GlobalTxnManager.GetTxnStatus(txn.ID, gdctx.MyUUID) + if err == nil && txnStatus.State.Valid() && txnStatus.State != txnFailed { + nonFailedTxns = append(nonFailedTxns, txn) + } + } + return nonFailedTxns +} + +func init() { + expTxn = expvar.NewMap("txn") + expTxn.Set("engine_config", expvar.Func(func() interface{} { + return map[string]interface{}{ + "txn_timeout_second": txnTimeOut.Seconds(), + "txn_sync_timeout_second": txnSyncTimeout.Seconds(), + } + })) +} diff --git a/glusterd2/transactionv2/txnmanager.go b/glusterd2/transaction/txnmanager.go similarity index 97% rename from glusterd2/transactionv2/txnmanager.go rename to glusterd2/transaction/txnmanager.go index a2cd5174d..46e3caf64 100644 --- a/glusterd2/transactionv2/txnmanager.go +++ b/glusterd2/transaction/txnmanager.go @@ -11,7 +11,6 @@ import ( "time" "github.com/gluster/glusterd2/glusterd2/store" - "github.com/gluster/glusterd2/glusterd2/transaction" "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/mvcc/mvccpb" @@ -186,7 +185,7 @@ func (tm *txnManager) watchRespToTxns(resp clientv3.WatchResponse) (txns []*Txn) continue } - txn := &Txn{Ctx: new(transaction.Tctx)} + txn := &Txn{Ctx: new(Tctx)} if err := json.Unmarshal(event.Kv.Value, txn); err != nil { continue } @@ -220,7 +219,7 @@ func (tm *txnManager) GetTxnByUUID(id uuid.UUID) (*Txn, error) { kv := resp.Kvs[0] - txn := &Txn{Ctx: new(transaction.Tctx)} + txn := &Txn{Ctx: new(Tctx)} if err := json.Unmarshal(kv.Value, txn); err != nil { return nil, err } @@ -239,7 +238,7 @@ func (tm *txnManager) GetTxns() (txns []*Txn) { continue } - txn := &Txn{Ctx: new(transaction.Tctx)} + txn := &Txn{Ctx: new(Tctx)} if err := json.Unmarshal(kv.Value, txn); err != nil { continue } @@ -252,7 +251,7 @@ func (tm *txnManager) GetTxns() (txns []*Txn) { func (tm *txnManager) UpDateTxnStatus(status TxnStatus, txnID uuid.UUID, nodeIDs ...uuid.UUID) error { var ( ctx, cancel = context.WithTimeout(context.Background(), etcdTxnTimeout) - clusterLock = transaction.Locks{} + clusterLock = Locks{} putOps []clientv3.Op ) @@ -284,7 +283,7 @@ func (tm *txnManager) GetTxnStatus(txnID uuid.UUID, nodeID uuid.UUID) (TxnStatus var ( ctx, cancel = context.WithCancel(context.Background()) key = tm.getStoreKey(txnID.String(), nodeID.String(), TxnStatusPrefix) - clusterLock = transaction.Locks{} + clusterLock = Locks{} ) defer cancel() diff --git a/glusterd2/transactionv2/types.go b/glusterd2/transaction/types.go similarity index 100% rename from glusterd2/transactionv2/types.go rename to glusterd2/transaction/types.go diff --git a/glusterd2/transactionv2/utils.go b/glusterd2/transaction/utils.go similarity index 100% rename from glusterd2/transactionv2/utils.go rename to glusterd2/transaction/utils.go diff --git a/glusterd2/transactionv2/transaction.go b/glusterd2/transactionv2/transaction.go deleted file mode 100644 index a80e3ea19..000000000 --- a/glusterd2/transactionv2/transaction.go +++ /dev/null @@ -1,296 +0,0 @@ -// Package transaction implements a distributed transaction handling framework -package transaction - -import ( - "context" - "errors" - "expvar" - "fmt" - "time" - - "github.com/gluster/glusterd2/glusterd2/gdctx" - "github.com/gluster/glusterd2/glusterd2/store" - "github.com/gluster/glusterd2/glusterd2/transaction" - - "github.com/coreos/etcd/clientv3" - "github.com/pborman/uuid" - log "github.com/sirupsen/logrus" - "go.opencensus.io/trace" -) - -const ( - txnPrefix = "transaction/" - txnTimeOut = time.Minute * 3 -) - -// Txn is a set of steps -type Txn struct { - locks transaction.Locks - - // Nodes is the union of the all the TxnStep.Nodes and is implicitly - // set in Txn.Do(). This list is used to determine liveness of the - // nodes before running the transaction steps. - Nodes []uuid.UUID `json:"nodes"` - StorePrefix string `json:"store_prefix"` - ID uuid.UUID `json:"id"` - ReqID uuid.UUID `json:"req_id"` - Ctx transaction.TxnCtx `json:"ctx"` - Steps []*transaction.Step `json:"steps"` - DontCheckAlive bool `json:"dont_check_alive"` - DisableRollback bool `json:"disable_rollback"` - StartTime time.Time `json:"start_time"` - TxnSpanCtx trace.SpanContext `json:"txn_span_ctx"` - - success chan struct{} - error chan error - stop chan struct{} - succeeded bool -} - -// NewTxn returns an initialized Txn without any steps -func NewTxn(ctx context.Context) *Txn { - t := new(Txn) - - t.ID = uuid.NewRandom() - t.ReqID = gdctx.GetReqID(ctx) - t.locks = transaction.Locks{} - t.StorePrefix = txnPrefix + t.ID.String() + "/" - config := &transaction.TxnCtxConfig{ - LogFields: log.Fields{ - "txnid": t.ID.String(), - "reqid": t.ReqID.String(), - }, - StorePrefix: t.StorePrefix, - } - t.Ctx = transaction.NewCtx(config) - spanCtx := trace.FromContext(ctx) - t.TxnSpanCtx = spanCtx.SpanContext() - t.Ctx.Logger().Debug("new transaction created") - return t -} - -// NewTxnWithLocks returns an empty Txn with locks obtained on given lockIDs -func NewTxnWithLocks(ctx context.Context, lockIDs ...string) (*Txn, error) { - t := NewTxn(ctx) - t.locks = transaction.Locks{} - err := t.acquireClusterLocks(lockIDs...) - return t, err -} - -func (t *Txn) acquireClusterLocks(lockIDs ...string) error { - if t.locks == nil { - t.locks = transaction.Locks{} - } - - for _, id := range lockIDs { - logger := t.Ctx.Logger().WithField("lockID", id) - logger.Debug("txn attempts to acquire cluster lock") - if err := t.locks.Lock(id); err != nil { - logger.WithError(err).Error("failed to obtain lock") - t.releaseLocks() - return err - } - logger.Debug("cluster lock acquired") - } - - return nil -} - -func (t *Txn) releaseLocks() { - t.locks.UnLock(context.Background()) -} - -// Done releases any obtained locks and cleans up the transaction namespace -// Done must be called after a transaction ends -func (t *Txn) Done() { - defer t.releaseLocks() - if !t.succeeded { - return - } - - t.Ctx.Logger().Info("transaction succeeded on all nodes") - t.removeContextData() - - if err := GlobalTxnManager.RemoveTransaction(t.ID); err != nil { - t.Ctx.Logger().WithError(err).Error("failed to remove txn data from pending-transaction namespace") - } -} - -func (t *Txn) removeContextData() { - if _, err := store.Delete(context.TODO(), t.StorePrefix, clientv3.WithPrefix()); err != nil { - t.Ctx.Logger().WithError(err).WithField("key", - t.StorePrefix).Error("Failed to remove transaction namespace from store") - } - -} - -func (t *Txn) checkAlive() error { - for _, node := range t.Nodes { - // TODO: Using prefixed query, get all alive nodes in a single etcd query - if _, online := store.Store.IsNodeAlive(node); !online { - return fmt.Errorf("node %s is probably down", node.String()) - } - } - return nil -} - -// Do runs the transaction on the cluster -func (t *Txn) Do() error { - var ( - timer = time.NewTimer(txnTimeOut) - ) - { - t.success = make(chan struct{}) - t.stop = make(chan struct{}) - t.error = make(chan error) - t.StartTime = time.Now() - } - defer timer.Stop() - defer close(t.stop) - - t.Ctx.Logger().Debug("Starting transaction") - go t.waitForCompletion() - - if err := t.prepare(); err != nil { - t.Ctx.Logger().WithError(err).Error("failed in preparing transaction") - return err - } - - t.Ctx.Logger().Debug("waiting for completion of transaction") - - select { - case <-t.success: - t.succeeded = true - case err := <-t.error: - t.onFailure(err) - return err - case <-timer.C: - t.onFailure(errTxnTimeout) - return errTxnTimeout - } - - return nil -} - -func (t *Txn) onFailure(err error) error { - t.Ctx.Logger().WithError(err).Error("error in executing txn, marking as failure") - txnStatus := TxnStatus{State: txnFailed, TxnID: t.ID, Reason: err.Error()} - return GlobalTxnManager.UpDateTxnStatus(txnStatus, t.ID, t.Nodes...) -} - -// prepare prepares a transaction before adding it to store -func (t *Txn) prepare() error { - if len(t.Nodes) == 0 { - for _, s := range t.Steps { - t.Nodes = append(t.Nodes, s.Nodes...) - } - } - t.Nodes = nodesUnion(t.Nodes) - - if !t.DontCheckAlive { - if err := t.checkAlive(); err != nil { - return err - } - } - - if err := GlobalTxnManager.UpDateTxnStatus(TxnStatus{State: txnPending, TxnID: t.ID}, t.ID, t.Nodes...); err != nil { - return err - } - - if err := GlobalTxnManager.UpdateLastExecutedStep(-1, t.ID, t.Nodes...); err != nil { - return err - } - - // commit txn.Ctx.Set()s done in REST handlers to the store - if err := t.Ctx.Commit(); err != nil { - return err - } - - t.Ctx.Logger().Debug("adding txn to store") - return GlobalTxnManager.AddTxn(t) -} - -// notifyState will send a notification on `success` chan if txn got marked as succeeded on given -// nodeID. In case txn got failed on the given nodeID then it will send a notification on Txn.error -// chan. -func (t *Txn) notifyState(nodeID uuid.UUID, success chan<- struct{}) { - txnStatusChan := GlobalTxnManager.WatchTxnStatus(t.stop, t.ID, nodeID) - - for { - select { - case <-t.stop: - return - case status := <-txnStatusChan: - log.WithFields(log.Fields{ - "nodeId": nodeID.String(), - "status": fmt.Sprintf("%+v", status), - }).Debug("state received") - - if status.State == txnSucceeded { - success <- struct{}{} - return - } else if status.State == txnFailed { - t.error <- errors.New(status.Reason) - return - } - } - } -} - -// waitForCompletion will wait for transaction to complete on all nodes. -// If txn got marked as succeeded on all nodes then it will send a notification -// on Txn.success chan. -func (t *Txn) waitForCompletion() { - var successChan = make(chan struct{}) - - for _, nodeID := range t.Nodes { - go t.notifyState(nodeID, successChan) - } - - for range t.Nodes { - select { - case <-t.stop: - return - case <-successChan: - } - } - t.success <- struct{}{} -} - -// nodesUnion removes duplicate nodes -func nodesUnion(nodes []uuid.UUID) []uuid.UUID { - for i := 0; i < len(nodes); i++ { - for j := i + 1; j < len(nodes); j++ { - if uuid.Equal(nodes[i], nodes[j]) { - nodes = append(nodes[:j], nodes[j+1:]...) - j-- - } - } - } - return nodes -} - -// FilterNonFailedTxn will return txns which are not marked as failed -func FilterNonFailedTxn(txns []*Txn) []*Txn { - var nonFailedTxns []*Txn - for _, txn := range txns { - txnStatus, err := GlobalTxnManager.GetTxnStatus(txn.ID, gdctx.MyUUID) - if err == nil && txnStatus.State.Valid() && txnStatus.State != txnFailed { - nonFailedTxns = append(nonFailedTxns, txn) - } - } - return nonFailedTxns -} - -func init() { - expVar := expvar.Get("txn") - if expVar == nil { - expVar = expvar.NewMap("txn") - } - expVar.(*expvar.Map).Set("engine_config", expvar.Func(func() interface{} { - return map[string]interface{}{ - "txn_timeout_second": txnTimeOut.Seconds(), - "txn_sync_timeout_second": txnSyncTimeout.Seconds(), - } - })) -} diff --git a/plugins/tracemgmt/rest.go b/plugins/tracemgmt/rest.go index 67e994766..995ed94b3 100644 --- a/plugins/tracemgmt/rest.go +++ b/plugins/tracemgmt/rest.go @@ -7,7 +7,6 @@ import ( "github.com/gluster/glusterd2/glusterd2/peer" restutils "github.com/gluster/glusterd2/glusterd2/servers/rest/utils" "github.com/gluster/glusterd2/glusterd2/transaction" - transactionv2 "github.com/gluster/glusterd2/glusterd2/transactionv2" "github.com/gluster/glusterd2/pkg/errors" tracemgmtapi "github.com/gluster/glusterd2/plugins/tracemgmt/api" "github.com/gluster/glusterd2/plugins/tracemgmt/traceutils" @@ -34,7 +33,7 @@ func tracingEnableHandler(w http.ResponseWriter, r *http.Request) { return } - txn, err := transactionv2.NewTxnWithLocks(ctx, gdctx.MyClusterID.String()) + txn, err := transaction.NewTxnWithLocks(ctx, gdctx.MyClusterID.String()) if err != nil { status, err := restutils.ErrToStatusCode(err) restutils.SendHTTPError(ctx, w, status, err) @@ -124,7 +123,7 @@ func tracingUpdateHandler(w http.ResponseWriter, r *http.Request) { return } - txn, err := transactionv2.NewTxnWithLocks(ctx, gdctx.MyClusterID.String()) + txn, err := transaction.NewTxnWithLocks(ctx, gdctx.MyClusterID.String()) if err != nil { status, err := restutils.ErrToStatusCode(err) restutils.SendHTTPError(ctx, w, status, err) @@ -206,7 +205,7 @@ func tracingDisableHandler(w http.ResponseWriter, r *http.Request) { return } - txn, err := transactionv2.NewTxnWithLocks(ctx, gdctx.MyClusterID.String()) + txn, err := transaction.NewTxnWithLocks(ctx, gdctx.MyClusterID.String()) if err != nil { status, err := restutils.ErrToStatusCode(err) restutils.SendHTTPError(ctx, w, status, err)