Skip to content

Commit d391f26

Browse files
committed
[CASCL-1386] Create temporary PodDisruptionBudgets during eviction
Part of a stack splitting #3026 (too large to review in one piece) into small pieces that each build and pass tests on their own. The command is fully functional only once the whole stack lands.
1 parent f21be8d commit d391f26

3 files changed

Lines changed: 761 additions & 2 deletions

File tree

cmd/kubectl-datadog/autoscaling/cluster/evict/pdb.go

Lines changed: 328 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,341 @@ package evict
22

33
import (
44
"context"
5+
"errors"
6+
"fmt"
7+
"log"
8+
"reflect"
9+
"slices"
10+
"strings"
511

12+
"github.com/samber/lo"
13+
appsv1 "k8s.io/api/apps/v1"
14+
corev1 "k8s.io/api/core/v1"
15+
policyv1 "k8s.io/api/policy/v1"
16+
apierrors "k8s.io/apimachinery/pkg/api/errors"
17+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
18+
"k8s.io/apimachinery/pkg/runtime"
19+
"k8s.io/apimachinery/pkg/util/intstr"
20+
"k8s.io/apimachinery/pkg/util/validation"
621
"k8s.io/client-go/kubernetes"
22+
"k8s.io/client-go/tools/pager"
723
"sigs.k8s.io/controller-runtime/pkg/client"
24+
25+
commonk8s "github.com/DataDog/datadog-operator/cmd/kubectl-datadog/autoscaling/cluster/common/k8s"
26+
)
27+
28+
// Labels stamped on every PodDisruptionBudget created by this command. The
// cleanup step selects on both of them together, so a user-owned PDB that
// merely shares a name with one of ours is never picked up for deletion.
const (
	pdbManagedByLabelKey   = "app.kubernetes.io/managed-by"
	pdbManagedByLabelValue = "kubectl-datadog"
	pdbTempLabelKey        = "autoscaling.datadoghq.com/temporary-pdb"
	pdbTempLabelValue      = "true"
)

// pdbNameSuffix terminates the name of every temporary PDB. It is deliberately
// short: the name is capped at the 63-character DNS label length, and whatever
// the suffix uses is taken away from the (possibly truncated) controller name.
const pdbNameSuffix = "-evict-legacy"
941

1042
func ensureTempPDBs(ctx context.Context, clientset kubernetes.Interface, ctrlClient client.Client, targets []Target, dryRun bool) error {
11-
panic("TODO: ensureTempPDBs — implemented in PR https://github.com/DataDog/datadog-operator/pull/3178")
43+
// Targets across all manager types are merged: the orchestrator blocks
44+
// on waitEKSNodegroupEmpty before cleaning up the temporary PDBs, so EKS
45+
// MNG nodes observe the PDBs during their drain too — without that, EKS
46+
// could disrupt every replica of an otherwise unprotected workload at
47+
// once when all replicas happen to live on the same node group.
48+
allNodes := lo.FlatMap(targets, func(t Target, _ int) []string { return t.Nodes })
49+
nodeSet := lo.SliceToMap(allNodes, func(n string) (string, struct{}) { return n, struct{}{} })
50+
if len(nodeSet) == 0 {
51+
return nil
52+
}
53+
54+
controllers, err := discoverControllers(ctx, clientset, nodeSet)
55+
if err != nil {
56+
return fmt.Errorf("failed to discover controllers: %w", err)
57+
}
58+
if len(controllers) == 0 {
59+
return nil
60+
}
61+
62+
// Group controllers by namespace to amortize the per-namespace PDB list.
63+
byNamespace := make(map[string][]controllerInfo)
64+
for _, c := range controllers {
65+
byNamespace[c.Namespace] = append(byNamespace[c.Namespace], c)
66+
}
67+
68+
var errs []error
69+
for ns, ctrls := range byNamespace {
70+
existing, err := listNamespacePDBs(ctx, clientset, ns)
71+
if err != nil {
72+
errs = append(errs, fmt.Errorf("namespace %s: failed to list PDBs: %w", ns, err))
73+
continue
74+
}
75+
for _, c := range ctrls {
76+
if hasUserPDB(existing, c.Selector) {
77+
continue
78+
}
79+
if err := createTempPDB(ctx, ctrlClient, c, dryRun); err != nil {
80+
errs = append(errs, fmt.Errorf("controller %s/%s/%s: %w", c.Namespace, c.Kind, c.Name, err))
81+
}
82+
}
83+
}
84+
return errors.Join(errs...)
1285
}
1386

1487
func cleanupTempPDBs(ctx context.Context, ctrlClient client.Client, dryRun bool) error {
15-
panic("TODO: cleanupTempPDBs — implemented in PR https://github.com/DataDog/datadog-operator/pull/3178")
88+
list := &policyv1.PodDisruptionBudgetList{}
89+
if err := ctrlClient.List(ctx, list, client.MatchingLabels{
90+
pdbManagedByLabelKey: pdbManagedByLabelValue,
91+
pdbTempLabelKey: pdbTempLabelValue,
92+
}); err != nil {
93+
return fmt.Errorf("failed to list temporary PDBs: %w", err)
94+
}
95+
if len(list.Items) == 0 {
96+
return nil
97+
}
98+
var errs []error
99+
for i := range list.Items {
100+
pdb := &list.Items[i]
101+
if dryRun {
102+
log.Printf("[dry-run] would delete PDB %s/%s", pdb.Namespace, pdb.Name)
103+
continue
104+
}
105+
if err := commonk8s.Delete(ctx, ctrlClient, pdb); err != nil {
106+
errs = append(errs, fmt.Errorf("PDB %s/%s: %w", pdb.Namespace, pdb.Name, err))
107+
}
108+
}
109+
return errors.Join(errs...)
110+
}
111+
112+
// controllerKey identifies a top-level controller that owns evictable pods on
113+
// our target nodes. It is the dedup key in the seen map and the identity half
114+
// of controllerInfo.
115+
type controllerKey struct {
116+
Namespace string
117+
Kind string // "Deployment", "StatefulSet", "ReplicaSet"
118+
Name string
119+
}
120+
121+
// controllerInfo is a controllerKey plus the controller's pod selector — what a
122+
// PDB would match on.
123+
type controllerInfo struct {
124+
controllerKey
125+
Selector *metav1.LabelSelector
126+
}
127+
128+
// discoverControllers lists every Pod cluster-wide once (paginated) and, for
129+
// each Pod scheduled on one of the target nodes, resolves the top-level
130+
// controller. Listing once and filtering client-side avoids the N-API-calls
131+
// problem of doing one List per node, which on a large legacy fleet would
132+
// dominate the command's wall-clock time. The resulting slice contains each
133+
// controller at most once.
134+
func discoverControllers(ctx context.Context, clientset kubernetes.Interface, nodeSet map[string]struct{}) ([]controllerInfo, error) {
135+
seen := make(map[controllerKey]*metav1.LabelSelector)
136+
depCache := make(map[client.ObjectKey]*appsv1.Deployment)
137+
rsCache := make(map[client.ObjectKey]*appsv1.ReplicaSet)
138+
stsCache := make(map[client.ObjectKey]*appsv1.StatefulSet)
139+
140+
p := pager.New(func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) {
141+
return clientset.CoreV1().Pods(metav1.NamespaceAll).List(ctx, opts)
142+
})
143+
if err := p.EachListItem(ctx, metav1.ListOptions{}, func(obj runtime.Object) error {
144+
pod := obj.(*corev1.Pod)
145+
if _, onTarget := nodeSet[pod.Spec.NodeName]; !onTarget {
146+
return nil
147+
}
148+
if shouldSkipEviction(pod) {
149+
return nil
150+
}
151+
info, err := resolveTopLevelController(ctx, clientset, pod, depCache, rsCache, stsCache)
152+
if err != nil {
153+
log.Printf("Warning: cannot resolve controller for pod %s/%s: %v", pod.Namespace, pod.Name, err)
154+
return nil
155+
}
156+
if info == nil {
157+
return nil
158+
}
159+
seen[info.controllerKey] = info.Selector
160+
return nil
161+
}); err != nil {
162+
return nil, fmt.Errorf("list pods: %w", err)
163+
}
164+
165+
return lo.MapToSlice(seen, func(key controllerKey, selector *metav1.LabelSelector) controllerInfo {
166+
return controllerInfo{controllerKey: key, Selector: selector}
167+
}), nil
168+
}
169+
170+
// resolveTopLevelController walks a Pod's owner chain up to the workload
171+
// controller (Deployment > ReplicaSet > Pod, StatefulSet > Pod). Returns nil
172+
// for Pods whose top-level owner is not a stable workload — Jobs (TTL-managed),
173+
// DaemonSets (already skipped at eviction), or bare Pods.
174+
func resolveTopLevelController(
175+
ctx context.Context,
176+
clientset kubernetes.Interface,
177+
pod *corev1.Pod,
178+
depCache map[client.ObjectKey]*appsv1.Deployment,
179+
rsCache map[client.ObjectKey]*appsv1.ReplicaSet,
180+
stsCache map[client.ObjectKey]*appsv1.StatefulSet,
181+
) (*controllerInfo, error) {
182+
owner := metav1.GetControllerOf(pod)
183+
if owner == nil {
184+
return nil, nil
185+
}
186+
switch owner.Kind {
187+
case "ReplicaSet":
188+
rs, err := getCached(ctx, rsCache, pod.Namespace, owner.Name, clientset.AppsV1().ReplicaSets(pod.Namespace).Get)
189+
if err != nil {
190+
return nil, err
191+
}
192+
rsOwner := metav1.GetControllerOf(rs)
193+
if rsOwner != nil && rsOwner.Kind == "Deployment" {
194+
dep, err := getCached(ctx, depCache, pod.Namespace, rsOwner.Name, clientset.AppsV1().Deployments(pod.Namespace).Get)
195+
if err != nil {
196+
return nil, err
197+
}
198+
return &controllerInfo{
199+
controllerKey: controllerKey{Namespace: pod.Namespace, Kind: "Deployment", Name: dep.Name},
200+
Selector: dep.Spec.Selector,
201+
}, nil
202+
}
203+
return &controllerInfo{
204+
controllerKey: controllerKey{Namespace: pod.Namespace, Kind: "ReplicaSet", Name: rs.Name},
205+
Selector: rs.Spec.Selector,
206+
}, nil
207+
case "StatefulSet":
208+
sts, err := getCached(ctx, stsCache, pod.Namespace, owner.Name, clientset.AppsV1().StatefulSets(pod.Namespace).Get)
209+
if err != nil {
210+
return nil, err
211+
}
212+
return &controllerInfo{
213+
controllerKey: controllerKey{Namespace: pod.Namespace, Kind: "StatefulSet", Name: sts.Name},
214+
Selector: sts.Spec.Selector,
215+
}, nil
216+
default:
217+
// DaemonSet (skipped before reaching here), Job (TTL), CronJob,
218+
// custom controllers — none get a temporary PDB.
219+
return nil, nil
220+
}
221+
}
222+
223+
// getCached returns the object identified by (ns, name) from cache, fetching it
224+
// via get — the namespace-bound typed clientset accessor, e.g.
225+
// clientset.AppsV1().ReplicaSets(ns).Get — and populating the cache on a miss.
226+
// T is inferred from the cache's value type, collapsing the per-kind getters
227+
// into a single generic lookup.
228+
func getCached[T any](
229+
ctx context.Context,
230+
cache map[client.ObjectKey]*T,
231+
ns, name string,
232+
get func(ctx context.Context, name string, opts metav1.GetOptions) (*T, error),
233+
) (*T, error) {
234+
key := client.ObjectKey{Namespace: ns, Name: name}
235+
if obj, ok := cache[key]; ok {
236+
return obj, nil
237+
}
238+
obj, err := get(ctx, name, metav1.GetOptions{})
239+
if err != nil {
240+
return nil, err
241+
}
242+
cache[key] = obj
243+
return obj, nil
244+
}
245+
246+
// listNamespacePDBs returns every PDB in the namespace. Used to detect
247+
// pre-existing user PDBs covering a controller we'd otherwise PDB-protect.
248+
func listNamespacePDBs(ctx context.Context, clientset kubernetes.Interface, namespace string) ([]policyv1.PodDisruptionBudget, error) {
249+
list, err := clientset.PolicyV1().PodDisruptionBudgets(namespace).List(ctx, metav1.ListOptions{})
250+
if err != nil {
251+
return nil, err
252+
}
253+
return list.Items, nil
254+
}
255+
256+
// hasUserPDB returns true when an existing non-temporary PDB has the same
257+
// selector as the controller's pod selector. This is a conservative
258+
// equality check: a broader user PDB will NOT be detected, and we'll create
259+
// our own. Eviction will then respect the most restrictive of the two,
260+
// preserving the user's intent.
261+
func hasUserPDB(existing []policyv1.PodDisruptionBudget, controllerSelector *metav1.LabelSelector) bool {
262+
if controllerSelector == nil {
263+
return false
264+
}
265+
return slices.ContainsFunc(existing, func(pdb policyv1.PodDisruptionBudget) bool {
266+
return !isTemporaryPDB(&pdb) && reflect.DeepEqual(pdb.Spec.Selector, controllerSelector)
267+
})
268+
}
269+
270+
func isTemporaryPDB(pdb *policyv1.PodDisruptionBudget) bool {
271+
return pdb.Labels[pdbManagedByLabelKey] == pdbManagedByLabelValue &&
272+
pdb.Labels[pdbTempLabelKey] == pdbTempLabelValue
273+
}
274+
275+
// createTempPDB writes (or no-ops if our PDB already exists) a temporary
276+
// PodDisruptionBudget with maxUnavailable: 1. Existing PDBs that aren't ours
277+
// at the same name are left alone with a logged warning — that's a name
278+
// collision the user must resolve.
279+
func createTempPDB(ctx context.Context, ctrlClient client.Client, c controllerInfo, dryRun bool) error {
280+
name := tempPDBName(c.Kind, c.Name)
281+
if dryRun {
282+
log.Printf("[dry-run] would create PDB %s/%s (maxUnavailable: 1, selector: %s)", c.Namespace, name, formatSelector(c.Selector))
283+
return nil
284+
}
285+
286+
existing := &policyv1.PodDisruptionBudget{}
287+
err := ctrlClient.Get(ctx, client.ObjectKey{Namespace: c.Namespace, Name: name}, existing)
288+
switch {
289+
case err == nil:
290+
if !isTemporaryPDB(existing) {
291+
log.Printf("Warning: PDB %s/%s exists but is not labelled as temporary; leaving it untouched", c.Namespace, name)
292+
return nil
293+
}
294+
// Our PDB from a previous (possibly crashed) run. Leave as-is; the
295+
// cleanup step will remove it at the end of the current run.
296+
return nil
297+
case !apierrors.IsNotFound(err):
298+
return fmt.Errorf("failed to get PDB %s/%s: %w", c.Namespace, name, err)
299+
}
300+
301+
maxUnavailable := intstr.FromInt(1)
302+
pdb := &policyv1.PodDisruptionBudget{
303+
TypeMeta: metav1.TypeMeta{APIVersion: "policy/v1", Kind: "PodDisruptionBudget"},
304+
ObjectMeta: metav1.ObjectMeta{
305+
Name: name,
306+
Namespace: c.Namespace,
307+
Labels: map[string]string{
308+
pdbManagedByLabelKey: pdbManagedByLabelValue,
309+
pdbTempLabelKey: pdbTempLabelValue,
310+
},
311+
},
312+
Spec: policyv1.PodDisruptionBudgetSpec{
313+
Selector: c.Selector.DeepCopy(),
314+
MaxUnavailable: &maxUnavailable,
315+
},
316+
}
317+
if err := ctrlClient.Create(ctx, pdb); err != nil {
318+
return fmt.Errorf("failed to create PDB %s/%s: %w", c.Namespace, name, err)
319+
}
320+
log.Printf("Created temporary PDB %s/%s for %s/%s (maxUnavailable: 1).", c.Namespace, name, c.Kind, c.Name)
321+
return nil
322+
}
323+
324+
// tempPDBName builds a DNS-label-safe PDB name. Long controller names are
325+
// truncated so the final name (including the suffix) fits the 63-char limit.
326+
func tempPDBName(kind, controllerName string) string {
327+
prefix := strings.ToLower(kind) + "-" + controllerName
328+
if len(prefix)+len(pdbNameSuffix) > validation.DNS1123LabelMaxLength {
329+
prefix = prefix[:validation.DNS1123LabelMaxLength-len(pdbNameSuffix)]
330+
}
331+
return prefix + pdbNameSuffix
332+
}
333+
334+
func formatSelector(s *metav1.LabelSelector) string {
335+
if s == nil {
336+
return "<nil>"
337+
}
338+
parts := lo.MapToSlice(s.MatchLabels, func(k, v string) string {
339+
return k + "=" + v
340+
})
341+
return strings.Join(parts, ",")
16342
}

0 commit comments

Comments
 (0)