Fix EmbeddingService blocking event loop by offloading encode to thread#262
Fix EmbeddingService blocking event loop by offloading encode to thread#262dolliecoder wants to merge 1 commit into
Conversation
…vent loop blocking
📝 WalkthroughWalkthroughThe EmbeddingService now offloads blocking Changes
Sequence Diagram(s)sequenceDiagram
participant Client
participant EventLoop as Event Loop
participant ThreadPool as Thread Pool
participant Model as Model.encode()
rect rgba(100, 150, 200, 0.5)
Note over Client,Model: Before: Blocking
Client->>EventLoop: await get_embedding()
EventLoop->>Model: model.encode() [BLOCKING]
Note over EventLoop: ⚠ Other requests stall
Model-->>EventLoop: result
EventLoop-->>Client: embedding
end
rect rgba(100, 200, 100, 0.5)
Note over Client,Model: After: Non-blocking
Client->>EventLoop: await get_embedding()
EventLoop->>ThreadPool: asyncio.to_thread(_encode_sync)
Note over EventLoop: ✓ Event loop free
ThreadPool->>Model: model.encode() [in thread]
rect rgba(150, 200, 255, 0.5)
Note over EventLoop: Other requests run
end
Model-->>ThreadPool: result
ThreadPool-->>EventLoop: result
EventLoop-->>Client: embedding
end
Estimated code review effort🎯 2 (Simple) | ⏱️ ~8 minutes Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
@vaishcodescape do you think its ready to be merged? |
Shevilll
left a comment
There was a problem hiding this comment.
Peer Review: Fix EmbeddingService blocking event loop by offloading encode to thread
Hello @dolliecoder! Thank you for this excellent performance optimization. Offloading the synchronous and CPU-heavy SentenceTransformer.encode method to a separate worker thread using asyncio.to_thread is a great way to prevent blocking the FastAPI event loop during embedding generation.
I've conducted a deep review of the changes and have one critical feedback item regarding asynchronous safety during lazy initialization:
🚨 Critical Finding: Race Condition during Lazy Loading
In EmbeddingService, the model is loaded lazily via the model property:
@property
def model(self) -> SentenceTransformer:
if self._model is None:
# ... loading model ...
self._model = SentenceTransformer(self.model_name, device=self.device)
return self._modelIn your PR, you wrap the encoding with a synchronous helper _encode_sync:
def _encode_sync(self, *args, **kwargs):
return self.model.encode(*args, **kwargs)and then run it in a worker thread:
embeddings = await asyncio.to_thread(
self._encode_sync,
text,
convert_to_tensor=True,
show_progress_bar=False
)The Issue:
If get_embedding or get_embeddings is called before the model is loaded, the _encode_sync helper will execute on the background thread. When it accesses self.model, it will trigger the lazy-loading initialization of SentenceTransformer inside the worker thread.
If multiple API requests arrive concurrently before the model is fully initialized:
- Thread 1 will see
self._modelisNoneand begin loadingSentenceTransformer. - Thread 2 will simultaneously see
self._modelisNoneand also begin loadingSentenceTransformer. - This leads to duplicate model loading, excessive memory usage, potential model cache/lock issues (if writing to the same cache directory concurrently), or an application crash.
💡 Suggested Fix
To ensure thread-safe initialization, we must guarantee that the model is loaded on the main thread (sequentially) before spawning the background worker thread.
We can achieve this by accessing the lazy-loaded property on the main event loop thread before calling asyncio.to_thread. For example, in both get_embedding and get_embeddings:
async def get_embedding(self, text: str) -> List[float]:
try:
# ...
# Force lazy-loading initialization on the main event loop thread
_ = self.model
# Now safely perform CPU-heavy encoding in a separate thread
embeddings = await asyncio.to_thread(
self._encode_sync,
text,
convert_to_tensor=True,
show_progress_bar=False
)
# ...This simple, single-line addition ensures the model is safely loaded on the main thread first, avoiding the concurrent lazy-loading race condition entirely while still offloading the CPU-heavy encoding to the thread pool.
Great job on identifying the event loop blocking issue! Let me know what you think of this suggestion.
Shevilll
left a comment
There was a problem hiding this comment.
Peer Review: EmbeddingService Blocking Event Loop Prevention
Thank you for addressing the event-loop blocking issue in the EmbeddingService! Offloading the heavy CPU/GPU-bound .encode() method of SentenceTransformers to a background thread is a necessary step to prevent the FastAPI event loop from stalling.
However, offloading this directly with concurrent asyncio.to_thread calls introduces some critical thread safety and performance concerns under concurrent load that must be addressed.
1. Resource Contention & Nested Threading (Oversubscription)
The SentenceTransformer/PyTorch model uses internal multi-threading (via OpenMP or Intel MKL) to optimize CPU operations during tensor computation.
- The Issue: When multiple concurrent requests call
get_embeddingorget_embeddings,asyncio.to_threadexecutes them on parallel threads from the global thread pool. Running multiple concurrent PyTorch forward passes on separate threads leads to thread oversubscription (nested multi-threading). - The Consequence: Instead of speeding up execution, the OS spends significant overhead on thread context switching and lock contention. Under high concurrency, CPU thrashing occurs, causing response latency to spike drastically compared to serial execution.
2. GPU/CUDA Thread Safety & OOM Risks
If the model runs on a CUDA-enabled GPU:
- The Issue: PyTorch's execution context and memory allocation on CUDA are not designed for parallel invocations from separate OS threads on the same model instance.
- The Consequence: Concurrent calls can trigger race conditions in CUDA streams, leading to driver crashes, memory corruption, or sudden CUDA Out-Of-Memory (OOM) failures due to uncoordinated memory allocations.
3. Proposed Fix: Serialize Inference with an asyncio.Lock
Since PyTorch already utilizes internal CPU/GPU parallelization efficiently, the safest and most performant pattern is to serialize the inference requests using an asyncio.Lock. This allows the event loop to remain non-blocking while ensuring only one thread accesses the heavy model inference at any single moment.
Here is how you can implement this safely:
import asyncio
class EmbeddingService:
def __init__(self, ...):
...
self._lock = asyncio.Lock() # Initialize the lock
async def get_embedding(self, text: str) -> List[float]:
try:
if not isinstance(text, list):
text = [text]
# Use lock to serialize model execution across threads safely
async with self._lock:
embeddings = await asyncio.to_thread(
self._encode_sync,
text,
convert_to_tensor=True,
show_progress_bar=False
)
embedding_list = embeddings[0].cpu().tolist()
return embedding_list
...By introducing this synchronization lock, you prevent race conditions, avoid CPU oversubscription, protect GPU memory state, and ensure optimal throughput.
Fixes #261
Problem:
EmbeddingService.get_embedding() and get_embeddings() call SentenceTransformer.encode() inside async methods.
Since encode() is synchronous and CPU-heavy, it blocks the asyncio event loop and prevents concurrent requests.
Solution:
Moved encode calls to asyncio.to_thread() using a small synchronous helper method.
Result:
Embedding generation no longer blocks the event loop.
Improves concurrency without changing public API or behavior.
Summary by CodeRabbit