Skip to content

Commit

Permalink
tabletbalancer: use tablet list to detect topology changes lazily
Browse files Browse the repository at this point in the history
Signed-off-by: Venkatraju V <[email protected]>
  • Loading branch information
venkatraju committed Jun 26, 2024
1 parent 5e03248 commit b1f1810
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 63 deletions.
52 changes: 29 additions & 23 deletions go/vt/vtgate/balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,6 @@ converge on the desired balanced query load.
type TabletBalancer interface {
// Randomly shuffle the tablets into an order for routing queries
ShuffleTablets(target *querypb.Target, tablets []*discovery.TabletHealth)

// Callback when the topology changes to invalidate any cached state
TopologyChanged()
}

func NewTabletBalancer(localCell string, vtGateCells []string) TabletBalancer {
Expand Down Expand Up @@ -136,6 +133,9 @@ type targetAllocation struct {
// Allocation routed to each tablet from the local cell used for ranking
Allocation map[uint32]int

// Tablets that local cell does not route to
Unallocated map[uint32]struct{}

// Total allocation which is basically 1,000,000 / len(vtgatecells)
TotalAllocation int
}
Expand Down Expand Up @@ -175,27 +175,18 @@ func (b *tabletBalancer) ShuffleTablets(target *querypb.Target, tablets []*disco
}
}

// TopologyChanged is a callback to indicate the topology changed and any cached
// allocations should be cleared
func (b *tabletBalancer) TopologyChanged() {
b.mu.Lock()
defer b.mu.Unlock()

b.allocations = map[discovery.KeyspaceShardTabletType]*targetAllocation{}
}

// To stick with integer arithmetic, use 1,000,000 as the full load
const ALLOCATION = 1000000

func (b *tabletBalancer) allocateFlows(allTablets []*discovery.TabletHealth) *targetAllocation {

a := targetAllocation{}

// Initialization: Set up some data structures and derived values
a.Target = make(map[string]int)
a.Inflows = make(map[string]int)
a.Outflows = make(map[string]map[string]int)
a.Allocation = make(map[uint32]int)
a := targetAllocation{
Target: map[string]int{},
Inflows: map[string]int{},
Outflows: map[string]map[string]int{},
Allocation: map[uint32]int{},
Unallocated: map[uint32]struct{}{},
}
flowPerVtgateCell := ALLOCATION / len(b.vtGateCells)
flowPerTablet := ALLOCATION / len(allTablets)
cellExistsWithNoTablets := false
Expand Down Expand Up @@ -326,6 +317,8 @@ func (b *tabletBalancer) allocateFlows(allTablets []*discovery.TabletHealth) *ta
if flow > 0 {
a.Allocation[tablet.Tablet.Alias.Uid] = flow * flowPerTablet / a.Target[cell]
a.TotalAllocation += flow * flowPerTablet / a.Target[cell]
} else {
a.Unallocated[tablet.Tablet.Alias.Uid] = struct{}{}
}
}

Expand All @@ -334,15 +327,28 @@ func (b *tabletBalancer) allocateFlows(allTablets []*discovery.TabletHealth) *ta

// getAllocation builds the allocation map if needed and returns a copy of the map
func (b *tabletBalancer) getAllocation(target *querypb.Target, tablets []*discovery.TabletHealth) (map[uint32]int, int) {

b.mu.Lock()
defer b.mu.Unlock()

allocation, exists := b.allocations[discovery.KeyFromTarget(target)]
if !exists {
allocation = b.allocateFlows(tablets)
b.allocations[discovery.KeyFromTarget(target)] = allocation
if exists && (len(allocation.Allocation)+len(allocation.Unallocated)) == len(tablets) {
mismatch := false
for _, tablet := range tablets {
if _, ok := allocation.Allocation[tablet.Tablet.Alias.Uid]; !ok {
if _, ok := allocation.Unallocated[tablet.Tablet.Alias.Uid]; !ok {
mismatch = true
break
}
}
}
if !mismatch {
// No change in tablets for this target. Return computed allocation
return allocation.Allocation, allocation.TotalAllocation
}
}

allocation = b.allocateFlows(tablets)
b.allocations[discovery.KeyFromTarget(target)] = allocation

return allocation.Allocation, allocation.TotalAllocation
}
32 changes: 5 additions & 27 deletions go/vt/vtgate/balancer/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,13 +350,11 @@ func TestTopologyChanged(t *testing.T) {
}
}

// Run again with the full topology, but without triggering a topology change
// event to cause a reallocation
tablets2 := allTablets
// Run again with the full topology. Now traffic should go to cell b
for i := 0; i < N; i++ {
b.ShuffleTablets(target, tablets2)
b.ShuffleTablets(target, allTablets)

allocation, totalAllocation := b.getAllocation(target, tablets2)
allocation, totalAllocation := b.getAllocation(target, allTablets)

if totalAllocation != ALLOCATION/2 {
t.Errorf("totalAllocation mismatch %s", b.print())
Expand All @@ -366,28 +364,8 @@ func TestTopologyChanged(t *testing.T) {
t.Errorf("allocation mismatch %s, cell %s", b.print(), allTablets[0].Tablet.Alias.Cell)
}

if tablets2[0].Tablet.Alias.Cell != "a" {
t.Errorf("shuffle promoted wrong tablet from cell %s", tablets2[0].Tablet.Alias.Cell)
}
}

// Trigger toplogy changed event, now traffic should go to b
b.TopologyChanged()
for i := 0; i < N; i++ {
b.ShuffleTablets(target, tablets2)

allocation, totalAllocation := b.getAllocation(target, tablets2)

if totalAllocation != ALLOCATION/2 {
t.Errorf("totalAllocation mismatch %s", b.print())
}

if allocation[allTablets[0].Tablet.Alias.Uid] != ALLOCATION/4 {
t.Errorf("allocation mismatch %s, cell %s", b.print(), allTablets[0].Tablet.Alias.Cell)
}

if tablets2[0].Tablet.Alias.Cell != "b" {
t.Errorf("shuffle promoted wrong tablet from cell %s", tablets2[0].Tablet.Alias.Cell)
if allTablets[0].Tablet.Alias.Cell != "b" {
t.Errorf("shuffle promoted wrong tablet from cell %s", allTablets[0].Tablet.Alias.Cell)
}
}
}
13 changes: 0 additions & 13 deletions go/vt/vtgate/tabletgateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,19 +191,6 @@ func (gw *TabletGateway) setupBalancer(ctx context.Context) {
log.Exitf("balancer_vtgate_cells is required for balanced mode")
}
gw.balancer = balancer.NewTabletBalancer(gw.localCell, balancerVtgateCells)

// subscribe to healthcheck updates so that the balancer can reset its allocation
hcChan := gw.hc.Subscribe()
go func(ctx context.Context, c chan *discovery.TabletHealth, balancer balancer.TabletBalancer) {
for {
select {
case <-ctx.Done():
return
case <-hcChan:
balancer.TopologyChanged()
}
}
}(ctx, hcChan, gw.balancer)
}

// QueryServiceByAlias satisfies the Gateway interface
Expand Down

0 comments on commit b1f1810

Please sign in to comment.