Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
ed19fe4
feat(api): update generated dataset media API
wochinge Jun 15, 2026
cd791b6
feat(datasets): support media references
wochinge Jun 15, 2026
a9a4a96
fix(datasets): address media review feedback
wochinge Jun 15, 2026
2c6718a
fix(datasets): refine media review fixes
wochinge Jun 16, 2026
592f9de
Merge remote-tracking branch 'origin/main' into feature/lfe-10289-pyt…
wochinge Jun 16, 2026
3832a4e
fix(datasets): clean up media review follow-ups
wochinge Jun 16, 2026
88905ce
fix(datasets): round-trip resolved media references
wochinge Jun 16, 2026
d7eb3ed
fix(datasets): process media inside tuples and sets
wochinge Jun 16, 2026
7db0fe7
fix(datasets): align media reference field with expectedOutput API re…
wochinge Jun 17, 2026
a2cba12
fix(datasets): drop tuple media support to avoid namedtuple breakage
wochinge Jun 17, 2026
0e2ea09
fix(media): honor per-client httpx config for media reference fetches
wochinge Jun 17, 2026
310f692
feat(datasets): scope media uploads to their dataset item
wochinge Jun 19, 2026
dcb3fc7
Merge remote-tracking branch 'origin/main' into feature/lfe-10289-pyt…
wochinge Jun 19, 2026
6c0d12c
fix(datasets): resolve dataset id once per create_dataset_item
wochinge Jun 19, 2026
d4e08f2
fix(datasets): url-encode dataset name and fix media upload job fixture
wochinge Jun 19, 2026
e40e8ea
refactor(datasets): collect media in one pass, resolve dataset id lazily
wochinge Jun 19, 2026
4e0a12d
refactor(media): rename url_is_expired to is_url_expired
wochinge Jun 22, 2026
378f767
feat(datasets): always resolve dataset item media references
wochinge Jun 22, 2026
47ca159
refactor(datasets): hand-roll media reference JSONPath, drop jsonpath-ng
wochinge Jun 22, 2026
6ad46e5
test(media): use fixed timestamps so is_url_expired ids are xdist-stable
wochinge Jun 22, 2026
5ad0c3d
refactor(datasets): namespace json_path import, drop stale -OO comment
wochinge Jun 22, 2026
b21b83a
Merge branch 'main' into feature/lfe-10289-python-sdk-changes
wochinge Jun 23, 2026
ac89d9a
chore(api): take dataset/session generated code from main
wochinge Jun 23, 2026
752a718
Merge remote-tracking branch 'origin/main' into feature/lfe-10289-pyt…
wochinge Jun 23, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions langfuse/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
LangfuseTool,
)
from ._version import __version__
from .media import LangfuseMedia, LangfuseMediaReference
from .span_filter import (
KNOWN_LLM_INSTRUMENTATION_SCOPE_PREFIXES,
is_default_export_span,
Expand All @@ -49,6 +50,8 @@

__all__ = [
"Langfuse",
"LangfuseMedia",
"LangfuseMediaReference",
"get_client",
"observe",
"propagate_attributes",
Expand Down
165 changes: 159 additions & 6 deletions langfuse/_client/client.py
Comment thread
wochinge marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import backoff
import httpx
from jsonpath_ng.ext import parse as parse_jsonpath # type: ignore[import-untyped]
from opentelemetry import context as otel_context_api
from opentelemetry import trace as otel_trace_api
from opentelemetry.sdk.trace import ReadableSpan, TracerProvider
Expand Down Expand Up @@ -94,6 +95,7 @@
CreateTextPromptRequest,
Dataset,
DatasetItem,
DatasetItemMediaReferenceField,
DatasetRunWithItems,
DatasetStatus,
DeleteDatasetRunResponse,
Expand Down Expand Up @@ -126,7 +128,7 @@
_run_task,
)
from langfuse.logger import langfuse_logger
from langfuse.media import LangfuseMedia
from langfuse.media import LangfuseMedia, LangfuseMediaReference
from langfuse.model import (
ChatMessageDict,
ChatMessageWithPlaceholdersDict,
Expand Down Expand Up @@ -2322,15 +2324,17 @@ def get_dataset(
*,
fetch_items_page_size: Optional[int] = 50,
version: Optional[datetime] = None,
resolve_media_references: bool = False,
Comment thread
wochinge marked this conversation as resolved.
Outdated
) -> "DatasetClient":
"""Fetch a dataset by its name.

Args:
name (str): The name of the dataset to fetch.
fetch_items_page_size (Optional[int]): All items of the dataset will be fetched in chunks of this size. Defaults to 50.
version (Optional[datetime]): Retrieve dataset items as they existed at this specific point in time (UTC).
name: The name of the dataset to fetch.
fetch_items_page_size: All items of the dataset will be fetched in chunks of this size. Defaults to 50.
version: Retrieve dataset items as they existed at this specific point in time (UTC).
If provided, returns the state of items at the specified UTC timestamp.
If not provided, returns the latest version. Must be a timezone-aware datetime object in UTC.
resolve_media_references: If true, resolve media reference strings in dataset items to LangfuseMediaReference objects.

Returns:
DatasetClient: The dataset with the given name.
Expand All @@ -2339,7 +2343,7 @@ def get_dataset(
langfuse_logger.debug(f"Getting datasets {name}")
dataset = self.api.datasets.get(dataset_name=self._url_encode(name))

dataset_items = []
dataset_items: List[DatasetItem] = []
page = 1

while True:
Expand All @@ -2348,8 +2352,16 @@ def get_dataset(
page=page,
limit=fetch_items_page_size,
version=version,
include_media_references=resolve_media_references or None,
Comment thread
wochinge marked this conversation as resolved.
Outdated
)
dataset_items.extend(
[
self._hydrate_dataset_item_media_references(item)
for item in new_items.data
]
if resolve_media_references
else new_items.data
)
dataset_items.extend(new_items.data)

if new_items.meta.total_pages <= page:
break
Expand Down Expand Up @@ -3355,6 +3367,17 @@ def create_dataset_item(
try:
langfuse_logger.debug(f"Creating dataset item for dataset {dataset_name}")

uploaded_media_ids: set[str] = set()
input = self._process_dataset_item_media(
data=input, uploaded_media_ids=uploaded_media_ids
)
expected_output = self._process_dataset_item_media(
data=expected_output, uploaded_media_ids=uploaded_media_ids
)
metadata = self._process_dataset_item_media(
data=metadata, uploaded_media_ids=uploaded_media_ids
)

result = self.api.dataset_items.create(
dataset_name=dataset_name,
input=input,
Expand All @@ -3371,6 +3394,136 @@ def create_dataset_item(
handle_fern_exception(e)
raise e

def _process_dataset_item_media(
self, *, data: Any, uploaded_media_ids: set[str]
) -> Any:
if self._resources is None:
return data

max_levels = 10

def _process_data_recursively(
data: Any, level: int, ancestor_container_ids: set[int]
) -> Any:
# Avoid jsonpath-ng here: dataset writes should keep working
# under python -OO where parser docstrings may be stripped.
Comment thread
wochinge marked this conversation as resolved.
Outdated
if isinstance(data, LangfuseMedia):
return self._upload_dataset_item_media(
media=data, uploaded_media_ids=uploaded_media_ids
)

if isinstance(data, LangfuseMediaReference):
return data.reference_string if data.reference_string else data

# Tuples are intentionally excluded: namedtuple subclasses can't be
# rebuilt from an iterable, so media inside them is left untouched.
if not isinstance(data, (list, set, frozenset, dict)):
return data
Comment thread
wochinge marked this conversation as resolved.

# Container ids only protect against recursive cycles; media upload
# dedupe is handled by uploaded_media_ids.
data_id = id(data)
if data_id in ancestor_container_ids or level > max_levels:
return data

next_ancestor_container_ids = ancestor_container_ids | {data_id}

if isinstance(data, (list, set, frozenset)):
processed = (
_process_data_recursively(
item, level + 1, next_ancestor_container_ids
)
for item in data
)
return type(data)(processed)

return {
key: _process_data_recursively(
value, level + 1, next_ancestor_container_ids
Comment thread
wochinge marked this conversation as resolved.
)
for key, value in data.items()
}

return _process_data_recursively(data, 1, set())

def _upload_dataset_item_media(
self, *, media: LangfuseMedia, uploaded_media_ids: set[str]
) -> str:
reference_string = media._reference_string
media_id = media._media_id

if reference_string is None or media_id is None:
raise ValueError("Cannot create dataset item with invalid LangfuseMedia.")

if media_id not in uploaded_media_ids:
assert self._resources is not None
self._resources._media_manager._upload_media_sync(media=media)
uploaded_media_ids.add(media_id)

return reference_string

def _hydrate_dataset_item_media_references(self, item: DatasetItem) -> DatasetItem:
media_references = item.media_references or []
if not media_references:
return item

# Map the API enum member to the snake_case model attribute so this keeps
# working regardless of the enum's wire value (e.g. "expectedOutput").
attr_by_field = {
DatasetItemMediaReferenceField.INPUT: "input",
DatasetItemMediaReferenceField.EXPECTED_OUTPUT: "expected_output",
DatasetItemMediaReferenceField.METADATA: "metadata",
}
hydrated_fields = {
"input": item.input,
"expected_output": item.expected_output,
"metadata": item.metadata,
}

for media_reference in media_references:
media = media_reference.media
if media is None:
continue

field = attr_by_field.get(media_reference.field)
if field is None:
continue

replacement = LangfuseMediaReference(
media_id=media.media_id,
content_type=media.content_type,
url=media.url,
url_expiry=media.url_expiry,
content_length=media.content_length,
reference_string=media_reference.reference_string,
)
Comment thread
claude[bot] marked this conversation as resolved.
hydrated_fields[field] = self._replace_json_path_value(
value=hydrated_fields[field],
json_path=media_reference.json_path,
replacement=replacement,
)

return item.model_copy(
update={
"input": hydrated_fields["input"],
"expected_output": hydrated_fields["expected_output"],
"metadata": hydrated_fields["metadata"],
}
)

def _replace_json_path_value(
self, *, value: Any, json_path: str, replacement: LangfuseMediaReference
) -> Any:
try:
value = parse_jsonpath(json_path).update(value, replacement)
except Exception as e:
langfuse_logger.warning(
f"Failed to hydrate dataset media reference at JSONPath {json_path}",
exc_info=e,
)

return value

def resolve_media_references(
self,
*,
Comment thread
claude[bot] marked this conversation as resolved.
Expand Down
10 changes: 10 additions & 0 deletions langfuse/_client/resource_manager.py
Comment thread
wochinge marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,16 @@
_instances: Dict[str, "LangfuseResourceManager"] = {}
_lock = threading.RLock()

@classmethod
def get_singleton_httpx_client(cls) -> Optional[httpx.Client]:
with cls._lock:
instances = list(cls._instances.values())

if not instances:
return None

return instances[0].httpx_client

Check warning on line 90 in langfuse/_client/resource_manager.py

View check run for this annotation

Claude / Claude Code Review

get_singleton_httpx_client picks wrong instance in multi-client setups

`LangfuseMediaReference.fetch_bytes/fetch_base64/fetch_data_uri` route through `LangfuseResourceManager.get_singleton_httpx_client()` (resource_manager.py:82-90), which unconditionally returns `list(cls._instances.values())[0].httpx_client` — the first-inserted instance regardless of which client produced the reference. In experimental multi-project setups (`Langfuse(public_key='pk-A', httpx_client=...)` + `Langfuse(public_key='pk-B', httpx_client=...)`), a reference hydrated from client B's `ge
Comment thread
claude[bot] marked this conversation as resolved.

def __new__(
cls,
*,
Expand Down
28 changes: 28 additions & 0 deletions langfuse/_task_manager/media_manager.py
Comment thread
wochinge marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,34 @@ def _process_media(
f"Media processing error: Failed to process media_id={media._media_id} for trace_id={trace_id}. Error: {str(e)}"
)

def _upload_media_sync(self, *, media: LangfuseMedia) -> None:
if not self._enabled:
raise ValueError("Cannot upload LangfuseMedia while media upload is disabled.")

if (
media._content_length is None
or media._content_type is None
or media._content_sha256_hash is None
or media._content_bytes is None
):
raise ValueError("Cannot upload invalid LangfuseMedia.")

if media._media_id is None:
raise ValueError("Cannot upload LangfuseMedia without media ID.")

upload_media_job = UploadMediaJob(
media_id=media._media_id,
content_bytes=media._content_bytes,
content_type=media._content_type,
content_length=media._content_length,
content_sha256_hash=media._content_sha256_hash,
trace_id=None,
observation_id=None,
field=None,
)

self._process_upload_media_job(data=upload_media_job)

def _process_upload_media_job(
self,
*,
Expand Down
4 changes: 2 additions & 2 deletions langfuse/_task_manager/media_upload_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ class UploadMediaJob(TypedDict):
content_length: int
content_bytes: bytes
content_sha256_hash: str
trace_id: str
trace_id: Optional[str]
observation_id: Optional[str]
field: str
field: Optional[str]
Comment thread
claude[bot] marked this conversation as resolved.
8 changes: 7 additions & 1 deletion langfuse/_utils/serializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

from pydantic import BaseModel

from langfuse.media import LangfuseMedia
from langfuse.media import LangfuseMedia, LangfuseMediaReference

# Attempt to import Serializable
try:
Expand Down Expand Up @@ -62,6 +62,12 @@ def _default_inner(self, obj: Any) -> Any:
or f"<Upload handling failed for LangfuseMedia of type {obj._content_type}>"
)

if (
isinstance(obj, LangfuseMediaReference)
and obj.reference_string is not None
):
return obj.reference_string

# Check if numpy is available and if the object is a numpy scalar
# If so, convert it to a Python scalar using the item() method
if np is not None and isinstance(obj, np.generic):
Comment thread
wochinge marked this conversation as resolved.
Expand Down
9 changes: 9 additions & 0 deletions langfuse/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@
CreateScoreValue,
Dataset,
DatasetItem,
DatasetItemMediaReference,
DatasetItemMediaReferenceField,
DatasetItemMediaReferenceMedia,
DatasetRun,
DatasetRunItem,
DatasetRunWithItems,
Expand Down Expand Up @@ -396,6 +399,9 @@
"CreateTextPromptType": ".prompts",
"Dataset": ".commons",
"DatasetItem": ".commons",
"DatasetItemMediaReference": ".commons",
"DatasetItemMediaReferenceField": ".commons",
"DatasetItemMediaReferenceMedia": ".commons",
"DatasetRun": ".commons",
"DatasetRunItem": ".commons",
"DatasetRunWithItems": ".commons",
Expand Down Expand Up @@ -717,6 +723,9 @@ def __dir__():
"CreateTextPromptType",
"Dataset",
"DatasetItem",
"DatasetItemMediaReference",
"DatasetItemMediaReferenceField",
"DatasetItemMediaReferenceMedia",
"DatasetRun",
"DatasetRunItem",
"DatasetRunWithItems",
Expand Down
9 changes: 9 additions & 0 deletions langfuse/api/commons/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
CreateScoreValue,
Dataset,
DatasetItem,
DatasetItemMediaReference,
DatasetItemMediaReferenceField,
DatasetItemMediaReferenceMedia,
DatasetRun,
DatasetRunItem,
DatasetRunWithItems,
Expand Down Expand Up @@ -84,6 +87,9 @@
"CreateScoreValue": ".types",
"Dataset": ".types",
"DatasetItem": ".types",
"DatasetItemMediaReference": ".types",
"DatasetItemMediaReferenceField": ".types",
"DatasetItemMediaReferenceMedia": ".types",
"DatasetRun": ".types",
"DatasetRunItem": ".types",
"DatasetRunWithItems": ".types",
Expand Down Expand Up @@ -174,6 +180,9 @@ def __dir__():
"CreateScoreValue",
"Dataset",
"DatasetItem",
"DatasetItemMediaReference",
"DatasetItemMediaReferenceField",
"DatasetItemMediaReferenceMedia",
"DatasetRun",
"DatasetRunItem",
"DatasetRunWithItems",
Expand Down
Loading
Loading