Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 53 additions & 21 deletions go/vt/vttablet/onlineddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -626,14 +626,22 @@ func (e *Executor) terminateVReplMigration(ctx context.Context, uuid string) err
// connections with open transactions, holding locks on the table.
// This is done on a best-effort basis, by issuing `KILL` and `KILL QUERY` commands. As MySQL goes,
// it is not guaranteed that the queries/transactions will terminate in a timely manner.
func (e *Executor) killTableLockHoldersAndAccessors(ctx context.Context, tableName string) error {
log.Infof("killTableLockHoldersAndAccessors: %v", tableName)
func (e *Executor) killTableLockHoldersAndAccessors(ctx context.Context, uuid string, tableName string, excludeIds ...int64) error {
log.Infof("killTableLockHoldersAndAccessors %v:, table-%v", uuid, tableName)
Comment thread
shlomi-noach marked this conversation as resolved.
conn, err := dbconnpool.NewDBConnection(ctx, e.env.Config().DB.DbaWithDB())
if err != nil {
return err
}
defer conn.Close()

skipKill := func(threadId int64) bool {
for _, excludeId := range excludeIds {
if threadId == excludeId {
return true
}
}
return false
}
{
// First, let's look at PROCESSLIST for queries that _might_ be operating on our table. This may have
// plenty false positives as we're simply looking for the table name as a query substring.
Expand All @@ -647,10 +655,14 @@ func (e *Executor) killTableLockHoldersAndAccessors(ctx context.Context, tableNa
return vterrors.Wrapf(err, "finding queries potentially operating on table")
}

log.Infof("killTableLockHoldersAndAccessors: found %v potential queries", len(rs.Rows))
log.Infof("killTableLockHoldersAndAccessors %v: found %v potential queries", uuid, len(rs.Rows))
// Now that we have some list of queries, we actually parse them to find whether the query actually references our table:
for _, row := range rs.Named().Rows {
threadId := row.AsInt64("id", 0)
if skipKill(threadId) {
log.Infof("killTableLockHoldersAndAccessors %v: skipping thread %v as it is excluded", uuid, threadId)
continue
}
infoQuery := row.AsString("info", "")
stmt, err := e.env.Environment().Parser().Parse(infoQuery)
if err != nil {
Expand All @@ -677,7 +689,7 @@ func (e *Executor) killTableLockHoldersAndAccessors(ctx context.Context, tableNa
}, stmt)

if queryUsesTable {
log.Infof("killTableLockHoldersAndAccessors: killing query %v: %.100s", threadId, infoQuery)
log.Infof("killTableLockHoldersAndAccessors %v: killing query %v: %.100s", uuid, threadId, infoQuery)
killQuery := fmt.Sprintf("KILL QUERY %d", threadId)
if _, err := conn.Conn.ExecuteFetch(killQuery, 1, false); err != nil {
log.Error(vterrors.Errorf(vtrpcpb.Code_ABORTED, "could not kill query %v. Ignoring", threadId))
Expand All @@ -702,14 +714,18 @@ func (e *Executor) killTableLockHoldersAndAccessors(ctx context.Context, tableNa
if err != nil {
return vterrors.Wrapf(err, "finding transactions locking table `%s` %s", tableName, description)
}
log.Infof("terminateTransactions: found %v transactions locking table `%s` %s", len(rs.Rows), tableName, description)
log.Infof("terminateTransactions %v: found %v transactions locking table `%s` %s", uuid, len(rs.Rows), tableName, description)
for _, row := range rs.Named().Rows {
threadId := row.AsInt64(column, 0)
log.Infof("terminateTransactions: killing connection %v with transaction locking table `%s` %s", threadId, tableName, description)
if skipKill(threadId) {
log.Infof("terminateTransactions %v: skipping thread %v as it is excluded", uuid, threadId)
continue
}
log.Infof("terminateTransactions %v: killing connection %v with transaction locking table `%s` %s", uuid, threadId, tableName, description)
killConnection := fmt.Sprintf("KILL %d", threadId)
_, err = conn.Conn.ExecuteFetch(killConnection, 1, false)
if err != nil {
log.Errorf("terminateTransactions: unable to kill the connection %d locking table `%s` %s: %v", threadId, tableName, description, err)
log.Errorf("terminateTransactions %v: unable to kill the connection %d locking table `%s` %s: %v", uuid, threadId, tableName, description, err)
}
}
return nil
Expand Down Expand Up @@ -855,7 +871,7 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh
// impacts query serving so we wait for a multiple of the cutover threshold here, with
// that variable primarily serving to limit the max time we later spend waiting for
// a position again AFTER we've taken the locks and table access is blocked.
if err := waitForPos(s, postSentryPos, onlineDDL.CutOverThreshold*3); err != nil {
if err := waitForPos(s, postSentryPos, 3*onlineDDL.CutOverThreshold); err != nil {
return vterrors.Wrapf(err, "failed waiting for pos after sentry creation")
}
e.updateMigrationStage(ctx, onlineDDL.UUID, "post-sentry pos reached")
Expand All @@ -868,7 +884,7 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh
defer lockConn.Recycle()
// Set large enough `@@lock_wait_timeout` so that it does not interfere with the cut-over operation.
// The code will ensure everything that needs to be terminated by `onlineDDL.CutOverThreshold` will be terminated.
lockConnRestoreLockWaitTimeout, err := e.initConnectionLockWaitTimeout(ctx, lockConn.Conn, 5*onlineDDL.CutOverThreshold)
lockConnRestoreLockWaitTimeout, err := e.initConnectionLockWaitTimeout(ctx, lockConn.Conn, 3*onlineDDL.CutOverThreshold)
if err != nil {
return vterrors.Wrapf(err, "failed setting lock_wait_timeout on locking connection")
}
Expand All @@ -883,7 +899,7 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh
}
// Set large enough `@@lock_wait_timeout` so that it does not interfere with the cut-over operation.
// The code will ensure everything that needs to be terminated by `onlineDDL.CutOverThreshold` will be terminated.
renameConnRestoreLockWaitTimeout, err := e.initConnectionLockWaitTimeout(ctx, renameConn.Conn, 5*onlineDDL.CutOverThreshold*4)
renameConnRestoreLockWaitTimeout, err := e.initConnectionLockWaitTimeout(ctx, renameConn.Conn, 2*onlineDDL.CutOverThreshold)
if err != nil {
return vterrors.Wrapf(err, "failed setting lock_wait_timeout on rename connection")
}
Expand Down Expand Up @@ -993,7 +1009,7 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh
if err := e.checkOnPreparedPool(ctx, onlineDDL.Table, 100*time.Millisecond); err != nil {
return vterrors.Wrapf(err, "checking prepared pool for table")
}
if err := e.killTableLockHoldersAndAccessors(ctx, onlineDDL.Table); err != nil {
if err := e.killTableLockHoldersAndAccessors(ctx, onlineDDL.UUID, onlineDDL.Table); err != nil {
return vterrors.Wrapf(err, "failed killing table lock holders and accessors")
}
}
Expand All @@ -1013,25 +1029,36 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh
// real production

e.updateMigrationStage(ctx, onlineDDL.UUID, "locking tables")
lockCtx, cancel := context.WithTimeout(ctx, onlineDDL.CutOverThreshold)
defer cancel()
lockCtx, killWhileRenamingCancel := context.WithTimeout(ctx, onlineDDL.CutOverThreshold)
defer killWhileRenamingCancel()
lockTableQuery := sqlparser.BuildParsedQuery(sqlLockTwoTablesWrite, sentryTableName, onlineDDL.Table)
if _, err := lockConn.Conn.Exec(lockCtx, lockTableQuery.Query, 1, false); err != nil {
return vterrors.Wrapf(err, "failed locking tables")
}

e.updateMigrationStage(ctx, onlineDDL.UUID, "renaming tables")
killWhileRenamingContext, killWhileRenamingCancel := context.WithCancel(ctx)
defer killWhileRenamingCancel()
// We run the RENAME in a goroutine, so that we can wait for
go func() {
defer close(renameCompleteChan)
_, err := renameConn.Conn.Exec(ctx, renameQuery.Query, 1, false)
renameCompleteChan <- err
killWhileRenamingCancel() // RENAME is done, no need to kill queries anymore
Comment thread
shlomi-noach marked this conversation as resolved.
}()
// the rename should block, because of the LOCK. Wait for it to show up.
e.updateMigrationStage(ctx, onlineDDL.UUID, "waiting for RENAME to block")
if err := waitForRenameProcess(); err != nil {
return vterrors.Wrapf(err, "failed waiting for rename process")
}
e.updateMigrationStage(ctx, onlineDDL.UUID, "RENAME found")

if shouldForceCutOver {
log.Infof("cutOverVReplMigration %v: force cut-over requested, killing table lock holders and accessors while RENAME is in place", s.workflow)
Comment thread
shlomi-noach marked this conversation as resolved.
if err := e.killTableLockHoldersAndAccessors(killWhileRenamingContext, onlineDDL.UUID, onlineDDL.Table, lockConn.Conn.ID(), renameConn.Conn.ID()); err != nil {
return vterrors.Wrapf(err, "failed killing table lock holders and accessors")
}
}
}

e.updateMigrationStage(ctx, onlineDDL.UUID, "reading post-lock pos")
Expand Down Expand Up @@ -1107,7 +1134,7 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh
if err := <-renameCompleteChan; err != nil {
return vterrors.Wrapf(err, "failed waiting for rename to complete")
}
renameWasSuccessful = true
renameWasSuccessful = true // Migration effectively successful
}
}
}
Expand Down Expand Up @@ -2518,7 +2545,7 @@ func (e *Executor) executeSpecialAlterDirectDDLActionMigration(ctx context.Conte

if forceCutOverAfter > 0 {
// Irrespective of the --force-cut-over-after flag value, as long as it's nonzero, we now terminate
// connections adn transactions on the migrated table.
// connections and transactions on the migrated table.
// --force-cut-over-after was designed to work with `vitess` migrations, that could cut-over multiple times,
// and was meant to set a limit to the overall duration of the attempts, for example 1 hour.
// With INSTANT DDL or other quick operations, this becomes meaningless. Once we begin the operation, there
Expand All @@ -2531,7 +2558,7 @@ func (e *Executor) executeSpecialAlterDirectDDLActionMigration(ctx context.Conte
if err := e.checkOnPreparedPool(ctx, onlineDDL.Table, 100*time.Millisecond); err != nil {
return vterrors.Wrapf(err, "checking prepared pool for table")
}
if err := e.killTableLockHoldersAndAccessors(ctx, onlineDDL.Table); err != nil {
if err := e.killTableLockHoldersAndAccessors(ctx, onlineDDL.UUID, onlineDDL.Table); err != nil {
return vterrors.Wrapf(err, "failed killing table lock holders and accessors")
}
}
Expand Down Expand Up @@ -3019,11 +3046,16 @@ func shouldCutOverAccordingToBackoff(
// is beyond the --force-cut-over-after setting, or the column `force_cutover` is "1", and this means:
// - we do not want to backoff, we want to cutover asap
// - we agree to brute-force KILL any pending queries on the migrated table so as to ensure it's unlocked.
if forceCutOverAfter > 0 && sinceReadyToComplete > forceCutOverAfter {
// time since migration was ready to complete is beyond the --force-cut-over-after setting
return true, true
if forceCutOverAfter > 0 {
if sinceReadyToComplete > forceCutOverAfter {
// time since migration was ready to complete is beyond the --force-cut-over-after setting
return true, true
}
if forceCutOverAfter <= time.Millisecond {
// --force-cut-over-after is set so low that it is effectively "now", even if "sinceReadyToComplete" is lower.
return true, true
}
}

// Backoff mechanism. Do not attempt to cut-over every single minute. Check how much time passed since last cut-over attempt
desiredTimeSinceLastCutover := cutoverIntervals[len(cutoverIntervals)-1]
if int(cutoverAttempts) < len(cutoverIntervals) {
Expand Down Expand Up @@ -3204,7 +3236,7 @@ func (e *Executor) reviewRunningMigrations(ctx context.Context) (countRunnning i
}
if err := e.cutOverVReplMigration(ctx, s, shouldForceCutOver); err != nil {
_ = e.updateMigrationMessage(ctx, uuid, err.Error())
log.Errorf("cutOverVReplMigration failed: err=%v", err)
log.Errorf("cutOverVReplMigration failed %s: err=%v", onlineDDL.UUID, err)

if sqlErr, isSQLErr := sqlerror.NewSQLErrorFromError(err).(*sqlerror.SQLError); isSQLErr && sqlErr != nil {
// let's see if this error is actually acceptable
Expand Down
14 changes: 11 additions & 3 deletions go/vt/vttablet/onlineddl/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,13 +126,21 @@ func TestShouldCutOverAccordingToBackoff(t *testing.T) {
expectShouldForceCutOver: true,
},
{
name: "microsecond, not ready",
name: "2 milliseoncds, not ready",
Comment thread
shlomi-noach marked this conversation as resolved.
Outdated
cutoverAttempts: 3,
forceCutOverAfter: time.Millisecond,
sinceReadyToComplete: time.Microsecond,
forceCutOverAfter: 2 * time.Millisecond,
sinceReadyToComplete: time.Millisecond,
expectShouldCutOver: false,
expectShouldForceCutOver: false,
},
{
name: "microsecond, ready irrespective of sinceReadyToComplete",
cutoverAttempts: 3,
forceCutOverAfter: time.Millisecond,
sinceReadyToComplete: time.Microsecond,
expectShouldCutOver: true,
expectShouldForceCutOver: true,
},
{
name: "cutover-after overrides backoff",
cutoverAttempts: 3,
Expand Down
Loading