Skip to content

Commit bc40e36

Browse files
committed
[CASCL-1386] Implement node drain primitives and ASG eviction
Part of a stack splitting #3026 (too large to review in one piece) into small pieces that each build and pass tests on their own. The command is fully functional only once the whole stack lands.
1 parent 6c79f69 commit bc40e36

6 files changed

Lines changed: 1050 additions & 2 deletions

File tree

cmd/kubectl-datadog/autoscaling/cluster/evict/asg.go

Lines changed: 116 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,15 @@ package evict
22

33
import (
44
"context"
5+
"errors"
6+
"fmt"
7+
"log"
58

9+
awssdk "github.com/aws/aws-sdk-go-v2/aws"
610
"github.com/aws/aws-sdk-go-v2/service/autoscaling"
711
"k8s.io/client-go/kubernetes"
12+
13+
commonaws "github.com/DataDog/datadog-operator/cmd/kubectl-datadog/autoscaling/cluster/common/aws"
814
)
915

1016
type AutoscalingAPI interface {
@@ -14,5 +20,114 @@ type AutoscalingAPI interface {
1420
}
1521

1622
func evictASG(ctx context.Context, clientset kubernetes.Interface, asg AutoscalingAPI, asgName string, nodes []string, drainOpts nodeDrainOptions) error {
17-
panic("TODO: evictASG — implemented in PR https://github.com/DataDog/datadog-operator/pull/3175")
23+
if drainOpts.DryRun {
24+
log.Printf("[dry-run] would suspend AZRebalance and set MinSize=0 on ASG %s", asgName)
25+
} else if err := prepareASGForTermination(ctx, asg, asgName); err != nil {
26+
return fmt.Errorf("prepare ASG %s for termination: %w", asgName, err)
27+
}
28+
29+
// Cordon every node up front so a pod evicted from one node is never
30+
// rescheduled onto another node of the same ASG that is itself about to be
31+
// drained. A node that fails to cordon is left undrained; treat that as a
32+
// drain failure so the ASG keeps its original MaxSize for a re-run instead
33+
// of being locked away with workloads still on it.
34+
cordoned, errs := cordonNodes(ctx, clientset, nodes, drainOpts.DryRun)
35+
36+
for _, node := range cordoned {
37+
nodeName := node.Name
38+
id, hasInstanceID := commonaws.ExtractEC2InstanceID(node)
39+
if !hasInstanceID {
40+
log.Printf("Warning: node %s has unexpected providerID %q; its instance will be terminated by the final scale-to-zero instead", nodeName, node.Spec.ProviderID)
41+
}
42+
if err := drainNode(ctx, clientset, nodeName, drainOpts); err != nil {
43+
errs = append(errs, fmt.Errorf("drain node %s: %w", nodeName, err))
44+
continue // do NOT terminate this instance: workloads are still on it
45+
}
46+
// The node drained cleanly: terminate its instance now, decrementing
47+
// the ASG's desired capacity so it is not relaunched. Nodes with an
48+
// unexpected providerID are left for the final scale-to-zero.
49+
if !hasInstanceID {
50+
continue
51+
}
52+
if drainOpts.DryRun {
53+
log.Printf("[dry-run] would terminate instance %s in ASG %s (decrementing desired capacity)", id, asgName)
54+
continue
55+
}
56+
if err := terminateASGInstance(ctx, asg, id); err != nil {
57+
errs = append(errs, fmt.Errorf("terminate instance %s in ASG %s: %w", id, asgName, err))
58+
}
59+
}
60+
61+
if len(errs) > 0 {
62+
log.Printf("ASG %s: at least one node failed to cordon, drain, or terminate; leaving the ASG at MinSize=0 without locking MaxSize. Re-run after addressing the errors above.", asgName)
63+
return errors.Join(errs...)
64+
}
65+
66+
// Every node drained and its instance was terminated. Lock the ASG at
67+
// min=max=desired=0 so nothing is ever relaunched, and to clean up any
68+
// instance that couldn't be terminated per-node (unexpected providerID).
69+
if drainOpts.DryRun {
70+
log.Printf("[dry-run] would scale ASG %s to min=max=desired=0", asgName)
71+
return nil
72+
}
73+
// All instances are now terminated; only the MaxSize=0 lock remains. A
74+
// crash here would leave the ASG at desired=0 but unlocked and no longer
75+
// rediscoverable by a re-run (see the crash-window note on evictASG).
76+
log.Printf("ASG %s: all nodes drained; locking the ASG at min=max=desired=0.", asgName)
77+
if err := scaleASGToZero(ctx, asg, asgName); err != nil {
78+
errs = append(errs, fmt.Errorf("scale ASG %s to 0: %w", asgName, err))
79+
}
80+
return errors.Join(errs...)
81+
}
82+
83+
// prepareASGForTermination makes the ASG safe for the per-instance termination
84+
// performed during the drain loop:
85+
//
86+
// - AZRebalance is suspended so that decrementing desired capacity one
87+
// instance at a time cannot trigger AZ rebalancing — which would terminate
88+
// a not-yet-drained instance in another availability zone.
89+
// - MinSize is set to 0 so that TerminateInstanceInAutoScalingGroup may
90+
// decrement DesiredCapacity (AWS rejects the decrement while
91+
// MinSize == DesiredCapacity).
92+
func prepareASGForTermination(ctx context.Context, asg AutoscalingAPI, asgName string) error {
93+
if _, err := asg.SuspendProcesses(ctx, &autoscaling.SuspendProcessesInput{
94+
AutoScalingGroupName: awssdk.String(asgName),
95+
ScalingProcesses: []string{"AZRebalance"},
96+
}); err != nil {
97+
return fmt.Errorf("suspend AZRebalance: %w", err)
98+
}
99+
if _, err := asg.UpdateAutoScalingGroup(ctx, &autoscaling.UpdateAutoScalingGroupInput{
100+
AutoScalingGroupName: awssdk.String(asgName),
101+
MinSize: awssdk.Int32(0),
102+
}); err != nil {
103+
return fmt.Errorf("set MinSize=0: %w", err)
104+
}
105+
log.Printf("Prepared ASG %s for termination (AZRebalance suspended, MinSize=0).", asgName)
106+
return nil
107+
}
108+
109+
// terminateASGInstance terminates a single (already drained) instance and
110+
// decrements the ASG's desired capacity so it is not relaunched.
111+
func terminateASGInstance(ctx context.Context, asg AutoscalingAPI, instanceID string) error {
112+
if _, err := asg.TerminateInstanceInAutoScalingGroup(ctx, &autoscaling.TerminateInstanceInAutoScalingGroupInput{
113+
InstanceId: awssdk.String(instanceID),
114+
ShouldDecrementDesiredCapacity: awssdk.Bool(true),
115+
}); err != nil {
116+
return err
117+
}
118+
log.Printf("Terminated instance %s and decremented ASG desired capacity.", instanceID)
119+
return nil
120+
}
121+
122+
func scaleASGToZero(ctx context.Context, asg AutoscalingAPI, asgName string) error {
123+
if _, err := asg.UpdateAutoScalingGroup(ctx, &autoscaling.UpdateAutoScalingGroupInput{
124+
AutoScalingGroupName: awssdk.String(asgName),
125+
MinSize: awssdk.Int32(0),
126+
MaxSize: awssdk.Int32(0),
127+
DesiredCapacity: awssdk.Int32(0),
128+
}); err != nil {
129+
return err
130+
}
131+
log.Printf("Scaled ASG %s to min=max=desired=0.", asgName)
132+
return nil
18133
}

0 commit comments

Comments
 (0)