### Full Example: Local Parameter Definition Methods

Source: https://github.com/apache/dolphinscheduler-sdk-python/blob/main/docs/source/concept.md

A comprehensive example demonstrating both methods of defining local parameters: via task arguments and via task instance methods. Includes necessary imports and workflow setup.

```python
r"""
A tutorial example that sets local parameters in pydolphinscheduler.

Method 1:
    task = Shell(..., input_params={"input":"a"}, output_params={"output": "b"})

Method 2:
    task = Shell(...)
    task.add_in("input", "a")
    task.add_out("output", "b")
"""

from pydolphinscheduler.core.parameter import ParameterType
from pydolphinscheduler.core.workflow import Workflow
from pydolphinscheduler.tasks.shell import Shell

with Workflow(name="local_parameter_example", release_state="offline") as workflow:
    # [start parameter example]
    # define a parameter "a", and use it in Shell task
    example1_input_params = Shell(
        name="example1_input_params",
        command="echo ${a}",
        input_params={
            "a": "123",
        },
    )

    # define a parameter "random_value", and pass it to the downstream tasks
    example2_output_params = Shell(
        name="example2_output_params",
        command="""
        val=$(echo $RANDOM)
        echo "#{setValue(random_value=${val})}"
        echo $val
        """,
        output_params={
            "random_value": "",
        },
    )

    # use the parameter "random_value", from upstream tasks
    # we don't need to define input_params again if the parameter is from upstream tasks
    example2_input_params = Shell(
        name="example2_input_params", command="""echo ${random_value}"""
    )

    example2_output_params >> example2_input_params
    # [end parameter example]

    # [start parameter define]
    # Add parameter via task arguments
    task_1 = Shell(
        name="task_1",
        command="echo hello pydolphinscheduler",
        input_params={
            "value_VARCHAR": "abc",
            "value_INTEGER": 123,
            "value_FLOAT": 0.1,
            "value_BOOLEAN": True,
        },
        output_params={
            "value_EMPTY": None,
        },
    )

    # Add parameter via task instance's method
    task_2 = Shell(name="task_2", command="echo hello pydolphinscheduler")
    task_2.add_in("value_VARCHAR", "abc")
    task_2.add_in("value_INTEGER", 123)
    task_2.add_in("value_FLOAT", 0.1)
    task_2.add_in("value_BOOLEAN", True)
    task_2.add_out("value_EMPTY")

    # Task 1 is the same as task 2

    # Other parameter types, which cannot be converted automatically, must declare their type explicitly
```

--------------------------------

### Download Example Workflow

Source: https://github.com/apache/dolphinscheduler-sdk-python/blob/main/README.md

Retrieve a tutorial example script from the official repository.

```shell
# Get the latest example code from GitHub
curl https://raw.githubusercontent.com/apache/dolphinscheduler-sdk-python/main/src/pydolphinscheduler/examples/tutorial.py -o ./tutorial.py
```

--------------------------------

### Manage Configuration via CLI

Source: https://github.com/apache/dolphinscheduler-sdk-python/blob/main/docs/source/config.md

Examples for getting and setting configuration values using the pydolphinscheduler CLI.

```bash
# Get a single configuration value from a leaf node.
# The output looks like this:
# java_gateway.address = 127.0.0.1
pydolphinscheduler config --get java_gateway.address

# Get multiple configuration values from leaf nodes.
# The output looks like this:
# java_gateway.address = 127.0.0.1
# java_gateway.port = 25333
pydolphinscheduler config --get java_gateway.address --get java_gateway.port

# Get a parent configuration that contains multiple leaf nodes.
# The output looks like this:
# java_gateway = ordereddict([('address', '127.0.0.1'), ('port', 25333), ('auto_convert', True)])
pydolphinscheduler config --get java_gateway

# Set a single configuration value.
# The output looks like this:
# Set configuration done.
pydolphinscheduler config --set java_gateway.address 192.168.1.1

# Set multiple configuration values.
# The output looks like this:
# Set configuration done.
pydolphinscheduler config --set java_gateway.address 192.168.1.1 --set java_gateway.port 25334

# Setting a configuration that is not a leaf node will fail.
# The output looks like this:
# Raise error.
pydolphinscheduler config --set java_gateway 192.168.1.1,25334,True
```

--------------------------------

### Install from PyPI

Source: https://github.com/apache/dolphinscheduler-sdk-python/blob/main/RELEASE.md

Command to install the package from the official PyPI repository.

```shell
python3 -m pip install apache-dolphinscheduler
```

--------------------------------

### Install Pre-commit Hooks

Source: https://github.com/apache/dolphinscheduler-sdk-python/blob/main/CONTRIBUTING.md

Commands to install pre-commit and initialize git hooks for local development.

```shell
python -m pip install pre-commit
```

```shell
pre-commit install
```

--------------------------------

### YAML Configuration Example

Source: https://github.com/apache/dolphinscheduler-sdk-python/blob/main/docs/source/tasks/spark.md

Example of configuring a Spark task in a YAML file for DolphinScheduler.

```APIDOC
## YAML Configuration for Spark Task

### Description
This example shows how to define a Spark task within a DolphinScheduler workflow using a YAML configuration file.

### Method
N/A (This is a YAML configuration example)

### Endpoint
N/A

### Parameters
N/A

### Request Example
```yaml
# Define the workflow
workflow:
  name: "Spark"

# Define the tasks within the workflow
tasks:
  - name: task
    task_type: Spark
    main_class: org.apache.spark.examples.SparkPi
    main_package: test_java.jar
    program_type: SCALA
    deploy_mode: local
```

### Response
N/A
```

--------------------------------

### YAML Example: SQL Task Configuration

Source: https://github.com/apache/dolphinscheduler-sdk-python/blob/main/docs/source/tasks/sql.md

Examples demonstrating how to configure SQL tasks in a YAML file for different scenarios, including multi-line SQL, file-based SQL, and single-line definitions.
```APIDOC
## YAML File Example

### Workflow Definition
```yaml
workflow:
  name: "Sql"
```

### Tasks Definition
```yaml
tasks:
  - name: task_base
    task_type: Sql
    datasource_name: "db"
    sql: show tables;

  - name: task_multi_line
    task_type: Sql
    datasource_name: "db"
    sql: |
      show tables;
      select id from version where id=1;

  - name: task_file
    task_type: Sql
    datasource_name: "db"
    sql: $FILE{"example_sql.sql"}

  # Or you can define a task on one line
  - { "task_type": "Sql", "name": "task_base_one_line", "datasource_name": "db", "sql": "select id from version where id=1;"}

  # Or you can define a file-based task on one line
  - { "task_type": "Sql", "name": "task_file_one_line", "datasource_name": "db", "sql": '$FILE{"example_sql.sql"}'}
```

### example_sql.sql Content
```sql
select id from version where id=1;
select id from version where id=2;
select id from version where id=3;
select id from version where id=4;
select id from version where id=5;
```
```

--------------------------------

### Install and Run Tox

Source: https://github.com/apache/dolphinscheduler-sdk-python/blob/main/CONTRIBUTING.md

Commands to install tox and execute the local CI test suite.

```shell
python -m pip install --upgrade tox
```

```shell
tox -e local-ci
```

--------------------------------

### SQL File Content Example

Source: https://github.com/apache/dolphinscheduler-sdk-python/blob/main/docs/source/tasks/sql.md

Example content for an external SQL file referenced by a task.

```sql
select id from version where id=1;
select id from version where id=2;
select id from version where id=3;
select id from version where id=4;
select id from version where id=5;
```

--------------------------------

### Install from TestPyPI

Source: https://github.com/apache/dolphinscheduler-sdk-python/blob/main/RELEASE.md

Command to install the package from TestPyPI. Use the --no-deps flag to avoid conflicts with existing installations.
```shell
python3 -m pip install --index-url https://test.pypi.org/simple/ --no-deps apache-dolphinscheduler
```

--------------------------------

### Install Development Dependencies

Source: https://github.com/apache/dolphinscheduler-sdk-python/blob/main/CONTRIBUTING.md

Install all necessary dependencies for development, testing, and code style checks using pip.

```shell
python -m pip install -e '.[dev]'
```

--------------------------------

### Install and Verify PyDolphinScheduler

Source: https://github.com/apache/dolphinscheduler-sdk-python/blob/main/README.md

Install the package via pip and verify the installation by checking the version.

```shell
# Install
python -m pip install apache-dolphinscheduler

# Verify that the installation succeeded. This prints the installed version of
# apache-dolphinscheduler (0.1.0 is used here as an example).
pydolphinscheduler version
# 0.1.0
```

--------------------------------

### Run Example Script

Source: https://github.com/apache/dolphinscheduler-sdk-python/blob/main/docs/source/start.md

Executes the downloaded tutorial Python script. This will submit a workflow to the DolphinScheduler API server.

```bash
python tutorial.py
```

--------------------------------

### Python SDK Example: Creating and Running a Sub Workflow

Source: https://github.com/apache/dolphinscheduler-sdk-python/blob/main/docs/source/tasks/sub_workflow.md

A comprehensive Python example showing the creation of a sub-workflow, a main workflow containing a SubWorkflow task, and the execution of the main workflow.

```APIDOC
## Python SDK Usage

### Description
This example illustrates how to use the Python SDK to define a sub-workflow, then define a main workflow that includes a task to run the sub-workflow. It covers task dependencies and workflow submission.

### Code Example
```python
from pydolphinscheduler.core.workflow import Workflow
from pydolphinscheduler.tasks.shell import Shell
from pydolphinscheduler.tasks.sub_workflow import SubWorkflow

# Define the sub-workflow
with Workflow(name="sub_workflow_downstream") as wf_downstream:
    sub_workflow_ds_task = Shell(
        name="task_sub_workflow",
        command="echo 'call sub workflow success!'",
        workflow=wf_downstream,
    )
    wf_downstream.submit()

# Define the main workflow
with Workflow(name="task_sub_workflow_example") as wf_upstream:
    sub_workflow_pre = Shell(
        name="pre-task",
        command="echo 'prefix task for sub workflow'",
        workflow=wf_upstream,
    )

    # Declare the SubWorkflow task, referencing the previously defined sub-workflow
    sw_task = SubWorkflow(
        name="sub_workflow",
        workflow_name=wf_downstream.name,  # Name of the workflow to be triggered
        workflow=wf_upstream,  # The workflow object this task belongs to
    )

    sub_workflow_pre >> sw_task

    # Run the main workflow. This will trigger the sub-workflow execution.
    wf_upstream.run()
```

### Notes
- Ensure the workflow specified by `workflow_name` exists in the DolphinScheduler project before running the main workflow.
- The `SubWorkflow` task will trigger a new run of the specified sub-workflow.
```

--------------------------------

### SageMaker Task YAML Example

Source: https://github.com/apache/dolphinscheduler-sdk-python/blob/main/docs/source/tasks/sagemaker.md

An example demonstrating how to configure a SageMaker task in a YAML file for DolphinScheduler.

```APIDOC
## YAML Workflow Example

### Description
Example of defining a SageMaker task within a DolphinScheduler workflow using YAML.

### YAML Configuration
```yaml
# Define the workflow
workflow:
  name: "Sagemaker"
  release_state: "offline"

# Define the tasks within the workflow
tasks:
  - name: sagemaker
    task_type: Sagemaker
    sagemaker_request_json: $FILE{"example_sagemaker_params.json"}
```

### example_sagemaker_params.json
```json
{
    "ParallelismConfiguration":{
        "MaxParallelExecutionSteps":1
    },
    "PipelineExecutionDescription":"run pipeline using ds",
    "PipelineExecutionDisplayName":"ds-sagemaker-pipeline",
    "PipelineName":"DsSagemakerPipeline",
    "PipelineParameters":[
        {
            "Name":"InputData",
            "Value": "s3://sagemaker/dataset/dataset.csv"
        },
        {
            "Name":"InferenceData",
            "Value": "s3://sagemaker/dataset/inference.csv"
        }
    ]
}
```
```

--------------------------------

### Python Workflow Example

Source: https://github.com/apache/dolphinscheduler-sdk-python/blob/main/docs/source/tasks/spark.md

Example of creating a Spark task within a workflow using the PyDolphinScheduler SDK.

```APIDOC
## Example Workflow with Spark Task

### Description
This example demonstrates how to define a workflow that includes a Spark task using the PyDolphinScheduler Python SDK.

### Method
N/A (This is a Python script example)

### Endpoint
N/A

### Parameters
N/A

### Request Example
```python
"""An example workflow for task spark."""

from pydolphinscheduler.core.workflow import Workflow
from pydolphinscheduler.tasks.spark import DeployMode, ProgramType, Spark

with Workflow(name="task_spark_example") as workflow:
    task = Spark(
        name="task_spark",
        main_class="org.apache.spark.examples.SparkPi",
        main_package="spark-examples_2.12-3.2.0.jar",
        program_type=ProgramType.JAVA,
        deploy_mode=DeployMode.LOCAL,
    )
    workflow.run()
```

### Response
N/A
```

--------------------------------

### Execution Output

Source: https://github.com/apache/dolphinscheduler-sdk-python/blob/main/docs/source/howto/multi-resources.md

Example output after running the workflow.
```text
2022-11-29 16:16:51.952742
```

--------------------------------

### Download Example Script

Source: https://github.com/apache/dolphinscheduler-sdk-python/blob/main/docs/source/start.md

Downloads the tutorial Python script from a GitHub URL. This script can be used to test PyDolphinScheduler functionality.

```bash
wget https://raw.githubusercontent.com/apache/dolphinscheduler-sdk-python/main/src/pydolphinscheduler/examples/tutorial.py
```

--------------------------------

### ResourcePlugin Usage Example

Source: https://github.com/apache/dolphinscheduler-sdk-python/blob/main/docs/source/resources_plugin/resource-plugin.md

Demonstrates how to use a ResourcePlugin, specifically the Local plugin, when initializing a Workflow and a Shell task.

```APIDOC
## Usage Example

Resource plugins can be used in task subclasses and workflows by adding the `resource_plugin` parameter during initialization.

### Example:

```python
import os
from pathlib import Path

from pydolphinscheduler.core.workflow import Workflow
# Import path matches the SQL task example, which uses pydolphinscheduler.resources_plugin
from pydolphinscheduler.resources_plugin import Local
from pydolphinscheduler.tasks import Shell

with Workflow(
    name="tutorial_resource_plugin",
    schedule="0 0 0 * * ? *",
    start_time="2021-01-01",
    resource_plugin=Local("/tmp"),
) as workflow:
    file = "resource.sh"
    path = Path("/tmp").joinpath(file)
    # Write the script that the Local plugin resolves relative to its prefix
    with open(str(path), "w") as f:
        f.write("echo tutorial resource plugin")
    task_parent = Shell(
        name="local-resource-example",
        command=file,
    )
    print(task_parent.task_params)
    os.remove(path)
```

**Note:** If `resource_plugin` is defined in both the task and the workflow, the task's definition takes precedence. If only the workflow defines it, the workflow's definition is used. If neither is defined, the command is executed as a script.
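
The precedence rule in the note can be sketched in plain Python. This is an illustration only: `resolve_resource_plugin` is a hypothetical helper that is not part of pydolphinscheduler, and the SDK's internal lookup may differ.

```python
from typing import Optional


def resolve_resource_plugin(
    task_plugin: Optional[object], workflow_plugin: Optional[object]
) -> Optional[object]:
    """Hypothetical helper: pick the plugin a task would end up using.

    Mirrors the note above: the task-level plugin wins, then the
    workflow-level plugin, otherwise None (no plugin configured).
    """
    if task_plugin is not None:
        return task_plugin
    return workflow_plugin


# Plain strings stand in for plugin instances such as Local("/tmp").
print(resolve_resource_plugin("task-plugin", "workflow-plugin"))
print(resolve_resource_plugin(None, "workflow-plugin"))
print(resolve_resource_plugin(None, None))
```

When the helper returns `None`, the note's last case applies and the command is treated as a script rather than a file name.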
```

--------------------------------

### Install PyDolphinScheduler from Source (Develop Mode)

Source: https://github.com/apache/dolphinscheduler-sdk-python/blob/main/docs/source/start.md

Clone the Apache DolphinScheduler Python SDK repository and install PyDolphinScheduler in develop mode for unreleased features. This requires Git.

```bash
# Clone the Apache DolphinScheduler Python SDK repository
git clone git@github.com:apache/dolphinscheduler-sdk-python.git
# Enter the cloned directory and install PyDolphinScheduler in develop mode
cd dolphinscheduler-sdk-python
python -m pip install -e .
```

--------------------------------

### Build Documentation Manually

Source: https://github.com/apache/dolphinscheduler-sdk-python/blob/main/CONTRIBUTING.md

Commands to install documentation dependencies and build HTML or multi-version documentation manually.

```shell
python -m pip install '.[doc]'
```

```shell
cd pydolphinscheduler/docs/
make clean && make html
```

```shell
# Fetch all history tags because we use tags to build history documents via [sphinx-multiversion](https://holzhaus.github.io/sphinx-multiversion/master/index.html)
git fetch --tags
cd pydolphinscheduler/docs/
make clean && make multiversion
```

--------------------------------

### Full Example: Sub Workflow Declaration and Execution

Source: https://github.com/apache/dolphinscheduler-sdk-python/blob/main/docs/source/tasks/sub_workflow.md

A comprehensive example demonstrating the declaration of a sub-workflow, its tasks, the SubWorkflow task itself, and finally running the main workflow. It highlights the need for the sub-workflow to exist.
```python
from pydolphinscheduler.core.workflow import Workflow
from pydolphinscheduler.tasks.shell import Shell
from pydolphinscheduler.tasks.sub_workflow import SubWorkflow

# [start sub_workflow_declare]
with Workflow(name="sub_workflow_downstream") as wf_downstream, Workflow(
    name="task_sub_workflow_example"
) as wf_upstream:
    sub_workflow_ds_task = Shell(
        name="task_sub_workflow",
        command="echo 'call sub workflow success!'",
        workflow=wf_downstream,
    )
    wf_downstream.submit()
    # [end sub_workflow_declare]

    sub_workflow_pre = Shell(
        name="pre-task",
        command="echo 'prefix task for sub workflow'",
        workflow=wf_upstream,
    )
    # [start sub_workflow_task_declare]
    sw_task = SubWorkflow(
        name="sub_workflow",
        workflow_name=wf_downstream.name,
        workflow=wf_upstream,
    )
    # [end sub_workflow_task_declare]
    sub_workflow_pre >> sw_task
    # Please make sure workflow with name `wf_downstream.name` exists when we submit or run sub workflow task
    wf_upstream.run()
```

--------------------------------

### YAML File Example for Sub Workflow

Source: https://github.com/apache/dolphinscheduler-sdk-python/blob/main/docs/source/tasks/sub_workflow.md

An example demonstrating how to configure a SubWorkflow task using YAML, including the definition of both the main and sub-workflows.

```APIDOC
## YAML Configuration

### Description
This section provides a YAML configuration example for defining workflows and tasks, specifically showcasing the use of the `SubWorkflow` task type. It includes two YAML files: one for the main workflow and another for the sub-workflow it references.

### Main Workflow YAML (`main_workflow.yaml`)
```yaml
# Define the workflow
workflow:
  name: "SubWorkflow"

tasks:
  - name: example_workflow
    task_type: SubWorkflow
    # References the sub-workflow defined in example_sub_workflow.yaml
    workflow_name: "$WORKFLOW{example_sub_workflow.yaml}"

  - { "task_type": "Shell", "deps": [example_workflow], "name": "task_3", "command": "echo task 3" }
```

### Sub-Workflow YAML (`example_sub_workflow.yaml`)
```yaml
# Define the workflow
workflow:
  name: "example_workflow_for_sub_workflow"

# Define the tasks within the workflow
tasks:
  - { "task_type": "Shell", "name": "task_1", "command": "echo task 1" }
  - { "task_type": "Shell", "deps": [task_1], "name": "task_2", "command": "echo task 2" }
  - { "task_type": "Shell", "deps": [task_2], "name": "task_3", "command": "echo task 3" }
```

### Notes
- The `workflow_name` in the main workflow uses a special syntax `$WORKFLOW{filename.yaml}` to reference another workflow definition file.
- Ensure that the referenced sub-workflow file (`example_sub_workflow.yaml` in this case) is accessible and correctly defined.
```

--------------------------------

### Configure GitLab Resource Plugin

Source: https://github.com/apache/dolphinscheduler-sdk-python/blob/main/docs/source/resources_plugin/gitlab.md

Examples of initializing the GitLab resource plugin with different authentication strategies.

```python
resource_plugin=GitLab(prefix="xxx")
```

```python
resource_plugin=GitLab(prefix="xxx", private_token="xxx")
```

```python
resource_plugin=GitLab(prefix="xxx", username="username", password="pwd")
```

```python
resource_plugin=GitLab(prefix="xxx", oauth_token="xx")
```

--------------------------------

### Install PyDolphinScheduler from GitHub URL

Source: https://github.com/apache/dolphinscheduler-sdk-python/blob/main/docs/source/start.md

Install the latest PyDolphinScheduler package directly from its GitHub repository URL. This is an alternative to cloning the entire repository.
```bash
# Install directly from the GitHub repository (quote the URL so the shell leaves '#' alone)
pip install -e "git+https://github.com/apache/dolphinscheduler-sdk-python.git#egg=apache-dolphinscheduler"
```

--------------------------------

### Check Python Installation

Source: https://github.com/apache/dolphinscheduler-sdk-python/blob/main/docs/source/start.md

Verify that Python is installed correctly by checking its version. Ensure you are using Python 3.9 or higher.

```bash
python --version
```

--------------------------------

### SQL Task Example Workflow

Source: https://github.com/apache/dolphinscheduler-sdk-python/blob/main/docs/source/tasks/sql.md

A complete workflow demonstrating the usage of the SQL task with different configurations.

```python
"""An example workflow for task SQL."""

from pathlib import Path

from pydolphinscheduler.core.workflow import Workflow
from pydolphinscheduler.resources_plugin import Local
from pydolphinscheduler.tasks.sql import Sql, SqlType

with Workflow(
    name="task_sql_example",
) as workflow:
    # [start bare_sql_desc]
    bare_sql = Sql(
        name="bare_sql",
        datasource_name="metadata",
        sql="select * from t_ds_version",
    )
    # [end bare_sql_desc]

    # [start sql_file_desc]
    sql_file = Sql(
        name="sql_file",
        datasource_name="metadata",
        sql="ext/example.sql",
        sql_type=SqlType.SELECT,
        resource_plugin=Local(prefix=str(Path(__file__).parent)),
    )
    # [end sql_file_desc]

    # [start sql_with_pre_post_desc]
    sql_with_pre_post = Sql(
        name="sql_with_pre_post",
        datasource_name="metadata",
        sql="select * from t_ds_version",
        pre_statements=[
            "update table_one set version = '1.3.6'",
            "delete from table_two where version = '1.3.6'",
        ],
        post_statements="update table_one set version = '3.0.0'",
    )
    # [end sql_with_pre_post_desc]

    bare_sql >> [
        sql_file,
        sql_with_pre_post,
    ]

    workflow.submit()
```

--------------------------------

### Define and Execute Multiple Resources in a Workflow

Source: https://github.com/apache/dolphinscheduler-sdk-python/blob/main/docs/source/howto/multi-resources.md

Demonstrates the complete workflow setup including resource creation and task execution.

```python
from pydolphinscheduler.core import Workflow
from pydolphinscheduler.core.resource import Resource
from pydolphinscheduler.tasks import Shell

dependence = "dependence.py"
main = "main.py"

with Workflow(
    name="multi_resources_example",
    # [start create_new_resources]
    resource_list=[
        Resource(
            name=dependence,
            content="from datetime import datetime\nnow = datetime.now()",
        ),
        Resource(name=main, content="from dependence import now\nprint(now)"),
    ],
    # [end create_new_resources]
) as workflow:
    # [start use_exists_resources]
    task_use_resource = Shell(
        name="use-resource",
        command=f"python {main}",
        resource_list=[
            dependence,
            main,
        ],
    )
    # [end use_exists_resources]

    workflow.run()
```

--------------------------------

### SageMaker Task Python Example

Source: https://github.com/apache/dolphinscheduler-sdk-python/blob/main/docs/source/tasks/sagemaker.md

An example demonstrating how to create and configure a SageMaker task within a DolphinScheduler workflow using Python.

```APIDOC
## Python Workflow Example

### Description
An example workflow for the SageMaker task.

### Code
```python
"""An example workflow for task sagemaker."""
import json

from pydolphinscheduler.core.workflow import Workflow
from pydolphinscheduler.tasks.sagemaker import SageMaker

sagemaker_request_data = {
    "ParallelismConfiguration": {"MaxParallelExecutionSteps": 1},
    "PipelineExecutionDescription": "test Pipeline",
    "PipelineExecutionDisplayName": "AbalonePipeline",
    "PipelineName": "AbalonePipeline",
    "PipelineParameters": [
        {"Name": "ProcessingInstanceType", "Value": "ml.m4.xlarge"},
        {"Name": "ProcessingInstanceCount", "Value": "2"},
    ],
}

with Workflow(
    name="task_sagemaker_example",
) as workflow:
    task_sagemaker = SageMaker(
        name="task_sagemaker",
        sagemaker_request_json=json.dumps(sagemaker_request_data, indent=2),
    )

    workflow.run()
```
```

--------------------------------

### Run Manual Linting

Source: https://github.com/apache/dolphinscheduler-sdk-python/blob/main/CONTRIBUTING.md

Commands to install style dependencies and run isort, Black, and Flake8 manually.

```shell
python -m pip install '.[style]'
```

```shell
# We recommend running isort and Black before Flake8, because Black can automatically fix some
# code style issues, while Flake8 only reports where the code does not match PEP 8

# Run isort
python -m isort .

# Run Black
python -m black .

# Run Flake8
python -m flake8
```

--------------------------------

### Start DolphinScheduler API Server

Source: https://github.com/apache/dolphinscheduler-sdk-python/blob/main/docs/source/start.md

Starts the API server which includes the Python gateway service. Ensure the Python gateway is enabled via environment variables or configuration files.

```bash
./bin/dolphinscheduler-daemon.sh start api-server
```

```bash
jps
# ....
# 201472 ApiApplicationServer
# ....
```

--------------------------------

### Configure Remote API Server and Run Example

Source: https://github.com/apache/dolphinscheduler-sdk-python/blob/main/docs/source/start.md

Configures PyDolphinScheduler to connect to a remote API server and then runs the example script. This is useful when the workflow script is separate from the API server.

```bash
pydolphinscheduler config --init
```

```bash
# Replace the placeholder with the address of your remote API server
pydolphinscheduler config --set java_gateway.address <api-server-address>
```

```bash
python tutorial.py
```

--------------------------------

### YAML File Structure Example

Source: https://github.com/apache/dolphinscheduler-sdk-python/blob/main/docs/source/tutorial.md

This illustrates a typical directory structure for organizing DolphinScheduler workflow YAML files and associated resources.

```bash
.
└── yaml_define
    ├── Condition.yaml
    ├── DataX.yaml
    ├── Dependent_External.yaml
    ├── Dependent.yaml
    ├── example_datax.json
    ├── example_sql.sql
    ├── example_sub_workflow.yaml
    ├── Flink.yaml
    ├── Http.yaml
    ├── MapReduce.yaml
    ├── MoreConfiguration.yaml
    ├── Procedure.yaml
    ├── Python.yaml
    ├── Shell.yaml
    ├── Spark.yaml
    ├── Sql.yaml
    ├── SubWorkflow.yaml
    └── Switch.yaml
```

--------------------------------

### Start DolphinScheduler Gateway

Source: https://context7.com/apache/dolphinscheduler-sdk-python/llms.txt

Instructions for enabling the Python Gateway service via Docker or environment variables.
```bash
# Using Docker (recommended for quick start)
DOLPHINSCHEDULER_VERSION=3.1.1
docker run --name dolphinscheduler-standalone-server \
    -p 12345:12345 \
    -p 25333:25333 \
    -e API_PYTHON_GATEWAY_ENABLED="true" \
    -d apache/dolphinscheduler-standalone-server:"${DOLPHINSCHEDULER_VERSION}"

# Or enable via environment variable for an existing installation
export API_PYTHON_GATEWAY_ENABLED="true"
./bin/dolphinscheduler-daemon.sh start api-server

# Verify the server is running
jps
# Should show: ApiApplicationServer
```

--------------------------------

### Run integration tests with IntelliJ IDEA

Source: https://github.com/apache/dolphinscheduler-sdk-python/blob/main/CONTRIBUTING.md

Starts the standalone server in the IDE and runs integration tests via tox.

```shell
# Clone the apache/dolphinscheduler repository into a directory of your choice
cd <your-workspace>
git clone git@github.com:apache/dolphinscheduler.git
# Run apache/dolphinscheduler's Standalone Server in IntelliJ IDEA according to
# https://dolphinscheduler.apache.org/en-us/docs/dev/user_doc/contribute/development-environment-setup.html
# Go to the Python SDK root directory and run the integration tests via tox
tox -e local-integrate-test
```

--------------------------------

### Start DolphinScheduler via Docker

Source: https://github.com/apache/dolphinscheduler-sdk-python/blob/main/README.md

Run a standalone DolphinScheduler server using Docker with the Python gateway enabled.
```shell
# Change the DolphinScheduler version to the one you want to use; 3.1.1 is used here as an example
DOLPHINSCHEDULER_VERSION=3.1.1
docker run --name dolphinscheduler-standalone-server -p 12345:12345 -p 25333:25333 -e API_PYTHON_GATEWAY_ENABLED="true" -d apache/dolphinscheduler-standalone-server:"${DOLPHINSCHEDULER_VERSION}"
```

--------------------------------

### Install Specific PyDolphinScheduler Version

Source: https://github.com/apache/dolphinscheduler-sdk-python/blob/main/docs/source/start.md

Install a specific pre-release version of PyDolphinScheduler by specifying the version number. For example, the command below installs version 3.0.0-beta-2.

```bash
python -m pip install apache-dolphinscheduler==3.0.0b2
```

--------------------------------

### Kubernetes Task Example

Source: https://context7.com/apache/dolphinscheduler-sdk-python/llms.txt

Defines a Kubernetes task to run containerized workloads. Specify the container image, namespace, and resource requirements.

```python
from pydolphinscheduler.core.workflow import Workflow
from pydolphinscheduler.tasks.kubernetes import Kubernetes

with Workflow(name="k8s_workflow") as workflow:
    k8s_task = Kubernetes(
        name="k8s_job",
        image="my-registry.com/my-app:latest",
        namespace='{"name": "default", "cluster": "production"}',
        min_cpu_cores=2.0,
        min_memory_space=1024.0,  # MB
    )

    workflow.submit()
```

--------------------------------

### Run Example Script

Source: https://github.com/apache/dolphinscheduler-sdk-python/blob/main/README.md

Execute the tutorial script to create and trigger a new workflow in DolphinScheduler. Ensure the tenant value in tutorial.py is updated to a valid username on your host.

```bash
python ./tutorial.py
```

--------------------------------

### Install PyDolphinScheduler using pip

Source: https://github.com/apache/dolphinscheduler-sdk-python/blob/main/docs/source/start.md

Install the latest stable version of PyDolphinScheduler from PyPI. This command installs the package and its dependencies.
```bash
python -m pip install apache-dolphinscheduler
```

--------------------------------

### Dependent_External.yaml Example

Source: https://github.com/apache/dolphinscheduler-sdk-python/blob/main/docs/source/tasks/dependent.md

Example of a workflow definition file that can be referenced in dependent task configurations.

```APIDOC
## YAML Example: Dependent_External.yaml

### Description
This is an example of a workflow definition file named `Dependent_External.yaml`. This file defines a simple workflow with three shell tasks, which can be referenced by other workflows using the `$WORKFLOW{"Dependent_External.yaml"}` syntax in their dependent task configurations.

### Example Structure
```yaml
# Define the workflow
workflow:
  name: "task_dependent_external"

# Define the tasks within the workflow
tasks:
  - { "task_type": "Shell", "name": "task_1", "command": "echo task 1" }
  - { "task_type": "Shell", "name": "task_2", "command": "echo task 2" }
  - { "task_type": "Shell", "name": "task_3", "command": "echo task 3" }
```

### Key Fields
- **workflow.name**: The name of the workflow.
- **tasks**: A list of task definitions within the workflow.
- **task_type**: The type of the task (e.g., 'Shell').
- **name**: The name of the task.
- **command**: The command to execute for a Shell task.
```

--------------------------------

### Manage Configuration

Source: https://github.com/apache/dolphinscheduler-sdk-python/blob/main/docs/source/cli.md

Commands for initializing, setting, or retrieving configuration values.

```shell
pydolphinscheduler config [OPTIONS]
```

--------------------------------

### YAML Example: Dependent Task Configuration

Source: https://github.com/apache/dolphinscheduler-sdk-python/blob/main/docs/source/tasks/dependent.md

Example of how to configure dependent tasks in a workflow YAML file, demonstrating 'and' and 'or' operators.
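
As a rough mental model for the nested `op` and `groups` keys used in dependent task configurations, the sketch below evaluates a small and/or tree in plain Python. It is an assumption-laden illustration: `evaluate` is a hypothetical function, the booleans stand in for "this dependency is satisfied", and none of it reflects how DolphinScheduler itself evaluates dependencies.

```python
from typing import Union

# A node is either a leaf (True/False: dependency satisfied or not)
# or a dict with an "op" ("and" / "or") and a list of child "groups".
Node = Union[bool, dict]


def evaluate(node: Node) -> bool:
    """Evaluate a nested op/groups tree (illustrative only, not the SDK's logic)."""
    if isinstance(node, bool):
        return node
    results = [evaluate(child) for child in node["groups"]]
    if node["op"] == "and":
        return all(results)
    if node["op"] == "or":
        return any(results)
    raise ValueError(f"unknown operator: {node['op']}")


# Outer "and" over an "or" group and an "and" group.
tree = {
    "op": "and",
    "groups": [
        {"op": "or", "groups": [True, False]},
        {"op": "and", "groups": [True, True]},
    ],
}
print(evaluate(tree))  # True
```

With this shape, the outer `and` only passes when every inner group passes, while an inner `or` group passes as soon as one of its items does.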
```APIDOC ## YAML Example: Dependent Task Configuration ### Description This example demonstrates how to define a 'Dependent' task type within a workflow YAML file. It showcases the use of 'op' (operator) to specify 'and' or 'or' logic for task dependencies, and 'groups' to list the dependent items or nested operator groups. ### Example Structure ```yaml workflow: name: "Dependent" tasks: - name: dependent task_type: Dependent denpendence: op: and groups: - op: or groups: - project_name: pydolphin workflow_name: task_dependent_external dependent_task_name: task_1 - project_name: pydolphin workflow_name: task_dependent_external dependent_task_name: task_2 - op: and groups: - project_name: pydolphin workflow_name: task_dependent_external dependent_task_name: task_1 dependent_date: LAST_WEDNESDAY - project_name: pydolphin workflow_name: task_dependent_external dependent_task_name: task_2 dependent_date: last24Hours - name: dependent_var task_type: Dependent denpendence: op: and groups: - op: or groups: - project_name: ${CONFIG.WORKFLOW_PROJECT} workflow_name: $WORKFLOW{"Dependent_External.yaml"} dependent_task_name: task_1 - project_name: ${CONFIG.WORKFLOW_PROJECT} workflow_name: $WORKFLOW{"Dependent_External.yaml"} dependent_task_name: task_2 - op: and groups: - project_name: ${CONFIG.WORKFLOW_PROJECT} workflow_name: $WORKFLOW{"Dependent_External.yaml"} dependent_task_name: task_1 dependent_date: LAST_WEDNESDAY - project_name: ${CONFIG.WORKFLOW_PROJECT} workflow_name: $WORKFLOW{"Dependent_External.yaml"} dependent_task_name: task_2 dependent_date: last24Hours ``` ### Key Fields - **name**: Name of the task. - **task_type**: Must be 'Dependent'. - **denpendence**: Placeholder for dependency configuration. - **op**: Operator for the group ('and' or 'or'). - **groups**: A list of dependent items or nested operator groups. - **project_name**: Name of the project for the dependency. - **workflow_name**: Name of the workflow for the dependency. 
Can use variables like `${CONFIG.WORKFLOW_PROJECT}` or `$WORKFLOW{"filename.yaml"}`.
- **dependent_task_name**: Name of the specific task to depend on.
- **dependent_date**: Specifies the date for the dependency (e.g., 'LAST_WEDNESDAY', 'last24Hours', 'today').
```

--------------------------------

### Condition Task Example Workflow

Source: https://github.com/apache/dolphinscheduler-sdk-python/blob/main/docs/source/tasks/condition.md

An example workflow demonstrating the usage of the Condition task, including its upstream and downstream dependencies, and success/failure branches.

```APIDOC
## Example Workflow for Condition Task

This example illustrates how to create a workflow with a Condition task. The workflow includes four shell tasks and one Condition task. The Condition task has an explicit upstream dependency and automatically sets downstream dependencies based on its evaluation.

The workflow graph is structured as follows:

pre_task_1 ->                     -> success_branch
             \
pre_task_2 ->  -> conditions ->
             /
pre_task_3 ->                     -> fail_branch

### Python Code Example
```python
from pydolphinscheduler.core.workflow import Workflow
from pydolphinscheduler.tasks.condition import FAILURE, SUCCESS, And, Condition
from pydolphinscheduler.tasks.shell import Shell

with Workflow(name="task_condition_example") as workflow:
    pre_task_1 = Shell(name="pre_task_1", command="echo pre_task_1")
    pre_task_2 = Shell(name="pre_task_2", command="echo pre_task_2")
    pre_task_3 = Shell(name="pre_task_3", command="echo pre_task_3")
    cond_operator = And(
        And(
            SUCCESS(pre_task_1, pre_task_2),
            FAILURE(pre_task_3),
        ),
    )

    success_branch = Shell(name="success_branch", command="echo success_branch")
    fail_branch = Shell(name="fail_branch", command="echo fail_branch")

    condition = Condition(
        name="condition",
        condition=cond_operator,
        success_task=success_branch,
        failed_task=fail_branch,
    )
    workflow.submit()
```
```

--------------------------------

### Export Configuration to Custom Path

Source:
https://github.com/apache/dolphinscheduler-sdk-python/blob/main/docs/source/config.md

Sets a custom directory for the configuration file before initialization.

```bash
export PYDS_HOME=
pydolphinscheduler config --init
```

--------------------------------

### Closing Vote Mail Example

Source: https://github.com/apache/dolphinscheduler-sdk-python/blob/main/RELEASE.md

An example of a closing vote mail to announce the release result. This should include the number of binding and non-binding votes and list the names of voters.

```text
[RESULT][VOTE] Release Apache DolphinScheduler Python version $VERSION

72+ hours passed, we’ve got ($NUMBER) +1 bindings (and ... +1 non-bindings):

(list names)
+1 bindings:
xxx
...

+1 non-bindings:
xxx
...

Thank you for voting, I’ll continue the release process.
```

--------------------------------

### Initialize Resource Plugin with Prefix

Source: https://github.com/apache/dolphinscheduler-sdk-python/blob/main/docs/source/resources_plugin/develop.md

The __init__ method initializes the resource plugin with a prefix. This method can be overridden if necessary.

```python
def __init__(self, prefix: str, *args, **kwargs):
    super().__init__(prefix, *args, **kwargs)
```

--------------------------------

### Pytorch Task Workflow Example

Source: https://github.com/apache/dolphinscheduler-sdk-python/blob/main/docs/source/tasks/pytorch.md

This snippet demonstrates creating a workflow with multiple Pytorch tasks, showcasing different environment creation and configuration options. It includes tasks for running with an existing environment, creating a conda environment, and creating a virtualenv environment.
```python
"""An example workflow for task pytorch."""

from pydolphinscheduler.core.workflow import Workflow
from pydolphinscheduler.tasks.pytorch import Pytorch

with Workflow(
    name="task_pytorch_example",
) as workflow:
    # run project with existing environment
    task_existing_env = Pytorch(
        name="task_existing_env",
        script="main.py",
        script_params="--dry-run --no-cuda",
        project_path="https://github.com/pytorch/examples#mnist",
        python_command="/home/anaconda3/envs/pytorch/bin/python3",
    )

    # run project with creating conda environment
    task_conda_env = Pytorch(
        name="task_conda_env",
        script="main.py",
        script_params="--dry-run --no-cuda",
        project_path="https://github.com/pytorch/examples#mnist",
        is_create_environment=True,
        python_env_tool="conda",
        requirements="requirements.txt",
        conda_python_version="3.7",
    )

    # run project with creating virtualenv environment
    task_virtualenv_env = Pytorch(
        name="task_virtualenv_env",
        script="main.py",
        script_params="--dry-run --no-cuda",
        project_path="https://github.com/pytorch/examples#mnist",
        is_create_environment=True,
        python_env_tool="virtualenv",
        requirements="requirements.txt",
    )

    # [start resource_limit]
    pytorch_resources_limit = Pytorch(
        name="pytorch_resources_limit",
        script="main.py",
        script_params="--dry-run --no-cuda",
        project_path="https://github.com/pytorch/examples#mnist",
        python_command="/home/anaconda3/envs/pytorch/bin/python3",
        cpu_quota=1,
        memory_max=100,
    )
    # [end resource_limit]

    workflow.submit()
```

--------------------------------

### Execute Shell Tasks

Source: https://context7.com/apache/dolphinscheduler-sdk-python/llms.txt

Demonstrates defining shell tasks with parameters and resource constraints.
```python
from pydolphinscheduler.core.workflow import Workflow
from pydolphinscheduler.tasks.shell import Shell

with Workflow(name="shell_workflow") as workflow:
    # Simple command
    task_simple = Shell(name="simple_task", command="echo 'Hello'")

    # Multi-line command with parameters
    task_params = Shell(
        name="parameterized_task",
        command="""
        echo "Parameter 1: ${param1}"
        echo "Parameter 2: ${param2}"
        echo "Current date: ${date_param}"
        """,
        # Local input parameters are passed with input_params,
        # as in the local parameter example earlier in this document
        input_params={
            "param1": "value1",
            "param2": 123,
            "date_param": "$[yyyy-MM-dd]",  # Built-in date parameter
        },
    )

    # Task with resource limits
    task_limited = Shell(
        name="limited_task",
        command="echo 'resource limited'",
        cpu_quota=1,  # CPU cores
        memory_max=100,  # Memory in MB
    )

    task_simple >> task_params >> task_limited
    workflow.run()
```

--------------------------------

### Initialize Local resource plugin

Source: https://github.com/apache/dolphinscheduler-sdk-python/blob/main/docs/source/resources_plugin/local.md

Use the Local class to define a resource plugin with a specific file system prefix.

```python
resource_plugin=Local("/tmp")
```

--------------------------------

### Python Function Decorator - Full Example

Source: https://github.com/apache/dolphinscheduler-sdk-python/blob/main/docs/source/tasks/func_wrap.md

This comprehensive example demonstrates how to define a workflow using the `@task` decorator for Python functions. It includes importing necessary components, defining tasks with various dependencies, setting up task relationships, and submitting the workflow.
```python
import time

from pydolphinscheduler.core.workflow import Workflow
from pydolphinscheduler.tasks.func_wrap import task

scope_global = "global-var"


@task
def print_something():
    """First task in this workflow."""
    print("hello python function wrap task")


@task
def depend_import():
    """Depend on import module."""
    time.sleep(2)


@task
def depend_global_var():
    """Depend on global var."""
    print(f"Use global variable {scope_global}")


@task
def depend_local_var():
    """Depend on local variable."""
    scope_global = "local"
    print(f"Use local variable overwrite global {scope_global}")


def foo():
    """Call in other task."""
    print("this is a global function")


@task
def depend_func():
    """Depend on global function."""
    foo()


with Workflow(
    name="tutorial_decorator",
    schedule="0 0 0 * * ? *",
    start_time="2021-01-01",
) as workflow:
    task_group = [depend_import(), depend_global_var()]
    print_something().set_downstream(task_group)

    task_group >> depend_local_var() >> depend_func()

    workflow.submit()
```

--------------------------------

### Dependent Task Example

Source: https://github.com/apache/dolphinscheduler-sdk-python/blob/main/docs/source/tasks/dependent.md

A comprehensive example demonstrating how to create and configure dependent tasks in Apache DolphinScheduler using the Python SDK. It includes the creation of two workflows: one for external tasks and another that depends on the completion of specific tasks from the external workflow.

```APIDOC
## Workflow: task_dependent_external

### Description
This workflow defines three shell tasks that will be executed sequentially or in parallel as needed.

### Tasks
- **task_1**: A shell task that echoes 'task 1'.
- **task_2**: A shell task that echoes 'task 2'.
- **task_3**: A shell task that echoes 'task 3'.
### Request Example
```python
from pydolphinscheduler import configuration
from pydolphinscheduler.core.workflow import Workflow
from pydolphinscheduler.tasks.shell import Shell

with Workflow(
    name="task_dependent_external",
) as workflow:
    task_1 = Shell(name="task_1", command="echo task 1")
    task_2 = Shell(name="task_2", command="echo task 2")
    task_3 = Shell(name="task_3", command="echo task 3")
    workflow.submit()
```

## Workflow: task_dependent_example

### Description
This workflow defines a dependent task that waits for specific tasks from the `task_dependent_external` workflow to complete before executing.

### Tasks
- **task_dependent**: A dependent task that checks `task_1` and `task_2` from the `task_dependent_external` workflow. The two `DependentItem` entries are wrapped in `Or`, which is in turn the only member of an outer `And`, so the dependency as written should be satisfied once either task has completed successfully.

### Request Example
```python
from pydolphinscheduler import configuration
from pydolphinscheduler.core.workflow import Workflow
from pydolphinscheduler.tasks.dependent import And, Dependent, DependentItem, Or

with Workflow(
    name="task_dependent_example",
) as workflow:
    task = Dependent(
        name="task_dependent",
        dependence=And(
            Or(
                DependentItem(
                    project_name=configuration.WORKFLOW_PROJECT,
                    workflow_name="task_dependent_external",
                    dependent_task_name="task_1",
                ),
                DependentItem(
                    project_name=configuration.WORKFLOW_PROJECT,
                    workflow_name="task_dependent_external",
                    dependent_task_name="task_2",
                ),
            )
        ),
    )
    workflow.submit()
```
```

--------------------------------

### Check Version

Source: https://github.com/apache/dolphinscheduler-sdk-python/blob/main/docs/source/cli.md

Displays the current version of the installed PyDolphinScheduler package.

```shell
pydolphinscheduler version [OPTIONS]
```

--------------------------------

### Model Definition

Source: https://github.com/apache/dolphinscheduler-sdk-python/blob/main/docs/source/api.md

Methods for getting object definition attributes for models.
```APIDOC
## Model Definition

### Description
Methods for getting object definition attributes for models.

### Methods

#### get_define(camel_attr: bool = True) → dict
Get the object definition attributes to communicate to the Java gateway server. Uses the attribute `self._DEFINE_ATTR` to determine which attributes should be included when the object communicates with the Java gateway server.

#### get_define_custom(camel_attr: bool = True, custom_attr: set = None) → dict
Get the object definition attributes for the given attribute set.
```

--------------------------------

### Python Function Decorator Example

Source: https://github.com/apache/dolphinscheduler-sdk-python/blob/main/docs/source/tasks/func_wrap.md

This example demonstrates how to use the `@task` decorator to define multiple Python tasks within a workflow. It includes tasks that depend on imported modules, global variables, local variables, and global functions. The workflow structure and task dependencies are also defined.

```python
import time


@task
def depend_import():
    """Depend on import module."""
    time.sleep(2)
```

```python
scope_global = "global-var"


@task
def depend_global_var():
    """Depend on global var."""
    print(f"Use global variable {scope_global}")
```

```python
@task
def depend_local_var():
    """Depend on local variable."""
    scope_global = "local"
    print(f"Use local variable overwrite global {scope_global}")
```

```python
def foo():
    """Call in other task."""
    print("this is a global function")


@task
def depend_func():
    """Depend on global function."""
    foo()
```

```python
# [start package_import]
# depend_import below calls time.sleep, so the module must be imported
import time

# Import Workflow object to define your workflow attributes
from pydolphinscheduler.core.workflow import Workflow

# Import the task decorator, which wraps Python functions as workflow tasks
from pydolphinscheduler.tasks.func_wrap import task

# [end package_import]

scope_global = "global-var"


# [start task_declare]
@task
def print_something():
    """First task in this workflow."""
    print("hello python function wrap task")


@task
def depend_import():
"""Depend on import module.""" time.sleep(2) @task def depend_global_var(): """Depend on global var.""" print(f"Use global variable {scope_global}") @task def depend_local_var(): """Depend on local variable.""" scope_global = "local" print(f"Use local variable overwrite global {scope_global}") def foo(): """Call in other task.""" print("this is a global function") @task def depend_func(): """Depend on global function.""" foo() # [end task_declare] # [start workflow_declare] with Workflow( name="tutorial_decorator", schedule="0 0 0 * * ? *", start_time="2021-01-01", ) as workflow: # [end workflow_declare] # [start task_relation_declare] task_group = [depend_import(), depend_global_var()] print_something().set_downstream(task_group) task_group >> depend_local_var() >> depend_func() # [end task_relation_declare] # [start submit_or_run] workflow.submit() # [end submit_or_run] ```