Skip to content

Commit

Permalink
add proper support for pool types to first_ready balancer (#400)
Browse files Browse the repository at this point in the history
Change the discovery layer to pass in the pool type as part of the Address
Attributes, and use that to maintain a map of current connections for each pool
type.
  • Loading branch information
demmer authored and dedelala committed Sep 9, 2024
1 parent b81bef8 commit 6c467e8
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 23 deletions.
5 changes: 4 additions & 1 deletion go/vt/vtgateproxy/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"sync"
"time"

"google.golang.org/grpc/attributes"
"google.golang.org/grpc/resolver"

"vitess.io/vitess/go/stats"
Expand Down Expand Up @@ -57,6 +58,8 @@ import (
// type: Only select from hosts of this type (required)
//

const PoolTypeAttr = "PoolType"

// Resolver(https://godoc.org/google.golang.org/grpc/resolver#Resolver).
type JSONGateResolver struct {
target resolver.Target
Expand Down Expand Up @@ -385,7 +388,7 @@ func (b *JSONGateResolverBuilder) update(r *JSONGateResolver) error {

var addrs []resolver.Address
for _, target := range targets {
addrs = append(addrs, resolver.Address{Addr: target.Addr})
addrs = append(addrs, resolver.Address{Addr: target.Addr, Attributes: attributes.New(PoolTypeAttr, r.poolType)})
}

log.V(100).Infof("updated targets for %s to %v", r.target.URL.String(), targets)
Expand Down
46 changes: 24 additions & 22 deletions go/vt/vtgateproxy/firstready_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import (

// newBuilder creates a new first_ready balancer builder.
func newBuilder() balancer.Builder {
return base.NewBalancerBuilder("first_ready", &frPickerBuilder{}, base.Config{HealthCheck: true})
return base.NewBalancerBuilder("first_ready", &frPickerBuilder{currentConns: map[string]balancer.SubConn{}}, base.Config{HealthCheck: true})
}

func init() {
Expand All @@ -54,8 +54,8 @@ func init() {
// Once a conn is chosen and is in the ready state, it will remain as the
// active subconn even if other connections become available.
type frPickerBuilder struct {
mu sync.Mutex
currentConn balancer.SubConn
mu sync.Mutex
currentConns map[string]balancer.SubConn
}

func (f *frPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker {
Expand All @@ -68,33 +68,35 @@ func (f *frPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker {
f.mu.Lock()
defer f.mu.Unlock()

// If we've already chosen a subconn, and it is still in the ready list, then
// no need to change state
if f.currentConn != nil {
log.V(100).Infof("first_ready: currentConn is active, checking if still ready")
for sc := range info.ReadySCs {
if f.currentConn == sc {
log.V(100).Infof("first_ready: currentConn still active - not changing")
return f
}
// First check if there is a previously chosen subconn in the ready list
for sc, scInfo := range info.ReadySCs {
// poolType is a required attribute to be set by discovery
poolType := scInfo.Address.Attributes.Value(PoolTypeAttr).(string)
currentConn := f.currentConns[poolType]
if currentConn == sc {
log.V(100).Infof("first_ready: conn for pool %s still active - keeping addr %s", poolType, scInfo.Address.Addr)
return &frPicker{sc}
}
}

// Otherwise either we don't have an active conn or the conn we were using is
// no longer active, so pick an arbitrary new one out of the map.
log.V(100).Infof("first_ready: currentConn is not active, picking a new one")
for sc := range info.ReadySCs {
f.currentConn = sc
break
for sc, scInfo := range info.ReadySCs {
poolType := scInfo.Address.Attributes.Value(PoolTypeAttr).(string)
log.V(100).Infof("first_ready: conn for pool %s not active - picked %s", poolType, scInfo.Address.Addr)
f.currentConns[poolType] = sc
return &frPicker{sc}
}

return f
return base.NewErrPicker(errors.New("error should not be reachable"))
}

// Pick simply returns the currently chosen conn
func (f *frPickerBuilder) Pick(balancer.PickInfo) (balancer.PickResult, error) {
f.mu.Lock()
defer f.mu.Unlock()
// frPicker is a trivial picker that just wraps the one chosen conn
type frPicker struct {
currentConn balancer.SubConn
}

return balancer.PickResult{SubConn: f.currentConn}, nil
// Pick simply returns the currently chosen conn
func (p *frPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
return balancer.PickResult{SubConn: p.currentConn}, nil
}

0 comments on commit 6c467e8

Please sign in to comment.