Skip to content

Commit

Permalink
backport 13582
Browse files Browse the repository at this point in the history
Signed-off-by: Priya Bibra <[email protected]>
  • Loading branch information
pbibra committed Sep 26, 2023
1 parent 6cb6e9b commit 3d4f94f
Show file tree
Hide file tree
Showing 8 changed files with 229 additions and 93 deletions.
100 changes: 51 additions & 49 deletions go/vt/discovery/tablet_picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,26 @@ limitations under the License.
package discovery

import (
"context"
"fmt"
"io"
"math/rand"
"sort"
"strings"
"sync"
"time"

"vitess.io/vitess/go/stats"

"vitess.io/vitess/go/vt/grpcclient"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"

vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"

"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/tabletconn"

"vitess.io/vitess/go/vt/log"

"context"

querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vterrors"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

type TabletPickerCellPreference int
Expand Down Expand Up @@ -280,13 +278,12 @@ func (tp *TabletPicker) orderByTabletType(candidates []*topo.TabletInfo) []*topo
return candidates
}

// PickForStreaming picks an available tablet
// PickForStreaming picks a tablet that is healthy and serving.
// Selection is based on CellPreference.
// See prioritizeTablets for prioritization logic.
func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Tablet, error) {
rand.Seed(time.Now().UnixNano())
// keep trying at intervals (tabletPickerRetryDelay) until a tablet is found
// or the context is canceled
// Keep trying at intervals (tabletPickerRetryDelay) until a healthy
// serving tablet is found or the context is cancelled.
for {
select {
case <-ctx.Done():
Expand All @@ -302,7 +299,7 @@ func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Table
sameAliasCandidates = tp.orderByTabletType(sameAliasCandidates)
allOtherCandidates = tp.orderByTabletType(allOtherCandidates)
} else {
// Randomize candidates
// Randomize candidates.
rand.Shuffle(len(sameCellCandidates), func(i, j int) {
sameCellCandidates[i], sameCellCandidates[j] = sameCellCandidates[j], sameCellCandidates[i]
})
Expand All @@ -325,9 +322,9 @@ func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Table
})
}
if len(candidates) == 0 {
// if no candidates were found, sleep and try again
// If no viable candidates were found, sleep and try again.
tp.incNoTabletFoundStat()
log.Infof("No tablet found for streaming, shard %s.%s, cells %v, tabletTypes %v, sleeping for %.3f seconds",
log.Infof("No healthy serving tablet found for streaming, shard %s.%s, cells %v, tabletTypes %v, sleeping for %.3f seconds.",
tp.keyspace, tp.shard, tp.cells, tp.tabletTypes, float64(GetTabletPickerRetryDelay().Milliseconds())/1000.0)
timer := time.NewTimer(GetTabletPickerRetryDelay())
select {
Expand All @@ -338,66 +335,57 @@ func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Table
}
continue
}
for _, ti := range candidates {
// try to connect to tablet
if conn, err := tabletconn.GetDialer()(ti.Tablet, true); err == nil {
// OK to use ctx here because it is not actually used by the underlying Close implementation
_ = conn.Close(ctx)
log.Infof("tablet picker found tablet %s", ti.Tablet.String())
return ti.Tablet, nil
}
// err found
log.Warningf("unable to connect to tablet for alias %v", ti.Alias)
}
// Got here? Means we iterated all tablets and did not find a healthy one
tp.incNoTabletFoundStat()
log.Infof("Tablet picker found a healthy serving tablet for streaming: %s", candidates[0].Tablet.String())
return candidates[0].Tablet, nil
}
}

// GetMatchingTablets returns a list of TabletInfo for tablets
// that match the cells, keyspace, shard and tabletTypes for this TabletPicker
// GetMatchingTablets returns a list of TabletInfo for healthy
// serving tablets that match the cells, keyspace, shard and
// tabletTypes for this TabletPicker.
func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletInfo {
// Special handling for PRIMARY tablet type
// Since there is only one primary, we ignore cell and find the primary
// Special handling for PRIMARY tablet type: since there is only
// one primary per shard, we ignore cell and find the primary.
aliases := make([]*topodatapb.TabletAlias, 0)
if len(tp.tabletTypes) == 1 && tp.tabletTypes[0] == topodatapb.TabletType_PRIMARY {
shortCtx, cancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
defer cancel()
si, err := tp.ts.GetShard(shortCtx, tp.keyspace, tp.shard)
if err != nil {
log.Errorf("error getting shard %s/%s: %s", tp.keyspace, tp.shard, err.Error())
log.Errorf("Error getting shard %s/%s: %v", tp.keyspace, tp.shard, err)
return nil
}
aliases = append(aliases, si.PrimaryAlias)
} else {
actualCells := make([]string, 0)
for _, cell := range tp.cells {
// check if cell is actually an alias
// non-blocking read so that this is fast
// Check if cell is actually an alias; using a
// non-blocking read so that this is fast.
shortCtx, cancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
defer cancel()
_, err := tp.ts.GetCellInfo(shortCtx, cell, false)
if err != nil {
// not a valid cell, check whether it is a cell alias
// Not a valid cell, check whether it is a cell alias...
shortCtx, cancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
defer cancel()
alias, err := tp.ts.GetCellsAlias(shortCtx, cell, false)
// if we get an error, either cellAlias doesn't exist or it isn't a cell alias at all. Ignore and continue
// If we get an error, either cellAlias doesn't exist or
// it isn't a cell alias at all; ignore and continue.
if err == nil {
actualCells = append(actualCells, alias.Cells...)
} else {
log.Infof("Unable to resolve cell %s, ignoring", cell)
}
} else {
// valid cell, add it to our list
// Valid cell, add it to our list.
actualCells = append(actualCells, cell)
}
}

for _, cell := range actualCells {
shortCtx, cancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
defer cancel()
// match cell, keyspace and shard
// Match cell, keyspace, and shard.
sri, err := tp.ts.GetShardReplication(shortCtx, cell, tp.keyspace, tp.shard)
if err != nil {
continue
Expand All @@ -412,33 +400,47 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn
if len(aliases) == 0 {
return nil
}

shortCtx, cancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
defer cancel()
tabletMap, err := tp.ts.GetTabletMap(shortCtx, aliases)
if err != nil {
log.Warningf("error fetching tablets from topo: %v", err)
// If we get a partial result we can still use it, otherwise return
log.Warningf("Error fetching tablets from topo: %v", err)
// If we get a partial result we can still use it, otherwise return.
if len(tabletMap) == 0 {
return nil
}
}

tablets := make([]*topo.TabletInfo, 0, len(aliases))
for _, tabletAlias := range aliases {
tabletInfo, ok := tabletMap[topoproto.TabletAliasString(tabletAlias)]
if !ok {
// Either tablet disappeared on us, or we got a partial result (GetTabletMap ignores
// topo.ErrNoNode). Just log a warning
log.Warningf("failed to load tablet %v", tabletAlias)
// Either tablet disappeared on us, or we got a partial result
// (GetTabletMap ignores topo.ErrNoNode); just log a warning.
log.Warningf("Tablet picker failed to load tablet %v", tabletAlias)
} else if topoproto.IsTypeInList(tabletInfo.Type, tp.tabletTypes) {
tablets = append(tablets, tabletInfo)
// Try to connect to the tablet and confirm that it's usable.
if conn, err := tabletconn.GetDialer()(tabletInfo.Tablet, grpcclient.FailFast(true)); err == nil {
// Ensure that the tablet is healthy and serving.
shortCtx, cancel := context.WithTimeout(ctx, *topo.RemoteOperationTimeout)
defer cancel()
if err := conn.StreamHealth(shortCtx, func(shr *querypb.StreamHealthResponse) error {
if shr != nil && shr.Serving && shr.RealtimeStats != nil && shr.RealtimeStats.HealthError == "" {
return io.EOF // End the stream
}
return vterrors.New(vtrpcpb.Code_INTERNAL, "tablet is not healthy and serving")
}); err == nil || err == io.EOF {
tablets = append(tablets, tabletInfo)
}
_ = conn.Close(ctx)
}
}
}
return tablets
}

func init() {
// TODO(sougou): consolidate this call to be once per process.
rand.Seed(time.Now().UnixNano())
globalTPStats = newTabletPickerStats()
}

Expand Down
65 changes: 54 additions & 11 deletions go/vt/discovery/tablet_picker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ import (
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"

querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/memorytopo"

querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

func TestPickPrimary(t *testing.T) {
Expand Down Expand Up @@ -479,6 +480,45 @@ func TestPickErrorOnlySpecified(t *testing.T) {
require.Greater(t, globalTPStats.noTabletFoundError.Counts()["cell.ks.0.replica"], int64(0))
}

// TestPickFallbackType tests that when providing a list of tablet types to
// pick from, with the list in preference order, that when the primary/first
// type has no available healthy serving tablets that we select a healthy
// serving tablet from the secondary/second type.
func TestPickFallbackType(t *testing.T) {
cells := []string{"cell1", "cell2"}
localCell := cells[0]
tabletTypes := "replica,primary"
options := TabletPickerOptions{
TabletOrder: "InOrder",
}
te := newPickerTestEnv(t, cells)

// This one should be selected even though it's the secondary type
// as it is healthy and serving.
primaryTablet := addTablet(te, 100, topodatapb.TabletType_PRIMARY, localCell, true, true)
defer deleteTablet(t, te, primaryTablet)

// Replica tablet should not be selected as it is unhealthy.
replicaTablet := addTablet(te, 200, topodatapb.TabletType_REPLICA, localCell, false, false)
defer deleteTablet(t, te, replicaTablet)

ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()
_, err := te.topoServ.UpdateShardFields(ctx, te.keyspace, te.shard, func(si *topo.ShardInfo) error {
si.PrimaryAlias = primaryTablet.Alias
return nil
})
require.NoError(t, err)

tp, err := NewTabletPicker(context.Background(), te.topoServ, cells, localCell, te.keyspace, te.shard, tabletTypes, options)
require.NoError(t, err)
ctx2, cancel2 := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel2()
tablet, err := tp.PickForStreaming(ctx2)
require.NoError(t, err)
assert.True(t, proto.Equal(primaryTablet, tablet), "Pick: %v, want %v", tablet, primaryTablet)
}

type pickerTestEnv struct {
t *testing.T
keyspace string
Expand Down Expand Up @@ -527,18 +567,21 @@ func addTablet(te *pickerTestEnv, id int, tabletType topodatapb.TabletType, cell
err := te.topoServ.CreateTablet(context.Background(), tablet)
require.NoError(te.t, err)

shr := &querypb.StreamHealthResponse{
Serving: serving,
Target: &querypb.Target{
Keyspace: te.keyspace,
Shard: te.shard,
TabletType: tabletType,
},
RealtimeStats: &querypb.RealtimeStats{HealthError: "tablet is unhealthy"},
}
if healthy {
_ = createFixedHealthConn(tablet, &querypb.StreamHealthResponse{
Serving: serving,
Target: &querypb.Target{
Keyspace: te.keyspace,
Shard: te.shard,
TabletType: tabletType,
},
RealtimeStats: &querypb.RealtimeStats{HealthError: ""},
})
shr.RealtimeStats.HealthError = ""
}

_ = createFixedHealthConn(tablet, shr)

return tablet
}

Expand Down
6 changes: 3 additions & 3 deletions go/vt/vttablet/sandboxconn/sandboxconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,9 +407,9 @@ func (sbc *SandboxConn) MessageAck(ctx context.Context, target *querypb.Target,
// SandboxSQRowCount is the default number of fake splits returned.
var SandboxSQRowCount = int64(10)

// StreamHealth is not implemented.
// StreamHealth always mocks a "healthy" result.
func (sbc *SandboxConn) StreamHealth(ctx context.Context, callback func(*querypb.StreamHealthResponse) error) error {
return fmt.Errorf("not implemented in test")
return nil
}

// ExpectVStreamStartPos makes the conn verify that that the next vstream request has the right startPos.
Expand Down Expand Up @@ -632,7 +632,7 @@ func (sbc *SandboxConn) getTxReservedID(txID int64) int64 {
return sbc.txIDToRID[txID]
}

//StringQueries returns the queries executed as a slice of strings
// StringQueries returns the queries executed as a slice of strings
func (sbc *SandboxConn) StringQueries() []string {
result := make([]string, len(sbc.Queries))
for i, query := range sbc.Queries {
Expand Down
Loading

0 comments on commit 3d4f94f

Please sign in to comment.