diff --git a/go/vt/vtgateproxy/discovery.go b/go/vt/vtgateproxy/discovery.go index 7a019e49030..7a5df534e96 100644 --- a/go/vt/vtgateproxy/discovery.go +++ b/go/vt/vtgateproxy/discovery.go @@ -62,9 +62,10 @@ const PoolTypeAttr = "PoolType" // Resolver(https://godoc.org/google.golang.org/grpc/resolver#Resolver). type JSONGateResolver struct { - target resolver.Target - clientConn resolver.ClientConn - poolType string + target resolver.Target + clientConn resolver.ClientConn + poolType string + currentAddrs []resolver.Address } func (r *JSONGateResolver) ResolveNow(o resolver.ResolveNowOptions) {} @@ -210,15 +211,24 @@ func (b *JSONGateResolverBuilder) start() error { } parseCount.Add("changed", 1) + var wg sync.WaitGroup + + // notify all the resolvers that the targets changed in parallel, since each update might sleep for + // the warmup time b.mu.RLock() - // notify all the resolvers that the targets changed for _, r := range b.resolvers { - err = b.update(r) - if err != nil { - log.Errorf("Failed to update resolver: %v", err) - } + wg.Add(1) + go func(r *JSONGateResolver) { + defer wg.Done() + + err = b.update(r) + if err != nil { + log.Errorf("Failed to update resolver: %v", err) + } + }(r) } b.mu.RUnlock() + wg.Wait() } }() @@ -393,8 +403,17 @@ func (b *JSONGateResolverBuilder) update(r *JSONGateResolver) error { addrs = append(addrs, resolver.Address{Addr: target.Addr, Attributes: attributes.New(PoolTypeAttr, r.poolType)}) } - log.V(100).Infof("updated targets for %s to %v", r.target.URL.String(), targets) + // If we've already selected some targets, give the new addresses some time to warm up before removing + // the old ones from the list + if r.currentAddrs != nil && warmupTime.Seconds() > 0 { + combined := append(r.currentAddrs, addrs...) + log.V(100).Infof("updating targets for %s to warmup %v", r.target.URL.String(), targets) + r.clientConn.UpdateState(resolver.State{Addresses: combined}) + time.Sleep(*warmupTime) + } + log.V(100).Infof("updating targets for %s after warmup to %v", r.target.URL.String(), targets) + r.currentAddrs = addrs return r.clientConn.UpdateState(resolver.State{Addresses: addrs}) } diff --git a/go/vt/vtgateproxy/vtgateproxy.go b/go/vt/vtgateproxy/vtgateproxy.go index d25961e8b30..a9bbbfe9343 100644 --- a/go/vt/vtgateproxy/vtgateproxy.go +++ b/go/vt/vtgateproxy/vtgateproxy.go @@ -60,6 +60,7 @@ var ( addressField = flag.String("address_field", "address", "field name in the json file containing the address") portField = flag.String("port_field", "port", "field name in the json file containing the port") balancerType = flag.String("balancer", "round_robin", "load balancing algorithm to use") + warmupTime = flag.Duration("warmup_time", 30*time.Second, "time to maintain connections to previously selected hosts") timings = stats.NewTimings("Timings", "proxy timings by operation", "operation")