Skip to content

Commit

Permalink
kubevirt: Handle adding new node
Browse files Browse the repository at this point in the history
If a new node is added, the IP pool has to allocate the VM IPs
if the node is taking over the VM subnet.

Signed-off-by: Enrique Llorente <[email protected]>
  • Loading branch information
qinqon committed Jul 18, 2023
1 parent 7dfaabe commit f508156
Show file tree
Hide file tree
Showing 5 changed files with 148 additions and 4 deletions.
10 changes: 10 additions & 0 deletions go-controller/pkg/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -941,6 +941,16 @@ func (wf *WatchFactory) GetPodsBySelector(namespace string, labelSelector metav1
return podLister.Pods(namespace).List(selector)
}

// GetAllPodsBySelector lists, across all namespaces, the pods known to the
// pod informer that match labelSelector. It returns an error when the label
// selector cannot be converted into a selector.
func (wf *WatchFactory) GetAllPodsBySelector(labelSelector metav1.LabelSelector) ([]*kapi.Pod, error) {
	lister := wf.informers[PodType].lister.(listers.PodLister)
	sel, convErr := metav1.LabelSelectorAsSelector(&labelSelector)
	if convErr != nil {
		return nil, convErr
	}
	// Listing on the lister itself (no namespace scoping) spans every namespace.
	return lister.List(sel)
}

// GetNodes returns the node specs of all the nodes
func (wf *WatchFactory) GetNodes() ([]*kapi.Node, error) {
return wf.ListNodes(labels.Everything())
Expand Down
68 changes: 68 additions & 0 deletions go-controller/pkg/kubevirt/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,3 +219,71 @@ func SyncVirtualMachines(nbClient libovsdbclient.Client, vms map[ktypes.Namespac
}
return nil
}

// FindLiveMigratablePods returns, from all namespaces, the pods that carry
// the kubevirtv1.VirtualMachineNameLabel label (any value) and for which
// IsPodLiveMigratable reports true.
//
// The returned slice is never nil on success; it is empty when no pod
// qualifies. On failure the underlying lookup error is wrapped so callers can
// still inspect it with errors.Is / errors.As.
func FindLiveMigratablePods(watchFactory *factory.WatchFactory) ([]*corev1.Pod, error) {
	// Select on label existence only: the VM name itself is irrelevant here.
	vmSelector := metav1.LabelSelector{
		MatchExpressions: []metav1.LabelSelectorRequirement{{
			Key:      kubevirtv1.VirtualMachineNameLabel,
			Operator: metav1.LabelSelectorOpExists,
		}},
	}
	vmPods, err := watchFactory.GetAllPodsBySelector(vmSelector)
	if err != nil {
		return nil, fmt.Errorf("failed looking for live migratable pods: %w", err)
	}

	liveMigratablePods := make([]*corev1.Pod, 0, len(vmPods))
	for _, vmPod := range vmPods {
		if IsPodLiveMigratable(vmPod) {
			liveMigratablePods = append(liveMigratablePods, vmPod)
		}
	}
	return liveMigratablePods, nil
}

// allocateSyncMigratablePodIPs refills the IP pool for a single live
// migratable pod, for the case where a node has taken over the VM subnet.
//
// It calls allocatePodIPsOnSwitch with the pod, its parsed network annotation
// for nadName, nadName and the switch name reported by ZoneContainsPodSubnet.
// Nothing is allocated (and nil is returned) when:
//   - IsMigratedSourcePodStale reports the pod as stale,
//   - the pod annotation for nadName cannot be unmarshalled,
//   - this zone does not contain the pod subnet, or
//   - nodeName is non-empty and differs from the switch owning the subnet.
//
// Errors from IsMigratedSourcePodStale and allocatePodIPsOnSwitch are
// returned to the caller unchanged.
func allocateSyncMigratablePodIPs(watchFactory *factory.WatchFactory, lsManager *logicalswitchmanager.LogicalSwitchManager, nodeName, nadName string, pod *corev1.Pod, allocatePodIPsOnSwitch func(*corev1.Pod, *util.PodAnnotation, string, string) (string, error)) error {
	stale, err := IsMigratedSourcePodStale(watchFactory, pod)
	switch {
	case err != nil:
		return err
	case stale:
		// Stale migration source pods are not synced.
		return nil
	}

	podAnnotation, err := util.UnmarshalPodAnnotation(pod.Annotations, nadName)
	if err != nil {
		// NOTE(review): the unmarshal error is dropped on purpose here,
		// presumably because such a pod has no IPs to re-allocate yet -
		// confirm this is the intended best-effort behaviour.
		return nil
	}

	switchName, ownedByZone := ZoneContainsPodSubnet(lsManager, podAnnotation)
	if !ownedByZone {
		// The subnet does not belong to this zone: nothing to allocate.
		return nil
	}
	if nodeName != "" && nodeName != switchName {
		// A specific node was requested and it is not the switch that owns
		// the pod subnet: nothing to allocate.
		return nil
	}

	_, err = allocatePodIPsOnSwitch(pod, podAnnotation, nadName, switchName)
	return err
}

// AllocateSyncMigratablePodsIPsOnNode refills the IP pool for live migrated
// VMs in case the node has taken over the VM subnet. It looks up every live
// migratable pod with FindLiveMigratablePods and runs
// allocateSyncMigratablePodIPs on each of them with the given nodeName and
// nadName.
//
// Processing stops at the first failure, whose error is returned as is; the
// remaining pods are not visited.
func AllocateSyncMigratablePodsIPsOnNode(watchFactory *factory.WatchFactory, lsManager *logicalswitchmanager.LogicalSwitchManager, nodeName, nadName string, allocatePodIPsOnSwitch func(*corev1.Pod, *util.PodAnnotation, string, string) (string, error)) error {
	pods, err := FindLiveMigratablePods(watchFactory)
	if err != nil {
		return err
	}
	for i := range pods {
		err = allocateSyncMigratablePodIPs(watchFactory, lsManager, nodeName, nadName, pods[i], allocatePodIPsOnSwitch)
		if err != nil {
			return err
		}
	}
	return nil
}
16 changes: 12 additions & 4 deletions go-controller/pkg/ovn/default_network_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ type DefaultNetworkController struct {
nodeClusterRouterPortFailed sync.Map
hybridOverlayFailed sync.Map
syncZoneICFailed sync.Map
syncMigratablePodsFailed sync.Map

// variable to determine if all pods present on the node during startup have been processed
// updated atomically
Expand Down Expand Up @@ -739,15 +740,19 @@ func (h *defaultNetworkControllerEventHandler) AddResource(obj interface{}, from
_, gwSync := h.oc.gatewaysFailed.Load(node.Name)
_, hoSync := h.oc.hybridOverlayFailed.Load(node.Name)
_, zoneICSync := h.oc.syncZoneICFailed.Load(node.Name)
_, syncMigratablePods := h.oc.syncMigratablePodsFailed.Load(node.Name)
nodeParams = &nodeSyncs{
nodeSync,
clusterRtrSync,
mgmtSync,
gwSync,
hoSync,
zoneICSync}
zoneICSync,
syncMigratablePods}
} else {
nodeParams = &nodeSyncs{true, true, true, true, config.HybridOverlay.Enabled, config.OVNKubernetesFeature.EnableInterconnect}
nodeHostSubnets, _ := util.ParseNodeHostSubnetAnnotation(node, ovntypes.DefaultNetworkName)
syncMigratablePods := nodeHostSubnets != nil
nodeParams = &nodeSyncs{true, true, true, true, config.HybridOverlay.Enabled, config.OVNKubernetesFeature.EnableInterconnect, syncMigratablePods}
}

if err = h.oc.addUpdateLocalNodeEvent(node, nodeParams); err != nil {
Expand Down Expand Up @@ -887,18 +892,21 @@ func (h *defaultNetworkControllerEventHandler) UpdateResource(oldObj, newObj int
nodeGatewayMTUSupportChanged(oldNode, newNode))
_, hoSync := h.oc.hybridOverlayFailed.Load(newNode.Name)
_, syncZoneIC := h.oc.syncZoneICFailed.Load(newNode.Name)
_, failed = h.oc.syncMigratablePodsFailed.Load(newNode.Name)
syncMigratablePods := failed || nodeSubnetChanged(oldNode, newNode)
nodeSyncsParam = &nodeSyncs{
nodeSync,
clusterRtrSync,
mgmtSync,
gwSync,
hoSync,
syncZoneIC}
syncZoneIC,
syncMigratablePods}
} else {
klog.Infof("Node %s moved from the remote zone %s to local zone.",
newNode.Name, util.GetNodeZone(oldNode), util.GetNodeZone(newNode))
// The node is now a local zone node. Trigger a full node sync.
nodeSyncsParam = &nodeSyncs{true, true, true, true, true, config.OVNKubernetesFeature.EnableInterconnect}
nodeSyncsParam = &nodeSyncs{true, true, true, true, true, config.OVNKubernetesFeature.EnableInterconnect, true}
}

return h.oc.addUpdateLocalNodeEvent(newNode, nodeSyncsParam)
Expand Down
40 changes: 40 additions & 0 deletions go-controller/pkg/ovn/kubevirt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/config"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/kubevirt"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/nbdb"
ovntest "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/testing"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/testing/libovsdb"
libovsdbtest "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/testing/libovsdb"
ovntypes "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/types"
Expand Down Expand Up @@ -80,6 +81,7 @@ var _ = Describe("OVN Kubevirt Operations", func() {
migrationTarget testMigrationTarget
remoteNodes []string
interconnected bool
replaceNode string
dnsServiceIPs []string
lrpNetworks []string
dhcpv4 []testDHCPOptions
Expand Down Expand Up @@ -688,6 +690,31 @@ var _ = Describe("OVN Kubevirt Operations", func() {
)
}
Eventually(fakeOvn.nbClient).Should(libovsdbtest.HaveData(expectedOVN), "should populate ovn")
if t.replaceNode != "" {
By("Replace vm node with newNode at the logical switch manager")
newNode := &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "newNode1",
Annotations: map[string]string{
"k8s.ovn.org/node-subnets": fmt.Sprintf(`{"default":[%q,%q]}`, nodeByName[t.replaceNode].subnetIPv4, nodeByName[t.replaceNode].subnetIPv6),
},
},
}
fakeOvn.controller.lsManager.DeleteSwitch(t.replaceNode)
fakeOvn.controller.lsManager.AddOrUpdateSwitch(newNode.Name, ovntest.MustParseIPNets(
nodeByName[t.replaceNode].subnetIPv4,
nodeByName[t.replaceNode].subnetIPv6,
))

Expect(fakeOvn.controller.addUpdateLocalNodeEvent(newNode, &nodeSyncs{syncMigratablePods: true})).To(Succeed())

podToCreate, err = fakeOvn.fakeClient.KubeClient.CoreV1().Pods(t.namespace).Get(context.TODO(), podToCreate.Name, metav1.GetOptions{})
Expect(err).ToNot(HaveOccurred())
podAnnotation, err := util.UnmarshalPodAnnotation(podToCreate.Annotations, ovntypes.DefaultNetworkName)
Expect(err).ToNot(HaveOccurred())

Expect(fakeOvn.controller.lsManager.AllocateIPs(newNode.Name, podAnnotation.IPs)).ToNot(Succeed(), "should allocate the pod IPs when node is replaced")
}

if t.podName != "" {
err = fakeOvn.fakeClient.KubeClient.CoreV1().Pods(t.namespace).Delete(context.TODO(), t.podName, metav1.DeleteOptions{})
Expand All @@ -709,6 +736,7 @@ var _ = Describe("OVN Kubevirt Operations", func() {
lrpNetworks: []string{nodeByName[node1].lrpNetworkIPv4},
dnsServiceIPs: []string{dnsServiceIPv4},
testVirtLauncherPod: virtLauncher1(node1, vm1, "ipv4"),
replaceNode: node1,
expectedDhcpv4: []testDHCPOptions{{
cidr: nodeByName[node1].subnetIPv4,
dns: dnsServiceIPv4,
Expand All @@ -720,6 +748,7 @@ var _ = Describe("OVN Kubevirt Operations", func() {
lrpNetworks: []string{nodeByName[node1].lrpNetworkIPv4},
dnsServiceIPs: []string{dnsServiceIPv4},
testVirtLauncherPod: virtLauncher1(node1, vm1, "ipv4"),
replaceNode: node1,
expectedDhcpv4: []testDHCPOptions{{
cidr: nodeByName[node1].subnetIPv4,
dns: dnsServiceIPv4,
Expand All @@ -732,11 +761,13 @@ var _ = Describe("OVN Kubevirt Operations", func() {
lrpNetworks: []string{nodeByName[node1].lrpNetworkIPv4},
dnsServiceIPs: []string{dnsServiceIPv4},
testVirtLauncherPod: virtLauncher1(node1, vm1, "ipv4"),
replaceNode: node1,
}),
Entry("for single stack ipv6 at global zone", testData{
lrpNetworks: []string{nodeByName[node1].lrpNetworkIPv6},
dnsServiceIPs: []string{dnsServiceIPv6},
testVirtLauncherPod: virtLauncher1(node1, vm1, "ipv6"),
replaceNode: node1,
expectedDhcpv6: []testDHCPOptions{{
cidr: nodeByName[node1].subnetIPv6,
dns: dnsServiceIPv6,
Expand All @@ -748,6 +779,7 @@ var _ = Describe("OVN Kubevirt Operations", func() {
dnsServiceIPs: []string{dnsServiceIPv6},
interconnected: true,
testVirtLauncherPod: virtLauncher1(node1, vm1, "ipv6"),
replaceNode: node1,
expectedDhcpv6: []testDHCPOptions{{
cidr: nodeByName[node1].subnetIPv6,
dns: dnsServiceIPv6,
Expand All @@ -760,11 +792,13 @@ var _ = Describe("OVN Kubevirt Operations", func() {
interconnected: true,
remoteNodes: []string{node1},
testVirtLauncherPod: virtLauncher1(node1, vm1, "ipv6"),
replaceNode: node1,
}),
Entry("for dual stack at global zone", testData{
lrpNetworks: []string{nodeByName[node1].lrpNetworkIPv4, nodeByName[node1].lrpNetworkIPv6},
dnsServiceIPs: []string{dnsServiceIPv4, dnsServiceIPv6},
testVirtLauncherPod: virtLauncher1(node1, vm1, "dualstack"),
replaceNode: node1,
expectedDhcpv4: []testDHCPOptions{{
cidr: nodeByName[node1].subnetIPv4,
dns: dnsServiceIPv4,
Expand All @@ -781,6 +815,7 @@ var _ = Describe("OVN Kubevirt Operations", func() {
lrpNetworks: []string{nodeByName[node1].lrpNetworkIPv4, nodeByName[node1].lrpNetworkIPv6},
dnsServiceIPs: []string{dnsServiceIPv4, dnsServiceIPv6},
testVirtLauncherPod: virtLauncher1(node1, vm1, "dualstack"),
replaceNode: node1,
expectedDhcpv4: []testDHCPOptions{{
cidr: nodeByName[node1].subnetIPv4,
dns: dnsServiceIPv4,
Expand All @@ -798,6 +833,7 @@ var _ = Describe("OVN Kubevirt Operations", func() {
lrpNetworks: []string{nodeByName[node1].lrpNetworkIPv4, nodeByName[node1].lrpNetworkIPv6},
dnsServiceIPs: []string{dnsServiceIPv4, dnsServiceIPv6},
testVirtLauncherPod: virtLauncher1(node1, vm1, "dualstack"),
replaceNode: node1,
}),

Entry("for pre-copy live migration at global zone", testData{
Expand All @@ -808,6 +844,7 @@ var _ = Describe("OVN Kubevirt Operations", func() {
lrpNetworks: []string{nodeByName[node2].lrpNetworkIPv4, nodeByName[node2].lrpNetworkIPv6},
testVirtLauncherPod: virtLauncher2(node2, vm1, "dualstack"),
},
replaceNode: node1,
expectedDhcpv4: []testDHCPOptions{{
cidr: nodeByName[node1].subnetIPv4,
dns: dnsServiceIPv4,
Expand Down Expand Up @@ -847,6 +884,7 @@ var _ = Describe("OVN Kubevirt Operations", func() {
lrpNetworks: []string{nodeByName[node1].lrpNetworkIPv4, nodeByName[node1].lrpNetworkIPv6},
dnsServiceIPs: []string{dnsServiceIPv4, dnsServiceIPv6},
testVirtLauncherPod: virtLauncher1(node1, vm1, "dualstack"),
replaceNode: node1,
migrationTarget: testMigrationTarget{
lrpNetworks: []string{nodeByName[node2].lrpNetworkIPv4, nodeByName[node2].lrpNetworkIPv6},
testVirtLauncherPod: virtLauncher2(node2, vm1, "dualstack"),
Expand Down Expand Up @@ -892,6 +930,7 @@ var _ = Describe("OVN Kubevirt Operations", func() {
lrpNetworks: []string{nodeByName[node1].lrpNetworkIPv4, nodeByName[node1].lrpNetworkIPv6},
dnsServiceIPs: []string{dnsServiceIPv4, dnsServiceIPv6},
testVirtLauncherPod: virtLauncher1(node2, vm1, "dualstack"),
replaceNode: node1,
expectedStaticRoutes: []testStaticRoute{
{
prefix: vmByName[vm1].addressIPv4,
Expand Down Expand Up @@ -938,6 +977,7 @@ var _ = Describe("OVN Kubevirt Operations", func() {
lrpNetworks: []string{nodeByName[node1].lrpNetworkIPv4, nodeByName[node1].lrpNetworkIPv6},
testVirtLauncherPod: virtLauncher2(node1, vm1, "dualstack"),
},
replaceNode: node1,
expectedDhcpv4: []testDHCPOptions{{
cidr: nodeByName[node1].subnetIPv4,
dns: dnsServiceIPv4,
Expand Down
18 changes: 18 additions & 0 deletions go-controller/pkg/ovn/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ import (
libovsdbclient "github.com/ovn-org/libovsdb/client"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/config"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/kube"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/kubevirt"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/libovsdbops"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/nbdb"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/sbdb"
ovntypes "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/types"
"github.com/pkg/errors"

hotypes "github.com/ovn-org/ovn-kubernetes/go-controller/hybrid-overlay/pkg/types"
Expand Down Expand Up @@ -758,6 +760,7 @@ type nodeSyncs struct {
syncGw bool
syncHo bool
syncZoneIC bool
syncMigratablePods bool
}

func (oc *DefaultNetworkController) addUpdateLocalNodeEvent(node *kapi.Node, nSyncs *nodeSyncs) error {
Expand Down Expand Up @@ -890,6 +893,15 @@ func (oc *DefaultNetworkController) addUpdateLocalNodeEvent(node *kapi.Node, nSy
}
}
}

if nSyncs.syncMigratablePods {
if err := oc.allocateSyncMigratablePodIPs(node); err != nil {
errs = append(errs, err)
oc.syncMigratablePodsFailed.Store(node.Name, true)
} else {
oc.syncMigratablePodsFailed.Delete(node.Name)
}
}
return kerrors.NewAggregate(errs)
}

Expand Down Expand Up @@ -1004,3 +1016,9 @@ func (oc *DefaultNetworkController) getOVNClusterRouterPortToJoinSwitchIfAddrs()

return gwLRPIPs, nil
}
// allocateSyncMigratablePodIPs asks the kubevirt package to re-allocate, on
// the default network, the IPs of live migratable pods for the given node.
// The controller's own allocatePodIPsOnSwitch is handed over as the
// allocation callback.
func (oc *DefaultNetworkController) allocateSyncMigratablePodIPs(node *kapi.Node) error {
	// The kubevirt helper invokes the callback as
	// (pod, annotation, nadName, switchName); the parameters are named after
	// what is actually received and forwarded in the same positional order.
	allocateFn := func(pod *kapi.Pod, podAnnotation *util.PodAnnotation, nadName, switchName string) (string, error) {
		return oc.allocatePodIPsOnSwitch(pod, podAnnotation, nadName, switchName)
	}
	return kubevirt.AllocateSyncMigratablePodsIPsOnNode(
		oc.watchFactory,
		oc.lsManager,
		node.Name,
		ovntypes.DefaultNetworkName,
		allocateFn,
	)
}

0 comments on commit f508156

Please sign in to comment.