Skip to content
This repository was archived by the owner on Nov 24, 2023. It is now read-only.
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion dm/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ var (
// ShardDDLPessimismOperationKeyAdapter is used to store shard DDL operation in pessimistic model.
// k/v: Encode(task-name, source-id) -> shard DDL operation.
ShardDDLPessimismOperationKeyAdapter KeyAdapter = keyHexEncoderDecoder("/dm-master/shardddl-pessimism/operation/")
// ShardDDLPessimismDDLsKeyAdapter is used to store last done DDLs in pessimistic model.
// k/v: Encode(task-name, downSchema, downTable) -> DDLs.
ShardDDLPessimismDDLsKeyAdapter KeyAdapter = keyHexEncoderDecoder("/dm-master/shardddl-pessimism/ddls/")
Comment thread
lance6716 marked this conversation as resolved.

// ShardDDLOptimismSourceTablesKeyAdapter is used to store INITIAL upstream schema & table names when starting the subtask.
// In other words, if any Info for this subtask exists, we should obey source tables in the Info.
Expand Down Expand Up @@ -100,7 +103,7 @@ func keyAdapterKeysLen(s KeyAdapter) int {
ShardDDLPessimismInfoKeyAdapter, ShardDDLPessimismOperationKeyAdapter,
ShardDDLOptimismSourceTablesKeyAdapter:
return 2
case ShardDDLOptimismInitSchemaKeyAdapter:
case ShardDDLOptimismInitSchemaKeyAdapter, ShardDDLPessimismDDLsKeyAdapter:
return 3
case ShardDDLOptimismInfoKeyAdapter, ShardDDLOptimismOperationKeyAdapter:
return 4
Expand Down
2 changes: 1 addition & 1 deletion dm/master/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -711,7 +711,7 @@ func (s *Server) QueryStatus(ctx context.Context, req *pb.QueryStatusListRequest
}

// adjust unsynced field in sync status by looking at DDL locks.
// because if a DM-worker doesn't receive any shard DDL, it doesn't even know it's unsynced for itself
// because if a DM-worker doesn't receive any shard DDL, it doesn't even know it's unsynced for itself.
func (s *Server) fillUnsyncedStatus(resps []*pb.QueryStatusResponse) {
for _, resp := range resps {
for _, subtaskStatus := range resp.SubTaskStatus {
Expand Down
2 changes: 1 addition & 1 deletion dm/master/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -772,7 +772,7 @@ func (t *testMaster) TestStartTaskWithRemoveMeta(c *check.C) {
schema, table = "foo", "bar"
ID = fmt.Sprintf("%s-`%s`.`%s`", taskName, schema, table)
i11 = pessimism.NewInfo(taskName, sources[0], schema, table, DDLs)
op2 = pessimism.NewOperation(ID, taskName, sources[0], DDLs, true, false)
op2 = pessimism.NewOperation(ID, taskName, sources[0], DDLs, true, false, false)
)
_, err = pessimism.PutInfo(etcdTestCli, i11)
c.Assert(err, check.IsNil)
Expand Down
49 changes: 29 additions & 20 deletions dm/master/shardddl/pessimist.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,13 @@ func (p *Pessimist) buildLocks(etcdCli *clientv3.Client) (int64, int64, error) {
}
p.logger.Info("get history shard DDL lock operation", zap.Reflect("operation", opm), zap.Int64("revision", rev2))

latestDoneDDLsMap, _, err := pessimism.GetAllLatestDoneDDLs(etcdCli)
if err != nil {
return 0, 0, err
}

// recover the shard DDL lock based on history shard DDL info & lock operation.
err = p.recoverLocks(ifm, opm)
err = p.recoverLocks(ifm, opm, latestDoneDDLsMap)
if err != nil {
// only log the error, and don't return it to forbid the startup of the DM-master leader.
// then these unexpected locks can be handled by the user.
Expand Down Expand Up @@ -291,7 +296,7 @@ func (p *Pessimist) UnlockLock(ctx context.Context, id, replaceOwner string, for

// 2. check whether has resolved before (this often should not happen).
if lock.IsResolved() {
err := p.removeLock(lock)
err := p.removeLockPutDDLs(lock)
if err != nil {
return err
}
Expand Down Expand Up @@ -398,20 +403,25 @@ func (p *Pessimist) RemoveMetaData(task string) error {
p.lk.RemoveLock(op.ID)
}

p.lk.RemoveLatestDoneDDLsByTask(task)
Comment thread
lance6716 marked this conversation as resolved.
Outdated
// clear meta data in etcd
_, err = pessimism.DeleteInfosOperationsByTask(p.cli, task)
_, err = pessimism.DeleteInfosOperationsDDLsByTask(p.cli, task)
return err
}

// recoverLocks recovers shard DDL locks based on shard DDL info and shard DDL lock operation.
func (p *Pessimist) recoverLocks(ifm map[string]map[string]pessimism.Info, opm map[string]map[string]pessimism.Operation) error {
func (p *Pessimist) recoverLocks(ifm map[string]map[string]pessimism.Info,
opm map[string]map[string]pessimism.Operation, latestDoneDDLsMap map[string]map[string]map[string][]string) error {
// add all last done ddls.
p.lk.AddAllLatestDoneDDLs(latestDoneDDLsMap)

// construct locks based on the shard DDL info.
for task, ifs := range ifm {
sources := p.taskSources(task)
// if no operation exists for the lock, we let the smallest (lexicographical order) source as the owner of the lock.
// if any operation exists for the lock, we let the source with `exec=true` as the owner of the lock (the logic is below).
for _, info := range pessimismInfoMapToSlice(ifs) {
_, _, _, err := p.lk.TrySync(info, sources)
_, _, _, err := p.lk.TrySync(p.cli, info, sources)
if err != nil {
return err
}
Expand Down Expand Up @@ -471,7 +481,7 @@ func (p *Pessimist) handleInfoPut(ctx context.Context, infoCh <-chan pessimism.I
p.logger.Info("receive a shard DDL info", zap.Stringer("info", info))

p.infoOpMu.Lock()
lockID, synced, remain, err := p.lk.TrySync(info, p.taskSources(info.Task))
lockID, synced, remain, err := p.lk.TrySync(p.cli, info, p.taskSources(info.Task))
if err != nil {
p.logger.Error("fail to try sync shard DDL lock", zap.Stringer("info", info), log.ShortError(err))
// currently, only DDL mismatch will cause error
Expand Down Expand Up @@ -530,7 +540,7 @@ func (p *Pessimist) handleOperationPut(ctx context.Context, opCh <-chan pessimis
if lock.IsResolved() {
p.logger.Info("the lock for the shard DDL lock operation has been resolved", zap.Stringer("operation", op))
// remove all operations for this shard DDL lock.
err := p.removeLock(lock)
err := p.removeLockPutDDLs(lock)
if err != nil {
p.logger.Error("fail to delete the shard DDL lock operations", zap.String("lock", lock.ID), log.ShortError(err))
metrics.ReportDDLError(op.Task, metrics.OpErrRemoveLock)
Expand Down Expand Up @@ -576,11 +586,9 @@ func (p *Pessimist) handleLock(lockID, source string) error {
if lock.IsResolved() {
// remove all operations for this shard DDL lock.
// this is to handle the case where dm-master exit before deleting operations for them.
err := p.removeLock(lock)
if err != nil {
if err := p.removeLockPutDDLs(lock); err != nil {
return err
}
return nil
Comment thread
lance6716 marked this conversation as resolved.
}

// check whether the owner has done.
Expand All @@ -601,7 +609,7 @@ func (p *Pessimist) handleLock(lockID, source string) error {

// putOpForOwner PUTs the shard DDL lock operation for the owner into etcd.
func (p *Pessimist) putOpForOwner(lock *pessimism.Lock, owner string, skipDone bool) error {
op := pessimism.NewOperation(lock.ID, lock.Task, owner, lock.DDLs, true, false)
op := pessimism.NewOperation(lock.ID, lock.Task, owner, lock.DDLs, true, false, false)
rev, succ, err := pessimism.PutOperations(p.cli, skipDone, op)
if err != nil {
return err
Expand All @@ -625,7 +633,7 @@ func (p *Pessimist) putOpsForNonOwner(lock *pessimism.Lock, onlySource string, s

ops := make([]pessimism.Operation, 0, len(sources))
for _, source := range sources {
ops = append(ops, pessimism.NewOperation(lock.ID, lock.Task, source, lock.DDLs, false, false))
ops = append(ops, pessimism.NewOperation(lock.ID, lock.Task, source, lock.DDLs, false, false, false))
}

rev, succ, err := pessimism.PutOperations(p.cli, skipDone, ops...)
Expand All @@ -637,9 +645,9 @@ func (p *Pessimist) putOpsForNonOwner(lock *pessimism.Lock, onlySource string, s
}

// removeLockPutDDLs removes the lock in memory and its information in etcd, and records the lock's DDLs as the latest done DDLs.
Comment thread
lance6716 marked this conversation as resolved.
Outdated
func (p *Pessimist) removeLock(lock *pessimism.Lock) error {
func (p *Pessimist) removeLockPutDDLs(lock *pessimism.Lock) error {
// remove all operations for this shard DDL lock.
if err := p.deleteOps(lock); err != nil {
if err := p.deleteOpsPutDDLs(lock); err != nil {
return err
}

Expand Down Expand Up @@ -672,23 +680,24 @@ func (p *Pessimist) removeLock(lock *pessimism.Lock) error {
}
})
p.lk.RemoveLock(lock.ID)
p.lk.AddLatestDoneDDLs(lock.ID, lock.DDLs)
return nil
}

// deleteOpsPutDDLs DELETEs shard DDL lock operations relative to the lock and PUTs the lock's DDLs as the latest done DDLs.
func (p *Pessimist) deleteOps(lock *pessimism.Lock) error {
func (p *Pessimist) deleteOpsPutDDLs(lock *pessimism.Lock) error {
ready := lock.Ready()
ops := make([]pessimism.Operation, 0, len(ready))
for source := range ready {
// When deleting operations, we do not verify the value of the operation now,
// so simply set `exec=false` and `done=true`.
ops = append(ops, pessimism.NewOperation(lock.ID, lock.Task, source, lock.DDLs, false, true))
ops = append(ops, pessimism.NewOperation(lock.ID, lock.Task, source, lock.DDLs, false, true, false))
}
rev, err := pessimism.DeleteOperations(p.cli, ops...)
rev, err := pessimism.DeleteOperationsPutDDLs(p.cli, lock.ID, ops, lock.DDLs)
if err != nil {
return err
}
p.logger.Info("delete shard DDL lock operations", zap.String("lock", lock.ID), zap.Int64("revision", rev))
p.logger.Info("delete shard DDL lock operations and put latest done ddls", zap.String("lock", lock.ID), zap.Int64("revision", rev), zap.Strings("ddls", lock.DDLs))
return err
}

Expand All @@ -704,7 +713,7 @@ func (p *Pessimist) deleteInfosOps(lock *pessimism.Lock) error {
for source := range ready {
// When deleting operations, we do not verify the value of the operation now,
// so simply set `exec=false` and `done=true`.
ops = append(ops, pessimism.NewOperation(lock.ID, lock.Task, source, lock.DDLs, false, true))
ops = append(ops, pessimism.NewOperation(lock.ID, lock.Task, source, lock.DDLs, false, true, false))
}

rev, err := pessimism.DeleteInfosOperations(p.cli, infos, ops)
Expand Down Expand Up @@ -785,7 +794,7 @@ func (p *Pessimist) waitNonOwnerToBeDone(ctx context.Context, lock *pessimism.Lo
// we still put `skip` operations for waitSources one more time with `skipDone=true`.
ops := make([]pessimism.Operation, 0, len(waitSources))
for _, source := range waitSources {
ops = append(ops, pessimism.NewOperation(lock.ID, lock.Task, source, lock.DDLs, false, false))
ops = append(ops, pessimism.NewOperation(lock.ID, lock.Task, source, lock.DDLs, false, false, false))
}
rev, succ, err := pessimism.PutOperations(p.cli, true, ops...)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions dm/master/shardddl/pessimist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ func (t *testPessimist) testPessimistProgress(c *C, restart int) {
c.Assert(p.Locks()[ID2].IsDone(source2), IsFalse)

// mark exec operation for one non-owner as `done` (and delete the info).
op22c := pessimism.NewOperation(ID2, task2, source2, DDLs, false, true)
op22c := pessimism.NewOperation(ID2, task2, source2, DDLs, false, true, false)
done, _, err = pessimism.PutOperationDeleteExistInfo(etcdTestCli, op22c, i22)
c.Assert(err, IsNil)
c.Assert(done, IsTrue)
Expand All @@ -325,7 +325,7 @@ func (t *testPessimist) testPessimistProgress(c *C, restart int) {

// mark skip operation for the non-owner as `done` (and delete the info).
// the lock should become resolved and deleted.
op23c := pessimism.NewOperation(ID2, task2, source3, DDLs, false, true)
op23c := pessimism.NewOperation(ID2, task2, source3, DDLs, false, true, false)
done, _, err = pessimism.PutOperationDeleteExistInfo(etcdTestCli, op23c, i23)
c.Assert(err, IsNil)
c.Assert(done, IsTrue)
Expand Down Expand Up @@ -844,7 +844,7 @@ func (t *testPessimist) TestMeetEtcdCompactError(c *C) {
ID1 = fmt.Sprintf("%s-`%s`.`%s`", task1, schema, table)
i11 = pessimism.NewInfo(task1, source1, schema, table, DDLs)
i12 = pessimism.NewInfo(task1, source2, schema, table, DDLs)
op = pessimism.NewOperation(ID1, task1, source1, DDLs, true, false)
op = pessimism.NewOperation(ID1, task1, source1, DDLs, true, false, false)
revCompacted int64

infoCh chan pessimism.Info
Expand Down
84 changes: 84 additions & 0 deletions pkg/shardddl/pessimism/ddls.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package pessimism

import (
"context"
"encoding/json"

"go.etcd.io/etcd/clientv3"

"github.com/pingcap/dm/dm/common"
"github.com/pingcap/dm/pkg/etcdutil"
)

// putLatestDoneDDLsOp builds the etcd PUT operation that records the latest done DDLs
// for the downstream table identified by (task, downSchema, downTable).
// The DDLs are JSON-encoded and stored under the key produced by
// common.ShardDDLPessimismDDLsKeyAdapter.
// This operation should often be sent by DM-master.
func putLatestDoneDDLsOp(task, downSchema, downTable string, ddls []string) (clientv3.Op, error) {
	value, err := json.Marshal(ddls)
	if err != nil {
		// return a zero-valued operation so that callers never send a half-built PUT.
		return clientv3.Op{}, err
	}

	return clientv3.OpPut(
		common.ShardDDLPessimismDDLsKeyAdapter.Encode(task, downSchema, downTable),
		string(value),
	), nil
}

// PutLatestDoneDDLs writes the latest done DDLs of the downstream table
// (task, downSchema, downTable) into etcd within a single transaction.
// It returns the revision reported by etcdutil.DoOpsInOneTxnWithRetry together with its error;
// if the PUT operation itself cannot be built, the revision is 0.
func PutLatestDoneDDLs(cli *clientv3.Client, task, downSchema, downTable string, ddls []string) (int64, error) {
	op, err := putLatestDoneDDLsOp(task, downSchema, downTable, ddls)
	if err != nil {
		return 0, err
	}

	_, revision, err := etcdutil.DoOpsInOneTxnWithRetry(cli, op)
	return revision, err
}

// GetAllLatestDoneDDLs reads every latest-done-DDLs record currently stored in etcd
// under the common.ShardDDLPessimismDDLsKeyAdapter prefix.
// The result is nested as: task -> downSchema -> downTable -> DDLs.
// The returned revision is the one carried in the header of the etcd GET response.
// On any error (GET failure, invalid JSON value, undecodable key) it returns (nil, 0, err).
// This function should often be called by DM-master.
func GetAllLatestDoneDDLs(cli *clientv3.Client) (map[string]map[string]map[string][]string, int64, error) {
	ctx, cancel := context.WithTimeout(cli.Ctx(), etcdutil.DefaultRequestTimeout)
	defer cancel()

	prefix := common.ShardDDLPessimismDDLsKeyAdapter.Path()
	resp, err := cli.Get(ctx, prefix, clientv3.WithPrefix())
	if err != nil {
		return nil, 0, err
	}

	result := make(map[string]map[string]map[string][]string, len(resp.Kvs))
	for _, kv := range resp.Kvs {
		// the value is the JSON-encoded DDL list.
		var ddls []string
		if unmarshalErr := json.Unmarshal(kv.Value, &ddls); unmarshalErr != nil {
			return nil, 0, unmarshalErr
		}

		// the key carries (task, downSchema, downTable), in that order.
		keys, decodeErr := common.ShardDDLPessimismDDLsKeyAdapter.Decode(string(kv.Key))
		if decodeErr != nil {
			return nil, 0, decodeErr
		}
		task, downSchema, downTable := keys[0], keys[1], keys[2]

		// lazily create the nested maps for this task and schema.
		bySchema, ok := result[task]
		if !ok {
			bySchema = make(map[string]map[string][]string)
			result[task] = bySchema
		}
		byTable, ok := bySchema[downSchema]
		if !ok {
			byTable = make(map[string][]string)
			bySchema[downSchema] = byTable
		}
		byTable[downTable] = ddls
	}

	return result, resp.Header.Revision, nil
}
58 changes: 58 additions & 0 deletions pkg/shardddl/pessimism/ddls_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package pessimism

import (
. "github.com/pingcap/check"

"github.com/pingcap/dm/pkg/utils"
)

// TestDDLsEtcd checks that latest done DDLs can be written to etcd and read back:
// every PUT advances the revision, a later PUT for the same key overwrites the earlier one,
// and GetAllLatestDoneDDLs returns the records grouped by task.
func (t *testForEtcd) TestDDLsEtcd(c *C) {
	defer clearTestInfoOperation(c)

	const (
		lockID1 = "test1-`foo`.`bar`"
		lockID2 = "test2-`foo`.`bar`"
	)
	task1, downSchema1, downTable1 := utils.ExtractAllFromLockID(lockID1)
	task2, downSchema2, downTable2 := utils.ExtractAllFromLockID(lockID2)
	firstDDLs := []string{"ALTER TABLE bar ADD COLUMN c1 INT"}
	secondDDLs := []string{"ALTER TABLE bar ADD COLUMN c2 INT"}

	// writing the same key/value twice still produces a newer revision.
	rev1, err := PutLatestDoneDDLs(etcdTestCli, task1, downSchema1, downTable1, firstDDLs)
	c.Assert(err, IsNil)
	rev2, err := PutLatestDoneDDLs(etcdTestCli, task1, downSchema1, downTable1, firstDDLs)
	c.Assert(err, IsNil)
	c.Assert(rev2, Greater, rev1)

	// overwrite the first lock's record with different DDLs.
	rev3, err := PutLatestDoneDDLs(etcdTestCli, task1, downSchema1, downTable1, secondDDLs)
	c.Assert(err, IsNil)
	c.Assert(rev3, Greater, rev2)

	// write a record for a second lock.
	rev4, err := PutLatestDoneDDLs(etcdTestCli, task2, downSchema2, downTable2, firstDDLs)
	c.Assert(err, IsNil)
	c.Assert(rev4, Greater, rev3)

	// read everything back: one entry per task, each holding its most recent DDLs.
	allDDLs, rev5, err := GetAllLatestDoneDDLs(etcdTestCli)
	c.Assert(err, IsNil)
	c.Assert(rev5, Equals, rev4)
	c.Assert(allDDLs, HasLen, 2)
	c.Assert(allDDLs[task1][downSchema1][downTable1], DeepEquals, secondDDLs)
	c.Assert(allDDLs[task2][downSchema2][downTable2], DeepEquals, firstDDLs)
}
2 changes: 1 addition & 1 deletion pkg/shardddl/pessimism/info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func (t *testForEtcd) TestPutInfoIfOpNotDone(c *C) {
DDLs = []string{"ALTER TABLE bar ADD COLUMN c1 INT"}
ID = fmt.Sprintf("%s-%s", task, dbutil.TableName(schema, table))
info = NewInfo(task, source, schema, table, DDLs)
op = NewOperation(ID, task, source, DDLs, false, false)
op = NewOperation(ID, task, source, DDLs, false, false, false)
)

// put info success because no operation exist.
Expand Down
Loading