Documentation Index
Fetch the complete documentation index at: https://openntl.org/llms.txt
Use this file to discover all available pages before exploring further.
Adapters are the bridge between NTL and the outside world. This guide walks through building a custom adapter that exposes NTL signal handlers via HTTP — useful for connecting web applications, mobile apps, and existing services.
The Adapter Trait
Every adapter implements the core Adapter trait:
use ntl::adapter::{Adapter, AdapterHealth, ExternalPayload, Protocol};
use ntl::Signal;
trait Adapter: Send + Sync {
fn ingest(&self, external: ExternalPayload) -> Result<Signal>;
fn emit(&self, signal: Signal) -> Result<ExternalPayload>;
fn protocol(&self) -> Protocol;
fn health(&self) -> AdapterHealth;
}
Example: HTTP Adapter
Let’s build a minimal HTTP adapter using axum:
use ntl::{Node, Signal, SignalType};
use ntl::adapter::{Adapter, AdapterHealth, ExternalPayload, Protocol};
use axum::{Router, Json, extract::State};
use std::sync::Arc;
struct HttpAdapter {
node: Arc<Node>,
}
impl HttpAdapter {
pub fn new(node: Arc<Node>) -> Self {
Self { node }
}
pub fn router(self: Arc<Self>) -> Router {
Router::new()
.route("/signal", axum::routing::post(Self::handle_request))
.with_state(self)
}
async fn handle_request(
State(adapter): State<Arc<HttpAdapter>>,
Json(body): Json<SignalRequest>,
) -> Json<SignalResponse> {
// Translate HTTP request to NTL signal
let signal = Signal::new(body.signal_type)
.with_payload(body.payload)
.with_weight(body.weight.unwrap_or(0.5))
.with_tags(body.tags);
// Emit into NTL network
let signal_id = signal.emit(&adapter.node).await.unwrap();
// Wait for correlated response
match adapter.node
.wait_correlation(signal_id, std::time::Duration::from_secs(30))
.await
{
Ok(response) => Json(SignalResponse {
status: "ok".into(),
signal_id: signal_id.to_string(),
data: Some(response.payload),
}),
Err(_) => Json(SignalResponse {
status: "timeout".into(),
signal_id: signal_id.to_string(),
data: None,
}),
}
}
}
impl Adapter for HttpAdapter {
fn ingest(&self, external: ExternalPayload) -> Result<Signal> {
let request: SignalRequest = serde_json::from_slice(&external.data)?;
Ok(Signal::new(request.signal_type)
.with_payload(request.payload)
.with_weight(request.weight.unwrap_or(0.5)))
}
fn emit(&self, signal: Signal) -> Result<ExternalPayload> {
let body = serde_json::to_vec(&signal.payload)?;
Ok(ExternalPayload {
data: body,
content_type: "application/json".into(),
})
}
fn protocol(&self) -> Protocol {
Protocol::Http
}
fn health(&self) -> AdapterHealth {
AdapterHealth::Healthy
}
}
#[derive(serde::Deserialize)]
struct SignalRequest {
signal_type: SignalType,
payload: serde_json::Value,
weight: Option<f32>,
tags: Vec<String>,
}
#[derive(serde::Serialize)]
struct SignalResponse {
status: String,
signal_id: String,
data: Option<serde_json::Value>,
}
Running the Adapter
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let node = Arc::new(
Node::builder()
.with_config_file("~/.ntl/config.toml")
.build()
.await?
);
let adapter = Arc::new(HttpAdapter::new(node.clone()));
let router = adapter.router();
// Run NTL node and HTTP server concurrently
tokio::select! {
_ = node.run_until_shutdown() => {},
_ = axum::Server::bind(&"0.0.0.0:3000".parse()?)
.serve(router.into_make_service()) => {},
}
Ok(())
}
Now any HTTP client can interact with the NTL network:
curl -X POST http://localhost:3000/signal \
-H "Content-Type: application/json" \
-d '{
"signal_type": "query",
"payload": {"key": "greeting"},
"tags": ["kv", "get"]
}'
Adapter Patterns
Stateless Translation
The simplest pattern — translate external format to signal and back. No state held in the adapter.
Correlation Bridge
Hold external connections open while waiting for NTL correlated responses. Essential for request-response protocols like HTTP.
Event Stream
For protocols that support streaming (WebSocket, SSE, gRPC streams), the adapter subscribes to signal types and pushes them to the external client as they arrive.
Batch Ingestion
For high-throughput sources, the adapter accumulates external payloads and emits them as weighted signal batches at intervals.
Registering Adapters
Adapters register with the node so they appear in health checks and topology:
node.register_adapter(adapter.clone()).await?;
// Now visible in:
// ntl status
// ntl adapters list
Testing Adapters
NTL provides a test harness for adapter development:
#[cfg(test)]
mod tests {
use ntl::testing::MockNode;
#[tokio::test]
async fn test_http_adapter_ingest() {
let node = MockNode::new();
let adapter = HttpAdapter::new(Arc::new(node));
let payload = ExternalPayload {
data: serde_json::to_vec(&serde_json::json!({
"signal_type": "query",
"payload": {"key": "test"},
"tags": ["kv"]
})).unwrap(),
content_type: "application/json".into(),
};
let signal = adapter.ingest(payload).unwrap();
assert_eq!(signal.signal_type, SignalType::Query);
}
}