Skip to content

Commit f5ca419

Browse files
feat(openai): native content-block streaming for the Responses API
1 parent b67eab5 commit f5ca419

6 files changed

Lines changed: 406 additions & 17 deletions

File tree

libs/partners/openai/langchain_openai/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@
44
from langchain_openai.chat_models._client_utils import StreamChunkTimeoutError
55
from langchain_openai.chat_models._stream_events import (
66
aconvert_openai_completions_stream,
7+
aconvert_openai_responses_stream,
78
convert_openai_completions_stream,
9+
convert_openai_responses_stream,
810
)
911
from langchain_openai.embeddings import AzureOpenAIEmbeddings, OpenAIEmbeddings
1012
from langchain_openai.llms import AzureOpenAI, OpenAI
@@ -19,6 +21,8 @@
1921
"OpenAIEmbeddings",
2022
"StreamChunkTimeoutError",
2123
"aconvert_openai_completions_stream",
24+
"aconvert_openai_responses_stream",
2225
"convert_openai_completions_stream",
26+
"convert_openai_responses_stream",
2327
"custom_tool",
2428
]

libs/partners/openai/langchain_openai/chat_models/_stream_events.py

Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,13 @@
3434
# Bound `BaseChatOpenAI._convert_chunk_to_generation_chunk`.
3535
MakeChunk = Callable[..., "ChatGenerationChunk | None"]
3636

37+
# Callable type for the bound `_convert_responses_chunk_to_generation_chunk`.
#
# Call shape:
#   (chunk, idx, out_idx, sub_idx, *, schema, metadata, has_reasoning,
#    output_version)
# Result:
#   (idx, out_idx, sub_idx, ChatGenerationChunk | None)
ConvertResponsesChunk = Callable[
    ...,
    "tuple[int, int, int, ChatGenerationChunk | None]",
]
43+
3744

3845
def _message_start(
3946
message_id: str | None, model: str | None, provider: str
@@ -160,3 +167,156 @@ async def aconvert_openai_completions_stream(
160167
for ev in tracker.finish_all():
161168
yield ev
162169
yield build_message_finish(usage=usage, response_metadata=response_metadata)
170+
171+
172+
def convert_openai_responses_stream(
    raw: Iterator[Any],
    convert_chunk: ConvertResponsesChunk,
    *,
    schema: Any = None,
    output_version: str | None = None,
    message_id: str | None = None,
    provider: str = "openai",
) -> Iterator[MessagesData]:
    """Translate a raw OpenAI Responses API event stream into protocol events.

    Every raw event is passed through `convert_chunk` (the bound
    `_convert_responses_chunk_to_generation_chunk`, injected to avoid a
    circular import) together with the three running indices it returned on
    the previous call. Events for which it returns no generation chunk are
    skipped. A message-start event is emitted before the first chunk that
    does produce one.

    Content blocks are fed to a `BlockStreamTracker`. When a block arrives
    under a key different from the one currently open, the open block is
    finished first, which yields an explicit `content-block-finish` boundary.

    Args:
        raw: Raw Responses API events.
        convert_chunk: `_convert_responses_chunk_to_generation_chunk`.
        schema: `response_format` schema, forwarded to `convert_chunk`.
        output_version: `self.output_version`, forwarded to `convert_chunk`.
        message_id: Passed to the message-start event. Left empty by default
            so the v3 stream's seeded run id stands.
        provider: `model_provider` id recorded in the response metadata.

    Yields:
        Protocol `MessagesData` lifecycle events. Nothing is yielded when no
        raw event produces a generation chunk.
    """
    blocks = BlockStreamTracker()
    index = output_index = sub_index = -1
    seen_reasoning = False
    message_started = False
    model_name: str | None = None
    active_key: Any = None
    usage_total: dict[str, Any] | None = None
    final_metadata: dict[str, Any] = {"model_provider": provider}

    for event in raw:
        index, output_index, sub_index, generation = convert_chunk(
            event,
            index,
            output_index,
            sub_index,
            schema=schema,
            metadata={},
            has_reasoning=seen_reasoning,
            output_version=output_version,
        )
        if generation is None:
            continue
        message = generation.message

        # Keep looking for a model name until one shows up; it is only used
        # by the message-start event below.
        if model_name is None:
            chunk_metadata = message.response_metadata or {}
            model_name = chunk_metadata.get("model_name") or chunk_metadata.get(
                "model"
            )
        if not message_started:
            message_started = True
            yield _message_start(message_id, model_name, provider)

        # Sticky flag: once any chunk carried reasoning, tell the converter.
        seen_reasoning |= "reasoning" in message.additional_kwargs

        for key, block in iter_protocol_blocks(message):
            if active_key is not None and key != active_key:
                # A new key means the previously open block is complete.
                yield from blocks.finish_block(active_key)
            yield from blocks.feed(key, block)
            active_key = key

        chunk_usage = getattr(message, "usage_metadata", None)
        if chunk_usage:
            usage_total = accumulate_usage(usage_total, chunk_usage)

        # Message-level metadata wins over generation info on key clashes.
        extra: dict[str, Any] = {}
        extra.update(generation.generation_info or {})
        extra.update(message.response_metadata or {})
        if extra:
            final_metadata.update(extra)
            # Re-pin the provider in case the merged metadata overwrote it.
            final_metadata["model_provider"] = provider

    if message_started:
        yield from blocks.finish_all()
        yield build_message_finish(
            usage=usage_total, response_metadata=final_metadata
        )
254+
255+
256+
async def aconvert_openai_responses_stream(
    raw: AsyncIterator[Any],
    convert_chunk: ConvertResponsesChunk,
    *,
    schema: Any = None,
    output_version: str | None = None,
    message_id: str | None = None,
    provider: str = "openai",
) -> AsyncIterator[MessagesData]:
    """Async counterpart of `convert_openai_responses_stream`.

    Only the iteration over `raw` is asynchronous; `convert_chunk` is called
    synchronously for each raw event.

    Args:
        raw: Raw Responses API events, consumed with `async for`.
        convert_chunk: `_convert_responses_chunk_to_generation_chunk`.
        schema: `response_format` schema, forwarded to `convert_chunk`.
        output_version: Forwarded to `convert_chunk`.
        message_id: Passed to the message-start event.
        provider: `model_provider` id recorded in the response metadata.

    Yields:
        Protocol `MessagesData` lifecycle events. Nothing is yielded when no
        raw event produces a generation chunk.
    """
    blocks = BlockStreamTracker()
    index = output_index = sub_index = -1
    seen_reasoning = False
    message_started = False
    model_name: str | None = None
    active_key: Any = None
    usage_total: dict[str, Any] | None = None
    final_metadata: dict[str, Any] = {"model_provider": provider}

    async for event in raw:
        index, output_index, sub_index, generation = convert_chunk(
            event,
            index,
            output_index,
            sub_index,
            schema=schema,
            metadata={},
            has_reasoning=seen_reasoning,
            output_version=output_version,
        )
        if generation is None:
            continue
        message = generation.message

        # Keep looking for a model name until one shows up; it is only used
        # by the message-start event below.
        if model_name is None:
            chunk_metadata = message.response_metadata or {}
            model_name = chunk_metadata.get("model_name") or chunk_metadata.get(
                "model"
            )
        if not message_started:
            message_started = True
            yield _message_start(message_id, model_name, provider)

        # Sticky flag: once any chunk carried reasoning, tell the converter.
        seen_reasoning |= "reasoning" in message.additional_kwargs

        for key, block in iter_protocol_blocks(message):
            if active_key is not None and key != active_key:
                # A new key means the previously open block is complete.
                for finished in blocks.finish_block(active_key):
                    yield finished
            for emitted in blocks.feed(key, block):
                yield emitted
            active_key = key

        chunk_usage = getattr(message, "usage_metadata", None)
        if chunk_usage:
            usage_total = accumulate_usage(usage_total, chunk_usage)

        # Message-level metadata wins over generation info on key clashes.
        extra: dict[str, Any] = {}
        extra.update(generation.generation_info or {})
        extra.update(message.response_metadata or {})
        if extra:
            final_metadata.update(extra)
            # Re-pin the provider in case the merged metadata overwrote it.
            final_metadata["model_provider"] = provider

    if message_started:
        for trailing in blocks.finish_all():
            yield trailing
        yield build_message_finish(
            usage=usage_total, response_metadata=final_metadata
        )

libs/partners/openai/langchain_openai/chat_models/base.py

Lines changed: 87 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,9 @@
154154
)
155155
from langchain_openai.chat_models._stream_events import (
156156
aconvert_openai_completions_stream,
157+
aconvert_openai_responses_stream,
157158
convert_openai_completions_stream,
159+
convert_openai_responses_stream,
158160
)
159161
from langchain_openai.data._profiles import _PROFILES
160162

@@ -1892,22 +1894,24 @@ def _stream_chat_model_events(
18921894
message_id: str | None = None,
18931895
**kwargs: Any,
18941896
) -> Iterator[MessagesData]:
1895-
"""Emit OpenAI-native content-block events for the Chat Completions path.
1897+
"""Emit OpenAI-native content-block events for Completions and Responses.
18961898
1897-
Defers to the compat bridge for cases this converter does not yet
1898-
specialize: the Responses API, structured output (`response_format`),
1899-
and raw-header mode. Detected by core's `_iter_v2_events`.
1899+
The standard Completions and Responses API paths run through their
1900+
native converters. Structured output (`response_format`) and raw-header
1901+
mode still defer to the compat bridge over `_stream`, since those keep
1902+
the final-completion handling only `_stream` performs. Detected by
1903+
core's `_iter_v2_events`.
19001904
"""
1901-
# Responses API / structured output / raw headers: bridge over `_stream`,
1902-
# which (on `ChatOpenAI`) routes to the Responses path when applicable.
1905+
use_responses = self._use_responses_api({**kwargs, **self.model_kwargs})
19031906
# `response_format` may arrive via call kwargs or be baked into
19041907
# `model_kwargs`; both fold into the payload, so check both.
1905-
if (
1906-
self._use_responses_api({**kwargs, **self.model_kwargs})
1907-
or kwargs.get("response_format") is not None
1908+
has_response_format = (
1909+
kwargs.get("response_format") is not None
19081910
or self.model_kwargs.get("response_format") is not None
1909-
or self.include_response_headers
1910-
):
1911+
)
1912+
# Structured output and raw-header mode keep the post-loop /
1913+
# final-completion handling that only `_stream` performs — defer those.
1914+
if has_response_format or self.include_response_headers:
19111915
# Forward kwargs untouched (as core's `_iter_v2_events` would):
19121916
# `_stream` handles `stream_usage` itself, and the Responses path
19131917
# rejects a stray `stream_usage` kwarg, so we must not inject one.
@@ -1921,6 +1925,35 @@ def _stream_chat_model_events(
19211925
message_id=message_id,
19221926
)
19231927
return
1928+
if use_responses:
1929+
self._ensure_sync_client_available()
1930+
kwargs["stream"] = True
1931+
payload = self._get_request_payload(messages, stop=stop, **kwargs)
1932+
try:
1933+
with self.root_client.responses.create(**payload) as response:
1934+
for event in convert_openai_responses_stream(
1935+
response,
1936+
_convert_responses_chunk_to_generation_chunk,
1937+
# Always None here: the `response_format` (structured
1938+
# output) path is handled by the bridge branch above.
1939+
schema=None,
1940+
output_version=self.output_version,
1941+
message_id=message_id,
1942+
):
1943+
if (
1944+
run_manager is not None
1945+
and event["event"] == "content-block-delta"
1946+
and event["delta"].get("type") == "text-delta"
1947+
):
1948+
run_manager.on_llm_new_token(
1949+
str(event["delta"].get("text", ""))
1950+
)
1951+
yield event
1952+
except openai.BadRequestError as e:
1953+
_handle_openai_bad_request(e)
1954+
except openai.APIError as e:
1955+
_handle_openai_api_error(e)
1956+
return
19241957

19251958
self._ensure_sync_client_available()
19261959
kwargs["stream"] = True
@@ -1964,12 +1997,14 @@ async def _astream_chat_model_events(
19641997
**kwargs: Any,
19651998
) -> AsyncIterator[MessagesData]:
19661999
"""Async twin of `_stream_chat_model_events`."""
1967-
if (
1968-
self._use_responses_api({**kwargs, **self.model_kwargs})
1969-
or kwargs.get("response_format") is not None
2000+
use_responses = self._use_responses_api({**kwargs, **self.model_kwargs})
2001+
has_response_format = (
2002+
kwargs.get("response_format") is not None
19702003
or self.model_kwargs.get("response_format") is not None
1971-
or self.include_response_headers
1972-
):
2004+
)
2005+
# Structured output and raw-header mode keep the post-loop /
2006+
# final-completion handling that only `_astream` performs — defer those.
2007+
if has_response_format or self.include_response_headers:
19732008
# Forward kwargs untouched (as core's `_aiter_v2_events` would):
19742009
# `_astream` handles `stream_usage` itself, and the Responses path
19752010
# rejects a stray `stream_usage` kwarg, so we must not inject one.
@@ -1984,6 +2019,42 @@ async def _astream_chat_model_events(
19842019
):
19852020
yield event
19862021
return
2022+
if use_responses:
2023+
kwargs["stream"] = True
2024+
payload = self._get_request_payload(messages, stop=stop, **kwargs)
2025+
try:
2026+
response = await self.root_async_client.responses.create(**payload)
2027+
async with response as stream:
2028+
# Mirror `_astream_responses`: apply per-chunk stall
2029+
# protection before the converter consumes the stream.
2030+
timed_stream = _astream_with_chunk_timeout(
2031+
stream,
2032+
self.stream_chunk_timeout,
2033+
model_name=self.model_name,
2034+
)
2035+
async for event in aconvert_openai_responses_stream(
2036+
timed_stream,
2037+
_convert_responses_chunk_to_generation_chunk,
2038+
# Always None here: the `response_format` (structured
2039+
# output) path is handled by the bridge branch above.
2040+
schema=None,
2041+
output_version=self.output_version,
2042+
message_id=message_id,
2043+
):
2044+
if (
2045+
run_manager is not None
2046+
and event["event"] == "content-block-delta"
2047+
and event["delta"].get("type") == "text-delta"
2048+
):
2049+
await run_manager.on_llm_new_token(
2050+
str(event["delta"].get("text", ""))
2051+
)
2052+
yield event
2053+
except openai.BadRequestError as e:
2054+
_handle_openai_bad_request(e)
2055+
except openai.APIError as e:
2056+
_handle_openai_api_error(e)
2057+
return
19872058

19882059
kwargs["stream"] = True
19892060
stream_usage = self._should_stream_usage(

0 commit comments

Comments
 (0)