Skip to content

Commit fdeac0f

Browse files
committed
Extract order request processing into a dedicated pool
1 parent 304d4f2 commit fdeac0f

4 files changed

Lines changed: 397 additions & 175 deletions

File tree

Engine/TransactionHandlers/BacktestingTransactionHandler.cs

Lines changed: 8 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -58,14 +58,14 @@ public override void Initialize(IAlgorithm algorithm, IBrokerage brokerage, IRes
5858
_enableConcurrency = _brokerage.ConcurrencyEnabled && _algorithm.LiveMode;
5959

6060
base.Initialize(algorithm, brokerage, resultHandler);
61-
62-
if (!_enableConcurrency)
63-
{
64-
// non blocking implementation
65-
_orderRequestQueues = new() { new BusyCollection<OrderRequest>() };
66-
}
6761
}
6862

63+
/// <summary>
64+
/// For backtesting order requests are processed synchronously by the algorithm thread, only live
65+
/// paper deployments with a concurrency enabled brokerage use background transaction threads
66+
/// </summary>
67+
protected override bool SynchronousProcessing => !_enableConcurrency;
68+
6969
/// <summary>
7070
/// Processes all synchronous events that must take place before the next time loop for the algorithm
7171
/// </summary>
@@ -74,7 +74,7 @@ public override void ProcessSynchronousEvents()
7474
if (!_enableConcurrency)
7575
{
7676
// we process pending order requests our selves
77-
Run(0);
77+
ProcessPendingRequests();
7878
}
7979

8080
base.ProcessSynchronousEvents();
@@ -113,7 +113,7 @@ protected override void WaitForOrderSubmission(OrderTicket ticket)
113113
}
114114

115115
// we submit the order request our selves
116-
Run(0);
116+
ProcessPendingRequests();
117117

118118
if (!ticket.OrderSet.WaitOne(0))
119119
{
@@ -124,18 +124,5 @@ protected override void WaitForOrderSubmission(OrderTicket ticket)
124124
"See the OrderRequest.Response for more information");
125125
}
126126
}
127-
128-
/// <summary>
129-
/// For backtesting order requests will be processed by the algorithm thread
130-
/// sequentially at <see cref="WaitForOrderSubmission"/> and <see cref="ProcessSynchronousEvents"/>
131-
/// </summary>
132-
protected override void InitializeTransactionThread()
133-
{
134-
if (_enableConcurrency)
135-
{
136-
// let the base class handle this
137-
base.InitializeTransactionThread();
138-
}
139-
}
140127
}
141128
}

Engine/TransactionHandlers/BrokerageTransactionHandler.cs

Lines changed: 74 additions & 149 deletions
Original file line numberDiff line numberDiff line change
@@ -70,20 +70,10 @@ public class BrokerageTransactionHandler : ITransactionHandler
7070
private int _failedCashSyncAttempts;
7171

7272
/// <summary>
73-
/// OrderQueue holds the newly updated orders from the user algorithm waiting to be processed. Once
74-
/// orders are processed they are moved into the Orders queue awaiting the brokerage response.
73+
/// Holds the worker threads and their queues, dispatching each order request to the queue pinned to
74+
/// its order and growing the pool on demand as the threads get saturated.
7575
/// </summary>
76-
protected List<IBusyCollection<OrderRequest>> _orderRequestQueues { get; set; }
77-
78-
private List<Thread> _processingThreads;
79-
// maximum number of transaction threads (and queues) the pool can grow to on demand
80-
private int _maximumTransactionThreads;
81-
// pins each order (or combo group) to one queue for its whole life, so all its requests are handled
82-
// in order by the same thread even after the pool grows and changes the modulo used for new orders
83-
private readonly Dictionary<int, int> _orderRequestQueueIndexByKey = new();
84-
// guards on demand growth of the queues/threads against concurrent reads in Run/Exit/enqueue
85-
private readonly object _processingThreadsLock = new object();
86-
private readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();
76+
protected OrderRequestProcessingPool _threadPool;
8777

8878
private readonly ConcurrentQueue<OrderEvent> _orderEvents = new ConcurrentQueue<OrderEvent>();
8979

@@ -217,8 +207,6 @@ public virtual void Initialize(IAlgorithm algorithm, IBrokerage brokerage, IResu
217207
HandleOrderUpdated(e);
218208
};
219209

220-
IsActive = true;
221-
222210
if (_algorithm is QCAlgorithm qcAlgorithm)
223211
{
224212
_qcAlgorithmInstance = qcAlgorithm;
@@ -237,31 +225,36 @@ public virtual void Initialize(IAlgorithm algorithm, IBrokerage brokerage, IResu
237225
InitializeTransactionThread();
238226
}
239227

228+
/// <summary>
229+
/// Whether the transaction thread pool can grow on demand to process order requests concurrently.
230+
/// When false a single worker thread is used.
231+
/// </summary>
232+
protected virtual bool ConcurrencyEnabled => _brokerage.ConcurrencyEnabled;
233+
234+
/// <summary>
235+
/// Whether order requests are drained synchronously by the algorithm thread instead of by background
236+
/// worker threads. Used by backtesting deployments.
237+
/// </summary>
238+
protected virtual bool SynchronousProcessing => false;
239+
240240
/// <summary>
241241
/// Create and start the transaction thread, who will be in charge of processing
242242
/// the order requests
243243
/// </summary>
244244
protected virtual void InitializeTransactionThread()
245245
{
246-
// live deployments start with the minimum number of threads and grow on demand (see TryExpandProcessingThreads)
247-
// up to the maximum. No concurrency means a single thread, no growth.
248-
int initialThreadsCount;
249-
if (_brokerage.ConcurrencyEnabled)
250-
{
251-
_maximumTransactionThreads = Math.Max(1, MaximumTransactionThreads);
252-
initialThreadsCount = Math.Min(Math.Max(1, MinimumTransactionThreads), _maximumTransactionThreads);
253-
}
254-
else
246+
Action<OrderRequest> processRequest = request =>
255247
{
256-
_maximumTransactionThreads = initialThreadsCount = 1;
257-
}
248+
HandleOrderRequest(request);
249+
ProcessAsynchronousEvents();
250+
};
251+
Action<Exception> onError = error => _algorithm.SetRuntimeError(error, "HandleOrderRequest");
258252

259-
_orderRequestQueues = new(_maximumTransactionThreads);
260-
_processingThreads = new(_maximumTransactionThreads);
261-
for (var i = 0; i < initialThreadsCount; i++)
262-
{
263-
AddProcessingThread();
264-
}
253+
// backtesting drains a single queue synchronously on the algorithm thread, live deployments use
254+
// background worker threads: a single one, or growing on demand up to the maximum when concurrent.
255+
_threadPool = SynchronousProcessing
256+
? OrderRequestProcessingPool.Synchronous(processRequest, onError)
257+
: new OrderRequestProcessingPool(ConcurrencyEnabled, MinimumTransactionThreads, MaximumTransactionThreads, processRequest, onError);
265258
}
266259

267260
/// <summary>
@@ -277,59 +270,13 @@ protected virtual void InitializeTransactionThread()
277270
/// <summary>
278271
/// The number of transaction threads currently running
279272
/// </summary>
280-
protected int ProcessingThreadsCount
281-
{
282-
get
283-
{
284-
lock (_processingThreadsLock)
285-
{
286-
return _processingThreads?.Count ?? 0;
287-
}
288-
}
289-
}
290-
291-
/// <summary>
292-
/// Creates a queue and its dedicated thread and starts it.
293-
/// Callers growing the pool on demand must hold <see cref="_processingThreadsLock"/>.
294-
/// </summary>
295-
private void AddProcessingThread()
296-
{
297-
var threadId = _orderRequestQueues.Count; // matches the queue index this thread will consume
298-
_orderRequestQueues.Add(new BusyBlockingCollection<OrderRequest>());
299-
var thread = new Thread(() => Run(threadId)) { IsBackground = true, Name = $"Transaction Thread {threadId}" };
300-
_processingThreads.Add(thread);
301-
thread.Start();
302-
}
273+
protected int ProcessingThreadsCount => _threadPool?.ThreadCount ?? 0;
303274

304275
/// <summary>
305-
/// Grows the pool only when every thread is busy and still has pending requests, up to the maximum.
306-
/// Caller must hold <see cref="_processingThreadsLock"/>.
276+
/// Boolean flag indicating the transaction threads are busy.
277+
/// False indicates they are completely finished processing and ready to be terminated.
307278
/// </summary>
308-
private void TryExpandProcessingThreads()
309-
{
310-
if (_orderRequestQueues.Count >= _maximumTransactionThreads || _cancellationTokenSource.IsCancellationRequested)
311-
{
312-
return;
313-
}
314-
315-
// only grow when the whole pool is saturated: every thread busy and with requests still waiting
316-
for (var i = 0; i < _orderRequestQueues.Count; i++)
317-
{
318-
var queue = _orderRequestQueues[i];
319-
if (!queue.IsBusy || queue.Count == 0)
320-
{
321-
return;
322-
}
323-
}
324-
325-
AddProcessingThread();
326-
}
327-
328-
/// <summary>
329-
/// Boolean flag indicating the Run thread method is busy.
330-
/// False indicates it is completely finished processing and ready to be terminated.
331-
/// </summary>
332-
public bool IsActive { get; private set; }
279+
public bool IsActive => _threadPool?.IsActive ?? false;
333280

334281
#region Order Request Processing
335282

@@ -437,7 +384,7 @@ public OrderTicket AddOrder(SubmitOrderRequest request)
437384
}
438385

439386
/// <summary>
440-
/// Wait for the order to be handled by the <see cref="_processingThreads"/>
387+
/// Wait for the order to be handled by the <see cref="_threadPool"/>
441388
/// </summary>
442389
/// <param name="ticket">The <see cref="OrderTicket"/> expecting to be submitted</param>
443390
protected virtual void WaitForOrderSubmission(OrderTicket ticket)
@@ -745,36 +692,12 @@ public List<Order> GetOpenOrders(Func<Order, bool> filter = null)
745692
}
746693

747694
/// <summary>
748-
/// Primary thread entry point to launch the transaction thread.
695+
/// Drains the pending order requests on the calling thread. Used by synchronous (non concurrent)
696+
/// deployments, where the algorithm thread pumps the request queue itself.
749697
/// </summary>
750-
protected void Run(int threadId)
698+
protected void ProcessPendingRequests()
751699
{
752-
IBusyCollection<OrderRequest> queue;
753-
lock (_processingThreadsLock)
754-
{
755-
// capture our queue safely, the queues list may be growing on demand concurrently
756-
queue = _orderRequestQueues[threadId];
757-
}
758-
759-
try
760-
{
761-
foreach (var request in queue.GetConsumingEnumerable(_cancellationTokenSource.Token))
762-
{
763-
HandleOrderRequest(request);
764-
ProcessAsynchronousEvents();
765-
}
766-
}
767-
catch (Exception err)
768-
{
769-
// unexpected error, we need to close down shop
770-
_algorithm.SetRuntimeError(err, "HandleOrderRequest");
771-
}
772-
773-
if (_processingThreads != null)
774-
{
775-
Log.Trace($"BrokerageTransactionHandler.Run(): Ending Thread {threadId}...");
776-
IsActive = false;
777-
}
700+
_threadPool.ProcessPending();
778701
}
779702

780703
/// <summary>
@@ -795,7 +718,7 @@ public virtual void ProcessSynchronousEvents()
795718
// in backtesting we need to wait for orders to be removed from the queue and finished processing
796719
if (!_algorithm.LiveMode)
797720
{
798-
if (_orderRequestQueues.Any(queue => queue.IsBusy && !queue.WaitHandle.WaitOne(Time.OneSecond, _cancellationTokenSource.Token)))
721+
if (_threadPool.WaitForProcessing(Time.OneSecond))
799722
{
800723
Log.Error("BrokerageTransactionHandler.ProcessSynchronousEvents(): Timed out waiting for request queue to finish processing.");
801724
}
@@ -878,35 +801,16 @@ public void AddOpenOrder(Order order, IAlgorithm algorithm)
878801
public void Exit()
879802
{
880803
var timeout = TimeSpan.FromSeconds(60);
881-
if (_processingThreads != null)
804+
if (_threadPool != null)
882805
{
883-
// snapshot under the lock since the pool might still be growing on demand concurrently
884-
List<IBusyCollection<OrderRequest>> queues;
885-
List<Thread> threads;
886-
lock (_processingThreadsLock)
887-
{
888-
queues = _orderRequestQueues.ToList();
889-
threads = _processingThreads.ToList();
890-
}
891-
892-
// only wait if the processing thread is running
893-
if (queues.Any(queue => queue.IsBusy && !queue.WaitHandle.WaitOne(timeout)))
806+
// only wait if a queue is still processing
807+
if (_threadPool.WaitForProcessing(timeout))
894808
{
895809
Log.Error("BrokerageTransactionHandler.Exit(): Exceed timeout: " + (int)(timeout.TotalSeconds) + " seconds.");
896810
}
897811

898-
foreach (var queue in queues)
899-
{
900-
queue.CompleteAdding();
901-
}
902-
903-
foreach (var thread in threads)
904-
{
905-
thread?.StopSafely(timeout, _cancellationTokenSource);
906-
}
812+
_threadPool.Shutdown(timeout);
907813
}
908-
IsActive = false;
909-
_cancellationTokenSource.DisposeSafely();
910814
}
911815

912816
/// <summary>
@@ -1314,6 +1218,13 @@ private void HandleOrderEvents(List<OrderEvent> orderEvents)
13141218
order.Status = orderEvent.Status;
13151219
}
13161220

1221+
// once an order reaches a final state it won't receive any more requests, so release its pinned
1222+
// processing queue to keep the pin map bounded to the orders still in flight
1223+
if (order.Status.IsClosed())
1224+
{
1225+
TryReleaseProcessingQueue(order);
1226+
}
1227+
13171228
orderEvent.Id = order.GetNewId();
13181229

13191230
// set the modified time of the order to the fill's timestamp
@@ -2027,29 +1938,43 @@ private string GetShortableErrorMessage(Symbol symbol, decimal quantity)
20271938
private void EnqueueOrderRequest(OrderRequest request, Order order)
20281939
{
20291940
// route by OrderId (or combo group id) so requests for the same order keep their order on one queue
2030-
var queueKey = request.OrderId;
1941+
var routingKey = request.OrderId;
20311942
if (order.GroupOrderManager?.Id > 0)
20321943
{
2033-
queueKey = order.GroupOrderManager.Id;
1944+
routingKey = order.GroupOrderManager.Id;
20341945
}
20351946

2036-
IBusyCollection<OrderRequest> queue;
2037-
lock (_processingThreadsLock)
1947+
_threadPool.Dispatch(request, routingKey);
1948+
}
1949+
1950+
/// <summary>
1951+
/// Releases the processing queue pinned to a closed order so the pin map stays bounded to the orders still
1952+
/// in flight. A combo group shares a single queue keyed by its group id, so it is only released once every
1953+
/// leg has reached a final state, mirroring the routing key used in <see cref="EnqueueOrderRequest"/>.
1954+
/// </summary>
1955+
private void TryReleaseProcessingQueue(Order order)
1956+
{
1957+
var group = order.GroupOrderManager;
1958+
if (group == null || group.Id <= 0)
20381959
{
2039-
// grow the pool first if every existing thread is already saturated
2040-
TryExpandProcessingThreads();
1960+
_threadPool.Release(order.Id);
1961+
return;
1962+
}
20411963

2042-
// reuse the order's pinned queue if it has one, so it is never re-routed when the pool grows
2043-
if (!_orderRequestQueueIndexByKey.TryGetValue(queueKey, out var queueIndex))
1964+
// the whole group routes through one queue; its still-open legs must keep landing on that same queue,
1965+
// so we can only release it once every leg has been submitted and reached a final state
1966+
if (group.OrderIds.Count < group.Count)
1967+
{
1968+
return;
1969+
}
1970+
foreach (var legId in group.OrderIds)
1971+
{
1972+
if (!_completeOrders.TryGetValue(legId, out var leg) || !leg.Status.IsClosed())
20441973
{
2045-
queueIndex = queueKey % _orderRequestQueues.Count;
2046-
_orderRequestQueueIndexByKey[queueKey] = queueIndex;
1974+
return;
20471975
}
2048-
queue = _orderRequestQueues[queueIndex];
20491976
}
2050-
2051-
// add outside the lock, since it can block when the queue is at its bounded capacity
2052-
queue.Add(request);
1977+
_threadPool.Release(group.Id);
20531978
}
20541979

20551980
/// <summary>

0 commit comments

Comments
 (0)