Skip to content

Commit 239b4c7

Browse files
authored
feat(core): deliver request-lifecycle P0 scaffolding (#227)
Thread a single RequestContext end-to-end through dispatch_pipeline (forwarded from all 5 ingress shells, dropping the per-target throwaway context) and add a type-keyed ContextBag scratch space. Introduce a read-only PluginKernel aggregating the Hook/Vendor/Protocol registries. Also add the lifecycle and observability design RFCs. Behaviour unchanged; check/clippy/test all green.
1 parent 6694793 commit 239b4c7

11 files changed

Lines changed: 684 additions & 5 deletions

File tree

crates/nyro-core/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ pub mod db;
55
pub mod error;
66
pub mod integrations;
77
pub mod logging;
8+
pub mod plugin;
89
pub mod protocol;
910
pub mod provider;
1011
pub mod proxy;

crates/nyro-core/src/plugin/mod.rs

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
//! Unified extension kernel (P0 scaffolding for the request-lifecycle RFC).
2+
//!
3+
//! Nyro already has several compile-time, `inventory`-based registries that act
4+
//! as implicit plugin systems:
5+
//!
6+
//! - [`HookRegistry`] — request/response integration hooks.
7+
//! - [`VendorRegistry`] — provider vendor presets / adapters.
8+
//! - [`ProtocolRegistry`] — protocol endpoint handlers.
9+
//!
10+
//! [`PluginKernel`] is a thin, **read-only** façade that aggregates those
11+
//! registries behind one entry point. It does not own or replace them; it only
12+
//! offers a single place for future phases (`PhaseHook`, capabilities) and admin
13+
//! tooling to enumerate "what is loaded". Introducing it now is purely additive
14+
//! and changes no runtime behaviour.
15+
//!
16+
//! See `docs/design/lifecycle.md` for the full framework design.
17+
18+
use std::sync::OnceLock;
19+
20+
use crate::integrations::HookRegistry;
21+
use crate::protocol::registry::ProtocolRegistry;
22+
use crate::provider::VendorRegistry;
23+
24+
/// The kind of extension capability a manifest entry represents.
25+
///
26+
/// Mirrors the capability taxonomy in the lifecycle RFC. Only the
27+
/// already-existing capabilities are surfaced today; `PhaseHook` /
28+
/// `TelemetryExporter` slots are reserved for later phases.
29+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
30+
pub enum CapabilityKind {
31+
/// A request-phase integration hook ([`crate::integrations::RequestHook`]).
32+
RequestHook,
33+
/// A response-phase integration hook ([`crate::integrations::ResponseHook`]).
34+
ResponseHook,
35+
/// A provider vendor preset/adapter.
36+
ProviderVendor,
37+
/// A protocol endpoint handler.
38+
ProtocolEndpoint,
39+
}
40+
41+
impl CapabilityKind {
42+
pub fn as_str(self) -> &'static str {
43+
match self {
44+
CapabilityKind::RequestHook => "request_hook",
45+
CapabilityKind::ResponseHook => "response_hook",
46+
CapabilityKind::ProviderVendor => "provider_vendor",
47+
CapabilityKind::ProtocolEndpoint => "protocol_endpoint",
48+
}
49+
}
50+
}
51+
52+
/// A read-only description of one loaded extension.
53+
#[derive(Debug, Clone)]
54+
pub struct PluginManifest {
55+
/// Stable identifier of the extension (hook name / vendor id / protocol id).
56+
pub id: String,
57+
/// Which capability slot this extension occupies.
58+
pub capability: CapabilityKind,
59+
}
60+
61+
/// Aggregated, read-only view over Nyro's compile-time extension registries.
62+
///
63+
/// Cheap to build (it only borrows the global registries) and cached as a
64+
/// process-wide singleton via [`PluginKernel::global`].
65+
pub struct PluginKernel {
66+
hooks: &'static HookRegistry,
67+
vendors: &'static VendorRegistry,
68+
protocols: &'static ProtocolRegistry,
69+
}
70+
71+
impl PluginKernel {
72+
/// Process-wide kernel singleton.
73+
pub fn global() -> &'static PluginKernel {
74+
static KERNEL: OnceLock<PluginKernel> = OnceLock::new();
75+
KERNEL.get_or_init(|| PluginKernel {
76+
hooks: HookRegistry::global(),
77+
vendors: VendorRegistry::global(),
78+
protocols: ProtocolRegistry::global(),
79+
})
80+
}
81+
82+
/// Enumerate every loaded extension across all registries.
83+
pub fn manifests(&self) -> Vec<PluginManifest> {
84+
let mut out = Vec::new();
85+
86+
for hook in self.hooks.request_hooks() {
87+
out.push(PluginManifest {
88+
id: hook.name().to_string(),
89+
capability: CapabilityKind::RequestHook,
90+
});
91+
}
92+
for hook in self.hooks.response_hooks() {
93+
out.push(PluginManifest {
94+
id: hook.name().to_string(),
95+
capability: CapabilityKind::ResponseHook,
96+
});
97+
}
98+
for vendor in self.vendors.list_metadata() {
99+
out.push(PluginManifest {
100+
id: vendor.id.to_string(),
101+
capability: CapabilityKind::ProviderVendor,
102+
});
103+
}
104+
for handler in self.protocols.list() {
105+
out.push(PluginManifest {
106+
id: handler.id().to_string(),
107+
capability: CapabilityKind::ProtocolEndpoint,
108+
});
109+
}
110+
111+
out
112+
}
113+
114+
/// Manifests filtered to a single capability kind.
115+
pub fn manifests_of(&self, kind: CapabilityKind) -> Vec<PluginManifest> {
116+
self.manifests()
117+
.into_iter()
118+
.filter(|m| m.capability == kind)
119+
.collect()
120+
}
121+
}
122+
123+
#[cfg(test)]
124+
mod tests {
125+
use super::*;
126+
127+
#[test]
128+
fn kernel_aggregates_builtin_extensions() {
129+
let kernel = PluginKernel::global();
130+
let manifests = kernel.manifests();
131+
132+
// Built-in protocol endpoints and vendor presets are always registered,
133+
// so the aggregated view must never be empty.
134+
assert!(
135+
!manifests.is_empty(),
136+
"expected built-in extensions to be registered"
137+
);
138+
assert!(
139+
!kernel
140+
.manifests_of(CapabilityKind::ProtocolEndpoint)
141+
.is_empty(),
142+
"expected at least one protocol endpoint"
143+
);
144+
assert!(
145+
!kernel
146+
.manifests_of(CapabilityKind::ProviderVendor)
147+
.is_empty(),
148+
"expected at least one provider vendor"
149+
);
150+
}
151+
}

crates/nyro-core/src/proxy/context.rs

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
//! - **Protocol / routing fields** – filled in progressively as the request
1515
//! moves through `intake → security → planner → dispatcher`.
1616
17+
use std::any::{Any, TypeId};
18+
use std::collections::HashMap;
1719
use std::sync::atomic::{AtomicBool, Ordering};
1820
use std::sync::{Arc, Mutex, OnceLock};
1921
use std::time::{Duration, Instant};
@@ -184,6 +186,54 @@ impl Default for TraceSink {
184186
}
185187
}
186188

189+
// ── ContextBag ──────────────────────────────────────────────────────────────────
190+
191+
/// Type-keyed, request-scoped scratch space shared across all phases (Nyro's
192+
/// equivalent of OpenResty `ngx.ctx`). Values are keyed by their concrete type,
193+
/// so producers and consumers agree by type rather than stringly-typed keys.
194+
///
195+
/// The bag is shared across `RequestContext` clones (like `outcome` / `trace`)
196+
/// so a value inserted in one phase is visible to later phases. `get` returns a
197+
/// clone to avoid borrowing through the internal lock.
198+
#[derive(Clone, Default)]
199+
pub struct ContextBag(Arc<Mutex<HashMap<TypeId, Box<dyn Any + Send + Sync>>>>);
200+
201+
impl ContextBag {
202+
pub fn new() -> Self {
203+
Self::default()
204+
}
205+
206+
/// Insert (or replace) the value stored for type `T`.
207+
pub fn insert<T: Any + Send + Sync>(&self, value: T) {
208+
if let Ok(mut guard) = self.0.lock() {
209+
guard.insert(TypeId::of::<T>(), Box::new(value));
210+
}
211+
}
212+
213+
/// Fetch a clone of the value stored for type `T`, if present.
214+
pub fn get<T: Any + Send + Sync + Clone>(&self) -> Option<T> {
215+
self.0
216+
.lock()
217+
.ok()
218+
.and_then(|guard| guard.get(&TypeId::of::<T>())?.downcast_ref::<T>().cloned())
219+
}
220+
221+
/// Returns `true` if a value of type `T` is present.
222+
pub fn contains<T: Any + Send + Sync>(&self) -> bool {
223+
self.0
224+
.lock()
225+
.map(|guard| guard.contains_key(&TypeId::of::<T>()))
226+
.unwrap_or(false)
227+
}
228+
}
229+
230+
impl std::fmt::Debug for ContextBag {
231+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
232+
let len = self.0.lock().map(|g| g.len()).unwrap_or(0);
233+
f.debug_struct("ContextBag").field("len", &len).finish()
234+
}
235+
}
236+
187237
// ── RequestContext ────────────────────────────────────────────────────────────
188238

189239
/// Unified per-request context. Stored in `axum::Extension` and cloned into
@@ -221,6 +271,9 @@ pub struct RequestContext {
221271
pub outcome: Arc<OnceLock<RequestOutcome>>,
222272
/// Lightweight trace log.
223273
pub trace: TraceSink,
274+
/// Type-keyed request-scoped scratch space (Nyro's `ngx.ctx`), shared
275+
/// across clones and visible to every pipeline phase.
276+
pub extensions: ContextBag,
224277
}
225278

226279
impl RequestContext {
@@ -239,6 +292,7 @@ impl RequestContext {
239292
auth_subject: None,
240293
outcome: Arc::new(OnceLock::new()),
241294
trace: TraceSink::new(),
295+
extensions: ContextBag::new(),
242296
}
243297
}
244298

crates/nyro-core/src/proxy/dispatcher/mod.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ pub async fn dispatch_pipeline(
7171
envelope: RawEnvelope,
7272
request: AiRequest,
7373
ingress: ProtocolId,
74+
mut ctx: RequestContext,
7475
) -> Response {
7576
// Derive logging strings from envelope.
7677
let method_owned = envelope.method.clone();
@@ -224,9 +225,11 @@ pub async fn dispatch_pipeline(
224225
};
225226

226227
// Resolve egress protocol + base URL via negotiate().
228+
// The request-scoped `ctx` is threaded end-to-end from the ingress
229+
// middleware (no per-target throwaway context); negotiate records its
230+
// trace/egress decision onto it.
227231
let provider_protocols = ProviderProtocols::from_provider(&provider);
228-
let mut req_ctx = RequestContext::new(ingress, std::time::Duration::from_secs(30));
229-
let plan = match negotiate(ingress, None, Some(&provider_protocols), &mut req_ctx) {
232+
let plan = match negotiate(ingress, None, Some(&provider_protocols), &mut ctx) {
230233
Ok(p) => p,
231234
Err(e) => {
232235
last_response = Some(e.render(None));
@@ -438,7 +441,7 @@ pub async fn dispatch(
438441
ingress: ProtocolId,
439442
method: &'static str,
440443
path: &'static str,
441-
_ctx: &mut RequestContext,
444+
ctx: &mut RequestContext,
442445
) -> Response {
443446
let flat_headers: std::collections::HashMap<String, String> = headers
444447
.iter()
@@ -456,7 +459,7 @@ pub async fn dispatch(
456459
Err(e) => return log_decode_error(&gw, &envelope, ingress, e),
457460
};
458461

459-
dispatch_pipeline(gw, headers, envelope, request, ingress).await
462+
dispatch_pipeline(gw, headers, envelope, request, ingress, ctx.clone()).await
460463
}
461464

462465
// ── Handler context types ─────────────────────────────────────────────────────
@@ -914,6 +917,10 @@ mod tests {
914917
envelope,
915918
request,
916919
OPENAI_COMPATIBLE_CHAT_COMPLETIONS_V1,
920+
crate::proxy::context::RequestContext::new(
921+
OPENAI_COMPATIBLE_CHAT_COMPLETIONS_V1,
922+
std::time::Duration::from_secs(30),
923+
),
917924
)
918925
.await;
919926

crates/nyro-core/src/proxy/ingress/anthropic_messages/messages.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ pub async fn handler(
4141
envelope,
4242
request,
4343
ANTHROPIC_MESSAGES_2023_06_01,
44+
ctx.0,
4445
)
4546
.await
4647
}

crates/nyro-core/src/proxy/ingress/google_generative/generate_content.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ pub async fn handler(
5757
envelope,
5858
request,
5959
GOOGLE_GEMINI_GENERATE_CONTENT_V1BETA,
60+
ctx.0,
6061
)
6162
.await
6263
}

crates/nyro-core/src/proxy/ingress/openai_compatible/chat_completions.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ pub async fn handler(
4848
envelope,
4949
request,
5050
OPENAI_COMPATIBLE_CHAT_COMPLETIONS_V1,
51+
ctx.0,
5152
)
5253
.await
5354
}

crates/nyro-core/src/proxy/ingress/openai_compatible/embeddings.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ pub async fn handler(
4141
envelope,
4242
request,
4343
OPENAI_COMPATIBLE_EMBEDDINGS_V1,
44+
ctx.0,
4445
)
4546
.await
4647
}

crates/nyro-core/src/proxy/ingress/openai_responses/responses.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,5 +33,5 @@ pub async fn handler(
3333
Ok(r) => r,
3434
Err(e) => return log_decode_error(&gw, &envelope, OPENAI_RESPONSES_V1, e),
3535
};
36-
dispatch_pipeline(gw, headers, envelope, request, OPENAI_RESPONSES_V1).await
36+
dispatch_pipeline(gw, headers, envelope, request, OPENAI_RESPONSES_V1, ctx.0).await
3737
}

0 commit comments

Comments
 (0)