balancer/pickfirst: fix flaky metrics tests by awaiting async metrics#9020
balancer/pickfirst: fix flaky metrics tests by awaiting async metrics#9020mbissa wants to merge 6 commits into
Conversation
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #9020 +/- ##
==========================================
- Coverage 83.28% 83.18% -0.11%
==========================================
Files 418 419 +1
Lines 33741 33895 +154
==========================================
+ Hits 28102 28194 +92
- Misses 4232 4274 +42
- Partials 1407 1427 +20
🚀 New features to boost your workflow:
|
| t.Fatalf("EmptyCall() failed: %v", err) | ||
| } | ||
|
|
||
| if got, _ := tmr.Metric("grpc.lb.pick_first.connection_attempts_succeeded"); got != 1 { |
There was a problem hiding this comment.
All pick_first metrics (I'm not sure about subchannel metrics) MUST be emitted before an RPC returns, as the LB policy is responsible for making the channel active (which is what allows the RPC to succeed). If the metrics aren't emitted in this order, it likely indicates a bug causing race conditions within the channel. Because of this, we shouldn't need to change the validation logic. Could you please revert the changes to the pick_first metrics?
There was a problem hiding this comment.
reverted pick first and metrics that do not need waiting out of the new construct.
| return gotMetrics | ||
| } | ||
|
|
||
| func awaitMetric(ctx context.Context, t *testing.T, tmr *stats.TestMetricsRecorder, name string, want float64) { |
There was a problem hiding this comment.
Instead of a frequent polling, we should use channels to efficiently block the test until the expected metric is available. The TestMetricsRecorder (TMR) already has a method that allows waiting for a particular metric to be emitted:
grpc-go/internal/testutils/stats/test_metrics_recorder.go
Lines 160 to 170 in c20fba0
However, it currently has a few issues that make it difficult to use:
- Strict Ordering: It fails if the very next metric emitted doesn't match the expected one. When writing e2e tests, it is difficult to account for all metrics and the exact order in which they will be emitted.
- Data Loss: It consumes and discards buffered metrics while waiting.
Currently, the only users of these WaitForX methods seem to be the metrics recorder list tests. I think we should improve this test utility to make it more ergonomic for other e2e tests to use.
I propose we combine the logic of the Metric() and WaitForX methods.
- To address the first issue, we can update the
WaitForXmethod to keep reading from the channel until either the context expires or a metric with a matching name is found for comparision. - To address the second issue, we can store all read
MetricsDatain the TMR's inner map. WhenWaitForXis called, it can first check this map for the required data; if it's found, it can proceed with the match directly without waiting on the channel.
What do you think?
There was a problem hiding this comment.
sounds attempt in correct direction
What do you think about below @arjan-bal :
- Instead of WaitForX consuming the channel directly, the TMR will internally "pump" all incoming metrics into a thread-safe map.
- The internal map will store a slice of values (map[string][]float64) instead of just the latest value. This allows tests to verify state transitions (e.g., checking that a counter hit 1 and then 2) and nothing is discarded and users would know that in case of races if more than one values can come then they want to read the first one or the last one.
There was a problem hiding this comment.
For point 2, the usual way to verify transitions is to pause the test, verify the current value of the metric, trigger the event that causes the metric to update, and then verify the new value. The test controls the events that cause the metrics to be emitted, instead of running to completion and verifying the metrics afterward. This seems like a stricter way to verify metrics. Is there a case where doing this is currently difficult?
There was a problem hiding this comment.
We do follow the "Trigger -> Verify -> Trigger -> Verify" pattern (e.g., connect -> verify 1 -> disconnect -> verify -1).
Even after the connection state transitions, the test thread can run ahead of the metrics recorder.
Because of this asynchronous delay, we still need the WaitForX methods to block until the metric is recorded. Furthermore, under CPU load, the background goroutines can record multiple values (e.g., both 1 and -1) back-to-back before the test thread is rescheduled to verify. The slice-based map prevents the later value from overwriting the earlier one, ensuring both transitions are verified in the correct sequence.
Fixed a race condition in pickfirst metrics tests where the assertions would immediately poll the stats.TestMetricsRecorder. Depending on the goroutine scheduling, asynchronous metrics like 'grpc.subchannel.open_connections' would occasionally not be fully recorded before the test checked them, causing flaky failures. This introduces an awaitMetric helper to wait for the metrics to reach their expected values.
… in TestMetricsRecorder
d5d8759 to
7d50135
Compare
|
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request refactors the TestMetricsRecorder in internal/testutils/stats to record and track all metric updates sequentially rather than only storing the most recent value. It also updates pickfirst balancer metrics tests to use the new sequential waiting helpers. A critical race condition was identified in the Record* methods of TestMetricsRecorder where metrics are sent to channels before being appended to the internal data slice under lock, which can cause waitForMetric to miss updates and block indefinitely.
|
|
||
| data: make(map[string]float64), | ||
| data: make(map[string][]MetricsData), | ||
| consumed: make(map[string]int), |
There was a problem hiding this comment.
Instead of using the consumed map to track the head pointer of the queue, we can remove the first element by slicing:
data[name] = data[name][1:]| @@ -68,15 +71,27 @@ | |||
| func (r *TestMetricsRecorder) Metric(name string) (float64, bool) { | |||
There was a problem hiding this comment.
We should mention in the godoc that this doesn't consume the metric value.
| intCountCh *testutils.Channel | ||
| floatCountCh *testutils.Channel | ||
| intHistoCh *testutils.Channel | ||
| floatHistoCh *testutils.Channel | ||
| intGaugeCh *testutils.Channel | ||
| intUpDownCountCh *testutils.Channel |
There was a problem hiding this comment.
These channels are used only to signal that a metric has been received; the actual metric is read directly from the data map. We can replace them with a single channel. The only side effect is that the loop in waitForMetric might unblock a few extra times for different metric types. However, it will simply check the data map and block again, so correctness is maintained.
| // WaitForInt64CountIncr waits for an int64 count metric to be recorded and | ||
| // verifies that the recorded metrics data incr matches the expected incr. | ||
| // Returns an error if failed to wait or received wrong data. | ||
| func (r *TestMetricsRecorder) WaitForInt64CountIncr(ctx context.Context, incrWant int64) error { |
There was a problem hiding this comment.
We should replace WaitForInt64CountIncr with WaitForInt64Count, which checks the metric type, not just the value. This would allow us to use a single channel for signaling.
| got, err := r.waitForMetric(ctx, r.intUpDownCountCh, want.Handle.Name) | ||
| if err != nil { | ||
| return fmt.Errorf("timeout waiting for int64UpDownCount") | ||
| } |
There was a problem hiding this comment.
We should propagate the error instead of returning a fresh error. Same for other methods.
| // WaitForInt64UpDownCount waits for an int64 up-down count metric and | ||
| // verifies that it matches want. Returns an error if failed to wait or | ||
| // received wrong data. | ||
| func (r *TestMetricsRecorder) WaitForInt64UpDownCount(ctx context.Context, want MetricsData) error { |
There was a problem hiding this comment.
We should mention that this method consumes the metrics until finds the expected metric in the godoc. Same for other methods.
| func waitForInt64Count(ctx context.Context, t *testing.T, tmr *stats.TestMetricsRecorder, name string, want int64) { | ||
| t.Helper() | ||
| err := tmr.WaitForInt64Count(ctx, stats.MetricsData{ | ||
| Handle: &estats.MetricDescriptor{Name: name}, | ||
| IntIncr: want, | ||
| }) | ||
| if err != nil { | ||
| t.Fatal(err) | ||
| } | ||
| } |
There was a problem hiding this comment.
style: Assertion helpers are not considered idiomatic in Go, see go/go-style/best-practices#test-functions
Instead, you can should inline the calls, even if it results in a little repetition.
Fixes #9012
Fixed a race condition in pickfirst metrics tests where the assertions would immediately poll the stats.TestMetricsRecorder. Depending on the goroutine scheduling, capturing of metrics like 'grpc.subchannel.open_connections' would occasionally not be fully recorded before the test checked them, causing flaky failures. This introduces an awaitMetric helper to wait for the metrics to reach their expected values.
RELEASE NOTES: none