### Clone Junjo Server Bare-Bones Template

Source: https://python-api.junjo.ai/junjo_server

This snippet demonstrates how to clone the Junjo Server Bare-Bones Template repository, navigate into the directory, configure environment variables by copying an example file, and start the Docker services. It also shows how to access the Junjo Server UI.

```bash
git clone https://github.com/mdrideout/junjo-server-bare-bones.git
cd junjo-server-bare-bones
cp .env.example .env
# Edit .env with your settings
docker compose up -d
open http://localhost:5153
```

--------------------------------

### Start Junjo Server Services with Docker Compose

Source: https://python-api.junjo.ai/junjo_server

This command sequence shows how to start the Junjo Server services defined in a Docker Compose file. It involves creating a .env file from an example, starting all defined services in detached mode using 'docker compose up -d', and accessing the web UI via a browser.

```bash
# Create .env file (see Configuration section below)
cp .env.example .env

# Start all services
docker compose up -d

# Access the UI
open http://localhost:5153
```

--------------------------------

### Integrate OpenInference for LLM Tracing

Source: https://python-api.junjo.ai/junjo_server

Installs the OpenInference instrumentation for Google Generative AI and integrates it with the established OpenTelemetry tracer provider. This enables detailed tracing of LLM calls, including prompts, responses, token usage, and parameters.

```shell
# Install OpenInference instrumentation for your LLM provider
pip install openinference-instrumentation-google-genai
```

--------------------------------

### Basic Junjo Workflow Example

Source: https://python-api.junjo.ai/getting_started

A Python script demonstrating a basic Junjo workflow. It defines custom states, stores, nodes, and a graph, then executes the workflow. This example showcases node execution, conditional branching, and state updates within a workflow.
```python
from junjo import BaseState, BaseStore, Condition, Edge, Graph, Node, Workflow

# Run With
# python -m main
# uv run -m main

async def main():
    """The main entry point for the application."""

    # Define the workflow state
    class SampleWorkflowState(BaseState):
        count: int | None = None  # Does not need an initial state value
        items: list[str]  # Does need an initial state value

    # Define the workflow store
    class SampleWorkflowStore(BaseStore[SampleWorkflowState]):
        # An immutable state update function
        async def set_count(self, payload: int) -> None:
            await self.set_state({"count": payload})

    # Define the nodes
    class FirstNode(Node[SampleWorkflowStore]):
        async def service(self, store: SampleWorkflowStore) -> None:
            print("First Node Executed")

    class CountItemsNode(Node[SampleWorkflowStore]):
        async def service(self, store: SampleWorkflowStore) -> None:
            # Get the state and count the items
            state = await store.get_state()
            items = state.items
            count = len(items)

            # Perform a state update with the count
            await store.set_count(count)
            print(f"Counted {count} items")

    class EvenItemsNode(Node[SampleWorkflowStore]):
        async def service(self, store: SampleWorkflowStore) -> None:
            print("Path taken for even items count.")

    class OddItemsNode(Node[SampleWorkflowStore]):
        async def service(self, store: SampleWorkflowStore) -> None:
            print("Path taken for odd items count.")

    class FinalNode(Node[SampleWorkflowStore]):
        async def service(self, store: SampleWorkflowStore) -> None:
            print("Final Node Executed")

    class CountIsEven(Condition[SampleWorkflowState]):
        def evaluate(self, state: SampleWorkflowState) -> bool:
            count = state.count
            if count is None:
                return False
            return count % 2 == 0

    def create_graph() -> Graph:
        """
        Factory function to create a new instance of the sample workflow graph.
        This ensures that each workflow execution gets a fresh, isolated graph,
        preventing state conflicts in concurrent environments.
        """
        # Instantiate the nodes
        first_node = FirstNode()
        count_items_node = CountItemsNode()
        even_items_node = EvenItemsNode()
        odd_items_node = OddItemsNode()
        final_node = FinalNode()

        # Create the workflow graph
        return Graph(
            source=first_node,
            sink=final_node,
            edges=[
                Edge(tail=first_node, head=count_items_node),

                # Branching based on the count of items
                Edge(tail=count_items_node, head=even_items_node, condition=CountIsEven()),  # Only transitions if count is even
                Edge(tail=count_items_node, head=odd_items_node),  # Fallback if first condition is not met

                # Branched paths converge to the final node
                Edge(tail=even_items_node, head=final_node),
                Edge(tail=odd_items_node, head=final_node),
            ]
        )

    # Create the workflow
    sample_workflow = Workflow[SampleWorkflowState, SampleWorkflowStore](
        name="Getting Started Example Workflow",
        graph_factory=create_graph,
        store_factory=lambda: SampleWorkflowStore(
            initial_state=SampleWorkflowState(
                items=["laser", "coffee", "horse"]
            )
        )
    )

    # Execute the workflow
    await sample_workflow.execute()
    print("Final state: ", await sample_workflow.get_state_json())

if __name__ == "__main__":
    import asyncio
    asyncio.run(main())
```

--------------------------------

### Install OpenTelemetry Packages

Source: https://python-api.junjo.ai/junjo_server

Installs the necessary OpenTelemetry SDK and gRPC exporter for Junjo Server integration. These packages are essential for enabling telemetry collection within your Python application.

```shell
pip install opentelemetry-sdk opentelemetry-exporter-otlp-proto-grpc
```

--------------------------------

### Clone and Start Junjo Server Bare-Bones Template

Source: https://python-api.junjo.ai/deployment

This snippet demonstrates how to clone the Junjo Server bare-bones template from GitHub, configure environment variables, and start the services using Docker Compose. It's a recommended starting point for custom deployments.
```bash
git clone https://github.com/mdrideout/junjo-server-bare-bones.git
cd junjo-server-bare-bones
cp .env.example .env
# Edit .env with your settings
docker compose up -d
open http://localhost:5153
```

--------------------------------

### Minimal Junjo Server Docker Compose Configuration

Source: https://python-api.junjo.ai/junjo_server

This Docker Compose configuration defines the three services required for Junjo Server: backend, ingestion service, and frontend. It specifies Docker images, port mappings, volume mounts for persistent data, environment file usage, network configurations, and service dependencies. This allows integrating Junjo Server into an existing project.

```yaml
services:
  junjo-server-backend:
    image: mdrideout/junjo-server-backend:latest
    ports:
      - "1323:1323"   # HTTP API
      - "50053:50053" # Internal gRPC
    volumes:
      - ./.dbdata/sqlite:/dbdata/sqlite
      - ./.dbdata/duckdb:/dbdata/duckdb
    env_file: .env
    networks:
      - junjo-network

  junjo-server-ingestion:
    image: mdrideout/junjo-server-ingestion-service:latest
    ports:
      - "50051:50051" # OTel data ingestion (your app connects here)
      - "50052:50052" # Internal gRPC
    volumes:
      - ./.dbdata/badgerdb:/dbdata/badgerdb
    env_file: .env
    networks:
      - junjo-network

  junjo-server-frontend:
    image: mdrideout/junjo-server-frontend:latest
    ports:
      - "5153:80" # Web UI
    env_file: .env
    networks:
      - junjo-network
    depends_on:
      - junjo-server-backend
      - junjo-server-ingestion

networks:
  junjo-network:
    driver: bridge
```

--------------------------------

### Install Junjo Python Package

Source: https://context7.com/context7/python-api_junjo_ai/llms.txt

Instructions for installing the Junjo library using popular Python package managers: pip, poetry, and uv. Ensures the library is available for use in your project.
```bash
# With pip
pip install junjo

# With poetry
poetry add junjo

# With uv
uv add junjo
```

--------------------------------

### Install Junjo Python Library

Source: https://python-api.junjo.ai/getting_started

Instructions for installing the Junjo Python library using common package managers like pip, poetry, and uv. These commands download and install the library and its dependencies into your Python environment.

```bash
pip install junjo
```

```bash
poetry add junjo
```

```bash
uv add junjo
```

--------------------------------

### Instrument Google Generative AI with OpenTelemetry

Source: https://python-api.junjo.ai/junjo_server

Applies the OpenInference Google Generative AI instrumentor to the OpenTelemetry tracer provider. This allows Junjo Server to capture and display rich LLM-specific telemetry data.

```python
from openinference.instrumentation.google_genai import GoogleGenAIInstrumentor

# After setting up OpenTelemetry tracer provider
GoogleGenAIInstrumentor().instrument(tracer_provider=tracer_provider)
```

--------------------------------

### Configure OpenTelemetry with Junjo Server Exporter

Source: https://python-api.junjo.ai/junjo_server

Sets up OpenTelemetry tracing and metrics providers, utilizing a custom Junjo Server exporter. It retrieves the API key from environment variables and configures the service name, host, and port for data ingestion. TLS should be enabled in production environments.

```python
import os

from junjo.telemetry.junjo_server_otel_exporter import JunjoServerOtelExporter
from opentelemetry import trace, metrics
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.resources import Resource

def init_telemetry(service_name: str):
    """Configure OpenTelemetry for Junjo Server."""

    # Get API key from environment
    api_key = os.getenv("JUNJO_SERVER_API_KEY")
    if not api_key:
        raise ValueError(
            "JUNJO_SERVER_API_KEY environment variable not set. "
            "Generate a new API key in the Junjo Server UI."
        )

    # Create OpenTelemetry resource
    resource = Resource.create({"service.name": service_name})

    # Set up tracer provider
    tracer_provider = TracerProvider(resource=resource)

    # Configure Junjo Server exporter
    junjo_exporter = JunjoServerOtelExporter(
        host="localhost",  # Junjo Server ingestion service host
        port="50051",      # Port 50051 receives OpenTelemetry data
        api_key=api_key,
        insecure=True      # Use False in production with TLS
    )

    # Add span processor for tracing
    tracer_provider.add_span_processor(junjo_exporter.span_processor)
    trace.set_tracer_provider(tracer_provider)

    # (Optional) Set up metrics
    meter_provider = MeterProvider(
        resource=resource,
        metric_readers=[junjo_exporter.metric_reader]
    )
    metrics.set_meter_provider(meter_provider)
```

--------------------------------

### Initialize Telemetry in Python Application

Source: https://python-api.junjo.ai/junjo_server

Initializes the configured OpenTelemetry telemetry before executing workflows. This ensures that all subsequent operations, including workflow executions, are automatically traced and metrics are collected.

```python
from otel_config import init_telemetry

# Initialize telemetry
init_telemetry(service_name="my-ai-workflow")

# Execute your workflow - telemetry is automatic!
await my_workflow.execute()
```

--------------------------------

### Complete Junjo Workflow Example

Source: https://context7.com/context7/python-api_junjo_ai/llms.txt

A comprehensive example demonstrating the integration of various Junjo features within a single workflow. It includes state management, conditional logic, concurrent execution, subflows, and telemetry integration.
```python
from junjo import (
    BaseState, BaseStore, Node, Edge, Graph, Workflow,
    Condition, RunConcurrent, Subflow, HookManager
)
import asyncio
```

--------------------------------

### Instantiate and Execute Workflow

Source: https://python-api.junjo.ai/_modules/junjo/workflow

Demonstrates how to instantiate a Workflow with custom factories and hooks, and then execute it. This example highlights the use of generic type parameters for state and store management.

```python
workflow = Workflow[MyGraphState, MyGraphStore](
    name="demo_base_workflow",
    graph_factory=create_my_graph,
    store_factory=lambda: MyGraphStore(initial_state=MyGraphState()),
    hook_manager=HookManager(verbose_logging=False, open_telemetry=True),
)
await workflow.execute()
```

--------------------------------

### Install Graphviz System Dependency (macOS)

Source: https://python-api.junjo.ai/visualizing_workflows

Installs the Graphviz system dependency on macOS using the Homebrew package manager. This is a prerequisite for generating workflow diagrams.

```bash
brew install graphviz
```

--------------------------------

### Set Junjo Server API Key Environment Variable

Source: https://python-api.junjo.ai/junjo_server

This snippet shows how to set the JUNJO_SERVER_API_KEY environment variable in your terminal. This is a required configuration step after generating an API key from the Junjo Server UI, enabling your applications to authenticate with the server.

```bash
export JUNJO_SERVER_API_KEY="your-api-key-here"
```

--------------------------------

### Using Junjo Telemetry Initialization (Python)

Source: https://python-api.junjo.ai/opentelemetry

Example of how to import and use the `init_telemetry` function from the `otel_config` module to initialize OpenTelemetry tracing for a Junjo AI workflow.
```python
from otel_config import init_telemetry

# Initialize before running workflows
init_telemetry(service_name="my-ai-workflow")

# Execute workflows - automatic instrumentation
await my_workflow.execute()
```

--------------------------------

### Install Junjo Python Dependencies with Graphviz

Source: https://python-api.junjo.ai/visualizing_workflows

Installs the necessary Python dependencies for Junjo, including the optional 'graphviz' extras. This is required for Junjo's diagram generation capabilities. Assumes use of 'uv' and a 'pyproject.toml' with defined extras.

```bash
uv pip install -e ".[dev,graphviz]"
```

--------------------------------

### Python State and Store Management Setup

Source: https://python-api.junjo.ai/_modules/junjo/store

This Python code sets up the foundation for state and store management within the Junjo AI project. It imports necessary modules like `abc`, `asyncio`, `inspect`, `typing`, `jsonpatch`, and `opentelemetry`. It defines generic type variables `StateT`, `StoreT`, `ParentStateT`, and `ParentStoreT` for flexible state and store handling. This forms the basis for creating and managing application states and their associated stores, supporting asynchronous operations and tracing.
```python
import abc
import asyncio
import inspect
from collections.abc import Awaitable, Callable
from types import NoneType
from typing import Generic, TypeVar

import jsonpatch
from opentelemetry import trace
from pydantic import ValidationError

from .state import BaseState
from .util import generate_safe_id

# State / Store
StateT = TypeVar("StateT", bound=BaseState)
StoreT = TypeVar("StoreT", bound="BaseStore")

# Parent State / Store
ParentStateT = TypeVar("ParentStateT", bound="BaseState | NoneType")
ParentStoreT = TypeVar("ParentStoreT", bound="BaseStore | NoneType")
```

--------------------------------

### Complete OpenTelemetry Configuration for Junjo (Python)

Source: https://python-api.junjo.ai/opentelemetry

A Python script demonstrating a complete OpenTelemetry setup for Junjo AI, including initializing the tracer provider, configuring the Junjo Server OTel exporter, and setting up resource attributes. It requires the JUNJO_SERVER_API_KEY environment variable.

```python
import os

from junjo.telemetry.junjo_server_otel_exporter import JunjoServerOtelExporter
from opentelemetry import trace, metrics
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.resources import Resource

def init_telemetry(service_name: str):
    """Configure OpenTelemetry with Junjo Server."""

    # Get API key and determine environment
    api_key = os.getenv("JUNJO_SERVER_API_KEY")
    if not api_key:
        raise ValueError("JUNJO_SERVER_API_KEY environment variable not set")

    is_production = os.getenv("ENV", "development") == "production"

    # Create resource
    resource = Resource.create({
        "service.name": service_name,
        "service.version": "1.0.0",
        "deployment.environment": os.getenv("ENV", "development")
    })

    # Set up tracer provider
    tracer_provider = TracerProvider(resource=resource)

    # Configure Junjo Server exporter
    junjo_exporter = JunjoServerOtelExporter(
        host="localhost",  # Junjo Server ingestion service
        port="50051",      # gRPC port for receiving spans
        api_key=api_key,
        insecure=not is_production  # True for local dev, False for production
    )

    # Add span processor
    tracer_provider.add_span_processor(junjo_exporter.span_processor)

    # Set as global tracer provider
    trace.set_tracer_provider(tracer_provider)

    print(f"OpenTelemetry configured for {service_name}")
    print(f"Sending traces to Junjo Server at localhost:50051")
```

--------------------------------

### Python Basic Workflow Setup with Junjo

Source: https://context7.com/context7/python-api_junjo_ai/llms.txt

Demonstrates setting up a basic AI workflow using Junjo. This includes defining state schemas with Pydantic, creating nodes for specific tasks, implementing conditional edges based on state, and executing the workflow. It showcases state management and graph composition.

```python
from junjo import BaseState, BaseStore, Condition, Edge, Graph, Node, Workflow
import asyncio

# Define state schema with Pydantic
class SampleWorkflowState(BaseState):
    count: int | None = None
    items: list[str]

# Define store with state update methods
class SampleWorkflowStore(BaseStore[SampleWorkflowState]):
    async def set_count(self, payload: int) -> None:
        await self.set_state({"count": payload})

# Define workflow nodes
class FirstNode(Node[SampleWorkflowStore]):
    async def service(self, store: SampleWorkflowStore) -> None:
        print("First Node Executed")

class CountItemsNode(Node[SampleWorkflowStore]):
    async def service(self, store: SampleWorkflowStore) -> None:
        state = await store.get_state()
        items = state.items
        count = len(items)
        await store.set_count(count)
        print(f"Counted {count} items")

class EvenItemsNode(Node[SampleWorkflowStore]):
    async def service(self, store: SampleWorkflowStore) -> None:
        print("Path taken for even items count.")

class OddItemsNode(Node[SampleWorkflowStore]):
    async def service(self, store: SampleWorkflowStore) -> None:
        print("Path taken for odd items count.")

class FinalNode(Node[SampleWorkflowStore]):
    async def service(self, store: SampleWorkflowStore) -> None:
        print("Final Node Executed")

# Define conditional edge logic
class CountIsEven(Condition[SampleWorkflowState]):
    def evaluate(self, state: SampleWorkflowState) -> bool:
        count = state.count
        if count is None:
            return False
        return count % 2 == 0

# Create graph factory for concurrency safety
def create_graph() -> Graph:
    first_node = FirstNode()
    count_items_node = CountItemsNode()
    even_items_node = EvenItemsNode()
    odd_items_node = OddItemsNode()
    final_node = FinalNode()

    return Graph(
        source=first_node,
        sink=final_node,
        edges=[
            Edge(tail=first_node, head=count_items_node),
            Edge(tail=count_items_node, head=even_items_node, condition=CountIsEven()),
            Edge(tail=count_items_node, head=odd_items_node),  # Fallback
            Edge(tail=even_items_node, head=final_node),
            Edge(tail=odd_items_node, head=final_node),
        ]
    )

# Execute workflow
async def main():
    workflow = Workflow[SampleWorkflowState, SampleWorkflowStore](
        name="Getting Started Example Workflow",
        graph_factory=create_graph,
        store_factory=lambda: SampleWorkflowStore(
            initial_state=SampleWorkflowState(items=["laser", "coffee", "horse"])
        )
    )

    await workflow.execute()
    print("Final state:", await workflow.get_state_json())

if __name__ == "__main__":
    asyncio.run(main())
```

--------------------------------

### Caddyfile for Reverse Proxy Setup

Source: https://python-api.junjo.ai/docker_reference

This Caddyfile configuration sets up reverse proxies for the Junjo Server's Web UI, Backend API, and Ingestion gRPC endpoint, enabling automatic HTTPS and subdomain routing.
```caddy
# Web UI
junjo.example.com {
    reverse_proxy junjo-server-frontend:80
}

# Backend API
api.junjo.example.com {
    reverse_proxy junjo-server-backend:1323
}

# Ingestion gRPC
grpc.junjo.example.com {
    reverse_proxy h2c://junjo-server-ingestion:50051
}
```

--------------------------------

### Subflow Pre and Post Actions Example in Python

Source: https://python-api.junjo.ai/_modules/junjo/workflow

Demonstrates how to implement pre_run_actions to pass initial state from a parent store to a subflow and post_run_actions to update the parent store with subflow results. This example utilizes generic type parameters for type safety.

```python
class ExampleSubflow(Subflow[SubflowState, SubflowStore, ParentState, ParentStore]):
    async def pre_run_actions(self, parent_store):
        parent_state = await parent_store.get_state()
        await self.store.set_parameter({
            "parameter": parent_state.parameter
        })

    async def post_run_actions(self, parent_store):
        sub_flow_state = await self.get_state()
        await parent_store.set_subflow_result(self, sub_flow_state.result)

# Instantiate the subflow
example_subflow = ExampleSubflow(
    graph_factory=create_example_subflow_graph,
    store_factory=lambda: ExampleSubflowStore(
        initial_state=ExampleSubflowState()
    ),
)
```

--------------------------------

### Configure Honeycomb Exporter - Python

Source: https://python-api.junjo.ai/opentelemetry

This Python example demonstrates how to configure an OpenTelemetry exporter for Honeycomb. It uses the OTLPSpanExporter via HTTP and requires your Honeycomb API key in the headers for authentication.
```python
# Example: Honeycomb
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace.export import BatchSpanProcessor

# Assumes tracer_provider is already defined
honeycomb_exporter = OTLPSpanExporter(
    endpoint="https://api.honeycomb.io/v1/traces",
    headers={"x-honeycomb-team": ""}  # Set the value to your Honeycomb API key
)
tracer_provider.add_span_processor(BatchSpanProcessor(honeycomb_exporter))
```

--------------------------------

### Junjo AI Sample Workflow Test Example (Gemini)

Source: https://python-api.junjo.ai/eval_driven_dev

An example demonstrating the evaluation of a Junjo joke creation node using Gemini. It combines assertions and live LLM evaluations against several test inputs. Failures provide reasons for prompt issues.

```python
# src/base/sample_workflow/sample_subflow/nodes/create_joke_node/test/test_cases.py

# node.py lives one package above this test directory; the prompt lives in test_prompt.py
from ..node import create_joke_node
from .test_prompt import joke_evaluation_prompt

# Example test cases
test_cases = [
    {
        "input": "Tell me a joke about computers.",
        "expected_evaluation": "The joke should be relevant to computers and humorous."
    },
    {
        "input": "Tell me a joke about cats.",
        "expected_evaluation": "The joke should be about cats and funny."
    }
]

def test_joke_generation():
    for case in test_cases:
        input_text = case["input"]
        expected_eval = case["expected_evaluation"]

        # Simulate node execution (actual implementation would call Junjo)
        generated_joke = create_joke_node(input_text)

        # Simulate LLM evaluation (using Gemini in this example)
        evaluation_result = evaluate_with_gemini(generated_joke, expected_eval, joke_evaluation_prompt)

        assert evaluation_result == "PASS", (
            f"Failed on input: {input_text}. "
            f"Reason: {get_failure_reason(evaluation_result)}"
        )

def evaluate_with_gemini(joke, criteria, prompt):
    # Placeholder for actual Gemini API call and evaluation logic
    print(f"Evaluating joke: {joke} against criteria: {criteria}")
    # This would involve calling the Gemini API and parsing its response
    return "PASS"  # or "FAIL" based on LLM evaluation

def get_failure_reason(evaluation_result):
    # Placeholder for logic to extract failure reason from evaluation result
    return "LLM evaluation failed criteria."
```

```python
# src/base/sample_workflow/sample_subflow/nodes/create_joke_node/test/test_prompt.py

joke_evaluation_prompt = """
Evaluate the following joke based on the provided criteria.
Respond with 'PASS' or 'FAIL' and a brief explanation.

Joke: {joke}
Criteria: {criteria}

Prompt:
"""
```

```python
# src/base/sample_workflow/sample_subflow/nodes/create_joke_node/test/test_node.py

import pytest
from src.base.sample_workflow.sample_subflow.nodes.create_joke_node.test.test_cases import test_joke_generation

def test_create_joke_workflow():
    test_joke_generation()
```

```python
# src/base/sample_workflow/sample_subflow/nodes/create_joke_node/node.py

import google.generativeai as genai

# Configure Gemini API (replace with your actual API key setup)
genai.configure(api_key="YOUR_GEMINI_API_KEY")

def create_joke_node(input_text):
    # Placeholder for the actual LLM call to generate a joke
    # In a real scenario, this would use the LLM configured in Junjo
    print(f"Generating joke for input: {input_text}")
    # model = genai.GenerativeModel('gemini-pro')
    # response = model.generate_content(f"Tell me a joke about {input_text.split('about ')[-1]}.")
    # return response.text
    return "Why did the computer go to therapy? Because it had too many bytes!"
```

```python
# src/base/sample_workflow/sample_subflow/nodes/create_joke_node/test/test_schema.py

# Placeholder for schema definitions that might describe expected failure reasons
class FailureReasonSchema:
    def __init__(self, reason_code, description):
        self.reason_code = reason_code
        self.description = description

expected_failure_reasons = {
    "IRRELEVANT_JOKE": FailureReasonSchema("IRRELEVANT_JOKE", "The generated joke is not relevant to the input topic."),
    "NOT_HUMOROUS": FailureReasonSchema("NOT_HUMOROUS", "The generated joke is not perceived as humorous.")
}
```

--------------------------------

### OpenTelemetry Context Propagation Setup (Python)

Source: https://python-api.junjo.ai/opentelemetry

Shows how to configure context propagation in OpenTelemetry using the B3 multi-format propagator. This allows trace context to be correctly transmitted across different services.

```python
# Requires the opentelemetry-propagator-b3 package
from opentelemetry import propagate
from opentelemetry.propagators.b3 import B3MultiFormat

# Use B3 propagation format
propagate.set_global_textmap(B3MultiFormat())
```

--------------------------------

### OpenTelemetry Sampling Configuration (Python)

Source: https://python-api.junjo.ai/opentelemetry

Demonstrates how to configure trace sampling in OpenTelemetry by using the `TraceIdRatioBased` sampler with a Python `TracerProvider`. This example sets sampling to 10% of traces.

```python
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.sampling import TraceIdRatioBased

# Sample 10% of traces (assumes resource is already defined)
tracer_provider = TracerProvider(
    resource=resource,
    sampler=TraceIdRatioBased(0.1)
)
```

--------------------------------

### Implement Workflow Nodes

Source: https://python-api.junjo.ai/tutorial

Defines various nodes (FirstNode, CountItemsNode, EvenItemsNode, OddItemsNode, FinalNode) that inherit from the Junjo Node class. Each node has a service method that executes the node's logic, interacting with the store to get or set state.
```python
from junjo import Node

class FirstNode(Node[SampleWorkflowStore]):
    async def service(self, store: SampleWorkflowStore) -> None:
        print("First Node Executed")

class CountItemsNode(Node[SampleWorkflowStore]):
    async def service(self, store: SampleWorkflowStore) -> None:
        state = await store.get_state()
        items = state.items
        count = len(items)
        await store.set_count(count)
        print(f"Counted {count} items")

class EvenItemsNode(Node[SampleWorkflowStore]):
    async def service(self, store: SampleWorkflowStore) -> None:
        print("Path taken for even items count.")

class OddItemsNode(Node[SampleWorkflowStore]):
    async def service(self, store: SampleWorkflowStore) -> None:
        print("Path taken for odd items count.")

class FinalNode(Node[SampleWorkflowStore]):
    async def service(self, store: SampleWorkflowStore) -> None:
        print("Final Node Executed")
```

--------------------------------

### Configure Multiple OpenTelemetry Exporters - Python

Source: https://python-api.junjo.ai/opentelemetry

This comprehensive Python example shows how to configure and use multiple OpenTelemetry exporters simultaneously. It sets up both the Junjo Server exporter and a Jaeger exporter, adding them to the same TracerProvider before setting it as the global provider. This allows telemetry data to be sent to different observability platforms concurrently.
```python
import os

from junjo.telemetry.junjo_server_otel_exporter import JunjoServerOtelExporter
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.resources import Resource
from opentelemetry import trace

# Read the API key from the environment, as in the other examples
junjo_api_key = os.getenv("JUNJO_SERVER_API_KEY")

# Create resource
resource = Resource.create({"service.name": "my-workflow"})

# Set up tracer provider
tracer_provider = TracerProvider(resource=resource)

# Add Junjo Server exporter
junjo_exporter = JunjoServerOtelExporter(
    host="localhost",  # Junjo Server ingestion service
    port="50051",      # gRPC port for receiving spans
    api_key=junjo_api_key,
    insecure=True      # Use False in production with TLS
)
tracer_provider.add_span_processor(junjo_exporter.span_processor)

# Also send to Jaeger
jaeger_exporter = OTLPSpanExporter(endpoint="http://jaeger:4317")
tracer_provider.add_span_processor(BatchSpanProcessor(jaeger_exporter))

# Set as global tracer provider
trace.set_tracer_provider(tracer_provider)
```

--------------------------------

### Docker Compose Development Configuration

Source: https://python-api.junjo.ai/docker_reference

This configuration provides a simplified Docker Compose setup for local development of the Junjo AI project. It includes the same three services but uses development-specific environment variables and local volume paths. It also defines a bridge network for inter-service communication.
```yaml
services:
  junjo-server-backend:
    image: mdrideout/junjo-server-backend:latest
    ports:
      - "1323:1323"
      - "50053:50053"
    volumes:
      - ./.dbdata/sqlite:/dbdata/sqlite
      - ./.dbdata/duckdb:/dbdata/duckdb
    environment:
      - JUNJO_ENV=development
      - JUNJO_ALLOW_ORIGINS=http://localhost:5153
    networks:
      - junjo-network

  junjo-server-ingestion:
    image: mdrideout/junjo-server-ingestion-service:latest
    ports:
      - "50051:50051"
      - "50052:50052"
    volumes:
      - ./.dbdata/badgerdb:/dbdata/badgerdb
    networks:
      - junjo-network

  junjo-server-frontend:
    image: mdrideout/junjo-server-frontend:latest
    ports:
      - "5153:80"
    environment:
      - VITE_API_URL=http://localhost:1323
    networks:
      - junjo-network

networks:
  junjo-network:
    driver: bridge
```

--------------------------------

### Define and Instantiate a Workflow Graph in Python

Source: https://python-api.junjo.ai/_modules/junjo/graph

This example demonstrates how to define a directed workflow graph using the `Graph` class from the `junjo` library. It includes setting up nodes, edges with conditions, and a data store for managing workflow state. The graph defines a sequence of operations with conditional branching.
```python from junjo import Node, Edge, Graph, BaseStore, Condition, BaseState # Define a simple state (can be more complex in real scenarios) class MyWorkflowState(BaseState): count: int | None = None # Define a simple store class MyWorkflowStore(BaseStore[MyWorkflowState]): async def set_count(self, payload: int) -> None: await self.set_state({"count": payload}) # Define some simple nodes class FirstNode(Node[MyWorkflowStore]): async def service(self, store: MyWorkflowStore) -> None: print("First Node Executed") class CountItemsNode(Node[MyWorkflowStore]): async def service(self, store: MyWorkflowStore) -> None: # In a real scenario, you might get items from state and count them await store.set_count(5) # Example count print("Counted items") class EvenItemsNode(Node[MyWorkflowStore]): async def service(self, store: MyWorkflowStore) -> None: print("Path taken for even items count.") class OddItemsNode(Node[MyWorkflowStore]): async def service(self, store: MyWorkflowStore) -> None: print("Path taken for odd items count.") class FinalNode(Node[MyWorkflowStore]): async def service(self, store: MyWorkflowStore) -> None: print("Final Node Executed") # Define a condition class CountIsEven(Condition[MyWorkflowState]): def evaluate(self, state: MyWorkflowState) -> bool: if state.count is None: return False return state.count % 2 == 0 # Instantiate the nodes first_node = FirstNode() count_items_node = CountItemsNode() even_items_node = EvenItemsNode() odd_items_node = OddItemsNode() final_node = FinalNode() # Create the workflow graph workflow_graph = Graph( source=first_node, sink=final_node, edges=[ Edge(tail=first_node, head=count_items_node), Edge(tail=count_items_node, head=even_items_node, condition=CountIsEven()), Edge(tail=count_items_node, head=odd_items_node), # Fallback Edge(tail=even_items_node, head=final_node), Edge(tail=odd_items_node, head=final_node), ] ) ``` -------------------------------- ### Junjo AI Backend Environment Variables Reference Source: 
https://python-api.junjo.ai/docker_reference

This section details the environment variables required and optionally used by the Junjo AI backend service. It covers settings for environment mode, CORS origins, session secrets, authentication domains, and build targets. Specific examples and generation methods are provided for clarity.

```env
JUNJO_ENV
    **Required.** Environment mode: `production` or `development`
    * **Default:** None
    * **Example:** `JUNJO_ENV=production`

JUNJO_ALLOW_ORIGINS
    **Required.** Comma-separated list of allowed CORS origins for API requests
    * **Default:** None
    * **Example:** `JUNJO_ALLOW_ORIGINS=https://junjo.example.com,https://www.example.com`

JUNJO_SESSION_SECRET
    **Required for production.** Secret key for session management and JWT tokens
    * **Default:** None
    * **Example:** `JUNJO_SESSION_SECRET=a1b2c3d4e5f6...` (64 character hex string)
    * **Generate with:** `openssl rand -hex 32`

JUNJO_PROD_AUTH_DOMAIN
    **Required for production.** The primary domain for authentication cookies
    * **Default:** None
    * **Example:** `JUNJO_PROD_AUTH_DOMAIN=junjo.example.com`

JUNJO_BUILD_TARGET
    **Optional.** Build target for development purposes
    * **Default:** `production`
    * **Example:** `JUNJO_BUILD_TARGET=development`
```

--------------------------------

### Instantiate ExampleSubflow in Python

Source: https://python-api.junjo.ai/api

Demonstrates how to instantiate an ExampleSubflow, providing a graph factory and a store factory. The store factory initializes the subflow's state.

```python
example_subflow = ExampleSubflow(
    graph_factory=create_example_subflow_graph,
    store_factory=lambda: ExampleSubflowStore(
        initial_state=ExampleSubflowState()
    ),
)
```

--------------------------------

### Python: Export Spans to Junjo Server and Jaeger

Source: https://python-api.junjo.ai/junjo_server

This Python snippet demonstrates how to configure OpenTelemetry to export spans to both Junjo Server and Jaeger simultaneously.
It uses standard OTLP exporters for broad compatibility, ensuring Junjo-specific attributes are included.

```python
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace.export import BatchSpanProcessor

# Assuming tracer_provider and api_key are already defined,
# and that JunjoServerOtelExporter has already been imported from the junjo package

# Junjo Server
junjo_exporter = JunjoServerOtelExporter(
    host="localhost",
    port="50051",
    api_key=api_key,
    insecure=True
)
tracer_provider.add_span_processor(junjo_exporter.span_processor)

# Also send to Jaeger
jaeger_exporter = OTLPSpanExporter(endpoint="http://jaeger:4317")
tracer_provider.add_span_processor(BatchSpanProcessor(jaeger_exporter))
```

--------------------------------

### Create and Execute Junjo Workflow

Source: https://python-api.junjo.ai/tutorial

An asynchronous main function that creates a Junjo Workflow instance, providing a graph factory and a store factory with initial state. It then executes the workflow and prints the final state.

```python
from junjo import Workflow
import asyncio

async def main():
    # Create the workflow
    sample_workflow = Workflow[SampleWorkflowState, SampleWorkflowStore](
        name="Getting Started Example Workflow",
        graph_factory=create_graph,
        store_factory=lambda: SampleWorkflowStore(
            initial_state=SampleWorkflowState(
                items=["laser", "coffee", "horse"]
            )
        )
    )

    # Execute the workflow
    await sample_workflow.execute()
    print("Final state: ", await sample_workflow.get_state_json())

if __name__ == "__main__":
    asyncio.run(main())
```

--------------------------------

### BaseStore Class Documentation

Source: https://python-api.junjo.ai/_modules/junjo/store

Details about the BaseStore class, its initialization, and core functionalities.

```APIDOC
## BaseStore Class

### Description
BaseStore is a generic class for managing workflow state immutably and safely in a concurrent environment. It supports Pydantic models for state, allows subscription to state changes, and ensures thread-safe updates.

### Initialization
```python
BaseStore(initial_state: StateT)
```

#### Parameters
- **initial_state** (StateT) - Required - The initial state of the store, based on a Pydantic model.

### Methods

#### `id` Property

##### Description
Returns the unique identifier of the store instance.

##### Method
GET

##### Endpoint
`/id` (as a property of the store instance)

#### `subscribe` Method

##### Description
Registers a listener (sync or async callable) to be notified when the state changes. Returns an asynchronous function to unsubscribe the listener.

##### Method
POST

##### Endpoint
`/subscribe`

##### Parameters
- **listener** (Subscriber) - Required - The callable function to subscribe to state changes.

##### Request Body Example
```json
{
  "listener": "your_callable_function"
}
```

##### Response
- **unsubscribe** (Callable[[], Awaitable[None]]) - A function to asynchronously unsubscribe the listener.

##### Response Example
```json
{
  "message": "Successfully subscribed. Use the returned function to unsubscribe."
}
```

#### `get_state` Method

##### Description
Retrieves a shallow copy of the current state, adhering to the immutability principle.

##### Method
GET

##### Endpoint
`/state`

##### Response
- **state** (StateT) - A copy of the current state object.

##### Response Example
```json
{
  "state": { ... your state object ... }
}
```

#### `get_state_json` Method

##### Description
Retrieves the current state as a JSON string.

##### Method
GET

##### Endpoint
`/state/json`

##### Response
- **state_json** (str) - The current state represented as a JSON string.

##### Response Example
```json
{
  "state_json": "{ \"field1\": \"value1\", \"field2\": 123 }"
}
```

#### `set_state` Method

##### Description
Updates the store's state with a dictionary of changes. This performs an immutable update, merges the current state with the provided updates, validates the changes, and notifies subscribers if the state has been modified.

##### Method
PUT

##### Endpoint
`/state`

##### Parameters
- **update** (dict) - Required - A dictionary containing the fields and their new values to update the state.

##### Request Body Example
```json
{
  "update": {
    "field_to_update": "new_value",
    "another_field": 456
  }
}
```

##### Response
None (typically returns 200 OK on success)

##### Example Usage in Subclass
```python
# Assuming MessageWorkflowState and MessageWorkflowStore are defined as in the documentation
# payload = Message(...)
# await store.set_received_message(payload)  # This internally calls set_state
```

### Error Handling
- Concurrency issues are managed using `asyncio.Lock`.
- State validation is performed against the Pydantic model.
- Exceptions during validation or updates will propagate to the caller.
```

--------------------------------

### Get State as JSON

Source: https://python-api.junjo.ai/_modules/junjo/store

Returns the current state serialized as a JSON string. This is useful for external integrations or logging purposes.

```python
async def get_state_json(self) -> str:
    """
    Return the current state as a JSON string.
    """
    async with self._lock:
        return self._state.model_dump_json()
```

--------------------------------

### Get Next Node

Source: https://python-api.junjo.ai/api

Retrieves the next node in a workflow graph based on the current node and edge conditions, utilizing a provided store for resolution.

```APIDOC
## async get_next_node

### Description
Retrieves the next node (or workflow / subflow) in the graph for the given current node. This method checks the edges connected to the current node and resolves the next node based on the conditions defined in the edges.

### Method
`async get_next_node`

### Parameters

#### Path Parameters
None

#### Query Parameters
None

#### Request Body
None

### Request Example
```python
# Assuming 'store' is a BaseStore instance and 'current_node' is a Node or NestableWorkflow
next_node = await get_next_node(store, current_node)
```

### Response

#### Success Response (200)
- **Node | _NestableWorkflow** - The next node or subflow in the graph.

#### Response Example
```json
{
  "node_type": "Node",
  "node_name": "NextStep"
}
```
```

--------------------------------

### Mount Block Storage for Production Data

Source: https://python-api.junjo.ai/docker_reference

This script demonstrates how to mount block storage for persistent data in a production environment. It includes formatting the disk, creating a mount point, mounting the storage, and setting up the database directories with correct ownership.

```bash
# Example: Mount block storage
sudo mkfs.ext4 /dev/disk/by-id/scsi-0DO_Volume_junjo-data
sudo mkdir -p /mnt/junjo-data
sudo mount -o defaults,nofail /dev/disk/by-id/scsi-0DO_Volume_junjo-data /mnt/junjo-data

# Create database directories
sudo mkdir -p /mnt/junjo-data/{sqlite,duckdb,badgerdb}
sudo chown -R $USER:$USER /mnt/junjo-data
```

--------------------------------

### Get Current State

Source: https://python-api.junjo.ai/_modules/junjo/store

Retrieves a shallow copy of the current state, adhering to the immutability principle. This prevents external modifications to the internal state of the store.

```python
async def get_state(self) -> StateT:
    """
    Return a shallow copy of the current state.
    (Follows immutability principle)
    """
    async with self._lock:
        # Return a separate copy of the Pydantic model so outside code doesn't mutate the store
        return self._state.model_copy()
```

--------------------------------

### Create Local Database Directories

Source: https://python-api.junjo.ai/docker_reference

This command creates the necessary directories for storing database files locally.
It uses `mkdir -p` to create parent directories as needed and `chmod` to set appropriate permissions for the user.

```bash
mkdir -p dbdata/{sqlite,duckdb,badgerdb}
chmod -R 755 dbdata
```

--------------------------------

### Custom Resource Attributes for OpenTelemetry (Python)

Source: https://python-api.junjo.ai/opentelemetry

Example of creating an OpenTelemetry `Resource` object with custom attributes, such as 'team.name' and 'custom.attribute', to enrich all telemetry data originating from the service.

```python
from opentelemetry.sdk.resources import Resource

resource = Resource.create({
    "service.name": "my-workflow",
    "service.version": "2.0.0",
    "deployment.environment": "production",
    "team.name": "ai-team",
    "custom.attribute": "value"
})
```
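
--------------------------------

### Sketch: Immutable State Updates with a Lock and Subscribers (Python)

The `BaseStore` entries above describe a store that copies state on read, merges a dictionary of updates under an `asyncio.Lock`, and notifies subscribers only when the state actually changed. The following is a minimal, standard-library-only sketch of that pattern. It is not Junjo's implementation: `SketchState`, `SketchStore`, and `demo` are hypothetical names, and a frozen dataclass stands in for the Pydantic model that Junjo uses.

```python
import asyncio
from dataclasses import dataclass, replace
from typing import Awaitable, Callable, List, Optional


@dataclass(frozen=True)
class SketchState:
    """Hypothetical state object (assumption: stands in for a Pydantic model)."""
    count: Optional[int] = None


Listener = Callable[[SketchState], Awaitable[None]]


class SketchStore:
    """Illustrative store; an assumption-laden stand-in, not junjo.BaseStore."""

    def __init__(self, initial_state: SketchState) -> None:
        self._state = initial_state
        self._lock = asyncio.Lock()
        self._listeners: List[Listener] = []

    async def subscribe(self, listener: Listener) -> Callable[[], Awaitable[None]]:
        """Register an async listener and return an async unsubscribe function."""
        self._listeners.append(listener)

        async def unsubscribe() -> None:
            self._listeners.remove(listener)

        return unsubscribe

    async def get_state(self) -> SketchState:
        """Return a copy so callers never hold the store's own instance."""
        async with self._lock:
            return replace(self._state)

    async def set_state(self, update: dict) -> None:
        """Build a new state from the old one plus the updates, then notify."""
        async with self._lock:
            # replace() raises TypeError for unknown field names (crude validation)
            new_state = replace(self._state, **update)
            changed = new_state != self._state
            self._state = new_state
            listeners = list(self._listeners)
        # Notify outside the lock, and only when something actually changed
        if changed:
            for listener in listeners:
                await listener(new_state)


async def demo():
    seen = []

    async def on_change(state: SketchState) -> None:
        seen.append(state.count)

    store = SketchStore(SketchState())
    unsubscribe = await store.subscribe(on_change)
    before = await store.get_state()
    await store.set_state({"count": 3})  # changes state, listener is called
    await store.set_state({"count": 3})  # no change, listener is not called
    await unsubscribe()
    await store.set_state({"count": 4})  # listener already removed
    after = await store.get_state()
    return before, after, seen


if __name__ == "__main__":
    before, after, seen = asyncio.run(demo())
    print(before.count, after.count, seen)
```

Notifying listeners after releasing the lock is a design choice in this sketch: it lets a listener call back into the store without deadlocking.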