OpenTelemetry instrumentation for streaQ, a fast, async, type-safe Python job queue using Redis Streams.
Disclaimer: This is a third-party package maintained independently and is not part of the official OpenTelemetry Python Contrib repository. However, it was derived from the
opentelemetry-instrumentation-celerypackage.As it is a third-party package, the
opentelemetry-bootstrap -a installcommand will not find and install this package for streaQ projects. It must be manually added to your project dependencies, before you can then useopentelemetry-instrument.
- Distributed Tracing: Automatically creates producer and consumer spans for task queue operations
- Context Propagation: Seamlessly propagates trace context from task producers to workers
- Semantic Conventions: Follows OpenTelemetry messaging semantic conventions
- Redis Integration: Captures Redis connection attributes and messaging system details
- Error Tracking: Records exceptions and task failures with full stack traces
pip install opentelemetry-instrumentation-streaqOr with the optional instruments dependency:
pip install opentelemetry-instrumentation-streaq[instruments]- Python 3.10+
- streaq >= 6.4.0, < 7.0
- OpenTelemetry API ~= 1.12
Create a worker module (worker.py):
from streaq import Worker
from opentelemetry.instrumentation.streaq import StreaqInstrumentor
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
# Instrument streaQ
tracer_provider = TracerProvider()
tracer_provider.add_span_processor(BatchSpanProcessor(ConsoleSpanExporter()))
StreaqInstrumentor().instrument(tracer_provider=tracer_provider)
worker = Worker(redis_url="redis://localhost")
@worker.task
async def my_task(data: str) -> str:
return f"Processed: {data}"Run the worker:
streaq run worker:workerQueue a task (script.py):
from anyio import run
from worker import worker, my_task
async def main():
async with worker:
await my_task.enqueue("hello")
run(main)from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.instrumentation.streaq import StreaqInstrumentor
provider = TracerProvider()
StreaqInstrumentor().instrument(tracer_provider=provider)from opentelemetry import trace
@worker.task
async def my_task(data: str) -> str:
tracer = trace.get_tracer(__name__)
with tracer.start_as_current_span("business_logic"):
return f"Processed: {data}"from opentelemetry.instrumentation.utils import disable_instrumentation
disable_instrumentation("streaq")The instrumentation patches streaQ at two key points:
- Producer Side (
Task._enqueue): CreatesPRODUCERspans when tasks are enqueued and injects trace context into task metadata. - Consumer Side (
Worker.run_task): Extracts trace context and createsCONSUMERspans when tasks are processed by workers.
This ensures complete end-to-end tracing across the distributed task queue.
These attributes are captured when tasks are enqueued:
| Attribute | Type | Description |
|---|---|---|
messaging.system |
string | Always "redis" |
messaging.operation.type |
string | Always "send" |
messaging.destination.name |
string | The queue name (e.g., "normal") |
messaging.message.id |
string | Unique message identifier |
messaging.operation.name |
string | Name of the task function |
streaq.task.max_retries |
int | Maximum retry attempts |
streaq.task.delay_ms |
int | Task delay in milliseconds |
streaq.task.timeout_ms |
int | Task timeout in milliseconds |
streaq.task.ttl_ms |
int | Task TTL in milliseconds |
streaq.task.expire_ms |
int | Task expiration in milliseconds |
streaq.task.unique |
boolean | Whether task is unique |
streaq.task.dependencies |
string[] | Dependency task IDs |
streaq.task.crontab |
string | Crontab schedule (if scheduled) |
streaq.task.scheduled_time |
string | Scheduled execution time (if delayed) |
These attributes are captured when tasks are executed:
| Attribute | Type | Description |
|---|---|---|
messaging.system |
string | Always "redis" |
messaging.operation.type |
string | Always "process" |
messaging.destination.name |
string | The queue name |
messaging.message.id |
string | Unique message identifier |
messaging.operation.name |
string | Name of the task function |
messaging.consumer.id |
string | Worker consumer identifier |
streaq.task.retry_count |
int | Current retry attempt |
streaq.task.timeout_ms |
int | Task timeout in milliseconds |
error.type |
string | Exception class name (on failure only) |
These attributes are added to consumer spans after task execution:
| Attribute | Type | Description |
|---|---|---|
streaq.task.success |
boolean | Whether task execution succeeded |
streaq.task.execution_duration_ms |
int | Task execution duration in milliseconds |
streaq.task.start_time |
string | Task start time (ISO format) |
streaq.task.finish_time |
string | Task finish time (ISO format) |
streaq.task.result_ttl |
int | Result TTL in milliseconds |
class StreaqInstrumentor(BaseInstrumentor)The main instrumentor class for streaQ.
Enable streaQ instrumentation.
Parameters:
tracer_provider(optional): Custom tracer provider. If not provided, uses the global provider.meter_provider(optional): Custom meter provider for metrics (future).
Disable streaQ instrumentation and remove all patches.
from opentelemetry.instrumentation.streaq import StreaqInstrumentor
# Enable instrumentation
StreaqInstrumentor().instrument()
# ... your streaQ code ...
# Disable instrumentation
StreaqInstrumentor().uninstrument()Inject trace context metadata into task kwargs.
from opentelemetry.instrumentation.streaq.utils import inject_metadata
inject_metadata(task_kwargs, {"traceparent": "00-...", "tracestate": "..."})Extract trace context metadata from task kwargs.
from opentelemetry.instrumentation.streaq.utils import extract_metadata
metadata = extract_metadata(task_kwargs)
# Returns: {"traceparent": "00-...", "tracestate": "..."}The key used to store trace context in task kwargs ("__otel_metadata").
Licensed under the Apache License 2.0. See LICENSE for details.