Skip to content

Fix EmbeddingService blocking event loop by offloading encode to thread#262

Open
dolliecoder wants to merge 1 commit into
AOSSIE-Org:mainfrom
dolliecoder:fix/non-blocking-embedding-service
Open

Fix EmbeddingService blocking event loop by offloading encode to thread#262
dolliecoder wants to merge 1 commit into
AOSSIE-Org:mainfrom
dolliecoder:fix/non-blocking-embedding-service

Conversation

@dolliecoder

@dolliecoder dolliecoder commented Feb 2, 2026

Copy link
Copy Markdown

Fixes #261
Problem:
EmbeddingService.get_embedding() and get_embeddings() call SentenceTransformer.encode() inside async methods.
Since encode() is synchronous and CPU-heavy, it blocks the asyncio event loop and prevents concurrent requests.

Solution:
Moved encode calls to asyncio.to_thread() using a small synchronous helper method.

Result:
Embedding generation no longer blocks the event loop.
Improves concurrency without changing public API or behavior.

Summary by CodeRabbit

  • Refactor
    • Improved performance and responsiveness of the embedding service for both single and batch embedding operations.

@coderabbitai

coderabbitai Bot commented Feb 2, 2026

Copy link
Copy Markdown
Contributor
📝 Walkthrough

Walkthrough

The EmbeddingService now offloads blocking model.encode() operations to a thread pool using asyncio.to_thread(), preventing synchronous encoding calls from blocking the event loop and allowing concurrent requests to proceed without stalling.

Changes

Cohort / File(s) Summary
EmbeddingService Threading
backend/app/services/embedding_service/service.py
Added _encode_sync() helper method to execute synchronous model encoding. Updated get_embedding() and get_embeddings() to invoke _encode_sync() via asyncio.to_thread(), offloading CPU-bound operations to a worker thread while maintaining async control flow.

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant EventLoop as Event Loop
    participant ThreadPool as Thread Pool
    participant Model as Model.encode()
    
    rect rgba(100, 150, 200, 0.5)
    Note over Client,Model: Before: Blocking
    Client->>EventLoop: await get_embedding()
    EventLoop->>Model: model.encode() [BLOCKING]
    Note over EventLoop: ⚠ Other requests stall
    Model-->>EventLoop: result
    EventLoop-->>Client: embedding
    end
    
    rect rgba(100, 200, 100, 0.5)
    Note over Client,Model: After: Non-blocking
    Client->>EventLoop: await get_embedding()
    EventLoop->>ThreadPool: asyncio.to_thread(_encode_sync)
    Note over EventLoop: ✓ Event loop free
    ThreadPool->>Model: model.encode() [in thread]
    rect rgba(150, 200, 255, 0.5)
    Note over EventLoop: Other requests run
    end
    Model-->>ThreadPool: result
    ThreadPool-->>EventLoop: result
    EventLoop-->>Client: embedding
    end
Loading

Estimated code review effort

🎯 2 (Simple) | ⏱️ ~8 minutes

Poem

🐰 Hops through threads with glee,
No more blocking the loop, you see,
Async embedding runs free,
While concurrency flows with tea,
Event loop stays spry as can be! 🧵✨

🚥 Pre-merge checks | ✅ 4 | ❌ 1
❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 75.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately summarizes the main objective of the PR—fixing the EmbeddingService blocking event loop by offloading encode to a thread, which aligns with the core change.
Linked Issues check ✅ Passed The implementation meets all primary objectives: uses asyncio.to_thread for non-blocking encode calls, adds synchronous helper _encode_sync, preserves public async API, and converts tensors to lists.
Out of Scope Changes check ✅ Passed All changes are directly scoped to fixing the event loop blocking issue in EmbeddingService; no unrelated modifications are present.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@vaishcodescape vaishcodescape left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM !

@dolliecoder

Copy link
Copy Markdown
Author

LGTM !

@vaishcodescape do you think its ready to be merged?

@Shevilll Shevilll left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Peer Review: Fix EmbeddingService blocking event loop by offloading encode to thread

Hello @dolliecoder! Thank you for this excellent performance optimization. Offloading the synchronous and CPU-heavy SentenceTransformer.encode method to a separate worker thread using asyncio.to_thread is a great way to prevent blocking the FastAPI event loop during embedding generation.

I've conducted a deep review of the changes and have one critical feedback item regarding asynchronous safety during lazy initialization:

🚨 Critical Finding: Race Condition during Lazy Loading

In EmbeddingService, the model is loaded lazily via the model property:

    @property
    def model(self) -> SentenceTransformer:
        if self._model is None:
            # ... loading model ...
            self._model = SentenceTransformer(self.model_name, device=self.device)
        return self._model

In your PR, you wrap the encoding with a synchronous helper _encode_sync:

    def _encode_sync(self, *args, **kwargs):
        return self.model.encode(*args, **kwargs)

and then run it in a worker thread:

            embeddings = await asyncio.to_thread(
                self._encode_sync,
                text,
                convert_to_tensor=True,
                show_progress_bar=False
            )

The Issue:
If get_embedding or get_embeddings is called before the model is loaded, the _encode_sync helper will execute on the background thread. When it accesses self.model, it will trigger the lazy-loading initialization of SentenceTransformer inside the worker thread.

If multiple API requests arrive concurrently before the model is fully initialized:

  1. Thread 1 will see self._model is None and begin loading SentenceTransformer.
  2. Thread 2 will simultaneously see self._model is None and also begin loading SentenceTransformer.
  3. This leads to duplicate model loading, excessive memory usage, potential model cache/lock issues (if writing to the same cache directory concurrently), or an application crash.

💡 Suggested Fix

To ensure thread-safe initialization, we must guarantee that the model is loaded on the main thread (sequentially) before spawning the background worker thread.

We can achieve this by accessing the lazy-loaded property on the main event loop thread before calling asyncio.to_thread. For example, in both get_embedding and get_embeddings:

    async def get_embedding(self, text: str) -> List[float]:
        try:
            # ...
            # Force lazy-loading initialization on the main event loop thread
            _ = self.model  

            # Now safely perform CPU-heavy encoding in a separate thread
            embeddings = await asyncio.to_thread(
                self._encode_sync,
                text,
                convert_to_tensor=True,
                show_progress_bar=False
            )
            # ...

This simple, single-line addition ensures the model is safely loaded on the main thread first, avoiding the concurrent lazy-loading race condition entirely while still offloading the CPU-heavy encoding to the thread pool.

Great job on identifying the event loop blocking issue! Let me know what you think of this suggestion.

@Shevilll Shevilll left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Peer Review: EmbeddingService Blocking Event Loop Prevention

Thank you for addressing the event-loop blocking issue in the EmbeddingService! Offloading the heavy CPU/GPU-bound .encode() method of SentenceTransformers to a background thread is a necessary step to prevent the FastAPI event loop from stalling.

However, offloading this directly with concurrent asyncio.to_thread calls introduces some critical thread safety and performance concerns under concurrent load that must be addressed.


1. Resource Contention & Nested Threading (Oversubscription)

The SentenceTransformer/PyTorch model uses internal multi-threading (via OpenMP or Intel MKL) to optimize CPU operations during tensor computation.

  • The Issue: When multiple concurrent requests call get_embedding or get_embeddings, asyncio.to_thread executes them on parallel threads from the global thread pool. Running multiple concurrent PyTorch forward passes on separate threads leads to thread oversubscription (nested multi-threading).
  • The Consequence: Instead of speeding up execution, the OS spends significant overhead on thread context switching and lock contention. Under high concurrency, CPU thrashing occurs, causing response latency to spike drastically compared to serial execution.

2. GPU/CUDA Thread Safety & OOM Risks

If the model runs on a CUDA-enabled GPU:

  • The Issue: PyTorch's execution context and memory allocation on CUDA are not designed for parallel invocations from separate OS threads on the same model instance.
  • The Consequence: Concurrent calls can trigger race conditions in CUDA streams, leading to driver crashes, memory corruption, or sudden CUDA Out-Of-Memory (OOM) failures due to uncoordinated memory allocations.

3. Proposed Fix: Serialize Inference with an asyncio.Lock

Since PyTorch already utilizes internal CPU/GPU parallelization efficiently, the safest and most performant pattern is to serialize the inference requests using an asyncio.Lock. This allows the event loop to remain non-blocking while ensuring only one thread accesses the heavy model inference at any single moment.

Here is how you can implement this safely:

import asyncio

class EmbeddingService:
    def __init__(self, ...):
        ...
        self._lock = asyncio.Lock()  # Initialize the lock

    async def get_embedding(self, text: str) -> List[float]:
        try:
            if not isinstance(text, list):
                text = [text]

            # Use lock to serialize model execution across threads safely
            async with self._lock:
                embeddings = await asyncio.to_thread(
                    self._encode_sync,
                    text,
                    convert_to_tensor=True,
                    show_progress_bar=False
                )
            
            embedding_list = embeddings[0].cpu().tolist()
            return embedding_list
        ...

By introducing this synchronization lock, you prevent race conditions, avoid CPU oversubscription, protect GPU memory state, and ensure optimal throughput.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

FEATURE REQUEST:Fix EmbeddingService blocking the event loop

3 participants