Skip to content

Commit

Permalink
Merge pull request #3667 from jordigilh/fix/concurrency_policy
Browse files Browse the repository at this point in the history
APB External Route: refactored controllers to use a single workqueue to handle policy updates
  • Loading branch information
trozet committed Jun 28, 2023
2 parents 96ba255 + 462815d commit 273ca9f
Show file tree
Hide file tree
Showing 13 changed files with 938 additions and 1,053 deletions.
65 changes: 25 additions & 40 deletions go-controller/pkg/ovn/controller/apbroute/external_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package apbroute

import (
"fmt"
"strings"
"sync"

adminpolicybasedrouteapi "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/adminpolicybasedroute/v1"
Expand All @@ -17,6 +18,14 @@ import (

type gatewayInfoList []*gatewayInfo

func (g gatewayInfoList) String() string {
ret := []string{}
for _, i := range g {
ret = append(ret, i.String())
}
return strings.Join(ret, ", ")
}

// HasBFDEnabled returns the BFD value for the given IP stored in the gatewayInfoList when found.
// It also returns a boolean that indicates if the IP was found and an error
// An IP can only have one BFD value.
Expand Down Expand Up @@ -82,6 +91,10 @@ type gatewayInfo struct {
BFDEnabled bool
}

func (g *gatewayInfo) String() string {
return fmt.Sprintf("BFDEnabled: %t, Gateways: %+v", g.BFDEnabled, g.Gateways.items)
}

func newGatewayInfo(items sets.Set[string], bfdEnabled bool) *gatewayInfo {
return &gatewayInfo{Gateways: &syncSet{items: items, mux: &sync.Mutex{}}, BFDEnabled: bfdEnabled}
}
Expand All @@ -93,16 +106,7 @@ type syncSet struct {

// Equal compares two gatewayInfo instances and returns true if all the gateway IPs are equal, regardless of the order, as well as the BFDEnabled field value.
func (g *gatewayInfo) Equal(g2 *gatewayInfo) bool {
return g.BFDEnabled == g2.BFDEnabled && g.Gateways.Equal(g2.Gateways)
}

func (g *syncSet) Equal(s2 *syncSet) bool {
s2.mux.Lock()
s2items := s2.items.Clone()
s2.mux.Unlock()
g.mux.Lock()
defer g.mux.Unlock()
return g.items.Equal(s2items)
return g.BFDEnabled == g2.BFDEnabled && len(g.Gateways.Difference(g2.Gateways.items.Clone())) == 0
}

func (g *syncSet) Has(ip string) bool {
Expand Down Expand Up @@ -145,7 +149,6 @@ type namespaceInfo struct {
Policies sets.Set[string]
StaticGateways gatewayInfoList
DynamicGateways map[ktypes.NamespacedName]*gatewayInfo
markForDelete bool
}

func newNamespaceInfo() *namespaceInfo {
Expand All @@ -157,8 +160,8 @@ func newNamespaceInfo() *namespaceInfo {
}

type routeInfo struct {
policy *adminpolicybasedrouteapi.AdminPolicyBasedExternalRoute
markedForDelete bool
policy *adminpolicybasedrouteapi.AdminPolicyBasedExternalRoute
markedForDeletion bool
}

type ExternalRouteInfo struct {
Expand Down Expand Up @@ -222,9 +225,9 @@ func newExternalPolicyManager(
}

// getRoutePolicyFromCache retrieves the cached value of the policy if it exists in the cache, as well as locking the key in case it exists.
func (m *externalPolicyManager) getRoutePolicyFromCache(policyName string) (adminpolicybasedrouteapi.AdminPolicyBasedExternalRoute, bool, bool) {
func (m *externalPolicyManager) getRoutePolicyFromCache(policyName string) (*adminpolicybasedrouteapi.AdminPolicyBasedExternalRoute, bool, bool) {
var (
policy adminpolicybasedrouteapi.AdminPolicyBasedExternalRoute
policy *adminpolicybasedrouteapi.AdminPolicyBasedExternalRoute
found, markedForDeletion bool
)
_ = m.routePolicySyncCache.DoWithLock(policyName, func(policyName string) error {
Expand All @@ -233,8 +236,8 @@ func (m *externalPolicyManager) getRoutePolicyFromCache(policyName string) (admi
return nil
}
found = f
policy = *ri.policy
markedForDeletion = ri.markedForDelete
markedForDeletion = ri.markedForDeletion
policy = ri.policy.DeepCopy()
return nil
})
return policy, found, markedForDeletion
Expand All @@ -247,7 +250,7 @@ func (m *externalPolicyManager) storeRoutePolicyInCache(policyInfo *adminpolicyb
m.routePolicySyncCache.LoadOrStore(policyName, &routeInfo{policy: policyInfo})
return nil
}
if ri.markedForDelete {
if ri.markedForDeletion {
return fmt.Errorf("attempting to store policy %s that is in the process of being deleted", policyInfo.Name)
}
ri.policy = policyInfo
Expand All @@ -258,7 +261,7 @@ func (m *externalPolicyManager) storeRoutePolicyInCache(policyInfo *adminpolicyb
func (m *externalPolicyManager) deleteRoutePolicyFromCache(policyName string) error {
return m.routePolicySyncCache.DoWithLock(policyName, func(policyName string) error {
ri, found := m.routePolicySyncCache.Load(policyName)
if found && !ri.markedForDelete {
if found && !ri.markedForDeletion {
return fmt.Errorf("attempting to delete route policy %s from cache before it has been marked for deletion", policyName)
}
m.routePolicySyncCache.Delete(policyName)
Expand All @@ -278,7 +281,7 @@ func (m *externalPolicyManager) getAndMarkRoutePolicyForDeletionInCache(policyNa
if !found {
return nil
}
ri.markedForDelete = true
ri.markedForDeletion = true
exists = true
routePolicy = *ri.policy
return nil
Expand All @@ -296,26 +299,8 @@ func (m *externalPolicyManager) getNamespaceInfoFromCache(namespaceName string)
return nsInfo, true
}

func (m *externalPolicyManager) getAndMarkForDeleteNamespaceInfoFromCache(namespaceName string) (*namespaceInfo, bool) {
nsInfo, ok := m.getNamespaceInfoFromCache(namespaceName)
if !ok {
return nil, false
}
nsInfo.markForDelete = true
return nsInfo, ok
}

func (m *externalPolicyManager) deleteNamespaceInfoInCache(namespaceName string) {
nsInfo, ok := m.namespaceInfoSyncCache.Load(namespaceName)
if !ok {
klog.Warningf("Failed to retrieve namespace %s for deletion", namespaceName)
return
}
if !nsInfo.markForDelete {
klog.Warningf("Attempting to delete namespace %s when it has not been marked for deletion", namespaceName)
return
}
m.namespaceInfoSyncCache.Delete(namespaceName)
func (m *externalPolicyManager) getAllNamespacesNamesInCache() []string {
return m.namespaceInfoSyncCache.GetKeys()
}

func (m *externalPolicyManager) unlockNamespaceInfoCache(namespaceName string) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,90 +1,71 @@
package apbroute

import (
"fmt"

adminpolicybasedrouteapi "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/adminpolicybasedroute/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
ktypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
)

// processAddNamespace takes in a namespace and applies the policies that are applicable to the namespace, previously stored in the cacheInfo object argument.
// The logic goes through all the policies and applies the gateway IPs derived from the static and dynamic hop to all the pods in the namespace.
// Lastly, it updates the cacheInfo to contain the static and dynamic gateway IPs generated from the previous action to keep track of the gateway IPs applied in the namespace.
func (m *externalPolicyManager) processAddNamespace(new *v1.Namespace, cacheInfo *namespaceInfo) error {
staticGateways, dynamicGateways, err := m.aggregateNamespaceInfo(cacheInfo.Policies)
if err != nil {
return err
}
cacheInfo.StaticGateways = staticGateways
cacheInfo.DynamicGateways = dynamicGateways
return nil
}

// processDeleteNamespace processes a delete namespace event by ensuring that no pod is still running in that namespace before deleting the namespace info cache. It also
// marks the namespace for deletion so that if a new pod event appears targeting the namespace, the operation will be rejected.
func (m *externalPolicyManager) processDeleteNamespace(namespaceName string) error {
_, found := m.getAndMarkForDeleteNamespaceInfoFromCache(namespaceName)
if !found {
// namespace is not a recipient for policies
return nil
}
defer m.unlockNamespaceInfoCache(namespaceName)
podsInNs, err := m.podLister.Pods(namespaceName).List(labels.Everything())
if err != nil {
return err
}
if len(podsInNs) != 0 {
klog.Infof("Attempting to delete namespace %s with resources still attached to it. Retrying...", namespaceName)
return fmt.Errorf("unable to delete namespace %s with resources still attached to it", namespaceName)
}
m.deleteNamespaceInfoInCache(namespaceName)
return nil
}

// processUpdateNamespace takes in a namespace name, current policies applied to the namespace, policies that are now expected to be applied to the namespace and the cache info
// that contains all the current gateway IPs and policies for that namespace. It follows this logic:
// * Calculate the difference between current and expected policies and proceed to remove the gateway IPs from the policies that are no longer applicable to this namespace
// * Calculate the difference between the expected and current ones to determine the new policies to be applied and proceed to apply them.
// * Update the cache info with the new list of policies, as well as the static and dynamic gateway IPs derived from executing the previous logic.
func (m *externalPolicyManager) processUpdateNamespace(namespaceName string, currentPolicies, newPolicies sets.Set[string], cacheInfo *namespaceInfo) error {

// some differences apply, let's figure out if previous policies have been removed first
policiesNotValid := currentPolicies.Difference(newPolicies)
// iterate through the policies that no longer apply to this namespace
for policyName := range policiesNotValid {
err := m.removePolicyFromNamespaceWithName(namespaceName, policyName, cacheInfo)
func (m *externalPolicyManager) syncNamespace(namespace *v1.Namespace, routeQueue workqueue.RateLimitingInterface) error {
klog.V(5).Infof("APB processing namespace: %s", namespace)

keysToBeQueued := sets.New[string]()

// Get a copy of all policies at this time and see if they include this namespace
policyKeys := m.routePolicySyncCache.GetKeys()

for _, policyName := range policyKeys {
err := m.routePolicySyncCache.DoWithLock(policyName,
func(key string) error {
ri, found := m.routePolicySyncCache.Load(policyName)
if found {
nsSel, _ := metav1.LabelSelectorAsSelector(&ri.policy.Spec.From.NamespaceSelector)
if nsSel.Matches(labels.Set(namespace.Labels)) {
keysToBeQueued.Insert(policyName)
return nil
}
for _, hop := range ri.policy.Spec.NextHops.DynamicHops {
gwNs, err := m.listNamespacesBySelector(hop.NamespaceSelector)
if err != nil {
return err
}
for _, ns := range gwNs {
if ns.Name == namespace.Name {
keysToBeQueued.Insert(policyName)
return nil
}
}
}
}
return nil
})
if err != nil {
return err
}
}

// policies that now apply to this namespace
newPoliciesDiff := newPolicies.Difference(currentPolicies)
for policyName := range newPoliciesDiff {
policy, found, markedForDeletion := m.getRoutePolicyFromCache(policyName)
if !found {
return fmt.Errorf("failed to find external route policy %s in cache", policyName)
}
if markedForDeletion {
klog.Infof("Skipping route policy %s as it has been marked for deletion", policyName)
continue
}
err := m.applyPolicyToNamespace(namespaceName, &policy, cacheInfo)
if err != nil {
return err
// Check if this namespace is being tracked by policy in its namespace cache
cacheInfo, found := m.getNamespaceInfoFromCache(namespace.Name)
if found {
for policyName := range cacheInfo.Policies {
keysToBeQueued.Insert(policyName)
}
m.unlockNamespaceInfoCache(namespace.Name)
}

for policyKey := range keysToBeQueued {
klog.V(5).Infof("APB queuing policy: %s for namespace: %s", policyKey, namespace)
routeQueue.Add(policyKey)
}
// at least one policy apply, let's update the cache
cacheInfo.Policies = newPolicies
return nil

return nil
}

// Must be called with lock on namespaceInfo cache
func (m *externalPolicyManager) applyPolicyToNamespace(namespaceName string, policy *adminpolicybasedrouteapi.AdminPolicyBasedExternalRoute, cacheInfo *namespaceInfo) error {

processedPolicy, err := m.processExternalRoutePolicy(policy)
Expand All @@ -98,15 +79,8 @@ func (m *externalPolicyManager) applyPolicyToNamespace(namespaceName string, pol
return nil
}

func (m *externalPolicyManager) removePolicyFromNamespaceWithName(targetNamespace, policyName string, cacheInfo *namespaceInfo) error {
policy, err := m.routeLister.Get(policyName)
if err != nil {
return err
}
return m.removePolicyFromNamespace(targetNamespace, policy, cacheInfo)
}
// Must be called with lock on namespaceInfo cache
func (m *externalPolicyManager) removePolicyFromNamespace(targetNamespace string, policy *adminpolicybasedrouteapi.AdminPolicyBasedExternalRoute, cacheInfo *namespaceInfo) error {

processedPolicy, err := m.processExternalRoutePolicy(policy)
if err != nil {
return err
Expand All @@ -115,7 +89,13 @@ func (m *externalPolicyManager) removePolicyFromNamespace(targetNamespace string
if err != nil {
return err
}

klog.V(4).InfoS("Deleting APB policy %s in namespace cache %s", policy.Name, targetNamespace)
cacheInfo.Policies = cacheInfo.Policies.Delete(policy.Name)
if len(cacheInfo.Policies) == 0 {
m.namespaceInfoSyncCache.Delete(targetNamespace)
}

return nil
}

Expand All @@ -131,32 +111,3 @@ func (m *externalPolicyManager) listNamespacesBySelector(selector *metav1.LabelS
return ns, nil

}

func (m *externalPolicyManager) aggregateNamespaceInfo(policies sets.Set[string]) (gatewayInfoList, map[ktypes.NamespacedName]*gatewayInfo, error) {

static := gatewayInfoList{}
dynamic := make(map[ktypes.NamespacedName]*gatewayInfo)
for policyName := range policies {
externalPolicy, err := m.routeLister.Get(policyName)
if err != nil {
klog.Warningf("Unable to find route policy %s:%+v", policyName, err)
continue
}
processedPolicy, err := m.processExternalRoutePolicy(externalPolicy)
if err != nil {
return nil, nil, err
}
var duplicated sets.Set[string]
static, duplicated, err = static.Insert(processedPolicy.staticGateways...)
if err != nil {
return nil, nil, err
}
if duplicated.Len() > 0 {
klog.Warningf("Found duplicated gateway IP(s) %+s in policy(s) %+s", sets.List(duplicated), sets.List(policies))
}
for podName, gatewayInfo := range processedPolicy.dynamicGateways {
dynamic[podName] = gatewayInfo
}
}
return static, dynamic, nil
}
Loading

0 comments on commit 273ca9f

Please sign in to comment.