diff --git a/go/vt/proto/binlogdata/binlogdata.pb.go b/go/vt/proto/binlogdata/binlogdata.pb.go index 545e29ec8f2..7f115eb7ee7 100644 --- a/go/vt/proto/binlogdata/binlogdata.pb.go +++ b/go/vt/proto/binlogdata/binlogdata.pb.go @@ -187,6 +187,10 @@ const ( VEventType_VERSION VEventType = 17 VEventType_LASTPK VEventType = 18 VEventType_SAVEPOINT VEventType = 19 + // COPY_COMPLETED is sent when VTGate's VStream copy operation is done. + // If a client experiences some disruptions before receiving the event, + // the client should restart the copy operation. + VEventType_COPY_COMPLETED VEventType = 20 ) // Enum value maps for VEventType. @@ -212,28 +216,30 @@ var ( 17: "VERSION", 18: "LASTPK", 19: "SAVEPOINT", + 20: "COPY_COMPLETED", } VEventType_value = map[string]int32{ - "UNKNOWN": 0, - "GTID": 1, - "BEGIN": 2, - "COMMIT": 3, - "ROLLBACK": 4, - "DDL": 5, - "INSERT": 6, - "REPLACE": 7, - "UPDATE": 8, - "DELETE": 9, - "SET": 10, - "OTHER": 11, - "ROW": 12, - "FIELD": 13, - "HEARTBEAT": 14, - "VGTID": 15, - "JOURNAL": 16, - "VERSION": 17, - "LASTPK": 18, - "SAVEPOINT": 19, + "UNKNOWN": 0, + "GTID": 1, + "BEGIN": 2, + "COMMIT": 3, + "ROLLBACK": 4, + "DDL": 5, + "INSERT": 6, + "REPLACE": 7, + "UPDATE": 8, + "DELETE": 9, + "SET": 10, + "OTHER": 11, + "ROW": 12, + "FIELD": 13, + "HEARTBEAT": 14, + "VGTID": 15, + "JOURNAL": 16, + "VERSION": 17, + "LASTPK": 18, + "SAVEPOINT": 19, + "COPY_COMPLETED": 20, } ) @@ -2981,7 +2987,7 @@ var file_binlogdata_proto_rawDesc = []byte{ 0x4b, 0x55, 0x50, 0x49, 0x4e, 0x44, 0x45, 0x58, 0x10, 0x02, 0x12, 0x0b, 0x0a, 0x07, 0x4d, 0x49, 0x47, 0x52, 0x41, 0x54, 0x45, 0x10, 0x03, 0x12, 0x0b, 0x0a, 0x07, 0x52, 0x45, 0x53, 0x48, 0x41, 0x52, 0x44, 0x10, 0x04, 0x12, 0x0d, 0x0a, 0x09, 0x4f, 0x4e, 0x4c, 0x49, 0x4e, 0x45, 0x44, 0x44, - 0x4c, 0x10, 0x05, 0x2a, 0xf9, 0x01, 0x0a, 0x0a, 0x56, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x54, 0x79, + 0x4c, 0x10, 0x05, 0x2a, 0x8d, 0x02, 0x0a, 0x0a, 0x56, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x0b, 0x0a, 
0x07, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x08, 0x0a, 0x04, 0x47, 0x54, 0x49, 0x44, 0x10, 0x01, 0x12, 0x09, 0x0a, 0x05, 0x42, 0x45, 0x47, 0x49, 0x4e, 0x10, 0x02, 0x12, 0x0a, 0x0a, 0x06, 0x43, 0x4f, 0x4d, 0x4d, 0x49, 0x54, 0x10, 0x03, @@ -2996,13 +3002,14 @@ var file_binlogdata_proto_rawDesc = []byte{ 0x09, 0x0a, 0x05, 0x56, 0x47, 0x54, 0x49, 0x44, 0x10, 0x0f, 0x12, 0x0b, 0x0a, 0x07, 0x4a, 0x4f, 0x55, 0x52, 0x4e, 0x41, 0x4c, 0x10, 0x10, 0x12, 0x0b, 0x0a, 0x07, 0x56, 0x45, 0x52, 0x53, 0x49, 0x4f, 0x4e, 0x10, 0x11, 0x12, 0x0a, 0x0a, 0x06, 0x4c, 0x41, 0x53, 0x54, 0x50, 0x4b, 0x10, 0x12, - 0x12, 0x0d, 0x0a, 0x09, 0x53, 0x41, 0x56, 0x45, 0x50, 0x4f, 0x49, 0x4e, 0x54, 0x10, 0x13, 0x2a, - 0x27, 0x0a, 0x0d, 0x4d, 0x69, 0x67, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x54, 0x79, 0x70, 0x65, - 0x12, 0x0a, 0x0a, 0x06, 0x54, 0x41, 0x42, 0x4c, 0x45, 0x53, 0x10, 0x00, 0x12, 0x0a, 0x0a, 0x06, - 0x53, 0x48, 0x41, 0x52, 0x44, 0x53, 0x10, 0x01, 0x42, 0x29, 0x5a, 0x27, 0x76, 0x69, 0x74, 0x65, - 0x73, 0x73, 0x2e, 0x69, 0x6f, 0x2f, 0x76, 0x69, 0x74, 0x65, 0x73, 0x73, 0x2f, 0x67, 0x6f, 0x2f, - 0x76, 0x74, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x62, 0x69, 0x6e, 0x6c, 0x6f, 0x67, 0x64, - 0x61, 0x74, 0x61, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x12, 0x0d, 0x0a, 0x09, 0x53, 0x41, 0x56, 0x45, 0x50, 0x4f, 0x49, 0x4e, 0x54, 0x10, 0x13, 0x12, + 0x12, 0x0a, 0x0e, 0x43, 0x4f, 0x50, 0x59, 0x5f, 0x43, 0x4f, 0x4d, 0x50, 0x4c, 0x45, 0x54, 0x45, + 0x44, 0x10, 0x14, 0x2a, 0x27, 0x0a, 0x0d, 0x4d, 0x69, 0x67, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, + 0x54, 0x79, 0x70, 0x65, 0x12, 0x0a, 0x0a, 0x06, 0x54, 0x41, 0x42, 0x4c, 0x45, 0x53, 0x10, 0x00, + 0x12, 0x0a, 0x0a, 0x06, 0x53, 0x48, 0x41, 0x52, 0x44, 0x53, 0x10, 0x01, 0x42, 0x29, 0x5a, 0x27, + 0x76, 0x69, 0x74, 0x65, 0x73, 0x73, 0x2e, 0x69, 0x6f, 0x2f, 0x76, 0x69, 0x74, 0x65, 0x73, 0x73, + 0x2f, 0x67, 0x6f, 0x2f, 0x76, 0x74, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x62, 0x69, 0x6e, + 0x6c, 0x6f, 0x67, 0x64, 0x61, 0x74, 0x61, 
0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/go/vt/vtgate/endtoend/main_test.go b/go/vt/vtgate/endtoend/main_test.go index 17cf3e6dd01..c91c61ec2cd 100644 --- a/go/vt/vtgate/endtoend/main_test.go +++ b/go/vt/vtgate/endtoend/main_test.go @@ -45,6 +45,12 @@ create table t1( primary key(id1) ) Engine=InnoDB; +create table t1_copy_basic( + id1 bigint, + id2 bigint, + primary key(id1) +) Engine=InnoDB; + create table t1_copy_resume( id1 bigint, id2 bigint, @@ -139,6 +145,12 @@ create table t1_sharded( Name: "t1_id2_vdx", }}, }, + "t1_copy_basic": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Column: "id1", + Name: "hash", + }}, + }, "t1_copy_resume": { ColumnVindexes: []*vschemapb.ColumnVindex{{ Column: "id1", diff --git a/go/vt/vtgate/endtoend/vstream_test.go b/go/vt/vtgate/endtoend/vstream_test.go index a13aac8291d..832799366b1 100644 --- a/go/vt/vtgate/endtoend/vstream_test.go +++ b/go/vt/vtgate/endtoend/vstream_test.go @@ -169,7 +169,7 @@ func TestVStreamCopyBasic(t *testing.T) { gconn, conn, mconn, closeConnections := initialize(ctx, t) defer closeConnections() - _, err := conn.ExecuteFetch("insert into t1(id1,id2) values(1,1), (2,2), (3,3), (4,4), (5,5), (6,6), (7,7), (8,8)", 1, false) + _, err := conn.ExecuteFetch("insert into t1_copy_basic(id1,id2) values(1,1), (2,2), (3,3), (4,4), (5,5), (6,6), (7,7), (8,8)", 1, false) if err != nil { t.Fatal(err) } @@ -180,7 +180,7 @@ func TestVStreamCopyBasic(t *testing.T) { } qr := sqltypes.ResultToProto3(&lastPK) tablePKs := []*binlogdatapb.TableLastPK{{ - TableName: "t1", + TableName: "t1_copy_basic", Lastpk: qr, }} var shardGtids []*binlogdatapb.ShardGtid @@ -200,8 +200,8 @@ func TestVStreamCopyBasic(t *testing.T) { vgtid.ShardGtids = shardGtids filter := &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ - Match: "t1", - Filter: "select * from t1", + Match: "t1_copy_basic", + Filter: "select * from t1_copy_basic", }}, } flags := &vtgatepb.VStreamFlags{} @@ -210,19 +210,44 @@ func 
TestVStreamCopyBasic(t *testing.T) { if err != nil { t.Fatal(err) } - numExpectedEvents := 2 /* num shards */ * (7 /* begin/field/vgtid:pos/2 rowevents avg/vgitd: lastpk/commit) */ + 3 /* begin/vgtid/commit for completed table */) + numExpectedEvents := 2 /* num shards */ *(7 /* begin/field/vgtid:pos/2 rowevents avg/vgtid: lastpk/commit) */ +3 /* begin/vgtid/commit for completed table */ +1 /* per-shard copy operation completed */) + 1 /* full copy operation completed */ + expectedCompletedEvents := []string{ + `type:COPY_COMPLETED keyspace:"ks" shard:"-80"`, + `type:COPY_COMPLETED keyspace:"ks" shard:"80-"`, + `type:COPY_COMPLETED`, + } require.NotNil(t, reader) var evs []*binlogdatapb.VEvent + var completedEvs []*binlogdatapb.VEvent for { e, err := reader.Recv() switch err { case nil: evs = append(evs, e...) + + for _, ev := range e { + if ev.Type == binlogdatapb.VEventType_COPY_COMPLETED { + completedEvs = append(completedEvs, ev) + } + } + + printEvents(evs) // for debugging ci failures + if len(evs) == numExpectedEvents { + // The arrival order of the per-shard COPY_COMPLETED events (those with keyspace/shard) is not deterministic. + // However, the last event should always be the COPY_COMPLETED event without keyspace/shard, which marks the full copy as done. + // That's why the sort.Slice doesn't have to handle the last element in completedEvs.
+ sort.Slice(completedEvs[:len(completedEvs)-1], func(i, j int) bool { + return completedEvs[i].GetShard() < completedEvs[j].GetShard() + }) + for i, ev := range completedEvs { + require.Regexp(t, expectedCompletedEvents[i], ev.String()) + } t.Logf("TestVStreamCopyBasic was successful") return + } else if numExpectedEvents < len(evs) { + t.Fatalf("len(events)=%v are not expected\n", len(evs)) } - printEvents(evs) // for debugging ci failures case io.EOF: log.Infof("stream ended\n") cancel() diff --git a/go/vt/vtgate/executor.go b/go/vt/vtgate/executor.go index c1ae7e209bb..a960720dced 100644 --- a/go/vt/vtgate/executor.go +++ b/go/vt/vtgate/executor.go @@ -1336,6 +1336,7 @@ func (e *Executor) startVStream(ctx context.Context, rss []*srvtopo.ResolvedShar vsm: vsm, eventCh: make(chan []*binlogdatapb.VEvent), ts: ts, + copyCompletedShard: make(map[string]struct{}), } _ = vs.stream(ctx) return nil diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index 8c6dd9f04f4..a815cdc4f31 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -110,6 +110,9 @@ type vstream struct { // the timestamp of the most recent event, keyed by streamId. streamId is of the form . timestamps map[string]int64 + // the set of shards whose copy has completed, keyed by streamId. streamId is of the form keyspace/shard.
+ copyCompletedShard map[string]struct{} + vsm *vstreamManager eventCh chan []*binlogdatapb.VEvent @@ -171,6 +174,7 @@ func (vsm *vstreamManager) VStream(ctx context.Context, tabletType topodatapb.Ta eventCh: make(chan []*binlogdatapb.VEvent), heartbeatInterval: flags.GetHeartbeatInterval(), ts: ts, + copyCompletedShard: make(map[string]struct{}), } return vs.stream(ctx) } @@ -598,6 +602,22 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha return err } + if err := vs.sendAll(ctx, sgtid, eventss); err != nil { + return err + } + eventss = nil + sendevents = nil + case binlogdatapb.VEventType_COPY_COMPLETED: + sendevents = append(sendevents, event) + if fullyCopied, doneEvent := vs.isCopyFullyCompleted(ctx, sgtid, event); fullyCopied { + sendevents = append(sendevents, doneEvent) + } + eventss = append(eventss, sendevents) + + if err := vs.alignStreams(ctx, event, sgtid.Keyspace, sgtid.Shard); err != nil { + return err + } + if err := vs.sendAll(ctx, sgtid, eventss); err != nil { return err } @@ -733,6 +753,25 @@ func (vs *vstream) sendAll(ctx context.Context, sgtid *binlogdatapb.ShardGtid, e return nil } +// isCopyFullyCompleted returns true if all streams have received a copy_completed event. +// If true, it will also return a new copy_completed event that needs to be sent. +// This new event represents the completion of all the copy operations.
+func (vs *vstream) isCopyFullyCompleted(ctx context.Context, sgtid *binlogdatapb.ShardGtid, event *binlogdatapb.VEvent) (bool, *binlogdatapb.VEvent) { + vs.mu.Lock() + defer vs.mu.Unlock() + + vs.copyCompletedShard[fmt.Sprintf("%s/%s", event.Keyspace, event.Shard)] = struct{}{} + + for _, shard := range vs.vgtid.ShardGtids { + if _, ok := vs.copyCompletedShard[fmt.Sprintf("%s/%s", shard.Keyspace, shard.Shard)]; !ok { + return false, nil + } + } + return true, &binlogdatapb.VEvent{ + Type: binlogdatapb.VEventType_COPY_COMPLETED, + } +} + func (vs *vstream) getError() error { vs.errMu.Lock() defer vs.errMu.Unlock() diff --git a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go index 056d5da1822..2508a625ea1 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go @@ -414,7 +414,9 @@ func (uvs *uvstreamer) Stream() error { uvs.vse.errorCounts.Add("Copy", 1) return err } - uvs.sendTestEvent("Copy Done") + if err := uvs.allCopyComplete(); err != nil { + return err + } } vs := newVStreamer(uvs.ctx, uvs.cp, uvs.se, mysql.EncodePosition(uvs.pos), mysql.EncodePosition(uvs.stopPos), uvs.filter, uvs.getVSchema(), uvs.send, "replicate", uvs.vse) @@ -457,6 +459,17 @@ func (uvs *uvstreamer) setCopyState(tableName string, qr *querypb.QueryResult) { uvs.plans[tableName].tablePK.Lastpk = qr } +func (uvs *uvstreamer) allCopyComplete() error { + ev := &binlogdatapb.VEvent{ + Type: binlogdatapb.VEventType_COPY_COMPLETED, + } + + if err := uvs.send([]*binlogdatapb.VEvent{ev}); err != nil { + return err + } + return nil +} + // dummy event sent only in test mode func (uvs *uvstreamer) sendTestEvent(msg string) { if !uvstreamerTestMode { diff --git a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go index 1ed673ebf90..610b9012f7f 100644 --- 
a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go @@ -233,7 +233,7 @@ func TestVStreamCopyCompleteFlow(t *testing.T) { } - callbacks["OTHER.*Copy Done"] = func() { + callbacks["COPY_COMPLETED"] = func() { log.Info("Copy done, inserting events to stream") insertRow(t, "t1", 1, numInitialRows+4) insertRow(t, "t2", 2, numInitialRows+3) @@ -252,7 +252,7 @@ commit;" } numCopyEvents := 3 /*t1,t2,t3*/ * (numInitialRows + 1 /*FieldEvent*/ + 1 /*LastPKEvent*/ + 1 /*TestEvent: Copy Start*/ + 2 /*begin,commit*/ + 3 /* LastPK Completed*/) - numCopyEvents += 2 /* GTID + Test event after all copy is done */ + numCopyEvents += 2 /* GTID + Event after all copy is done */ numCatchupEvents := 3 * 5 /* 2 t1, 1 t2 : BEGIN+FIELD+ROW+GTID+COMMIT */ numFastForwardEvents := 5 /*t1:FIELD+ROW*/ numMisc := 1 /* t2 insert during t1 catchup that comes in t2 copy */ @@ -539,7 +539,7 @@ var expectedEvents = []string{ "type:BEGIN", "type:LASTPK last_p_k_event:{table_last_p_k:{table_name:\"t3\"} completed:true}", "type:COMMIT", - "type:OTHER gtid:\"Copy Done\"", + "type:COPY_COMPLETED", "type:BEGIN", "type:FIELD field_event:{table_name:\"t1\" fields:{name:\"id11\" type:INT32 table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"id11\" column_length:11 charset:63 column_type:\"int(11)\"} fields:{name:\"id12\" type:INT32 table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"id12\" column_length:11 charset:63 column_type:\"int(11)\"}}", "type:ROW row_event:{table_name:\"t1\" row_changes:{after:{lengths:2 lengths:3 values:\"14140\"}}}", diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_flaky_test.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_flaky_test.go index 43abed35e23..c2e6f8cef55 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_flaky_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_flaky_test.go @@ -447,7 +447,7 @@ func TestVStreamCopySimpleFlow(t 
*testing.T) { testcases := []testcase{ { input: []string{}, - output: [][]string{t1FieldEvent, {"gtid"}, t1Events, {"begin", "lastpk", "commit"}, t2FieldEvent, t2Events, {"begin", "lastpk", "commit"}}, + output: [][]string{t1FieldEvent, {"gtid"}, t1Events, {"begin", "lastpk", "commit"}, t2FieldEvent, t2Events, {"begin", "lastpk", "commit"}, {"copy_completed"}}, }, { @@ -2178,6 +2178,10 @@ func expectLog(ctx context.Context, t *testing.T, input any, ch <-chan []*binlog if evs[i].Type != binlogdatapb.VEventType_DDL { t.Fatalf("%v (%d): event: %v, want ddl", input, i, evs[i]) } + case "copy_completed": + if evs[i].Type != binlogdatapb.VEventType_COPY_COMPLETED { + t.Fatalf("%v (%d): event: %v, want copy_completed", input, i, evs[i]) + } default: evs[i].Timestamp = 0 if evs[i].Type == binlogdatapb.VEventType_FIELD { diff --git a/proto/binlogdata.proto b/proto/binlogdata.proto index 8433e105025..d697482a2f7 100644 --- a/proto/binlogdata.proto +++ b/proto/binlogdata.proto @@ -289,6 +289,10 @@ enum VEventType { VERSION = 17; LASTPK = 18; SAVEPOINT = 19; + // COPY_COMPLETED is sent when VTGate's VStream copy operation is done. + // If a client experiences some disruptions before receiving the event, + // the client should restart the copy operation. + COPY_COMPLETED = 20; } // RowChange represents one row change. diff --git a/web/vtadmin/src/proto/vtadmin.d.ts b/web/vtadmin/src/proto/vtadmin.d.ts index 9c2155bfa86..d36051d73d7 100644 --- a/web/vtadmin/src/proto/vtadmin.d.ts +++ b/web/vtadmin/src/proto/vtadmin.d.ts @@ -49368,7 +49368,8 @@ export namespace binlogdata { JOURNAL = 16, VERSION = 17, LASTPK = 18, - SAVEPOINT = 19 + SAVEPOINT = 19, + COPY_COMPLETED = 20 } /** Properties of a RowChange. 
*/ diff --git a/web/vtadmin/src/proto/vtadmin.js b/web/vtadmin/src/proto/vtadmin.js index a439f77b73c..11040565de6 100644 --- a/web/vtadmin/src/proto/vtadmin.js +++ b/web/vtadmin/src/proto/vtadmin.js @@ -116986,6 +116986,7 @@ $root.binlogdata = (function() { * @property {number} VERSION=17 VERSION value * @property {number} LASTPK=18 LASTPK value * @property {number} SAVEPOINT=19 SAVEPOINT value + * @property {number} COPY_COMPLETED=20 COPY_COMPLETED value */ binlogdata.VEventType = (function() { var valuesById = {}, values = Object.create(valuesById); @@ -117009,6 +117010,7 @@ $root.binlogdata = (function() { values[valuesById[17] = "VERSION"] = 17; values[valuesById[18] = "LASTPK"] = 18; values[valuesById[19] = "SAVEPOINT"] = 19; + values[valuesById[20] = "COPY_COMPLETED"] = 20; return values; })(); @@ -119251,6 +119253,7 @@ $root.binlogdata = (function() { case 17: case 18: case 19: + case 20: break; } if (message.timestamp != null && message.hasOwnProperty("timestamp")) @@ -119398,6 +119401,10 @@ $root.binlogdata = (function() { case 19: message.type = 19; break; + case "COPY_COMPLETED": + case 20: + message.type = 20; + break; } if (object.timestamp != null) if ($util.Long)