# RPC Dart RPC Dart is a transport-independent Remote Procedure Call (RPC) framework written entirely in Dart. It enables you to write RPC services once and run them on any platform—mobile, web, desktop, or server—without coupling your business logic to a particular transport or serialization strategy. The framework supports all standard RPC communication patterns: unary calls, server streaming, client streaming, and bidirectional streaming. The library features zero-copy data transfer with InMemory transport for maximum performance, full type safety, cooperative cancellation with deadlines, and built-in health monitoring. It integrates seamlessly with existing gRPC infrastructure while remaining pure Dart with no external dependencies. ## Creating a Contract Contracts define the service interface with method names and signatures, serving as the single source of truth for both client and server implementations. ```dart // calculator_contract.dart abstract interface class ICalculatorContract { static const name = 'Calculator'; static const methodAdd = 'add'; static const methodSubtract = 'subtract'; static const methodMultiply = 'multiply'; static const methodDivide = 'divide'; } class MathRequest { final double a; final double b; MathRequest(this.a, this.b); } class MathResponse { final double result; MathResponse(this.result); } ``` ## Implementing a Responder (Server) The responder handles incoming RPC calls by registering method handlers that process requests and return responses. ```dart import 'package:rpc_dart/rpc_dart.dart'; class CalculatorResponder extends RpcResponderContract { CalculatorResponder() : super(ICalculatorContract.name); @override void setup() { addUnaryMethod( methodName: ICalculatorContract.methodAdd, handler: _add, ); addUnaryMethod( methodName: ICalculatorContract.methodDivide, handler: _divide, ); } Future _add(MathRequest request, {RpcContext? context}) async { return MathResponse(request.a + request.b); } Future _divide(MathRequest request, {RpcContext? context}) async { if (request.b == 0) { throw RpcException( code: 'DIVISION_BY_ZERO', message: 'Cannot divide by zero', details: {'operand': 'b', 'value': request.b}, ); } return MathResponse(request.a / request.b); } } ``` ## Implementing a Caller (Client) The caller provides a typed interface for making RPC calls to remote services through an endpoint. ```dart import 'package:rpc_dart/rpc_dart.dart'; class CalculatorCaller extends RpcCallerContract { CalculatorCaller(RpcCallerEndpoint endpoint) : super(ICalculatorContract.name, endpoint); Future add(MathRequest request) { return callUnary( methodName: ICalculatorContract.methodAdd, request: request, ); } Future divide(MathRequest request) { return callUnary( methodName: ICalculatorContract.methodDivide, request: request, ); } } ``` ## Using InMemory Transport InMemory transport provides zero-copy object transfer for maximum performance within a single process, ideal for testing and high-throughput applications. ```dart import 'package:rpc_dart/rpc_dart.dart'; void main() async { // Create InMemory transport pair (zero-copy, no serialization) final (clientTransport, serverTransport) = RpcInMemoryTransport.pair(); // Setup responder endpoint final responder = RpcResponderEndpoint(transport: serverTransport); responder.registerServiceContract(CalculatorResponder()); responder.start(); // Setup caller endpoint final caller = RpcCallerEndpoint(transport: clientTransport); final calculator = CalculatorCaller(caller); try { // Perform calculations final sum = await calculator.add(MathRequest(10, 5)); print('10 + 5 = ${sum.result}'); // Output: 10 + 5 = 15.0 final quotient = await calculator.divide(MathRequest(15, 3)); print('15 / 3 = ${quotient.result}'); // Output: 15 / 3 = 5.0 // Test error handling try { await calculator.divide(MathRequest(10, 0)); } on RpcException catch (e) { print('Error: ${e.code} - ${e.message}'); // Error: DIVISION_BY_ZERO - Cannot divide by zero } } finally { await caller.close(); await responder.close(); } } ``` ## Using HTTP/2 Transport HTTP/2 transport provides gRPC-compatible communication for distributed services with multiplexed streams over a single TCP connection. ```dart import 'package:rpc_dart/rpc_dart.dart'; import 'package:rpc_dart_transports/rpc_dart_transports.dart'; // Server setup Future startServer() async { final server = RpcHttp2Server.createWithContracts( port: 8765, host: '0.0.0.0', logger: RpcLogger('Http2Server'), contracts: [ DemoServiceContract(), ], ); await server.start(); } // Client setup Future main() async { // Connect with TLS final transport = await RpcHttp2CallerTransport.secureConnect( host: 'api.example.com', port: 443, logger: RpcLogger('Http2Client'), ); final endpoint = RpcCallerEndpoint( transport: transport, debugLabel: 'demo-http2-client', ); // Make unary request final response = await endpoint.unaryRequest( serviceName: 'DemoService', methodName: 'Echo', requestCodec: RpcString.codec, responseCodec: RpcString.codec, request: RpcString('Hello over HTTP/2!'), ); print('Server replied: ${response.value}'); await transport.close(); } ``` ## Using WebSocket Transport WebSocket transport maintains a persistent connection for real-time bidirectional communication, ideal for dashboards and collaborative applications. ```dart import 'package:rpc_dart/rpc_dart.dart'; import 'package:rpc_dart_transports/rpc_dart_transports.dart'; Future main() async { final transport = RpcWebSocketCallerTransport.connect( Uri.parse('wss://api.example.com/rpc'), protocols: const ['rpc-dart'], logger: RpcLogger('WebSocketClient'), ); final caller = RpcCallerEndpoint(transport: transport); final chat = ChatCaller(caller); // Subscribe to a chat room (server streaming) final stream = chat.joinRoom('general', 'user-42'); stream.listen( (message) => print('[${message.senderId}]: ${message.text}'), onError: (e) => print('Stream error: $e'), onDone: () => print('Stream closed'), ); } ``` ## Server Streaming Server streaming allows the server to send multiple responses to a single client request, useful for live updates and notifications. ```dart // Responder implementation class ChatContract extends RpcResponderContract { ChatContract() : super('Chat'); @override void setup() { addServerStreamMethod( methodName: 'JoinRoom', handler: _joinRoom, ); } Stream _joinRoom(JoinRequest request, {RpcContext? context}) async* { // Emit messages as they arrive for (var i = 0; i < 10; i++) { await Future.delayed(const Duration(milliseconds: 500)); yield ChatMessage( roomId: request.roomId, senderId: 'system', text: 'Update #$i for ${request.userId}', ); } } } // Caller implementation class ChatCaller extends RpcCallerContract { ChatCaller(RpcCallerEndpoint endpoint) : super('Chat', endpoint); Stream joinRoom(String roomId, String userId) { return callServerStream( methodName: 'JoinRoom', request: JoinRequest(roomId, userId), ); } } // Usage final chat = ChatCaller(caller); await for (final message in chat.joinRoom('general', 'u-42')) { print('[${message.roomId}] ${message.text}'); } ``` ## Client Streaming Client streaming allows the client to send multiple requests while the server responds once after processing all input. ```dart // Responder implementation class DataContract extends RpcResponderContract { DataContract() : super('Data'); @override void setup() { addClientStreamMethod( methodName: 'SumNumbers', handler: _sumNumbers, ); } Future _sumNumbers(Stream numbers, {RpcContext? context}) async { double sum = 0; int count = 0; await for (final numberData in numbers) { context?.cancellationToken?.throwIfCancelled(); sum += numberData.value; count++; } return SumResult(total: sum, count: count); } } // Caller implementation class DataCaller extends RpcCallerContract { DataCaller(RpcCallerEndpoint endpoint) : super('Data', endpoint); Future sumNumbers(Stream numbers) { return callClientStream( methodName: 'SumNumbers', requests: numbers, ); } } // Usage final dataService = DataCaller(caller); final numbers = Stream.fromIterable([ NumberData(1), NumberData(2), NumberData(3), NumberData(4), NumberData(5), ]); final result = await dataService.sumNumbers(numbers); print('Sum: ${result.total}, Count: ${result.count}'); // Sum: 15, Count: 5 ``` ## Bidirectional Streaming Bidirectional streaming enables full-duplex communication where both client and server can send multiple messages concurrently. ```dart // Responder implementation class EchoContract extends RpcResponderContract { EchoContract() : super('Echo'); @override void setup() { addBidirectionalMethod( methodName: 'Chat', handler: _chat, requestCodec: RpcString.codec, responseCodec: RpcString.codec, ); } Stream _chat(Stream requests, {RpcContext? context}) async* { await for (final message in requests) { yield RpcString('echo: ${message.value}'); } } } // Usage with HTTP/2 transport final responses = endpoint.bidirectionalStream( serviceName: 'Echo', methodName: 'Chat', requestCodec: RpcString.codec, responseCodec: RpcString.codec, requests: Stream.periodic( const Duration(milliseconds: 250), (i) => RpcString('message #$i'), ).take(5), ); await for (final reply in responses) { print('Reply: ${reply.value}'); } ``` ## Using RpcContext for Metadata and Deadlines RpcContext carries headers, deadlines, cancellation tokens, and trace identifiers across RPC calls for observability and control. ```dart import 'package:rpc_dart/rpc_dart.dart'; // Creating context with timeout and headers final ctx = RpcContextBuilder() .withGeneratedTraceId() .withHeader('x-user-id', 'user-123') .withHeader('x-locale', 'en-US') .withTimeout(const Duration(seconds: 5)) .build(); // Making call with context final result = await calculator.add( MathRequest(5, 3), context: ctx, ); // Server-side: accessing context metadata Future getUser(GetUserRequest request, {RpcContext? context}) async { final traceId = context?.traceId; final locale = context?.getHeader('x-locale') ?? 'en'; final userId = context?.getHeader('x-user-id'); if (context?.isCancelled ?? false) { throw RpcCancelledException('Client aborted the request'); } if (context?.isExpired == true) { throw RpcDeadlineExceededException(context!.deadline!, Duration.zero); } return userRepository.fetch(request.id, locale: locale, traceId: traceId); } ``` ## Cancellation and Deadlines Cooperative cancellation allows clients to abort long-running operations while responders can check and honor cancellation requests. ```dart // Client-side cancellation final token = RpcCancellationToken(); final ctx = RpcContextBuilder() .withCancellation(token) .withTimeout(const Duration(seconds: 2)) .build(); final future = callerEndpoint.unaryRequest( serviceName: 'MyService', methodName: 'LongOperation', requestCodec: RpcString.codec, responseCodec: RpcString.codec, request: const RpcString('start'), context: ctx, ); // Cancel after 500ms Future.delayed(const Duration(milliseconds: 500), () { token.cancel('User navigated away'); }); try { await future; } on RpcCancelledException catch (e) { print('Operation cancelled: ${e.message}'); } // Server-side: checking cancellation in handler Future longOperation(RpcString request, {RpcContext? context}) async { for (var i = 0; i < 1000; i++) { // Check for cancellation context?.cancellationToken?.throwIfCancelled(); // Check for deadline if (context?.isExpired == true) { throw RpcDeadlineExceededException(context!.deadline!, Duration.zero); } await Future.delayed(const Duration(milliseconds: 10)); } return const RpcString('done'); } ``` ## Transport Routing RpcTransportRouter multiplexes calls to different transports based on service name, metadata, or custom predicates. ```dart import 'package:rpc_dart/rpc_dart.dart'; import 'package:rpc_dart_transports/rpc_dart_transports.dart'; final router = RpcTransportRouterBuilder.client() .routeCall( calledServiceName: 'UserService', toTransport: httpTransport, priority: 100, ) .routeWhen( toTransport: websocketTransport, whenCondition: (service, method, context) => service == 'NotificationService' && context?.getHeader('x-realtime') == 'true', priority: 200, description: 'Real-time notifications via WebSocket', ) .routeWhen( toTransport: inMemoryTransport, whenCondition: (service, method, context) => context?.getHeader('x-route-service') == 'CacheService', priority: 50, description: 'Internal cache requests stay in-process', ) .build(); final callerEndpoint = RpcCallerEndpoint(transport: router); // UserService goes to HTTP transport final user = await userService.getUser(userId); // NotificationService with x-realtime header goes to WebSocket final ctx = RpcContextBuilder() .withHeader('x-realtime', 'true') .build(); final notifications = notificationService.subscribe(userId, context: ctx); ``` ## StreamDistributor for Fan-Out StreamDistributor manages server-side broadcast to multiple subscribers with automatic cleanup and metrics. ```dart import 'package:rpc_dart/rpc_dart.dart'; final distributor = StreamDistributor(); class NotificationResponder extends RpcResponderContract { NotificationResponder() : super('Notifications'); @override void setup() { addServerStreamMethod( methodName: 'Subscribe', handler: _subscribe, ); } Stream _subscribe(RpcString request, {RpcContext? context}) { final clientId = request.value; return distributor.getOrCreateClientStream(clientId); } void publish(ChatMessage notification) { distributor.publish(notification, metadata: { 'roomId': notification.roomId, 'sender': notification.senderId, }); } void publishToRoom(String roomId, ChatMessage message) { distributor.publishFiltered( message, filter: (clientId, metadata) => metadata['roomId'] == roomId, ); } Map getMetrics() { return distributor.metrics; // total/current streams, total messages, average size } } ``` ## Health Monitoring and Diagnostics RPC endpoints expose aggregated diagnostics combining endpoint state and transport status for observability. ```dart import 'package:rpc_dart/rpc_dart.dart'; // Check endpoint health final report = await callerEndpoint.health(); print('Healthy: ${report.isHealthy}'); print('Transport status: ${report.transportStatus?.message}'); if (!report.isHealthy) { // Attempt reconnection await callerEndpoint.reconnect(); } // Transport-level health check final transportStatus = await transport.health(); if (transportStatus.level == RpcHealthLevel.unhealthy) { print('Transport unhealthy: ${transportStatus.message}'); await transport.reconnect(); } // Router health shows all underlying transports final routerHealth = await router.health(); ``` ## Error Handling RPC Dart provides structured error handling with status codes following gRPC semantics. ```dart import 'package:rpc_dart/rpc_dart.dart'; // Server-side: throwing structured errors Future placeOrder(PlaceOrderRequest request, {RpcContext? context}) async { final inventory = await inventoryClient.reserve(request.items, context: context); if (!inventory.isSuccess) { throw RpcException( code: 'INVENTORY_CHECK_FAILED', message: 'Inventory check failed: ${inventory.reason}', details: {'items': request.items}, ); } try { return await payments.charge(request.paymentMethod); } on PaymentDeclined catch (error) { throw RpcException( code: 'PAYMENT_DECLINED', message: 'Payment declined: ${error.reason}', ); } } // Client-side: handling errors Future safeCall(Future Function() call) async { try { return await call(); } on RpcException catch (e) { print('RPC Error [${e.code}]: ${e.message}'); if (e.details != null) { print('Details: ${e.details}'); } rethrow; } on RpcCancelledException catch (e) { print('Request cancelled: ${e.message}'); rethrow; } on RpcDeadlineExceededException catch (e) { print('Request timed out'); rethrow; } } ``` ## Integration Testing with InMemory Transport InMemory transport makes unit and integration testing fast and isolated without network dependencies. ```dart import 'package:test/test.dart'; import 'package:rpc_dart/rpc_dart.dart'; void main() { group('Calculator Service Integration', () { late RpcResponderEndpoint responder; late RpcCallerEndpoint caller; late CalculatorCaller calculator; setUp(() async { final (clientTransport, serverTransport) = RpcInMemoryTransport.pair(); responder = RpcResponderEndpoint(transport: serverTransport); responder.registerServiceContract(CalculatorResponder()); responder.start(); caller = RpcCallerEndpoint(transport: clientTransport); calculator = CalculatorCaller(caller); }); tearDown(() async { await caller.close(); await responder.close(); }); test('should add numbers correctly', () async { final result = await calculator.add(MathRequest(2, 3)); expect(result.result, equals(5)); }); test('should handle division by zero', () async { expect( () => calculator.divide(MathRequest(10, 0)), throwsA(isA()), ); }); test('streaming emits expected messages', () async { final chat = ChatCaller(caller); final stream = chat.joinRoom('general', 'u-42'); expectLater( stream, emitsThrough(predicate((msg) => msg.roomId == 'general')), ); }); }); } ``` ## Summary RPC Dart excels in building cross-platform distributed systems where the same service contracts work seamlessly across different deployment scenarios. The framework is particularly well-suited for microservices architectures, real-time collaborative applications, and high-performance monolithic apps that need internal service isolation without serialization overhead. Its transport-agnostic design allows starting with InMemory transport during development and testing, then deploying with HTTP/2 for public APIs and WebSocket for real-time features. Key integration patterns include using the contract-first approach to define service interfaces, leveraging RpcContext for cross-cutting concerns like authentication and tracing, employing RpcTransportRouter for hybrid deployments, and utilizing StreamDistributor for scalable pub/sub scenarios. The framework's cooperative cancellation model, structured error handling, and built-in health monitoring make it production-ready while maintaining the simplicity of pure Dart without external dependencies.