# MBC CQRS Serverless Framework

The MBC CQRS Serverless Framework is a production-ready framework for building enterprise-grade serverless applications on AWS using the Command Query Responsibility Segregation (CQRS) pattern and Event Sourcing. Built on NestJS, it provides a structured approach for separating read and write operations while integrating with AWS services including Lambda, DynamoDB, API Gateway, Cognito, SNS, SQS, Step Functions, and SES.

The framework simplifies the development of highly scalable, decoupled systems capable of handling complex business logic and high-volume data processing. It provides core modules for command handling, sequential ID generation, multi-tenant management, asynchronous task execution, data import, and real-time notifications. The local development environment includes Docker-based AWS service emulation, enabling rapid iteration without cloud deployment costs.

## Installation

Install the CLI and scaffold a new project.

```bash
npm i -g @mbc-cqrs-serverless/cli
mbc new my-project
cd my-project
npm install
```

## CommandModule Registration

Register the CommandModule to enable CQRS command handling with data sync handlers for a specific DynamoDB table.

```typescript
import { Module } from '@nestjs/common';
import { CommandModule } from '@mbc-cqrs-serverless/core';
import { CatController } from './cat.controller';
import { CatService } from './cat.service';
import { CatDataSyncRdsHandler } from './handler/cat-rds.handler';

@Module({
  imports: [
    CommandModule.register({
      tableName: 'cat',
      dataSyncHandlers: [CatDataSyncRdsHandler],
      skipError: false,
      disableDefaultHandler: false,
    }),
  ],
  controllers: [CatController],
  providers: [CatService],
})
export class CatModule {}
```

## CommandService.publishAsync

Publish a full command asynchronously. The call returns immediately while command processing occurs in the background via DynamoDB streams.

```typescript
import {
  CommandService,
  generateId,
  getCommandSource,
  VERSION_FIRST,
  KEY_SEPARATOR,
  getUserContext,
  IInvoke,
} from '@mbc-cqrs-serverless/core';
import { Injectable } from '@nestjs/common';
import { basename } from 'path';

@Injectable()
export class CatService {
  constructor(private readonly commandService: CommandService) {}

  async createCat(
    dto: { name: string; breed: string; age: number },
    invokeContext: IInvoke,
  ) {
    const { tenantCode } = getUserContext(invokeContext);
    const pk = `CAT${KEY_SEPARATOR}${tenantCode}`;
    const sk = `CAT${KEY_SEPARATOR}${Date.now()}`;

    const command = {
      pk,
      sk,
      id: generateId(pk, sk),
      tenantCode,
      code: sk,
      type: 'CAT',
      name: dto.name,
      version: VERSION_FIRST,
      attributes: {
        breed: dto.breed,
        age: dto.age,
      },
    };

    const result = await this.commandService.publishAsync(command, {
      source: getCommandSource(basename(__dirname), this.constructor.name, 'createCat'),
      invokeContext,
    });

    return result;
  }
}
```

## CommandService.publishPartialUpdateAsync

Update an existing entity by publishing only the changed fields. Requires the version field for optimistic locking.

```typescript
import {
  CommandService,
  CommandPartialInputModel,
  DataService,
  IInvoke,
} from '@mbc-cqrs-serverless/core';
import { Injectable, NotFoundException } from '@nestjs/common';

@Injectable()
export class CatService {
  constructor(
    private readonly commandService: CommandService,
    private readonly dataService: DataService,
  ) {}

  async updateCat(
    pk: string,
    sk: string,
    updateDto: { name?: string; age?: number },
    invokeContext: IInvoke,
  ) {
    const existing = await this.dataService.getItem({ pk, sk });
    if (!existing) {
      throw new NotFoundException('Cat not found');
    }

    const command: CommandPartialInputModel = {
      pk: existing.pk,
      sk: existing.sk,
      version: existing.version,
      name: updateDto.name ?? existing.name,
      attributes: {
        ...existing.attributes,
        age: updateDto.age ??
          existing.attributes.age,
      },
    };

    return this.commandService.publishPartialUpdateAsync(command, {
      invokeContext,
    });
  }
}
```

## DataService.getItem

Retrieve a single item from the data table (read model) by its primary key.

```typescript
import { DataService, DataModel } from '@mbc-cqrs-serverless/core';
import { Injectable, NotFoundException } from '@nestjs/common';

@Injectable()
export class CatService {
  constructor(private readonly dataService: DataService) {}

  async getCat(pk: string, sk: string): Promise<DataModel> {
    const item = await this.dataService.getItem({ pk, sk });
    if (!item) {
      throw new NotFoundException('Cat not found');
    }
    return item;
  }
}
```

## DataService.listItemsByPk

Query multiple items by partition key with optional sort key filtering and pagination.

```typescript
import { DataService, KEY_SEPARATOR } from '@mbc-cqrs-serverless/core';
import { Injectable } from '@nestjs/common';

@Injectable()
export class CatService {
  constructor(private readonly dataService: DataService) {}

  async listCats(tenantCode: string, limit = 20, lastSk?: string) {
    const pk = `CAT${KEY_SEPARATOR}${tenantCode}`;

    const result = await this.dataService.listItemsByPk(pk, {
      sk: {
        skExpression: 'begins_with(sk, :prefix)',
        skAttributeValues: { ':prefix': `CAT${KEY_SEPARATOR}` },
      },
      limit,
      startFromSk: lastSk,
      order: 'desc',
    });

    return {
      items: result.items,
      lastSk: result.lastSk,
    };
  }
}
```

## IDataSyncHandler Implementation

Create a data sync handler to synchronize command data to RDS (PostgreSQL) for complex queries.

```typescript
import { CommandModel, IDataSyncHandler } from '@mbc-cqrs-serverless/core';
import { Injectable, Logger } from '@nestjs/common';
import { PrismaService } from 'src/prisma';

@Injectable()
export class CatDataSyncRdsHandler implements IDataSyncHandler {
  private readonly logger = new Logger(CatDataSyncRdsHandler.name);

  constructor(private readonly prismaService: PrismaService) {}

  // Called when a command is applied: mirror the latest state into RDS.
  async up(cmd: CommandModel): Promise<any> {
    this.logger.debug(`Syncing to RDS: ${cmd.pk}/${cmd.sk}`);

    await this.prismaService.cat.upsert({
      where: { id: cmd.id },
      create: {
        id: cmd.id,
        pk: cmd.pk,
        sk: cmd.sk,
        code: cmd.code,
        name: cmd.name,
        tenantCode: cmd.tenantCode,
        breed: cmd.attributes?.breed,
        age: cmd.attributes?.age,
        isDeleted: cmd.isDeleted ?? false,
        createdAt: new Date(),
        updatedAt: new Date(),
      },
      update: {
        name: cmd.name,
        breed: cmd.attributes?.breed,
        age: cmd.attributes?.age,
        isDeleted: cmd.isDeleted ?? false,
        updatedAt: new Date(),
      },
    });
  }

  // Called on rollback: remove the mirrored row.
  async down(cmd: CommandModel): Promise<any> {
    await this.prismaService.cat
      .delete({
        where: { id: cmd.id },
      })
      .catch(() => {
        /* Ignore if already deleted */
      });
  }
}
```

## SequencesService.generateSequenceItem

Generate unique, formatted sequential IDs with support for rotation cycles (daily, monthly, yearly, fiscal yearly).

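To make the fiscal-yearly rotation concrete, here is a framework-independent sketch of how a counter might restart per fiscal year and how a zero-padded number could be formatted. It is only an illustration under stated assumptions: `fiscalYearOf`, `padNo`, `rotationKey`, and `nextFormattedNo` are hypothetical helpers, not part of `@mbc-cqrs-serverless/sequence`, and the sketch assumes a fiscal year is named after the calendar year in which it starts.

```typescript
// Hypothetical helpers for illustration only; not the framework's API.

/** Fiscal year containing `date`, for a fiscal year starting in `startMonth` (1-12). */
function fiscalYearOf(date: Date, startMonth: number): number {
  const month = date.getUTCMonth() + 1; // getUTCMonth() is zero-based
  const year = date.getUTCFullYear();
  // Assumption: the fiscal year is named after the calendar year it starts in.
  return month >= startMonth ? year : year - 1;
}

/** Left-pads a sequence number with zeros to a fixed width. */
function padNo(no: number, width: number): string {
  return String(no).padStart(width, '0');
}

/** Counter key; a new key means the counter starts again from 1. */
function rotationKey(typeCode: string, date: Date, startMonth: number): string {
  return `${typeCode}#${fiscalYearOf(date, startMonth)}`;
}

// In-memory stand-in for the persisted counters.
const counters = new Map<string, number>();

/** Increments the counter for the current rotation period and formats the result. */
function nextFormattedNo(
  typeCode: string,
  code1: string,
  date: Date,
  startMonth = 4,
): string {
  const key = rotationKey(typeCode, date, startMonth);
  const no = (counters.get(key) ?? 0) + 1;
  counters.set(key, no);
  return `${code1}-${fiscalYearOf(date, startMonth)}-${padNo(no, 5)}`;
}
```

Under this convention with `startMonth = 4`, a date in February 2025 still belongs to fiscal year 2024, so the counter keeps incrementing until April 2025 and then restarts at 1.
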
```typescript
import { SequencesService, RotateByEnum } from '@mbc-cqrs-serverless/sequence';
import { Injectable } from '@nestjs/common';

@Injectable()
export class OrderService {
  constructor(private readonly sequencesService: SequencesService) {}

  async generateOrderNumber(tenantCode: string, invokeContext: any) {
    const sequence = await this.sequencesService.generateSequenceItem(
      {
        tenantCode,
        typeCode: 'ORDER',
        rotateBy: RotateByEnum.YEARLY,
        params: { code1: 'ORD' },
        date: new Date(),
      },
      { invokeContext },
    );

    // Returns: { id: '...', no: 1, formattedNo: 'ORD-2024-001', issuedAt: Date }
    return sequence;
  }

  async generateInvoiceNumber(tenantCode: string, invokeContext: any) {
    const sequence = await this.sequencesService.generateSequenceItemWithProvideSetting(
      {
        tenantCode,
        typeCode: 'INVOICE',
        format: '%%code1%%-%%fiscal_year%%-%%no#:0>5%%',
        rotateBy: RotateByEnum.FISCAL_YEARLY,
        startMonth: 4,
        params: { code1: 'INV' },
        prefix: 'JP-',
        postfix: '-FINAL',
      },
      { invokeContext },
    );

    // Returns: { formattedNo: 'JP-INV-2024-00001-FINAL', no: 1, ... }
    return sequence;
  }
}
```

## TenantService

Manage multi-tenant operations including creating tenants, groups, and configuring tenant hierarchies.

```typescript
import { TenantService } from '@mbc-cqrs-serverless/tenant';
import { Injectable } from '@nestjs/common';

@Injectable()
export class TenantManagementService {
  constructor(private readonly tenantService: TenantService) {}

  async setupNewTenant(invokeContext: any) {
    // Create a new tenant
    const tenant = await this.tenantService.createTenant(
      {
        code: 'tenant001',
        name: 'Acme Corporation',
        attributes: {
          industry: 'technology',
          plan: 'enterprise',
          timezone: 'Asia/Tokyo',
        },
      },
      { invokeContext },
    );

    // Add a group to the tenant
    await this.tenantService.addTenantGroup(
      {
        tenantCode: 'tenant001',
        groupId: 'admin-group',
        role: 'admin',
      },
      { invokeContext },
    );

    // Create a sub-tenant/department
    await this.tenantService.createTenantGroup(
      'tenant001',
      {
        code: 'engineering',
        name: 'Engineering Department',
        attributes: { department: 'engineering' },
      },
      { invokeContext },
    );

    return tenant;
  }

  async getTenant(pk: string, sk: string) {
    return this.tenantService.getTenant({ pk, sk });
  }
}
```

## TaskService

Create and manage asynchronous tasks with Step Functions integration for long-running operations.

```typescript
import { TaskService } from '@mbc-cqrs-serverless/task';
import { Injectable } from '@nestjs/common';

@Injectable()
export class BatchProcessingService {
  constructor(private readonly taskService: TaskService) {}

  async createBatchTask(items: any[], tenantCode: string, invokeContext: any) {
    // Create a Step Functions task that processes items in parallel
    const task = await this.taskService.createStepFunctionTask(
      {
        taskType: 'batch-import',
        tenantCode,
        name: 'Import batch data',
        input: items.map((item, index) => ({
          index,
          data: item,
        })),
      },
      { invokeContext },
    );

    return task;
  }

  async getTaskStatus(pk: string, sk: string) {
    const task = await this.taskService.getTask({ pk, sk });

    // Get all subtasks and format status
    const subTasks = await this.taskService.getAllSubTask({ pk, sk });
    const status = await this.taskService.formatTaskStatus(subTasks);

    return {
      task,
      // progress: { subTaskCount: 10, subTaskSucceedCount: 7, subTaskFailedCount: 1, ... }
      progress: status,
    };
  }

  async updateTaskProgress(pk: string, sk: string, result: any) {
    await this.taskService.updateStatus(
      { pk, sk },
      'COMPLETED',
      { result },
    );
  }

  async listTasks(tenantCode: string) {
    return this.taskService.listItemsByPk(tenantCode, 'SFN_TASK', {
      limit: 20,
      order: 'desc',
    });
  }
}
```

## EmailService

Send emails via AWS SES with support for attachments and inline templates.

```typescript
import { EmailService, EmailNotification, TemplatedEmailNotification } from '@mbc-cqrs-serverless/core';
import { Injectable } from '@nestjs/common';
import * as fs from 'fs';

@Injectable()
export class NotificationService {
  constructor(private readonly emailService: EmailService) {}

  async sendWelcomeEmail(user: { email: string; name: string }) {
    const notification: TemplatedEmailNotification = {
      toAddrs: [user.email],
      template: {
        subject: 'Welcome, {{name}}!',
        // The HTML tags below are reconstructed; adjust the markup as needed.
        html: `
          <h1>Hello {{name}}</h1>
          <p>Welcome to our service!</p>
          <p>Your account has been created successfully.</p>
        `,
        text: 'Hello {{name}}, Welcome to our service!',
      },
      data: {
        name: user.name,
      },
    };

    await this.emailService.sendInlineTemplateEmail(notification);
  }

  async sendReportWithAttachment(email: string, reportBuffer: Buffer) {
    const notification: EmailNotification = {
      toAddrs: [email],
      subject: 'Monthly Report',
      body: '<p>Please find your monthly report attached.</p>',
      attachments: [
        {
          filename: 'report.pdf',
          content: reportBuffer,
          contentType: 'application/pdf',
        },
      ],
    };

    await this.emailService.sendEmail(notification);
  }
}
```

## AppSyncService

Send real-time notifications to WebSocket clients via AWS AppSync.

```typescript
import { AppSyncService, INotification } from '@mbc-cqrs-serverless/core';
import { Injectable } from '@nestjs/common';

@Injectable()
export class RealtimeService {
  constructor(private readonly appSyncService: AppSyncService) {}

  async notifyOrderUpdate(tenantCode: string, orderId: string, status: string) {
    const notification: INotification = {
      id: `order-${orderId}-${Date.now()}`,
      table: 'orders',
      pk: `ORDER#${tenantCode}`,
      sk: `ORDER#${orderId}`,
      tenantCode,
      action: 'MODIFY',
      content: {
        orderId,
        status,
        updatedAt: new Date().toISOString(),
      },
    };

    await this.appSyncService.sendMessage(notification);
  }
}
```

## Event Handler Pattern

Create custom event handlers for processing events from S3, SQS, DynamoDB Streams, and Step Functions.

```typescript
import {
  EventHandler,
  IEventHandler,
  EventFactory,
  DefaultEventFactory,
  IEvent,
} from '@mbc-cqrs-serverless/core';
import { Injectable, Logger } from '@nestjs/common';
import { S3Event } from 'aws-lambda';

// Define custom event
export class FileUploadEvent implements IEvent {
  source: string;
  bucket: string;
  key: string;
  size: number;

  constructor(data: Partial<FileUploadEvent>) {
    Object.assign(this, data);
  }
}

// Create event factory
@EventFactory()
@Injectable()
export class CustomEventFactory extends DefaultEventFactory {
  async transformS3(event: S3Event): Promise<IEvent[]> {
    return event.Records.map(
      (record) =>
        new FileUploadEvent({
          source: 's3',
          bucket: record.s3.bucket.name,
          // S3 event keys are URL-encoded, with spaces sent as '+'
          key: decodeURIComponent(record.s3.object.key.replace(/\+/g, ' ')),
          size: record.s3.object.size,
        }),
    );
  }
}

// Create event handler
@EventHandler(FileUploadEvent)
@Injectable()
export class FileUploadHandler implements IEventHandler<FileUploadEvent> {
  private readonly logger = new Logger(FileUploadHandler.name);

  async execute(event: FileUploadEvent): Promise<{ processed: boolean }> {
    this.logger.log(`Processing file: ${event.key} from bucket: ${event.bucket}`);

    // Process the uploaded file
    const extension = event.key.split('.').pop()?.toLowerCase();
    if (extension === 'csv') {
      // Process CSV file
      this.logger.log('Processing CSV import');
    }

    return { processed: true };
  }
}
```

## Complete CRUD Service Example

Full service implementation demonstrating create, read, update, and soft delete operations.

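Two ideas used by the full service in this section can be seen in isolation first: soft delete (flagging a record with `isDeleted` instead of removing it) and offset pagination (turning a 1-based page number into a skip count). The following is a minimal, framework-independent sketch; the `rows` array, `softDelete`, and `findPage` are hypothetical stand-ins and not framework or Prisma APIs.

```typescript
// Hypothetical in-memory model for illustration only.
interface ProductRow {
  id: string;
  name: string;
  isDeleted: boolean;
}

// Stand-in for the read-model table.
const rows: ProductRow[] = [];

/** Flags a row as deleted; the row itself stays in storage. */
function softDelete(id: string): boolean {
  const row = rows.find((r) => r.id === id && !r.isDeleted);
  if (!row) return false; // unknown id or already deleted
  row.isDeleted = true;
  return true;
}

/** Returns one page of non-deleted rows plus the total visible count. */
function findPage(page = 1, limit = 20): { total: number; items: ProductRow[] } {
  const visible = rows.filter((r) => !r.isDeleted);
  const skip = (page - 1) * limit; // page is 1-based
  return { total: visible.length, items: visible.slice(skip, skip + limit) };
}
```

Because deleted rows are only filtered out at read time, they remain available for auditing, which is why the queries in the full service also filter on `isDeleted: false`.
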
```typescript
import {
  CommandService,
  CommandPartialInputModel,
  DataService,
  generateId,
  getUserContext,
  VERSION_FIRST,
  KEY_SEPARATOR,
  IInvoke,
} from '@mbc-cqrs-serverless/core';
import { Injectable, NotFoundException } from '@nestjs/common';
import { PrismaService } from 'src/prisma';
import { ulid } from 'ulid';

const PRODUCT_PK_PREFIX = 'PRODUCT';

@Injectable()
export class ProductService {
  constructor(
    private readonly commandService: CommandService,
    private readonly dataService: DataService,
    private readonly prismaService: PrismaService,
  ) {}

  async create(dto: { name: string; price: number; category: string }, invokeContext: IInvoke) {
    const { tenantCode } = getUserContext(invokeContext);
    const pk = `${PRODUCT_PK_PREFIX}${KEY_SEPARATOR}${tenantCode}`;
    const sk = ulid();

    const command = {
      pk,
      sk,
      id: generateId(pk, sk),
      tenantCode,
      code: sk,
      type: 'PRODUCT',
      name: dto.name,
      version: VERSION_FIRST,
      attributes: { price: dto.price, category: dto.category },
    };

    return this.commandService.publishAsync(command, { invokeContext });
  }

  async findOne(pk: string, sk: string) {
    const item = await this.dataService.getItem({ pk, sk });
    if (!item) throw new NotFoundException('Product not found');
    return item;
  }

  async findAll(tenantCode: string, page = 1, limit = 20) {
    const skip = (page - 1) * limit;
    const [total, items] = await Promise.all([
      this.prismaService.product.count({ where: { tenantCode, isDeleted: false } }),
      this.prismaService.product.findMany({
        where: { tenantCode, isDeleted: false },
        take: limit,
        skip,
        orderBy: { createdAt: 'desc' },
      }),
    ]);
    return { total, items };
  }

  async update(pk: string, sk: string, dto: { name?: string; price?: number }, invokeContext: IInvoke) {
    const existing = await this.dataService.getItem({ pk, sk });
    if (!existing) throw new NotFoundException('Product not found');

    const command: CommandPartialInputModel = {
      pk: existing.pk,
      sk: existing.sk,
      version: existing.version,
      name: dto.name ??
        existing.name,
      attributes: { ...existing.attributes, price: dto.price ?? existing.attributes.price },
    };

    return this.commandService.publishPartialUpdateAsync(command, { invokeContext });
  }

  async remove(pk: string, sk: string, invokeContext: IInvoke) {
    const existing = await this.dataService.getItem({ pk, sk });
    if (!existing) throw new NotFoundException('Product not found');

    const command: CommandPartialInputModel = {
      pk: existing.pk,
      sk: existing.sk,
      version: existing.version,
      isDeleted: true,
    };

    return this.commandService.publishPartialUpdateAsync(command, { invokeContext });
  }
}
```

## Running the Development Server

Start the local development environment with Docker and Serverless Offline.

```bash
# Start Docker services (DynamoDB, PostgreSQL, LocalStack)
npm run offline:docker

# Run database migrations
npm run migrate

# Start Serverless Offline (in a new terminal)
npm run offline:sls

# Available endpoints:
# - API Gateway: http://localhost:3000
# - DynamoDB: http://localhost:8000
# - DynamoDB Admin: http://localhost:8001
# - Step Functions: http://localhost:8083
# - Cognito: http://localhost:9229
# - SQS: http://localhost:9324
```

## Summary

The MBC CQRS Serverless Framework provides a comprehensive solution for building enterprise serverless applications with CQRS and Event Sourcing patterns. Core use cases include multi-tenant SaaS applications, event-driven microservices, batch processing systems, and real-time collaborative applications. The framework excels at separating read and write models, maintaining complete audit trails through event sourcing, and scaling independently for different workloads. Integration patterns center around the CommandService for write operations that trigger data sync handlers, DataService for optimized reads, and event handlers for processing asynchronous events from various AWS sources.
The modular architecture allows selective adoption of features including sequence generation, task management, tenant isolation, and notifications. For production deployments, the framework integrates with AWS CDK for infrastructure provisioning, supporting both development and production environments with consistent patterns and minimal configuration changes.
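
To make the write/read split and the `version` check concrete, the following is a minimal in-memory sketch of the idea, not the framework's implementation. `InMemoryCqrsStore`, `publish`, `getItem`, and `history` are hypothetical names. The sketch assumes a new item is published with version 0 and that each accepted command increments the stored version by one; in the framework the read model is updated asynchronously via DynamoDB streams, whereas this sketch updates it inline for brevity.

```typescript
// Hypothetical in-memory model for illustration only.
interface Command {
  pk: string;
  sk: string;
  version: number; // the version the writer last read (0 for a new item, by assumption)
  name: string;
}

class InMemoryCqrsStore {
  // Write side: append-only history of accepted commands (the audit trail).
  private readonly commands: Command[] = [];
  // Read side: latest state per key, optimized for lookups.
  private readonly data = new Map<string, Command>();

  private static keyOf(pk: string, sk: string): string {
    return `${pk}|${sk}`;
  }

  /** Accepts a command only if its version matches the current stored version. */
  publish(cmd: Command): Command {
    const key = InMemoryCqrsStore.keyOf(cmd.pk, cmd.sk);
    const current = this.data.get(key);
    const currentVersion = current ? current.version : 0;
    if (cmd.version !== currentVersion) {
      // Optimistic locking: someone else wrote since this writer last read.
      throw new Error(`Version conflict: expected ${currentVersion}, got ${cmd.version}`);
    }
    const stored: Command = { ...cmd, version: currentVersion + 1 };
    this.commands.push(stored);
    this.data.set(key, stored); // done inline here; asynchronous in the real framework
    return stored;
  }

  /** Reads the latest state from the read side. */
  getItem(pk: string, sk: string): Command | undefined {
    return this.data.get(InMemoryCqrsStore.keyOf(pk, sk));
  }

  /** Returns every accepted command for one item, oldest first. */
  history(pk: string, sk: string): Command[] {
    return this.commands.filter((c) => c.pk === pk && c.sk === sk);
  }
}
```

A writer holding a stale version is rejected instead of silently overwriting newer data, which is the reason the update examples read the existing item first and pass `existing.version` along with the partial command.
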