Real-Time WebSocket

Loquent uses a persistent WebSocket connection to push server-side events to the browser. New inbound SMS, delivery status changes, and bulk operation progress all arrive this way — no polling.

┌──────────────────────────────────────────────────────────┐
│ Server                                                   │
│                                                          │
│  Twilio Webhook ──► twilio_events_api                    │
│  Send Message   ──► send_message_api     ┐               │
│  Bulk Enrich    ──► bulk_enrich_service  │               │
│                                          ▼               │
│                                      EventHub            │
│                                    ┌─────┴─────┐         │
│                              org_channels user_channels  │
│                              (broadcast)   (broadcast)   │
└───────────────────────────┬──────────────────────────────┘
                            │ /api/realtime (WebSocket)
                            │ merges org + user streams
┌──────────────────────────────────────────────────────────┐
│ Browser                                                  │
│                                                          │
│  RealtimeProvider                                        │
│   └─ use_websocket → Signal<Vec<RealtimeEvent>>          │
│       └─ RealtimeContext (Dioxus context)                │
│           └─ use_realtime_messages(contact_id)           │
│               └─ CommunicationFeed                       │
└──────────────────────────────────────────────────────────┘

EventHub (src/bases/realtime/event_hub.rs) is the central broadcast registry. It holds two ChannelRegistry instances — one keyed by org_id, one by user_id — each backed by a tokio::sync::broadcast channel with a capacity of 64 events.

// Publish to every connected user in the org
hub.publish_org(org_id, AppEvent {
    event_type: EVT_MESSAGE_NEW.into(),
    payload: serde_json::to_value(&message)?,
});

// Publish to a single user only
hub.publish_user(user_id, AppEvent { ... });
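
To make the two-registry layout concrete, here is a minimal std-only sketch. It is not the real EventHub: it swaps tokio::sync::broadcast for one std::sync::mpsc channel per subscriber, uses u64 keys and a String payload, and does not model the 64-event capacity. The subscribe_org and subscribe_user method names are assumptions made for illustration.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Mutex;

/// Simplified stand-in for the real AppEvent (payload is a plain String here).
#[derive(Clone, Debug, PartialEq)]
pub struct AppEvent {
    pub event_type: String,
    pub payload: String,
}

/// Maps a key (org_id or user_id, u64 in this sketch) to its subscribers.
#[derive(Default)]
pub struct ChannelRegistry {
    subscribers: Mutex<HashMap<u64, Vec<Sender<AppEvent>>>>,
}

impl ChannelRegistry {
    /// Registers a subscriber for `key` and returns its receiving end.
    pub fn subscribe(&self, key: u64) -> Receiver<AppEvent> {
        let (tx, rx) = channel();
        self.subscribers.lock().unwrap().entry(key).or_default().push(tx);
        rx
    }

    /// Sends a clone of `event` to every live subscriber of `key`
    /// and returns how many received it.
    pub fn publish(&self, key: u64, event: AppEvent) -> usize {
        let mut map = self.subscribers.lock().unwrap();
        let Some(senders) = map.get_mut(&key) else {
            return 0;
        };
        // A failed send means the receiver was dropped; forget that subscriber.
        senders.retain(|tx| tx.send(event.clone()).is_ok());
        senders.len()
    }
}

/// Two registries: one keyed by org, one keyed by user.
#[derive(Default)]
pub struct EventHub {
    org_channels: ChannelRegistry,
    user_channels: ChannelRegistry,
}

impl EventHub {
    pub fn subscribe_org(&self, org_id: u64) -> Receiver<AppEvent> {
        self.org_channels.subscribe(org_id)
    }
    pub fn subscribe_user(&self, user_id: u64) -> Receiver<AppEvent> {
        self.user_channels.subscribe(user_id)
    }
    pub fn publish_org(&self, org_id: u64, event: AppEvent) -> usize {
        self.org_channels.publish(org_id, event)
    }
    pub fn publish_user(&self, user_id: u64, event: AppEvent) -> usize {
        self.user_channels.publish(user_id, event)
    }
}
```

Publishing to an org reaches every subscriber registered under that org_id and nobody else, which is the property the real hub relies on for tenant isolation.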

EventHub is initialized once at startup as a global singleton via init_event_hub() and accessed anywhere with event_hub() (src/bases/realtime/event_hub_static.rs).
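
One common way to build an init-once, read-anywhere global in Rust is std::sync::OnceLock. The sketch below shows that pattern with the same function names; the real signatures and the internals of event_hub_static.rs are not shown in this document, so the return types and the placeholder EventHub struct are assumptions.

```rust
use std::sync::OnceLock;

/// Placeholder for the real hub; only here so the sketch compiles.
#[derive(Debug)]
pub struct EventHub {
    pub channel_capacity: usize,
}

static EVENT_HUB: OnceLock<EventHub> = OnceLock::new();

/// Call once at startup. Returns false if the hub was already initialized.
pub fn init_event_hub() -> bool {
    EVENT_HUB.set(EventHub { channel_capacity: 64 }).is_ok()
}

/// Returns the global hub. Panics if init_event_hub() has not run yet.
pub fn event_hub() -> &'static EventHub {
    EVENT_HUB
        .get()
        .expect("init_event_hub() must be called before event_hub()")
}
```

Because OnceLock::set only succeeds the first time, a second call to init_event_hub() leaves the original hub (and every existing subscription) untouched.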

GET /api/realtime — defined in src/bases/realtime/ws_route.rs. Requires an authenticated session.

On upgrade, the handler:

  1. Subscribes to both the org channel and the user channel for the authenticated session.
  2. Merges the two BroadcastStreams into a single stream.
  3. Forwards every event to the client as a RealtimeEvent JSON frame.
  4. Sends a heartbeat ping every 30 seconds to keep the connection alive.
  5. Closes cleanly when a send fails, which signals that the client has disconnected.
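
The steps above can be sketched without an async runtime. This is a thread-based model using only the standard library, not the handler in ws_route.rs: the names merge, pump, and Frame are hypothetical, the send callback stands in for the WebSocket sink, and the heartbeat here fires after an idle period rather than on a fixed 30-second tick.

```rust
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError};
use std::thread;
use std::time::Duration;

/// What the handler writes to the socket in this sketch.
#[derive(Debug, PartialEq)]
pub enum Frame<E> {
    Event(E),
    Heartbeat,
}

/// Step 2: fan two receivers into one. The merged receiver closes
/// once both sources have closed.
pub fn merge<E: Send + 'static>(org: Receiver<E>, user: Receiver<E>) -> Receiver<E> {
    let (tx, merged) = channel();
    for source in [org, user] {
        let tx = tx.clone();
        thread::spawn(move || {
            for event in source {
                if tx.send(event).is_err() {
                    break; // nobody is reading the merged stream any more
                }
            }
        });
    }
    merged
}

/// Steps 3 to 5: forward events, emit a heartbeat when `idle` elapses
/// with no traffic, and stop as soon as `send` reports an error.
pub fn pump<E>(
    merged: &Receiver<E>,
    idle: Duration,
    mut send: impl FnMut(Frame<E>) -> Result<(), ()>,
) {
    loop {
        let frame = match merged.recv_timeout(idle) {
            Ok(event) => Frame::Event(event),
            Err(RecvTimeoutError::Timeout) => Frame::Heartbeat,
            Err(RecvTimeoutError::Disconnected) => return, // both sources closed
        };
        if send(frame).is_err() {
            return; // send failed: treat the client as gone
        }
    }
}
```

Treating a failed send as the disconnect signal keeps the loop simple: there is no separate "is the client still there" check to get out of sync with the socket.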

Call event_hub() from any server function or Axum handler, then publish:

use crate::bases::realtime::{AppEvent, event_hub};
use crate::mods::messaging::EVT_MESSAGE_NEW;

let hub = event_hub();
if let Ok(payload) = serde_json::to_value(&message) {
    hub.publish_org(org_id, AppEvent {
        event_type: EVT_MESSAGE_NEW.into(),
        payload,
    });
}

All frames sent over the WebSocket are RealtimeEvent objects serialized as JSON:

{
  "event_type": "messaging.message.new",
  "payload": { ... }
}

AppEvent (server-only) and RealtimeEvent (WASM-safe) are structurally identical. The conversion is automatic in the WS route.
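
Since the two types share the same fields, the conversion can be a plain field-for-field move. The sketch below assumes a From impl and uses a String payload in place of the real JSON value; how the WS route actually performs the conversion is not shown in this document.

```rust
/// Server-side event (simplified: payload is a String in this sketch).
#[derive(Clone, Debug, PartialEq)]
pub struct AppEvent {
    pub event_type: String,
    pub payload: String,
}

/// Client-safe mirror with the same two fields.
#[derive(Clone, Debug, PartialEq)]
pub struct RealtimeEvent {
    pub event_type: String,
    pub payload: String,
}

impl From<AppEvent> for RealtimeEvent {
    fn from(event: AppEvent) -> Self {
        // Fields are moved, not cloned, so the conversion is cheap.
        RealtimeEvent {
            event_type: event.event_type,
            payload: event.payload,
        }
    }
}
```

With an impl like this, the route can call `.into()` on each AppEvent just before serializing it.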

| event_type | Trigger | Payload |
|---|---|---|
| messaging.message.new | Inbound SMS received or outbound message sent | Message object |
| messaging.message.status | Twilio delivery status webhook (sent/delivered/failed) | Message object with updated status |
| heartbeat | Every 30 seconds | null |

Event type constants live in src/mods/messaging/types/message_event_type.rs:

pub const EVT_MESSAGE_NEW: &str = "messaging.message.new";
pub const EVT_MESSAGE_STATUS: &str = "messaging.message.status";

Mount RealtimeProvider once inside the authenticated layout. It opens a single WebSocket, buffers up to 500 received events in a Signal<Vec<RealtimeEvent>>, and exposes them via Dioxus context.

src/shared/context/realtime_context.rs
#[component]
pub fn RealtimeProvider(children: Element) -> Element {
    let mut events: Signal<Vec<RealtimeEvent>> = use_signal(Vec::new);
    let mut socket = use_websocket(|| realtime_ws(WebSocketOptions::new()));

    use_future(move || async move {
        while let Ok(event) = socket.recv().await {
            events.with_mut(|v| {
                v.push(event);
                // Keep only the 500 most recent events
                if v.len() > 500 {
                    v.drain(..v.len() - 500);
                }
            });
        }
    });

    use_context_provider(|| RealtimeContext { events });
    children
}

All descendant components share the same connection and event buffer. Do not nest RealtimeProvider.
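
The 500-event cap is easy to check in isolation. Here is the same push-then-trim logic pulled out into a standalone helper; push_capped is a hypothetical name, not a function in the codebase.

```rust
/// Appends `item`, then drops the oldest entries so that at most `cap` remain.
pub fn push_capped<T>(buf: &mut Vec<T>, item: T, cap: usize) {
    buf.push(item);
    if buf.len() > cap {
        let excess = buf.len() - cap;
        // drain(..excess) removes from the front, i.e. the oldest events.
        buf.drain(..excess);
    }
}
```

Draining from the front of a Vec shifts the remaining elements, which is fine at this size. A VecDeque would avoid the shift if the buffer ever grew much larger.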

RealtimeContext is a plain Dioxus context struct with a single field:

#[derive(Clone, PartialEq)]
pub struct RealtimeContext {
pub events: Signal<Vec<RealtimeEvent>>,
}

Read it in any descendant component with use_context::<RealtimeContext>().

Use the use_realtime_messages hook to subscribe to messaging events for a specific contact:

src/mods/messaging/hooks/use_realtime_messages.rs
pub fn use_realtime_messages(contact_id: Uuid) -> Signal<Vec<Message>> {
    let ctx = use_context::<RealtimeContext>();
    let mut live_messages: Signal<Vec<Message>> = use_signal(Vec::new);

    // Reset when the contact changes
    use_effect(use_reactive!(|contact_id| {
        live_messages.set(Vec::new());
    }));

    // React to every new event in the shared buffer
    use_effect(move || {
        let events = (ctx.events)();
        for event in &events {
            handle_event(event, contact_id, &mut live_messages);
        }
    });

    live_messages
}

The hook filters for messaging.message.new and messaging.message.status, matches by contact_id, and deduplicates by message ID. Status updates patch in-place rather than append.
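
The body of handle_event is not shown here, so the following is only a sketch of logic that would satisfy that description. It works on a plain Vec instead of a Signal, uses u64 ids, and models the payload as an already-decoded Option<Message>; all of those are simplifications. It also assumes that a status event for a message not yet in the list is appended, so that the live copy can later override a server-fetched one.

```rust
pub const EVT_MESSAGE_NEW: &str = "messaging.message.new";
pub const EVT_MESSAGE_STATUS: &str = "messaging.message.status";

/// Simplified message: the real type has more fields and a Uuid id.
#[derive(Clone, Debug, PartialEq)]
pub struct Message {
    pub id: u64,
    pub contact_id: u64,
    pub status: String,
}

/// Simplified event: payload is None for events such as heartbeat.
pub struct RealtimeEvent {
    pub event_type: String,
    pub payload: Option<Message>,
}

pub fn handle_event(event: &RealtimeEvent, contact_id: u64, live: &mut Vec<Message>) {
    // Only the two messaging events matter; heartbeats and others are skipped.
    if event.event_type != EVT_MESSAGE_NEW && event.event_type != EVT_MESSAGE_STATUS {
        return;
    }
    let Some(msg) = &event.payload else { return };
    if msg.contact_id != contact_id {
        return;
    }
    match live.iter_mut().find(|m| m.id == msg.id) {
        // Already known: a status event overwrites it, a repeated "new" is ignored.
        Some(existing) => {
            if event.event_type == EVT_MESSAGE_STATUS {
                *existing = msg.clone();
            }
        }
        // Not seen yet: append (assumption: this also applies to status events).
        None => live.push(msg.clone()),
    }
}
```

The function is idempotent, which matters because the effect replays the whole shared buffer each time it runs: processing the same events twice leaves the list unchanged.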

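#[component]
fn CommunicationFeed(
    calls: Vec<CallDetails>,
    messages: Vec<Message>,
    contact_id: Uuid,
) -> Element {
    let live_messages = use_realtime_messages(contact_id);

    // Merge live events with server-fetched messages
    let mut combined = live_messages(); // live first, so it wins dedup
    combined.extend(messages.clone());
    // Sort by (created_at, id) so copies of the same message end up adjacent.
    // The sort is stable, so the live copy stays ahead of the fetched one.
    combined.sort_by(|a, b| a.created_at.cmp(&b.created_at).then_with(|| a.id.cmp(&b.id)));
    // dedup_by_key only removes consecutive duplicates and keeps the first
    combined.dedup_by_key(|m| m.id);

    // render combined...
}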

Live messages take priority in the merge so that real-time status updates override stale server-fetched copies.
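
The merge can be checked as a pure function. The sketch below uses a simplified Message with integer id and created_at fields (assumptions for illustration) and a hypothetical merge_messages helper. It relies on two standard-library guarantees: sort_by_key is stable, and dedup_by_key keeps the first element of each run of consecutive duplicates.

```rust
/// Simplified message: the real type has more fields and a Uuid id.
#[derive(Clone, Debug, PartialEq)]
pub struct Message {
    pub id: u64,
    pub created_at: u64,
    pub status: String,
}

pub fn merge_messages(live: Vec<Message>, fetched: Vec<Message>) -> Vec<Message> {
    let mut combined = live; // live copies go first
    combined.extend(fetched);
    // Stable sort: among equal (created_at, id) keys, the live copy stays first.
    combined.sort_by_key(|m| (m.created_at, m.id));
    // Only consecutive duplicates are removed, hence the id tie-breaker above.
    combined.dedup_by_key(|m| m.id);
    combined
}
```

Including id in the sort key matters when two different messages share a timestamp: without it, copies of the same message might not end up next to each other and the dedup would miss them.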

To add a new event type:

  1. Define a constant in the relevant module's types/ directory.
  2. Publish via event_hub().publish_org(org_id, AppEvent { event_type: MY_EVT.into(), payload }) in the server handler.
  3. On the client, read from RealtimeContext and match on event.event_type in a use_effect.

No changes to the WebSocket infrastructure are needed — it is event-type-agnostic.
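
As a rough illustration of steps 1 and 3, the sketch below defines a made-up event type and a client-side filter over the shared buffer. EVT_CONTACT_UPDATED, its string value, and contact_updates are hypothetical, and the payload is a String rather than the real JSON value. In a component, the filter would run inside a use_effect that reads RealtimeContext.

```rust
/// Step 1: hypothetical new event type (name and value invented for this sketch).
pub const EVT_CONTACT_UPDATED: &str = "contacts.contact.updated";

/// Simplified client-side event with a String payload.
#[derive(Clone, Debug, PartialEq)]
pub struct RealtimeEvent {
    pub event_type: String,
    pub payload: String,
}

/// Step 3: pick the payloads of the new event type out of the shared buffer.
pub fn contact_updates(events: &[RealtimeEvent]) -> Vec<&str> {
    events
        .iter()
        .filter(|e| e.event_type == EVT_CONTACT_UPDATED)
        .map(|e| e.payload.as_str())
        .collect()
}
```

Events of other types, including heartbeats, pass through the same buffer and are simply ignored by this filter.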