Skip to content

Commit 779d603

Browse files
authored
Merge pull request #10 from unknown-studio-dev/feat/config-ui
feat(memory): event-bus emits/subscribes in graph + memory_event_trace MCP tool
2 parents 912a12d + 07fc27b commit 779d603

6 files changed

Lines changed: 1310 additions & 2 deletions

File tree

crates/hoangsa-memory-graph/src/lib.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,14 @@ pub enum EdgeKind {
5454
Extends,
5555
/// `A` is declared in module `B`.
5656
DeclaredIn,
57+
/// `A` emits / publishes event `B`. `B` is a synthetic event FQN
58+
/// of the form `event::<bus>::<topic>` (bus is `*` when receiver
59+
/// can't be statically named).
60+
Emits,
61+
/// `A` subscribes to / listens for event `B`. Same `event::*::*`
62+
/// FQN convention as `Emits`; the direction is event → handler so
63+
/// that `subscribers_of(event_fqn)` is a plain incoming-edge scan.
64+
Subscribes,
5765
}
5866

5967
impl EdgeKind {
@@ -65,6 +73,8 @@ impl EdgeKind {
6573
EdgeKind::References => "references",
6674
EdgeKind::Extends => "extends",
6775
EdgeKind::DeclaredIn => "declared_in",
76+
EdgeKind::Emits => "emits",
77+
EdgeKind::Subscribes => "subscribes",
6878
}
6979
}
7080

@@ -76,6 +86,8 @@ impl EdgeKind {
7686
"references" => EdgeKind::References,
7787
"extends" => EdgeKind::Extends,
7888
"declared_in" => EdgeKind::DeclaredIn,
89+
"emits" => EdgeKind::Emits,
90+
"subscribes" => EdgeKind::Subscribes,
7991
_ => return None,
8092
})
8193
}

crates/hoangsa-memory-mcp/src/server.rs

Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -468,6 +468,7 @@ impl Server {
468468
"memory_skill_propose" => self.tool_skill_propose(arguments).await,
469469
"memory_impact" => self.tool_impact(arguments).await,
470470
"memory_symbol_context" => self.tool_symbol_context(arguments).await,
471+
"memory_event_trace" => self.tool_event_trace(arguments).await,
471472
"memory_detect_changes" => self.tool_detect_changes(arguments).await,
472473
"memory_turn_save" => self.tool_turn_save(arguments).await,
473474
"memory_turns_search" => self.tool_turns_search(arguments).await,
@@ -1705,12 +1706,36 @@ impl Server {
17051706
.out_unresolved(&fqn, hoangsa_memory_graph::EdgeKind::Imports)
17061707
.await?;
17071708

1709+
// Event-bus relations — usually empty for plain functions; the
1710+
// sections render only when populated, so cheap to always
1711+
// request.
1712+
// - `emits` : `fqn → event_node` (out Emits)
1713+
// - `subscribes_to` : `event_node → fqn` (in Subscribes)
1714+
// - `emitted_by` : `function → event_node` (in Emits) — when fqn IS the event node
1715+
// - `subscribers` : `event_node → function` (out Subscribes) — when fqn IS the event node
1716+
let mut emits = g
1717+
.out_neighbors(&fqn, hoangsa_memory_graph::EdgeKind::Emits)
1718+
.await?;
1719+
let mut subscribes_to = g
1720+
.in_neighbors(&fqn, hoangsa_memory_graph::EdgeKind::Subscribes)
1721+
.await?;
1722+
let mut emitted_by = g
1723+
.in_neighbors(&fqn, hoangsa_memory_graph::EdgeKind::Emits)
1724+
.await?;
1725+
let mut subscribers = g
1726+
.out_neighbors(&fqn, hoangsa_memory_graph::EdgeKind::Subscribes)
1727+
.await?;
1728+
17081729
for v in [
17091730
&mut callers,
17101731
&mut callees,
17111732
&mut extends,
17121733
&mut extended_by,
17131734
&mut references,
1735+
&mut emits,
1736+
&mut subscribes_to,
1737+
&mut emitted_by,
1738+
&mut subscribers,
17141739
] {
17151740
v.truncate(limit);
17161741
}
@@ -1740,6 +1765,10 @@ impl Server {
17401765
"references": references.iter().map(node_to_json).collect::<Vec<_>>(),
17411766
"imports_unresolved": unresolved_imports,
17421767
"siblings": siblings.iter().map(node_to_json).collect::<Vec<_>>(),
1768+
"emits": emits.iter().map(node_to_json).collect::<Vec<_>>(),
1769+
"subscribes_to": subscribes_to.iter().map(node_to_json).collect::<Vec<_>>(),
1770+
"emitted_by": emitted_by.iter().map(node_to_json).collect::<Vec<_>>(),
1771+
"subscribers": subscribers.iter().map(node_to_json).collect::<Vec<_>>(),
17431772
});
17441773

17451774
let mut text = format!(
@@ -1769,6 +1798,10 @@ impl Server {
17691798
section("extends", &extends, &mut text);
17701799
section("extended_by", &extended_by, &mut text);
17711800
section("references", &references, &mut text);
1801+
section("emits", &emits, &mut text);
1802+
section("subscribes_to", &subscribes_to, &mut text);
1803+
section("emitted_by", &emitted_by, &mut text);
1804+
section("subscribers", &subscribers, &mut text);
17721805
section("siblings", &siblings, &mut text);
17731806
if !unresolved_imports.is_empty() {
17741807
text.push_str(" imports (external):\n");
@@ -1780,6 +1813,116 @@ impl Server {
17801813
Ok(ToolOutput::new(data, text))
17811814
}
17821815

1816+
/// Find every emitter and subscriber for a given event topic.
1817+
///
1818+
/// Event FQNs are stored as `event::<bus>::<topic>`. With only
1819+
/// `topic` supplied, returns hits across all buses; with `bus`
1820+
/// supplied, restricts to that receiver. The lookup uses the
1821+
/// existing `find_nodes_by_suffix` infrastructure plus a `kind ==
1822+
/// "event"` filter so non-event FQNs that happen to end with the
1823+
/// topic string don't pollute the result.
1824+
async fn tool_event_trace(&self, args: Value) -> anyhow::Result<ToolOutput> {
1825+
#[derive(Deserialize)]
1826+
struct Args {
1827+
topic: String,
1828+
#[serde(default)]
1829+
bus: Option<String>,
1830+
#[serde(default)]
1831+
limit: Option<usize>,
1832+
}
1833+
let Args { topic, bus, limit } = serde_json::from_value(args)?;
1834+
let limit = limit.unwrap_or(32).clamp(1, 128);
1835+
if topic.trim().is_empty() {
1836+
return Ok(ToolOutput::error("topic must be non-empty".to_string()));
1837+
}
1838+
let res = self.resources().await?;
1839+
let g = &res.graph;
1840+
let store = &res.store;
1841+
1842+
let bus_prefix = bus
1843+
.as_deref()
1844+
.map(|b| format!("event::{b}::"))
1845+
.unwrap_or_else(|| "event::".to_string());
1846+
1847+
// O(|NODES|) scan via the existing suffix index. Event nodes
1848+
// share the same `nodes` table as everything else, so this is
1849+
// the same cost as `find_suffix_candidates` already pays.
1850+
let candidates = store.kv.find_nodes_by_suffix(&topic).await?;
1851+
let mut event_fqns: Vec<String> = candidates
1852+
.into_iter()
1853+
.filter(|r| r.kind == "event" && r.id.starts_with(&bus_prefix))
1854+
.map(|r| r.id)
1855+
.collect();
1856+
event_fqns.sort();
1857+
event_fqns.dedup();
1858+
1859+
let mut events_payload: Vec<serde_json::Value> = Vec::new();
1860+
let node_to_json = |n: &hoangsa_memory_graph::Node| {
1861+
json!({
1862+
"fqn": n.fqn,
1863+
"kind": n.kind,
1864+
"path": n.path.to_string_lossy(),
1865+
"line": n.line,
1866+
})
1867+
};
1868+
let mut text = String::new();
1869+
if event_fqns.is_empty() {
1870+
text.push_str(&format!("no event matches: topic={topic}"));
1871+
if let Some(b) = &bus {
1872+
text.push_str(&format!(" bus={b}"));
1873+
}
1874+
text.push('\n');
1875+
}
1876+
for event_fqn in &event_fqns {
1877+
let mut emitters = g
1878+
.in_neighbors(event_fqn, hoangsa_memory_graph::EdgeKind::Emits)
1879+
.await?;
1880+
let mut subscribers = g
1881+
.out_neighbors(event_fqn, hoangsa_memory_graph::EdgeKind::Subscribes)
1882+
.await?;
1883+
emitters.truncate(limit);
1884+
subscribers.truncate(limit);
1885+
1886+
text.push_str(&format!("{event_fqn}\n"));
1887+
if !emitters.is_empty() {
1888+
text.push_str(" emitted by:\n");
1889+
for n in &emitters {
1890+
text.push_str(&format!(
1891+
" {} ({}) {}:{}\n",
1892+
n.fqn,
1893+
n.kind,
1894+
n.path.display(),
1895+
n.line
1896+
));
1897+
}
1898+
}
1899+
if !subscribers.is_empty() {
1900+
text.push_str(" subscribers:\n");
1901+
for n in &subscribers {
1902+
text.push_str(&format!(
1903+
" {} ({}) {}:{}\n",
1904+
n.fqn,
1905+
n.kind,
1906+
n.path.display(),
1907+
n.line
1908+
));
1909+
}
1910+
}
1911+
events_payload.push(json!({
1912+
"fqn": event_fqn,
1913+
"emitters": emitters.iter().map(&node_to_json).collect::<Vec<_>>(),
1914+
"subscribers": subscribers.iter().map(&node_to_json).collect::<Vec<_>>(),
1915+
}));
1916+
}
1917+
1918+
let data = json!({
1919+
"topic": topic,
1920+
"bus": bus,
1921+
"events": events_payload,
1922+
});
1923+
Ok(ToolOutput::new(data, text))
1924+
}
1925+
17831926
/// Given a unified diff, return the symbols the edit touches plus
17841927
/// their upstream blast radius (who calls / references / inherits
17851928
/// from them). Handy as a PR pre-check: "these 7 functions need
@@ -2752,6 +2895,32 @@ fn tools_catalog() -> Vec<Tool> {
27522895
"required": ["fqn"]
27532896
}),
27542897
},
2898+
Tool {
2899+
name: "memory_event_trace".to_string(),
2900+
description: "Trace publishers and subscribers of an event-bus topic. \
2901+
Given a `topic` string (and optionally a `bus` receiver \
2902+
name), returns every indexed function that emits the \
2903+
topic and every handler subscribed to it. Use this when \
2904+
following a pub/sub flow that `memory_symbol_context` \
2905+
can't connect because publisher and subscriber are \
2906+
decoupled by a broker."
2907+
.to_string(),
2908+
input_schema: json!({
2909+
"type": "object",
2910+
"properties": {
2911+
"topic": { "type": "string", "description": "Event topic string (e.g. \"user.created\")." },
2912+
"bus": { "type": "string", "description": "Optional receiver name to disambiguate (`bus`, `socket`, …). Omit to scan all buses." },
2913+
"limit": {
2914+
"type": "integer",
2915+
"minimum": 1,
2916+
"maximum": 128,
2917+
"default": 32,
2918+
"description": "Per-section cap on emitters / subscribers."
2919+
}
2920+
},
2921+
"required": ["topic"]
2922+
}),
2923+
},
27552924
Tool {
27562925
name: "memory_detect_changes".to_string(),
27572926
description: "Parse a unified diff (e.g. `git diff`), find every indexed symbol \

0 commit comments

Comments
 (0)