### Windows Local Environment Setup Source: https://archman.dev/docs/welcome/prerequisites Installs Chocolatey, Node.js LTS, Docker Desktop, and optional kind for Windows. Verifies installations and demonstrates basic kind cluster creation and deletion. ```shell # Install Chocolatey (if not installed) # Run PowerShell as Administrator, then: Set-ExecutionPolicy Bypass -Scope Process -Force; [System.Net.ServicePointManager]::SecurityProtocol = [System.Net.ServicePointManager]::SecurityProtocol -bor 3072; iex ((New-Object System.Net.WebClient).DownloadString('https://community.chocolatey.org/install.ps1')) # Install Node.js LTS choco install nodejs -y # Verify node --version npm --version # Install Docker Desktop # Download from https://www.docker.com/products/docker-desktop # Or via Chocolatey: choco install docker-desktop -y # Start Docker Desktop and grant permissions # Verify docker --version # (Optional) Install kind choco install kind -y # Test local k8s kind create cluster kubectl cluster-info kind delete cluster ``` -------------------------------- ### macOS Local Environment Setup Source: https://archman.dev/docs/welcome/prerequisites Installs Homebrew, Node.js LTS, npm/pnpm, Docker Desktop, and optional kind/minikube for macOS. Verifies installations and demonstrates basic kind cluster creation and deletion. 
```shell # Install Homebrew (if not already installed) /bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/HEAD/install.sh)" # Install Node.js LTS brew install node # Verify installation node --version # v18.17.0+ npm --version # 8.0.0+ # (Optional) Install pnpm for faster installs npm install -g pnpm # Install Docker Desktop (GUI) # Download from https://www.docker.com/products/docker-desktop # Or via Homebrew: brew install --cask docker # Start Docker Desktop (once) and grant permissions # Verify Docker docker --version # Docker version 20.10+ # (Optional) Install kind for local Kubernetes brew install kind # (Optional) Install minikube as alternative brew install minikube # Test local k8s kind create cluster kubectl cluster-info kind delete cluster # Done! You can now run examples. ``` -------------------------------- ### Example Workflow Simulation (Python) Source: https://archman.dev/docs/domain-driven-design/ddd-in-practice/event-storming-workshops Demonstrates the execution flow of the system by simulating the creation of an order and appending the initial event to the event store. This setup initializes the services and saga, then triggers the order creation process. 
```python from datetime import datetime from dataclasses import dataclass # --- Mock EventStore and Event Definitions for Simulation --- class EventStore: def __init__(self): self.subscribers = {} self.events = [] def subscribe(self, event_type, handler): if event_type not in self.subscribers: self.subscribers[event_type] = [] self.subscribers[event_type].append(handler) def append(self, event): print(f"EventStore: Appending {type(event).__name__} for {getattr(event, 'order_id', 'N/A')}") self.events.append(event) self.publish(event) def publish(self, event): event_type = type(event) if event_type in self.subscribers: for handler in self.subscribers[event_type]: handler(event) @dataclass(frozen=True) class OrderCreated: order_id: str customer_id: str items: list total_amount: float created_at: datetime @dataclass(frozen=True) class OrderConfirmed: order_id: str confirmed_at: datetime @dataclass(frozen=True) class FraudCheckStarted: order_id: str amount: float customer_id: str started_at: datetime @dataclass(frozen=True) class FraudCheckPassed: order_id: str passed_at: datetime @dataclass(frozen=True) class FraudCheckFailed: order_id: str reason: str failed_at: datetime @dataclass(frozen=True) class ShipmentPrepared: order_id: str warehouse_id: str prepared_at: datetime # --- Service and Saga Definitions (as provided previously) --- class OrderAggregate: """Order aggregate - consistency boundary""" def __init__(self, order_id): self.order_id = order_id self.events = [] self.status = "pending" self.items = [] self.amount = 0.0 def create_order(self, customer_id, items, total_amount): """Command that creates domain event""" event = OrderCreated( order_id=self.order_id, customer_id=customer_id, items=items, total_amount=total_amount, created_at=datetime.now() ) self.events.append(event) self.status = "created" return event def confirm_order(self): """Can only confirm after fraud check passes""" event = OrderConfirmed(order_id=self.order_id, confirmed_at=datetime.now()) 
self.events.append(event) self.status = "confirmed" return event class FraudCheckService: """Fraud context - independent consistency boundary""" def __init__(self, event_store): self.event_store = event_store self.pending_checks = {} event_store.subscribe(OrderCreated, self.on_order_created) def on_order_created(self, event: OrderCreated): print(f"Fraud: Checking order {event.order_id}") fraud_event = FraudCheckStarted( order_id=event.order_id, amount=event.total_amount, customer_id=event.customer_id, started_at=datetime.now() ) self.event_store.append(fraud_event) self.pending_checks[event.order_id] = fraud_event if event.total_amount > 10000: result = FraudCheckFailed( order_id=event.order_id, reason="manual_review_required", failed_at=datetime.now() ) else: result = FraudCheckPassed( order_id=event.order_id, passed_at=datetime.now() ) self.event_store.append(result) class FulfillmentService: """Fulfillment context - owns shipment logic""" def __init__(self, event_store): self.event_store = event_store event_store.subscribe(FraudCheckPassed, self.on_fraud_check_passed) def on_fraud_check_passed(self, event: FraudCheckPassed): print(f"Fulfillment: Preparing shipment for {event.order_id}") shipment_event = ShipmentPrepared( order_id=event.order_id, warehouse_id="warehouse-001", prepared_at=datetime.now() ) self.event_store.append(shipment_event) class OrderFulfillmentSaga: """ Saga coordinates multi-context process: Order Created → Fraud Check → Shipment Preparation → Shipment """ def __init__(self, event_store): self.event_store = event_store event_store.subscribe(OrderConfirmed, self.on_order_confirmed) event_store.subscribe(FraudCheckFailed, self.on_fraud_check_failed) def on_order_confirmed(self, event: OrderConfirmed): print(f"Saga: Order {event.order_id} confirmed, initiating fulfillment") def on_fraud_check_failed(self, event: FraudCheckFailed): print(f"Saga: Order {event.order_id} failed fraud check ({event.reason}), cancelling") # --- Example usage --- 
print("--- Starting Order Workflow Simulation ---") event_store = EventStore() fraud_service = FraudCheckService(event_store) fulfillment_service = FulfillmentService(event_store) saga = OrderFulfillmentSaga(event_store) # Simulate workflow order = OrderAggregate("order-123") order_created_event = order.create_order("cust-456", [{"sku": "item-1", "qty": 2}], 5000.0) # Manually append the first event to trigger the chain reaction event_store.append(order_created_event) # To simulate order confirmation (which would happen after fraud check passes in a real scenario) # For this example, we'll manually trigger confirmation to see saga reaction # In a real system, OrderConfirmed would be published by another service or aggregate # For demonstration, let's assume fraud check passed and order is confirmed print("\n--- Simulating Order Confirmation ---") # This part is a simplification; in reality, OrderConfirmed would be a result of other events/actions # We construct the OrderConfirmed event directly here (bypassing OrderAggregate.confirm_order) for demonstration purposes # A more accurate simulation might involve another service reacting to FraudCheckPassed # For now, we simulate the event being published: order_confirmed_event = OrderConfirmed(order_id="order-123", confirmed_at=datetime.now()) event_store.append(order_confirmed_event) print("\n--- Simulation Complete ---") ``` -------------------------------- ### Raft Server Initialization and Peer Setup (Go) Source: https://archman.dev/docs/foundational-concepts/data-fundamentals/consistency-models/strong Initializes a three-node Raft cluster and sets up peer connections. It creates three RaftServer instances and configures each one to know about the others, which is a prerequisite for forming a cluster and starting the consensus process. 
```go func main() { fmt.Println("=== Strong Consistency with Raft ===\n") servers := make([]*RaftServer, 3) for i := 0; i < 3; i++ { servers[i] = NewRaftServer(fmt.Sprintf("Server%d", i+1)) } for _, server := range servers { for _, s := range servers { if s.ServerID != server.ServerID { server.Peers = append(server.Peers, s) } } } // ... (rest of the main function for election, writes, replication, reads) ``` -------------------------------- ### Order Creation Data Flow Example (Python) Source: https://archman.dev/docs/domain-driven-design/ddd-in-practice/aligning-bounded-contexts-to-microservices Demonstrates the data flow for creating an order, starting with a client's POST request, followed by the Orders Service handling the request, saving to its database, and publishing an 'OrderCreated' event. Includes example JSON payload and event structure. ```json # 1. Client creates order POST /orders { "customer_id": "cust_123", "items": [ {"product_id": "prod_456", "quantity": 2} ] } ``` ```python # 2. Orders Service handles request class OrderService: def create_order(self, customer_id, items): # Create order order = Order(uuid4(), customer_id) for item in items: order.add_item(item['product_id'], item['quantity']) # Save to Orders DB (only this service can access) order_repo.save(order) # INSERT into orders table # Publish event event_bus.publish(OrderCreated(order.id, customer_id, items)) return order ``` ```json # 3. Events published to message broker OrderCreated event: { "event_id": "evt_789", "event_type": "OrderCreated", "order_id": "ord_123", "customer_id": "cust_123", "items": [...] } ``` -------------------------------- ### Validate Local Environment Setup Source: https://archman.dev/docs/welcome/prerequisites This command sequence verifies the installation and proper functioning of Node.js, Docker, and optionally Kubernetes (using kind). It's a crucial first step after setting up the local environment to ensure readiness for development. ```bash # 1. 
Verify Node.js node -e "console.log('Node.js is working:', process.version)" # 2. Verify Docker docker run hello-world # 3. (Optional) Verify Kubernetes kind create cluster --wait 5m kubectl cluster-info kubectl get nodes # 4. If all three succeed: You're ready to start! ``` -------------------------------- ### Go: Example Workshop Usage Source: https://archman.dev/docs/domain-driven-design/ddd-in-practice/event-storming-workshops This Go code demonstrates an example usage of the event store, fraud service, and fulfillment service. It simulates creating an order and shows how events cascade through the system, printing all events stored. It requires implementations of EventStore, FraudService, and FulfillmentService. ```go // Example usage func ExampleWorkshop() { es := NewEventStore() fraud := NewFraudService(es) fulfillment := NewFulfillmentService(es) _ = fraud _ = fulfillment // Create order order := NewOrderAggregate("order-123") orderEvent := order.CreateOrder("cust-456", []OrderItem{ {SKU: "item-1", Qty: 2}, }, 5000.0) es.Append(orderEvent) // Events cascade through contexts fmt.Println("\nEvents in store:") for _, event := range es.events { fmt.Printf(" - %s: %v\n", event.EventType(), event) } } ``` -------------------------------- ### Istio AuthorizationPolicy: Access Control Source: https://archman.dev/docs/distributed-systems-and-microservices/communication/service-mesh Defines fine-grained access control policies in Istio. This example allows GET requests to paths starting with '/api/public/*' from the 'frontend' service account. 
```yaml apiVersion: security.istio.io/v1beta1 kind: AuthorizationPolicy metadata: name: api-policy spec: rules: - from: - source: principals: ["cluster.local/ns/default/sa/frontend"] to: - operation: methods: ["GET"] paths: ["/api/public/*"] ``` -------------------------------- ### Dockerfile Layer Caching Optimization for Faster Builds Source: https://archman.dev/docs/cloud-native-and-runtime-topology/platform-concerns/image-artifact-management This example demonstrates optimizing Docker build times using multi-stage builds and layer caching. The inefficient example copies all files and runs npm install and build, causing cache invalidation on any file change. The efficient example separates dependency installation (npm install) from code copying and building. This caches the dependency layer until 'package.json' changes, significantly speeding up rebuilds when only code is modified. ```dockerfile # ❌ Inefficient: Copies everything, invalidates layer on any file change WORKDIR /app COPY . /app RUN npm install && npm run build # ✅ Efficient: Cache dependencies separately from code FROM node:18-alpine AS builder # Build under /app so the /app/... paths copied by the final stage exist WORKDIR /app COPY package*.json ./ RUN npm install COPY . ./ RUN npm run build FROM node:18-alpine WORKDIR /app COPY --from=builder /app/dist ./dist COPY --from=builder /app/node_modules ./node_modules CMD ["node", "dist/index.js"] # Benefits: # - npm install cached until package.json changes # - Code changes only rebuild code layer # - Rebuilds 5x faster after code-only change ``` -------------------------------- ### Raft System Initialization and Operation Flow (JavaScript) Source: https://archman.dev/docs/foundational-concepts/data-fundamentals/consistency-models/strong Sets up multiple Raft servers, establishes peer connections, simulates a leader election, and demonstrates client write operations followed by log replication and state reads. 
```javascript // Main console.log("=== Strong Consistency with Raft ===\n"); const servers = Array.from({ length: 3 }, (_, i) => new RaftServer(`Server${i + 1}`)); for (const server of servers) { server.peers = servers.filter(s => s.serverId !== server.serverId); } // Simulate leader election and term update console.log("--- Election: Server1 becomes leader ---"); servers[0].becomeLeader(); servers[0].currentTerm = 1; for (const server of servers) { server.currentTerm = 1; } // Simulate client write and replication console.log("\n--- Client writes increment=5 ---"); servers[0].appendEntry("increment", 5); servers[0].applyLog(); // Apply to leader's state machine immediately for demo console.log("\n--- Replication to followers ---"); servers[0].replicateToPeer(servers[1]); servers[0].replicateToPeer(servers[2]); // Followers apply log after replication (simplified) servers[1].applyLog(); servers[2].applyLog(); console.log("\n--- Leader read ---"); const val = servers[0].readCounter(); console.log(`[Server1] Counter value: ${val}`); // Simulate another client write console.log("\n--- Client writes increment=3 ---"); servers[0].appendEntry("increment", 3); servers[0].applyLog(); // Apply to leader's state machine immediately for demo console.log("\n--- Replication to followers ---"); servers[0].replicateToPeer(servers[1]); servers[0].replicateToPeer(servers[2]); // Followers apply log after replication (simplified) servers[1].applyLog(); servers[2].applyLog(); console.log("\n--- Final state ---"); for (const server of servers) { console.log(`[${server.serverId}] ${server.getState()}`); } ``` -------------------------------- ### User Onboarding Service Example (Python) Source: https://archman.dev/docs/anti-patterns-and-pitfalls/god-object-god-class Demonstrates a UserOnboardingService coordinating multiple dependencies like authentication, payment, and notifications to create a new user. 
This example illustrates how a single service can orchestrate complex operations, highlighting the potential for refactoring if it grows too large. ```python from uuid import uuid4 class AuthService: def hash_password(self, password: str) -> str: pass class PaymentService: def charge(self, user_id: str, amount: float, card) -> object: pass class NotificationService: def send_welcome(self, user): pass class UserRepository: def save(self, user): pass class ActivityLogger: def log(self, user_id: str, event: str, details: dict): pass class Card: pass class User: def __init__(self, user_id: str, email: str, name: str): self.user_id = user_id self.email = email self.name = name self.password_hash = None class UserOnboardingService: def __init__(self, auth: AuthService, payment: PaymentService, notification: NotificationService, user_repo: UserRepository, logger: ActivityLogger): self.auth = auth self.payment = payment self.notification = notification self.user_repo = user_repo self.logger = logger def create_user(self, email: str, password: str, card: Card) -> User: # Each service handles its concern user = User(user_id=str(uuid4()), email=email, name=email.split('@')[0]) user.password_hash = self.auth.hash_password(password) self.user_repo.save(user) self.logger.log(user.user_id, 'USER_CREATED', {'email': email}) # Charge card transaction = self.payment.charge(user.user_id, 99.99, card) self.logger.log(user.user_id, 'PAYMENT_CHARGED', {'amount': 99.99}) # Send welcome email self.notification.send_welcome(user) self.logger.log(user.user_id, 'EMAIL_SENT', {'type': 'welcome'}) return user ``` -------------------------------- ### Validation Errors Example Source: https://archman.dev/docs/api-and-interface-design/restful-api-design/error-formats-and-problem-details Example of a structured response for field-level validation errors, providing specific details for each invalid field. 
```APIDOC ## Validation Errors Clients need to understand which fields failed validation and why: ```json { "status": 422, "type": "https://api.example.com/errors/validation-failed", "title": "Validation Failed", "validation_errors": [ { "field": "email", "code": "invalid-format", "message": "Must be a valid email address" }, { "field": "age", "code": "out-of-range", "message": "Must be between 18 and 120" } ] } ``` ``` -------------------------------- ### Linux (Ubuntu/Debian) Local Environment Setup Source: https://archman.dev/docs/welcome/prerequisites Installs Node.js LTS, Docker Engine, and optional kind/kubectl for Linux (Ubuntu/Debian). Includes steps to add the user to the docker group and verifies installations. ```shell # Ubuntu/Debian example # Install Node.js LTS curl -fsSL https://deb.nodesource.com/setup_18.x | sudo -E bash - sudo apt-get install -y nodejs # Verify node --version npm --version # Install Docker Engine sudo apt-get update sudo apt-get install -y docker.io docker-compose # Add user to docker group (avoid sudo) sudo usermod -aG docker $USER newgrp docker # Verify docker --version # (Optional) Install kind curl -Lo ./kind https://kind.sigs.k8s.io/dl/v0.20.0/kind-linux-amd64 chmod +x ./kind sudo mv ./kind /usr/local/bin/kind # (Optional) Install kubectl curl -LO "https://dl.k8s.io/release/$(curl -L -s https://dl.k8s.io/release/stable.txt)/bin/linux/amd64/kubectl" sudo install -o root -g root -m 0755 kubectl /usr/local/bin/kubectl # Test local k8s kind create cluster kubectl cluster-info kind delete cluster ``` -------------------------------- ### VM Sizing Example: Small E-commerce Shop (Text) Source: https://archman.dev/docs/cloud-native-and-runtime-topology/compute-models/vms An example of VM sizing for a small e-commerce shop, detailing traffic, recommended instance type, estimated cost, and utilization. This serves as a baseline for understanding resource allocation for low-traffic applications. 
```text Traffic: 1000 requests/day Solution: 1 × t3.large (2 vCPU, 8GB RAM) Cost: ~$60/month Utilization: ~5% (lots of idle capacity, acceptable for a small shop) ``` -------------------------------- ### VM Sizing and Capacity Planning Example Source: https://archman.dev/docs/cloud-native-and-runtime-topology/compute-models/vms Provides examples of VM sizing for different workload types (Small, Medium, Large) on AWS, including vCPU, RAM, typical use cases, cost estimates, and host density. It also illustrates capacity planning for a physical host, considering vCPU and RAM over-subscription. ```plaintext # Example: E-commerce platform Small VM (t3.medium - AWS): vCPU: 2 RAM: 4 GB Use: Stateless services, web frontends, light processing Cost: ~$0.04/hour Density: 8 per host (16 vCPU, 32 GB physical) Medium VM (t3.large - AWS): vCPU: 2 RAM: 8 GB Use: Database, caching, moderate workloads Cost: ~$0.08/hour Density: 4 per host Large VM (m5.2xlarge - AWS): vCPU: 8 RAM: 32 GB Use: High-traffic services, batch processing Cost: ~$0.38/hour Density: 2 per host (16 vCPU, 64 GB) # Capacity planning: # Physical Host: 16 vCPU, 256 GB RAM # Expected workload: 64 small VMs (2 vCPU, 4 GB each) # Over-subscription: 128:16 vCPU (8:1 - aggressive, OK for bursty workloads) # Over-subscription RAM: 256:256 (1:1 - no over-subscription, safe) ``` -------------------------------- ### Composite Index Usage Example Source: https://archman.dev/docs/foundational-concepts/data-fundamentals/indexing-and-query-optimization Demonstrates how the order of columns in a composite index affects query performance. It shows good, mediocre, and bad query examples based on the index definition (category, price, rating). 
```sql Index: (category, price, rating) Good query: SELECT * FROM products WHERE category='Electronics' AND price < 100 ORDER BY rating → Index is used: category filter, then price filter, then rating sort Mediocre query: SELECT * FROM products WHERE price < 100 AND category='Electronics' → Same result, index still used (optimizer may reorder), but less efficient if price is not the first filter Bad query: SELECT * FROM products WHERE rating > 4.5 → Index cannot be used for rating without first filtering by category (missing leading column) ``` -------------------------------- ### Arrange-Act-Assert Pattern Examples (Python) Source: https://archman.dev/docs/testing-strategy/testing-pyramid/unit Illustrates the Arrange-Act-Assert (AAA) pattern for structuring unit tests. Includes examples for calculating discounts on bulk orders and verifying no discount for small orders, showcasing setup, execution, and verification steps. ```python def test_calculate_discount_for_bulk_order(): # Arrange: Set up test data and mocks order = Order( items=[ OrderItem(price=100, quantity=10), # 1000 total OrderItem(price=50, quantity=5) # 250 total ] ) # Total: 1250 # Act: Execute the function under test discount = order.calculate_discount() # Assert: Verify the result assert discount == 125 # 10% for orders > 1000 def test_no_discount_for_small_order(): # Arrange order = Order(items=[OrderItem(price=50, quantity=1)]) # Total: 50 # Act discount = order.calculate_discount() # Assert assert discount == 0 # No discount for orders < 100 ``` -------------------------------- ### Replica Pool Initialization and Usage (JavaScript) Source: https://archman.dev/docs/data-architecture-and-persistence/performance-and-scale/read-replicas-and-fan-out Demonstrates how to initialize a ReplicaPool with primary and replica database configurations and perform write and read operations. It includes an example of reading after a write to ensure causal consistency. 
```javascript const pool = new ReplicaPool( { host: 'db.primary', user: 'app', password: 'pwd', database: 'prod' }, [ { host: 'db.replica1', user: 'app', password: 'pwd', database: 'prod' }, { host: 'db.replica2', user: 'app', password: 'pwd', database: 'prod' }, ] ); await pool.initialize(); // Write const userId = await pool.write( 'INSERT INTO users (name) VALUES (?)', ['Alice'] ); // Read with causal consistency const user = await pool.readAfterWrite( 'SELECT * FROM users WHERE id = ?', [userId], Date.now() ); console.log('User:', user); ``` -------------------------------- ### ADR Supersession Metadata Example Source: https://archman.dev/docs/documentation-and-modeling/architecture-decision-records-adr/catalog-and-traceability An example snippet showing the metadata structure for a superseded ADR. It includes fields like `status`, `superseded_by`, `reason`, and `migration_deadline` to clearly indicate the ADR's status and guide migration efforts. This is crucial for maintaining architectural lineage and managing change. ```yaml status: Superseded superseded_by: ADR-0047 reason: In-memory sessions lost on pod restart; Redis provides durability migration_deadline: 2025-06-30 action_required: | - Migrate session code from InMemoryStore to RedisStore - Update configuration to point to Redis - Run integration tests ``` -------------------------------- ### Initialize and Run Inference Server in Go Source: https://archman.dev/docs/data-architecture-and-persistence/data-pipelines-and-analytics/ml-feature-stores-and-model-serving-architecture-level Sets up and starts an HTTP server for model inference. It initializes an online feature store, populates it with sample data, creates a model server instance, defines API routes for health checks and predictions, and starts listening on port 8080. 
```go func main() { // Initialize online store featureStore := NewOnlineStore() // Populate with features (simulating sync from offline store) featureStore.SetFeature("total_spend", 1, 150.0, 2) featureStore.SetFeature("days_active", 1, 35, 1) featureStore.SetFeature("total_spend", 2, 250.0, 2) featureStore.SetFeature("days_active", 2, 42, 1) // Create inference server modelServer := NewModelServer(featureStore, "model-v1.2.0") // Routes http.HandleFunc("/health", modelServer.Health) http.HandleFunc("/predict", modelServer.Predict) fmt.Println("Starting inference server on :8080") http.ListenAndServe(":8080", nil) } ``` -------------------------------- ### Dependency Injection for Testability in Python Source: https://archman.dev/docs/foundational-concepts/programming-paradigms/object-oriented Contrasts tight coupling with dependency injection in Python. The BAD example shows a hard-wired StripeGateway, making testing difficult. The GOOD example injects the payment gateway, allowing for mock objects during testing, as shown in the test_place_order function. 
```python import pytest # Assume StripeGateway, Order, and other necessary classes are defined # class StripeGateway: # def charge(self, amount): # print(f"Charging {amount}") # return {"status": "success"} # class Order: # def __init__(self, total_cents): # self.total = total_cents # Tight coupling (BAD) # class OrderService: # def __init__(self): # self.payment_gateway = StripeGateway() # Hard-wired dependency # def place_order(self, order): # self.payment_gateway.charge(order.total) # Dependency injection (GOOD) class OrderService: def __init__(self, payment_gateway): self.payment_gateway = payment_gateway def place_order(self, order): # Assume payment_gateway has a charge method return self.payment_gateway.charge(order.total) # Testable: inject mock for testing @pytest.fixture def mock_gateway(): class MockGateway: def charge(self, amount): print(f"Mock charging {amount}") return {"status": "success"} return MockGateway() def test_place_order(mock_gateway): service = OrderService(mock_gateway) # Assume Order class is defined order = Order(total_cents=9999) result = service.place_order(order) assert result["status"] == "success" ``` -------------------------------- ### Isolate Test State for Reliability (Python) Source: https://archman.dev/docs/anti-patterns-and-pitfalls/flaky-tests-and-non-determinism Addresses flaky tests caused by modifications to shared state between tests. It shows a 'bad' example of tests interfering with each other in a shared database and a 'good' example using setup and teardown methods to ensure each test runs with a clean, isolated database. 
```python # BAD: Shared test database class TestUser: def test_create_user(self): User.create(name="Alice") users = User.all() assert len(users) == 1 # Fails if other tests left data def test_delete_user(self): user = User.all()[0] # Assumes previous test created exactly 1 user.delete() # Fails if test_create_user didn't run first # GOOD: Each test isolated class TestUser: def setup_method(self): db.create_all() # Fresh database def teardown_method(self): db.drop_all() # Clean up def test_create_user(self): User.create(name="Alice") users = User.all() assert len(users) == 1 # Always passes def test_delete_user(self): User.create(name="Bob") users = User.all() assert len(users) == 1 users[0].delete() users = User.all() assert len(users) == 0 # Always passes ``` -------------------------------- ### Setup and Teardown for Order Service Component Tests (Go) Source: https://archman.dev/docs/testing-strategy/testing-pyramid/component-integration This Go code sets up a PostgreSQL database using Testcontainers for integration testing the Order Service. It includes connecting to the database, running migrations, and initializing the service with a real repository and a mocked payment client. The `cleanup` function ensures the database container is terminated after the test. 
```go type OrderServiceTest struct { db *sql.DB repository *OrderRepository paymentMock *MockPaymentClient orderService *OrderService container testcontainers.Container } func setupTest(t *testing.T) *OrderServiceTest { ctx := context.Background() // Start PostgreSQL container req := testcontainers.ContainerRequest{ Image: "postgres:14", ExposedPorts: []string{"5432/tcp"}, Env: map[string]string{ "POSTGRES_DB": "testdb", "POSTGRES_USER": "test", "POSTGRES_PASSWORD": "test", }, WaitingFor: wait.ForListeningPort("5432/tcp"), } container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ ContainerRequest: req, Started: true, }) require.NoError(t, err) // Get connection details host, err := container.Host(ctx) require.NoError(t, err) port, err := container.MappedPort(ctx, "5432") require.NoError(t, err) // Connect to test database connStr := fmt.Sprintf( "host=%s port=%s user=test password=test dbname=testdb sslmode=disable", host, port.Port()) db, err := sql.Open("postgres", connStr) require.NoError(t, err) // Run migrations err = runMigrations(db) require.NoError(t, err) // Setup service with real repository and mocked payment client repository := NewOrderRepository(db) paymentMock := NewMockPaymentClient() orderService := NewOrderService(repository, paymentMock) return &OrderServiceTest{ db: db, repository: repository, paymentMock: paymentMock, orderService: orderService, container: container, } } func (st *OrderServiceTest) cleanup(t *testing.T) { st.db.Close() ctx := context.Background() st.container.Terminate(ctx) } ``` -------------------------------- ### SLO Calculator Initialization and Examples Source: https://archman.dev/docs/observability-and-operations/alerting/slo-based-alerts-vs-static-thresholds Demonstrates the initialization of an SLO calculator for an API service with a 99.9% SLO and a 30-day window. It includes scenarios to analyze normal operations and the impact of an incident consuming the error budget. 
```python # Example: API service with 99.9% SLO api_slo = SLOCalculator(slo_percentage=99.9, window_days=30) print("=== API Service SLO Analysis ===") print(f"SLO Target: {api_slo.slo_percentage}%") print(f"Error Budget: {api_slo.error_budget_percentage}%") print(f"Window: {api_slo.window_days} days") print() # Scenario 1: Normal operations print("Scenario 1: Normal Operations") print(f" Target error rate: {api_slo.error_budget_percentage}%") print(f" Allowed downtime: {api_slo.error_budget_seconds() / 60:.1f} minutes") print(f" Daily allocation: {api_slo.error_budget_seconds() / api_slo.window_days / 60:.1f} minutes") print() # Scenario 2: Incident consuming error budget quickly print("Scenario 2: Incident Day 1") seconds_consumed_day1 = 600 # 10 minutes downtime on day 1 remaining = api_slo.error_budget_remaining(seconds_consumed_day1) daily_budget = api_slo.error_budget_seconds() / api_slo.window_days burn_rate = api_slo.burn_rate(0, seconds_consumed_day1) print(f" Downtime on Day 1: {seconds_consumed_day1 / 60:.1f} minutes") print(f" Daily budget: {daily_budget / 60:.1f} minutes") print(f" Burn rate: {burn_rate:.1f}x") print(f" Remaining budget: {remaining / 60:.1f} minutes") print(f" Days until breach: {api_slo.days_until_breach(burn_rate):.1f}") print() ``` -------------------------------- ### Single Responsibility Principle (SRP) in Python Source: https://archman.dev/docs/foundational-concepts/programming-paradigms/object-oriented Illustrates the Single Responsibility Principle (SRP) by contrasting a BAD example where a User class handles validation, database saving, and email sending, with a GOOD example that separates these concerns into distinct classes: User, UserValidator, UserRepository, and EmailService. 
```python # BAD: Multiple responsibilities # class User: # def __init__(self, name, email): # self.name = name # self.email = email # # def validate_email(self): # return "@" in self.email # # def save_to_database(self): # # Database code here # pass # # def send_welcome_email(self): # # Email code here # pass # GOOD: Single responsibility class User: def __init__(self, name, email): self.name = name self.email = email class UserValidator: def is_valid(self, user): return "@" in user.email class UserRepository: def save(self, user): # Database code print(f"Saving user {user.name} to database") pass class EmailService: def send_welcome(self, user): # Email code print(f"Sending welcome email to {user.email}") pass # Usage example: # user = User("Bob", "bob@example.com") # validator = UserValidator() # repository = UserRepository() # email_service = EmailService() # # if validator.is_valid(user): # repository.save(user) # email_service.send_welcome(user) # else: # print("Invalid user data") ``` -------------------------------- ### Cert-Manager: Install and Configure CA for mTLS Source: https://archman.dev/docs/cloud-native-and-runtime-topology/networking/network-policies-mtls This section details the steps to install Cert-Manager, create a root CA certificate, set up a self-signed issuer for bootstrapping, and configure a CA issuer for use across services. It also includes examples for generating certificates for the Order and Payment services. ```yaml # 1. Install cert-manager (via Helm or manifests) # helm repo add jetstack https://charts.jetstack.io # helm install cert-manager jetstack/cert-manager -n cert-manager --create-namespace --- # 2. Create root CA certificate apiVersion: cert-manager.io/v1 kind: Certificate metadata: name: internal-ca-cert namespace: cert-manager spec: commonName: "Internal CA" secretName: internal-ca-key-pair isCA: true issuerRef: name: selfsigned-issuer kind: Issuer --- # 3. 
# Self-signed issuer for bootstrapping the CA
apiVersion: cert-manager.io/v1
kind: Issuer
metadata:
  name: selfsigned-issuer
  namespace: cert-manager
spec:
  selfSigned: {}
---
# 4. CA Issuer - used by all services
# This is a ClusterIssuer rather than a namespaced Issuer because the CA key
# pair secret (internal-ca-key-pair) lives in the cert-manager namespace. A
# namespaced Issuer in "production" would look for the secret in "production"
# and never find it. A ClusterIssuer reads it from cert-manager's cluster
# resource namespace (the namespace cert-manager is installed in by default).
# The CA issuer field is spec.ca.secretName; spec.ca.secretRef does not exist.
apiVersion: cert-manager.io/v1
kind: ClusterIssuer
metadata:
  name: internal-ca
spec:
  ca:
    secretName: internal-ca-key-pair
---
# 5. Certificate for Order Service
apiVersion: cert-manager.io/v1
kind: Certificate
metadata:
  name: order-service-cert
  namespace: production
spec:
  secretName: order-service-tls
  duration: 2160h # 90 days
  renewBefore: 720h # Renew 30 days before expiry
  commonName: order-api.production.svc.cluster.local
  dnsNames:
    - order-api
    - order-api.production
    - order-api.production.svc
    - order-api.production.svc.cluster.local
  issuerRef:
    name: internal-ca
    kind: ClusterIssuer
---
# 6. Certificate for Payment Service
apiVersion: cert-manager.io/v1
kind: Certificate
metadata:
  name: payment-service-cert
  namespace: production
spec:
  secretName: payment-service-tls
  duration: 2160h
  renewBefore: 720h
  commonName: payment-api.production.svc.cluster.local
  dnsNames:
    - payment-api
    - payment-api.production
    - payment-api.production.svc
    - payment-api.production.svc.cluster.local
  issuerRef:
    name: internal-ca
    kind: ClusterIssuer
```

--------------------------------

### Main Application: Initialize Broker and Services (Python)

Source: https://archman.dev/docs/architectural-styles/event-driven-architecture-eda

The main.py script sets up the event-driven system. It initializes the KafkaEventBroker, then instantiates the OrderService, PaymentProcessor, and NotificationProcessor, passing the broker to each. Finally, it triggers an order creation to start the event flow.

```python from event_broker import KafkaEventBroker from order_service import OrderService from payment_processor import PaymentProcessor from notification_processor import NotificationProcessor if __name__ == '__main__': # Initialize broker broker = KafkaEventBroker() # Initialize services (order source) order_service = OrderService(broker) # Initialize processors (event subscribers) payment_processor = PaymentProcessor(broker) notification_processor = NotificationProcessor(broker) # Create an order order_id = order_service.create_order( user_id=456, items=[ {'product_id': 1, 'price': 50.00, 'qty': 1}, {'product_id': 2, 'price': 25.00, 'qty': 2} ) # Output: # Published OrderCreated # Processing payment of $100.00 for order 12345 # Sending order confirmation email to user 456 # Published PaymentProcessed # Sending payment receipt for order 12345 ``` -------------------------------- ### Hypermedia-Driven API Workflow Example Source: https://archman.dev/docs/api-and-interface-design/restful-api-design/resources-representations-hateoas-optional Demonstrates a typical hypermedia-driven API interaction flow. The client starts at the root and discovers subsequent resources and actions by following links provided in the responses. This example illustrates resource discovery, collection retrieval, and specific resource interactions. ```json 1. Client starts at root GET / { "_links": { "users": { "href": "/users" }, "products": { "href": "/products" }, "search": { "href": "/search{?q}" } // URI Template } } 2. Client discovers users collection GET /users { "data": [ { "id": 1, "name": "Alice" } ], "_links": { "self": { "href": "/users" }, "create": { "href": "/users", "method": "POST" } } } 3. Client follows user link GET /users/1 { "id": 1, "name": "Alice", "_links": { "self": { "href": "/users/1" }, "orders": { "href": "/users/1/orders" }, "edit": { "href": "/users/1", "method": "PUT" }, "delete": { "href": "/users/1", "method": "DELETE" } } } 4. 
Client discovers orders GET /users/1/orders { "data": [ { "id": 1001, "total": 99.99 } ], "_links": { "self": { "href": "/users/1/orders" }, "create": { "href": "/users/1/orders", "method": "POST" } } } 5. Client discovers specific order GET /orders/1001 { "id": 1001, "total": 99.99, "_links": { "self": { "href": "/orders/1001" }, "user": { "href": "/users/1" }, "items": { "href": "/orders/1001/items" }, "cancel": { "href": "/orders/1001/cancel", "method": "POST" } } } ``` -------------------------------- ### Main Application Entry Point (Python) Source: https://archman.dev/docs/architectural-styles/microkernel-plugin The main script initializes the PluginRegistry, loads plugins from a specified directory, and then instantiates the RequestRouter. It demonstrates handling payment and notification requests using the router and shows the output of available plugins. ```python from core.plugin_registry import PluginRegistry from core.router import RequestRouter def main(): # Initialize core registry = PluginRegistry() # Auto-discover plugins from ./plugins directory registry.load_plugins("./plugins") print(f"Available plugins: {registry.list_plugins()}") # Output: ['payment_stripe', 'payment_paypal', 'notify_email', 'notify_sms'] # Create router with registry router = RequestRouter(registry) # Handle requests result1 = router.handle_payment_request("stripe", 99.99) print(f"Stripe payment: {result1}") result2 = router.handle_payment_request("paypal", 50.00) print(f"PayPal payment: {result2}") result3 = router.handle_notification("email", "user123", "Order shipped!") print(f"Email notification: {result3}") # Can add new plugins at runtime without restarting core # Or disable plugins by removing from directory if __name__ == '__main__': main() ```