Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,32 @@ internal sealed class PostgresMessageBusOutboxWorker(
PostgresMessageOutboxOptions options,
PostgresOutboxProcessor processor) : IHostedService
{
private readonly object _lock = new();
private NpgsqlDataSource? _dataSource;
private ContinuousTask? _task;

/// <summary>
/// Starts the outbox processing background task.
/// Starts the outbox processing background task. This call is idempotent: invoking it again
/// while the worker is already running is a no-op that returns without starting a second loop.
/// </summary>
/// <param name="cancellationToken">A token that signals when startup should be aborted.</param>
/// <returns>A completed task once the background loop has been initiated.</returns>
/// <exception cref="InvalidOperationException">Thrown if the worker is already running.</exception>
public Task StartAsync(CancellationToken cancellationToken)
{
if (_task is not null)
lock (_lock)
{
throw new InvalidOperationException("The worker is already running.");
}
if (_task is not null)
{
return Task.CompletedTask;
}

_dataSource = NpgsqlDataSource.Create(options.ConnectionString);
_task = new ContinuousTask(ProcessAsync);
// The loop captures its own data source rather than reading the field, so that
// StopAsync (or a concurrent restart) can clear and dispose the field without
// affecting an already-running loop.
var dataSource = NpgsqlDataSource.Create(options.ConnectionString);
_task = new ContinuousTask(token => ProcessAsync(dataSource, token));
_dataSource = dataSource;
Comment thread
alisan3 marked this conversation as resolved.
}

return Task.CompletedTask;
}
Expand All @@ -42,24 +50,40 @@ public Task StartAsync(CancellationToken cancellationToken)
/// <param name="cancellationToken">A token that signals when shutdown should be forced.</param>
public async Task StopAsync(CancellationToken cancellationToken)
{
if (_task is null)
ContinuousTask? task;
NpgsqlDataSource? dataSource;

lock (_lock)
{
return;
task = _task;
dataSource = _dataSource;
_task = null;
_dataSource = null;
}

await _task.DisposeAsync();
_task = null;
if (task is null)
{
return;
}

if (_dataSource is not null)
// Dispose the data source even if the loop fails to shut down cleanly, so the
// underlying connection pool is always released.
try
{
Comment thread
alisan3 marked this conversation as resolved.
await _dataSource.DisposeAsync();
_dataSource = null;
await task.DisposeAsync();
}
finally
{
if (dataSource is not null)
{
await dataSource.DisposeAsync();
}
}
Comment thread
alisan3 marked this conversation as resolved.
}

private async Task ProcessAsync(CancellationToken stoppingToken)
private async Task ProcessAsync(NpgsqlDataSource dataSource, CancellationToken stoppingToken)
{
await using var connection = await _dataSource!.OpenConnectionAsync(stoppingToken);
await using var connection = await dataSource.OpenConnectionAsync(stoppingToken);

await processor.ProcessAsync(connection, stoppingToken);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,32 @@ internal sealed class ScheduledMessageWorker(
ScheduledMessageDispatcher dispatcher)
: IHostedService
{
private readonly object _lock = new();
private NpgsqlDataSource? _dataSource;
private ContinuousTask? _task;

/// <summary>
/// Starts the scheduled message processing background task.
/// Starts the scheduled message processing background task. This call is idempotent: invoking it
/// again while the worker is already running is a no-op that returns without starting a second loop.
/// </summary>
/// <param name="cancellationToken">A token that signals when startup should be aborted.</param>
/// <returns>A completed task once the background loop has been initiated.</returns>
/// <exception cref="InvalidOperationException">Thrown if the worker is already running.</exception>
public Task StartAsync(CancellationToken cancellationToken)
{
if (_task is not null)
lock (_lock)
{
throw new InvalidOperationException("The worker is already running.");
}
if (_task is not null)
{
return Task.CompletedTask;
}

_dataSource = NpgsqlDataSource.Create(options.ConnectionString);
_task = new ContinuousTask(ProcessAsync);
// The loop captures its own data source rather than reading the field, so that
// StopAsync (or a concurrent restart) can clear and dispose the field without
// affecting an already-running loop.
var dataSource = NpgsqlDataSource.Create(options.ConnectionString);
_task = new ContinuousTask(token => ProcessAsync(dataSource, token));
_dataSource = dataSource;
}
Comment thread
alisan3 marked this conversation as resolved.

return Task.CompletedTask;
}
Expand All @@ -43,24 +51,40 @@ public Task StartAsync(CancellationToken cancellationToken)
/// <param name="cancellationToken">A token that signals when shutdown should be forced.</param>
public async Task StopAsync(CancellationToken cancellationToken)
{
if (_task is null)
ContinuousTask? task;
NpgsqlDataSource? dataSource;

lock (_lock)
{
return;
task = _task;
dataSource = _dataSource;
_task = null;
_dataSource = null;
}

await _task.DisposeAsync();
_task = null;
if (task is null)
{
return;
}

if (_dataSource is not null)
// Dispose the data source even if the loop fails to shut down cleanly, so the
// underlying connection pool is always released.
try
{
Comment thread
alisan3 marked this conversation as resolved.
await _dataSource.DisposeAsync();
_dataSource = null;
await task.DisposeAsync();
}
Comment thread
alisan3 marked this conversation as resolved.
finally
{
if (dataSource is not null)
{
await dataSource.DisposeAsync();
}
}
Comment thread
alisan3 marked this conversation as resolved.
}

private async Task ProcessAsync(CancellationToken stoppingToken)
private async Task ProcessAsync(NpgsqlDataSource dataSource, CancellationToken stoppingToken)
{
await using var connection = await _dataSource!.OpenConnectionAsync(stoppingToken);
await using var connection = await dataSource.OpenConnectionAsync(stoppingToken);

await dispatcher.ProcessAsync(connection, stoppingToken);
}
Expand Down
33 changes: 22 additions & 11 deletions src/Mocha/src/Mocha.Inbox/MessageBusInboxWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,28 @@ internal sealed class MessageBusInboxWorker(
ILogger<InboxCleanupProcessor> cleanupLogger,
ILogger<MessageBusInboxWorker> logger) : IHostedService
{
private readonly object _lock = new();
private ContinuousTask? _task;

/// <summary>
/// Starts the inbox cleanup background task.
/// Starts the inbox cleanup background task. This call is idempotent: invoking it again
/// while the worker is already running is a no-op that returns without starting a second loop.
/// </summary>
/// <param name="cancellationToken">A token that signals when startup should be aborted.</param>
/// <returns>A completed task once the background loop has been initiated.</returns>
/// <exception cref="InvalidOperationException">Thrown if the worker is already running.</exception>
public Task StartAsync(CancellationToken cancellationToken)
Comment thread
alisan3 marked this conversation as resolved.
{
if (_task is not null)
lock (_lock)
{
throw new InvalidOperationException("The worker is already running.");
}
if (_task is not null)
{
return Task.CompletedTask;
}
Comment thread
alisan3 marked this conversation as resolved.

logger.InboxWorkerStarting();
logger.InboxWorkerStarting();

_task = new ContinuousTask(ProcessAsync);
_task = new ContinuousTask(ProcessAsync);
}

return Task.CompletedTask;
}
Expand All @@ -49,15 +53,22 @@ public Task StartAsync(CancellationToken cancellationToken)
/// <param name="cancellationToken">A token that signals when shutdown should be forced.</param>
public async Task StopAsync(CancellationToken cancellationToken)
{
logger.InboxWorkerStopping();
ContinuousTask? task;

lock (_lock)
{
task = _task;
_task = null;
}

if (_task is null)
if (task is null)
{
return;
}

await _task.DisposeAsync();
_task = null;
logger.InboxWorkerStopping();

await task.DisposeAsync();
}

private async Task ProcessAsync(CancellationToken stoppingToken)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,23 @@ public async Task UsePostgresInbox_Should_RegisterHostedService_When_Called()
Assert.Contains(hostedServices, s => s is MessageBusInboxWorker);
}

[Fact]
public async Task StartAsync_Should_NotThrow_When_CalledMultipleTimes()
{
// arrange
await using var provider = BuildProvider();
var worker = provider.GetServices<IHostedService>()
.OfType<MessageBusInboxWorker>()
.Single();

// act
await worker.StartAsync(CancellationToken.None);
await worker.StartAsync(CancellationToken.None);

// assert
await worker.StopAsync(CancellationToken.None);
}

[Fact]
public async Task UsePostgresInbox_Should_RegisterScopedInbox_When_Called()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,23 @@ public async Task UsePostgresOutbox_Should_RegisterHostedService_When_Called()
Assert.Contains(hostedServices, s => s is PostgresMessageBusOutboxWorker);
}

[Fact]
public async Task StartAsync_Should_NotThrow_When_CalledMultipleTimes()
{
// arrange
await using var provider = BuildProvider();
var worker = provider.GetServices<IHostedService>()
.OfType<PostgresMessageBusOutboxWorker>()
.Single();

// act
await worker.StartAsync(CancellationToken.None);
await worker.StartAsync(CancellationToken.None);

// assert
await worker.StopAsync(CancellationToken.None);
}

[Fact]
public async Task UsePostgresOutbox_Should_RegisterScopedOutbox_When_Called()
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Mocha.EntityFrameworkCore.Postgres.Tests.Helpers;
Comment thread
alisan3 marked this conversation as resolved.
using Mocha.Scheduling;
using Mocha.Transport.InMemory;

namespace Mocha.EntityFrameworkCore.Postgres.Tests;

public sealed class SchedulingServiceRegistrationTests
{
private const string ConnectionString = "Host=localhost;Database=test";
Comment thread
alisan3 marked this conversation as resolved.
Comment thread
alisan3 marked this conversation as resolved.

[Fact]
public async Task UsePostgresScheduling_Should_RegisterHostedService_When_Called()
{
// arrange
await using var provider = BuildProvider();

// act
var hostedServices = provider.GetServices<IHostedService>();

// assert
Assert.Contains(hostedServices, s => s is ScheduledMessageWorker);
}

[Fact]
public async Task StartAsync_Should_NotThrow_When_CalledMultipleTimes()
{
// arrange
await using var provider = BuildProvider();
var worker = provider.GetServices<IHostedService>()
.OfType<ScheduledMessageWorker>()
.Single();

// act
await worker.StartAsync(CancellationToken.None);
await worker.StartAsync(CancellationToken.None);

// assert
await worker.StopAsync(CancellationToken.None);
Comment thread
alisan3 marked this conversation as resolved.
}

private static ServiceProvider BuildProvider()
{
var services = new ServiceCollection();
services.AddLogging();
services.AddDbContext<TestDbContext>(o => o.UseNpgsql(ConnectionString));

var builder = services.AddMessageBus();
builder.AddEntityFramework<TestDbContext>(ef => ef.UsePostgresScheduling());
builder.AddInMemory();

var provider = services.BuildServiceProvider();

// Build the runtime so that all singleton factories resolve
_ = provider.GetRequiredService<IMessagingRuntime>();

return provider;
}
}
Loading