Skip to content

Commit b804478

Browse files
feat(samantha): Phase 1-3 evolution + thread-safe Memory
Phase 1 — Memory (verbatim log): - Add messages table + FTS5 alongside facts (MemPalace insight: raw phrasing preserves nuance that LLM-extracted facts lose) - Auto-log every chat turn through pipeline._log_turn - Dual-path recall in context.py (facts + verbatim messages) - Dream cycle consolidates from both sources Phase 2 — Planner (ambient priors): - Planner reads Thronglets ambient_priors from Perception - failure-residue >=0.6 -> tighter caps + optional confirm - mixed-residue -> no false confidence + optional confirm - success-prior >=0.7 -> allow more ambition - plan() signature: plan(stimulus, perception) - Defensive isinstance checks against malformed external JSON Phase 3 — Architecture: - Extract pipeline.py: run_pipeline hook-based orchestration, all I/O delegated via Callables (testable without Samantha) - Extract http.py: make_handler closure replaces module-level global - Psyche constitutive: _perceive always returns a Perception, baseline kernel when Psyche server down (never None) - server.py slimmed ~720 -> ~655 lines Thread safety — Memory: - sqlite3.Connection moved from instance state to threading.local. Each thread lazily opens its own connection via _connect(); WAL + SQLite file-layer serialization handles concurrency. No Python-level locks needed. - Fixes silent cross-thread crashes that had been killing recall, log_message, prune, and dream — all previously masked by except Exception: pass at debug level. - Memory-failure logs upgraded to warning so real issues surface. Tests: 300 passed, including cross-thread regression test and ambient-priors coverage. 8-thread x 50-op stress test: 0 errors, all data durable. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent a9bd658 commit b804478

9 files changed

Lines changed: 755 additions & 226 deletions

File tree

oasyce_sdk/agent/runtime.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ class Perception:
4949
dynamic_context: str = ""
5050
response_contract: ResponseContract | None = None
5151
generation_controls: GenerationControls | None = None
52+
ambient_priors: dict | None = None
5253

5354
@property
5455
def has_collective_experience(self) -> bool:

oasyce_sdk/samantha/context.py

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,7 @@ def build_messages(
206206
history_summary: str = "",
207207
image_urls: list[str] | None = None,
208208
recent_posts: list[dict[str, Any]] | None = None,
209+
message_matches: list[dict[str, Any]] | None = None,
209210
context_window: int = DEFAULT_CONTEXT_WINDOW,
210211
) -> list[dict[str, Any]]:
211212
"""Assemble the full message list with budget-aware truncation.
@@ -311,14 +312,28 @@ def build_messages(
311312
system_parts.append(posts_text)
312313
retrieval_used += _estimate_tokens(posts_text)
313314

314-
# Memories (FTS5 recalled facts)
315+
# Memories (FTS5 recalled facts) + verbatim message matches
316+
# Split remaining retrieval budget: ~half for facts, ~half for messages
317+
remaining_retrieval = budget.retrieval - retrieval_used
318+
fact_budget = remaining_retrieval // 2 if message_matches else remaining_retrieval
319+
msg_budget = remaining_retrieval - fact_budget
320+
315321
if memories:
316-
mem_budget = budget.retrieval - retrieval_used
317322
mem_lines = [f"- ({m['category']}) {m['content']}" for m in memories[:5]]
318323
mem_text = "[Your memories about this user]\n" + "\n".join(mem_lines)
319-
mem_text = _truncate_text(mem_text, mem_budget)
324+
mem_text = _truncate_text(mem_text, fact_budget)
320325
system_parts.append(mem_text)
321326

327+
if message_matches:
328+
msg_lines = []
329+
for m in message_matches[:5]:
330+
role = m.get("role", "user")
331+
who = "they said" if role == "user" else "you said"
332+
msg_lines.append(f'- {who}: "{m.get("content", "")[:200]}"')
333+
msg_text = "[Relevant past exchanges]\n" + "\n".join(msg_lines)
334+
msg_text = _truncate_text(msg_text, msg_budget)
335+
system_parts.append(msg_text)
336+
322337
messages.append({"role": "system", "content": "\n\n".join(system_parts)})
323338

324339
# ── Vision: inject recent post images (base64, concurrent) ─

oasyce_sdk/samantha/http.py

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
"""HTTP webhook server for Samantha.
2+
3+
Two endpoints:
4+
POST /hook/message — chat message webhook from the App backend
5+
POST /hook/post_mention — @mention webhook from the App backend
6+
GET /health — liveness + active session list
7+
8+
Kept in its own module so server.py focuses on Samantha itself.
9+
"""
10+
11+
from __future__ import annotations
12+
13+
import json
14+
import logging
15+
from http.server import BaseHTTPRequestHandler, HTTPServer
16+
from typing import TYPE_CHECKING
17+
18+
if TYPE_CHECKING:
19+
from .server import Samantha
20+
21+
logger = logging.getLogger(__name__)
22+
23+
24+
def make_handler(samantha: "Samantha") -> type[BaseHTTPRequestHandler]:
25+
"""Bind a Samantha instance to a BaseHTTPRequestHandler subclass.
26+
27+
Closure-based wiring avoids a module-level global. The returned class
28+
is what HTTPServer expects (it instantiates per request).
29+
"""
30+
from .server import Stimulus # local import: break cycle
31+
32+
class WebhookHandler(BaseHTTPRequestHandler):
33+
def do_POST(self):
34+
length = int(self.headers.get("Content-Length", 0))
35+
body = json.loads(self.rfile.read(length)) if length else {}
36+
37+
if self.path == "/hook/message":
38+
session_id = body.get("session_id", 0)
39+
sender_id = body.get("sender_id", 0)
40+
content = body.get("content", "")
41+
42+
if not content:
43+
self._respond(200, {"ok": True})
44+
return
45+
46+
samantha.submit(Stimulus(
47+
kind="chat", content=content,
48+
sender_id=sender_id, session_id=session_id,
49+
))
50+
self._respond(200, {"ok": True})
51+
52+
elif self.path == "/hook/post_mention":
53+
post_id = body.get("post_id", 0)
54+
comment_id = body.get("comment_id", 0)
55+
sender_id = body.get("sender_id", 0)
56+
title = body.get("title", "")
57+
content = body.get("content", "")
58+
59+
if not post_id and not content:
60+
self._respond(200, {"ok": True})
61+
return
62+
63+
samantha.submit(Stimulus(
64+
kind="mention",
65+
content=content,
66+
sender_id=sender_id,
67+
post_id=post_id,
68+
comment_id=comment_id,
69+
metadata={"post_title": title},
70+
))
71+
self._respond(200, {"ok": True})
72+
73+
else:
74+
self._respond(404, {"error": "not found"})
75+
76+
def do_GET(self):
77+
if self.path == "/health":
78+
sessions = list(samantha._sessions.keys())
79+
self._respond(200, {"status": "ok", "active_sessions": sessions})
80+
else:
81+
self._respond(404, {"error": "not found"})
82+
83+
def _respond(self, code: int, body: dict):
84+
self.send_response(code)
85+
self.send_header("Content-Type", "application/json")
86+
self.end_headers()
87+
self.wfile.write(json.dumps(body).encode())
88+
89+
def log_message(self, fmt, *args):
90+
logger.info(fmt, *args)
91+
92+
return WebhookHandler
93+
94+
95+
def run_http_server(samantha: "Samantha", port: int) -> None:
96+
"""Block forever serving Samantha's HTTP endpoints on 127.0.0.1:{port}."""
97+
handler_cls = make_handler(samantha)
98+
server = HTTPServer(("127.0.0.1", port), handler_cls)
99+
logger.info("Health endpoint on http://127.0.0.1:%d/health", port)
100+
server.serve_forever()

oasyce_sdk/samantha/loop.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,19 +41,24 @@ def proactive_loop(samantha: Samantha, interval: int = 300) -> None:
4141

4242

4343
def _memory_maintenance(samantha: Samantha) -> None:
44-
"""Prune stale facts + Dream consolidation across all active sessions."""
44+
"""Prune stale facts + Dream consolidation across all active sessions.
45+
46+
Errors here are warnings, not debug: if maintenance is silently failing
47+
then core memory stops updating and stale facts never get pruned, which
48+
is a quiet drift of agent behavior over time.
49+
"""
4550
for user_id, sess in list(samantha._sessions.items()):
4651
try:
4752
pruned = sess.memory.prune(max_age_days=90, min_access=0)
4853
if pruned:
4954
logger.info("User %d: pruned %d stale memories", user_id, pruned)
5055
except Exception:
51-
logger.debug("Prune failed for user %d", user_id, exc_info=True)
56+
logger.warning("Prune failed for user %d", user_id, exc_info=True)
5257

5358
try:
5459
samantha.dream(user_id, sess)
5560
except Exception:
56-
logger.debug("Dream failed for user %d", user_id, exc_info=True)
61+
logger.warning("Dream failed for user %d", user_id, exc_info=True)
5762

5863

5964
def _scan_feed(samantha: Samantha, seen: set[int]) -> None:

oasyce_sdk/samantha/memory.py

Lines changed: 162 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import json
1414
import logging
1515
import sqlite3
16+
import threading
1617
from dataclasses import dataclass
1718
from datetime import datetime, timezone
1819
from pathlib import Path
@@ -31,38 +32,108 @@ class Fact(NamedTuple):
3132
access_count: int
3233

3334

35+
class Message(NamedTuple):
36+
id: int
37+
role: str # 'user' | 'assistant'
38+
content: str
39+
session_id: int
40+
created_at: str
41+
42+
43+
# Schema is idempotent (IF NOT EXISTS everywhere) so every freshly-opened
44+
# connection can run it safely. That's what makes per-thread connections
45+
# work without coordination: each thread opens its own, runs the same DDL,
46+
# and gets the same view of the DB.
47+
_SCHEMA = """
48+
CREATE TABLE IF NOT EXISTS facts (
49+
id INTEGER PRIMARY KEY AUTOINCREMENT,
50+
content TEXT NOT NULL,
51+
category TEXT DEFAULT 'general',
52+
created_at TEXT NOT NULL,
53+
last_accessed TEXT,
54+
access_count INTEGER DEFAULT 0
55+
);
56+
CREATE VIRTUAL TABLE IF NOT EXISTS facts_fts
57+
USING fts5(content, category, content='facts', content_rowid='id');
58+
59+
CREATE TRIGGER IF NOT EXISTS facts_ai AFTER INSERT ON facts BEGIN
60+
INSERT INTO facts_fts(rowid, content, category)
61+
VALUES (new.id, new.content, new.category);
62+
END;
63+
CREATE TRIGGER IF NOT EXISTS facts_ad AFTER DELETE ON facts BEGIN
64+
INSERT INTO facts_fts(facts_fts, rowid, content, category)
65+
VALUES ('delete', old.id, old.content, old.category);
66+
END;
67+
68+
CREATE TABLE IF NOT EXISTS messages (
69+
id INTEGER PRIMARY KEY AUTOINCREMENT,
70+
role TEXT NOT NULL,
71+
content TEXT NOT NULL,
72+
session_id INTEGER DEFAULT 0,
73+
created_at TEXT NOT NULL
74+
);
75+
CREATE VIRTUAL TABLE IF NOT EXISTS messages_fts
76+
USING fts5(content, content='messages', content_rowid='id');
77+
CREATE INDEX IF NOT EXISTS idx_messages_session
78+
ON messages(session_id, created_at);
79+
80+
CREATE TRIGGER IF NOT EXISTS messages_ai AFTER INSERT ON messages BEGIN
81+
INSERT INTO messages_fts(rowid, content)
82+
VALUES (new.id, new.content);
83+
END;
84+
CREATE TRIGGER IF NOT EXISTS messages_ad AFTER DELETE ON messages BEGIN
85+
INSERT INTO messages_fts(messages_fts, rowid, content)
86+
VALUES ('delete', old.id, old.content);
87+
END;
88+
"""
89+
90+
3491
class Memory:
35-
"""Persistent fact store backed by SQLite FTS5."""
92+
"""Persistent fact store backed by SQLite FTS5.
93+
94+
Two tables:
95+
facts — LLM-extracted semantic knowledge (the agent's "beliefs")
96+
messages — verbatim turn-by-turn log (the raw conversation)
97+
98+
Extracted facts are lossy but searchable by concept. Verbatim messages
99+
preserve nuance and exact phrasing for recall of specific moments.
100+
Recall from both paths is cheap (FTS5) and complementary.
101+
102+
Threading model
103+
---------------
104+
One Memory instance is shared across threads; each thread that touches
105+
it opens its own `sqlite3.Connection` stored in a `threading.local`.
106+
This matches how the stdlib `sqlite3` module is designed to be used
107+
from multi-threaded code: share the database, not the connection.
108+
109+
SQLite's WAL mode lets readers proceed in parallel with a writer and
110+
serializes writers at the file layer, so no Python-level lock is
111+
needed — the correctness guarantee comes from SQLite itself.
112+
"""
36113

37114
def __init__(self, db_path: Path | None = None):
38115
p = db_path or DEFAULT_DB_PATH
39116
p.parent.mkdir(parents=True, exist_ok=True)
40-
self._conn = sqlite3.connect(str(p))
41-
self._conn.execute("PRAGMA journal_mode=WAL")
42-
self._init_schema()
43-
44-
def _init_schema(self) -> None:
45-
self._conn.executescript("""
46-
CREATE TABLE IF NOT EXISTS facts (
47-
id INTEGER PRIMARY KEY AUTOINCREMENT,
48-
content TEXT NOT NULL,
49-
category TEXT DEFAULT 'general',
50-
created_at TEXT NOT NULL,
51-
last_accessed TEXT,
52-
access_count INTEGER DEFAULT 0
53-
);
54-
CREATE VIRTUAL TABLE IF NOT EXISTS facts_fts
55-
USING fts5(content, category, content='facts', content_rowid='id');
56-
57-
CREATE TRIGGER IF NOT EXISTS facts_ai AFTER INSERT ON facts BEGIN
58-
INSERT INTO facts_fts(rowid, content, category)
59-
VALUES (new.id, new.content, new.category);
60-
END;
61-
CREATE TRIGGER IF NOT EXISTS facts_ad AFTER DELETE ON facts BEGIN
62-
INSERT INTO facts_fts(facts_fts, rowid, content, category)
63-
VALUES ('delete', old.id, old.content, old.category);
64-
END;
65-
""")
117+
self._path = str(p)
118+
self._local = threading.local()
119+
# Eager open: if the path is unwritable or the schema fails, surface
120+
# the error at construction time rather than on the first operation.
121+
self._connect()
122+
123+
def _connect(self) -> sqlite3.Connection:
124+
"""Return the current thread's connection, opening it on first use."""
125+
conn = getattr(self._local, "conn", None)
126+
if conn is None:
127+
conn = sqlite3.connect(self._path)
128+
conn.execute("PRAGMA journal_mode=WAL")
129+
conn.executescript(_SCHEMA)
130+
self._local.conn = conn
131+
return conn
132+
133+
@property
134+
def _conn(self) -> sqlite3.Connection:
135+
"""Thread-local connection — lazy-created per thread, never shared."""
136+
return self._connect()
66137

67138
def save(self, content: str, category: str = "general") -> int:
68139
"""Store a fact. Returns its id."""
@@ -111,6 +182,59 @@ def count(self) -> int:
111182
row = self._conn.execute("SELECT COUNT(*) FROM facts").fetchone()
112183
return row[0] if row else 0
113184

185+
# ── Verbatim messages ─────────────────────────────────────
186+
187+
def log_message(self, role: str, content: str, session_id: int = 0) -> int:
188+
"""Store a verbatim turn. Returns its id."""
189+
if not content:
190+
return 0
191+
now = datetime.now(timezone.utc).isoformat()
192+
cur = self._conn.execute(
193+
"INSERT INTO messages (role, content, session_id, created_at) "
194+
"VALUES (?, ?, ?, ?)",
195+
(role, content, session_id, now),
196+
)
197+
self._conn.commit()
198+
return cur.lastrowid # type: ignore[return-value]
199+
200+
def search_messages(self, query: str, limit: int = 5) -> list[Message]:
201+
"""FTS5 search across verbatim messages, ranked by relevance."""
202+
if not query.strip():
203+
return []
204+
rows = self._conn.execute(
205+
"""
206+
SELECT m.id, m.role, m.content, m.session_id, m.created_at
207+
FROM messages_fts
208+
JOIN messages m ON m.id = messages_fts.rowid
209+
WHERE messages_fts MATCH ?
210+
ORDER BY messages_fts.rank
211+
LIMIT ?
212+
""",
213+
(query, limit),
214+
).fetchall()
215+
return [Message(*r) for r in rows]
216+
217+
def recent_messages(self, session_id: int = 0, limit: int = 20) -> list[Message]:
218+
"""Latest messages for a session (or all sessions if session_id=0)."""
219+
if session_id:
220+
rows = self._conn.execute(
221+
"SELECT id, role, content, session_id, created_at "
222+
"FROM messages WHERE session_id = ? "
223+
"ORDER BY created_at DESC LIMIT ?",
224+
(session_id, limit),
225+
).fetchall()
226+
else:
227+
rows = self._conn.execute(
228+
"SELECT id, role, content, session_id, created_at "
229+
"FROM messages ORDER BY created_at DESC LIMIT ?",
230+
(limit,),
231+
).fetchall()
232+
return [Message(*r) for r in rows]
233+
234+
def message_count(self) -> int:
235+
row = self._conn.execute("SELECT COUNT(*) FROM messages").fetchone()
236+
return row[0] if row else 0
237+
114238
def prune(self, max_age_days: int = 90, min_access: int = 0) -> int:
115239
"""Remove stale facts. Returns count of deleted rows.
116240
@@ -134,7 +258,17 @@ def prune(self, max_age_days: int = 90, min_access: int = 0) -> int:
134258
return deleted
135259

136260
def close(self) -> None:
137-
self._conn.close()
261+
"""Close the current thread's connection. Best-effort cleanup.
262+
263+
Connections held by other threads are reclaimed when those threads
264+
exit or when the garbage collector runs over their thread-locals.
265+
Under WAL every `commit()` is durable, so this cannot cause data
266+
loss — it only releases the current thread's file handle.
267+
"""
268+
conn = getattr(self._local, "conn", None)
269+
if conn is not None:
270+
conn.close()
271+
self._local.conn = None
138272

139273

140274
# ── Core Memory (MemGPT-inspired) ──────────────────────────────

0 commit comments

Comments
 (0)