Skip to content

Commit

Permalink
Merge pull request #435 from Danil-Grigorev/reconcile-etcd-memebers-o…
Browse files Browse the repository at this point in the history
…n-pre-delete

Implement etcd member management in pre-terminate hook
  • Loading branch information
alexander-demicev committed Sep 17, 2024
2 parents 465f030 + fc6f21d commit 7820d2d
Show file tree
Hide file tree
Showing 5 changed files with 213 additions and 8 deletions.
6 changes: 6 additions & 0 deletions controlplane/api/v1beta1/rke2controlplane_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,12 @@ const (
// RollingUpdateStrategyType replaces the old control planes by new one using rolling update
// i.e. gradually scale up or down the old control planes and scale up or down the new one.
RollingUpdateStrategyType RolloutStrategyType = "RollingUpdate"

// PreTerminateHookCleanupAnnotation is the annotation RKE2 sets on Machines to ensure it can later remove the
// etcd member right before Machine termination (i.e. before InfraMachine deletion).
// Note: Starting with Kubernetes v1.31 this hook will wait for all other pre-terminate hooks to finish to
// ensure it runs last (thus ensuring that kubelet is still working while other pre-terminate hooks run).
PreTerminateHookCleanupAnnotation = clusterv1.PreTerminateDeleteHookAnnotationPrefix + "/rke2-cleanup"
)

func init() { //nolint:gochecknoinits
Expand Down
138 changes: 137 additions & 1 deletion controlplane/internal/controllers/rke2controlplane_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package controllers
import (
"context"
"fmt"
"strings"
"time"

"github.com/blang/semver/v4"
Expand All @@ -32,6 +33,7 @@ import (
kerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
Expand All @@ -47,6 +49,7 @@ import (
"sigs.k8s.io/cluster-api/util/collections"
"sigs.k8s.io/cluster-api/util/conditions"
"sigs.k8s.io/cluster-api/util/patch"
"sigs.k8s.io/cluster-api/util/version"

controlplanev1 "github.com/rancher/cluster-api-provider-rke2/controlplane/api/v1beta1"
"github.com/rancher/cluster-api-provider-rke2/pkg/kubeconfig"
Expand Down Expand Up @@ -514,6 +517,10 @@ func (r *RKE2ControlPlaneReconciler) reconcileNormal(
return ctrl.Result{}, err
}

if result, err := r.reconcilePreTerminateHook(ctx, controlPlane); err != nil || !result.IsZero() {
return result, err
}

// Control plane machines rollout due to configuration changes (e.g. upgrades) takes precedence over other operations.
needRollout := controlPlane.MachinesNeedingRollout()

Expand Down Expand Up @@ -698,14 +705,31 @@ func (r *RKE2ControlPlaneReconciler) reconcileDelete(ctx context.Context,
}

// Delete control plane machines in parallel
machinesToDelete := ownedMachines.Filter(collections.Not(collections.HasDeletionTimestamp))
machinesToDelete := ownedMachines

var errs []error

for i := range machinesToDelete {
m := machinesToDelete[i]
logger := logger.WithValues("machine", m)

// During RKE2CP deletion we don't care about forwarding etcd leadership or removing etcd members.
// So we are removing the pre-terminate hook.
// This is important because when deleting RKE2CP we will delete all members of etcd and it's not possible
// to forward etcd leadership without any member left after we went through the Machine deletion.
// Also in this case the reconcileDelete code of the Machine controller won't execute Node drain
// and wait for volume detach.
if err := r.removePreTerminateHookAnnotationFromMachine(ctx, m); err != nil {
errs = append(errs, err)

continue
}

if !m.DeletionTimestamp.IsZero() {
// Nothing to do, Machine already has deletionTimestamp set.
continue
}

if err := r.Client.Delete(ctx, machinesToDelete[i]); err != nil && !apierrors.IsNotFound(err) {
logger.Error(err, "Failed to cleanup owned machine")
errs = append(errs, err)
Expand All @@ -720,6 +744,8 @@ func (r *RKE2ControlPlaneReconciler) reconcileDelete(ctx context.Context,
return ctrl.Result{}, err
}

logger.Info("Waiting for control plane Machines to not exist anymore")

conditions.MarkFalse(rcp, controlplanev1.ResizedCondition, clusterv1.DeletingReason, clusterv1.ConditionSeverityInfo, "")

return ctrl.Result{RequeueAfter: deleteRequeueAfter}, nil
Expand Down Expand Up @@ -909,6 +935,116 @@ func (r *RKE2ControlPlaneReconciler) ClusterToRKE2ControlPlane(ctx context.Conte
}
}

func (r *RKE2ControlPlaneReconciler) reconcilePreTerminateHook(ctx context.Context, controlPlane *rke2.ControlPlane) (ctrl.Result, error) {
// Ensure that every active machine has the drain hook set
patchHookAnnotation := false

for _, machine := range controlPlane.Machines.Filter(collections.ActiveMachines) {
if _, exists := machine.Annotations[controlplanev1.PreTerminateHookCleanupAnnotation]; !exists {
machine.Annotations[controlplanev1.PreTerminateHookCleanupAnnotation] = ""
patchHookAnnotation = true
}
}

if patchHookAnnotation {
// Patch machine annoations
if err := controlPlane.PatchMachines(ctx); err != nil {
return ctrl.Result{}, err
}
}

if !controlPlane.HasDeletingMachine() {
return ctrl.Result{}, nil
}

log := ctrl.LoggerFrom(ctx)

// Return early, if there is already a deleting Machine without the pre-terminate hook.
// We are going to wait until this Machine goes away before running the pre-terminate hook on other Machines.
for _, deletingMachine := range controlPlane.DeletingMachines() {
if _, exists := deletingMachine.Annotations[controlplanev1.PreTerminateHookCleanupAnnotation]; !exists {
return ctrl.Result{RequeueAfter: deleteRequeueAfter}, nil
}
}

// Pick the Machine with the oldest deletionTimestamp to keep this function deterministic / reentrant
// so we only remove the pre-terminate hook from one Machine at a time.
deletingMachines := controlPlane.DeletingMachines()
deletingMachine := controlPlane.SortedByDeletionTimestamp(deletingMachines)[0]

log = log.WithValues("Machine", klog.KObj(deletingMachine))
ctx = ctrl.LoggerInto(ctx, log)

parsedVersion, err := semver.ParseTolerant(controlPlane.RCP.Spec.Version)
if err != nil {
return ctrl.Result{}, errors.Wrapf(err, "failed to parse Kubernetes version %q", controlPlane.RCP.Spec.Version)
}

// Return early if there are other pre-terminate hooks for the Machine.
// The CAPRKE2 pre-terminate hook should be the one executed last, so that kubelet
// is still working while other pre-terminate hooks are run.
// Note: This is done only for Kubernetes >= v1.31 to reduce the blast radius of this check.
if version.Compare(parsedVersion, semver.MustParse("1.31.0"), version.WithoutPreReleases()) >= 0 {
if machineHasOtherPreTerminateHooks(deletingMachine) {
return ctrl.Result{RequeueAfter: deleteRequeueAfter}, nil
}
}

// Return early because the Machine controller is not yet waiting for the pre-terminate hook.
c := conditions.Get(deletingMachine, clusterv1.PreTerminateDeleteHookSucceededCondition)
if c == nil || c.Status != corev1.ConditionFalse || c.Reason != clusterv1.WaitingExternalHookReason {
return ctrl.Result{RequeueAfter: deleteRequeueAfter}, nil
}

// The following will execute and remove the pre-terminate hook from the Machine.

// If we have more than 1 Machine and etcd is managed we forward etcd leadership and remove the member
// to keep the etcd cluster healthy.
if controlPlane.Machines.Len() > 1 {
workloadCluster, err := r.GetWorkloadCluster(ctx, controlPlane)
if err != nil {
return ctrl.Result{}, errors.Wrapf(err,
"failed to remove etcd member for deleting Machine %s: failed to create client to workload cluster", klog.KObj(deletingMachine))
}

// Note: In regular deletion cases (remediation, scale down) the leader should have been already moved.
// We're doing this again here in case the Machine became leader again or the Machine deletion was
// triggered in another way (e.g. a user running kubectl delete machine)
etcdLeaderCandidate := controlPlane.Machines.Filter(collections.Not(collections.HasDeletionTimestamp)).Newest()
if etcdLeaderCandidate != nil {
if err := workloadCluster.ForwardEtcdLeadership(ctx, deletingMachine, etcdLeaderCandidate); err != nil {
return ctrl.Result{}, errors.Wrapf(err, "failed to move leadership to candidate Machine %s", etcdLeaderCandidate.Name)
}
} else {
log.Info("Skip forwarding etcd leadership, because there is no other control plane Machine without a deletionTimestamp")
}

// Note: Removing the etcd member will lead to the etcd and the kube-apiserver Pod on the Machine shutting down.
if err := workloadCluster.RemoveEtcdMemberForMachine(ctx, deletingMachine); err != nil {
return ctrl.Result{}, errors.Wrapf(err, "failed to remove etcd member for deleting Machine %s", klog.KObj(deletingMachine))
}
}

if err := r.removePreTerminateHookAnnotationFromMachine(ctx, deletingMachine); err != nil {
return ctrl.Result{}, err
}

log.Info("Waiting for Machines to be deleted", "machines",
strings.Join(controlPlane.Machines.Filter(collections.HasDeletionTimestamp).Names(), ", "))

return ctrl.Result{RequeueAfter: deleteRequeueAfter}, nil
}

func machineHasOtherPreTerminateHooks(machine *clusterv1.Machine) bool {
for k := range machine.Annotations {
if strings.HasPrefix(k, clusterv1.PreTerminateDeleteHookAnnotationPrefix) && k != controlplanev1.PreTerminateHookCleanupAnnotation {
return true
}
}

return false
}

// getWorkloadCluster gets a cluster object.
// The cluster comes with an etcd client generator to connect to any etcd pod living on a managed machine.
func (r *RKE2ControlPlaneReconciler) getWorkloadCluster(ctx context.Context, clusterKey types.NamespacedName) (rke2.WorkloadCluster, error) {
Expand Down
32 changes: 26 additions & 6 deletions controlplane/internal/controllers/scale.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
kerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apiserver/pkg/storage/names"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"

clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
Expand Down Expand Up @@ -159,11 +161,7 @@ func (r *RKE2ControlPlaneReconciler) scaleDownControlPlane(
return ctrl.Result{}, err
}

if err := r.workloadCluster.RemoveEtcdMemberForMachine(ctx, machineToDelete); err != nil {
logger.Error(err, "Failed to remove etcd member for machine")

return ctrl.Result{}, err
}
// NOTE: etcd member removal will be performed by the rke2-cleanup hook after machine completes drain & all volumes are detached.

logger = logger.WithValues("machine", machineToDelete)
if err := r.Client.Delete(ctx, machineToDelete); err != nil && !apierrors.IsNotFound(err) {
Expand All @@ -178,6 +176,25 @@ func (r *RKE2ControlPlaneReconciler) scaleDownControlPlane(
return ctrl.Result{Requeue: true}, nil
}

func (r *RKE2ControlPlaneReconciler) removePreTerminateHookAnnotationFromMachine(ctx context.Context, machine *clusterv1.Machine) error {
if _, exists := machine.Annotations[controlplanev1.PreTerminateHookCleanupAnnotation]; !exists {
// Nothing to do, the annotation is not set (anymore) on the Machine
return nil
}

log := ctrl.LoggerFrom(ctx)
log.Info("Removing pre-terminate hook from control plane Machine")

machineOriginal := machine.DeepCopy()
delete(machine.Annotations, controlplanev1.PreTerminateHookCleanupAnnotation)

if err := r.Client.Patch(ctx, machine, client.MergeFrom(machineOriginal)); err != nil {
return errors.Wrapf(err, "failed to remove pre-terminate hook from control plane Machine %s", klog.KObj(machine))
}

return nil
}

// preflightChecks checks if the control plane is stable before proceeding with a scale up/scale down operation,
// where stable means that:
// - There are no machine deletion in progress
Expand Down Expand Up @@ -447,7 +464,10 @@ func (r *RKE2ControlPlaneReconciler) generateMachine(
return errors.Wrap(err, "failed to marshal cluster configuration")
}

machine.SetAnnotations(map[string]string{controlplanev1.RKE2ServerConfigurationAnnotation: string(serverConfig)})
machine.SetAnnotations(map[string]string{
controlplanev1.RKE2ServerConfigurationAnnotation: string(serverConfig),
controlplanev1.PreTerminateHookCleanupAnnotation: "",
})

if err := r.Client.Create(ctx, machine); err != nil {
return errors.Wrap(err, "failed to create machine")
Expand Down
44 changes: 44 additions & 0 deletions pkg/rke2/control_plane.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package rke2

import (
"context"
"sort"

"github.com/go-logr/logr"
"github.com/pkg/errors"
Expand Down Expand Up @@ -268,6 +269,23 @@ func (c *ControlPlane) HasDeletingMachine() bool {
return len(c.Machines.Filter(collections.HasDeletionTimestamp)) > 0
}

// DeletingMachines returns machines in the control plane that are in the process of being deleted.
func (c *ControlPlane) DeletingMachines() collections.Machines {
return c.Machines.Filter(collections.HasDeletionTimestamp)
}

// SortedByDeletionTimestamp returns the machines sorted by deletion timestamp.
func (c *ControlPlane) SortedByDeletionTimestamp(s collections.Machines) []*clusterv1.Machine {
res := make(machinesByDeletionTimestamp, 0, len(s))
for _, value := range s {
res = append(res, value)
}

sort.Sort(res)

return res
}

// MachinesNeedingRollout return a list of machines that need to be rolled out.
func (c *ControlPlane) MachinesNeedingRollout() collections.Machines {
// Ignore machines to be deleted.
Expand Down Expand Up @@ -383,3 +401,29 @@ func (c *ControlPlane) PatchMachines(ctx context.Context) error {

return kerrors.NewAggregate(errList)
}

// machinesByDeletionTimestamp sorts a list of Machines by deletion timestamp, using their names as a tie breaker.
// Machines without DeletionTimestamp go after machines with this field set.
type machinesByDeletionTimestamp []*clusterv1.Machine

func (o machinesByDeletionTimestamp) Len() int { return len(o) }
func (o machinesByDeletionTimestamp) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
func (o machinesByDeletionTimestamp) Less(i, j int) bool {
if o[i].DeletionTimestamp == nil && o[j].DeletionTimestamp == nil {
return o[i].Name < o[j].Name
}

if o[i].DeletionTimestamp == nil {
return false
}

if o[j].DeletionTimestamp == nil {
return true
}

if o[i].DeletionTimestamp.Equal(o[j].DeletionTimestamp) {
return o[i].Name < o[j].Name
}

return o[i].DeletionTimestamp.Before(o[j].DeletionTimestamp)
}
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,6 @@ spec:
cni: calico
disableComponents:
kubernetesComponents: [ "cloudController"]
nodeDrainTimeout: 2m
rolloutStrategy:
type: "RollingUpdate"
rollingUpdate:
Expand Down

0 comments on commit 7820d2d

Please sign in to comment.