### Install River and Dependencies Source: https://github.com/replit/river/blob/main/README.md This command installs the River library and its dependency, @sinclair/typebox, using npm. These packages are essential for utilizing River's features for remote procedure calls. ```bash npm i @replit/river @sinclair/typebox ``` -------------------------------- ### Implement Replit River Client Source: https://github.com/replit/river/blob/main/README.md Demonstrates how to create a Replit River client that connects to a WebSocket server. It shows how to establish a connection, make an RPC call to the 'add' procedure of the 'example' service, and handle the response, including type-safe access to the result. ```typescript import { WebSocketClientTransport } from '@replit/river/transport/ws/client'; import { createClient } from '@replit/river'; import { WebSocket } from 'ws'; const transport = new WebSocketClientTransport( async () => new WebSocket('ws://localhost:3000'), 'my-client-id', ); const client = createClient( transport, 'SERVER', // transport id of the server in the previous step { eagerlyConnect: true }, // whether to eagerly connect to the server on creation (optional argument) ); // we get full type safety on `client` // client...() // e.g. const result = await client.example.add.rpc({ n: 3 }); if (result.ok) { const msg = result.payload; console.log(msg.result); // 0 + 3 = 3 } ``` -------------------------------- ### TypeScript: Extend ParsedMetadata for Custom Handshake Source: https://github.com/replit/river/blob/main/README.md Demonstrates how to extend the ParsedMetadata interface in TypeScript to include custom data like 'userId' for connection handshakes. This allows for richer validation and data transfer during the initial connection setup. ```typescript declare module '@replit/river' { interface ParsedMetadata { userId: number; } } ``` -------------------------------- ### TypeScript: Access Parsed Metadata in Handler Source: https://github.com/replit/river/blob/main/README.md Provides an example of how to access the parsed metadata, including custom fields like 'userId', within a River procedure handler. This demonstrates how to utilize the validated connection information passed during the handshake. ```typescript async handler(ctx, ...args) { // this contains the parsed metadata console.log(ctx.metadata) } ``` -------------------------------- ### Add Logging to Replit River Transport Source: https://github.com/replit/river/blob/main/README.md Shows how to bind a logger function to a Replit River transport to enable logging of events. It provides examples using a console logger and a colored string logger, and mentions the possibility of defining custom logging functions. ```typescript import { coloredStringLogger } from '@replit/river/logging'; const transport = new WebSocketClientTransport( async () => new WebSocket('ws://localhost:3000'), 'my-client-id', ); transport.bindLogger(console.log); // or transport.bindLogger(coloredStringLogger); ``` -------------------------------- ### TypeScript Control Flags Bit Field Source: https://github.com/replit/river/blob/main/PROTOCOL.md Details the bit field values for the 'controlFlags' property in TransportMessage, used for signaling stream events like start, cancellation, closure, and acknowledgements. ```TypeScript // The `AckBit` (`0b00001`) MUST only be set in the case of an explicit heartbeat that _only_ contains ack/seq information and no application-level payload (i.e., only used to update transport level bookkeeping). // The `StreamOpenBit` (`0b00010`) MUST be set for the first message of a new stream. // The `StreamCancelBit` (`0b00100`) MUST be set when a stream is to be abruptly closed due to cancellations or an internal error condition. // The `StreamClosedBit` (`0b01000`) MUST be set for the last message of a stream. ``` -------------------------------- ### Define ExampleService with RPC Source: https://github.com/replit/river/blob/main/README.md Defines a service named 'ExampleService' using ServiceSchema. It includes an initializer for shared state (a counter) and an 'add' RPC procedure that increments the counter and returns the new value. The procedure specifies request and response types using TypeBox. ```typescript import { ServiceSchema, Procedure, Ok } from '@replit/river'; import { Type } from '@sinclair/typebox'; export const ExampleService = ServiceSchema.define( // configuration { // initializer for shared state initializeState: () => ({ count: 0 }), }, // procedures { add: Procedure.rpc({ requestInit: Type.Object({ n: Type.Number() }), responseData: Type.Object({ result: Type.Number() }), requestErrors: Type.Never(), // note that a handler is unique per user RPC async handler({ ctx, reqInit: { n } }) { // access and mutate shared state ctx.state.count += n; return Ok({ result: ctx.state.count }); }, }), }, ); ``` -------------------------------- ### Create Replit River Server Source: https://github.com/replit/river/blob/main/README.md Sets up an HTTP server and a WebSocket server using Node.js's 'http' and 'ws' modules. It then creates a Replit River server instance, integrating the 'ExampleService' with a WebSocket transport. ```typescript import http from 'http'; import { WebSocketServer } from 'ws'; import { WebSocketServerTransport } from '@replit/river/transport/ws/server'; import { createServer } from '@replit/river'; // start websocket server on port 3000 const httpServer = http.createServer(); const port = 3000; const wss = new WebSocketServer({ server: httpServer }); const transport = new WebSocketServerTransport(wss, 'SERVER'); export const server = createServer(transport, { example: ExampleService, }); export type ServiceSurface = typeof server; httpServer.listen(port); ``` -------------------------------- ### State Transitions Diagram Source: https://github.com/replit/river/blob/main/PROTOCOL.md Visual representation of the state transitions for client and server connections, illustrating the flow from initial connection attempts to session destruction. ```plaintext 1. SessionNoConnection ◄──┐ │ reconnect / connect attempt │ ▼ │ 2. SessionConnecting │ │ connect success ──────────────┤ connect failure ▼ │ 3. SessionHandshaking │ │ handshake success ┌──────╪─ connection drop 5. WaitingForHandshake │ handshake failure ─────┤ │ │ handshake success ▼ │ │ connection drop ├───────────────────────► 4. SessionConnected │ │ heartbeat misses │ │ invalid message ───────╫──────┘ │ ▼ │ └───────────────────────► x. Destroy Session ◄─────┘ handshake failure ``` -------------------------------- ### Subscription Close Scenarios Source: https://github.com/replit/river/blob/main/PROTOCOL.md Illustrates different scenarios for closing a subscription stream, including client-initiated close, server-initiated close, and protocol errors leading to an abrupt close. ```text client: > { server: - -- - -- { ``` ```text client: > { server: - -- - { ``` ```text client: > (any further messages are ignored) server: - -- ! ``` -------------------------------- ### TypeScript: Create Client with Custom Handshake Options Source: https://github.com/replit/river/blob/main/README.md Shows how to configure a River client with custom handshake options, including schema validation and a token provider function. This enables secure and validated client connections by passing specific handshake logic during client creation. ```typescript const schema = Type.Object({ token: Type.String() }); createClient(new MockClientTransport('client'), 'SERVER', { eagerlyConnect: false, handshakeOptions: createClientHandshakeOptions(schema, async () => ({ // the type of this function is // () => Static | Promise> token: '123', })), }); ``` -------------------------------- ### Configure tsconfig.json for River Source: https://github.com/replit/river/blob/main/README.md This snippet shows the necessary configuration for the tsconfig.json file to ensure compatibility with River. It highlights the required moduleResolution, strictFunctionTypes, and strictNullChecks options for proper type checking and service definition. ```json { "compilerOptions": { "moduleResolution": "bundler", "strict": true // Other compiler options... } } ``` -------------------------------- ### Stream Lifecycle: Server Initiated Close Source: https://github.com/replit/river/blob/main/PROTOCOL.md Shows a stream procedure where the server initiates the close. '>' is an Init message with StreamOpenBit, '--' represents data messages, and '{' signifies a ControlClose message from the server. The client responds with its own closing sequence. ```text client: > -- -- -- { server: - -- { ``` -------------------------------- ### Stream Lifecycle: Client Initiated Close Source: https://github.com/replit/river/blob/main/PROTOCOL.md Depicts a stream procedure where the client initiates the close. '>' is an Init message with StreamOpenBit, '--' represents data messages, and '{' signifies a ControlClose message from the client. The server responds with its own closing sequence. ```text client: > -- - { server: - -- - -- { ``` -------------------------------- ### Upload Lifecycle: Protocol Error (Abrupt Close) Source: https://github.com/replit/river/blob/main/PROTOCOL.md Shows a protocol error during an upload procedure, leading to an abrupt close. '>' is an Init message with StreamOpenBit, '--' represents data messages, and '!' indicates a protocol error from the server. Further client messages are ignored. ```text client: > -- - (any further messages are ignored) server: ! ``` -------------------------------- ### TypeScript: Create Server with Custom Handshake Options Source: https://github.com/replit/river/blob/main/README.md Illustrates how to set up a River server with custom handshake options, including schema validation and a handshake validation function. This allows the server to validate incoming connection metadata based on a defined schema and custom logic, returning either false or the parsed metadata. ```typescript createServer(new MockServerTransport('SERVER'), services, { handshakeOptions: createServerHandshakeOptions( schema, (metadata, previousMetadata) => { // the type of this function is // (metadata: Static, previousMetadata?: ParsedMetadata) => // | false | Promise (if you reject it) // | ParsedMetadata | Promise (if you allow it) // next time a connection happens on the same session, previousMetadata will // be populated with the last returned value }, ), }); ``` -------------------------------- ### Upload Lifecycle: Client Finalizes with Protocol Error Source: https://github.com/replit/river/blob/main/PROTOCOL.md Illustrates an upload procedure where the client attempts to finalize, but a protocol error occurs. '>' is an Init message with StreamOpenBit, '--' represents data messages, '{' signifies a ControlClose message from the client, and '!' indicates a protocol error from the server. ```text client: > -- - { server: ! ``` -------------------------------- ### Stream Lifecycle: Protocol Error (Abrupt Close) Source: https://github.com/replit/river/blob/main/PROTOCOL.md Illustrates a protocol error during a stream procedure, resulting in an abrupt close. '>' is an Init message with StreamOpenBit, '--' represents data messages, and '!' indicates a protocol error from the server. Further client messages are ignored. ```text client: > -- - (any further messages are ignored) server: - -- - ! ``` -------------------------------- ### Upload Lifecycle: Client Finalizes Upload Source: https://github.com/replit/river/blob/main/PROTOCOL.md Details the upload procedure where the client finalizes the stream. '>' is an Init message with StreamOpenBit, '--' represents data messages, and '{' signifies a ControlClose message from the client. The server responds with a Result message containing StreamClosedBit ('<'). ```text client: > -- - { server: < ``` -------------------------------- ### Handle Replit River Connection Status Source: https://github.com/replit/river/blob/main/README.md Explains how to listen for session status and transition events on a Replit River transport to manage application state during connection changes. It covers handling 'created', 'closing', and 'closed' statuses, as well as specific session states like 'Connected' and 'NoConnection'. ```typescript transport.addEventListener('sessionStatus', (evt) => { if (evt.status === 'created') { // do something } else if (evt.status === 'closing') { // do other things } else if (evt.status === 'closed') { // note that evt.session only has id + to // this is useful for doing things like creating a new session if // a session just got yanked } }); // or, listen for specific session states transport.addEventListener('sessionTransition', (evt) => { if (evt.state === SessionState.Connected) { // switch on various transition states } else if (evt.state === SessionState.NoConnection) { // do something } }); ``` -------------------------------- ### TypeScript Codec for Message Encoding/Decoding Source: https://github.com/replit/river/blob/main/PROTOCOL.md Explains the Codec class in the TypeScript implementation used for encoding and decoding TransportMessages. It details two main codecs: NaiveCodec (JSON-based) and BinaryCodec (msgpack-based), and discusses message framing requirements based on transport protocols. ```TypeScript class Codec { // ... encoding and decoding logic ... } class NaiveCodec extends Codec { // Uses JSON.stringify and JSON.parse } class BinaryCodec extends Codec { // Uses msgpack format } ``` -------------------------------- ### RPC Lifecycle: Normal and Protocol Error Source: https://github.com/replit/river/blob/main/PROTOCOL.md Illustrates the message exchange for an RPC procedure. The 'x' signifies an Init message with StreamOpenBit and StreamClosedBit set from the client, and '<' represents a response with StreamClosedBit from the server for a normal operation. '!' indicates a protocol error with a StreamCancelBit. ```text client: x server: < ``` ```text client: x server: ! ``` -------------------------------- ### TypeScript: Control Payloads Source: https://github.com/replit/river/blob/main/PROTOCOL.md Defines the control payloads used in the Replit River protocol for managing communication streams. This includes messages for closing streams, acknowledging heartbeats, and initiating/responding to handshakes with version and session state information. ```TypeScript interface ControlClose { type: 'CLOSE'; } interface ControlAck { type: 'ACK'; } interface ControlHandshakeRequest { type: 'HANDSHAKE_REQ'; protocolVersion: 'v0' | 'v1' | 'v1.1' | 'v2.0'; sessionId: string; expectedSessionState: { nextExpectedSeq: number; // integer nextSentSeq: number; // integer }; metadata?: unknown; } interface ControlHandshakeResponse { type: 'HANDSHAKE_RESP'; status: | { ok: true; sessionId: string; } | { ok: false; reason: string; code: // retriable | 'SESSION_STATE_MISMATCH' // fatal | 'MALFORMED_HANDSHAKE_META' | 'MALFORMED_HANDSHAKE' | 'PROTOCOL_VERSION_MISMATCH' | 'REJECTED_BY_CUSTOM_HANDLER'; }; } type Control = | ControlClose | ControlAck | ControlHandshakeRequest | ControlHandshakeResponse; ``` -------------------------------- ### TypeScript BaseError Interface Source: https://github.com/replit/river/blob/main/PROTOCOL.md Defines the structure for error objects in River RPC, requiring a 'code' string for differentiation, a 'message' string, and an optional 'extra' field for metadata. ```TypeScript interface BaseError { // This should be a defined literal to make sure errors are easily differentiated code: string; // This can be any string message: string; // Any extra metadata extra?: any; } ``` -------------------------------- ### TypeScript TransportMessage Interface Source: https://github.com/replit/river/blob/main/PROTOCOL.md Defines the structure for all messages exchanged between client and server in River. It includes routing information, payload, stream identification, and control flags for managing message flow. ```TypeScript interface TransportMessage { // unique id id: string; // each client/server has its own unique id // who we are from: string; // who we are sending to to: string; // the service and procedure to route to serviceName?: string; procedureName?: string; // the actual payload // - `Init` or `Request` in the client to server direction // - `Result` in the server to client direction // - `Control` in either direction payload: Payload; // unique id for each specific instantiation of an RPC call // a stream of TransportMessage is grouped by streamId streamId: string; // special flags // we will cover this later controlFlags: number; // used primarily for retransmission and deduplication // we will cover this later seq: number; ack: number; } ``` -------------------------------- ### TypeScript: Protocol Error Payloads Source: https://github.com/replit/river/blob/main/PROTOCOL.md Defines various error payloads for the Replit River protocol, including invalid requests, uncaught exceptions, and cancellation errors. These errors are typically sent from server to client and are wrapped within a Result and TransportMessage with a StreamCancelBit. ```TypeScript interface InvalidRequestError extends BaseError { code: 'INVALID_REQUEST'; message: string; } interface UncaughtError extends BaseError { code: 'UNCAUGHT_ERROR'; message: string; } interface CancelError extends BaseError { code: 'CANCEL'; message: string; } type ProtocolError = UncaughtError | InvalidRequestError | CancelError; ``` -------------------------------- ### TypeScript Result Type Source: https://github.com/replit/river/blob/main/PROTOCOL.md Represents the outcome of an operation, either a successful payload or an error conforming to BaseError. It uses a discriminated union with 'ok' boolean and 'payload' fields. ```TypeScript type Result = | { ok: true; payload: SuccessPayload } | { ok: false; payload: ErrorPayload }; ``` === COMPLETE CONTENT === This response contains all available snippets from this library. No additional content exists. Do not make further requests.