Skip to content

Commit d5ba47d

Browse files
Conformance finding: worker-versioning no-compatible-worker behavior remains unproved (#200)
1 parent e2530d3 commit d5ba47d

2 files changed

Lines changed: 74 additions & 7 deletions

File tree

src/durable_workflow/client.py

Lines changed: 37 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3202,20 +3202,33 @@ async def heartbeat_worker(
32023202
body["heartbeat_interval_seconds"] = heartbeat_interval_seconds
32033203
return await self._request("POST", "/worker/heartbeat", worker=True, json=body)
32043204

3205-
async def poll_workflow_task(
3206-
self, *, worker_id: str, task_queue: str, timeout: float = 35.0
3207-
) -> Any:
3205+
async def poll_workflow_task_response(
3206+
self,
3207+
*,
3208+
worker_id: str,
3209+
task_queue: str,
3210+
timeout: float = 35.0,
3211+
build_id: str | None = None,
3212+
history_page_size: int | None = None,
3213+
poll_request_id: str | None = None,
3214+
) -> dict[str, Any]:
32083215
"""Long-poll for the next workflow task on ``task_queue``.
32093216
3210-
Returns the task payload, or ``None`` on poll timeout. Worker-plane
3217+
Returns the full worker-protocol response envelope, including
3218+
``poll_status`` when the server has no task to lease. Worker-plane
32113219
endpoint — most applications use :class:`~durable_workflow.Worker`
32123220
rather than calling this directly.
32133221
"""
32143222
body: dict[str, Any] = {
32153223
"worker_id": worker_id,
32163224
"task_queue": task_queue,
3217-
"poll_request_id": f"wf-poll-{uuid.uuid4().hex}",
3225+
"poll_request_id": poll_request_id or f"wf-poll-{uuid.uuid4().hex}",
32183226
}
3227+
if build_id:
3228+
body["build_id"] = build_id
3229+
if history_page_size is not None:
3230+
body["history_page_size"] = history_page_size
3231+
32193232
for _ in range(2):
32203233
try:
32213234
data = await self._request(
@@ -3224,9 +3237,26 @@ async def poll_workflow_task(
32243237
except httpx.TimeoutException:
32253238
continue
32263239

3227-
return (data or {}).get("task")
3240+
return data if isinstance(data, dict) else {}
32283241

3229-
return None
3242+
return {"task": None, "poll_status": "timeout"}
3243+
3244+
async def poll_workflow_task(
3245+
self, *, worker_id: str, task_queue: str, timeout: float = 35.0
3246+
) -> Any:
3247+
"""Long-poll for the next workflow task on ``task_queue``.
3248+
3249+
Returns the task payload, or ``None`` on poll timeout. Worker-plane
3250+
endpoint — most applications use :class:`~durable_workflow.Worker`
3251+
rather than calling this directly.
3252+
"""
3253+
data = await self.poll_workflow_task_response(
3254+
worker_id=worker_id,
3255+
task_queue=task_queue,
3256+
timeout=timeout,
3257+
)
3258+
3259+
return data.get("task")
32303260

32313261
async def complete_workflow_task(
32323262
self,

tests/test_client.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2270,6 +2270,43 @@ async def test_poll_workflow_task_matches_polyglot_fixture(self, client: Client)
22702270
assert isinstance(request_body["poll_request_id"], str)
22712271
assert request_body["poll_request_id"] != ""
22722272

2273+
@pytest.mark.asyncio
2274+
async def test_poll_workflow_task_response_preserves_no_compatible_status(self, client: Client) -> None:
2275+
response = {
2276+
"task": None,
2277+
"poll_status": "no_compatible_worker",
2278+
}
2279+
2280+
with patch.object(client, "_request", new_callable=AsyncMock, return_value=response) as mock:
2281+
envelope = await client.poll_workflow_task_response(
2282+
worker_id="worker-v2",
2283+
task_queue="queue-1",
2284+
build_id="build-v2",
2285+
history_page_size=100,
2286+
timeout=3.0,
2287+
)
2288+
2289+
assert envelope == response
2290+
request_body = mock.await_args.kwargs["json"]
2291+
assert request_body["worker_id"] == "worker-v2"
2292+
assert request_body["task_queue"] == "queue-1"
2293+
assert request_body["build_id"] == "build-v2"
2294+
assert request_body["history_page_size"] == 100
2295+
assert isinstance(request_body["poll_request_id"], str)
2296+
assert mock.await_args.kwargs["timeout"] == 3.0
2297+
2298+
@pytest.mark.asyncio
2299+
async def test_poll_workflow_task_keeps_returning_none_for_no_compatible_status(self, client: Client) -> None:
2300+
response = {
2301+
"task": None,
2302+
"poll_status": "no_compatible_worker",
2303+
}
2304+
2305+
with patch.object(client, "poll_workflow_task_response", new_callable=AsyncMock, return_value=response):
2306+
task = await client.poll_workflow_task(worker_id="worker-v2", task_queue="queue-1")
2307+
2308+
assert task is None
2309+
22732310
@pytest.mark.asyncio
22742311
async def test_poll_workflow_task_retries_once_with_same_poll_request_id_after_timeout(
22752312
self, client: Client

0 commit comments

Comments
 (0)