Skip to content
Merged
44 changes: 44 additions & 0 deletions docs/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,50 @@

---

## 🐛 Bug Fixes from Code Review — 4 Concurrency & Null Safety Issues (2026-06-10)

**Repo:** EDDI (`fix/code-review-bugs`)
**What changed:** Fixed 4 verified bugs from code review (priority HIGH to MEDIUM). All fixes include regression tests.

### Fix #1 — PropertySetterTask NPE on blank input (HIGH)

- **Root cause:** `CATCH_ANY_INPUT_AS_PROPERTY` handler dereferences `getLatestData("input:initial")` without null check. When a client sends an empty/whitespace-only message, `Conversation.storeUserInputInMemory` skips storing `input:initial` → `getLatestData` returns null → NPE → pipeline dies → conversation enters ERROR state.
- **Fix:** Added null guards for both `initialInputData` and `initialInput`.
- **Tests:** 3 new tests — missing `input:initial`, null result, empty string result.
- **Files:** `PropertySetterTask.java`, `PropertySetterTaskTest.java`

### Fix #2 — Config version race condition (HIGH PG / MEDIUM-LOW Mongo)

- **Root cause:** `HistorizedResourceStore.update()` does non-atomic read→increment→write. Two concurrent edits both read version N, both write N+1 — last write wins silently. On PostgreSQL: `ON CONFLICT DO UPDATE` silently merges history. On MongoDB: history `insertOne` throws unhandled `MongoWriteException` (HTTP 500 instead of 409).
- **Fix:** Introduced optimistic locking via `storeIfCurrentVersion()` default method on `IResourceStorage`. MongoDB overrides with version-conditioned `updateOne` (check `matchedCount`). PostgreSQL overrides with `UPDATE WHERE version = ?` (check affected rows). History inserts hardened: Mongo catches duplicate-key 11000; Postgres uses `ON CONFLICT DO NOTHING`.
- **Tests:** 1 new test for concurrent modification detection (mock throws `ResourceModifiedException`); existing update test updated to verify `storeIfCurrentVersion` delegation.
- **Files:** `IResourceStorage.java`, `MongoResourceStorage.java`, `PostgresResourceStorage.java`, `HistorizedResourceStore.java`, `HistorizedResourceStoreTest.java`

### Fix #3 — ComponentCache HashMap race (MEDIUM)

- **Root cause:** `ComponentCache` is `@ApplicationScoped` (singleton) using plain `HashMap`. `computeIfAbsent` on `HashMap` is not thread-safe. Concurrent reads (every conversation turn via `LifecycleManager`) and writes (lazy agent deployment via `WorkflowStoreClientLibrary`) can corrupt the map.
- **Fix:** Replaced `HashMap` with `ConcurrentHashMap` for both outer and inner maps.
- **Tests:** 1 new concurrent stress test (8 threads, 500 ops each, mixed read/write).
- **Files:** `ComponentCache.java`, `ComponentCacheTest.java`

### Fix #4 — Zombie-write snapshot clobber after timeout (MEDIUM)

- **Root cause:** When an agent times out, `future.cancel(true)` sets the interrupt flag but doesn't stop threads blocked in non-interruptible I/O (LLM HTTP calls). When the call eventually completes, `onComplete` callback fires → `storeConversationMemory` → unconditional `replaceOne` overwrites the newer conversation state.
- **Fix:** Check `Thread.currentThread().isInterrupted()` before calling `onComplete()`. If interrupted, route to `onFailure()` instead (with log warning).
- **Tests:** 2 new tests — cancelled thread routes to `onFailure`; non-interrupted thread still routes to `onComplete`.
- **Files:** `BaseRuntime.java`, `BaseRuntimeTest.java`
Comment thread
ginccc marked this conversation as resolved.

### Design Decisions

- **Optimistic locking as default method:** `storeIfCurrentVersion()` was added as a `default` method on the `IResourceStorage` interface (delegating to `store()`) rather than an abstract method. This avoids breaking all existing implementations while letting backends opt into conditional writes. The Javadoc clearly states that the default does _not_ provide optimistic locking.
- **Interrupt check over Future.isCancelled():** The zombie-write fix checks `Thread.currentThread().isInterrupted()` inside the submitted lambda rather than inspecting `Future.isCancelled()` from outside, because the interrupt flag is the only signal visible from within the executing thread after a non-interruptible I/O completes.
- **Return null on interruption:** When the interrupt flag is set, the lambda now returns `null` instead of the stale result. This prevents callers who `future.get()` the returned Future from receiving a stale value that was already routed to `onFailure`.
- **ConcurrentHashMap over synchronized blocks:** For `ComponentCache`, `ConcurrentHashMap` was chosen over `Collections.synchronizedMap` or explicit locking because `computeIfAbsent` provides exactly the atomic read-or-create semantics needed, with better concurrency than full map locking.
- **No conversation context in BaseRuntime logs:** `BaseRuntime` is generic executor infrastructure with no access to conversation/agent IDs. The warning log includes the thread name for traceability; richer context is logged by the downstream `onFailure` callback in `ConversationService`.

---


## 🐛 Fix: Swagger UI Broken by CSP — Per-Path Filter Override (2026-06-03)

**Repo:** EDDI (`fix/swagger-ui-csp`)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public Integer update(String id, Integer version, T content) throws ResourceStor
try {
Integer newVersion = resource.getVersion() + 1;
var newResource = resourceStorage.newResource(resource.getId(), newVersion, content);
resourceStorage.store(newResource);
resourceStorage.storeIfCurrentVersion(newResource, version);
return newVersion;
} catch (IOException e) {
throw new ResourceStoreException(e.getLocalizedMessage(), e);
Expand Down
24 changes: 24 additions & 0 deletions src/main/java/ai/labs/eddi/datastore/IResourceStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,30 @@ public interface IResourceStorage<T> {

Integer getCurrentVersion(String id);

/**
* Store a new version of a resource only if the current version in the database
* matches {@code expectedCurrentVersion}.
* <p>
* Implementations that support conditional writes should override this method
* to enforce the check atomically. The default implementation delegates to
* {@link #store(IResource)} and does <strong>not</strong> provide optimistic
* locking.
* <p>
* Used by {@link ai.labs.eddi.datastore.HistorizedResourceStore#update} for
* optimistic locking.
*
* @param newResource
* the resource with the new version to store
* @param expectedCurrentVersion
* the version the caller believes is currently stored
* @throws IResourceStore.ResourceModifiedException
* if the current version no longer matches (concurrent edit)
*/
Comment thread
ginccc marked this conversation as resolved.
default void storeIfCurrentVersion(IResource<T> newResource, int expectedCurrentVersion)
throws IResourceStore.ResourceModifiedException {
store(newResource);
}

/**
* Find resource IDs where the JSON data contains the given value at the given
* path. Used by AgentStore/WorkflowStore for "find configs containing resource"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import ai.labs.eddi.datastore.IResourceStorage;
import ai.labs.eddi.datastore.IResourceStore;
import ai.labs.eddi.datastore.serialization.IDocumentBuilder;
import com.mongodb.MongoWriteException;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
Expand Down Expand Up @@ -93,6 +94,22 @@ public void store(IResource<T> currentResource) {
}
}

@Override
public void storeIfCurrentVersion(IResource<T> newResource, int expectedCurrentVersion)
throws IResourceStore.ResourceModifiedException {
Resource resource = checkInternalResource(newResource);
var result = currentCollection.updateOne(
Filters.and(
Filters.eq(ID_FIELD, new ObjectId(resource.getId())),
Filters.eq(VERSION_FIELD, expectedCurrentVersion)),
new Document("$set", resource.getMongoDocument()));
if (result.getMatchedCount() == 0) {
throw new IResourceStore.ResourceModifiedException(
String.format("Resource was modified concurrently (id=%s, expected version=%d)",
resource.getId(), expectedCurrentVersion));
}
}

@Override
public void createNew(IResource<T> currentResource) {
Resource resource = checkInternalResource(currentResource);
Expand Down Expand Up @@ -204,7 +221,16 @@ public Integer getCurrentVersion(String id) {
@Override
public void store(IHistoryResource<T> resource) {
HistoryResource historyResource = checkInternalHistoryResource(resource);
historyCollection.insertOne(historyResource.getMongoDocument());
try {
historyCollection.insertOne(historyResource.getMongoDocument());
} catch (MongoWriteException e) {
if (e.getError().getCode() == 11000) {
// Duplicate key — another thread already archived this version.
// Safe to ignore: the history row is identical (same id + version).
return;
}
throw e;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,31 @@ ON CONFLICT (id, collection_name) DO UPDATE
}
}

@Override
public void storeIfCurrentVersion(IResource<T> newResource, int expectedCurrentVersion)
throws IResourceStore.ResourceModifiedException {
Resource pgResource = checkInternalResource(newResource);
String sql = """
UPDATE resources SET version = ?, data = ?::jsonb
WHERE id = ?::uuid AND collection_name = ? AND version = ?
""";
try (Connection conn = dataSource.getConnection(); PreparedStatement ps = conn.prepareStatement(sql)) {
ps.setInt(1, pgResource.getVersion());
ps.setString(2, pgResource.getJson());
ps.setString(3, pgResource.getId());
ps.setString(4, collectionName);
ps.setInt(5, expectedCurrentVersion);
int rows = ps.executeUpdate();
if (rows == 0) {
throw new IResourceStore.ResourceModifiedException(
String.format("Resource was modified concurrently (id=%s, expected version=%d)",
pgResource.getId(), expectedCurrentVersion));
}
} catch (SQLException e) {
throw new RuntimeException("Failed to store resource with version check", e);
}
}

@Override
public void createNew(IResource<T> resource) {
Resource pgResource = checkInternalResource(resource);
Expand Down Expand Up @@ -236,8 +261,7 @@ public void store(IHistoryResource<T> history) {
String sql = """
INSERT INTO resources_history (id, collection_name, version, data, deleted)
VALUES (?::uuid, ?, ?, ?::jsonb, ?)
ON CONFLICT (id, collection_name, version) DO UPDATE
SET data = EXCLUDED.data, deleted = EXCLUDED.deleted
ON CONFLICT (id, collection_name, version) DO NOTHING
""";
try (Connection conn = dataSource.getConnection(); PreparedStatement ps = conn.prepareStatement(sql)) {
ps.setString(1, pgHistory.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,20 @@
import ai.labs.eddi.engine.lifecycle.IComponentCache;

import jakarta.enterprise.context.ApplicationScoped;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@ApplicationScoped
public class ComponentCache implements IComponentCache {
private final Map<String, Map<String, Object>> componentMaps = new HashMap<>();
private final Map<String, Map<String, Object>> componentMaps = new ConcurrentHashMap<>();

@Override
public Map<String, Object> getComponentMap(String componentType) {
return componentMaps.computeIfAbsent(componentType, k -> new HashMap<>());
return componentMaps.computeIfAbsent(componentType, k -> new ConcurrentHashMap<>());
}

@Override
public void put(String componentType, String key, Object component) {
componentMaps.computeIfAbsent(componentType, k -> new HashMap<>()).put(key, component);
componentMaps.computeIfAbsent(componentType, k -> new ConcurrentHashMap<>()).put(key, component);
}
}
14 changes: 13 additions & 1 deletion src/main/java/ai/labs/eddi/engine/runtime/BaseRuntime.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,19 @@ public <T> Future<T> submitCallable(final Callable<T> callable, final IFinishedE
}

final T result = callable.call();
callback.onComplete(result);
if (Thread.currentThread().isInterrupted()) {
// Execution was cancelled (e.g., agent timeout) but the callable
// completed anyway (non-interruptible I/O). Route to onFailure
// to skip stale persistence that would overwrite newer state.
// Return null to prevent leaking the stale result via the Future.
log.warnf("Execution completed after cancellation — discarding result to prevent stale persistence (thread=%s)",
Thread.currentThread().getName());
callback.onFailure(new InterruptedException(
"Execution completed after cancellation — result discarded"));
return null;
} else {
callback.onComplete(result);
}
return result;
} catch (Throwable t) {
Comment thread
ginccc marked this conversation as resolved.
log.error(t.getLocalizedMessage(), t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,9 +243,11 @@ public void execute(IConversationMemory memory, Object component) throws Lifecyc
List<String> actions = actionsData.getResult();
if (actions != null && actions.contains(CATCH_ANY_INPUT_AS_PROPERTY_ACTION)) {
IData<String> initialInputData = currentStep.getLatestData(INPUT_INITIAL_IDENTIFIER);
String initialInput = initialInputData.getResult();
if (!initialInput.isEmpty()) {
properties.add(new Property(EXPRESSION_MEANING_USER_INPUT, initialInput, conversation));
if (initialInputData != null) {
String initialInput = initialInputData.getResult();
if (initialInput != null && !initialInput.isEmpty()) {
properties.add(new Property(EXPRESSION_MEANING_USER_INPUT, initialInput, conversation));
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.*;
import org.mockito.InOrder;

import static org.mockito.Mockito.*;

/**
Expand Down Expand Up @@ -95,8 +97,35 @@ void shouldUpdateResourceAndStoreHistory() throws Exception {
Integer newVersion = store.update("id1", 1, "updated");

assertEquals(2, newVersion);
verify(storage).store(historyResource); // archived old version
verify(storage).store(newResource); // stored new version
// Verify ordering: history MUST be archived BEFORE the conditional store.
// If swapped, a successful storeIfCurrentVersion followed by a failed
// history archive could leave the update without a corresponding history entry.
InOrder inOrder = inOrder(storage);
inOrder.verify(storage).store(historyResource); // archived old version first
inOrder.verify(storage).storeIfCurrentVersion(newResource, 1); // then conditional store
}

@Test
void shouldThrowResourceModifiedWhenConcurrentUpdate() throws Exception {
IResourceStorage.IResource<String> currentResource = mock(IResourceStorage.IResource.class);
when(storage.read("id1", 1)).thenReturn(currentResource);
when(currentResource.getId()).thenReturn("id1");
when(currentResource.getVersion()).thenReturn(1);

IResourceStorage.IHistoryResource<String> historyResource = mock(IResourceStorage.IHistoryResource.class);
when(storage.newHistoryResourceFor(currentResource, false)).thenReturn(historyResource);

IResourceStorage.IResource<String> newResource = mock(IResourceStorage.IResource.class);
when(storage.newResource("id1", 2, "updated")).thenReturn(newResource);

doThrow(new IResourceStore.ResourceModifiedException("concurrent edit"))
.when(storage).storeIfCurrentVersion(newResource, 1);

assertThrows(IResourceStore.ResourceModifiedException.class,
() -> store.update("id1", 1, "updated"));

// History should still have been archived before the conditional store failed
verify(storage).store(historyResource);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@

import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.*;

import static org.junit.jupiter.api.Assertions.*;

Expand Down Expand Up @@ -52,4 +54,56 @@ void put_overwritesSameKey() {
cache.put("parser", "key1", "new");
assertEquals("new", cache.getComponentMap("parser").get("key1"));
}

@Test
void concurrentAccess_noCorruption() throws Exception {
var cache = new ComponentCache();
int threadCount = 8;
int opsPerThread = 500;
var executor = Executors.newFixedThreadPool(threadCount);
var latch = new CountDownLatch(1);
var futures = new ArrayList<Future<?>>();

// Half the threads write (simulating deployment), half read (simulating
// conversation turns)
for (int t = 0; t < threadCount; t++) {
final int threadId = t;
futures.add(executor.submit(() -> {
try {
latch.await(); // all threads start together
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
for (int i = 0; i < opsPerThread; i++) {
String type = "type" + (i % 5); // 5 component types
if (threadId % 2 == 0) {
// Writer thread (deployment)
cache.put(type, "key-" + threadId + "-" + i, "val-" + i);
} else {
// Reader thread (conversation turn)
Map<String, Object> map = cache.getComponentMap(type);
assertNotNull(map); // should never be null
map.getOrDefault("key-any", null); // read without NPE
}
}
}));
}

latch.countDown(); // go!
executor.shutdown();
assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS));

// Verify no exceptions were thrown
for (Future<?> f : futures) {
assertDoesNotThrow(() -> f.get(1, TimeUnit.SECONDS));
}

// Verify data is consistent — writer threads wrote to type0..type4
for (int type = 0; type < 5; type++) {
Map<String, Object> map = cache.getComponentMap("type" + type);
assertNotNull(map);
assertFalse(map.isEmpty());
}
}
}
Loading