definite-d/opentelemetry-instrumentation-streaq

OpenTelemetry streaQ Instrumentation

License: Apache-2.0

OpenTelemetry instrumentation for streaQ, a fast, async, type-safe Python job queue using Redis Streams.

Disclaimer: This is a third-party package, maintained independently of the official OpenTelemetry Python Contrib repository. It is derived from the opentelemetry-instrumentation-celery package.

Because it is a third-party package, the opentelemetry-bootstrap -a install command will not find or install it for streaQ projects. Add it to your project dependencies manually; after that, opentelemetry-instrument can be used as usual.

Features

  • Distributed Tracing: Automatically creates producer and consumer spans for task queue operations
  • Context Propagation: Seamlessly propagates trace context from task producers to workers
  • Semantic Conventions: Follows OpenTelemetry messaging semantic conventions
  • Redis Integration: Captures Redis connection attributes and messaging system details
  • Error Tracking: Records exceptions and task failures with full stack traces

Installation

pip install opentelemetry-instrumentation-streaq

Or with the optional instruments extra (quoted so that shells such as zsh do not expand the brackets):

pip install "opentelemetry-instrumentation-streaq[instruments]"

Requirements

  • Python 3.10+
  • streaq >= 6.4.0, < 7.0
  • OpenTelemetry API ~= 1.12
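To fail fast in an unsupported environment, the bounds above can be checked at startup. This is a minimal sketch using only the standard library; parse_release and streaq_supported are hypothetical helper names, and the parsing deliberately ignores pre-release suffixes.

```python
import re
import sys
from importlib.metadata import PackageNotFoundError, version


def parse_release(text: str) -> tuple[int, ...]:
    """Turn '6.4' or '6.4.0rc1' into (6, 4, 0), ignoring any suffix."""
    match = re.match(r"\d+(?:\.\d+)*", text)
    if match is None:
        raise ValueError(f"not a version string: {text!r}")
    parts = tuple(int(part) for part in match.group().split("."))
    # Pad to three components so that '6.4' compares equal to '6.4.0'.
    return parts + (0,) * (3 - len(parts))


def streaq_supported(text: str) -> bool:
    """Check the documented range: streaq >= 6.4.0, < 7.0."""
    return (6, 4, 0) <= parse_release(text) < (7, 0, 0)


if __name__ == "__main__":
    print("Python OK:", sys.version_info >= (3, 10))
    try:
        print("streaq OK:", streaq_supported(version("streaq")))
    except PackageNotFoundError:
        print("streaq is not installed")
```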

Usage

Quick Start

Create a worker module (worker.py):

from streaq import Worker
from opentelemetry.instrumentation.streaq import StreaqInstrumentor
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter

# Instrument streaQ
tracer_provider = TracerProvider()
tracer_provider.add_span_processor(BatchSpanProcessor(ConsoleSpanExporter()))
StreaqInstrumentor().instrument(tracer_provider=tracer_provider)

worker = Worker(redis_url="redis://localhost")

@worker.task
async def my_task(data: str) -> str:
    return f"Processed: {data}"

Run the worker:

streaq run worker:worker

Queue a task (script.py):

from anyio import run
from worker import worker, my_task

async def main():
    async with worker:
        await my_task.enqueue("hello")

run(main)

With Custom Tracer Provider

from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.instrumentation.streaq import StreaqInstrumentor

provider = TracerProvider()
StreaqInstrumentor().instrument(tracer_provider=provider)

Combining with Manual Instrumentation

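from opentelemetry import trace

# `worker` is the Worker instance defined in the Quick Start.
@worker.task
async def my_task(data: str) -> str:
    tracer = trace.get_tracer(__name__)
    # Manual span around the task's own logic.
    with tracer.start_as_current_span("business_logic"):
        return f"Processed: {data}"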

Disabling Instrumentation

from opentelemetry.instrumentation.streaq import StreaqInstrumentor

# Remove the patches applied by instrument()
StreaqInstrumentor().uninstrument()

How It Works

The instrumentation patches streaQ at two key points:

  1. Producer Side (Task._enqueue): Creates PRODUCER spans when tasks are enqueued and injects trace context into task metadata.
  2. Consumer Side (Worker.run_task): Extracts trace context and creates CONSUMER spans when tasks are processed by workers.

Together, these two patches link the enqueueing and the execution of a task into a single trace, even when they run in different processes.

Span Attributes

Producer Span Attributes

These attributes are captured when tasks are enqueued:

| Attribute | Type | Description |
| --- | --- | --- |
| messaging.system | string | Always "redis" |
| messaging.operation.type | string | Always "send" |
| messaging.destination.name | string | The queue name (e.g., "normal") |
| messaging.message.id | string | Unique message identifier |
| messaging.operation.name | string | Name of the task function |
| streaq.task.max_retries | int | Maximum retry attempts |
| streaq.task.delay_ms | int | Task delay in milliseconds |
| streaq.task.timeout_ms | int | Task timeout in milliseconds |
| streaq.task.ttl_ms | int | Task TTL in milliseconds |
| streaq.task.expire_ms | int | Task expiration in milliseconds |
| streaq.task.unique | boolean | Whether task is unique |
| streaq.task.dependencies | string[] | Dependency task IDs |
| streaq.task.crontab | string | Crontab schedule (if scheduled) |
| streaq.task.scheduled_time | string | Scheduled execution time (if delayed) |

Consumer Span Attributes

These attributes are captured when tasks are executed:

| Attribute | Type | Description |
| --- | --- | --- |
| messaging.system | string | Always "redis" |
| messaging.operation.type | string | Always "process" |
| messaging.destination.name | string | The queue name |
| messaging.message.id | string | Unique message identifier |
| messaging.operation.name | string | Name of the task function |
| messaging.consumer.id | string | Worker consumer identifier |
| streaq.task.retry_count | int | Current retry attempt |
| streaq.task.timeout_ms | int | Task timeout in milliseconds |
| error.type | string | Exception class name (on failure only) |

Completion Attributes

These attributes are added to consumer spans after task execution:

| Attribute | Type | Description |
| --- | --- | --- |
| streaq.task.success | boolean | Whether task execution succeeded |
| streaq.task.execution_duration_ms | int | Task execution duration in milliseconds |
| streaq.task.start_time | string | Task start time (ISO format) |
| streaq.task.finish_time | string | Task finish time (ISO format) |
| streaq.task.result_ttl | int | Result TTL in milliseconds |
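As a rough illustration of how values like these could be derived, the sketch below times a coroutine and builds a dict using the same attribute names. It uses only the standard library; run_and_describe is a hypothetical helper, not part of this package, and the real instrumentation may compute these values differently.

```python
import asyncio
from datetime import datetime, timezone
from typing import Any, Awaitable, Callable


async def run_and_describe(task: Callable[[], Awaitable[Any]]) -> dict[str, Any]:
    """Run a task and return completion-style attributes for it."""
    start = datetime.now(timezone.utc)
    attributes: dict[str, Any] = {}
    try:
        await task()
        attributes["streaq.task.success"] = True
    except Exception as exc:
        # A real instrumentor would re-raise; this sketch swallows the
        # error so that it can return the attribute dict.
        attributes["streaq.task.success"] = False
        # error.type holds the exception class name, set on failure only.
        attributes["error.type"] = type(exc).__name__
    finish = datetime.now(timezone.utc)
    elapsed_ms = int((finish - start).total_seconds() * 1000)
    attributes["streaq.task.execution_duration_ms"] = elapsed_ms
    attributes["streaq.task.start_time"] = start.isoformat()
    attributes["streaq.task.finish_time"] = finish.isoformat()
    return attributes


async def ok() -> str:
    return "done"


async def boom() -> None:
    raise ValueError("bad input")


if __name__ == "__main__":
    print(asyncio.run(run_and_describe(ok)))
    print(asyncio.run(run_and_describe(boom)))
```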

API Reference

StreaqInstrumentor

class StreaqInstrumentor(BaseInstrumentor)

The main instrumentor class for streaQ.

Methods

instrument(**kwargs)

Enable streaQ instrumentation.

Parameters:

  • tracer_provider (optional): Custom tracer provider. If not provided, uses the global provider.
  • meter_provider (optional): Custom meter provider, reserved for future metrics support.

uninstrument()

Disable streaQ instrumentation and remove all patches.

Example

from opentelemetry.instrumentation.streaq import StreaqInstrumentor

# Enable instrumentation
StreaqInstrumentor().instrument()

# ... your streaQ code ...

# Disable instrumentation
StreaqInstrumentor().uninstrument()
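In test suites it can help to scope instrumentation to a block so that patches never leak between tests. A minimal sketch follows; instrumented is a hypothetical helper that accepts any object exposing the instrument() and uninstrument() methods described above, and FakeInstrumentor is a stand-in used only so the example runs without streaQ installed.

```python
from contextlib import contextmanager
from typing import Any, Iterator


@contextmanager
def instrumented(instrumentor: Any, **kwargs: Any) -> Iterator[Any]:
    """Enable instrumentation on entry and always remove it on exit."""
    instrumentor.instrument(**kwargs)
    try:
        yield instrumentor
    finally:
        instrumentor.uninstrument()


class FakeInstrumentor:
    """Stand-in with the same two methods; records the calls it receives."""

    def __init__(self) -> None:
        self.calls: list[str] = []

    def instrument(self, **kwargs: Any) -> None:
        self.calls.append("instrument")

    def uninstrument(self) -> None:
        self.calls.append("uninstrument")


if __name__ == "__main__":
    fake = FakeInstrumentor()
    with instrumented(fake):
        pass  # code under test would enqueue or run tasks here
    print(fake.calls)
```

With the real class, the call would presumably read: with instrumented(StreaqInstrumentor(), tracer_provider=provider).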

Utilities

inject_metadata(task_kwargs, metadata)

Inject trace context metadata into task kwargs.

from opentelemetry.instrumentation.streaq.utils import inject_metadata

inject_metadata(task_kwargs, {"traceparent": "00-...", "tracestate": "..."})

extract_metadata(task_kwargs)

Extract trace context metadata from task kwargs.

from opentelemetry.instrumentation.streaq.utils import extract_metadata

metadata = extract_metadata(task_kwargs)
# Returns: {"traceparent": "00-...", "tracestate": "..."}

OTEL_METADATA_KEY

The key used to store trace context in task kwargs ("__otel_metadata").
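The round trip through these helpers can be pictured with plain dicts. The sketch below is a stand-in, not the package's implementation: inject_sketch and extract_sketch are hypothetical names, and whether the real helpers mutate or copy task_kwargs is not stated here. Only the key name "__otel_metadata" comes from the text above.

```python
from typing import Any

OTEL_METADATA_KEY = "__otel_metadata"  # key name as documented above


def inject_sketch(task_kwargs: dict[str, Any], metadata: dict[str, str]) -> dict[str, Any]:
    """Return a copy of task_kwargs that carries the trace context."""
    carried = dict(task_kwargs)
    carried[OTEL_METADATA_KEY] = dict(metadata)
    return carried


def extract_sketch(task_kwargs: dict[str, Any]) -> dict[str, str]:
    """Return the stored trace context, or an empty dict if none is present."""
    return dict(task_kwargs.get(OTEL_METADATA_KEY, {}))


if __name__ == "__main__":
    context = {"traceparent": "00-" + "ab" * 16 + "-" + "cd" * 8 + "-01"}
    kwargs = inject_sketch({"data": "hello"}, context)
    print(extract_sketch(kwargs) == context)
```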

License

Licensed under the Apache License 2.0. See LICENSE for details.
