Skip to content

Commit

Permalink
Create StatusManager - a centralized component responsible for updating
Browse files Browse the repository at this point in the history
the centralized status of an object, based on zone-specific statuses.
Created as a part of cluster manager, handles only apbroutepolicy
objects for now.

Remove updated policy check based on timestamp,
since LastTransitionTime precision is in seconds, and the whole test
takes less than a second to complete, therefore all timestamps will
be the same for multiple updates. Just checking expected policy state
is enough for that test.

TODO: unit tests checking Status.Status won't work with the existing
setup, since that field is set by cluster manager.

Signed-off-by: Nadia Pinaeva <[email protected]>
  • Loading branch information
npinaeva committed Aug 24, 2023
1 parent e1a7697 commit c28636d
Show file tree
Hide file tree
Showing 9 changed files with 228 additions and 50 deletions.
2 changes: 1 addition & 1 deletion dist/templates/ovn-setup.yaml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ rules:
- k8s.ovn.org
resources:
- adminpolicybasedexternalroutes/status
verbs: [ "update"]
verbs: [ "update", "patch"]
- apiGroups:
- policy.networking.k8s.io
resources:
Expand Down
9 changes: 8 additions & 1 deletion go-controller/pkg/clustermanager/clustermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sync"

"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/clustermanager/egressservice"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/clustermanager/status_manager"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/kube"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/ovn/controller/unidling"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/ovn/healthcheck"
Expand Down Expand Up @@ -43,7 +44,8 @@ type ClusterManager struct {

// unique identity for clusterManager running on different ovnkube-cluster-manager instance,
// used for leader election
identity string
identity string
statusManager *status_manager.StatusManager
}

// NewClusterManager creates a new cluster manager to manage the cluster nodes.
Expand All @@ -65,6 +67,7 @@ func NewClusterManager(ovnClient *util.OVNClusterManagerClientset, wf *factory.W
wf: wf,
recorder: recorder,
identity: identity,
statusManager: status_manager.NewStatusManager(wf.APBRouteInformer(), ovnClient.AdminPolicyRouteClient),
}

if config.OVNKubernetesFeature.EnableMultiNetwork {
Expand Down Expand Up @@ -146,6 +149,10 @@ func (cm *ClusterManager) Start(ctx context.Context) error {
}
}

if err := cm.statusManager.Start(); err != nil {
return err
}

return nil
}

Expand Down
133 changes: 133 additions & 0 deletions go-controller/pkg/clustermanager/status_manager/status_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package status_manager

import (
"context"
"fmt"
"strings"

adminpolicybasedrouteapi "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/adminpolicybasedroute/v1"
adminpolicybasedrouteclient "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/adminpolicybasedroute/v1/apis/clientset/versioned"
adminpolicybasedinformers "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/adminpolicybasedroute/v1/apis/informers/externalversions/adminpolicybasedroute/v1"

"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/types"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
ktypes "k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/cache"
utilpointer "k8s.io/utils/pointer"
)

// fieldManagerName is the field manager identity sent with the status patches
// issued by typedStatusManager.updateStatus (PatchOptions.FieldManager).
// It should be different from any zone name.
// NOTE(review): zone controllers appear to use their zone ID as the field manager
// for their own status patches, which is presumably why the names must not clash — confirm.
const fieldManagerName = "cluster-manager"

// typedStatusManager watches objects of a single type T and patches their
// "status" subresource with a value derived from the object itself.
type typedStatusManager[T Object] struct {
	// gvk is stamped on the patch object before it is encoded.
	gvk schema.GroupVersionKind
	// informer delivers the update events that trigger status recalculation.
	informer cache.SharedIndexInformer
	// patch sends the encoded patch for the named object to the API server.
	patch func(ctx context.Context, name string, pt ktypes.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (T, error)
	// updateStatusFunc computes the desired status for obj and reports whether a
	// patch is required, returning (updatedObject, needsUpdate). It must tolerate
	// a nil obj when T is a pointer type. Where possible it should return a new
	// object carrying only the status fields to set, which keeps encoding cheap.
	updateStatusFunc func(obj T) (T, bool)
}

// Object is the type constraint for typedStatusManager: any type that
// implements runtime.Object.
type Object interface {
runtime.Object
}

// updateStatus is the informer update handler. It asks updateStatusFunc for
// the status the new object should carry and, when a change is needed, applies
// it to the "status" subresource with an apply patch (ktypes.ApplyPatchType)
// sent under fieldManagerName with Force set.
//
// Both arguments must be of type T; anything else is reported through
// utilruntime.HandleError and ignored. All failures are reported the same way,
// since an event handler has no caller to return an error to.
func (m *typedStatusManager[T]) updateStatus(oldObjInterface, newObjInterface interface{}) {
	// The old object is only type-checked: the status is derived from the new
	// object alone. On a failed assertion report the type that was actually
	// received (the interface value), not the zero value of T.
	if _, ok := oldObjInterface.(T); !ok {
		utilruntime.HandleError(fmt.Errorf("expecting %T but received %T", *new(T), oldObjInterface))
		return
	}
	newObj, ok := newObjInterface.(T)
	if !ok {
		utilruntime.HandleError(fmt.Errorf("expecting %T but received %T", *new(T), newObjInterface))
		return
	}

	// The key is passed to patch as the object name.
	// NOTE(review): for a namespaced type the key would be "namespace/name" — confirm
	// every type handled here is cluster-scoped.
	key, err := cache.MetaNamespaceKeyFunc(newObj)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", newObj, err))
		return
	}

	patchedObj, needsUpdate := m.updateStatusFunc(newObj)
	if !needsUpdate {
		return
	}
	// Set the GVK explicitly so the encoded patch carries apiVersion and kind.
	patchedObj.GetObjectKind().SetGroupVersionKind(m.gvk)
	// Send the full object to be applied on the server side.
	data, err := runtime.Encode(unstructured.UnstructuredJSONScheme, patchedObj)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("could not encode for patching: %w", err))
		return
	}

	patchOptions := metav1.PatchOptions{
		Force:        utilpointer.Bool(true),
		FieldManager: fieldManagerName,
	}
	if _, err = m.patch(context.TODO(), key, ktypes.ApplyPatchType, data, patchOptions, "status"); err != nil {
		utilruntime.HandleError(fmt.Errorf("could not patch: %w", err))
	}
}

// Start registers updateStatus as the update handler on the informer and
// returns any error from that registration. Only update events are handled.
func (m *typedStatusManager[T]) Start() error {
	handlers := cache.ResourceEventHandlerFuncs{UpdateFunc: m.updateStatus}
	if _, err := m.informer.AddEventHandler(handlers); err != nil {
		return err
	}
	return nil
}

// StatusManager groups the per-type status managers. It currently handles
// AdminPolicyBasedExternalRoute objects only.
type StatusManager struct {
// abpRouteManager maintains the status of AdminPolicyBasedExternalRoute objects.
abpRouteManager typedStatusManager[*adminpolicybasedrouteapi.AdminPolicyBasedExternalRoute]
// other types go here
}

// NewStatusManager builds a StatusManager whose AdminPolicyBasedExternalRoute
// manager listens on apbRouteInformer and patches through apbRoutePolicyClient.
// No event handler is registered until Start is called.
func NewStatusManager(apbRouteInformer adminpolicybasedinformers.AdminPolicyBasedExternalRouteInformer,
	apbRoutePolicyClient adminpolicybasedrouteclient.Interface) *StatusManager {
	routeGVK := schema.GroupVersionKind{
		Group:   "k8s.ovn.org",
		Version: "v1",
		Kind:    "AdminPolicyBasedExternalRoute",
	}
	routeInformer := apbRouteInformer.Informer()
	routeClient := apbRoutePolicyClient.K8sV1().AdminPolicyBasedExternalRoutes()

	return &StatusManager{
		abpRouteManager: typedStatusManager[*adminpolicybasedrouteapi.AdminPolicyBasedExternalRoute]{
			gvk:              routeGVK,
			informer:         routeInformer,
			patch:            routeClient.Patch,
			updateStatusFunc: updateStatusAPBRoute,
		},
	}
}

// Start starts every per-type status manager and returns the first error.
// Only the AdminPolicyBasedExternalRoute manager exists for now.
func (sm *StatusManager) Start() error {
	if err := sm.abpRouteManager.Start(); err != nil {
		return err
	}
	return nil
}

// updateStatusAPBRoute derives the overall status of a route from its status
// messages: FailStatus if any message contains types.APBRouteErrorMsg,
// SuccessStatus otherwise (including when there are no messages).
//
// It returns a fresh object carrying only Status.Status and
// Status.LastTransitionTime, plus a flag that is true when the derived status
// differs from the one currently on route. A nil route yields (nil, false).
func updateStatusAPBRoute(route *adminpolicybasedrouteapi.AdminPolicyBasedExternalRoute) (*adminpolicybasedrouteapi.AdminPolicyBasedExternalRoute, bool) {
	if route == nil {
		return nil, false
	}

	aggregated := adminpolicybasedrouteapi.SuccessStatus
	for i := range route.Status.Messages {
		if !strings.Contains(route.Status.Messages[i], types.APBRouteErrorMsg) {
			continue
		}
		// One failure message is enough to mark the whole policy as failed.
		aggregated = adminpolicybasedrouteapi.FailStatus
		break
	}

	statusOnly := &adminpolicybasedrouteapi.AdminPolicyBasedExternalRoute{}
	statusOnly.Status.LastTransitionTime = metav1.Now()
	statusOnly.Status.Status = aggregated

	needsUpdate := route.Status.Status != aggregated
	return statusOnly, needsUpdate
}
6 changes: 6 additions & 0 deletions go-controller/pkg/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,7 @@ func NewClusterManagerWatchFactory(ovnClientset *util.OVNClusterManagerClientset
eipFactory: egressipinformerfactory.NewSharedInformerFactory(ovnClientset.EgressIPClient, resyncInterval),
cpipcFactory: ocpcloudnetworkinformerfactory.NewSharedInformerFactory(ovnClientset.CloudNetworkClient, resyncInterval),
egressServiceFactory: egressserviceinformerfactory.NewSharedInformerFactoryWithOptions(ovnClientset.EgressServiceClient, resyncInterval),
apbRouteFactory: adminbasedpolicyinformerfactory.NewSharedInformerFactory(ovnClientset.AdminPolicyRouteClient, resyncInterval),
informers: make(map[reflect.Type]*informer),
stopChan: make(chan struct{}),
}
Expand Down Expand Up @@ -621,6 +622,11 @@ func NewClusterManagerWatchFactory(ovnClientset *util.OVNClusterManagerClientset
}
}

if config.OVNKubernetesFeature.EnableMultiExternalGateway {
// make sure shared informer is created for a factory, so on wf.apbRouteFactory.Start() it is initialized and caches are synced.
wf.apbRouteFactory.K8s().V1().AdminPolicyBasedExternalRoutes().Informer()
}

return wf, nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ func initController(k8sObjects, routePolicyObjects []runtime.Object) {
iFactory.NodeCoreInformer().Lister(),
nbClient,
addressset.NewFakeAddressSetFactory(controllerName),
controllerName)
controllerName,
"single-zone")
Expect(err).NotTo(HaveOccurred())

if nbZoneFailed {
Expand Down Expand Up @@ -714,19 +715,10 @@ var _ = Describe("OVN External Gateway policy", func() {
},
}
p.Generation++
lastUpdate := p.Status.LastTransitionTime
_, err = fakeRouteClient.K8sV1().AdminPolicyBasedExternalRoutes().Update(context.Background(), p, v1.UpdateOptions{})
Expect(err).NotTo(HaveOccurred())

Eventually(func() v1.Time {
// Retrieve the CR and ensure the last update timestamp is different before comparing it against the slice.
p, err = fakeRouteClient.K8sV1().AdminPolicyBasedExternalRoutes().Get(context.TODO(), staticMultiIPPolicy.Name, v1.GetOptions{})
Expect(err).NotTo(HaveOccurred())
return p.Status.LastTransitionTime
}).ShouldNot(Equal(lastUpdate))

By("Validating the static refernces don't contain the last element")

By("Validating the static references don't contain the last element")
expectedPolicy1, expectedRefs1 = expectedPolicyStateAndRefs(
[]*namespaceWithPods{namespaceTarget2WithPod},
[]string{"20.10.10.1", "20.10.10.2", "20.10.10.3", "20.10.10.4"},
Expand Down
89 changes: 62 additions & 27 deletions go-controller/pkg/ovn/controller/apbroute/master_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ import (
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
ktypes "k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
Expand All @@ -23,9 +26,9 @@ import (
"k8s.io/client-go/kubernetes"
corev1listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/retry"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
utilpointer "k8s.io/utils/pointer"

adminpolicybasedrouteapi "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/adminpolicybasedroute/v1"
adminpolicybasedrouteclient "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/adminpolicybasedroute/v1/apis/clientset/versioned"
Expand All @@ -34,7 +37,7 @@ import (
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/factory"
libovsdbutil "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/libovsdb/util"
addressset "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/ovn/address_set"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/util"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/types"
)

const (
Expand Down Expand Up @@ -69,6 +72,7 @@ type ExternalGatewayMasterController struct {

mgr *externalPolicyManager
nbClient *northBoundClient
zoneID string
}

func NewExternalMasterController(
Expand All @@ -82,6 +86,7 @@ func NewExternalMasterController(
nbClient libovsdbclient.Client,
addressSetFactory addressset.AddressSetFactory,
controllerName string,
zoneID string,
) (*ExternalGatewayMasterController, error) {

externalGWCache := make(map[ktypes.NamespacedName]*ExternalRouteInfo)
Expand Down Expand Up @@ -133,6 +138,7 @@ func NewExternalMasterController(
namespaceInformer.Lister(),
apbRouteInformer.Lister(),
nbCli),
zoneID: zoneID,
}
return c, nil
}
Expand Down Expand Up @@ -497,26 +503,44 @@ func (c *ExternalGatewayMasterController) updateStatusAPBExternalRoute(policyNam
return nil
}

resultErr := retry.RetryOnConflict(util.OvnConflictBackoff, func() error {
routePolicy, err := c.apbRoutePolicyClient.K8sV1().AdminPolicyBasedExternalRoutes().Get(context.TODO(), policyName, metav1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
// policy doesn't exist, no need to update status
return nil
}
return err
// get object from the informer cache. No need to get the object from the kube-apiserver, since patch
// doesn't check resource versions, and no one else can update status message owned by current zone.
routePolicy, err := c.routeLister.Get(policyName)
if err != nil {
if apierrors.IsNotFound(err) {
// policy doesn't exist, no need to update status
return nil
}
return err
}

updateStatus(routePolicy, strings.Join(sets.List(gwIPs), ","), processedError)

_, err = c.apbRoutePolicyClient.K8sV1().AdminPolicyBasedExternalRoutes().UpdateStatus(context.TODO(), routePolicy, metav1.UpdateOptions{})
if !apierrors.IsNotFound(err) {
return err
}
patchObj, needsUpdate := getPatchStatusObj(routePolicy, strings.Join(sets.List(gwIPs), ","), c.zoneID, processedError)
if !needsUpdate {
return nil
})
if resultErr != nil {
return fmt.Errorf("failed to update AdminPolicyBasedExternalRoutes %s status: %v", policyName, resultErr)
}

gvk := schema.GroupVersionKind{
Kind: "AdminPolicyBasedExternalRoute",
Group: "k8s.ovn.org",
Version: "v1",
}
patchObj.GetObjectKind().SetGroupVersionKind(gvk)
// Send the full object to be applied on the server side.
data, err := runtime.Encode(unstructured.UnstructuredJSONScheme, patchObj)
if err != nil {
return fmt.Errorf("could not encode for patching: %w", err)
}

patchOptions := metav1.PatchOptions{
Force: utilpointer.Bool(true),
FieldManager: c.zoneID,
}

_, err = c.apbRoutePolicyClient.K8sV1().AdminPolicyBasedExternalRoutes().Patch(context.TODO(), policyName,
ktypes.ApplyPatchType, data, patchOptions, "status")

if err != nil {
return err
}
return nil
}
Expand All @@ -529,16 +553,27 @@ func (c *ExternalGatewayMasterController) GetStaticGatewayIPsForTargetNamespace(
return c.mgr.getStaticGatewayIPsForTargetNamespace(namespaceName)
}

func updateStatus(route *adminpolicybasedrouteapi.AdminPolicyBasedExternalRoute, gwIPs string, err error) {
// getPatchStatusObj returns an object to call Patch, and bool to signal if update is needed
func getPatchStatusObj(route *adminpolicybasedrouteapi.AdminPolicyBasedExternalRoute, gwIPs string, controllerID string,
err error) (*adminpolicybasedrouteapi.AdminPolicyBasedExternalRoute, bool) {
newMsg := fmt.Sprintf("%v configured external gateway IPs: %s", controllerID, gwIPs)
if err != nil {
route.Status.Status = adminpolicybasedrouteapi.FailStatus
route.Status.Messages = append(route.Status.Messages, fmt.Sprintf("Failed to apply policy: %v", err.Error()))
return
newMsg = fmt.Sprintf("%s %s: %v", controllerID, types.APBRouteErrorMsg, err.Error())
}
needsUpdate := true
for _, message := range route.Status.Messages {
if message == newMsg {
// found previous status
needsUpdate = false
break
}
}
route.Status.LastTransitionTime = metav1.Time{Time: time.Now()}
route.Status.Status = adminpolicybasedrouteapi.SuccessStatus
route.Status.Messages = append(route.Status.Messages, fmt.Sprintf("Configured external gateway IPs: %s", gwIPs))
klog.V(4).Infof("Updating Admin Policy Based External Route %s with Status: %s, Message: %s", route.Name, route.Status.Status, route.Status.Messages[len(route.Status.Messages)-1])
return &adminpolicybasedrouteapi.AdminPolicyBasedExternalRoute{
Status: adminpolicybasedrouteapi.AdminPolicyBasedRouteStatus{
Messages: []string{newMsg},
LastTransitionTime: metav1.Now(),
},
}, needsUpdate
}

// AddHybridRoutePolicyForPod exposes the function addHybridRoutePolicyForPod
Expand Down
1 change: 1 addition & 0 deletions go-controller/pkg/ovn/default_network_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ func newDefaultNetworkControllerCommon(cnci *CommonNetworkControllerInfo,
cnci.nbClient,
addressSetFactory,
DefaultNetworkControllerName,
cnci.zone,
)
if err != nil {
return nil, fmt.Errorf("unable to create new admin policy based external route controller while creating new default network controller :%w", err)
Expand Down
2 changes: 2 additions & 0 deletions go-controller/pkg/types/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,4 +202,6 @@ const (
// InformerSyncTimeout is used to wait from the initial informer cache sync.
// It allows ~4 list() retries with the default reflector exponential backoff config
InformerSyncTimeout = 20 * time.Second

APBRouteErrorMsg = "failed to apply policy"
)
Loading

0 comments on commit c28636d

Please sign in to comment.