### Orchestrator Initialization

Source: https://context7.com/tangleml/tangle/llms.txt

Python code example demonstrating how to set up and run the TangleML orchestrator with launchers and storage configurations.

```python
import pathlib

import docker
from sqlalchemy import orm

from cloud_pipelines.orchestration.storage_providers import local_storage
from cloud_pipelines_backend import orchestrator_sql, database_ops
from cloud_pipelines_backend.launchers import local_docker_launchers

# Setup paths
data_root = pathlib.Path("data")
artifacts_dir = data_root / "artifacts"
logs_dir = data_root / "logs"
artifacts_dir.mkdir(parents=True, exist_ok=True)
logs_dir.mkdir(parents=True, exist_ok=True)

# Configure database
database_uri = f"sqlite:///{data_root}/db.sqlite"
db_engine = database_ops.create_db_engine_and_migrate_db(database_uri)

# Configure storage provider
storage_provider = local_storage.LocalStorageProvider()

# Configure Docker launcher
docker_client = docker.DockerClient.from_env(timeout=5)
launcher = local_docker_launchers.DockerContainerLauncher(
    client=docker_client,
)

# Create orchestrator
session_factory = orm.sessionmaker(
    autocommit=False, autoflush=False, bind=db_engine
)
orchestrator = orchestrator_sql.OrchestratorService_Sql(
    session_factory=session_factory,
    launcher=launcher,
    storage_provider=storage_provider,
    data_root_uri=artifacts_dir.as_posix(),
    logs_root_uri=logs_dir.as_posix(),
    default_task_annotations={},
    sleep_seconds_between_queue_sweeps=1.0,
)

# Run orchestrator loop (blocks indefinitely)
orchestrator.run_loop()
```

--------------------------------

### POST /api/pipeline_runs/

Source: https://context7.com/tangleml/tangle/llms.txt

Submit a new pipeline for execution. This endpoint allows users to define and start a new ML pipeline run.

```APIDOC
## POST /api/pipeline_runs/

### Description
Submit a new pipeline for execution. This endpoint allows users to define and start a new ML pipeline run.

### Method
POST

### Endpoint
/api/pipeline_runs/

### Parameters
#### Request Body
- **root_task** (object) - Required - The root task of the pipeline, defining its component and arguments.
  - **component_ref** (object) - Required - Reference to the component to be executed.
    - **spec** (object) - Required - Specification of the component.
      - **name** (string) - Required - The name of the component.
      - **inputs** (array) - Optional - List of component inputs.
        - **name** (string) - Required - Input name.
        - **type** (string) - Required - Input type (e.g., "Dataset", "Float").
        - **default** (string) - Optional - Default value for the input.
      - **outputs** (array) - Optional - List of component outputs.
        - **name** (string) - Required - Output name.
        - **type** (string) - Required - Output type (e.g., "Model").
      - **implementation** (object) - Required - How the component is implemented.
        - **container** (object) - Required - Containerized implementation details.
          - **image** (string) - Required - Docker image to use.
          - **command** (array of strings) - Optional - Command to execute inside the container.
          - **args** (array) - Optional - Arguments for the command, can reference inputs or outputs.
  - **arguments** (object) - Required - Arguments for the root task.
    - **[argument_name]** (string) - Required - Value for the argument (e.g., data path, hyperparameter).
- **annotations** (object) - Optional - Metadata for the pipeline run.
  - **[key]** (string) - Optional - Annotation key.
  - **[value]** (string) - Optional - Annotation value.
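The nesting in the parameter list above can be easier to follow when the body is assembled programmatically. The following is a minimal sketch, not part of the Tangle codebase: `build_pipeline_run_request` is a hypothetical helper name, it only checks the two fields shown here out of those the list marks as required, and it makes no HTTP call.

```python
import json
from typing import Optional


def build_pipeline_run_request(
    spec: dict, arguments: dict, annotations: Optional[dict] = None
) -> dict:
    """Hypothetical helper: wrap a component spec in the request-body layout listed above."""
    # Check a couple of the fields that the parameter list marks as required.
    if "name" not in spec:
        raise ValueError("spec.name is required")
    container = spec.get("implementation", {}).get("container", {})
    if "image" not in container:
        raise ValueError("spec.implementation.container.image is required")

    body = {"root_task": {"component_ref": {"spec": spec}, "arguments": arguments}}
    if annotations:
        # annotations is optional, so it is only added when provided.
        body["annotations"] = annotations
    return body


spec = {
    "name": "train_model",
    "inputs": [{"name": "training_data", "type": "Dataset"}],
    "outputs": [{"name": "model", "type": "Model"}],
    "implementation": {
        "container": {
            "image": "python:3.9",
            "command": ["python", "train.py"],
            "args": [
                "--data", {"input_path": {"input_name": "training_data"}},
                "--output", {"output_path": {"output_name": "model"}},
            ],
        }
    },
}

request_body = build_pipeline_run_request(
    spec,
    arguments={"training_data": "s3://bucket/data.csv"},
    annotations={"experiment": "baseline"},
)
print(json.dumps(request_body, indent=2))
```

The printed JSON can then be sent as the body of the POST request with any HTTP client.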
### Request Example
```json
{
  "root_task": {
    "component_ref": {
      "spec": {
        "name": "train_model",
        "inputs": [
          {"name": "training_data", "type": "Dataset"},
          {"name": "learning_rate", "type": "Float", "default": "0.01"}
        ],
        "outputs": [
          {"name": "model", "type": "Model"}
        ],
        "implementation": {
          "container": {
            "image": "python:3.9",
            "command": ["python", "train.py"],
            "args": [
              "--data", {"input_path": {"input_name": "training_data"}},
              "--lr", {"input_value": {"input_name": "learning_rate"}},
              "--output", {"output_path": {"output_name": "model"}}
            ]
          }
        }
      }
    },
    "arguments": {
      "training_data": "s3://bucket/data.csv",
      "learning_rate": "0.001"
    }
  },
  "annotations": {"experiment": "baseline", "version": "1.0"}
}
```

### Response
#### Success Response (200)
- **id** (string) - Unique identifier for the pipeline run.
- **root_execution_id** (string) - ID of the root execution task.
- **created_at** (string) - Timestamp when the pipeline run was created.
- **created_by** (string) - Username of the user who created the run.
- **annotations** (object) - Annotations associated with the pipeline run.

#### Response Example
```json
{
  "id": "a1b2c3d4e5f6g7h8i9j0",
  "root_execution_id": "exec123456789",
  "created_at": "2025-01-15T10:30:00Z",
  "created_by": "admin",
  "annotations": {"experiment": "baseline", "version": "1.0"}
}
```
```

--------------------------------

### Initialize FastAPI Server with Database and Authentication - Python

Source: https://context7.com/tangleml/tangle/llms.txt

Sets up the FastAPI application, configures the database connection using SQLAlchemy (supporting SQLite and MySQL), and defines a placeholder function for user authentication. This is the initial setup for the Tangle API server.
```python
import fastapi

from cloud_pipelines_backend import api_router, database_ops

# Create FastAPI application
app = fastapi.FastAPI(
    title="Cloud Pipelines API",
    version="0.0.1",
    separate_input_output_schemas=False,
)

# Configure database connection
database_uri = "sqlite:///db.sqlite"  # or MySQL: "mysql://user:pass@host/db"
db_engine = database_ops.create_db_engine_and_migrate_db(
    database_uri=database_uri,
)


# Configure authentication (placeholder for production OAuth)
def get_user_details(request: fastapi.Request):
    return api_router.UserDetails(
        name="admin",
        permissions=api_router.Permissions(
            read=True,
            write=True,
            admin=True,
        ),
    )


# Setup API routes
api_router.setup_routes(
    app=app,
    db_engine=db_engine,
    user_details_getter=get_user_details,
    default_component_library_owner_username="admin",
)

# Start server: uvicorn api_server_main:app --host 0.0.0.0 --port 8000
```

--------------------------------

### Get User Pinned Libraries (curl)

Source: https://context7.com/tangleml/tangle/llms.txt

Fetches the list of component libraries that the current user has pinned for quick access. This allows users to customize their library view.

```curl
curl "http://localhost:8000/api/component_library_pins/me/"
```

--------------------------------

### GET /api/pipeline_runs/{pipeline_run_id}

Source: https://context7.com/tangleml/tangle/llms.txt

Get specific pipeline run details. Retrieves detailed information about a single pipeline run.

```APIDOC
## GET /api/pipeline_runs/{pipeline_run_id}

### Description
Get specific pipeline run details. Retrieves detailed information about a single pipeline run.

### Method
GET

### Endpoint
/api/pipeline_runs/{pipeline_run_id}

### Parameters
#### Path Parameters
- **pipeline_run_id** (string) - Required - The unique identifier of the pipeline run to retrieve.

### Response
#### Success Response (200)
- **id** (string) - Unique identifier for the pipeline run.
- **root_execution_id** (string) - ID of the root execution task.
- **execution_stats** (object) - Statistics on the current state of pipeline executions.
  - **[STATUS]** (integer) - Count of executions in a given status (e.g., SUCCEEDED, RUNNING, PENDING, FAILED).
- **created_at** (string) - Timestamp when the pipeline run was created.

#### Response Example
```json
{
  "id": "a1b2c3d4e5f6g7h8i9j0",
  "root_execution_id": "exec123456789",
  "execution_stats": {
    "SUCCEEDED": 5,
    "RUNNING": 2,
    "PENDING": 3,
    "FAILED": 0
  },
  "created_at": "2025-01-15T10:30:00Z"
}
```
```

--------------------------------

### Get Component by Digest (curl)

Source: https://context7.com/tangleml/tangle/llms.txt

Fetches detailed information about a specific component using its SHA256 digest. This is useful for retrieving the exact definition of a published component.

```curl
curl "http://localhost:8000/api/components/sha256:abc123def456..."
```

--------------------------------

### GET /api/pipeline_runs/

Source: https://context7.com/tangleml/tangle/llms.txt

List and filter pipeline runs. This endpoint retrieves a list of pipeline runs, with options to include execution statistics and filter by various criteria.

```APIDOC
## GET /api/pipeline_runs/

### Description
List and filter pipeline runs. This endpoint retrieves a list of pipeline runs, with options to include execution statistics and filter by various criteria.

### Method
GET

### Endpoint
/api/pipeline_runs/

### Parameters
#### Query Parameters
- **include_execution_stats** (boolean) - Optional - If true, includes execution statistics for each pipeline run.
- **filter** (string) - Optional - Filters the pipeline runs. Example: `created_by:me`.

### Response
#### Success Response (200)
- **id** (string) - Unique identifier for the pipeline run.
- **root_execution_id** (string) - ID of the root execution task.
- **execution_stats** (object) - Statistics on the current state of pipeline executions (if `include_execution_stats` is true).
  - **[STATUS]** (integer) - Count of executions in a given status (e.g., SUCCEEDED, RUNNING, PENDING, FAILED).
- **created_at** (string) - Timestamp when the pipeline run was created.

#### Response Example
```json
{
  "id": "a1b2c3d4e5f6g7h8i9j0",
  "root_execution_id": "exec123456789",
  "execution_stats": {
    "SUCCEEDED": 5,
    "RUNNING": 2,
    "PENDING": 3,
    "FAILED": 0
  },
  "created_at": "2025-01-15T10:30:00Z"
}
```
```

--------------------------------

### Get Execution Details and Artifacts using Curl

Source: https://context7.com/tangleml/tangle/llms.txt

Retrieve detailed information about a task execution, including its specification and associated artifacts. This requires the execution ID and uses the /api/executions/{execution_id}/details and /api/executions/{execution_id}/artifacts endpoints. It also shows how to fetch specific artifact information using the /api/artifacts/{artifact_id} endpoint.

```bash
# Get execution details including task specification
curl "http://localhost:8000/api/executions/exec123456789/details"

# Response:
# {
#   "task_spec": {
#     "component_ref": {...},
#     "arguments": {...}
#   },
#   "parent_execution_id": null,
#   "child_executions": [...]
# }

# Get execution artifacts (inputs and outputs)
curl "http://localhost:8000/api/executions/exec123456789/artifacts"

# Response:
# {
#   "input_artifacts": {
#     "training_data": {
#       "artifact_id": "art_abc123",
#       "artifact_data": {
#         "uri": "s3://bucket/data.csv",
#         "hash": "md5=a1b2c3d4e5f6",
#         "total_size": 1048576
#       }
#     }
#   },
#   "output_artifacts": {
#     "model": {
#       "artifact_id": "art_xyz789",
#       "artifact_data": {
#         "uri": "/data/artifacts/by_execution/exec123/outputs/model/data",
#         "hash": "md5=x7y8z9a0b1c2",
#         "total_size": 5242880
#       }
#     }
#   }
# }

# Get specific artifact information
curl "http://localhost:8000/api/artifacts/art_xyz789"
```

--------------------------------

### Cancel Running Pipeline Execution - Bash

Source: https://context7.com/tangleml/tangle/llms.txt

Provides examples of how to cancel an ongoing pipeline execution using the Tangle API. It includes a cURL command to initiate the cancellation and another to verify the status of the affected execution.

```bash
# Cancel a running pipeline
curl -X POST "http://localhost:8000/api/pipeline_runs/a1b2c3d4e5f6g7h8i9j0/cancel"

# All running tasks will be marked for termination
# Response: {"status": "cancelled"}

# Verify cancellation status
curl "http://localhost:8000/api/executions/exec123456789/container_state"

# {
#   "status": "CANCELLED",
#   "exit_code": null,
#   "started_at": "2025-01-15T10:31:00Z",
#   "ended_at": "2025-01-15T10:35:22Z"
# }
```

--------------------------------

### GET /api/executions/{execution_id}/container_state

Source: https://context7.com/tangleml/tangle/llms.txt

Verify cancellation status or get the state of a specific execution. This endpoint allows checking the status of an individual execution task.

```APIDOC
## GET /api/executions/{execution_id}/container_state

### Description
Verify cancellation status or get the state of a specific execution. This endpoint allows checking the status of an individual execution task.

### Method
GET

### Endpoint
/api/executions/{execution_id}/container_state

### Parameters
#### Path Parameters
- **execution_id** (string) - Required - The unique identifier of the execution to query.

### Response
#### Success Response (200)
- **status** (string) - The current status of the execution (e.g., "CANCELLED", "SUCCEEDED", "RUNNING").
- **exit_code** (integer or null) - The exit code of the containerized task, if terminated.
- **started_at** (string) - Timestamp when the execution started.
- **ended_at** (string) - Timestamp when the execution ended.

#### Response Example
```json
{
  "status": "CANCELLED",
  "exit_code": null,
  "started_at": "2025-01-15T10:31:00Z",
  "ended_at": "2025-01-15T10:35:22Z"
}
```
```

--------------------------------

### Configure and Launch with Kubernetes Launcher

Source: https://context7.com/tangleml/tangle/llms.txt

Configures and launches a container task on a Kubernetes cluster. It shows how to load Kubernetes configuration, create API clients, and set up either a single-node cluster with HostPath volumes or a GKE cluster with GCSFuse. Resource annotations for CPU, memory, and accelerators can be specified. The status of the launched pod is printed, and its logs can be streamed.
```python
from kubernetes import client, config

from cloud_pipelines_backend.launchers import kubernetes_launchers

# Load Kubernetes configuration (from ~/.kube/config or in-cluster)
config.load_kube_config()

# Create Kubernetes API clients
core_api = client.CoreV1Api()
api_client = client.ApiClient()

# Configure launcher for single-node cluster with HostPath volumes
launcher = kubernetes_launchers.KubernetesWithHostPathContainerLauncher(
    core_api_client=core_api,
    api_client=api_client,
    namespace="default",
    persistent_volume_name="data-volume",
    host_path="/mnt/data",
)

# Or configure for GKE with Google Cloud Storage via GCSFuse
# launcher = kubernetes_launchers.KubernetesWithGcsFuseContainerLauncher(
#     core_api_client=core_api,
#     api_client=api_client,
#     namespace="default",
#     gcs_fuse_csi_driver_version="v1.2.0",
#     gcsfuse_bucket="my-bucket",
# )

# Launch with resource annotations
annotations = {
    "cloud-pipelines.net/launchers/generic/resources.cpu": "2",
    "cloud-pipelines.net/launchers/generic/resources.memory": "4Gi",
    "cloud-pipelines.net/launchers/generic/resources.accelerators": '{"nvidia.com/gpu": "1"}'
}

launched_pod = launcher.launch_container_task(
    component_spec=component_spec,
    input_arguments=input_args,
    output_uris=output_uris,
    log_uri="gs://my-bucket/logs/execution.log",
    annotations=annotations,
)

# Check pod status
print(f"Pod status: {launched_pod.status}")
# Status: PENDING, RUNNING, SUCCEEDED, or FAILED

# Stream pod logs
for log_line in launched_pod.stream_log_lines():
    print(log_line, end="")
```

--------------------------------

### Orchestrator Initialization using Python

Source: https://context7.com/tangleml/tangle/llms.txt

Configure and run the Tangle orchestrator with specified launchers and storage providers. This Python script sets up paths, a database engine, a local storage provider, and a Docker container launcher, then initializes and runs the orchestrator service.

```python
import pathlib

import docker
from sqlalchemy import orm

from cloud_pipelines.orchestration.storage_providers import local_storage
from cloud_pipelines_backend import orchestrator_sql, database_ops
from cloud_pipelines_backend.launchers import local_docker_launchers

# Setup paths
data_root = pathlib.Path("data")
artifacts_dir = data_root / "artifacts"
logs_dir = data_root / "logs"
artifacts_dir.mkdir(parents=True, exist_ok=True)
logs_dir.mkdir(parents=True, exist_ok=True)

# Configure database
database_uri = f"sqlite:///{data_root}/db.sqlite"
db_engine = database_ops.create_db_engine_and_migrate_db(database_uri)

# Configure storage provider
storage_provider = local_storage.LocalStorageProvider()

# Configure Docker launcher
docker_client = docker.DockerClient.from_env(timeout=5)
launcher = local_docker_launchers.DockerContainerLauncher(
    client=docker_client,
)

# Create orchestrator
session_factory = orm.sessionmaker(
    autocommit=False, autoflush=False, bind=db_engine
)
orchestrator = orchestrator_sql.OrchestratorService_Sql(
    session_factory=session_factory,
    launcher=launcher,
    storage_provider=storage_provider,
    data_root_uri=artifacts_dir.as_posix(),
    logs_root_uri=logs_dir.as_posix(),
    default_task_annotations={},
    sleep_seconds_between_queue_sweeps=1.0,
)

# Run orchestrator loop (blocks indefinitely)
orchestrator.run_loop()
```

--------------------------------

### Docker Launcher Configuration using Python

Source: https://context7.com/tangleml/tangle/llms.txt

Configure and use the Docker container launcher for running tasks locally. This Python code demonstrates initializing the Docker client, creating a launcher instance, and defining a component with a container implementation that processes CSV files using pandas.
```python
import docker

from cloud_pipelines_backend.launchers import local_docker_launchers
from cloud_pipelines_backend.component_structures import (
    ComponentSpec,
    ContainerImplementation,
    ContainerSpec,
    InputSpec,
    OutputSpec,
    InputPathPlaceholder,
    OutputPathPlaceholder
)

# Initialize Docker client
docker_client = docker.DockerClient.from_env(timeout=10)

# Create launcher
launcher = local_docker_launchers.DockerContainerLauncher(
    client=docker_client,
)

# Define component with container implementation
component_spec = ComponentSpec(
    name="data_processor",
    inputs=[InputSpec(name="input_file", type="CSV")],
    outputs=[OutputSpec(name="processed_file", type="CSV")],
    implementation=ContainerImplementation(
        container=ContainerSpec(
            image="python:3.9",
            command=["python", "-c"],
            args=[
                "import pandas as pd; "
                "df = pd.read_csv('",
                InputPathPlaceholder(input_name="input_file"),
                "'); "
                "df.to_csv('",
                OutputPathPlaceholder(output_name="processed_file"),
                "', index=False)"
            ],
        )
    ),
)
```

--------------------------------

### Configure and Launch with HuggingFace Launcher

Source: https://context7.com/tangleml/tangle/llms.txt

Configures and launches a container task on HuggingFace infrastructure. It initializes the HuggingFace API client and storage provider. The HuggingFace Jobs container launcher is then configured to launch a task with specific input and output URIs in the HuggingFace format. Resource annotations for CPU and memory can be applied. The job ID and status are printed, with outputs stored in a HuggingFace dataset.

```python
from huggingface_hub import HfApi

from cloud_pipelines_backend.launchers import huggingface_launchers
from cloud_pipelines_backend.launchers.interfaces import InputArgument
from cloud_pipelines_backend.storage_providers import huggingface_repo_storage

# Initialize HuggingFace API client
hf_token = "hf_..."  # HuggingFace access token
hf_api = HfApi(token=hf_token)

# Configure HuggingFace storage provider
storage_provider = huggingface_repo_storage.HuggingFaceRepoStorageProvider(
    client=hf_api
)

# Create HuggingFace Jobs launcher
launcher = huggingface_launchers.HuggingFaceJobsContainerLauncher(
    api_client=hf_api,
    token_secret=hf_token,
)

# Launch task with HuggingFace URIs
# (component_spec is assumed to be defined as in the Docker launcher example)
input_args = {
    "training_data": InputArgument(
        uri="hf://datasets/username/my-dataset@main/train.csv",
        total_size=1048576,
        is_dir=False,
    )
}

output_uris = {
    "model": "hf://datasets/username/tangle_data@main/artifacts/model/data"
}

launched_job = launcher.launch_container_task(
    component_spec=component_spec,
    input_arguments=input_args,
    output_uris=output_uris,
    log_uri="hf://datasets/username/tangle_data@main/logs/job.log",
    annotations={
        "cloud-pipelines.net/launchers/generic/resources.cpu": "4",
        "cloud-pipelines.net/launchers/generic/resources.memory": "16Gi",
    },
)

# Monitor job status
print(f"Job ID: {launched_job.job_id}")
print(f"Status: {launched_job.status}")

# Outputs stored in user's private HuggingFace dataset
```

--------------------------------

### Launch Container Task with Generic Launcher

Source: https://context7.com/tangleml/tangle/llms.txt

Launches a container task using a generic launcher. It defines input arguments, output URIs, and a log URI. After launching, it prints the container's status and exit code, and retrieves the log text. The terminate() method is commented out but available for manual termination.
```python
from cloud_pipelines_backend.launchers.interfaces import InputArgument

input_args = {
    "input_file": InputArgument(
        uri="/data/inputs/data.csv",
        total_size=1024,
        is_dir=False,
    )
}

output_uris = {
    "processed_file": "/data/outputs/processed.csv"
}

launched_container = launcher.launch_container_task(
    component_spec=component_spec,
    input_arguments=input_args,
    output_uris=output_uris,
    log_uri="/data/logs/execution.log",
    annotations={},
)

# Monitor container status
print(f"Container status: {launched_container.status}")
print(f"Exit code: {launched_container.exit_code}")

# Retrieve logs
log_text = launched_container.get_log()
print(log_text)

# Terminate if needed
# launched_container.terminate()
```

--------------------------------

### Python: Implement Custom Cloud Storage Provider for Tangle

Source: https://context7.com/tangleml/tangle/llms.txt

This Python code defines a `CustomCloudStorageProvider` class that extends Tangle's `StorageProvider` interface. It handles uploading, downloading, checking existence, and retrieving metadata for files and directories in a custom cloud storage system using a provided API client. It also computes an MD5 hash for a file when that file's metadata is requested. Dependencies include `cloud_pipelines.orchestration.storage_providers.interfaces` and `hashlib`.
```python
import hashlib
import os
from typing import Optional

from cloud_pipelines.orchestration.storage_providers.interfaces import (
    StorageProvider,
    DataInfo
)
from cloud_pipelines_backend import orchestrator_sql


class NotFoundError(Exception):
    """Placeholder (assumption): replace with the not-found exception raised by your API client."""


class CustomCloudStorageProvider(StorageProvider):
    def __init__(self, api_client):
        self.client = api_client

    def upload(self, source_local_path: str, destination_uri: str):
        # Upload file or directory to custom cloud storage
        if os.path.isdir(source_local_path):
            for root, dirs, files in os.walk(source_local_path):
                for file in files:
                    local_file = os.path.join(root, file)
                    rel_path = os.path.relpath(local_file, source_local_path)
                    remote_path = f"{destination_uri}/{rel_path}"
                    self.client.upload_file(local_file, remote_path)
        else:
            self.client.upload_file(source_local_path, destination_uri)

    def download(self, source_uri: str, destination_local_path: str):
        # Download file or directory from cloud storage
        info = self.get_info(source_uri)
        if info.is_dir:
            os.makedirs(destination_local_path, exist_ok=True)
            for item in self.client.list_objects(source_uri):
                local_path = os.path.join(destination_local_path, item.name)
                self.client.download_file(item.uri, local_path)
        else:
            self.client.download_file(source_uri, destination_local_path)

    def exists(self, uri: str) -> bool:
        try:
            self.client.get_metadata(uri)
            return True
        except NotFoundError:
            return False

    def get_info(self, uri: str) -> DataInfo:
        metadata = self.client.get_metadata(uri)

        # Calculate hash for files
        hash_str = None
        if not metadata.is_dir:
            content = self.client.download_bytes(uri)
            md5 = hashlib.md5(content).hexdigest()
            hash_str = f"md5={md5}"

        return DataInfo(
            size=metadata.size,
            is_dir=metadata.is_dir,
            hashes=[hash_str] if hash_str else [],
        )

    def calculate_data_hash(self, uri: str) -> Optional[str]:
        # Returns None for directories, which have no hash in this sketch
        info = self.get_info(uri)
        return info.hashes[0] if info.hashes else None


# Use in orchestrator configuration
# (my_api_client, session_factory, and launcher are assumed to be defined elsewhere)
custom_storage = CustomCloudStorageProvider(api_client=my_api_client)

orchestrator = orchestrator_sql.OrchestratorService_Sql(
    session_factory=session_factory,
    launcher=launcher,
    storage_provider=custom_storage,  # Use custom storage
    data_root_uri="custom://my-bucket/artifacts",
    logs_root_uri="custom://my-bucket/logs",
    default_task_annotations={},
    sleep_seconds_between_queue_sweeps=1.0,
)
```

--------------------------------

### Component Libraries API

Source: https://context7.com/tangleml/tangle/llms.txt

Endpoints for managing component libraries, including creation, listing, and user pinning.

```APIDOC
## POST /api/component_libraries/

### Description
Creates a new component library.

### Method
POST

### Endpoint
/api/component_libraries/

### Parameters
#### Request Body
- **name** (string) - Required - The name of the component library.
- **url_prefix** (string) - Required - A URL prefix associated with the library.
- **owner_name** (string) - Required - The name of the owner or team.

### Request Example
```json
{
  "name": "ML Training Components",
  "url_prefix": "https://github.com/myorg/components/tree/main/",
  "owner_name": "myteam"
}
```

### Response
#### Success Response (200)
- **id** (string) - The unique identifier of the created library.
- **name** (string) - The name of the library.
- **url_prefix** (string) - The URL prefix of the library.
- **owner_name** (string) - The owner name of the library.

#### Response Example
```json
{
  "id": "lib_123456",
  "name": "ML Training Components",
  "url_prefix": "https://github.com/myorg/components/tree/main/",
  "owner_name": "myteam"
}
```
```

```APIDOC
## GET /api/component_libraries/

### Description
Lists all available component libraries.

### Method
GET

### Endpoint
/api/component_libraries/

### Response
#### Success Response (200)
- Returns a list of component library objects. (Structure TBD based on actual API response)

#### Response Example
```json
[
  {
    "id": "lib_123456",
    "name": "ML Training Components",
    "url_prefix": "https://github.com/myorg/components/tree/main/",
    "owner_name": "myteam"
  }
]
```
```

```APIDOC
## GET /api/component_library_pins/me/

### Description
Retrieves the list of component libraries pinned by the current user.

### Method
GET

### Endpoint
/api/component_library_pins/me/

### Response
#### Success Response (200)
- Returns a list of pinned library IDs. (Structure TBD based on actual API response)

#### Response Example
```json
{
  "library_ids": ["lib_123456", "lib_789012"]
}
```
```

```APIDOC
## PUT /api/component_library_pins/me/

### Description
Updates the user's pinned component libraries.

### Method
PUT

### Endpoint
/api/component_library_pins/me/

### Parameters
#### Request Body
- **library_ids** (array) - Required - A list of library IDs to be pinned.
  - (string) - The ID of a component library.

### Request Example
```json
{
  "library_ids": ["lib_123456", "lib_789012"]
}
```

### Response
#### Success Response (200)
- Returns the updated list of pinned library IDs. (Structure TBD based on actual API response)

#### Response Example
```json
{
  "library_ids": ["lib_123456", "lib_789012"]
}
```
```

--------------------------------

### Query Pipeline Run Status and Filter - Bash

Source: https://context7.com/tangleml/tangle/llms.txt

Shows how to query the status of pipeline runs using cURL commands. It demonstrates listing all runs with execution statistics, filtering runs by the creator, and retrieving details for a specific pipeline run.
```bash
# List all pipeline runs with status information
curl "http://localhost:8000/api/pipeline_runs/?include_execution_stats=true"

# Filter runs by current user
curl "http://localhost:8000/api/pipeline_runs/?filter=created_by:me"

# Get specific pipeline run details
curl "http://localhost:8000/api/pipeline_runs/a1b2c3d4e5f6g7h8i9j0"

# Response includes execution statistics:
# {
#   "id": "a1b2c3d4e5f6g7h8i9j0",
#   "root_execution_id": "exec123456789",
#   "execution_stats": {
#     "SUCCEEDED": 5,
#     "RUNNING": 2,
#     "PENDING": 3,
#     "FAILED": 0
#   },
#   "created_at": "2025-01-15T10:30:00Z"
# }
```

--------------------------------

### List Component Libraries (curl)

Source: https://context7.com/tangleml/tangle/llms.txt

Retrieves a list of all available component libraries. This endpoint provides a way to discover shared component collections.

```curl
curl "http://localhost:8000/api/component_libraries/"
```

--------------------------------

### Configure Execution Caching Strategies

Source: https://context7.com/tangleml/tangle/llms.txt

Demonstrates how to configure execution caching for tasks. It shows how to disable caching using a staleness of 'P0D' (0 days) and how to enable caching with a specific staleness period like 'P30D' (30 days). The default behavior allows unlimited cache reuse based on input artifacts and container specifications.

```python
from cloud_pipelines_backend.component_structures import (
    TaskSpec,
    ExecutionOptionsSpec,
    CachingStrategySpec
)

# Disable caching for specific task
task_no_cache = TaskSpec(
    component_ref=component_ref,
    arguments=arguments,
    execution_options=ExecutionOptionsSpec(
        caching_strategy=CachingStrategySpec(
            max_cache_staleness="P0D"  # RFC3339 duration: 0 days = no cache
        )
    )
)

# Enable 30-day cache staleness
task_with_cache = TaskSpec(
    component_ref=component_ref,
    arguments=arguments,
    execution_options=ExecutionOptionsSpec(
        caching_strategy=CachingStrategySpec(
            max_cache_staleness="P30D"  # Cache valid for 30 days
        )
    )
)

# Default behavior: unlimited cache reuse
# Cache key = hash(container_spec + input_artifact_hashes)
# Orchestrator searches for matching executions with status:
# - PENDING (starting)
# - RUNNING (in progress)
# - SUCCEEDED (completed)
```

--------------------------------

### Retrieving Execution Details and Artifacts

Source: https://context7.com/tangleml/tangle/llms.txt

Endpoints for accessing detailed information about task executions and their associated input/output artifacts.

```APIDOC
## GET /api/executions/{execution_id}/details

### Description
Retrieves detailed information about a specific task execution, including its specification.

### Method
GET

### Endpoint
`/api/executions/{execution_id}/details`

### Parameters
#### Path Parameters
- **execution_id** (string) - Required - The unique identifier of the execution.

### Response
#### Success Response (200)
- **task_spec** (object) - The specification of the task.
- **parent_execution_id** (string or null) - The ID of the parent execution, if any.
- **child_executions** (array) - A list of child execution IDs.

#### Response Example
```json
{
  "task_spec": {
    "component_ref": {...},
    "arguments": {...}
  },
  "parent_execution_id": null,
  "child_executions": []
}
```

## GET /api/executions/{execution_id}/artifacts

### Description
Retrieves the input and output artifacts for a given task execution.

### Method
GET

### Endpoint
`/api/executions/{execution_id}/artifacts`

### Parameters
#### Path Parameters
- **execution_id** (string) - Required - The unique identifier of the execution.

### Response
#### Success Response (200)
- **input_artifacts** (object) - A map of input artifact names to their details.
- **output_artifacts** (object) - A map of output artifact names to their details.

#### Response Example
```json
{
  "input_artifacts": {
    "training_data": {
      "artifact_id": "art_abc123",
      "artifact_data": {
        "uri": "s3://bucket/data.csv",
        "hash": "md5=a1b2c3d4e5f6",
        "total_size": 1048576
      }
    }
  },
  "output_artifacts": {
    "model": {
      "artifact_id": "art_xyz789",
      "artifact_data": {
        "uri": "/data/artifacts/by_execution/exec123/outputs/model/data",
        "hash": "md5=x7y8z9a0b1c2",
        "total_size": 5242880
      }
    }
  }
}
```

## GET /api/artifacts/{artifact_id}

### Description
Retrieves information about a specific artifact.

### Method
GET

### Endpoint
`/api/artifacts/{artifact_id}`

### Parameters
#### Path Parameters
- **artifact_id** (string) - Required - The unique identifier of the artifact.

### Response
#### Success Response (200)
- (Object) - Details about the artifact (structure depends on artifact type).

#### Response Example
(Example response structure not provided in source text, would typically mirror artifact_data structure)
```

--------------------------------

### Submit New Pipeline Run via REST API - Bash

Source: https://context7.com/tangleml/tangle/llms.txt

Demonstrates how to submit a new pipeline run to the Tangle API using a cURL command. It includes a JSON payload defining a simple training pipeline with inputs, outputs, and container implementation details, along with arguments for the pipeline.
```bash
# Create a simple training pipeline
curl -X POST "http://localhost:8000/api/pipeline_runs/" \
  -H "Content-Type: application/json" \
  -d '{
    "root_task": {
      "component_ref": {
        "spec": {
          "name": "train_model",
          "inputs": [
            {"name": "training_data", "type": "Dataset"},
            {"name": "learning_rate", "type": "Float", "default": "0.01"}
          ],
          "outputs": [
            {"name": "model", "type": "Model"}
          ],
          "implementation": {
            "container": {
              "image": "python:3.9",
              "command": ["python", "train.py"],
              "args": [
                "--data", {"input_path": {"input_name": "training_data"}},
                "--lr", {"input_value": {"input_name": "learning_rate"}},
                "--output", {"output_path": {"output_name": "model"}}
              ]
            }
          }
        }
      },
      "arguments": {
        "training_data": "s3://bucket/data.csv",
        "learning_rate": "0.001"
      }
    },
    "annotations": {"experiment": "baseline", "version": "1.0"}
  }'

# Response:
# {
#   "id": "a1b2c3d4e5f6g7h8i9j0",
#   "root_execution_id": "exec123456789",
#   "created_at": "2025-01-15T10:30:00Z",
#   "created_by": "admin",
#   "annotations": {"experiment": "baseline", "version": "1.0"}
# }
```

--------------------------------

### Execution Caching Configuration

Source: https://context7.com/tangleml/tangle/llms.txt

Demonstrates how to configure execution caching strategies for tasks, including disabling caching and setting a maximum cache staleness period.

```APIDOC
## Configuring Execution Caching

Control automatic execution result reuse by setting the `caching_strategy` within `execution_options`.

### Disabling Caching

To disable caching for a specific task, set `max_cache_staleness` to `P0D` (RFC3339 duration for 0 days).
```python
from cloud_pipelines_backend.component_structures import (
    TaskSpec,
    ExecutionOptionsSpec,
    CachingStrategySpec,
)

# component_ref and arguments are assumed to be defined elsewhere.
task_no_cache = TaskSpec(
    component_ref=component_ref,
    arguments=arguments,
    execution_options=ExecutionOptionsSpec(
        caching_strategy=CachingStrategySpec(
            max_cache_staleness="P0D"  # RFC3339 duration: 0 days = no cache
        )
    )
)
```

### Enabling Cache with Staleness

To enable caching with a specific staleness period, set `max_cache_staleness` to the desired RFC3339 duration (e.g., `P30D` for 30 days).

```python
# Enable 30-day cache staleness
task_with_cache = TaskSpec(
    component_ref=component_ref,
    arguments=arguments,
    execution_options=ExecutionOptionsSpec(
        caching_strategy=CachingStrategySpec(
            max_cache_staleness="P30D"  # Cache valid for 30 days
        )
    )
)
```

### Default Behavior

By default, cache reuse is unlimited: no staleness limit is applied. The cache key is generated from the `container_spec` and `input_artifact_hashes`. The orchestrator searches for matching executions with status `PENDING`, `RUNNING`, or `SUCCEEDED`.
```

--------------------------------

### Create Component Library (curl)

Source: https://context7.com/tangleml/tangle/llms.txt

Creates a new component library within TangleML for team sharing. Requires a JSON payload specifying the library's name, a URL prefix for component source code, and the owner's name. Returns the ID and details of the newly created library.

```curl
curl -X POST "http://localhost:8000/api/component_libraries/" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "ML Training Components",
    "url_prefix": "https://github.com/myorg/components/tree/main/",
    "owner_name": "myteam"
  }'
```

--------------------------------

### Publish Component to Library (curl)

Source: https://context7.com/tangleml/tangle/llms.txt

Publishes a new component to the TangleML library. Requires a JSON payload describing the component's schema, implementation details (container image, command, arguments), and publisher information.
Returns a digest and metadata of the published component.

```curl
curl -X POST "http://localhost:8000/api/published_components/" \
  -H "Content-Type: application/json" \
  -d '{
    "component": {
      "name": "xgboost_trainer",
      "description": "Train XGBoost model on CSV data",
      "inputs": [
        {"name": "data", "type": "CSV"},
        {"name": "max_depth", "type": "Integer", "default": "6"}
      ],
      "outputs": [
        {"name": "model", "type": "XGBoostModel"}
      ],
      "implementation": {
        "container": {
          "image": "xgboost/xgboost:latest",
          "command": ["python", "-m", "xgboost.train"],
          "args": [
            "--data", {"input_path": {"input_name": "data"}},
            "--max-depth", {"input_value": {"input_name": "max_depth"}},
            "--output", {"output_path": {"output_name": "model"}}
          ]
        }
      }
    },
    "name": "xgboost_trainer",
    "publisher_name": "myteam"
  }'
```

--------------------------------

### Streaming Container Logs

Source: https://context7.com/tangleml/tangle/llms.txt

Endpoints for monitoring real-time execution logs via Server-Sent Events and retrieving complete logs.

```APIDOC
## GET /api/executions/{execution_id}/stream_container_log

### Description
Streams real-time container logs for a specific execution as they are generated using Server-Sent Events.

### Method
GET

### Endpoint
`/api/executions/{execution_id}/stream_container_log`

### Parameters
#### Path Parameters
- **execution_id** (string) - Required - The unique identifier of the execution.

### Response
#### Success Response (200)
- (Server-Sent Events Stream) - Each event contains a `data` field with a JSON object including `log_line`.

#### Response Example (SSE Stream)
```
data: {"log_line": "2025-01-15 10:31:05 Starting training...\n"}

data: {"log_line": "2025-01-15 10:31:10 Epoch 1/10, loss=0.543\n"}

data: {"log_line": "2025-01-15 10:31:15 Epoch 2/10, loss=0.421\n"}
```

## GET /api/executions/{execution_id}/container_log

### Description
Retrieves the complete container log for a specific execution after it has finished.
### Method
GET

### Endpoint
`/api/executions/{execution_id}/container_log`

### Parameters
#### Path Parameters
- **execution_id** (string) - Required - The unique identifier of the execution.

### Response
#### Success Response (200)
- **log** (string) - The complete log content.
- **launcher_error_messages** (array) - A list of any error messages from the launcher.

#### Response Example
```json
{
  "log": "2025-01-15 10:31:05 Starting training...\n...",
  "launcher_error_messages": []
}
```
```

--------------------------------

### List Published Components (curl)

Source: https://context7.com/tangleml/tangle/llms.txt

Retrieves a list of all components currently published in the TangleML library. This endpoint returns basic metadata for each component.

```curl
curl "http://localhost:8000/api/published_components/"
```

--------------------------------

### Define Pipeline Task (Python)

Source: https://context7.com/tangleml/tangle/llms.txt

Demonstrates the use of Python data structures for defining task specifications within a pipeline. It imports necessary classes from the cloud_pipelines_backend library.

```python
from cloud_pipelines_backend.component_structures import (
    TaskSpec,
    ComponentReference,
    GraphInputArgument,
    TaskOutputArgument,
    GraphInputReference,
    TaskOutputReference,
)
```
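
The imports above name the typed classes, but most of their constructors are not shown in this document. As a rough sketch that avoids guessing at those signatures, the same task can be described with plain dictionaries in the JSON shape used by `POST /api/pipeline_runs/`. The helper names `make_root_task`, `missing_arguments`, and `build_request` are hypothetical and not part of TangleML, and the rule that an input with a `default` may be left out of `arguments` is an assumption about how defaults are applied.

```python
import json
import urllib.request


def make_root_task(component_spec, arguments):
    """Wrap a component spec and its arguments in the root_task shape (hypothetical helper)."""
    return {
        "component_ref": {"spec": component_spec},
        "arguments": dict(arguments),
    }


def missing_arguments(task):
    """Return names of inputs that have neither an argument nor a default.

    Assumption: inputs declaring a "default" may be omitted from "arguments".
    """
    inputs = task["component_ref"]["spec"].get("inputs", [])
    return [
        inp["name"]
        for inp in inputs
        if inp["name"] not in task["arguments"] and "default" not in inp
    ]


def build_request(task, annotations=None, base_url="http://localhost:8000"):
    """Build, but do not send, the POST request for a new pipeline run."""
    payload = {"root_task": task, "annotations": annotations or {}}
    return urllib.request.Request(
        f"{base_url}/api/pipeline_runs/",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Same component spec as the curl example for POST /api/pipeline_runs/.
train_spec = {
    "name": "train_model",
    "inputs": [
        {"name": "training_data", "type": "Dataset"},
        {"name": "learning_rate", "type": "Float", "default": "0.01"},
    ],
    "outputs": [{"name": "model", "type": "Model"}],
    "implementation": {
        "container": {
            "image": "python:3.9",
            "command": ["python", "train.py"],
            "args": [
                "--data", {"input_path": {"input_name": "training_data"}},
                "--lr", {"input_value": {"input_name": "learning_rate"}},
                "--output", {"output_path": {"output_name": "model"}},
            ],
        }
    },
}

task = make_root_task(train_spec, {"training_data": "s3://bucket/data.csv"})
request = build_request(task, annotations={"experiment": "baseline"})
# Once the API server is running, send it with urllib.request.urlopen(request).
```

Checking `missing_arguments(task)` before sending catches an unfilled required input locally instead of relying on the server to reject the run.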