### Install Dependencies with Gem and Pip Source: https://github.com/cosmicpython/book/blob/master/Readme.md Installs necessary tools like asciidoctor, Pygments, and asciidoctor-diagram using gem and pip. Ensure you have Ruby and Python environments set up. ```sh gem install asciidoctor python2 -m pip install --user pygments gem install pygments.rb gem install asciidoctor-diagram ``` -------------------------------- ### SQLAlchemy Repository Source: https://context7.com/cosmicpython/book/llms.txt An example of a SQLAlchemy repository implementation for retrieving product data. ```APIDOC ## SqlAlchemyRepository.get ### Description Retrieves a product from the database using its SKU. ### Method ```python get(self, sku) -> Product ``` ### Parameters - **sku** (str) - Description: The stock keeping unit of the product to retrieve. ### Returns - **Product** - The product object if found, otherwise None. ``` -------------------------------- ### Unit Test Using Test Bootstrap Source: https://context7.com/cosmicpython/book/llms.txt An example unit test that utilizes the test bootstrap function to create a message bus with fake dependencies. It then handles a command and asserts the state of the fake Unit of Work. ```python # Unit test using the test bootstrap def test_add_batch(): bus = bootstrap_test_app() bus.handle(commands.CreateBatch("b1", "CRUNCHY-ARMCHAIR", 100, None)) assert bus.uow.products.get("CRUNCHY-ARMCHAIR") is not None assert bus.uow.committed ``` -------------------------------- ### Redis Event Consumer for Batch Quantity Changes Source: https://context7.com/cosmicpython/book/llms.txt Listens to a Redis channel ('change_batch_quantity') for messages, decodes JSON payloads, and dispatches a ChangeBatchQuantity command to the internal message bus. Requires Redis connection and bootstrap setup. ```python # redis_eventconsumer.py import json import redis import config from allocation import bootstrap, commands r = redis.Redis(**config.get_redis_host_and_port()) def main(): bus = bootstrap.bootstrap() pubsub = r.pubsub(ignore_subscribe_messages=True) pubsub.subscribe("change_batch_quantity") # <-- external channel for message in pubsub.listen(): handle_change_batch_quantity(message, bus) def handle_change_batch_quantity(message, bus): data = json.loads(message["data"]) cmd = commands.ChangeBatchQuantity(ref=data["batchid"], qty=data["qty"]) bus.handle(cmd) ``` -------------------------------- ### End-to-End Test for Allocation Source: https://context7.com/cosmicpython/book/llms.txt Tests the happy path for allocating items and then viewing the allocations. It uses API client methods to post and get data, asserting the expected status codes and results. ```python def test_happy_path_returns_202_and_allocation_is_viewable(): orderid = random_orderid() sku = random_sku() batchref = random_batchref() api_client.post_to_add_batch(batchref, sku, 100, "2030-01-01") r = api_client.post_to_allocate(orderid, sku, qty=3) assert r.status_code == 202 r = api_client.get_allocation(orderid) assert r.ok assert r.json() == [{"sku": sku, "batchref": batchref}] ``` -------------------------------- ### Build and Test Book Content Source: https://github.com/cosmicpython/book/blob/master/Readme.md Commands to build local HTML versions of book chapters using 'make html' and to perform a sanity check on code listings with 'make test'. ```sh make html # builds local .html versions of each chapter ``` ```sh make test # does a sanity-check of the code listings ``` -------------------------------- ### Command Handler for Allocation Source: https://context7.com/cosmicpython/book/llms.txt Handles the Allocate command by creating an OrderLine and attempting to allocate it to a product. Raises InvalidSku if the product is not found. ```python # handlers.py — command handlers def allocate(cmd: commands.Allocate, uow: AbstractUnitOfWork): line = OrderLine(cmd.orderid, cmd.sku, cmd.qty) with uow: product = uow.products.get(sku=line.sku) if product is None: raise InvalidSku(f"Invalid sku {line.sku}") product.allocate(line) uow.commit() ``` -------------------------------- ### Bootstrap Message Bus with Dependency Injection Source: https://context7.com/cosmicpython/book/llms.txt Configures and returns a MessageBus instance, injecting dependencies like Unit of Work, email sender, and event publisher into handlers using functools.partial. Supports optional ORM startup. ```python import inspect import functools from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker import config, orm from adapters import email, redis_eventpublisher from service_layer import handlers, messagebus, unit_of_work def bootstrap( start_orm: bool = True, uow: unit_of_work.AbstractUnitOfWork = None, send_mail: callable = email.send, publish: callable = redis_eventpublisher.publish, ) -> messagebus.MessageBus: if start_orm: orm.start_mappers() if uow is None: uow = unit_of_work.SqlAlchemyUnitOfWork() # Inject dependencies into handlers using functools.partial dependencies = {"uow": uow, "send_mail": send_mail, "publish": publish} injected_event_handlers = { event_type: [inject_dependencies(handler, dependencies) for handler in handler_list] for event_type, handler_list in handlers.EVENT_HANDLERS.items() } injected_command_handlers = { command_type: inject_dependencies(handler, dependencies) for command_type, handler in handlers.COMMAND_HANDLERS.items() } return messagebus.MessageBus( uow=uow, event_handlers=injected_event_handlers, command_handlers=injected_command_handlers, ) def inject_dependencies(handler, dependencies): params = inspect.signature(handler).parameters deps = { name: dependency for name, dependency in dependencies.items() if name in params } return functools.partial(handler, **deps) # Production entrypoint bus = bootstrap() ``` -------------------------------- ### Command Handler for Adding a Batch Source: https://context7.com/cosmicpython/book/llms.txt Handles the CreateBatch command by adding a new batch to an existing product or creating a new product if it doesn't exist. It then commits the changes. ```python def add_batch(cmd: commands.CreateBatch, uow: AbstractUnitOfWork): with uow: product = uow.products.get(cmd.sku) if product is None: product = model.Product(sku=cmd.sku, batches=[]) uow.products.add(product) product.batches.append(model.Batch(cmd.ref, cmd.sku, cmd.qty, cmd.eta)) uow.commit() ``` -------------------------------- ### Test Bootstrap with Fake Dependencies Source: https://context7.com/cosmicpython/book/llms.txt Provides a bootstrap function for testing that uses fake implementations for the Unit of Work, email sending, and event publishing, and skips ORM initialization. ```python # Test bootstrap with fakes def bootstrap_test_app(): return bootstrap( start_orm=False, uow=FakeUnitOfWork(), send_mail=lambda *a, **kw: None, publish=lambda *a, **kw: None, ) ``` -------------------------------- ### Define SQLAlchemy ORM Mappers Source: https://context7.com/cosmicpython/book/llms.txt Sets up SQLAlchemy table definitions and ORM mappers for domain models. Call start_mappers() once at application startup. ```python from sqlalchemy import Table, Column, Integer, String, Date, ForeignKey, MetaData from sqlalchemy.orm import mapper, relationship import model metadata = MetaData() order_lines = Table( "order_lines", metadata, Column("id", Integer, primary_key=True, autoincrement=True), Column("sku", String(255)), Column("qty", Integer, nullable=False), Column("orderid", String(255)), ) batches = Table( "batches", metadata, Column("id", Integer, primary_key=True, autoincrement=True), Column("reference", String(255)), Column("sku", String(255)), Column("_purchased_quantity", Integer), Column("eta", Date, nullable=True), ) allocations = Table( "allocations", metadata, Column("id", Integer, primary_key=True, autoincrement=True), Column("orderline_id", Integer, ForeignKey("order_lines.id")), Column("batch_id", Integer, ForeignKey("batches.id")), ) def start_mappers(): """Call once at app startup. Domain classes stay ORM-ignorant until then.""" lines_mapper = mapper(model.OrderLine, order_lines) mapper(model.Batch, batches, properties={ "_allocations": relationship(lines_mapper, secondary=allocations, collection_class=set), }) ``` -------------------------------- ### Command Definitions for Domain Operations Source: https://context7.com/cosmicpython/book/llms.txt Defines commands such as Allocate, CreateBatch, and ChangeBatchQuantity using dataclasses. These represent user intents or system actions. ```python from dataclasses import dataclass from datetime import date from typing import Optional class Command: pass @dataclass class Allocate(Command): orderid: str sku: str qty: int @dataclass class CreateBatch(Command): ref: str sku: str qty: int eta: Optional[date] = None @dataclass class ChangeBatchQuantity(Command): ref: str qty: int ``` -------------------------------- ### SQLAlchemy Repository Implementation Source: https://context7.com/cosmicpython/book/llms.txt Implements the AbstractRepository using SQLAlchemy sessions for database interactions. Requires a SQLAlchemy session during initialization. ```python class SqlAlchemyRepository(AbstractRepository): def __init__(self, session): self.session = session def add(self, batch: model.Batch): self.session.add(batch) def get(self, reference: str) -> model.Batch: return self.session.query(model.Batch).filter_by(reference=reference).one() def list(self): return self.session.query(model.Batch).all() ``` -------------------------------- ### Fake Repository for Testing Source: https://context7.com/cosmicpython/book/llms.txt A mock repository implementation for unit tests that operates on an in-memory set, avoiding database dependencies. ```python # Fake for unit tests — no database required class FakeRepository(AbstractRepository): def __init__(self, batches): self._batches = set(batches) def add(self, batch): self._batches.add(batch) def get(self, reference): return next(b for b in self._batches if b.reference == reference) def list(self): return list(self._batches) ``` -------------------------------- ### Add Batch Endpoint Source: https://context7.com/cosmicpython/book/llms.txt API endpoint for adding a new batch of products. ```APIDOC ## POST /batches ### Description Creates a new batch for a product. ### Method POST ### Endpoint /batches ### Request Body - **ref** (str) - Required - The reference for the batch. - **sku** (str) - Required - The stock keeping unit of the product. - **qty** (int) - Required - The quantity in the batch. - **eta** (date) - Optional - The estimated time of arrival for the batch. ### Response #### Success Response (201) Returns `OK` indicating the batch was created successfully. ``` -------------------------------- ### Test Repository Batch Saving Source: https://context7.com/cosmicpython/book/llms.txt An integration test demonstrating how to use the SqlAlchemyRepository to add a batch and verify its persistence. ```python # Integration test example def test_repository_can_save_a_batch(session): batch = model.Batch("batch1", "RUSTY-SOAPDISH", 100, eta=None) repo = SqlAlchemyRepository(session) repo.add(batch) session.commit() rows = list(session.execute('SELECT reference, sku FROM "batches"')) assert rows == [("batch1", "RUSTY-SOAPDISH")] ``` -------------------------------- ### Abstract Repository Interface Source: https://context7.com/cosmicpython/book/llms.txt Defines the abstract base class for repository implementations, outlining methods for adding and retrieving batches. ```python import abc import model class AbstractRepository(abc.ABC): @abc.abstractmethod def add(self, batch: model.Batch): raise NotImplementedError @abc.abstractmethod def get(self, reference: str) -> model.Batch: raise NotImplementedError def list(self): raise NotImplementedError ``` -------------------------------- ### Flask Allocation Endpoint Source: https://context7.com/cosmicpython/book/llms.txt Handles POST requests to allocate stock. It uses the services layer and a SQLAlchemy Unit of Work. Returns an error for invalid SKU or out of stock conditions. ```python from flask import Flask, request import services, orm from unit_of_work import SqlAlchemyUnitOfWork orm.start_mappers() app = Flask(__name__) @app.route("/allocate", methods=["POST"]) def allocate_endpoint(): try: batchref = services.allocate( request.json["orderid"], request.json["sku"], request.json["qty"], SqlAlchemyUnitOfWork(), ) except (model.OutOfStock, services.InvalidSku) as e: return {"message": str(e)}, return {"batchref": batchref}, 201 ``` -------------------------------- ### Retrieve Allocations by Order ID Source: https://context7.com/cosmicpython/book/llms.txt Fetches allocation details (SKU and batch reference) for a given order ID from the database. Requires an active SQLAlchemy Unit of Work session. ```python from sqlalchemy import text def allocations(orderid: str, uow): with uow: results = list( uow.session.execute( text("SELECT ol.sku, b.reference AS batchref " "FROM allocations a " "JOIN batches b ON b.id = a.batch_id " "JOIN order_lines ol ON ol.id = a.orderline_id " "WHERE ol.orderid = :orderid"), dict(orderid=orderid), ) ) return [{"sku": sku, "batchref": batchref} for sku, batchref in results] ``` -------------------------------- ### SQLAlchemy Product Table Mapping Source: https://context7.com/cosmicpython/book/llms.txt Defines the SQLAlchemy ORM mapping for the 'products' table, including columns for id, sku, and version_number, with the version_number column configured for optimistic locking. ```python # Optimistic locking in SQLAlchemy ORM mapping products = Table( "products", metadata, Column("id", Integer, primary_key=True, autoincrement=True), Column("sku", String(255)), Column("version_number", Integer, nullable=False, server_default="0"), ) ``` -------------------------------- ### Service Layer: Allocate Function Source: https://context7.com/cosmicpython/book/llms.txt Handles the allocation of stock to an order line. It checks for valid SKUs and uses the Unit of Work to commit changes. ```python def allocate(orderid: str, sku: str, qty: int, uow: "AbstractUnitOfWork") -> str: line = OrderLine(orderid, sku, qty) with uow: batches = uow.batches.list() if not any(b.sku == sku for b in batches): raise InvalidSku(f"Invalid sku {sku}") batchref = model.allocate(line, batches) uow.commit() return batchref ``` -------------------------------- ### Domain Model: Entity, Value Object, and Domain Service in Python Source: https://context7.com/cosmicpython/book/llms.txt Defines core domain objects like OrderLine (Value Object) and Batch (Entity), and a Domain Service function `allocate` for business logic. These are pure Python classes with no infrastructure dependencies. ```python from dataclasses import dataclass from typing import Optional, List from datetime import date @dataclass(frozen=True) class OrderLine: orderid: str sku: str qty: int class Batch: def __init__(self, ref: str, sku: str, qty: int, eta: Optional[date]): self.reference = ref self.sku = sku self.eta = eta self._purchased_quantity = qty self._allocations = set() # type: set[OrderLine] def allocate(self, line: OrderLine): if self.can_allocate(line): self._allocations.add(line) def deallocate(self, line: OrderLine): if line in self._allocations: self._allocations.remove(line) def can_allocate(self, line: OrderLine) -> bool: return self.sku == line.sku and self.available_quantity >= line.qty @property def allocated_quantity(self) -> int: return sum(line.qty for line in self._allocations) @property def available_quantity(self) -> int: return self._purchased_quantity - self.allocated_quantity def __gt__(self, other): if self.eta is None: return False if other.eta is None: return True return self.eta > other.eta class OutOfStock(Exception): pass def allocate(line: OrderLine, batches: List[Batch]) -> str: """Domain service: allocate an order line to the earliest available batch.""" try: batch = next(b for b in sorted(batches) if b.can_allocate(line)) except StopIteration: raise OutOfStock(f"Out of stock for sku {line.sku}") batch.allocate(line) return batch.reference # Usage / unit tests (no database, no framework) batch = Batch("batch-001", "SMALL-TABLE", qty=20, eta=date.today()) line = OrderLine("order-ref", "SMALL-TABLE", 2) batch.allocate(line) assert batch.available_quantity == 18 # 20 - 2 # Domain service selects earliest ETA batch from datetime import timedelta early = Batch("early", "SMALL-TABLE", 100, date.today()) late = Batch("late", "SMALL-TABLE", 100, date.today() + timedelta(days=7)) ref = allocate(OrderLine("o1", "SMALL-TABLE", 10), [late, early]) assert ref == "early" ``` -------------------------------- ### Flask Endpoint for Allocation Source: https://context7.com/cosmicpython/book/llms.txt A Flask route that handles POST requests to allocate stock. It parses the request JSON, creates an Allocate command, and dispatches it via the message bus. ```python @app.route("/allocate", methods=["POST"]) def allocate_endpoint(): cmd = commands.Allocate( request.json["orderid"], request.json["sku"], request.json["qty"], ) messagebus.handle(cmd, uow=SqlAlchemyUnitOfWork()) return "OK", 202 ``` -------------------------------- ### SQLAlchemy Unit of Work Implementation Source: https://context7.com/cosmicpython/book/llms.txt Implements the Unit of Work pattern using SQLAlchemy for database sessions. It manages the session lifecycle and repository instantiation within a context manager. ```python class SqlAlchemyUnitOfWork(AbstractUnitOfWork): def __init__(self, session_factory=DEFAULT_SESSION_FACTORY): self.session_factory = session_factory def __enter__(self): self.session = self.session_factory() self.batches = repository.SqlAlchemyRepository(self.session) return super().__enter__() def __exit__(self, *args): super().__exit__(*args) self.session.close() def commit(self): self.session.commit() def rollback(self): self.session.rollback() ``` -------------------------------- ### Flask Endpoint for Allocations View Source: https://context7.com/cosmicpython/book/llms.txt A Flask route that exposes the allocations view function. It initializes a SQLAlchemy Unit of Work and returns allocation data or a 404 if not found. ```python @app.route("/allocations/", methods=["GET"]) def allocations_view_endpoint(orderid): uow = SqlAlchemyUnitOfWork() result = views.allocations(orderid, uow) if not result: return "not found", 404 return jsonify(result), 200 ``` -------------------------------- ### Domain Event Definitions Source: https://context7.com/cosmicpython/book/llms.txt Defines various domain events such as OutOfStock, AllocationRequired, and BatchQuantityChanged using dataclasses. ```python from dataclasses import dataclass class Event: pass @dataclass class OutOfStock(Event): sku: str @dataclass class AllocationRequired(Event): orderid: str sku: str qty: int @dataclass class BatchQuantityChanged(Event): ref: str qty: int ``` -------------------------------- ### Flask Endpoint for Adding a Batch Source: https://context7.com/cosmicpython/book/llms.txt A Flask route that handles POST requests to create a new batch. It parses the request JSON, creates a CreateBatch command, and dispatches it via the message bus. ```python # Flask entrypoint dispatches commands via messagebus @app.route("/batches", methods=["POST"]) def add_batch(): cmd = commands.CreateBatch( request.json["ref"], request.json["sku"], int(request.json["qty"]), request.json.get("eta"), ) messagebus.handle(cmd, uow=SqlAlchemyUnitOfWork()) return "OK", 201 ``` -------------------------------- ### Message Bus Implementation Source: https://context7.com/cosmicpython/book/llms.txt Implements a message bus that dispatches domain events to registered handlers. It processes events from a queue and collects new events from the unit of work. ```python from typing import Dict, Type, List, Callable import logging logger = logging.getLogger(__name__) HANDLERS: Dict[Type[Event], List[Callable]] = {} def handle(events_list, uow): queue = list(events_list) while queue: event = queue.pop(0) for handler in HANDLERS.get(type(event), []): handler(event, uow=uow) queue.extend(uow.collect_new_events()) ``` -------------------------------- ### Unit test for sending email on OutOfStock event Source: https://context7.com/cosmicpython/book/llms.txt Tests that the send_out_of_stock_notification handler is called and sends an email when an AllocationRequired event occurs, simulating an out-of-stock scenario. It uses mocking to prevent actual email sending. ```python # Unit test: event raised, handler called, no real email sent def test_sends_email_on_out_of_stock(uow): uow.products.add(Product("POPULAR-CURTAINS", [Batch("b1", "POPULAR-CURTAINS", 9, None)])) uow.commit() with mock.patch("allocation.adapters.email.send") as mock_send: messagebus.handle([events.AllocationRequired("o1", "POPULAR-CURTAINS", 10)], uow) assert mock_send.call_args == call( "stock@made.com", "Out of stock for POPULAR-CURTAINS", ) ``` -------------------------------- ### Command Handlers Source: https://context7.com/cosmicpython/book/llms.txt Functions that execute commands, representing an intent to change the system's state. ```APIDOC ## Command Handlers ### Description Command handlers process commands, which express an intent to perform an action. They interact with the domain model and unit of work to achieve the desired state change. ### Handlers - **allocate(cmd, uow)**: Handles the `Allocate` command. - **cmd** (commands.Allocate) - The allocation command. - **uow** (AbstractUnitOfWork) - The unit of work. - **add_batch(cmd, uow)**: Handles the `CreateBatch` command. - **cmd** (commands.CreateBatch) - The command to create a new batch. - **uow** (AbstractUnitOfWork) - The unit of work. ``` -------------------------------- ### Domain Event Handler for OutOfStock Source: https://context7.com/cosmicpython/book/llms.txt Defines a handler function that sends an email notification when an OutOfStock event is triggered. It uses a default email sending function. ```python def send_out_of_stock_notification(event: events.OutOfStock, uow, send_mail=email.send): send_mail( "stock@made.com", f"Out of stock for {event.sku}", ) ``` -------------------------------- ### Handler Wiring for Domain Events Source: https://context7.com/cosmicpython/book/llms.txt Wires up specific domain events to their corresponding handler functions. This configuration is used by the message bus to dispatch events. ```python # Wire up handlers HANDLERS[events.OutOfStock] = [send_out_of_stock_notification] HANDLERS[events.AllocationRequired] = [handlers.allocate] HANDLERS[events.BatchQuantityChanged] = [handlers.change_batch_quantity] ``` -------------------------------- ### Unit test for version number increment Source: https://context7.com/cosmicpython/book/llms.txt Verifies that the version number is incremented correctly when an order line is allocated to a product. ```python def test_increments_version_number_for_default_allocation(): line = OrderLine("oref", "SMALL-TABLE", 10) product = Product("SMALL-TABLE", batches=[Batch("b1", "SMALL-TABLE", 100, None)]) product.version_number = 7 product.allocate(line) assert product.version_number == 8 ``` -------------------------------- ### Product Aggregate with Optimistic Locking Source: https://context7.com/cosmicpython/book/llms.txt Represents a product aggregate that manages its batches and enforces consistency using optimistic locking via a version number. It also collects domain events. ```python # model.py (Chapter 7 — Aggregate) from typing import List, Optional from datetime import date class Product: def __init__(self, sku: str, batches: List[Batch] = None, version_number: int = 0): self.sku = sku self.batches = batches or [] self.version_number = version_number # optimistic locking self.events = [] # collected domain events def allocate(self, line: OrderLine) -> str: try: batch = next(b for b in sorted(self.batches) if b.can_allocate(line)) except StopIteration: self.events.append(events.OutOfStock(sku=line.sku)) return None batch.allocate(line) self.version_number += 1 return batch.reference def change_batch_quantity(self, ref: str, qty: int): batch = next(b for b in self.batches if b.reference == ref) batch._purchased_quantity = qty while batch.available_quantity < 0: line = batch._allocations.pop() self.events.append(events.AllocationRequired(line.orderid, line.sku, line.qty)) ``` -------------------------------- ### Publish Domain Events to Redis Source: https://context7.com/cosmicpython/book/llms.txt Publishes a domain event to a specified Redis channel. The event is serialized to JSON, including its type and dictionary representation of its attributes. Requires a Redis client instance configured with host and port. ```python import json import redis import config r = redis.Redis(**config.get_redis_host_and_port()) def publish(channel, event): r.publish(channel, json.dumps({"type": type(event).__name__, **event.__dict__})) ``` -------------------------------- ### Allocate Endpoint Source: https://context7.com/cosmicpython/book/llms.txt API endpoint for allocating products to an order. ```APIDOC ## POST /allocate ### Description Allocates a specified quantity of a product to an order. ### Method POST ### Endpoint /allocate ### Request Body - **orderid** (str) - Required - The ID of the order. - **sku** (str) - Required - The stock keeping unit of the product to allocate. - **qty** (int) - Required - The quantity to allocate. ### Response #### Success Response (202) Returns `OK` indicating the allocation request has been processed. ``` -------------------------------- ### Allocate Order Source: https://context7.com/cosmicpython/book/llms.txt Allocates a product to an order. It takes order details and uses a Unit of Work to manage the transaction. ```APIDOC ## POST /allocate ### Description Allocates a specified quantity of a product (SKU) to an order. This endpoint handles the business logic for allocation and ensures transactional integrity using a Unit of Work. ### Method POST ### Endpoint /allocate ### Parameters #### Request Body - **orderid** (string) - Required - The ID of the order. - **sku** (string) - Required - The Stock Keeping Unit of the product to allocate. - **qty** (integer) - Required - The quantity of the product to allocate. ### Request Example ```json { "orderid": "order123", "sku": "LAMP-001", "qty": 5 } ``` ### Response #### Success Response (201 Created) - **batchref** (string) - The reference of the batch from which the allocation was made. #### Response Example ```json { "batchref": "batch456" } ``` #### Error Response (400 Bad Request) - **message** (string) - Describes the error, such as 'OutOfStock' or 'InvalidSku'. #### Response Example ```json { "message": "OutOfStock: Not enough stock for SKU LAMP-001" } ``` ``` -------------------------------- ### Fake Unit of Work for Testing Source: https://context7.com/cosmicpython/book/llms.txt A mock implementation of the Unit of Work pattern for use in unit tests. It tracks whether commit has been called but does not perform any actual database operations. ```python # FakeUnitOfWork for unit tests class FakeUnitOfWork(AbstractUnitOfWork): def __init__(self): self.batches = repository.FakeRepository([]) self.committed = False def commit(self): self.committed = True def rollback(self): pass ``` -------------------------------- ### Service Layer: Add Batch Function Source: https://context7.com/cosmicpython/book/llms.txt A service function to add a new batch. It uses a Unit of Work to manage the transaction and repository operations. ```python from typing import TYPE_CHECKING import model from model import OrderLine if TYPE_CHECKING: from unit_of_work import AbstractUnitOfWork class InvalidSku(Exception): pass def add_batch(ref: str, sku: str, qty: int, eta, uow: "AbstractUnitOfWork"): with uow: uow.batches.add(model.Batch(ref, sku, qty, eta)) uow.commit() ``` -------------------------------- ### Service-Layer Unit Test for Allocation Source: https://context7.com/cosmicpython/book/llms.txt A unit test for the services.allocate function that does not involve HTTP or a database. It uses a FakeUnitOfWork for dependency injection. ```python # Service-layer unit test (no HTTP, no database) def test_allocate_returns_allocation(): uow = FakeUnitOfWork() services.add_batch("b1", "COMPLICATED-LAMP", 100, None, uow) result = services.allocate("o1", "COMPLICATED-LAMP", 10, uow) assert result == "b1" ``` -------------------------------- ### Integration Test for Unit of Work Source: https://context7.com/cosmicpython/book/llms.txt An integration test that verifies the Unit of Work pattern by performing a round-trip through a real database. It inserts a batch, allocates to it, commits, and then verifies the allocation. ```python # Integration test: round-trip through real DB def test_uow_can_retrieve_a_batch_and_allocate_to_it(session_factory): session = session_factory() session.execute( "INSERT INTO batches (reference, sku, _purchased_quantity, eta) " "VALUES ('batch1', 'HIPSTER-WORKBENCH', 100, null)" ) session.commit() uow = SqlAlchemyUnitOfWork(session_factory) with uow: batch = uow.batches.get(reference="batch1") line = model.OrderLine("o1", "HIPSTER-WORKBENCH", 10) batch.allocate(line) uow.commit() # verify persistence new_session = session_factory() rows = list(new_session.execute("SELECT orderline_id FROM allocations")) assert len(rows) == 1 ``` -------------------------------- ### Message Bus Source: https://context7.com/cosmicpython/book/llms.txt Handles the dispatching of domain events to registered handlers. ```APIDOC ## Message Bus ### Description The message bus is responsible for dispatching domain events to their corresponding handlers. It processes a queue of events, ensuring that handlers are called and new events generated by handlers are also processed. ### Function - **handle(events_list, uow)**: Processes a list of events. - **events_list** (List[Event]) - A list of domain events to handle. - **uow** (AbstractUnitOfWork) - The unit of work associated with the transaction. ``` -------------------------------- ### Test SQLAlchemy OrderLine Mapper Source: https://context7.com/cosmicpython/book/llms.txt A test function to verify that the OrderLine ORM mapper can load data correctly from the database. ```python # Testing the mapping (throwaway test) # pytest conftest.py provides a `session` fixture backed by SQLite/Postgres def test_orderline_mapper_can_load_lines(session): session.execute( "INSERT INTO order_lines (orderid, sku, qty) VALUES " '("order1", "RED-CHAIR", 12),' '("order1", "RED-TABLE", 13),' '("order2", "BLUE-LIPSTICK", 14)' ) expected = [ model.OrderLine("order1", "RED-CHAIR", 12), model.OrderLine("order1", "RED-TABLE", 13), model.OrderLine("order2", "BLUE-LIPSTICK", 14), ] assert session.query(model.OrderLine).all() == expected ``` -------------------------------- ### Domain Events Source: https://context7.com/cosmicpython/book/llms.txt Defines domain events that can be raised within the application and collected on aggregates. ```APIDOC ## Domain Events ### Description Domain events represent facts that have occurred within the domain. They are collected on aggregates and dispatched by the message bus after a transaction commits. ### Event Types - **OutOfStock**: Indicates that a product is out of stock. - **sku** (str) - The stock keeping unit of the out-of-stock product. - **AllocationRequired**: Signals that an allocation is needed for an order. - **orderid** (str) - The ID of the order. - **sku** (str) - The stock keeping unit. - **qty** (int) - The quantity required. - **BatchQuantityChanged**: Notifies that the quantity of a batch has changed. - **ref** (str) - The reference of the batch. - **qty** (int) - The new quantity. ``` -------------------------------- ### Integration Test for Event Publishing with Fakeredis Source: https://context7.com/cosmicpython/book/llms.txt An integration test simulating external Redis message consumption to verify event handling and side effects. It uses fakeredis for in-memory Redis operations. This test checks if an order is reallocated to a different batch after a quantity change. ```python def test_change_batch_quantity_leading_to_reallocation(): bus = bootstrap.bootstrap(start_orm=True, uow=SqlAlchemyUnitOfWork()) bus.handle(commands.CreateBatch("batch1", "INDIFFERENT-TABLE", 50, None)) bus.handle(commands.Allocate("order1", "INDIFFERENT-TABLE", 40)) # Simulate external Redis message arriving redis_eventconsumer.handle_change_batch_quantity( {"data": json.dumps({"batchid": "batch1", "qty": 10})}, bus, ) # order1 should have been reallocated to a different batch with SqlAlchemyUnitOfWork() as uow: product = uow.products.get("INDIFFERENT-TABLE") assert "order1" not in [ line.orderid for b in product.batches for line in b._allocations if b.reference == "batch1" ] ``` -------------------------------- ### Abstract Unit of Work Definition Source: https://context7.com/cosmicpython/book/llms.txt Defines the interface for the Unit of Work pattern using an abstract base class. It includes methods for commit and rollback, and context management. ```python # unit_of_work.py import abc from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker import repository, config DEFAULT_SESSION_FACTORY = sessionmaker( bind=create_engine(config.get_postgres_uri()) ) class AbstractUnitOfWork(abc.ABC): batches: repository.AbstractRepository def __enter__(self): return self def __exit__(self, *args): self.rollback() @abc.abstractmethod def commit(self): raise NotImplementedError @abc.abstractmethod def rollback(self): raise NotImplementedError ``` === COMPLETE CONTENT === This response contains all available snippets from this library. No additional content exists. Do not make further requests.