
MeshService

Typed RPC between distributed application instances. Each node gets a unique integer ID and can invoke handlers on any other node (or itself) with full type safety. Uses Redis pub/sub for messaging and sorted sets for node tracking.

Usage

Define a message map

typescript
import { MeshService, MeshMessageMap } from '@zyno-io/dk-server-foundation';

type MyMessages = {
    getStatus: { request: { verbose: boolean }; response: { status: string; uptime: number } };
    invalidateCache: { request: { keys: string[] }; response: { cleared: number } };
};
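
To illustrate how a message map of this shape can drive type checking, here is a minimal in-process sketch. `MessageMapSketch`, `DemoMessages`, and `LocalRegistry` are hypothetical names written for this example; they are not the library's implementation and involve no Redis.

```typescript
// Hypothetical shape for a message map: each key names a message type and
// carries its request and response payload types.
type MessageMapSketch = Record<string, { request: unknown; response: unknown }>;

type DemoMessages = {
    getStatus: { request: { verbose: boolean }; response: { status: string; uptime: number } };
    invalidateCache: { request: { keys: string[] }; response: { cleared: number } };
};

// Illustrative registry: the generic parameter K ties each handler's
// argument and return type to the matching entry in the map.
class LocalRegistry<T extends MessageMapSketch> {
    private handlers = new Map<keyof T, (data: any) => any>();

    register<K extends keyof T>(type: K, handler: (data: T[K]['request']) => T[K]['response']): void {
        this.handlers.set(type, handler);
    }

    call<K extends keyof T>(type: K, data: T[K]['request']): T[K]['response'] {
        const handler = this.handlers.get(type);
        if (!handler) throw new Error(`No handler for ${String(type)}`);
        return handler(data);
    }
}

const registry = new LocalRegistry<DemoMessages>();
registry.register('invalidateCache', data => ({ cleared: data.keys.length }));

const outcome = registry.call('invalidateCache', { keys: ['a', 'b'] });
console.log(outcome.cleared); // typed as number

// registry.call('invalidateCache', { keys: 'a' }); // would not compile: keys must be string[]
```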

Create and start a node

typescript
const mesh = new MeshService<MyMessages>('my-app');

mesh.registerHandler('getStatus', async data => {
    return { status: 'ok', uptime: process.uptime() };
});

mesh.registerHandler('invalidateCache', async data => {
    // `cache` is assumed to be the application's own cache client; it is not part of MeshService
    const cleared = await cache.delete(data.keys);
    return { cleared };
});

await mesh.start();
console.log(`Node started with instance ID: ${mesh.instanceId}`);

Invoke a handler on another node

typescript
// Call a specific node by its instance ID
const result = await mesh.invoke(targetInstanceId, 'getStatus', { verbose: true });
console.log(result.status); // fully typed

// Calling your own instance ID routes directly to the local handler (no pub/sub)
const local = await mesh.invoke(mesh.instanceId, 'getStatus', { verbose: false });

List nodes in the mesh

typescript
const nodes = await mesh.getNodes();
for (const node of nodes) {
    console.log(`${node.instanceId} @ ${node.hostname}${node.self ? ' (self)' : ''}`);
}
// 1 @ web-server-01
// 2 @ web-server-02 (self)
// 3 @ web-server-03

Track node departures

typescript
mesh.setNodeCleanedUpCallback(async instanceId => {
    console.log(`Node ${instanceId} left the mesh`);
    // Clean up resources associated with that node
});

Graceful shutdown

typescript
await mesh.stop();

API

new MeshService<T extends MeshMessageMap>(key: string, options?: MeshServiceOptions)

Creates a new mesh node.

  • key -- Logical mesh name. All nodes using the same key form one mesh. Different keys are fully independent.
  • options -- Optional tuning parameters (see below).

MeshServiceOptions

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| heartbeatIntervalMs | number | 5000 | How often this node refreshes its heartbeat in the registry. |
| nodeTtlMs | number | 15000 | How long a node can go without a heartbeat before the leader removes it. |
| requestTimeoutMs | number | 10000 | Default timeout for remote invocations. Reset on each heartbeat from the handler, so long-running handlers won't time out. |
| leaderOptions | LeaderServiceOptions | | Options passed to the internal LeaderService used for cleanup leader election. |

Properties

| Property | Type | Description |
| --- | --- | --- |
| instanceId | number | Unique integer ID assigned to this node on start(). 0 before start. |

Methods

registerHandler<K>(type: K, handler: (data: T[K]['request']) => T[K]['response'] | Promise<T[K]['response']>): void

Register a handler for a message type. Handlers can be registered before or after start(). Registering a handler for a type that already has one replaces it.

invoke<K>(instanceId: number, type: K, data: T[K]['request']): Promise<T[K]['response']>

Send a typed request to a specific node and wait for the response.

  • If instanceId matches the local node, the handler is called directly (no pub/sub).
  • If the target node doesn't exist or doesn't respond, the promise rejects with MeshRequestTimeoutError.
  • If the target has no handler for the type, rejects with MeshNoHandlerError.
  • If the handler throws, rejects with MeshHandlerError containing the error message.
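
The rejection cases listed above can be told apart with `instanceof` checks. The sketch below is one possible wrapper, not part of the library: `safeInvoke` and `InvokeOutcome` are hypothetical names, and the three error classes are declared locally as stand-ins so the example runs on its own. In a real application they would be the library's classes (where they are exported from is not stated here).

```typescript
// Local stand-ins for the library's error classes, declared so this sketch is self-contained.
class MeshRequestTimeoutError extends Error {}
class MeshNoHandlerError extends Error {}
class MeshHandlerError extends Error {}

// Hypothetical result type that turns the three documented rejections into values.
type InvokeOutcome<R> =
    | { kind: 'ok'; value: R }
    | { kind: 'unreachable' } // target missing or not responding
    | { kind: 'unsupported' } // target has no handler for the type
    | { kind: 'failed'; message: string }; // remote handler threw

async function safeInvoke<R>(call: () => Promise<R>): Promise<InvokeOutcome<R>> {
    try {
        return { kind: 'ok', value: await call() };
    } catch (err) {
        if (err instanceof MeshRequestTimeoutError) return { kind: 'unreachable' };
        if (err instanceof MeshNoHandlerError) return { kind: 'unsupported' };
        if (err instanceof MeshHandlerError) return { kind: 'failed', message: err.message };
        throw err; // anything else is unexpected, so let it propagate
    }
}
```

With a running node, the wrapper might be used as `safeInvoke(() => mesh.invoke(targetInstanceId, 'getStatus', { verbose: true }))`.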

getNodes(): Promise<MeshNode[]>

Returns all live nodes in the mesh. Each MeshNode contains:

typescript
interface MeshNode {
    instanceId: number; // The node's unique integer ID
    hostname: string; // The OS hostname of the machine running the node
    self: boolean; // true if this is the calling node
}

Only nodes with active heartbeats are returned. Stopped or expired nodes are excluded.
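
Since `invoke` targets a single node, reaching every node means combining it with `getNodes()`. The following is a rough sketch of such a fan-out, not a library feature: `broadcast` and `MeshNodeInfo` are hypothetical names, and the helper takes plain callbacks so that it makes no assumptions about the library's generic signatures.

```typescript
// Mirrors the MeshNode fields documented above.
interface MeshNodeInfo {
    instanceId: number;
    hostname: string;
    self: boolean;
}

// Calls invokeOn for every listed node (skipping the local node unless asked)
// and collects either the response or the error per instance ID.
async function broadcast<Res>(
    listNodes: () => Promise<MeshNodeInfo[]>,
    invokeOn: (instanceId: number) => Promise<Res>,
    includeSelf = false,
): Promise<Map<number, Res | Error>> {
    const nodes = await listNodes();
    const targets = nodes.filter(node => includeSelf || !node.self);

    const entries = await Promise.all(
        targets.map(async (node): Promise<[number, Res | Error]> => {
            try {
                return [node.instanceId, await invokeOn(node.instanceId)];
            } catch (err) {
                // One failing node should not abort the whole fan-out.
                return [node.instanceId, err instanceof Error ? err : new Error(String(err))];
            }
        }),
    );
    return new Map(entries);
}
```

With a running node this might be called as `broadcast(() => mesh.getNodes(), id => mesh.invoke(id, 'invalidateCache', { keys: ['user:42'] }))`. A node can still disappear between listing and invocation, in which case its entry holds the resulting error.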

setNodeCleanedUpCallback(cb: (instanceId: number) => void | Promise<void>): void

Register a callback invoked when the leader detects and removes an expired node. Only fires on the current leader instance. Errors in the callback are logged but don't affect cleanup of other nodes.

start(): Promise<void>

Join the mesh. Acquires a unique instance ID, subscribes to its pub/sub channel, registers in the heartbeat set, and starts leader election. Throws if already running.

stop(): Promise<void>

Leave the mesh. Stops heartbeats, rejects all pending outbound requests with Error('MeshService stopped'), unsubscribes from pub/sub, and removes itself from the heartbeat registry. Safe to call before start() or multiple times.

Error Classes

MeshRequestTimeoutError

The target node did not respond within the timeout period. This can happen if the target has crashed, is unreachable, or the Redis pub/sub connection is disrupted.

MeshHandlerError

The target node's handler threw an error. The message property contains the original error message from the remote handler.

MeshNoHandlerError

No handler is registered for the requested message type -- either locally (direct invocation) or on the remote node.

Architecture

Node Registry

Nodes are tracked in a Redis sorted set ({prefix}:mesh:{key}:heartbeats) where the score is the last heartbeat timestamp (from Redis server time, avoiding clock skew) and the member is the instance ID. Node metadata (hostname) is stored in a Redis hash ({prefix}:mesh:{key}:nodes).

Unique instance IDs are assigned via INCR on {prefix}:mesh:{key}:next_id. Both the heartbeat entry and the nodes hash entry are removed on graceful stop or leader-driven cleanup of expired nodes.
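
The registry layout can be modeled in memory to make the moving parts concrete. This is only a sketch under stated assumptions: `meshRegistryKeys` and `InMemoryRegistry` are hypothetical names, plain `Map`s stand in for the Redis sorted set and hash, and filtering live nodes by TTL at read time is an assumption about how liveness is decided.

```typescript
// Builds the three registry key names described above.
function meshRegistryKeys(prefix: string, key: string) {
    const base = `${prefix}:mesh:${key}`;
    return {
        heartbeats: `${base}:heartbeats`, // sorted set: member = instance ID, score = last heartbeat
        nodes: `${base}:nodes`, // hash: instance ID -> hostname
        nextId: `${base}:next_id`, // counter incremented with INCR
    };
}

class InMemoryRegistry {
    private counter = 0; // models the next_id counter
    private heartbeats = new Map<number, number>(); // models the sorted set
    private hostnames = new Map<number, string>(); // models the nodes hash

    join(hostname: string, nowMs: number): number {
        this.counter += 1; // like INCR: IDs are never reused
        const instanceId = this.counter;
        this.heartbeats.set(instanceId, nowMs);
        this.hostnames.set(instanceId, hostname);
        return instanceId;
    }

    heartbeat(instanceId: number, nowMs: number): void {
        this.heartbeats.set(instanceId, nowMs); // refresh the score
    }

    leave(instanceId: number): void {
        // Graceful stop removes both entries.
        this.heartbeats.delete(instanceId);
        this.hostnames.delete(instanceId);
    }

    liveNodes(nowMs: number, nodeTtlMs: number): { instanceId: number; hostname: string }[] {
        const live: { instanceId: number; hostname: string }[] = [];
        this.heartbeats.forEach((lastSeen, instanceId) => {
            if (nowMs - lastSeen <= nodeTtlMs) {
                live.push({ instanceId, hostname: this.hostnames.get(instanceId) ?? 'unknown' });
            }
        });
        return live.sort((a, b) => a.instanceId - b.instanceId);
    }
}

const registryKeys = meshRegistryKeys('myapp', 'my-app');
console.log(registryKeys.heartbeats); // myapp:mesh:my-app:heartbeats

const demoRegistry = new InMemoryRegistry();
const firstId = demoRegistry.join('web-server-01', 0);
const secondId = demoRegistry.join('web-server-02', 0);
demoRegistry.heartbeat(secondId, 10000);
// At t = 16000 with a 15000 ms TTL, only the second node is still live.
console.log(demoRegistry.liveNodes(16000, 15000));
```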

Messaging

Each node subscribes to its own pub/sub channel: {prefix}:mesh:{key}:node:{instanceId}.

Three message types flow over these channels:

| Type | Direction | Payload |
| --- | --- | --- |
| Request | Caller -> Handler | { requestId, senderInstanceId, type, data, timeoutMs } |
| Response | Handler -> Caller | { requestId, reply: true, data?, error? } |
| Heartbeat | Handler -> Caller | { requestId, heartbeat: true } |
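
As a sketch, the three payloads can be written as a TypeScript union that a receiver narrows on the `reply` and `heartbeat` flags. The interface names and `classifyMessage` are hypothetical, and the field types are assumptions: the table gives only field names, so `requestId` as a string and `error` as a string message are guesses, and the wire encoding is not specified here.

```typescript
interface MeshRequestMessage {
    requestId: string; // assumed to be a string identifier
    senderInstanceId: number;
    type: string;
    data: unknown;
    timeoutMs: number;
}

interface MeshResponseMessage {
    requestId: string;
    reply: true;
    data?: unknown;
    error?: string; // assumed to carry the error message
}

interface MeshHeartbeatMessage {
    requestId: string;
    heartbeat: true;
}

type MeshWireMessage = MeshRequestMessage | MeshResponseMessage | MeshHeartbeatMessage;

// Distinguishes the three message kinds by their marker fields.
function classifyMessage(msg: MeshWireMessage): 'request' | 'response' | 'heartbeat' {
    if ('reply' in msg) return 'response';
    if ('heartbeat' in msg) return 'heartbeat';
    return 'request';
}

const sampleHeartbeat: MeshWireMessage = { requestId: 'r1', heartbeat: true };
console.log(classifyMessage(sampleHeartbeat)); // heartbeat
```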

Request Heartbeats

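While a handler is executing, the handling node sends periodic heartbeat messages back to the caller, one every 0.75 × requestTimeoutMs (7500 ms with the default of 10000). The caller resets its timeout timer on each heartbeat, so the timeout measures silence from the handling node rather than total execution time. This keeps the timeout short enough to detect unresponsive nodes quickly while still supporting arbitrarily long handler execution.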

The handler uses the caller's requestTimeoutMs (sent in the request message) for its heartbeat interval, so mixed-timeout configurations work correctly.
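
The timer-reset behavior can be sketched with explicit timestamps instead of real timers. `heartbeatIntervalMs` and `PendingRequestTimer` are hypothetical names for illustration and do not reflect how the library structures its internals.

```typescript
// The handling node derives its heartbeat interval from the caller's timeout.
function heartbeatIntervalMs(callerTimeoutMs: number): number {
    return callerTimeoutMs * 0.75;
}

// Tracks the caller-side deadline for one pending request.
class PendingRequestTimer {
    private readonly timeoutMs: number;
    private deadlineMs: number;

    constructor(timeoutMs: number, startMs: number) {
        this.timeoutMs = timeoutMs;
        this.deadlineMs = startMs + timeoutMs;
    }

    // Each heartbeat pushes the deadline a full timeout into the future.
    onHeartbeat(nowMs: number): void {
        this.deadlineMs = nowMs + this.timeoutMs;
    }

    isExpired(nowMs: number): boolean {
        return nowMs >= this.deadlineMs;
    }
}

const intervalMs = heartbeatIntervalMs(10000); // 7500
const pendingTimer = new PendingRequestTimer(10000, 0);
pendingTimer.onHeartbeat(intervalMs); // deadline moves to 17500
pendingTimer.onHeartbeat(2 * intervalMs); // deadline moves to 25000
console.log(pendingTimer.isExpired(20000)); // false: heartbeats kept the request alive
console.log(pendingTimer.isExpired(25000)); // true: no heartbeat for a full timeout
```

Because the interval is three quarters of the timeout, a heartbeat normally arrives with a quarter of the timeout to spare.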

Leader Election

Each mesh uses an internal LeaderService (key: mesh:{key}) for leader election. The leader is responsible for:

  1. Running the cleanup Lua script on each heartbeat cycle
  2. Removing expired nodes from the sorted set and the nodes hash
  3. Invoking the nodeCleanedUpCallback for each removed node

Only one node (the leader) performs cleanup at any time.
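
The cleanup pass can be modeled in memory as follows. This is a simplified sketch, not the Lua script itself: `cleanupExpired` is a hypothetical function, a `Map` stands in for the sorted set, and the expiry test (last heartbeat older than `nodeTtlMs`) is an assumption about the exact comparison used.

```typescript
// Removes nodes whose last heartbeat is older than nodeTtlMs and notifies the
// callback once per removed node. A failing callback is logged and skipped so
// the remaining nodes are still cleaned up.
async function cleanupExpired(
    heartbeats: Map<number, number>,
    nowMs: number,
    nodeTtlMs: number,
    onCleanedUp?: (instanceId: number) => void | Promise<void>,
): Promise<number[]> {
    const expired: number[] = [];
    heartbeats.forEach((lastSeen, instanceId) => {
        if (nowMs - lastSeen > nodeTtlMs) expired.push(instanceId);
    });

    for (const instanceId of expired) {
        heartbeats.delete(instanceId);
        if (!onCleanedUp) continue;
        try {
            await onCleanedUp(instanceId);
        } catch (err) {
            console.error(`cleanup callback failed for node ${instanceId}`, err);
        }
    }
    return expired;
}
```

In this model only the elected leader would call `cleanupExpired`, once per heartbeat cycle.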

Local Invocation

When invoke is called with the node's own instance ID, the handler is called directly as a function call -- no serialization, no pub/sub, no timeout. Errors propagate as-is.

Configuration

The Redis connection is configured via environment variables with the MESH_REDIS_ prefix (falls back to REDIS_):

| Variable | Description |
| --- | --- |
| MESH_REDIS_HOST | Redis host |
| MESH_REDIS_PORT | Redis port |
| MESH_REDIS_PREFIX | Key prefix (falls back to REDIS_PREFIX, then package name) |
| MESH_REDIS_SENTINEL_HOST | Sentinel host (optional) |
| MESH_REDIS_SENTINEL_PORT | Sentinel port (optional) |
| MESH_REDIS_SENTINEL_NAME | Sentinel master name (optional) |
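
The fallback order can be sketched as a small lookup. `resolveMeshRedisVar` and `resolveMeshRedisConfig` are hypothetical helpers, not the library's configuration loader, and treating only unset variables (rather than empty strings) as missing is an assumption.

```typescript
type EnvMap = Record<string, string | undefined>;

// Looks up MESH_REDIS_<suffix> first, then REDIS_<suffix>.
function resolveMeshRedisVar(env: EnvMap, suffix: string): string | undefined {
    return env[`MESH_REDIS_${suffix}`] ?? env[`REDIS_${suffix}`];
}

function resolveMeshRedisConfig(env: EnvMap, packageName: string) {
    const port = resolveMeshRedisVar(env, 'PORT');
    return {
        host: resolveMeshRedisVar(env, 'HOST'),
        port: port === undefined ? undefined : Number(port),
        // The prefix has one extra fallback step: the package name.
        prefix: resolveMeshRedisVar(env, 'PREFIX') ?? packageName,
    };
}

const resolved = resolveMeshRedisConfig(
    { REDIS_HOST: 'redis.internal', REDIS_PORT: '6379', MESH_REDIS_PORT: '6380' },
    'my-package',
);
console.log(resolved); // host from REDIS_HOST, port from MESH_REDIS_PORT, prefix from the package name
```

In an application, `process.env` would be passed as the first argument.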

Released under the MIT License.