### Build Simple Pipeline with EmptyStartEvent and EmptyEndEvent

Source: https://context7.com/tencentblueking/bamboo-engine/llms.txt

Creates a basic pipeline workflow by connecting a start event and an end event through an activity node. The pipeline accepts input data through a `Data` object with variable definitions. This is the foundational pattern for all bamboo-engine workflows.

```python
from bamboo_engine import builder
from bamboo_engine.builder import Data, EmptyEndEvent, EmptyStartEvent, ServiceActivity, Var

# Stand-in activity so the snippet is self-contained; "example_component" is
# the sample component used in the other snippets of this document.
activity = ServiceActivity(component_code="example_component")

start = EmptyStartEvent()
end = EmptyEndEvent()
start.extend(activity).extend(end)

pipeline_data = Data(
    inputs={
        "request_body": Var(type=Var.PLAIN, value={"key": "value"})
    }
)

pipeline = builder.build_tree(start, data=pipeline_data)
```

--------------------------------

### Example Pipeline Tree JSON Output

Source: https://github.com/tencentblueking/bamboo-engine/blob/master/docs/user_guide/builded_pipeline_tree_schema.md

A sample JSON representation of a generated pipeline tree. This example shows only the `activities` section, with a single `ServiceActivity` and its properties such as `id`, `component`, and the `incoming` and `outgoing` flows. This is the format produced by `builder.build_tree()`.

```json
{
    "activities": {
        "eb456a70affdf47ae9d96c5d196d36b09": {
            "component": {
                "code": "example_component",
                "inputs": {}
            },
            "error_ignorable": false,
            "id": "eb456a70affdf47ae9d96c5d196d36b09",
            "incoming": [
                "f216d744eae614b71b3ca88002fe81439"
            ],
            "name": "",
            "optional": false,
            "outgoing": "f42b0a808b63f4315bb9b51159617797c",
            "retryable": true,
            "skippable": true,
            "timeout": null,
            "type": "ServiceActivity"
        }
    }
}
```

--------------------------------

### Sample Pipeline State Output

Source: https://github.com/tencentblueking/bamboo-engine/blob/master/README_en.md

This code block shows sample output representing the state of a pipeline, printed as a Python dict, including details about its children and its creation, start, and archive times. This structure is typically returned by an API call that retrieves pipeline status.
```python
{
    'pc31c89e6b85a4e2c8c5db477978c1a57': {
        'id': 'pc31c89e6b85a4e2c8c5db477978c1a57',
        'state': 'FINISHED',
        'root_id': 'pc31c89e6b85a4e2c8c5db477978c1a57',
        'parent_id': 'pc31c89e6b85a4e2c8c5db477978c1a57',
        'version': 'vaf47e56f2f31401e979c3c47b2a0c285',
        'loop': 1,
        'retry': 0,
        'skip': False,
        'created_time': datetime.datetime(2021, 3, 10, 3, 45, 54, 688664, tzinfo=<UTC>),
        'started_time': datetime.datetime(2021, 3, 10, 3, 45, 54, 688423, tzinfo=<UTC>),
        'archived_time': datetime.datetime(2021, 3, 10, 3, 45, 54, 775165, tzinfo=<UTC>),
        'children': {
            'e42035b3f98374062921a191115fc602e': {
                'id': 'e42035b3f98374062921a191115fc602e',
                'state': 'FINISHED',
                'root_id': 'pc31c89e6b85a4e2c8c5db477978c1a57',
                'parent_id': 'pc31c89e6b85a4e2c8c5db477978c1a57',
                'version': 've2d0fa10d7d842a1bcac25984620232a',
                'loop': 1,
                'retry': 0,
                'skip': False,
                'created_time': datetime.datetime(2021, 3, 10, 3, 45, 54, 744490, tzinfo=<UTC>),
                'started_time': datetime.datetime(2021, 3, 10, 3, 45, 54, 744308, tzinfo=<UTC>),
                'archived_time': datetime.datetime(2021, 3, 10, 3, 45, 54, 746690, tzinfo=<UTC>)
            },
            'e327f83de42df4ebfab375c271bf63d29': {
                'id': 'e327f83de42df4ebfab375c271bf63d29',
                'state': 'FINISHED',
                'root_id': 'pc31c89e6b85a4e2c8c5db477978c1a57',
                'parent_id': 'pc31c89e6b85a4e2c8c5db477978c1a57',
                'version': 'v893cdc14150d4df5b20f2db32ba142b3',
                'loop': 1,
                'retry': 0,
                'skip': False,
                'created_time': datetime.datetime(2021, 3, 10, 3, 45, 54, 753321, tzinfo=<UTC>),
                'started_time': datetime.datetime(2021, 3, 10, 3, 45, 54, 753122, tzinfo=<UTC>),
                'archived_time': datetime.datetime(2021, 3, 10, 3, 45, 54, 758697, tzinfo=<UTC>)
            },
            'e6c7d7a3721ca4b19a5a7f3b34d8387bf': {
                'id': 'e6c7d7a3721ca4b19a5a7f3b34d8387bf',
                'state': 'FINISHED',
                'root_id': 'pc31c89e6b85a4e2c8c5db477978c1a57',
                'parent_id': 'pc31c89e6b85a4e2c8c5db477978c1a57',
                'version': 'v0c661ee6994d4eb4bdbfe5260f9a9f22',
                'loop': 1,
                'retry': 0,
                'skip': False,
                'created_time': datetime.datetime(2021, 3, 10, 3, 45, 54, 767563, tzinfo=<UTC>),
                'started_time': datetime.datetime(2021, 3, 10, 3, 45, 54, 767384, tzinfo=<UTC>),
                'archived_time': datetime.datetime(2021, 3, 10, 3, 45, 54, 773341, tzinfo=<UTC>)
            }
        }
    }
}
```

--------------------------------

### Build and Execute Simple Workflow Pipeline in Python

Source: https://github.com/tencentblueking/bamboo-engine/blob/master/README_en.md

Creates and executes a simple workflow using the bamboo-engine builder API. The snippet constructs a pipeline from a start event, a service activity component, and an end event, then runs it with `BambooDjangoRuntime`. It demonstrates the basic workflow orchestration and execution pattern.

```python
import time

from bamboo_engine import api
from bamboo_engine.builder import *
from pipeline.eri.runtime import BambooDjangoRuntime

# use builder to build workflow
start = EmptyStartEvent()
# use bamboo-pipeline example component
act = ServiceActivity(component_code="example_component")
end = EmptyEndEvent()

start.extend(act).extend(end)

pipeline = builder.build_tree(start)

# execute it
runtime = BambooDjangoRuntime()

api.run_pipeline(runtime=runtime, pipeline=pipeline)
```

--------------------------------

### Configure Django Settings for bamboo-pipeline

Source: https://github.com/tencentblueking/bamboo-engine/blob/master/README_en.md

Configures Django project settings to integrate bamboo-pipeline with the Celery message queue. This setup initializes the Celery application and registers the pipeline apps that the workflow engine needs. It is required for runtime initialization and task processing.

```python
from pipeline.eri.celery.queues import *
from celery import Celery

app = Celery("proj")

app.config_from_object("django.conf:settings")

INSTALLED_APPS = [
    ...
    "pipeline",
    "pipeline.engine",
    "pipeline.component_framework",
    "pipeline.eri",
    ...
]
```

--------------------------------

### Fetch Pipeline Status with Delay in Python

Source: https://github.com/tencentblueking/bamboo-engine/blob/master/README_en.md

This snippet demonstrates how to fetch the status of a pipeline after a short delay.
It imports `time`, pauses execution for 1 second using `time.sleep(1)`, and then calls `api.get_pipeline_states()` to get the pipeline states. The result is printed to the console. The snippet depends on the `api` module and on the `runtime` and `pipeline` variables created when the pipeline was built and started.

```python
import time

time.sleep(1)

result = api.get_pipeline_states(runtime=runtime, root_id=pipeline["id"])

print(result.data)
```

--------------------------------

### Get Pipeline Execution States with Bamboo Engine

Source: https://context7.com/tencentblueking/bamboo-engine/llms.txt

Retrieves the current state of a running pipeline and all its nodes. This function requires a runtime and the pipeline's root ID. It can return states in a flattened format for easier processing. The output includes detailed node information such as state, timing, loop counts, retry counts, and hierarchical relationships.

```python
from bamboo_engine import api
from pipeline.eri.runtime import BambooDjangoRuntime

runtime = BambooDjangoRuntime()
pipeline_id = "pc31c89e6b85a4e2c8c5db477978c1a57"

# Get pipeline states with flattened children
result = api.get_pipeline_states(runtime=runtime, root_id=pipeline_id, flat_children=True)

if result.result:
    states = result.data
    for node_id, node_state in states.items():
        print(f"Node: {node_id}")
        print(f" State: {node_state['state']}")
        print(f" Started: {node_state['started_time']}")
        print(f" Loop: {node_state['loop']}, Retry: {node_state['retry']}")
        # Leaf nodes in the sample state output carry no 'children' key
        print(f" Children count: {len(node_state.get('children', {}))}")
else:
    print(f"Error: {result.message}")
```

--------------------------------

### Run Workflow Pipeline with Bamboo Engine

Source: https://context7.com/tencentblueking/bamboo-engine/llms.txt

Executes a workflow pipeline using the bamboo-engine API. It requires a runtime implementation (e.g., `BambooDjangoRuntime`) and a constructed pipeline object. The function validates the pipeline, sets up the execution environment, and initiates the first node.
It returns a result object indicating success or failure, including any exceptions.

```python
import time

from bamboo_engine import api
from bamboo_engine.builder import builder, EmptyStartEvent, ServiceActivity, EmptyEndEvent
from pipeline.eri.runtime import BambooDjangoRuntime

# Build a simple workflow with start -> activity -> end
start = EmptyStartEvent()
act = ServiceActivity(component_code="example_component")
end = EmptyEndEvent()
start.extend(act).extend(end)

pipeline = builder.build_tree(start)

# Execute the pipeline
runtime = BambooDjangoRuntime()
result = api.run_pipeline(runtime=runtime, pipeline=pipeline)

if result.result:
    print(f"Pipeline started successfully: {pipeline['id']}")
else:
    print(f"Failed to start pipeline: {result.message}")
    print(f"Exception: {result.exc_trace}")
```

--------------------------------

### Define Service Activity with Configuration using Python

Source: https://context7.com/tencentblueking/bamboo-engine/llms.txt

Creates a service activity node for executing a component with detailed configuration options. This includes setting the `component_code`, display `name`, and flags for `error_ignorable`, `skippable`, and `retryable`. It also allows configuring component inputs, such as URL, method, body, and timeout, using `Var` objects for dynamic values.
```python
from bamboo_engine.builder import ServiceActivity, Var, EmptyStartEvent, EmptyEndEvent, builder, Data

# Create activity with full configuration
activity = ServiceActivity(
    component_code="http_request",
    name="Call External API",
    error_ignorable=False,
    timeout=300,  # 5 minutes
    skippable=True,
    retryable=True
)

# Configure activity inputs
activity.component.inputs.url = Var(type=Var.PLAIN, value="https://api.example.com/data")
activity.component.inputs.method = Var(type=Var.PLAIN, value="POST")
activity.component.inputs.body = Var(
    type=Var.SPLICE,
    value="${request_body}"  # Reference variable from pipeline context
)
activity.component.inputs.timeout = Var(type=Var.PLAIN, value=30)
```

--------------------------------

### Build Pipeline Tree using Python

Source: https://context7.com/tencentblueking/bamboo-engine/llms.txt

Constructs a complete pipeline tree structure from flow elements, including validation and unique ID generation. It uses the `builder` module and various gateway and activity types. The `Data` object can define inputs and outputs, referencing variables using `Var`. The output is a dictionary representing the pipeline with its ID, activities, gateways, and flows.
```python
from bamboo_engine import builder
from bamboo_engine.builder import (
    EmptyStartEvent, EmptyEndEvent, ServiceActivity,
    ParallelGateway, ConvergeGateway, ExclusiveGateway,
    Data, Var
)

# Build a complex workflow with parallel branches
start = EmptyStartEvent()
act1 = ServiceActivity(component_code="step1", name="Initial Step")

# Parallel execution
pg = ParallelGateway()
act2 = ServiceActivity(component_code="step2a", name="Parallel Step A")
act3 = ServiceActivity(component_code="step2b", name="Parallel Step B")
cg = ConvergeGateway()

act4 = ServiceActivity(component_code="step3", name="Final Step")
end = EmptyEndEvent()

# Connect the workflow
start.extend(act1).extend(pg)
pg.connect(act2, act3)
# converge() links the tail of every branch (act2, act3) to cg, so the
# branches must not be extended to cg a second time.
pg.converge(cg)
cg.extend(act4).extend(end)

# Define pipeline data with variables
data = Data(
    inputs={
        "input_param": Var(type=Var.PLAIN, value="test_value"),
        "config_param": Var(type=Var.SPLICE, value="${settings.config}")
    },
    outputs=["output_key1", "output_key2"]
)

# Build the tree
pipeline = builder.build_tree(start, data=data)

print(f"Pipeline ID: {pipeline['id']}")
print(f"Nodes: {len(pipeline['activities'])} activities, {len(pipeline['gateways'])} gateways")
print(f"Flows: {len(pipeline['flows'])}")
```

--------------------------------

### Create Nested Workflows with SubProcess

Source: https://context7.com/tencentblueking/bamboo-engine/llms.txt

Defines a reusable subprocess with its own complete workflow tree containing start/end events and service activities. Subprocesses accept input parameters through `Params` and resolve variable references through the SPLICE mechanism, which enables modular workflow composition within a main pipeline.
```python
from bamboo_engine import builder
from bamboo_engine.builder import (
    EmptyStartEvent, EmptyEndEvent, ServiceActivity,
    SubProcess, Data, Params, Var
)

# Build the subprocess workflow
sub_start = EmptyStartEvent()
sub_act1 = ServiceActivity(component_code="sub_step1", name="Subprocess Step 1")
sub_act2 = ServiceActivity(component_code="sub_step2", name="Subprocess Step 2")
sub_end = EmptyEndEvent()

sub_start.extend(sub_act1).extend(sub_act2).extend(sub_end)

# Define subprocess data
sub_data = Data(
    inputs={
        "subprocess_param": Var(type=Var.SPLICE, value="${parent_input}")
    },
    outputs=["subprocess_output"]
)

# Create subprocess node in main workflow
subprocess = SubProcess(
    start=sub_start,
    data=sub_data,
    name="Data Processing Subprocess"
)

# Define parameters passed to subprocess
# (the mapping is passed positionally so the call does not depend on the keyword name)
subprocess.params = Params({
    "parent_input": Var(type=Var.SPLICE, value="${main_pipeline_data}")
})

# Build main pipeline with subprocess
main_start = EmptyStartEvent()
main_end = EmptyEndEvent()
main_start.extend(subprocess).extend(main_end)

main_data = Data(
    inputs={
        "main_pipeline_data": Var(type=Var.PLAIN, value={"data": "from parent"})
    }
)

pipeline = builder.build_tree(main_start, data=main_data)
```

--------------------------------

### Retrieve Node Execution Data with Python

Source: https://context7.com/tencentblueking/bamboo-engine/llms.txt

Fetches both the input and output data associated with a specific node's execution. This is useful for debugging and understanding the data flow within a pipeline. Requires the runtime and node ID.
```python
from bamboo_engine import api
from pipeline.eri.runtime import BambooDjangoRuntime

runtime = BambooDjangoRuntime()
node_id = "e42035b3f98374062921a191115fc602e"

result = api.get_execution_data(runtime=runtime, node_id=node_id)

if result.result:
    data = result.data
    print("Inputs:")
    for key, value in data['inputs'].items():
        print(f" {key}: {value}")

    print("\nOutputs:")
    for key, value in data['outputs'].items():
        print(f" {key}: {value}")

    # Check for errors
    if 'ex_data' in data['outputs']:
        print(f"\nError: {data['outputs']['ex_data']}")
else:
    print(f"Failed to get execution data: {result.message}")
```

--------------------------------

### Create Conditional Branching with ExclusiveGateway

Source: https://context7.com/tencentblueking/bamboo-engine/llms.txt

Implements conditional logic using ExclusiveGateway that evaluates multiple conditions and routes execution to a single matching branch. Multiple ServiceActivity nodes handle different outcomes, and a ConvergeGateway merges all branches back together before reaching the end event.
```python
from bamboo_engine import builder
from bamboo_engine.builder import (
    EmptyStartEvent, EmptyEndEvent, ServiceActivity,
    ExclusiveGateway, ConvergeGateway, Var, Data
)

start = EmptyStartEvent()

# Exclusive gateway with conditions
eg = ExclusiveGateway(name="Status Check")
eg.add_condition(0, "${status} == 'success'")
eg.add_condition(1, "${status} == 'warning'")
eg.add_condition(2, "${status} == 'error'")

# Different paths for each condition
success_act = ServiceActivity(component_code="success_handler", name="Handle Success")
warning_act = ServiceActivity(component_code="warning_handler", name="Handle Warning")
error_act = ServiceActivity(component_code="error_handler", name="Handle Error")

cg = ConvergeGateway()
end = EmptyEndEvent()

# Connect the branches
start.extend(eg)
eg.connect(success_act, warning_act, error_act)
success_act.extend(cg)
warning_act.extend(cg)
error_act.extend(cg)
cg.extend(end)

# Pipeline data with status variable
pipeline_data = Data(
    inputs={
        "status": Var(type=Var.PLAIN, value="success")
    }
)

pipeline = builder.build_tree(start, data=pipeline_data)
```

--------------------------------

### Pipeline Tree YAML Schema Definition

Source: https://github.com/tencentblueking/bamboo-engine/blob/master/docs/user_guide/builded_pipeline_tree_schema.md

Defines the structure and properties of the pipeline tree generated by the Bamboo Engine. It outlines the fields like 'id', 'start_event', 'end_event', 'activities', 'flows', 'gateways', and 'data', along with their respective types and descriptions. This schema is crucial for understanding and working with pipeline execution graphs.
```yaml
pipeline_tree:
  description: The builded tree of bamboo engine
  type: dict
  properties:
    id:
      description: The uuid of tree
      type: string
    start_event:
      description: The start event node of tree
      type: dict
      properties:
        id:
          description: The uuid of start event node
          type: string
        incoming:
          description: The incoming flow of start event node
          type: string
        name:
          description: The name of start event node
          type: string
        outgoing:
          description: The outgoing flow of start event node
          type: string
        type:
          description: The node type of start event node
          type: string
    end_event:
      description: The end event node of tree
      type: dict
      properties:
        id:
          description: The uuid of end event node
          type: string
        incoming:
          description: The incoming flows of end event node
          type: list[string]
        name:
          description: The name of end event node
          type: string
        outgoing:
          description: The outgoing flow of end event node
          type: string
        type:
          description: The node type of end event node
          type: string
    activities:
      description: The activity nodes in pipeline
      type: dict
      properties:
        $node_id:
          description: The activity node
          properties:
            id:
              description: The uuid of activity node
              type: string
            name:
              description: The name of activity node
              type: string
            incoming:
              description: The incoming flows of activity node
              type: list[string]
            outgoing:
              description: The outgoing flow of activity node
              type: string
            type:
              description: The node type of activity node
              type: string
            error_ignore:
              description: If the failure of node be ignored
              type: boolean
            optional:
              description: If the node can be excluded when executing
              type: boolean
            retryable:
              description: If the node can be retried
              type: boolean
            skippable:
              description: If the node can be skipped
              type: boolean
            timeout:
              description: Deprecated field
            component:
              description: The component which defines the activity of the node
              type: dict
              properties:
                code:
                  description: The unique code of the component
                  type: string
                inputs:
                  description: The inputs defined in the component
                  type: dict
    flows:
      description: The flows in pipeline
      type: dict
      properties:
        $flow_id:
          description: The flow
          type: dict
          properties:
            id:
              description: The uuid of flow
              type: string
            source:
              description: The node id which is the source of flow
              type: string
            target:
              description: The node id which is the target of flow
              type: string
            is_default:
              description: Deprecated field
    gateways:
      description: The gateways in pipeline
      type: dict
      properties:
        $gateway_id:
          description: The gateway
          type: dict
          properties:
            id:
              description: The uuid of gateway
              type: string
            name:
              description: The name of gateway
              type: string
            outgoing:
              description: The outgoing flows of gateway
              type: list
    data:
      description: pipeline data
      type: dict
      properties:
        inputs:
          description: The inputs of pipeline
          type: dict
        outputs:
          description: The outputs of pipeline
          type: list
```

--------------------------------

### Retrieve Node Execution History using Python

Source: https://context7.com/tencentblueking/bamboo-engine/llms.txt

Retrieves the complete execution history for a given node ID. It takes a runtime instance and the node ID as input. An optional 'loop' parameter (defaulting to -1 for all iterations) can be specified. The output is a list of history dictionaries, each containing details like version, loop count, skip status, timestamps, inputs, and outputs for each execution.
```python
from bamboo_engine import api
from pipeline.eri.runtime import BambooDjangoRuntime

runtime = BambooDjangoRuntime()
node_id = "e42035b3f98374062921a191115fc602e"

# Get all histories
result = api.get_node_histories(runtime=runtime, node_id=node_id, loop=-1)

if result.result:
    histories = result.data
    print(f"Total executions: {len(histories)}")

    for i, history in enumerate(histories):
        print(f"\nExecution {i+1}:")
        print(f" Version: {history['version']}")
        print(f" Loop: {history['loop']}, Skip: {history['skip']}")
        print(f" Started: {history['started_time']}")
        print(f" Archived: {history['archived_time']}")
        print(f" Duration: {(history['archived_time'] - history['started_time']).total_seconds()}s")
        print(f" Inputs: {history['inputs']}")
        print(f" Outputs: {history['outputs']}")
else:
    print(f"Failed to get histories: {result.message}")
```

--------------------------------

### Callback a Waiting Node with Python

Source: https://context7.com/tencentblueking/bamboo-engine/llms.txt

Triggers a callback for a node that is waiting for external events. This is used for nodes with asynchronous execution patterns, allowing them to resume once external data is provided. Requires the runtime, node ID, version, and callback data.
```python
from bamboo_engine import api
from bamboo_engine.exceptions import InvalidOperationError
from pipeline.eri.runtime import BambooDjangoRuntime

runtime = BambooDjangoRuntime()
node_id = "e42035b3f98374062921a191115fc602e"
version = "v893cdc14150d4df5b20f2db32ba142b3"

callback_data = {
    "status": "success",
    "result": {"key1": "value1", "key2": 100},
    "message": "External system completed successfully"
}

try:
    result = api.callback(
        runtime=runtime,
        node_id=node_id,
        version=version,
        data=callback_data
    )

    if result.result:
        print(f"Callback processed for node {node_id}")
    else:
        print(f"Callback failed: {result.message}")
except InvalidOperationError as e:
    print(f"Invalid callback: {e}")
```

--------------------------------

### Force Activity Failure using Python

Source: https://context7.com/tencentblueking/bamboo-engine/llms.txt

Forces a running activity node to fail immediately with custom error data. It requires a runtime instance, the node ID, and the error data. Optionally, it can send a post-set state signal. Returns a result object indicating success or failure, and allows checking the execution data outputs for 'ex_data' and '_forced_failed' flags.
```python
from bamboo_engine import api
from bamboo_engine.exceptions import InvalidOperationError
from pipeline.eri.runtime import BambooDjangoRuntime

runtime = BambooDjangoRuntime()
node_id = "e42035b3f98374062921a191115fc602e"
ex_data = "Forced failure due to business rule violation: timeout exceeded"

try:
    result = api.forced_fail_activity(
        runtime=runtime,
        node_id=node_id,
        ex_data=ex_data,
        send_post_set_state_signal=True
    )

    if result.result:
        print(f"Node {node_id} forced to fail")

        # Check the failure data
        exec_result = api.get_execution_data_outputs(runtime=runtime, node_id=node_id)
        print(f"Error data: {exec_result.data.get('ex_data')}")
        print(f"Forced failed flag: {exec_result.data.get('_forced_failed')}")
    else:
        print(f"Failed to force fail: {result.message}")
except InvalidOperationError as e:
    print(f"Cannot force fail: {e}")
```

--------------------------------

### Skip Failed Node Execution with Python

Source: https://context7.com/tencentblueking/bamboo-engine/llms.txt

Skips a failed node in a pipeline, allowing execution to continue with the next node. This is applicable only to ServiceActivity and EmptyStartEvent nodes that are marked as skippable. The skipped node is marked as FINISHED with a skip flag. Requires runtime and node ID.
```python
from bamboo_engine import api
from bamboo_engine.exceptions import InvalidOperationError
from pipeline.eri.runtime import BambooDjangoRuntime

runtime = BambooDjangoRuntime()
node_id = "e42035b3f98374062921a191115fc602e"

try:
    result = api.skip_node(runtime=runtime, node_id=node_id)

    if result.result:
        print(f"Node {node_id} skipped successfully")

        # Verify the skip
        state_result = api.get_children_states(runtime=runtime, node_id=node_id)
        node_state = state_result.data[node_id]
        print(f"State: {node_state['state']}, Skip: {node_state['skip']}")
    else:
        print(f"Skip failed: {result.message}")
except InvalidOperationError as e:
    print(f"Cannot skip node: {e}")
```

--------------------------------

### Resume Paused Pipeline Execution with Bamboo Engine

Source: https://context7.com/tencentblueking/bamboo-engine/llms.txt

Resumes a pipeline that has been previously paused. This action reactivates suspended processes and continues execution from the point of suspension. The function requires a runtime and the pipeline ID, and it will only succeed if the pipeline is in the SUSPENDED state. It catches InvalidOperationError for cases where the pipeline cannot be resumed.
```python
from bamboo_engine import api
from bamboo_engine.exceptions import InvalidOperationError
from pipeline.eri.runtime import BambooDjangoRuntime

runtime = BambooDjangoRuntime()
pipeline_id = "pc31c89e6b85a4e2c8c5db477978c1a57"

try:
    result = api.resume_pipeline(runtime=runtime, pipeline_id=pipeline_id)

    if result.result:
        print(f"Pipeline {pipeline_id} resumed successfully")

        # Check current state
        state_result = api.get_pipeline_states(runtime=runtime, root_id=pipeline_id)
        print(f"Current state: {state_result.data[pipeline_id]['state']}")
    else:
        print(f"Failed to resume: {result.message}")
except InvalidOperationError as e:
    print(f"Cannot resume pipeline: {e}")
```

--------------------------------

### Retry Failed Node Execution with Python

Source: https://context7.com/tencentblueking/bamboo-engine/llms.txt

Retries a node that is in a FAILED state and marked as retryable. Optionally, new input data can be provided to alter the retry attempt. This operation creates a history record of the previous failed execution. It requires the runtime and node ID.
```python
from bamboo_engine import api
from bamboo_engine.exceptions import InvalidOperationError
from pipeline.eri.runtime import BambooDjangoRuntime

runtime = BambooDjangoRuntime()
node_id = "e42035b3f98374062921a191115fc602e"

# Retry with modified input data
new_data = {
    "param1": "new_value",
    "param2": 42,
    "_escape_render_keys": ["param1"]  # Don't render param1
}

try:
    result = api.retry_node(runtime=runtime, node_id=node_id, data=new_data)

    if result.result:
        print(f"Node {node_id} retry initiated")

        # Check execution history
        history_result = api.get_node_histories(runtime=runtime, node_id=node_id)
        print(f"Execution history count: {len(history_result.data)}")
    else:
        print(f"Retry failed: {result.message}")
except InvalidOperationError as e:
    print(f"Cannot retry node: {e}")
```

--------------------------------

### Revoke Pipeline Execution with Python

Source: https://context7.com/tencentblueking/bamboo-engine/llms.txt

Cancels a running pipeline, transitioning its state to REVOKED. This is a terminal operation and cannot be resumed. It requires the runtime and pipeline ID as input and returns a result indicating success or failure.

```python
from bamboo_engine import api
from pipeline.eri.runtime import BambooDjangoRuntime

runtime = BambooDjangoRuntime()
pipeline_id = "pc31c89e6b85a4e2c8c5db477978c1a57"

result = api.revoke_pipeline(runtime=runtime, pipeline_id=pipeline_id)

if result.result:
    print(f"Pipeline {pipeline_id} revoked successfully")

    # Verify the revocation
    state_result = api.get_pipeline_states(runtime=runtime, root_id=pipeline_id)
    print(f"Final state: {state_result.data[pipeline_id]['state']}")  # REVOKED
else:
    print(f"Failed to revoke: {result.message}")
```

--------------------------------

### Pause Pipeline Execution with Bamboo Engine

Source: https://context7.com/tencentblueking/bamboo-engine/llms.txt

Suspends the execution of a running pipeline. Once paused, no further nodes will execute until the pipeline is resumed.
This function requires a runtime and the target pipeline ID. It handles potential exceptions like NotFoundError or InvalidOperationError if the pipeline does not exist or is not in a pausable state.

```python
from bamboo_engine import api
from bamboo_engine.exceptions import NotFoundError, InvalidOperationError
from pipeline.eri.runtime import BambooDjangoRuntime

runtime = BambooDjangoRuntime()
pipeline_id = "pc31c89e6b85a4e2c8c5db477978c1a57"

try:
    result = api.pause_pipeline(runtime=runtime, pipeline_id=pipeline_id)

    if result.result:
        print(f"Pipeline {pipeline_id} paused successfully")
    else:
        print(f"Failed to pause: {result.message}")
except (NotFoundError, InvalidOperationError) as e:
    print(f"Operation error: {e}")
```
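--------------------------------

### Sketch: Summarize and Poll Pipeline States

The state snippets in this document return a nested dict keyed by node ID, where child nodes sit under a `children` key and leaf nodes may have no such key. The following is a minimal sketch, not part of bamboo-engine, of how that structure could be walked and polled. The names `iter_states`, `count_by_state`, and `wait_for_state` are hypothetical helpers, and the default `done_states` tuple is an assumption based on the `FINISHED` and `REVOKED` states shown in the snippets. The state source is injected as a callable so the helpers can run without a configured runtime.

```python
import time
from collections import Counter
from typing import Any, Callable, Dict, Iterator, Sequence, Tuple

States = Dict[str, Dict[str, Any]]


def iter_states(states: States) -> Iterator[Tuple[str, Dict[str, Any]]]:
    """Yield (node_id, node_state) depth-first; leaf nodes may lack 'children'."""
    for node_id, node_state in states.items():
        yield node_id, node_state
        yield from iter_states(node_state.get("children", {}))


def count_by_state(states: States) -> Dict[str, int]:
    """Count nodes per state name across the whole tree."""
    return dict(Counter(node["state"] for _, node in iter_states(states)))


def wait_for_state(
    fetch_states: Callable[[], States],
    root_id: str,
    done_states: Sequence[str] = ("FINISHED", "REVOKED"),  # assumed end states
    timeout: float = 30.0,
    interval: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> States:
    """Poll fetch_states() until the root node reaches one of done_states."""
    deadline = time.monotonic() + timeout
    while True:
        states = fetch_states()
        if states.get(root_id, {}).get("state") in done_states:
            return states
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"{root_id} did not reach {tuple(done_states)} within {timeout}s"
            )
        sleep(interval)


# Hypothetical wiring to the API used in the snippets above (not executed here):
#   fetch = lambda: api.get_pipeline_states(runtime=runtime, root_id=root_id).data
#   final = wait_for_state(fetch, root_id)
#   print(count_by_state(final))
```

Passing the fetch function in, rather than calling `api.get_pipeline_states()` directly, keeps the polling logic testable with canned snapshots and replaces the fixed `time.sleep(1)` used in the status example with an explicit timeout.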