Skip to content

Commit

Permalink
apply patch 11909 (#131)
Browse files Browse the repository at this point in the history
* apply patch 11909

Signed-off-by: Priya Bibra <[email protected]>

* fix typo

Signed-off-by: Priya Bibra <[email protected]>

---------

Signed-off-by: Priya Bibra <[email protected]>
  • Loading branch information
pbibra committed Sep 14, 2023
1 parent 0906d05 commit c057af8
Show file tree
Hide file tree
Showing 8 changed files with 518 additions and 59 deletions.
168 changes: 165 additions & 3 deletions go/test/endtoend/vreplication/vstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ func testVStreamWithFailover(t *testing.T, failover bool) {

const schemaUnsharded = `
create table customer_seq(id int, next_id bigint, cache bigint, primary key(id)) comment 'vitess_sequence';
insert into customer_seq(id, next_id, cache) values(0, 1, 3);
`
const vschemaUnsharded = `
{
Expand Down Expand Up @@ -218,14 +219,19 @@ const vschemaSharded = `
func insertRow(keyspace, table string, id int) {
vtgateConn.ExecuteFetch(fmt.Sprintf("use %s;", keyspace), 1000, false)
vtgateConn.ExecuteFetch("begin", 1000, false)
_, err := vtgateConn.ExecuteFetch(fmt.Sprintf("insert into %s (name) values ('%s%d')", table, table, id), 1000, false)
if err != nil {
log.Infof("error inserting row %d: %v", id, err)
}
vtgateConn.ExecuteFetch(fmt.Sprintf("insert into %s (cid, name) values (%d, '%s%d')", table, id+100, table, id), 1000, false)
vtgateConn.ExecuteFetch("commit", 1000, false)
}

type numEvents struct {
numRowEvents, numJournalEvents int64
numLessThan80Events, numGreaterThan80Events int64
numLessThan40Events, numGreaterThan40Events int64
numRowEvents, numJournalEvents int64
numLessThan80Events, numGreaterThan80Events int64
numLessThan40Events, numGreaterThan40Events int64
numShard0BeforeReshardEvents, numShard0AfterReshardEvents int64
}

// tests the StopOnReshard flag
Expand Down Expand Up @@ -375,6 +381,150 @@ func testVStreamStopOnReshardFlag(t *testing.T, stopOnReshard bool, baseTabletID
return &ne
}

// Validate that we can continue streaming from multiple keyspaces after first copying some tables and then resharding one of the keyspaces
// Ensure that there are no missing row events during the resharding process.
func testVStreamCopyMultiKeyspaceReshard(t *testing.T, baseTabletID int) numEvents {
defaultCellName := "zone1"
allCellNames = defaultCellName
allCells := []string{allCellNames}
vc = NewVitessCluster(t, "VStreamCopyMultiKeyspaceReshard", allCells, mainClusterConfig)

require.NotNil(t, vc)
ogdr := defaultReplicas
defaultReplicas = 0 // because of CI resource constraints we can only run this test with primary tablets
defer func(dr int) { defaultReplicas = dr }(ogdr)

defer vc.TearDown(t)

defaultCell = vc.Cells[defaultCellName]
vc.AddKeyspace(t, []*Cell{defaultCell}, "unsharded", "0", vschemaUnsharded, schemaUnsharded, defaultReplicas, defaultRdonly, baseTabletID+100, nil)
vtgate = defaultCell.Vtgates[0]
require.NotNil(t, vtgate)
vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", "unsharded", "0"), 1)

vtgateConn = getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
defer vtgateConn.Close()
verifyClusterHealth(t, vc)

vc.AddKeyspace(t, []*Cell{defaultCell}, "sharded", "-80,80-", vschemaSharded, schemaSharded, defaultReplicas, defaultRdonly, baseTabletID+200, nil)

ctx := context.Background()
vstreamConn, err := vtgateconn.Dial(ctx, fmt.Sprintf("%s:%d", vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateGrpcPort))
if err != nil {
log.Fatal(err)
}
defer vstreamConn.Close()
vgtid := &binlogdatapb.VGtid{
ShardGtids: []*binlogdatapb.ShardGtid{{
Keyspace: "/.*",
}}}

filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
// We want to confirm that the following two tables are streamed.
// 1. the customer_seq in the unsharded keyspace
// 2. the customer table in the sharded keyspace
Match: "/customer.*/",
}},
}
flags := &vtgatepb.VStreamFlags{}
done := false

id := 1000
// First goroutine that keeps inserting rows into the table being streamed until a minute after reshard
// We should keep getting events on the new shards
go func() {
for {
if done {
return
}
id++
time.Sleep(1 * time.Second)
insertRow("sharded", "customer", id)
}
}()
// stream events from the VStream API
var ne numEvents
reshardDone := false
go func() {
var reader vtgateconn.VStreamReader
reader, err = vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags)
require.NoError(t, err)
for {
evs, err := reader.Recv()

switch err {
case nil:
for _, ev := range evs {
switch ev.Type {
case binlogdatapb.VEventType_ROW:
shard := ev.RowEvent.Shard
switch shard {
case "0":
if reshardDone {
ne.numShard0AfterReshardEvents++
} else {
ne.numShard0BeforeReshardEvents++
}
case "-80":
ne.numLessThan80Events++
case "80-":
ne.numGreaterThan80Events++
case "-40":
ne.numLessThan40Events++
case "40-":
ne.numGreaterThan40Events++
}
ne.numRowEvents++
case binlogdatapb.VEventType_JOURNAL:
ne.numJournalEvents++
}
}
case io.EOF:
log.Infof("Stream Ended")
done = true
default:
log.Errorf("Returned err %v", err)
done = true
}
if done {
return
}
}
}()

ticker := time.NewTicker(1 * time.Second)
tickCount := 0
for {
<-ticker.C
tickCount++
switch tickCount {
case 1:
reshard(t, "sharded", "customer", "vstreamCopyMultiKeyspaceReshard", "-80,80-", "-40,40-", baseTabletID+400, nil, nil, nil, defaultCellName)
reshardDone = true
case 60:
done = true
}
if done {
break
}
}
log.Infof("ne=%v", ne)

// The number of row events streamed by the VStream API should match the number of rows inserted.
// This is important for sharded tables, where we need to ensure that no row events are missed during the resharding process.
//
// On the other hand, we don't verify the exact number of row events for the unsharded keyspace
// because the keyspace remains unsharded and the number of rows in the customer_seq table is always 1.
// We believe that checking the number of row events for the unsharded keyspace, which should always be greater than 0 before and after resharding,
// is sufficient to confirm that the resharding of one keyspace does not affect another keyspace, while keeping the test straightforward.
customerResult := execVtgateQuery(t, vtgateConn, "sharded", "select count(*) from customer")
insertedCustomerRows, err := evalengine.ToInt64(customerResult.Rows[0][0])
require.NoError(t, err)
require.Equal(t, insertedCustomerRows, ne.numLessThan80Events+ne.numGreaterThan80Events+ne.numLessThan40Events+ne.numGreaterThan40Events)
return ne
}

func TestVStreamFailover(t *testing.T) {
testVStreamWithFailover(t, true)
}
Expand Down Expand Up @@ -406,3 +556,15 @@ func TestVStreamWithKeyspacesToWatch(t *testing.T) {

testVStreamWithFailover(t, false)
}

func TestVStreamCopyMultiKeyspaceReshard(t *testing.T) {
ne := testVStreamCopyMultiKeyspaceReshard(t, 3000)
require.Equal(t, int64(0), ne.numJournalEvents)
require.NotZero(t, ne.numRowEvents)
require.NotZero(t, ne.numShard0BeforeReshardEvents)
require.NotZero(t, ne.numShard0AfterReshardEvents)
require.NotZero(t, ne.numLessThan80Events)
require.NotZero(t, ne.numGreaterThan80Events)
require.NotZero(t, ne.numLessThan40Events)
require.NotZero(t, ne.numGreaterThan40Events)
}
68 changes: 60 additions & 8 deletions go/vt/vtgate/endtoend/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ create table t1_copy_basic(
primary key(id1)
) Engine=InnoDB;
create table t1_copy_all(
id1 bigint,
id2 bigint,
primary key(id1)
) Engine=InnoDB;
create table t1_copy_resume(
id1 bigint,
id2 bigint,
Expand Down Expand Up @@ -151,6 +157,12 @@ create table t1_sharded(
Name: "hash",
}},
},
"t1_copy_all": {
ColumnVindexes: []*vschemapb.ColumnVindex{{
Column: "id1",
Name: "hash",
}},
},
"t1_copy_resume": {
ColumnVindexes: []*vschemapb.ColumnVindex{{
Column: "id1",
Expand Down Expand Up @@ -218,6 +230,31 @@ create table t1_sharded(
},
},
}

schema2 = `
create table t1_copy_all_ks2(
id1 bigint,
id2 bigint,
primary key(id1)
) Engine=InnoDB;
`

vschema2 = &vschemapb.Keyspace{
Sharded: true,
Vindexes: map[string]*vschemapb.Vindex{
"hash": {
Type: "hash",
},
},
Tables: map[string]*vschemapb.Table{
"t1_copy_all_ks2": {
ColumnVindexes: []*vschemapb.ColumnVindex{{
Column: "id1",
Name: "hash",
}},
},
},
}
)

func TestMain(m *testing.M) {
Expand All @@ -226,21 +263,36 @@ func TestMain(m *testing.M) {
exitCode := func() int {
var cfg vttest.Config
cfg.Topology = &vttestpb.VTTestTopology{
Keyspaces: []*vttestpb.Keyspace{{
Name: "ks",
Shards: []*vttestpb.Shard{{
Name: "-80",
}, {
Name: "80-",
}},
}},
Keyspaces: []*vttestpb.Keyspace{
{
Name: "ks",
Shards: []*vttestpb.Shard{{
Name: "-80",
}, {
Name: "80-",
}},
},
{
Name: "ks2",
Shards: []*vttestpb.Shard{{
Name: "-80",
}, {
Name: "80-",
}},
},
},
}
if err := cfg.InitSchemas("ks", schema, vschema); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.RemoveAll(cfg.SchemaDir)
return 1
}
defer os.RemoveAll(cfg.SchemaDir)
if err := cfg.InitSchemas("ks2", schema2, vschema2); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.RemoveAll(cfg.SchemaDir)
return 1
}

cfg.TabletHostName = *tabletHostName

Expand Down
11 changes: 11 additions & 0 deletions go/vt/vtgate/endtoend/misc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package endtoend
import (
"context"
"fmt"
osExec "os/exec"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -55,6 +56,16 @@ func TestCreateAndDropDatabase(t *testing.T) {
require.NoError(t, err)
defer conn.Close()

// cleanup the keyspace from the topology.
defer func() {
// the corresponding database needs to be created in advance.
// a subsequent DeleteKeyspace command returns the error of 'node doesn't exist' without it.
_ = exec(t, conn, "create database testitest")

_, err := osExec.Command("vtctldclient", "--server", grpcAddress, "DeleteKeyspace", "--recursive", "--force", "testitest").CombinedOutput()
require.NoError(t, err)
}()

// run it 3 times.
for count := 0; count < 3; count++ {
t.Run(fmt.Sprintf("exec:%d", count), func(t *testing.T) {
Expand Down
2 changes: 2 additions & 0 deletions go/vt/vtgate/endtoend/row_count_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@ import (
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/test/endtoend/utils"
)

func TestRowCount(t *testing.T) {
ctx := context.Background()
conn, err := mysql.Connect(ctx, &vtParams)
require.NoError(t, err)
defer conn.Close()
utils.Exec(t, conn, "use ks")
type tc struct {
query string
expected int
Expand Down
Loading

0 comments on commit c057af8

Please sign in to comment.