Skip to content

SRPC (Simple RPC)

Bidirectional RPC over WebSocket with HMAC authentication, ts-proto code generation, multiplexed binary streams, and OpenTelemetry tracing.

Overview

SRPC uses a prefix-based message routing system. Messages are defined in .proto files and compiled to TypeScript with dksf-gen-proto. Prefixes determine direction:

  • u* -- Upstream: client -> server requests
  • d* -- Downstream: server -> client requests

Proto Generation

Define messages in a .proto file:

protobuf
syntax = "proto3";

message ClientMessage {
    string requestId = 1;
    bytes reply = 2;
    string error = 3;
    bytes trace = 4;
    bytes pingPong = 5;
    bytes byteStreamOperation = 6;

    // Upstream (client -> server)
    bytes uEcho = 100;
    bytes uGetUser = 101;

    // Downstream (server -> client)
    bytes dNotify = 200;
}

message ServerMessage {
    string requestId = 1;
    bytes reply = 2;
    string error = 3;
    bytes trace = 4;
    bytes pingPong = 5;
    bytes byteStreamOperation = 6;

    bytes uEcho = 100;
    bytes uGetUser = 101;
    bytes dNotify = 200;
}

// Request/response types
message UEchoRequest { string message = 1; }
message UEchoResponse { string message = 1; }
message UGetUserRequest { int32 id = 1; }
message UGetUserResponse { string name = 1; string email = 2; }
message DNotifyRequest { string event = 1; }
message DNotifyResponse { bool acknowledged = 1; }

Generate TypeScript:

bash
dksf-gen-proto my-service.proto src/generated/proto/my-service

Options:

FlagDescription
--use-dateUse Date instead of Timestamp
--use-map-typeUse Map instead of plain objects
--only-typesGenerate only type definitions

Server

typescript
import { SrpcServer } from '@zyno-io/dk-server-foundation';
import { ClientMessage, ServerMessage } from './generated/proto/my-service';

const server = new SrpcServer({
    logger: myLogger,
    clientMessage: ClientMessage,
    serverMessage: ServerMessage,
    wsPath: '/rpc'
});

// Handle new connections
server.registerConnectionHandler(async stream => {
    console.log(`Client connected: ${stream.clientId}`);
    stream.meta = { userId: stream.clientId };
});

// Handle upstream messages (client -> server)
server.registerMessageHandler('uEcho', async (stream, data) => {
    return { message: data.message };
});

server.registerMessageHandler('uGetUser', async (stream, data) => {
    const user = await db.query(User).filter({ id: data.id }).findOne();
    return { name: user.name, email: user.email };
});

// Handle disconnections
server.registerDisconnectHandler(async (stream, cause) => {
    console.log(`Client ${stream.clientId} disconnected: ${cause}`);
});

Invoking Client Methods (Server -> Client)

typescript
// Send to a specific client
const stream = server.streamsById.get(streamId);
const result = await server.invoke(stream, 'dNotify', { event: 'orderUpdated' });

// Create a reusable invoke function
const invoke = SrpcServer.createInvoke(() => server);
await invoke(stream, 'dNotify', { event: 'orderUpdated' }, 5000);

ISrpcServerOptions

OptionTypeDescription
loggerScopedLoggerLogger instance
clientMessageSrpcMessageFnsts-proto generated client message type
serverMessageSrpcMessageFnsts-proto generated server message type
wsPathstringWebSocket path (e.g., /rpc)
debugbooleanEnable debug logging

Server Properties

PropertyTypeDescription
streamsByIdMap<string, SrpcStream>All active streams by stream ID
streamsByClientIdMap<string, SrpcStream>Active streams by client ID

Authentication

Default authentication uses HMAC signatures with clock drift tolerance:

typescript
// Override authorization logic
server.setClientAuthorizer(async (clientId, signature, timestamp) => {
    // Custom auth logic
    return isValid;
});

// Provide per-client secrets
server.setClientKeyFetcher(async clientId => {
    return await getClientSecret(clientId);
});

Configure via SRPC_AUTH_SECRET and SRPC_AUTH_CLOCK_DRIFT_MS (default: 30 seconds).

Client

typescript
import { SrpcClient } from '@zyno-io/dk-server-foundation';
import { ClientMessage, ServerMessage } from './generated/proto/my-service';

const client = new SrpcClient(
    logger,
    'wss://api.example.com/rpc',
    ClientMessage,
    ServerMessage,
    'client-id-123',
    { role: 'worker' }, // Optional metadata
    'shared-secret', // Optional auth secret
    { enableReconnect: true } // Options
);

// connect() returns a promise that resolves on successful handshake
await client.connect();

// Or fire-and-forget (registers handlers first, connects in background)
client.registerConnectionHandler(() => { /* connected */ });
client.registerDisconnectHandler(cause => {
    // cause: 'disconnect' | 'supersede' | 'conflict' | 'timeout' | 'badArg'
    console.log('Disconnected:', cause);
});
client.connect(); // unhandled promise is fine here

// Handle downstream messages (server -> client)
client.registerMessageHandler('dNotify', async data => {
    console.log(`Event: ${data.event}`);
    return { acknowledged: true };
});

// Invoke upstream messages (client -> server)
const result = await client.invoke('uEcho', { message: 'hello' });
console.log(result.message); // 'hello'

// Check connection status
if (client.isConnected) {
    /* ... */
}

// Disconnect
client.disconnect();

SrpcClientOptions

OptionTypeDefaultDescription
enableReconnectbooleantrueAuto-reconnect on disconnect
protocolVersionnumber2SRPC protocol version. v2 rejects connections on client ID collision unless supersede is requested

connect(options?): Promise<void>

Returns a promise that resolves on successful handshake, or rejects on failure (conflict, auth, timeout). Can also be called fire-and-forget.

OptionTypeDefaultDescription
supersedebooleanfalseWhen true, kicks the existing connection with the same client ID

With protocol v2, calling connect() when the client ID is already connected on the server will reject with SrpcConflictError (no auto-reconnect). Use connect({ supersede: true }) to explicitly take over.

typescript
import { SrpcConflictError } from '@zyno-io/dk-server-foundation';

try {
    await client.connect();
} catch (err) {
    if (err instanceof SrpcConflictError) {
        // Client ID already connected — prompt user or supersede
        await client.connect({ supersede: true });
    }
}

The disconnect handler also receives the cause:

typescript
client.registerDisconnectHandler(cause => {
    if (cause === 'supersede') {
        // Another connection superseded this one
    }
});

Binary Streams

SrpcByteStream provides multiplexed binary streaming over existing SRPC connections. Streams are Duplex node streams with backpressure support.

typescript
// Sender side
const sender = SrpcByteStream.createSender(stream);
sender.write(buffer);
sender.end();

// Receiver side (via handler)
const receiver = SrpcByteStream.createReceiver(stream, streamId);
receiver.on('data', chunk => {
    /* process binary data */
});
receiver.on('end', () => {
    /* stream finished */
});

Constants

ConstantValueDescription
HIGH_WATER_MARK256 KBWebSocket buffer threshold for backpressure
PENDING_RECEIVER_MAX_BYTES2 MBMax buffer before receiver is created
PENDING_RECEIVER_TTL_MS5 secondsTimeout for pending receiver data

Protocol Versions

SRPC supports protocol versioning via the _v query parameter. The server default is v1 (for backwards compatibility with clients that don't send _v). The SrpcClient default is v2.

v1: When a new connection arrives with the same client ID as an existing connection, the existing connection is automatically kicked (supersede cause) and the new connection proceeds. This is the server default when _v is not sent (for backwards compatibility with older clients).

v2 (client default): When a collision occurs, the new connection is rejected with a conflict close and the existing connection is left undisturbed. To explicitly supersede the existing connection, call connect({ supersede: true }).

To opt into v1 behavior on the client:

typescript
const client = new SrpcClient(logger, url, ClientMessage, ServerMessage, 'my-id', undefined, secret, {
    protocolVersion: 1
});

Disconnect Causes

typescript
type SrpcDisconnectCause = 'disconnect' | 'supersede' | 'conflict' | 'timeout' | 'badArg';
CauseDescription
disconnectNormal disconnection
supersedeAnother connection superseded this one (same client ID)
conflictConnection rejected because client ID is already connected (v2)
timeoutHeartbeat timeout
badArgInvalid connection arguments

Error Handling

typescript
import { SrpcError } from '@zyno-io/dk-server-foundation';

// Throw user-facing errors in handlers
throw new SrpcError('Invalid input', true); // isUserError: true

// Non-user errors are logged but return generic message to client
throw new SrpcError('Internal failure', false);

OpenTelemetry Integration

SRPC automatically propagates trace context between client and server. Spans are created for each RPC call with the message prefix as the span name.

Released under the MIT License.