Skip to content
This repository was archived by the owner on Nov 24, 2023. It is now read-only.
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion dm/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ var (
// ShardDDLPessimismOperationKeyAdapter is used to store shard DDL operation in pessimistic model.
// k/v: Encode(task-name, source-id) -> shard DDL operation.
ShardDDLPessimismOperationKeyAdapter KeyAdapter = keyHexEncoderDecoder("/dm-master/shardddl-pessimism/operation/")
// ShardDDLPessimismDDLsKeyAdapter is used to store last done DDLs in pessimistic model.
// k/v: Encode(lockID) -> DDLs.
ShardDDLPessimismDDLsKeyAdapter KeyAdapter = keyHexEncoderDecoder("/dm-master/shardddl-pessimism/ddls/")
Comment thread
lance6716 marked this conversation as resolved.

// ShardDDLOptimismSourceTablesKeyAdapter is used to store INITIAL upstream schema & table names when starting the subtask.
// In other words, if any Info for this subtask exists, we should obey source tables in the Info.
Expand Down Expand Up @@ -94,7 +97,7 @@ func keyAdapterKeysLen(s KeyAdapter) int {
switch s {
case WorkerRegisterKeyAdapter, UpstreamConfigKeyAdapter, UpstreamBoundWorkerKeyAdapter,
WorkerKeepAliveKeyAdapter, StageRelayKeyAdapter, TaskConfigKeyAdapter,
UpstreamLastBoundWorkerKeyAdapter, UpstreamRelayWorkerKeyAdapter:
UpstreamLastBoundWorkerKeyAdapter, UpstreamRelayWorkerKeyAdapter, ShardDDLPessimismDDLsKeyAdapter:
return 1
case UpstreamSubTaskKeyAdapter, StageSubTaskKeyAdapter,
ShardDDLPessimismInfoKeyAdapter, ShardDDLPessimismOperationKeyAdapter,
Expand Down
2 changes: 1 addition & 1 deletion dm/master/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,7 @@ func (t *testMaster) TestStartTaskWithRemoveMeta(c *check.C) {
schema, table = "foo", "bar"
ID = fmt.Sprintf("%s-`%s`.`%s`", taskName, schema, table)
i11 = pessimism.NewInfo(taskName, sources[0], schema, table, DDLs)
op2 = pessimism.NewOperation(ID, taskName, sources[0], DDLs, true, false)
op2 = pessimism.NewOperation(ID, taskName, sources[0], DDLs, true, false, false)
)
_, err = pessimism.PutInfo(etcdTestCli, i11)
c.Assert(err, check.IsNil)
Expand Down
51 changes: 31 additions & 20 deletions dm/master/shardddl/pessimist.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,13 @@ func (p *Pessimist) buildLocks(etcdCli *clientv3.Client) (int64, int64, error) {
}
p.logger.Info("get history shard DDL lock operation", zap.Reflect("operation", opm), zap.Int64("revision", rev2))

latestDoneDDLsMap, _, err := pessimism.GetAllLatestDoneDDLs(etcdCli)
if err != nil {
return 0, 0, err
}

// recover the shard DDL lock based on history shard DDL info & lock operation.
err = p.recoverLocks(ifm, opm)
err = p.recoverLocks(ifm, opm, latestDoneDDLsMap)
if err != nil {
// only log the error instead of returning it, so that the startup of the DM-master leader is not blocked.
// then these unexpected locks can be handled by the user.
Expand Down Expand Up @@ -291,7 +296,7 @@ func (p *Pessimist) UnlockLock(ctx context.Context, id, replaceOwner string, for

// 2. check whether has resolved before (this often should not happen).
if lock.IsResolved() {
err := p.removeLock(lock)
err := p.removeLockPutDDLs(lock)
if err != nil {
return err
}
Expand Down Expand Up @@ -394,24 +399,31 @@ func (p *Pessimist) RemoveMetaData(task string) error {
for _, info := range infos {
p.lk.RemoveLockByInfo(info)
}
lockIDSet := make(map[string]struct{}, len(ops))
Comment thread
lance6716 marked this conversation as resolved.
Outdated
for _, op := range ops {
p.lk.RemoveLock(op.ID)
lockIDSet[op.ID] = struct{}{}
}

lockIDs := p.lk.RemoveLatestDoneDDLsByTask(task)
// clear meta data in etcd
_, err = pessimism.DeleteInfosOperationsByTask(p.cli, task)
_, err = pessimism.DeleteInfosOperationsDDLsByTask(p.cli, task, lockIDs)
return err
}

// recoverLocks recovers shard DDL locks based on shard DDL info and shard DDL lock operation.
func (p *Pessimist) recoverLocks(ifm map[string]map[string]pessimism.Info, opm map[string]map[string]pessimism.Operation) error {
func (p *Pessimist) recoverLocks(ifm map[string]map[string]pessimism.Info,
opm map[string]map[string]pessimism.Operation, latestDoneDDLsMap map[string][]string) error {
// add all last done ddls.
p.lk.AddAllLatestDoneDDLs(latestDoneDDLsMap)

// construct locks based on the shard DDL info.
for task, ifs := range ifm {
sources := p.taskSources(task)
// if no operation exists for the lock, we let the smallest (lexicographical order) source as the owner of the lock.
// if any operation exists for the lock, we let the source with `exec=true` as the owner of the lock (the logic is below).
for _, info := range pessimismInfoMapToSlice(ifs) {
_, _, _, err := p.lk.TrySync(info, sources)
_, _, _, err := p.lk.TrySync(p.cli, info, sources)
if err != nil {
return err
}
Expand Down Expand Up @@ -471,7 +483,7 @@ func (p *Pessimist) handleInfoPut(ctx context.Context, infoCh <-chan pessimism.I
p.logger.Info("receive a shard DDL info", zap.Stringer("info", info))

p.infoOpMu.Lock()
lockID, synced, remain, err := p.lk.TrySync(info, p.taskSources(info.Task))
lockID, synced, remain, err := p.lk.TrySync(p.cli, info, p.taskSources(info.Task))
if err != nil {
p.logger.Error("fail to try sync shard DDL lock", zap.Stringer("info", info), log.ShortError(err))
// currently, only DDL mismatch will cause error
Expand Down Expand Up @@ -530,7 +542,7 @@ func (p *Pessimist) handleOperationPut(ctx context.Context, opCh <-chan pessimis
if lock.IsResolved() {
p.logger.Info("the lock for the shard DDL lock operation has been resolved", zap.Stringer("operation", op))
// remove all operations for this shard DDL lock.
err := p.removeLock(lock)
err := p.removeLockPutDDLs(lock)
if err != nil {
p.logger.Error("fail to delete the shard DDL lock operations", zap.String("lock", lock.ID), log.ShortError(err))
metrics.ReportDDLError(op.Task, metrics.OpErrRemoveLock)
Expand Down Expand Up @@ -576,11 +588,9 @@ func (p *Pessimist) handleLock(lockID, source string) error {
if lock.IsResolved() {
// remove all operations for this shard DDL lock.
// this is to handle the case where dm-master exit before deleting operations for them.
err := p.removeLock(lock)
if err != nil {
if err := p.removeLockPutDDLs(lock); err != nil {
return err
}
return nil
Comment thread
lance6716 marked this conversation as resolved.
}

// check whether the owner has done.
Expand All @@ -601,7 +611,7 @@ func (p *Pessimist) handleLock(lockID, source string) error {

// putOpForOwner PUTs the shard DDL lock operation for the owner into etcd.
func (p *Pessimist) putOpForOwner(lock *pessimism.Lock, owner string, skipDone bool) error {
op := pessimism.NewOperation(lock.ID, lock.Task, owner, lock.DDLs, true, false)
op := pessimism.NewOperation(lock.ID, lock.Task, owner, lock.DDLs, true, false, false)
rev, succ, err := pessimism.PutOperations(p.cli, skipDone, op)
if err != nil {
return err
Expand All @@ -625,7 +635,7 @@ func (p *Pessimist) putOpsForNonOwner(lock *pessimism.Lock, onlySource string, s

ops := make([]pessimism.Operation, 0, len(sources))
for _, source := range sources {
ops = append(ops, pessimism.NewOperation(lock.ID, lock.Task, source, lock.DDLs, false, false))
ops = append(ops, pessimism.NewOperation(lock.ID, lock.Task, source, lock.DDLs, false, false, false))
}

rev, succ, err := pessimism.PutOperations(p.cli, skipDone, ops...)
Expand All @@ -637,9 +647,9 @@ func (p *Pessimist) putOpsForNonOwner(lock *pessimism.Lock, onlySource string, s
}

// removeLockPutDDLs removes the lock in memory and its information in etcd, and records the lock's DDLs as the latest done DDLs.
Comment thread
lance6716 marked this conversation as resolved.
Outdated
func (p *Pessimist) removeLock(lock *pessimism.Lock) error {
func (p *Pessimist) removeLockPutDDLs(lock *pessimism.Lock) error {
// remove all operations for this shard DDL lock.
if err := p.deleteOps(lock); err != nil {
if err := p.deleteOpsPutDDLs(lock); err != nil {
return err
}

Expand Down Expand Up @@ -672,24 +682,25 @@ func (p *Pessimist) removeLock(lock *pessimism.Lock) error {
}
})
p.lk.RemoveLock(lock.ID)
p.lk.AddLatestDoneDDLs(lock.ID, lock.DDLs)
metrics.ReportDDLPending(lock.Task, metrics.DDLPendingSynced, metrics.DDLPendingNone)
return nil
}

// deleteOpsPutDDLs DELETEs shard DDL lock operations relative to the lock and PUTs the lock's DDLs as the latest done DDLs.
func (p *Pessimist) deleteOps(lock *pessimism.Lock) error {
func (p *Pessimist) deleteOpsPutDDLs(lock *pessimism.Lock) error {
ready := lock.Ready()
ops := make([]pessimism.Operation, 0, len(ready))
for source := range ready {
// When deleting operations, we do not verify the value of the operation now,
// so simply set `exec=false` and `done=true`.
ops = append(ops, pessimism.NewOperation(lock.ID, lock.Task, source, lock.DDLs, false, true))
ops = append(ops, pessimism.NewOperation(lock.ID, lock.Task, source, lock.DDLs, false, true, false))
}
rev, err := pessimism.DeleteOperations(p.cli, ops...)
rev, err := pessimism.DeleteOperationsPutDDLs(p.cli, lock.ID, ops, lock.DDLs)
if err != nil {
return err
}
p.logger.Info("delete shard DDL lock operations", zap.String("lock", lock.ID), zap.Int64("revision", rev))
p.logger.Info("delete shard DDL lock operations and put latest done ddls", zap.String("lock", lock.ID), zap.Int64("revision", rev), zap.Strings("ddls", lock.DDLs))
return err
}

Expand All @@ -705,7 +716,7 @@ func (p *Pessimist) deleteInfosOps(lock *pessimism.Lock) error {
for source := range ready {
// When deleting operations, we do not verify the value of the operation now,
// so simply set `exec=false` and `done=true`.
ops = append(ops, pessimism.NewOperation(lock.ID, lock.Task, source, lock.DDLs, false, true))
ops = append(ops, pessimism.NewOperation(lock.ID, lock.Task, source, lock.DDLs, false, true, false))
}

rev, err := pessimism.DeleteInfosOperations(p.cli, infos, ops)
Expand Down Expand Up @@ -786,7 +797,7 @@ func (p *Pessimist) waitNonOwnerToBeDone(ctx context.Context, lock *pessimism.Lo
// we still put `skip` operations for waitSources one more time with `skipDone=true`.
ops := make([]pessimism.Operation, 0, len(waitSources))
for _, source := range waitSources {
ops = append(ops, pessimism.NewOperation(lock.ID, lock.Task, source, lock.DDLs, false, false))
ops = append(ops, pessimism.NewOperation(lock.ID, lock.Task, source, lock.DDLs, false, false, false))
}
rev, succ, err := pessimism.PutOperations(p.cli, true, ops...)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions dm/master/shardddl/pessimist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ func (t *testPessimist) testPessimistProgress(c *C, restart int) {
c.Assert(p.Locks()[ID2].IsDone(source2), IsFalse)

// mark exec operation for one non-owner as `done` (and delete the info).
op22c := pessimism.NewOperation(ID2, task2, source2, DDLs, false, true)
op22c := pessimism.NewOperation(ID2, task2, source2, DDLs, false, true, false)
done, _, err = pessimism.PutOperationDeleteExistInfo(etcdTestCli, op22c, i22)
c.Assert(err, IsNil)
c.Assert(done, IsTrue)
Expand All @@ -325,7 +325,7 @@ func (t *testPessimist) testPessimistProgress(c *C, restart int) {

// mark skip operation for the non-owner as `done` (and delete the info).
// the lock should become resolved and deleted.
op23c := pessimism.NewOperation(ID2, task2, source3, DDLs, false, true)
op23c := pessimism.NewOperation(ID2, task2, source3, DDLs, false, true, false)
done, _, err = pessimism.PutOperationDeleteExistInfo(etcdTestCli, op23c, i23)
c.Assert(err, IsNil)
c.Assert(done, IsTrue)
Expand Down Expand Up @@ -844,7 +844,7 @@ func (t *testPessimist) TestMeetEtcdCompactError(c *C) {
ID1 = fmt.Sprintf("%s-`%s`.`%s`", task1, schema, table)
i11 = pessimism.NewInfo(task1, source1, schema, table, DDLs)
i12 = pessimism.NewInfo(task1, source2, schema, table, DDLs)
op = pessimism.NewOperation(ID1, task1, source1, DDLs, true, false)
op = pessimism.NewOperation(ID1, task1, source1, DDLs, true, false, false)
revCompacted int64

infoCh chan pessimism.Info
Expand Down
3 changes: 0 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -783,7 +783,6 @@ github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIf
github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20200828042413-fce0951f1463/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20201112100606-8f1e84a3abc8 h1:M+DNpOu/I3uDmwee6vcnoPd6GgSMqND4gxvDQ/W584U=
github.com/pingcap/log v0.0.0-20201112100606-8f1e84a3abc8/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4 h1:ERrF0fTuIOnwfGbt71Ji3DKbOEaP189tjym50u8gpC8=
github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
Expand All @@ -801,7 +800,6 @@ github.com/pingcap/parser v0.0.0-20200813083329-a4bff035d3e2/go.mod h1:vQdbJqobJ
github.com/pingcap/parser v0.0.0-20200821073936-cf85e80665c4/go.mod h1:vQdbJqobJAgFyiRNNtXahpMoGWwPEuWciVEK5A20NS0=
github.com/pingcap/parser v0.0.0-20200924053142-5d7e8ebf605e/go.mod h1:RlLfMRJwFBSiXd2lUaWdV5pSXtrpyvZM8k5bbZWsheU=
github.com/pingcap/parser v0.0.0-20210125075924-ffe0fda947cb/go.mod h1:GbEr2PgY72/4XqPZzmzstlOU/+il/wrjeTNFs6ihsSE=
github.com/pingcap/parser v0.0.0-20210324190955-ab6d0f2c18ee h1:q5PF2QMZ9iGTCQOuWNnXJ5m1C2VifTOns/n6rvCe5rw=
github.com/pingcap/parser v0.0.0-20210324190955-ab6d0f2c18ee/go.mod h1:xZC8I7bug4GJ5KtHhgAikjTfU4kBv1Sbo3Pf1MZ6lVw=
github.com/pingcap/parser v0.0.0-20210415081931-48e7f467fd74 h1:FkVEC3Fck3fD16hMObMl/IWs72jR9FmqPn0Bdf728Sk=
github.com/pingcap/parser v0.0.0-20210415081931-48e7f467fd74/go.mod h1:xZC8I7bug4GJ5KtHhgAikjTfU4kBv1Sbo3Pf1MZ6lVw=
Expand Down Expand Up @@ -1337,7 +1335,6 @@ golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.5 h1:i6eZZ+zk0SOf0xgBpEpPD18qWcJda6q1sxt3S0kzyUQ=
golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
Expand Down
85 changes: 85 additions & 0 deletions pkg/shardddl/pessimism/ddls.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package pessimism

import (
"context"
"encoding/json"

"go.etcd.io/etcd/clientv3"

"github.com/pingcap/dm/dm/common"
"github.com/pingcap/dm/pkg/etcdutil"
)

// putLatestDoneDDLsOp builds the etcd PUT operation that stores `ddls`
// (JSON-encoded) under the key encoded from `lockID`.
// If the JSON encoding fails, the error is returned with a zero-value Op.
// This operation should often be sent by DM-master.
func putLatestDoneDDLsOp(lockID string, ddls []string) (clientv3.Op, error) {
	encoded, marshalErr := json.Marshal(ddls)
	if marshalErr != nil {
		var empty clientv3.Op
		return empty, marshalErr
	}

	return clientv3.OpPut(
		common.ShardDDLPessimismDDLsKeyAdapter.Encode(lockID),
		string(encoded),
	), nil
}

// PutLatestDoneDDLs puts the last done shard DDL ddls into etcd.
func PutLatestDoneDDLs(cli *clientv3.Client, lockID string, ddls []string) (int64, error) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

may use above function

data, err := json.Marshal(ddls)
if err != nil {
return 0, err
}
value := string(data)
key := common.ShardDDLPessimismDDLsKeyAdapter.Encode(lockID)

ctx, cancel := context.WithTimeout(cli.Ctx(), etcdutil.DefaultRequestTimeout)
defer cancel()

resp, err := cli.Put(ctx, key, value)
if err != nil {
return 0, err
}
return resp.Header.Revision, nil
}

// GetAllLatestDoneDDLs reads every key under the latest-done-DDLs prefix in etcd
// and returns them as a map, together with the revision of the GET response.
// k/v: lockID -> DDLs
// Any failure (the etcd request, JSON decoding of a value, or decoding of a key)
// aborts the whole read and yields a nil map and a zero revision.
// This function should often be called by DM-master.
func GetAllLatestDoneDDLs(cli *clientv3.Client) (map[string][]string, int64, error) {
	adapter := common.ShardDDLPessimismDDLsKeyAdapter

	ctx, cancel := context.WithTimeout(cli.Ctx(), etcdutil.DefaultRequestTimeout)
	defer cancel()

	getResp, getErr := cli.Get(ctx, adapter.Path(), clientv3.WithPrefix())
	if getErr != nil {
		return nil, 0, getErr
	}

	result := make(map[string][]string, len(getResp.Kvs))
	for i := range getResp.Kvs {
		entry := getResp.Kvs[i]

		// the value is the JSON-encoded list of DDLs.
		var doneDDLs []string
		if decodeErr := json.Unmarshal(entry.Value, &doneDDLs); decodeErr != nil {
			return nil, 0, decodeErr
		}

		// the first decoded key component is used as the lock ID.
		parts, decodeErr := adapter.Decode(string(entry.Key))
		if decodeErr != nil {
			return nil, 0, decodeErr
		}
		result[parts[0]] = doneDDLs
	}

	return result, getResp.Header.Revision, nil
}
56 changes: 56 additions & 0 deletions pkg/shardddl/pessimism/ddls_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package pessimism

import (
. "github.com/pingcap/check"
)

// TestDDLsEtcd exercises PutLatestDoneDDLs and GetAllLatestDoneDDLs against the test etcd:
// each PUT after the first must report a strictly greater revision than the one before it,
// and the final GET must return the most recently written DDLs for each lock ID.
func (t *testForEtcd) TestDDLsEtcd(c *C) {
	defer clearTestInfoOperation(c)

	const (
		lockA = "test1-`foo`.`bar`"
		lockB = "test2-`foo`.`bar`"
	)
	addC1 := []string{"ALTER TABLE bar ADD COLUMN c1 INT"}
	addC2 := []string{"ALTER TABLE bar ADD COLUMN c2 INT"}

	// the PUT sequence, in order:
	// the same key/value twice, then another DDLs for the same lock, then another lock.
	puts := []struct {
		lockID string
		ddls   []string
	}{
		{lockA, addC1},
		{lockA, addC1},
		{lockA, addC2},
		{lockB, addC1},
	}

	var lastRev int64
	for i, put := range puts {
		rev, err := PutLatestDoneDDLs(etcdTestCli, put.lockID, put.ddls)
		c.Assert(err, IsNil)
		if i > 0 {
			c.Assert(rev, Greater, lastRev)
		}
		lastRev = rev
	}

	// get all ddls: the read must observe the revision of the last PUT.
	got, getRev, err := GetAllLatestDoneDDLs(etcdTestCli)
	c.Assert(err, IsNil)
	c.Assert(getRev, Equals, lastRev)
	c.Assert(got, HasLen, 2)
	c.Assert(got, HasKey, lockA)
	c.Assert(got, HasKey, lockB)
	c.Assert(got[lockA], DeepEquals, addC2)
	c.Assert(got[lockB], DeepEquals, addC1)
}
2 changes: 1 addition & 1 deletion pkg/shardddl/pessimism/info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func (t *testForEtcd) TestPutInfoIfOpNotDone(c *C) {
DDLs = []string{"ALTER TABLE bar ADD COLUMN c1 INT"}
ID = fmt.Sprintf("%s-%s", task, dbutil.TableName(schema, table))
info = NewInfo(task, source, schema, table, DDLs)
op = NewOperation(ID, task, source, DDLs, false, false)
op = NewOperation(ID, task, source, DDLs, false, false, false)
)

// put info success because no operation exist.
Expand Down
Loading