Skip to content

Commit

Permalink
Merge pull request #3755 from trozet/create_transit_globally
Browse files Browse the repository at this point in the history
Fixes race across node workers to create transit switch
  • Loading branch information
trozet committed Jul 12, 2023
2 parents 6df1825 + c8ec7c0 commit 437d2ab
Show file tree
Hide file tree
Showing 7 changed files with 121 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,11 @@ func (cm *networkControllerManager) newCommonNetworkControllerInfo() (*ovn.Commo

// initDefaultNetworkController creates the controller for default network
func (cm *networkControllerManager) initDefaultNetworkController() error {
defaultController, err := ovn.NewDefaultNetworkController(cm.newCommonNetworkControllerInfo())
cnci, err := cm.newCommonNetworkControllerInfo()
if err != nil {
return fmt.Errorf("failed to create common network controller info: %w", err)
}
defaultController, err := ovn.NewDefaultNetworkController(cnci)
if err != nil {
return err
}
Expand Down
28 changes: 25 additions & 3 deletions go-controller/pkg/ovn/default_network_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ type DefaultNetworkController struct {

// NewDefaultNetworkController creates a new OVN controller for creating logical network
// infrastructure and policy for default l3 network
func NewDefaultNetworkController(cnci *CommonNetworkControllerInfo, err error) (*DefaultNetworkController, error) {
func NewDefaultNetworkController(cnci *CommonNetworkControllerInfo) (*DefaultNetworkController, error) {
stopChan := make(chan struct{})
wg := &sync.WaitGroup{}
return newDefaultNetworkControllerCommon(cnci, stopChan, wg, nil)
Expand Down Expand Up @@ -315,7 +315,7 @@ func (oc *DefaultNetworkController) Start(ctx context.Context) error {
if err != nil {
return err
}
if err = oc.Init(); err != nil {
if err = oc.Init(ctx); err != nil {
return err
}

Expand All @@ -337,7 +337,7 @@ func (oc *DefaultNetworkController) Stop() {
// TODO: Verify that the cluster was not already called with a different global subnet
//
// If true, then either quit or perform a complete reconfiguration of the cluster (recreate switches/routers with new subnet values)
func (oc *DefaultNetworkController) Init() error {
func (oc *DefaultNetworkController) Init(ctx context.Context) error {
existingNodes, err := oc.kube.GetNodes()
if err != nil {
klog.Errorf("Error in fetching nodes: %v", err)
Expand Down Expand Up @@ -386,9 +386,15 @@ func (oc *DefaultNetworkController) Init() error {
oc.routerLoadBalancerGroupUUID = loadBalancerGroup.UUID
}

networkID := util.InvalidNetworkID
nodeNames := []string{}
for _, node := range existingNodes.Items {
nodeNames = append(nodeNames, node.Name)

if config.OVNKubernetesFeature.EnableInterconnect && networkID == util.InvalidNetworkID {
// get networkID from any node in the cluster
networkID, _ = util.ParseNetworkIDAnnotation(&node, oc.zoneICHandler.GetNetworkName())
}
}
if err := oc.SetupMaster(nodeNames); err != nil {
klog.Errorf("Failed to setup master (%v)", err)
Expand All @@ -398,6 +404,22 @@ func (oc *DefaultNetworkController) Init() error {
// So execute an individual sync method at startup to cleanup any difference
klog.V(4).Info("Cleaning External Gateway ECMP routes")
WithSyncDurationMetricNoError("external gateway routes", oc.apbExternalRouteController.Repair)

if config.OVNKubernetesFeature.EnableInterconnect {
// if networkID is invalid, then we didn't find it on the initial node list.
// cluster-manager could still be starting and assigning, so execute full Init to search
if networkID == util.InvalidNetworkID {
if err := oc.zoneICHandler.Init(oc.kube, ctx); err != nil {
return err
}
} else {
// we already found the networkID, no need to search
if err := oc.zoneICHandler.EnsureTransitSwitch(networkID); err != nil {
return err
}
}
}

return nil
}

Expand Down
4 changes: 2 additions & 2 deletions go-controller/pkg/ovn/ovn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
libovsdbclient "github.com/ovn-org/libovsdb/client"
ovncnitypes "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/cni/types"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/config"
adminpolicybasedrouteapi "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/adminpolicybasedroute/v1"
adminpolicybasedroutefake "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/adminpolicybasedroute/v1/apis/clientset/versioned/fake"
egressfirewall "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressfirewall/v1"
egressfirewallfake "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressfirewall/v1/apis/clientset/versioned/fake"
Expand All @@ -25,8 +26,6 @@ import (
egressqosfake "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressqos/v1/apis/clientset/versioned/fake"
egressservice "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressservice/v1"
egressservicefake "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressservice/v1/apis/clientset/versioned/fake"

adminpolicybasedrouteapi "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/adminpolicybasedroute/v1"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/factory"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/kube"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/metrics"
Expand Down Expand Up @@ -249,6 +248,7 @@ func NewOvnController(ovnClient *util.OVNMasterClientset, wf *factory.WatchFacto
}

dnc, err := newDefaultNetworkControllerCommon(cnci, stopChan, wg, addressSetFactory)
gomega.Expect(err).NotTo(gomega.HaveOccurred())

if nbZoneFailed {
// Delete the NBGlobal row as this function created it. Otherwise many tests would fail while
Expand Down
1 change: 0 additions & 1 deletion go-controller/pkg/ovn/pods_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
func getPodAnnotations(fakeClient kubernetes.Interface, namespace, name string) string {
pod, err := fakeClient.CoreV1().Pods(namespace).Get(context.TODO(), name, metav1.GetOptions{})
gomega.Expect(err).NotTo(gomega.HaveOccurred())
fmt.Printf("########### %s\n", pod.Annotations)
return pod.Annotations[util.OvnPodAnnotationName]
}

Expand Down
9 changes: 7 additions & 2 deletions go-controller/pkg/ovn/secondary_layer3_network_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ func (oc *SecondaryLayer3NetworkController) newRetryFramework(
// Start starts the secondary layer3 controller, handles all events and creates all needed logical entities
func (oc *SecondaryLayer3NetworkController) Start(ctx context.Context) error {
klog.Infof("Start secondary %s network controller of network %s", oc.TopologyType(), oc.GetNetworkName())
if err := oc.Init(); err != nil {
if err := oc.Init(ctx); err != nil {
return err
}

Expand Down Expand Up @@ -463,7 +463,12 @@ func (oc *SecondaryLayer3NetworkController) WatchNodes() error {
return err
}

func (oc *SecondaryLayer3NetworkController) Init() error {
func (oc *SecondaryLayer3NetworkController) Init(ctx context.Context) error {
if config.OVNKubernetesFeature.EnableInterconnect {
if err := oc.zoneICHandler.Init(oc.kube, ctx); err != nil {
return err
}
}
_, err := oc.createOvnClusterRouter()
return err
}
Expand Down
108 changes: 64 additions & 44 deletions go-controller/pkg/ovn/zone_interconnect/zone_ic_handler.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
package zoneinterconnect

import (
"context"
"errors"
"fmt"
"net"
"strconv"
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
utilnet "k8s.io/utils/net"

libovsdbclient "github.com/ovn-org/libovsdb/client"

"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/kube"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/libovsdbops"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/nbdb"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/sbdb"
Expand Down Expand Up @@ -130,6 +134,66 @@ func NewZoneInterconnectHandler(nInfo util.NetInfo, nbClient, sbClient libovsdbc
return zic
}

func (zic *ZoneInterconnectHandler) EnsureTransitSwitch(networkID int) error {
transitSwitchTunnelKey := BaseTransitSwitchTunnelKey + networkID
ts := &nbdb.LogicalSwitch{
Name: zic.networkTransitSwitchName,
OtherConfig: map[string]string{
"interconn-ts": zic.networkTransitSwitchName,
"requested-tnl-key": strconv.Itoa(transitSwitchTunnelKey),
"mcast_snoop": "true",
"mcast_querier": "false",
"mcast_flood_unregistered": "true",
},
}

// Create transit switch if it doesn't exist
if err := libovsdbops.CreateOrUpdateLogicalSwitch(zic.nbClient, ts); err != nil {
return fmt.Errorf("failed to create/update transit switch %s: %w", zic.networkTransitSwitchName, err)
}
return nil
}

// Init sets up the global transit switch required for interoperability with other zones
// Must wait for network id to be annotated to this node by cluster manager
func (zic *ZoneInterconnectHandler) Init(kube *kube.KubeOVN, ctx context.Context) error {

maxTimeout := 2 * time.Minute
networkID := util.InvalidNetworkID
var err1 error
start := time.Now()
err := wait.PollUntilContextTimeout(ctx, 250*time.Millisecond, maxTimeout, true, func(ctx context.Context) (bool, error) {
nodes, err := kube.GetNodes()
if err != nil {
err1 = fmt.Errorf("failed to get nodes: %v", err)
return false, nil
}
for _, node := range nodes.Items {
networkID, err = util.ParseNetworkIDAnnotation(&node, zic.GetNetworkName())
if err != nil {
err1 = fmt.Errorf("failed to get the network id for the network %s on node %s: %v",
zic.GetNetworkName(), node.Name, err)
}
}
if networkID == util.InvalidNetworkID {
return false, err1
}
return true, nil
})

if err != nil {
return fmt.Errorf("failed to find network ID: %w, %v", err, err1)
}

if err := zic.EnsureTransitSwitch(networkID); err != nil {
return err
}

klog.Infof("Time taken to create transit switch: %s", time.Since(start))

return nil
}

// AddLocalZoneNode creates the interconnect resources in OVN NB DB for the local zone node.
// See createLocalZoneNodeResources() below for more details.
func (zic *ZoneInterconnectHandler) AddLocalZoneNode(node *corev1.Node) error {
Expand Down Expand Up @@ -256,34 +320,12 @@ func (zic *ZoneInterconnectHandler) createLocalZoneNodeResources(node *corev1.No
return fmt.Errorf("failed to get the node transit switch port ips for node %s: %w", node.Name, err)
}

networkId, err := util.ParseNetworkIDAnnotation(node, zic.GetNetworkName())
if err != nil {
return fmt.Errorf("failed to get the network id for the network %s on node %s: %v", zic.GetNetworkName(), node.Name, err)
}

transitRouterPortMac := util.IPAddrToHWAddr(nodeTransitSwitchPortIPs[0].IP)
var transitRouterPortNetworks []string
for _, ip := range nodeTransitSwitchPortIPs {
transitRouterPortNetworks = append(transitRouterPortNetworks, ip.String())
}

transitSwitchTunnelKey := BaseTransitSwitchTunnelKey + networkId
ts := &nbdb.LogicalSwitch{
Name: zic.networkTransitSwitchName,
OtherConfig: map[string]string{
"interconn-ts": zic.networkTransitSwitchName,
"requested-tnl-key": strconv.Itoa(transitSwitchTunnelKey),
"mcast_snoop": "true",
"mcast_querier": "false",
"mcast_flood_unregistered": "true",
},
}

// Create transit switch if it doesn't exist
if err := libovsdbops.CreateOrUpdateLogicalSwitch(zic.nbClient, ts); err != nil {
return fmt.Errorf("failed to create/update transit switch %s: %w", zic.networkTransitSwitchName, err)
}

// Connect transit switch to the cluster router by creating a pair of logical switch port - logical router port
logicalRouterPortName := zic.GetNetworkScopedName(types.RouterToTransitSwitchPrefix + node.Name)
logicalRouterPort := nbdb.LogicalRouterPort{
Expand Down Expand Up @@ -335,28 +377,6 @@ func (zic *ZoneInterconnectHandler) createRemoteZoneNodeResources(node *corev1.N
return fmt.Errorf("failed to get the node transit switch port Ips : %w", err)
}

networkId, err := util.ParseNetworkIDAnnotation(node, zic.GetNetworkName())
if err != nil {
return fmt.Errorf("failed to get the network id for the network %s on node %s: %v", zic.GetNetworkName(), node.Name, err)
}

transitSwitchTunnelKey := BaseTransitSwitchTunnelKey + networkId
ts := &nbdb.LogicalSwitch{
Name: zic.networkTransitSwitchName,
OtherConfig: map[string]string{
"interconn-ts": zic.networkTransitSwitchName,
"requested-tnl-key": strconv.Itoa(transitSwitchTunnelKey),
"mcast_snoop": "true",
"mcast_querier": "false",
"mcast_flood_unregistered": "true",
},
}

// Create transit switch if it doesn't exist
if err := libovsdbops.CreateOrUpdateLogicalSwitch(zic.nbClient, ts); err != nil {
return fmt.Errorf("failed to create/update transit switch %s: %w", zic.networkTransitSwitchName, err)
}

transitRouterPortMac := util.IPAddrToHWAddr(nodeTransitSwitchPortIPs[0].IP)
var transitRouterPortNetworks []string
for _, ip := range nodeTransitSwitchPortIPs {
Expand Down
18 changes: 18 additions & 0 deletions go-controller/pkg/ovn/zone_interconnect/zone_ic_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,8 @@ var _ = ginkgo.Describe("Zone Interconnect Operations", func() {
gomega.Expect(err).NotTo(gomega.HaveOccurred())

zoneICHandler := NewZoneInterconnectHandler(&util.DefaultNetInfo{}, libovsdbOvnNBClient, libovsdbOvnSBClient)
err = zoneICHandler.EnsureTransitSwitch(0)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
err = invokeICHandlerAddNodeFunction("global", zoneICHandler, &testNode1, &testNode2, &testNode3)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
err = checkInterconnectResources("global", types.DefaultNetworkName, libovsdbOvnNBClient, testNodesRouteInfo, &testNode1, &testNode2, &testNode3)
Expand Down Expand Up @@ -425,6 +427,8 @@ var _ = ginkgo.Describe("Zone Interconnect Operations", func() {
gomega.Expect(err).NotTo(gomega.HaveOccurred())

zoneICHandler := NewZoneInterconnectHandler(&util.DefaultNetInfo{}, libovsdbOvnNBClient, libovsdbOvnSBClient)
err = zoneICHandler.EnsureTransitSwitch(0)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
err = invokeICHandlerAddNodeFunction("global", zoneICHandler, &testNode1, &testNode2, &testNode3)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
err = checkInterconnectResources("global", types.DefaultNetworkName, libovsdbOvnNBClient, testNodesRouteInfo, &testNode1, &testNode2, &testNode3)
Expand Down Expand Up @@ -505,6 +509,8 @@ var _ = ginkgo.Describe("Zone Interconnect Operations", func() {
gomega.Expect(err).NotTo(gomega.HaveOccurred())

zoneICHandler := NewZoneInterconnectHandler(&util.DefaultNetInfo{}, libovsdbOvnNBClient, libovsdbOvnSBClient)
err = zoneICHandler.EnsureTransitSwitch(0)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
err = invokeICHandlerAddNodeFunction("global", zoneICHandler, &testNode1, &testNode2, &testNode3)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
err = checkInterconnectResources("global", types.DefaultNetworkName, libovsdbOvnNBClient, testNodesRouteInfo, &testNode1, &testNode2, &testNode3)
Expand Down Expand Up @@ -555,6 +561,8 @@ var _ = ginkgo.Describe("Zone Interconnect Operations", func() {
gomega.Expect(err).NotTo(gomega.HaveOccurred())

zoneICHandler := NewZoneInterconnectHandler(&util.DefaultNetInfo{}, libovsdbOvnNBClient, libovsdbOvnSBClient)
err = zoneICHandler.EnsureTransitSwitch(0)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
err = invokeICHandlerAddNodeFunction("global", zoneICHandler, &testNode1, &testNode2, &testNode3)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
err = checkInterconnectResources("global", types.DefaultNetworkName, libovsdbOvnNBClient, testNodesRouteInfo, &testNode1, &testNode2, &testNode3)
Expand Down Expand Up @@ -672,6 +680,8 @@ var _ = ginkgo.Describe("Zone Interconnect Operations", func() {
netInfo, err := util.NewNetInfo(&ovncnitypes.NetConf{NetConf: cnitypes.NetConf{Name: "blue"}, Topology: types.Layer3Topology})
gomega.Expect(err).NotTo(gomega.HaveOccurred())
zoneICHandler := NewZoneInterconnectHandler(netInfo, libovsdbOvnNBClient, libovsdbOvnSBClient)
err = zoneICHandler.EnsureTransitSwitch(1)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
err = invokeICHandlerAddNodeFunction("global", zoneICHandler, &testNode1, &testNode2, &testNode3)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
err = checkInterconnectResources("global", "blue", libovsdbOvnNBClient, testNodesRouteInfo, &testNode1, &testNode2, &testNode3)
Expand Down Expand Up @@ -710,6 +720,8 @@ var _ = ginkgo.Describe("Zone Interconnect Operations", func() {
netInfo, err := util.NewNetInfo(&ovncnitypes.NetConf{NetConf: cnitypes.NetConf{Name: "blue"}, Topology: types.Layer3Topology})
gomega.Expect(err).NotTo(gomega.HaveOccurred())
zoneICHandler := NewZoneInterconnectHandler(netInfo, libovsdbOvnNBClient, libovsdbOvnSBClient)
err = zoneICHandler.EnsureTransitSwitch(1)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
err = invokeICHandlerAddNodeFunction("global", zoneICHandler, &testNode1, &testNode2, &testNode3)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
err = checkInterconnectResources("global", "blue", libovsdbOvnNBClient, testNodesRouteInfo, &testNode1, &testNode2, &testNode3)
Expand Down Expand Up @@ -768,6 +780,9 @@ var _ = ginkgo.Describe("Zone Interconnect Operations", func() {
zoneICHandler := NewZoneInterconnectHandler(&util.DefaultNetInfo{}, libovsdbOvnNBClient, libovsdbOvnSBClient)
gomega.Expect(zoneICHandler).NotTo(gomega.BeNil())

err = zoneICHandler.EnsureTransitSwitch(0)
gomega.Expect(err).NotTo(gomega.HaveOccurred())

err = zoneICHandler.AddLocalZoneNode(&testNode4)
gomega.Expect(err).To(gomega.HaveOccurred(), "failed to get node id for node - node4")

Expand Down Expand Up @@ -856,6 +871,9 @@ var _ = ginkgo.Describe("Zone Interconnect Operations", func() {
zoneICHandler := NewZoneInterconnectHandler(&util.DefaultNetInfo{}, libovsdbOvnNBClient, libovsdbOvnSBClient)
gomega.Expect(zoneICHandler).NotTo(gomega.BeNil())

err = zoneICHandler.EnsureTransitSwitch(0)
gomega.Expect(err).NotTo(gomega.HaveOccurred())

err = zoneICHandler.AddRemoteZoneNode(&testNode4)
gomega.Expect(err).To(gomega.HaveOccurred(), "failed to get node id for node - node4")

Expand Down

0 comments on commit 437d2ab

Please sign in to comment.