Real-Time WebSocket
Loquent uses a persistent WebSocket connection to push server-side events to the browser. New inbound SMS, delivery status changes, and bulk operation progress all arrive this way — no polling.
Architecture

    ┌─────────────────────────────────────────────────────┐
    │ Server                                              │
    │                                                     │
    │  Twilio Webhook ──► twilio_events_api               │
    │  Send Message   ──► send_message_api    ┐           │
    │  Bulk Enrich    ──► bulk_enrich_service │           │
    │                                         ▼           │
    │                                     EventHub        │
    │                                   ┌─────┴────┐      │
    │                           org_channels user_channels│
    │                           (broadcast)   (broadcast) │
    └─────────────────────┬───────────────────────────────┘
                          │  /api/realtime (WebSocket)
                          │  merges org + user streams
                          ▼
    ┌─────────────────────────────────────────────────────┐
    │ Browser                                             │
    │                                                     │
    │  RealtimeProvider                                   │
    │   └─ use_websocket → Signal<Vec<RealtimeEvent>>     │
    │       └─ RealtimeContext (Dioxus context)           │
    │           └─ use_realtime_messages(contact_id)      │
    │               └─ CommunicationFeed                  │
    └─────────────────────────────────────────────────────┘

Server Side
EventHub
EventHub (src/bases/realtime/event_hub.rs) is the central broadcast registry. It holds two ChannelRegistry instances (one keyed by org_id, one by user_id), each backed by a tokio::sync::broadcast channel with a capacity of 64 events. With a tokio broadcast channel, a receiver that falls more than 64 events behind loses the oldest events.

    // Publish to every connected user in the org
    hub.publish_org(org_id, AppEvent {
        event_type: EVT_MESSAGE_NEW.into(),
        payload: serde_json::to_value(&message)?,
    });

    // Publish to a single user only
    hub.publish_user(user_id, AppEvent { ... });

EventHub is initialized once at startup as a global singleton via init_event_hub() and accessed anywhere with event_hub() (src/bases/realtime/event_hub_static.rs).
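To make the registry idea concrete, here is a minimal, std-only sketch of a keyed fan-out. It is not the EventHub implementation: std::sync::mpsc stands in for tokio::sync::broadcast (so there is no 64-event capacity and no lag handling), the u64 key and String payload are simplifications, and the subscribe and publish method names are assumptions made for this example.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Simplified stand-in for the server-side event type.
#[derive(Clone, Debug, PartialEq)]
pub struct AppEvent {
    pub event_type: String,
    pub payload: String, // stand-in for serde_json::Value
}

/// Hypothetical registry: one list of subscribers per key (an org or user id).
#[derive(Default)]
pub struct ChannelRegistry {
    subscribers: HashMap<u64, Vec<Sender<AppEvent>>>,
}

impl ChannelRegistry {
    /// Register a new subscriber for `key` and return its receiving end.
    pub fn subscribe(&mut self, key: u64) -> Receiver<AppEvent> {
        let (tx, rx) = channel();
        self.subscribers.entry(key).or_default().push(tx);
        rx
    }

    /// Send a clone of `event` to every live subscriber of `key`.
    /// Returns how many subscribers received it.
    pub fn publish(&mut self, key: u64, event: AppEvent) -> usize {
        match self.subscribers.get_mut(&key) {
            Some(senders) => {
                // Forget subscribers whose receiver has been dropped.
                senders.retain(|tx| tx.send(event.clone()).is_ok());
                senders.len()
            }
            None => 0,
        }
    }
}
```

In this model, an EventHub would own two such registries, one for org ids and one for user ids, and publish_org / publish_user would each delegate to the matching registry.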
WebSocket Endpoint
GET /api/realtime is defined in src/bases/realtime/ws_route.rs and requires an authenticated session.
On upgrade, the handler:
- Subscribes to both the org channel and the user channel for the authenticated session.
- Merges the two BroadcastStreams into a single stream.
- Forwards every event to the client as a RealtimeEvent JSON frame.
- Sends a heartbeat ping every 30 seconds to keep the connection alive.
- Closes cleanly if the client disconnects (send error).
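The steps above can be sketched with standard-library channels. This is an illustration under assumptions, not the code in ws_route.rs: Frame and pump are hypothetical names, a shared mpsc channel stands in for the merged BroadcastStreams, the send closure stands in for the WebSocket sink, and the heartbeat fires after an idle period rather than on a fixed 30-second timer.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

/// What the handler writes to the socket next.
#[derive(Debug, PartialEq)]
pub enum Frame {
    Event(String), // stand-in for a serialized RealtimeEvent
    Heartbeat,
}

/// Forward merged events to `send`, emitting a heartbeat whenever the
/// channel stays idle for `idle`. Stops when `send` fails (client gone)
/// or when every publisher has been dropped. Returns the frames delivered.
pub fn pump<F>(merged: &Receiver<String>, idle: Duration, mut send: F) -> usize
where
    F: FnMut(Frame) -> Result<(), ()>,
{
    let mut delivered = 0;
    loop {
        let frame = match merged.recv_timeout(idle) {
            Ok(event) => Frame::Event(event),
            Err(RecvTimeoutError::Timeout) => Frame::Heartbeat,
            Err(RecvTimeoutError::Disconnected) => return delivered,
        };
        // A send error means the client disconnected: close cleanly.
        if send(frame).is_err() {
            return delivered;
        }
        delivered += 1;
    }
}
```

Merging falls out of the channel itself in this sketch: the org publisher and the user publisher each hold a clone of the same Sender, so both feed the single Receiver that pump reads.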
Publishing an Event
Call event_hub() from any server function or Axum handler, then publish:

    use crate::bases::realtime::{AppEvent, event_hub};
    use crate::mods::messaging::EVT_MESSAGE_NEW;

    let hub = event_hub();
    if let Ok(payload) = serde_json::to_value(&message) {
        hub.publish_org(org_id, AppEvent {
            event_type: EVT_MESSAGE_NEW.into(),
            payload,
        });
    }

Wire Protocol
All frames sent over the WebSocket are RealtimeEvent objects serialized as JSON:

    {
      "event_type": "messaging.message.new",
      "payload": { ... }
    }

AppEvent (server-only) and RealtimeEvent (WASM-safe) are structurally identical. The conversion is automatic in the WS route.
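One common way to convert between two structurally identical types is a From implementation. The sketch below shows that pattern only; how the WS route actually performs the conversion is not documented here, and the String payload is a stand-in for the real JSON value type.

```rust
/// Server-side event (simplified: the payload is a JSON string here).
#[derive(Clone, Debug, PartialEq)]
pub struct AppEvent {
    pub event_type: String,
    pub payload: String,
}

/// Client-facing frame with the same two fields.
#[derive(Clone, Debug, PartialEq)]
pub struct RealtimeEvent {
    pub event_type: String,
    pub payload: String,
}

impl From<AppEvent> for RealtimeEvent {
    /// Field-for-field move; nothing is added or dropped.
    fn from(event: AppEvent) -> Self {
        RealtimeEvent {
            event_type: event.event_type,
            payload: event.payload,
        }
    }
}
```

With this in place, a handler can write `let frame: RealtimeEvent = app_event.into();` before serializing.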
Event Types
| event_type | Trigger | Payload |
|---|---|---|
| messaging.message.new | Inbound SMS received or outbound message sent | Message object |
| messaging.message.status | Twilio delivery status webhook (sent/delivered/failed) | Message object with updated status |
| heartbeat | Every 30 seconds | null |
Event type constants live in src/mods/messaging/types/message_event_type.rs:

    pub const EVT_MESSAGE_NEW: &str = "messaging.message.new";
    pub const EVT_MESSAGE_STATUS: &str = "messaging.message.status";

Client Side
RealtimeProvider
Mount RealtimeProvider once inside the authenticated layout. It opens a single WebSocket, buffers up to 500 received events in a Signal<Vec<RealtimeEvent>>, and exposes them via Dioxus context.

    #[component]
    pub fn RealtimeProvider(children: Element) -> Element {
        let mut events: Signal<Vec<RealtimeEvent>> = use_signal(Vec::new);
        let mut socket = use_websocket(|| realtime_ws(WebSocketOptions::new()));

        use_future(move || async move {
            while let Ok(event) = socket.recv().await {
                events.with_mut(|v| {
                    v.push(event);
                    if v.len() > 500 {
                        v.drain(..v.len() - 500);
                    }
                });
            }
        });

        use_context_provider(|| RealtimeContext { events });
        children
    }

All descendant components share the same connection and event buffer. Do not nest RealtimeProvider.
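The 500-event cap can be looked at in isolation. Below is a small sketch of the same push-then-trim step as a standalone function; push_bounded is a hypothetical helper name, not something the codebase is known to define.

```rust
/// Append `item`, then drop the oldest entries so at most `cap` remain.
pub fn push_bounded<T>(buf: &mut Vec<T>, item: T, cap: usize) {
    buf.push(item);
    if buf.len() > cap {
        let excess = buf.len() - cap;
        // Remove from the front: the oldest events go first.
        buf.drain(..excess);
    }
}
```

Draining from the front of a Vec shifts the remaining elements, which is cheap at this buffer size. A VecDeque would avoid the shift if the cap were much larger.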
RealtimeContext
RealtimeContext is a plain Dioxus context struct with a single field:

    #[derive(Clone, PartialEq)]
    pub struct RealtimeContext {
        pub events: Signal<Vec<RealtimeEvent>>,
    }

Read it in any descendant component with use_context::<RealtimeContext>().
Subscribing from a Component
Use the use_realtime_messages hook to subscribe to messaging events for a specific contact:

    pub fn use_realtime_messages(contact_id: Uuid) -> Signal<Vec<Message>> {
        let ctx = use_context::<RealtimeContext>();
        let mut live_messages: Signal<Vec<Message>> = use_signal(Vec::new);

        // Reset when the contact changes
        use_effect(use_reactive!(|contact_id| {
            live_messages.set(Vec::new());
        }));

        // React to every new event in the shared buffer
        use_effect(move || {
            let events = (ctx.events)();
            for event in &events {
                handle_event(event, contact_id, &mut live_messages);
            }
        });

        live_messages
    }

The hook filters for messaging.message.new and messaging.message.status, matches by contact_id, and deduplicates by message ID. Status updates patch in-place rather than append.
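The body of handle_event is not shown on this page. The following is a sketch of the filtering, deduplication, and in-place patching described above, under simplifying assumptions: ids are u64 instead of Uuid, the payload is already a decoded Message, the target is a plain Vec rather than a Signal, and apply_event is a hypothetical name. Inserting a status update for a message that is not yet in the list is also an assumption.

```rust
pub const EVT_MESSAGE_NEW: &str = "messaging.message.new";
pub const EVT_MESSAGE_STATUS: &str = "messaging.message.status";

#[derive(Clone, Debug, PartialEq)]
pub struct Message {
    pub id: u64,
    pub contact_id: u64,
    pub status: String,
}

/// Simplified event: the payload is already a decoded `Message`.
#[derive(Clone, Debug)]
pub struct RealtimeEvent {
    pub event_type: String,
    pub payload: Message,
}

pub fn apply_event(event: &RealtimeEvent, contact_id: u64, live: &mut Vec<Message>) {
    let msg = &event.payload;
    // Ignore events that belong to another contact.
    if msg.contact_id != contact_id {
        return;
    }
    match event.event_type.as_str() {
        EVT_MESSAGE_NEW => {
            // Deduplicate by id: replaying the buffer must not add copies.
            if !live.iter().any(|m| m.id == msg.id) {
                live.push(msg.clone());
            }
        }
        EVT_MESSAGE_STATUS => {
            match live.iter_mut().find(|m| m.id == msg.id) {
                // Patch the existing entry in place.
                Some(existing) => existing.status = msg.status.clone(),
                // Assumption: keep the update so it can override a fetched copy.
                None => live.push(msg.clone()),
            }
        }
        // Heartbeats and unrelated event types are skipped.
        _ => {}
    }
}
```

Because both branches are idempotent, running the function over the whole shared buffer on every change, as the hook does, yields the same list as processing each event once.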
Using the Hook in a Component

    #[component]
    fn CommunicationFeed(
        calls: Vec<CallDetails>,
        messages: Vec<Message>,
        contact_id: Uuid,
    ) -> Element {
        let live_messages = use_realtime_messages(contact_id);

        // Merge live events with server-fetched messages
        let mut combined = live_messages(); // live first, so it wins dedup
        combined.extend(messages.clone());
        // sort_by is stable, so a live copy stays ahead of its fetched twin.
        // The id tie-break (assumes id implements Ord) keeps duplicates adjacent
        // even when different messages share a timestamp; dedup_by_key only
        // removes consecutive duplicates.
        combined.sort_by(|a, b| a.created_at.cmp(&b.created_at).then_with(|| a.id.cmp(&b.id)));
        combined.dedup_by_key(|m| m.id);

        // render combined...
    }

Live messages take priority in the merge so that real-time status updates override stale server-fetched copies.
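The merge relies on two standard-library guarantees: sort_by_key is stable, and dedup_by_key keeps the first element of each run of consecutive equal keys. Here is a self-contained sketch of that technique with simplified fields (u64 ids and timestamps); merge_messages is a hypothetical name, and the sketch sorts on (created_at, id) so that copies of the same message always end up adjacent.

```rust
#[derive(Clone, Debug, PartialEq)]
pub struct Message {
    pub id: u64,
    pub created_at: u64,
    pub status: String,
}

/// Merge live and server-fetched messages, preferring the live copy.
pub fn merge_messages(live: Vec<Message>, fetched: Vec<Message>) -> Vec<Message> {
    // Live entries go first so they precede their fetched duplicates.
    let mut combined = live;
    combined.extend(fetched);
    // Stable sort: entries with equal (created_at, id) keep their relative
    // order, so the live copy stays ahead of the fetched one.
    combined.sort_by_key(|m| (m.created_at, m.id));
    // Keeps the first entry of each run of equal ids, which is the live copy.
    combined.dedup_by_key(|m| m.id);
    combined
}
```

Sorting on created_at alone would leave duplicates non-adjacent whenever two different messages share a timestamp, and dedup_by_key would then miss them.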
Adding a New Event Type
- Define a constant in the relevant module’s types/ directory.
- Publish via event_hub().publish_org(org_id, AppEvent { event_type: MY_EVT.into(), payload }) in the server handler.
- On the client, read from RealtimeContext and match on event.event_type in a use_effect.
No changes to the WebSocket infrastructure are needed — it is event-type-agnostic.
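As an illustration of the client-side step, the sketch below picks out events of one type from a buffer and ignores the rest. The event type name, the constant, and the payloads_of helper are all hypothetical and exist only for this example; the payload is simplified to a String.

```rust
/// Hypothetical event type, named only for this example.
pub const EVT_EXAMPLE_UPDATED: &str = "example.item.updated";

#[derive(Clone, Debug, PartialEq)]
pub struct RealtimeEvent {
    pub event_type: String,
    pub payload: String, // stand-in for the JSON payload
}

/// Collect the payloads of events matching `event_type`, skipping all others.
pub fn payloads_of<'a>(events: &'a [RealtimeEvent], event_type: &str) -> Vec<&'a str> {
    events
        .iter()
        .filter(|e| e.event_type == event_type)
        .map(|e| e.payload.as_str())
        .collect()
}
```

Inside a component, the same filter would run in a use_effect over the events read from RealtimeContext. Unknown event types fall through untouched, so older clients keep working when new types are added.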