### Main Function: Producer-Worker Setup and Verification (Python)

Source: https://docs.openstack.org/taskflow/latest/user/examples

The main function starts the producers and workers as daemon threads that all share one fake (in-memory) client, which makes it suitable for testing. It first applies an import workaround for eventlet. After starting every thread it joins them, then fetches a jobboard under the name 'verifier' and checks that no jobs are left and that the number of consumed units equals `EXPECTED_UNITS`. The function relies on 'contextlib', 'fake_client', 'threading_utils', 'producer', 'worker', 'collections', 'backends', 'SHARED_CONF', 'PRODUCERS', 'WORKERS', and 'EXPECTED_UNITS'; the `__main__` guard also needs 'sys'.

```python
def main():
    # TODO(harlowja): Hack to make eventlet work right, remove when the
    # following is fixed: https://github.com/eventlet/eventlet/issues/230
    from taskflow.utils import eventlet_utils as _eu  # noqa
    try:
        import eventlet as _eventlet  # noqa
    except ImportError:
        pass
    with contextlib.closing(fake_client.FakeClient()) as c:
        created = []
        for i in range(0, PRODUCERS):
            p = threading_utils.daemon_thread(producer, i + 1, c)
            created.append(p)
            p.start()
        consumed = collections.deque()
        for i in range(0, WORKERS):
            w = threading_utils.daemon_thread(worker, i + 1, c, consumed)
            created.append(w)
            w.start()
        while created:
            t = created.pop()
            t.join()
        # At the end there should be nothing leftover, let's verify that.
        board = backends.fetch('verifier', SHARED_CONF.copy(), client=c)
        board.connect()
        with contextlib.closing(board):
            if board.job_count != 0 or len(consumed) != EXPECTED_UNITS:
                return 1
            return 0


if __name__ == "__main__":
    sys.exit(main())
```

--------------------------------

### Taskflow Conductor Simulation Setup (Python)

Source: https://docs.openstack.org/taskflow/latest/user/examples

Initializes logging, puts the top-level directory on the import path, and defines the jobboard configuration and runtime parameters. It imports the needed modules from Taskflow and oslo_utils, plus zake, which simulates ZooKeeper in memory.
```python
import itertools
import logging
import os
import shutil
import socket
import sys
import tempfile
import threading
import time

logging.basicConfig(level=logging.ERROR)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

from oslo_utils import timeutils
from oslo_utils import uuidutils
from zake import fake_client

from taskflow.conductors import backends as conductors
from taskflow import engines
from taskflow.jobs import backends as boards
from taskflow.patterns import linear_flow
from taskflow.persistence import backends as persistence
from taskflow.persistence import models
from taskflow import task
from taskflow.utils import threading_utils

# INTRO: This example shows how a worker/producer can post desired work (jobs)
# to a jobboard and a conductor can consume that work (jobs) from that
# jobboard and execute those jobs in a reliable & async manner (for example,
# if the conductor were to crash then the job will be released back onto the
# jobboard and another conductor can attempt to finish it, from wherever that
# job last left off).
#
# In this example an in-memory jobboard (and in-memory storage) is created
# and used that simulates how this would be done at a larger scale (it is an
# example after all).

# Restrict how long this example runs for...
RUN_TIME = 5
REVIEW_CREATION_DELAY = 0.5
SCAN_DELAY = 0.1
NAME = f"{socket.getfqdn()}_{os.getpid()}"

# This won't really use zookeeper but will use a local version of it using
# the zake library that mimics an actual zookeeper cluster using threads and
# an in-memory data structure.
JOBBOARD_CONF = {
    'board': 'zookeeper://localhost?path=/taskflow/tox/jobs',
}
```

--------------------------------

### Taskflow Workflow Factory and Producer Setup (Python)

Source: https://docs.openstack.org/taskflow/latest/user/examples

Provides a factory method `create_review_workflow` that constructs a linear flow.
It also includes `review_iter` to generate review data and `generate_reviewer` to set up a producer thread that posts jobs to the job board.

```python
def review_iter():
    """Makes reviews (never-ending iterator/generator)."""
    review_id_gen = itertools.count(0)
    while True:
        review_id = next(review_id_gen)
        review = {
            'id': review_id,
        }
        yield review


# The reason this is at the module namespace level is important, since it must
# be accessible from a conductor dispatching an engine, if it was a lambda
# function for example, it would not be reimportable and the conductor would
# be unable to reference it when creating the workflow to run.
def create_review_workflow():
    """Factory method used to create a review workflow to run."""
    f = linear_flow.Flow("tester")
    f.add(
        MakeTempDir(name="maker"),
        RunReview(name="runner"),
        CleanResources(name="cleaner")
    )
    return f


def generate_reviewer(client, saver, name=NAME):
    """Creates a review producer thread with the given name prefix."""
    real_name = "%s_reviewer" % name
    no_more = threading.Event()
    jb = boards.fetch(real_name, JOBBOARD_CONF,
                      client=client, persistence=saver)

    def make_save_book(saver, review_id):
        # Record what we want to happen (sometime in the future).
        book = models.LogBook("book_%s" % review_id)
        detail = models.FlowDetail("flow_%s" % review_id,
                                   uuidutils.generate_uuid())
        book.add(detail)
        # Associate the factory method we want to be called (in the future)
        # with the book, so that the conductor will be able to call into
        # that factory to retrieve the workflow objects that represent the
        # work.
```

--------------------------------

### Taskflow Worker-Based Engine with Filesystem Transport (Python)

Source: https://docs.openstack.org/taskflow/latest/user/workers

Illustrates the setup of a Taskflow worker-based engine that uses the filesystem transport. The example sets the transport type and passes the input and output data folders as transport options, alongside the exchange and topic configuration.
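The transport options above point at two folders on disk, so it is safest to create them before the engine is loaded. The following is a minimal sketch using only the standard library; the helper name `make_filesystem_transport_conf` and the `in`/`out` folder layout are assumptions made for illustration and are not part of TaskFlow.

```python
import os
import tempfile


def make_filesystem_transport_conf(base_dir, exchange='test-exchange'):
    """Build engine keyword arguments for the filesystem transport.

    Hypothetical helper: only the option names mirror the example that
    follows, everything else is an illustrative assumption.
    """
    folder_in = os.path.join(base_dir, 'in')
    folder_out = os.path.join(base_dir, 'out')
    for folder in (folder_in, folder_out):
        # Create the folders up front so the transport finds them.
        os.makedirs(folder, exist_ok=True)
    return {
        'exchange': exchange,
        'transport': 'filesystem',
        'transport_options': {
            'data_folder_in': folder_in,
            'data_folder_out': folder_out,
        },
    }


base_dir = tempfile.mkdtemp(prefix='wbe-')
conf = make_filesystem_transport_conf(base_dir)
print(conf['transport_options'])
```

The resulting dictionary could then be unpacked as keyword arguments into an `engines.load(...)` call like the one in the next snippet.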
```python
import taskflow.engines
from taskflow.patterns import linear_flow as lf

flow = lf.Flow('simple-linear').add(...)
eng = taskflow.engines.load(flow, engine='worker-based',
                            exchange='test-exchange',
                            topics=['topic1', 'topic2'],
                            transport='filesystem',
                            transport_options={
                                'data_folder_in': '/tmp/in',
                                'data_folder_out': '/tmp/out',
                            })
eng.run()
```

--------------------------------

### Python: TaskFlow Worker-Based Engine Setup and Execution

Source: https://docs.openstack.org/taskflow/latest/user/examples

This Python code sets up and runs a TaskFlow workflow with the worker-based engine. It configures the workers, defines a linear flow of tasks, and executes the flow, with the tasks processed by simulated workers running in threads. It also selects the transport (memory or filesystem) and manages the worker lifecycle.

```python
import logging
import os
import sys
import tempfile
import json

from taskflow import engines
from taskflow.engines.worker_based import worker
from taskflow.patterns import linear_flow as lf
from taskflow.tests import utils
from taskflow.utils import threading_utils

import example_utils  # noqa

# INTRO: This example walks through a miniature workflow which shows how to
# start up a number of workers (these workers will process task execution and
# reversion requests using any provided input data) and then use an engine
# that creates a set of *capable* tasks and flows (the engine can not create
# tasks that the workers are not able to run, this will end in failure) that
# those workers will run and then executes that workflow seamlessly using the
# workers to perform the actual execution.
#
# NOTE(harlowja): this example simulates the expected larger number of workers
# by using a set of threads (which in this example simulate the remote workers
# that would typically be running on other external machines).

# A filesystem can also be used as the queue transport (useful as simple
# transport type that does not involve setting up a larger mq system).
# If this is false then the memory transport is used instead, both work in
# standalone setups.
USE_FILESYSTEM = False
BASE_SHARED_CONF = {
    'exchange': 'taskflow',
}

# Until https://github.com/celery/kombu/issues/398 is resolved it is not
# recommended to run many worker threads in this example due to the types
# of errors mentioned in that issue.
MEMORY_WORKERS = 2
FILE_WORKERS = 1
WORKER_CONF = {
    # These are the tasks the worker can execute, they *must* be importable,
    # typically this list is used to restrict what workers may execute to
    # a smaller set of *allowed* tasks that are known to be safe (one would
    # not want to allow all python code to be executed).
    'tasks': [
        'taskflow.tests.utils:TaskOneArgOneReturn',
        'taskflow.tests.utils:TaskMultiArgOneReturn'
    ],
}


def run(engine_options):
    flow = lf.Flow('simple-linear').add(
        utils.TaskOneArgOneReturn(provides='result1'),
        utils.TaskMultiArgOneReturn(provides='result2')
    )
    eng = engines.load(flow,
                       store=dict(x=111, y=222, z=333),
                       engine='worker-based', **engine_options)
    eng.run()
    return eng.storage.fetch_all()


if __name__ == "__main__":
    logging.basicConfig(level=logging.ERROR)

    # Setup our transport configuration and merge it into the worker and
    # engine configuration so that both of those use it correctly.
    shared_conf = dict(BASE_SHARED_CONF)

    tmp_path = None
    if USE_FILESYSTEM:
        worker_count = FILE_WORKERS
        tmp_path = tempfile.mkdtemp(prefix='wbe-example-')
        shared_conf.update({
            'transport': 'filesystem',
            'transport_options': {
                'data_folder_in': tmp_path,
                'data_folder_out': tmp_path,
                'polling_interval': 0.1,
            },
        })
    else:
        worker_count = MEMORY_WORKERS
        shared_conf.update({
            'transport': 'memory',
            'transport_options': {
                'polling_interval': 0.1,
            },
        })
    worker_conf = dict(WORKER_CONF)
    worker_conf.update(shared_conf)
    engine_options = dict(shared_conf)
    workers = []
    worker_topics = []

    try:
        # Create a set of workers to simulate actual remote workers.
        print('Running %s workers.'
              % (worker_count))
        for i in range(0, worker_count):
            worker_conf['topic'] = 'worker-%s' % (i + 1)
            worker_topics.append(worker_conf['topic'])
            w = worker.Worker(**worker_conf)
            runner = threading_utils.daemon_thread(w.run)
            runner.start()
            w.wait()
            workers.append((runner, w.stop))

        # Now use those workers to do something.
        print('Executing some work.')
        engine_options['topics'] = worker_topics
        result = run(engine_options)
        print('Execution finished.')
        # This is done so that the test examples can work correctly
        # even when the keys change order (which will happen in various
        # python versions).
        print("Result = %s" % json.dumps(result, sort_keys=True))
    finally:
        # And cleanup: stop every worker and wait for its thread to exit.
        print('Stopping workers.')
        while workers:
            r, stopper = workers.pop()
            stopper()
            r.join()
```

--------------------------------

### TaskFlow Hello World Example

Source: https://docs.openstack.org/taskflow/latest/user/examples

This example demonstrates the basic usage of TaskFlow, the equivalent of a 'Hello World' program. It shows how to compose a simple workflow from linear and unordered flows, define a custom task, and run the workflow with different execution styles: serial, threads, processes, and eventlet green threads. It also shows how to inject data into tasks and print the engine's execution statistics.

```python
import logging
import os
import sys

logging.basicConfig(level=logging.ERROR)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

from taskflow import engines
from taskflow.patterns import linear_flow as lf
from taskflow.patterns import unordered_flow as uf
from taskflow import task

# INTRO: This is the de facto hello world equivalent for taskflow; it shows
# how an overly simplistic workflow can be created that runs using different
# engines using different styles of execution (all can be used to run in
# parallel if a workflow is provided that is parallelizable).


class PrinterTask(task.Task):
    def __init__(self, name, show_name=True, inject=None):
        super().__init__(name, inject=inject)
        self._show_name = show_name

    def execute(self, output):
        if self._show_name:
            print(f"{self.name}: {output}")
        else:
            print(output)


# This will be the work that we want done, which for this example is just to
# print 'hello world' (like a song) using different tasks and different
# execution models.
song = lf.Flow("beats")

# Unordered flows, when run, can be run in parallel; and a chorus is everyone
# singing at once of course!
hi_chorus = uf.Flow('hello')
world_chorus = uf.Flow('world')
for (name, hello, world) in [('bob', 'hello', 'world'),
                             ('joe', 'hellooo', 'worllllld'),
                             ('sue', "helloooooo!", 'wooorllld!')]:
    hi_chorus.add(PrinterTask("%s@hello" % name,
                              # This will show up to the execute() method of
                              # the task as the argument named 'output' (which
                              # will allow us to print the character we want).
                              inject={'output': hello}))
    world_chorus.add(PrinterTask("%s@world" % name,
                                 inject={'output': world}))

# The composition starts with the conductor and then runs in sequence with
# the chorus running in parallel, but no matter what the 'hello' chorus must
# always run before the 'world' chorus (otherwise the world will fall apart).
song.add(PrinterTask("conductor@begin",
                     show_name=False, inject={'output': "*ding*"}),
         hi_chorus,
         world_chorus,
         PrinterTask("conductor@end",
                     show_name=False, inject={'output': "*dong*"}))

# Run in parallel using eventlet green threads...
try:
    import eventlet as _eventlet  # noqa
except ImportError:
    # No eventlet currently active, skip running with it...
    pass
else:
    print("-- Running in parallel using eventlet --")
    e = engines.load(song, executor='greenthreaded', engine='parallel',
                     max_workers=1)
    e.run()

# Run in parallel using real threads...
print("-- Running in parallel using threads --")
e = engines.load(song, executor='threaded', engine='parallel',
                 max_workers=1)
e.run()

# Run in parallel using external processes...
print("-- Running in parallel using processes --")
e = engines.load(song, executor='processes', engine='parallel',
                 max_workers=1)
e.run()

# Run serially (aka, if the workflow could have been run in parallel, it will
# not be when run in this mode)...
print("-- Running serially --")
e = engines.load(song, engine='serial')
e.run()
print("-- Statistics gathered --")
print(e.statistics)
```

--------------------------------

### TaskFlow Example: Building a Car with Python

Source: https://docs.openstack.org/taskflow/latest/user/examples

This Python code defines plain functions that build car parts and install them, wraps them in tasks, and orders their execution with TaskFlow's linear and graph flows. It includes tasks for starting up, installing components, and verifying the build, plus a revert function that runs on rollback.

```python
import os
import sys
import logging

logging.basicConfig(level=logging.ERROR)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

import taskflow.engines
from taskflow.patterns import graph_flow as gf
from taskflow.patterns import linear_flow as lf
from taskflow import task
from taskflow.types import notifier

ANY = notifier.Notifier.ANY

import example_utils as eu  # noqa

# INTRO: This example shows how a graph flow and linear flow can be used
# together to execute dependent & non-dependent tasks by going through the
# steps required to build a simplistic car (an assembly line if you will). It
# also shows how raw functions can be wrapped into a task object instead of
# being forced to use the more *heavy* task base class. This is useful in
# scenarios where pre-existing code has functions that you easily want to
# plug-in to taskflow, without requiring a large amount of code changes.


def build_frame():
    return 'steel'


def build_engine():
    return 'honda'


def build_doors():
    return '2'


def build_wheels():
    return '4'


# These just return true to indicate success, they would in the real world
# do more than just that.

def install_engine(frame, engine):
    return True


def install_doors(frame, windows_installed, doors):
    return True


def install_windows(frame, doors):
    return True


def install_wheels(frame, engine, engine_installed, wheels):
    return True


def trash(**kwargs):
    eu.print_wrapped("Throwing away pieces of car!")


def startup(**kwargs):
    # If you want to see the rollback function being activated try uncommenting
    # the following line.
    #
    # raise ValueError("Car not verified")
    return True


def verify(spec, **kwargs):
    # If the car is not what we ordered throw away the car (trigger reversion).
    for key, value in kwargs.items():
        if spec[key] != value:
            raise Exception("Car doesn't match spec!")
    return True


# These two functions connect into the state transition notification emission
# points that the engine outputs, they can be used to log state transitions
# that are occurring, or they can be used to suspend the engine (or perform
# other useful activities).
def flow_watch(state, details):
    print('Flow => %s' % state)


def task_watch(state, details):
    print('Task {} => {}'.format(details.get('task_name'), state))


flow = lf.Flow("make-auto").add(
    task.FunctorTask(startup, revert=trash, provides='ran'),
    # A graph flow allows automatic dependency based ordering, the ordering
    # is determined by analyzing the symbols required and provided and ordering
    # execution based on a functioning order (if one exists).
    gf.Flow("install-parts").add(
        task.FunctorTask(build_frame, provides='frame'),
        task.FunctorTask(build_engine, provides='engine'),
        task.FunctorTask(build_doors, provides='doors'),
        task.FunctorTask(build_wheels, provides='wheels'),
        # These *_installed outputs allow for other tasks to depend on certain
        # actions being performed (aka the components were installed), another
        # way to do this is to link() the tasks manually instead of creating
        # an 'artificial' data dependency that accomplishes the same goal the
        # manual linking would result in.
        task.FunctorTask(install_engine, provides='engine_installed'),
        task.FunctorTask(install_doors, provides='doors_installed'),
        task.FunctorTask(install_windows, provides='windows_installed'),
        task.FunctorTask(install_wheels, provides='wheels_installed')),
    task.FunctorTask(verify, requires=['frame',
                                       'engine',
                                       'doors',
                                       'wheels',
                                       'engine_installed',
                                       'doors_installed',
                                       'windows_installed',
                                       'wheels_installed']))

# This dictionary will be provided to the tasks as a specification for what
# the tasks should produce, in this example this specification will influence
# what those tasks do and what output they create. Different tasks depend on
# different information from this specification, all of which will be provided
# automatically by the engine to those tasks.
spec = {
    "frame": 'steel',
}
```

--------------------------------

### Taskflow Worker Setup and Execution Configuration

Source: https://docs.openstack.org/taskflow/latest/user/examples

Configures the Taskflow engine and workers for the Mandelbrot example. It sets up the shared configuration, defines the number of workers, lists the tasks each worker may execute, and selects the worker-based engine. This makes it possible to simulate distributed computation of parallel tasks.
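The shared and per-worker settings described above are typically merged into one configuration per worker, each with its own topic. A minimal sketch of that merge using plain dictionaries follows; the helper `build_worker_confs`, the `calculator_N` topic naming, and the `my_module` task path are assumptions for illustration, not TaskFlow API.

```python
import copy

BASE_SHARED_CONF = {'exchange': 'taskflow'}
# Hypothetical import path; a real worker needs an importable task here.
WORKER_CONF = {'tasks': ['my_module:MandelCalculator']}


def build_worker_confs(worker_count, shared_conf, worker_conf):
    """Return one merged configuration per worker, each with its own topic."""
    confs = []
    for i in range(worker_count):
        # Deep copy so that workers never share the mutable 'tasks' list.
        conf = copy.deepcopy(worker_conf)
        conf.update(shared_conf)
        conf['topic'] = 'calculator_%s' % (i + 1)  # assumed naming scheme
        confs.append(conf)
    return confs


worker_confs = build_worker_confs(2, BASE_SHARED_CONF, WORKER_CONF)
topics = [conf['topic'] for conf in worker_confs]
print(topics)
```

The collected `topics` list is what a worker-based engine would be given as its `topics` option, in the same way the worker-based example earlier sets `engine_options['topics']`.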
```python
BASE_SHARED_CONF = {
    'exchange': 'taskflow',
}
WORKERS = 2
WORKER_CONF = {
    # These are the tasks the worker can execute, they *must* be importable,
    # typically this list is used to restrict what workers may execute to
    # a smaller set of *allowed* tasks that are known to be safe (one would
    # not want to allow all python code to be executed).
    'tasks': [
        '%s:MandelCalculator' % (__name__),
    ],
}
ENGINE_CONF = {
    'engine': 'worker-based',
}

# Mandelbrot & image settings...
IMAGE_SIZE = (512, 512)
CHUNK_COUNT = 8
MAX_ITERATIONS = 25
```

--------------------------------

### Python: Taskflow flow definition and resumption setup

Source: https://docs.openstack.org/taskflow/latest/user/examples

Defines a Taskflow flow for creating a volume, including tasks for printing messages, generating volume specifications, and preparing volumes. It also sets up persistence using a backend to allow for the resumption of the flow if it's interrupted. The code handles both initial flow creation and resumption based on provided tracking IDs.

```python
import hashlib
import logging
import os
import random
import sys
import time
import contextlib

logging.basicConfig(level=logging.ERROR)

self_dir = os.path.abspath(os.path.dirname(__file__))
top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)
sys.path.insert(0, self_dir)

from oslo_utils import uuidutils

from taskflow import engines
from taskflow.patterns import graph_flow as gf
from taskflow.patterns import linear_flow as lf
from taskflow.persistence import models
from taskflow import task

import example_utils  # noqa


@contextlib.contextmanager
def slow_down(how_long=0.5):
    try:
        yield how_long
    finally:
        print("** Ctrl-c me please!!! **")
        time.sleep(how_long)


def find_flow_detail(backend, book_id, flow_id):
    with contextlib.closing(backend.get_connection()) as conn:
        lb = conn.get_logbook(book_id)
        return lb.find(flow_id)


class PrintText(task.Task):
    def __init__(self, print_what, no_slow=False):
        content_hash = hashlib.md5(print_what.encode('utf-8')).hexdigest()[0:8]
        super().__init__(name="Print: %s" % (content_hash))
        self._text = print_what
        self._no_slow = no_slow

    def execute(self):
        if self._no_slow:
            print("-" * (len(self._text)))
            print(self._text)
            print("-" * (len(self._text)))
        else:
            with slow_down():
                print("-" * (len(self._text)))
                print(self._text)
                print("-" * (len(self._text)))


class CreateSpecForVolumes(task.Task):
    def execute(self):
        volumes = []
        for i in range(0, random.randint(1, 10)):
            volumes.append({
                'type': 'disk',
                'location': "/dev/vda%s" % (i + 1),
            })
        return volumes


class PrepareVolumes(task.Task):
    def execute(self, volume_specs):
        for v in volume_specs:
            with slow_down():
                print("Dusting off your hard drive %s" % (v))
            with slow_down():
                print("Taking a well deserved break.")
            print("Your drive %s has been certified."
                  % (v))


flow = lf.Flow("root").add(
    PrintText("Starting volume create", no_slow=True),
    gf.Flow('maker').add(
        CreateSpecForVolumes("volume_specs", provides='volume_specs'),
        PrintText("I need a nap, it took me a while to build those specs."),
        PrepareVolumes(),
    ),
    PrintText("Finished volume create", no_slow=True))

with example_utils.get_backend() as backend:
    try:
        book_id, flow_id = sys.argv[2].split("+", 1)
    except (IndexError, ValueError):
        book_id = None
        flow_id = None

    if not all([book_id, flow_id]):
        book = models.LogBook('resume-volume-create')
        flow_detail = models.FlowDetail("root", uuid=uuidutils.generate_uuid())
        book.add(flow_detail)
        with contextlib.closing(backend.get_connection()) as conn:
            conn.save_logbook(book)
```

--------------------------------

### Main Function for Orchestration in Python

Source: https://docs.openstack.org/taskflow/latest/user/examples

Sets up a shared persistence backend and two fake clients that share one storage, then creates and starts the reviewer and conductor entities. It runs for `RUN_TIME` seconds and then shuts every entity down in reverse order by calling its stop callback and joining its thread. Requires the names 'persistence', 'fake_client', 'generate_reviewer', 'generate_conductor', 'RUN_TIME', and 'timeutils', as well as the 'contextlib' and 'time' modules.

```python
def main():
    # Need to share the same backend, so that data can be shared...
    persistence_conf = {
        'connection': 'memory',
    }
    saver = persistence.fetch(persistence_conf)
    with contextlib.closing(saver.get_connection()) as conn:
        # This ensures that the needed backend setup/data directories/schema
        # upgrades and so on... exist before they are attempted to be used...
        conn.upgrade()
    fc1 = fake_client.FakeClient()
    # Done like this to share the same client storage location so the correct
    # zookeeper features work across clients...
    fc2 = fake_client.FakeClient(storage=fc1.storage)
    entities = [
        generate_reviewer(fc1, saver),
        generate_conductor(fc2, saver),
    ]
    for t, stopper in entities:
        t.start()
    try:
        watch = timeutils.StopWatch(duration=RUN_TIME)
        watch.start()
        while not watch.expired():
            time.sleep(0.1)
    finally:
        for t, stopper in reversed(entities):
            stopper()
            t.join()


if __name__ == '__main__':
    main()
```

--------------------------------

### Python Taskflow Conductor Setup and Workflow Execution

Source: https://docs.openstack.org/taskflow/latest/user/examples

This Python script sets up a Taskflow conductor to run a '99 Bottles of Beer' song workflow. It defines custom tasks for taking a bottle down, passing it around, and concluding a verse. The workflow is dynamically generated and can resume from where it left off using ZooKeeper for job management and SQLite for persistence. The conductor listens for events during execution.

```python
import functools
import logging
import os
import sys
import time
import traceback
import contextlib

from kazoo import client

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

from taskflow.conductors import backends as conductor_backends
from taskflow import engines
from taskflow.jobs import backends as job_backends
from taskflow import logging as taskflow_logging
from taskflow.patterns import linear_flow as lf
from taskflow.persistence import backends as persistence_backends
from taskflow.persistence import models
from taskflow import task

from oslo_utils import timeutils
from oslo_utils import uuidutils

# Instructions!
#
# 1. Install zookeeper (or change host listed below)
# 2. Download this example, place in file '99_bottles.py'
# 3. Run `python 99_bottles.py p` to place a song request onto the jobboard
# 4. Run `python 99_bottles.py c` a few times (in different shells)
# 5.
#    On demand kill previously listed processes created in (4) and watch
#    the work resume on another process (and repeat)
# 6. Keep enough workers alive to eventually finish the song (if desired).
#

ME = os.getpid()
ZK_HOST = "localhost:2181"
JB_CONF = {
    'hosts': ZK_HOST,
    'board': 'zookeeper',
    'path': '/taskflow/99-bottles-demo',
}
PERSISTENCE_URI = r"sqlite:////tmp/bottles.db"
TAKE_DOWN_DELAY = 1.0
PASS_AROUND_DELAY = 3.0
HOW_MANY_BOTTLES = 99


class TakeABottleDown(task.Task):
    def execute(self, bottles_left):
        sys.stdout.write('Take one down, ')
        sys.stdout.flush()
        time.sleep(TAKE_DOWN_DELAY)
        return bottles_left - 1


class PassItAround(task.Task):
    def execute(self):
        sys.stdout.write('pass it around, ')
        sys.stdout.flush()
        time.sleep(PASS_AROUND_DELAY)


class Conclusion(task.Task):
    def execute(self, bottles_left):
        sys.stdout.write('%s bottles of beer on the wall...\n' % bottles_left)
        sys.stdout.flush()


def make_bottles(count):
    # This is the function that will be called to generate the workflow
    # and will also be called to regenerate it on resumption so that work
    # can continue from where it last left off...

    s = lf.Flow("bottle-song")

    take_bottle = TakeABottleDown("take-bottle-%s" % count,
                                  inject={'bottles_left': count},
                                  provides='bottles_left')
    pass_it = PassItAround("pass-%s-around" % count)
    next_bottles = Conclusion("next-bottles-%s" % (count - 1))
    s.add(take_bottle, pass_it, next_bottles)

    for bottle in reversed(list(range(1, count))):
        take_bottle = TakeABottleDown("take-bottle-%s" % bottle,
                                      provides='bottles_left')
        pass_it = PassItAround("pass-%s-around" % bottle)
        next_bottles = Conclusion("next-bottles-%s" % (bottle - 1))
        s.add(take_bottle, pass_it, next_bottles)

    return s


def run_conductor(only_run_once=False):
    # This continuously runs consumers until its stopped via ctrl-c or other
    # kill signal...
    event_watches = {}

    # This will be triggered by the conductor doing various activities
    # with engines, and is quite nice to be able to see the various timing
    # segments (which is useful for debugging, or watching, or figuring out
    # where to optimize).
    def on_conductor_event(cond, event, details):
        print("Event '%s' has been received..." % event)
        print("Details = %s" % details)
        if event.endswith("_start"):
            w = timeutils.StopWatch()
            w.start()
            base_event = event[0:-len("_start")]
            event_watches[base_event] = w
        if event.endswith("_end"):
            base_event = event[0:-len("_end")]
            try:
                w = event_watches.pop(base_event)
                w.stop()
                print("It took %0.3f seconds for event '%s' to finish"
                      % (w.elapsed(), base_event))
            except KeyError:
                pass
        if event == 'running_end' and only_run_once:
            cond.stop()

    print("Starting conductor with pid: %s" % ME)
    my_name = "conductor-%s" % ME
    persist_backend = persistence_backends.fetch(PERSISTENCE_URI)
    with contextlib.closing(persist_backend):
        with contextlib.closing(persist_backend.get_connection()) as conn:
            conn.upgrade()
        job_board = job_backends.fetch(my_name, JB_CONF.copy(),
                                       persistence=persist_backend)
        job_board.connect()
        with contextlib.closing(job_board):
            conductor = conductor_backends.fetch('blocking', my_name,
                                                 job_board, engine='serial')
            # Bind the conductor as the first callback argument and listen
            # for every event that it emits.
            conductor.notifier.register(
                conductor.notifier.ANY,
                functools.partial(on_conductor_event, conductor))
            try:
                conductor.run()
            except KeyboardInterrupt:
                print("\nShutting down conductor...\n")


def run_poster():
    # This just posts a single job and then ends...
    print("Producing job...")
    my_name = "poster-%s" % ME
    persist_backend = persistence_backends.fetch(PERSISTENCE_URI)
    with contextlib.closing(persist_backend):
        with contextlib.closing(persist_backend.get_connection()) as conn:
            conn.upgrade()
        job_board = job_backends.fetch(my_name, JB_CONF.copy(),
                                       persistence=persist_backend)
        job_board.connect()
        with contextlib.closing(job_board):
            # Record the unit of work, and the factory that can (re)create
            # its flow, in the persistence backend before posting the job.
            lb = models.LogBook("post-from-%s" % my_name)
            fd = models.FlowDetail("song-from-%s" % my_name,
                                   uuidutils.generate_uuid())
            lb.add(fd)
            with contextlib.closing(persist_backend.get_connection()) as conn:
                conn.save_logbook(lb)
            engines.save_factory_details(fd, make_bottles,
                                         [HOW_MANY_BOTTLES], {},
                                         backend=persist_backend)
            job_board.post("song-from-%s" % my_name, book=lb)
            print("Job posted.")


if __name__ == '__main__':
    if len(sys.argv) > 1 and sys.argv[1] == 'p':
        run_poster()
    else:
        run_conductor()
```

--------------------------------

### Run Local TaskFlow Test Scenario

Source: https://docs.openstack.org/taskflow/latest/user/examples

This function sets up a local testing environment for TaskFlow.
It shortens the global delay variables so the run finishes quickly and appends a unique identifier to the jobboard configuration path. It then calls `run_poster()` to post a job and `run_conductor()` to start the conductor; this mode is typically used during unit testing. It depends on module-level globals and on `run_poster` and `run_conductor`, which are defined elsewhere in the same example.

```python
# NOTE: the enclosing def line is missing from this excerpt; the function
# name used here is an assumption.
def main_local():
    global TAKE_DOWN_DELAY
    global PASS_AROUND_DELAY
    global JB_CONF
    # Make everything go much faster (so that this finishes quickly).
    PASS_AROUND_DELAY = 0.01
    TAKE_DOWN_DELAY = 0.01
    JB_CONF['path'] = JB_CONF['path'] + "-" + uuidutils.generate_uuid()
    run_poster()
    run_conductor(only_run_once=True)
```

--------------------------------

### Using Dynamic Logging Listeners in Taskflow

Source: https://docs.openstack.org/taskflow/latest/user/examples

Illustrates how to attach a dynamic logging listener to a Taskflow engine to capture and display notifications about workflow execution. This example uses a simple linear flow with echo tasks and logs all events to stdout.

```python
import logging
import os
import sys

logging.basicConfig(level=logging.DEBUG)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

from taskflow import engines
from taskflow.listeners import logging as logging_listener
from taskflow.patterns import linear_flow as lf
from taskflow import task

# INTRO: This example walks through a miniature workflow which will do a
# simple echo operation; during this execution a listener is associated with
# the engine to receive all notifications about what the flow has performed,
# this example dumps that output to the stdout for viewing (at debug level
# to show all the information which is possible).


class Echo(task.Task):
    def execute(self):
        print(self.name)


# Generate the work to be done (but don't do it yet).
wf = lf.Flow('abc')
wf.add(Echo('a'))
wf.add(Echo('b'))
wf.add(Echo('c'))

# This will associate the listener with the engine (the listener
# will automatically register for notifications with the engine and deregister
# when the context is exited).
e = engines.load(wf)
with logging_listener.DynamicLoggingListener(e):
    e.run()
```

--------------------------------

### SQLAlchemy Backend Configuration

Source: https://docs.openstack.org/taskflow/latest/user/persistence

Provides an example configuration for the SQLAlchemy backend, specifying the database connection string. This backend utilizes SQLAlchemy for persistent storage.

```python
conf = {
    "connection": "sqlite:////tmp/test.db",
}
```

--------------------------------

### Create and Run Linear Flow with TaskFlow

Source: https://docs.openstack.org/taskflow/latest/user/examples

This Python script demonstrates how to create a simple linear flow with two tasks using TaskFlow. It defines tasks for making phone calls and then runs them sequentially using an engine, passing initial data for the phone numbers. This example illustrates basic TaskFlow structures for serial workflows without persistence.

```python
import logging
import os
import sys

logging.basicConfig(level=logging.ERROR)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

import taskflow.engines
from taskflow.patterns import linear_flow as lf
from taskflow import task

# INTRO: In this example we create two tasks, each of which ~calls~ a given
# ~phone~ number (provided as a function input) in a linear fashion (one after
# the other). For a workflow which is serial this shows an extremely simple way
# of structuring your tasks (the code that does the work) into a linear
# sequence (the flow) and then passing the work off to an engine, with some
# initial data to be run in a reliable manner.
#
# NOTE(harlowja): This example shows a basic usage of the taskflow structures
# without involving the complexity of persistence. Using the structures that
# taskflow provides via tasks and flows makes it possible for you to easily at
# a later time hook in a persistence layer (and then gain the functionality
# that offers) when you decide the complexity of adding that layer in
# is 'worth it' for your application's usage pattern (which certain
# applications may not need).


class CallJim(task.Task):
    def execute(self, jim_number, *args, **kwargs):
        print("Calling jim %s." % jim_number)


class CallJoe(task.Task):
    def execute(self, joe_number, *args, **kwargs):
        print("Calling joe %s." % joe_number)


# Create your flow and associated tasks (the work to be done).
flow = lf.Flow('simple-linear').add(
    CallJim(),
    CallJoe()
)

# Now run that flow using the provided initial data (store below).
taskflow.engines.run(flow, store=dict(joe_number=444,
                                      jim_number=555))
```

--------------------------------

### Create and Run Taskflow Engine

Source: https://docs.openstack.org/taskflow/latest/user/engines

Demonstrates how to create a Taskflow engine instance from a flow with a specific engine type and persistence backend configuration, and then execute the flow. This involves importing the `engines` module, building the engine with the `load` helper, and calling `run()` on the returned engine.

```python
from taskflow import engines

flow = make_flow()  # Assuming make_flow() is defined elsewhere
# Assuming my_persistence_conf is defined elsewhere
eng = engines.load(flow, engine='serial', backend=my_persistence_conf)
eng.run()
```

--------------------------------

### Create and Post a Job with Taskflow

Source: https://docs.openstack.org/taskflow/latest/user/jobs

Demonstrates creating persistence and jobboard backends, then posting a job. It uses the `fetch` function for backend instantiation and `board.post` to add a job.

```python
from taskflow.persistence import backends as persistence_backends
from taskflow.jobs import backends as job_backends

...
persistence = persistence_backends.fetch({
    "connection": "mysql",
    "user": ...,
    "password": ...,
})
book = make_and_save_logbook(persistence)
board = job_backends.fetch('my-board', {
    "board": "zookeeper",
}, persistence=persistence)
job = board.post("my-first-job", book)
...
```

--------------------------------

### Fetch Persistence Backend using taskflow.persistence.backends

Source: https://docs.openstack.org/taskflow/latest/user/persistence

Demonstrates how to fetch and configure a persistence backend using the `backends.fetch()` function from the `taskflow.persistence` module. This function utilizes entrypoints (via stevedore) to discover and instantiate the appropriate backend based on the provided configuration. The `conf` parameter is a dictionary specifying the backend connection type and its specific parameters.

```python
from taskflow.persistence import backends

...
persistence = backends.fetch(conf={
    "connection": "mysql",
    "user": ...,
    "password": ...,
})
book = make_and_save_logbook(persistence)
...
```

--------------------------------

### Write Mandelbrot Results to Image using Pillow

Source: https://docs.openstack.org/taskflow/latest/user/examples

This function takes the calculated fractal results and saves them as an image file using the Pillow library. It scales the color values to the 0-255 range and maps them to grayscale pixels. Pillow is imported only when an output filename is given, and can be installed via pip.

```python
def write_image(results, output_filename=None):
    print("Gathered %s results that represents a mandelbrot"
          " image (using %s chunks that are computed jointly"
          " by %s workers)."
          % (len(results), CHUNK_COUNT, WORKERS))
    if not output_filename:
        return

    try:
        from PIL import Image
    except ImportError as e:
        raise RuntimeError("Pillow is required to write image files: %s" % e)

    color_max = 0
    for _point, color in results:
        color_max = max(color, color_max)

    img = Image.new('L', IMAGE_SIZE, "black")
    pixels = img.load()
    for (x, y), color in results:
        if color_max == 0:
            color = 0
        else:
            color = int((float(color) / color_max) * 255.0)
        pixels[x, y] = color
    img.save(output_filename)
```

--------------------------------

### Taskflow Engine with State Notifications and Revert Logic

Source: https://docs.openstack.org/taskflow/latest/user/examples

Demonstrates loading a Taskflow engine, registering notifiers for flow and task state transitions, and running a flow. It also shows how to alter the specification to trigger reverting logic when the built product does not match the desired state.

```python
engine = taskflow.engines.load(flow, store={'spec': spec.copy()})

# This registers all (ANY) state transitions to trigger a call to the
# flow_watch function for flow state transitions, and registers the
# same all (ANY) state transitions for task state transitions.
engine.notifier.register(ANY, flow_watch)
engine.atom_notifier.register(ANY, task_watch)

eu.print_wrapped("Building a car")
engine.run()

# Alter the specification and ensure that the reverting logic gets triggered
spec['doors'] = 5

engine = taskflow.engines.load(flow, store={'spec': spec.copy()})
engine.notifier.register(ANY, flow_watch)
engine.atom_notifier.register(ANY, task_watch)

eu.print_wrapped("Building a wrong car that doesn't match specification")
try:
    engine.run()
except Exception as e:
    eu.print_wrapped("Flow failed: %s" % e)
```

--------------------------------

### Create and Run Mandelbrot Fractal Calculation

Source: https://docs.openstack.org/taskflow/latest/user/examples

The main function to create and run the Mandelbrot fractal calculation.
It sets up logging, configures workers and the engine using Taskflow's transport mechanism, calculates the fractal, and then writes the output image. It ensures workers are properly started and stopped.

```python
def create_fractal():
    logging.basicConfig(level=logging.ERROR)

    shared_conf = dict(BASE_SHARED_CONF)
    shared_conf.update({
        'transport': 'memory',
        'transport_options': {
            'polling_interval': 0.1,
        },
    })

    if len(sys.argv) >= 2:
        output_filename = sys.argv[1]
    else:
        output_filename = None

    worker_conf = dict(WORKER_CONF)
    worker_conf.update(shared_conf)
    engine_conf = dict(ENGINE_CONF)
    engine_conf.update(shared_conf)
    workers = []
    worker_topics = []

    print('Calculating your mandelbrot fractal of size %sx%s.' % IMAGE_SIZE)
    try:
        print('Running %s workers.' % (WORKERS))
        for i in range(0, WORKERS):
            worker_conf['topic'] = 'calculator_%s' % (i + 1)
            worker_topics.append(worker_conf['topic'])
            w = worker.Worker(**worker_conf)
            runner = threading_utils.daemon_thread(w.run)
            runner.start()
            w.wait()
            workers.append((runner, w.stop))

        engine_conf['topics'] = worker_topics
        results = calculate(engine_conf)
        print('Execution finished.')
    finally:
        print('Stopping workers.')
        while workers:
            r, stopper = workers.pop()
            stopper()
            r.join()
    print("Writing image...")
    write_image(results, output_filename=output_filename)


if __name__ == "__main__":
    create_fractal()
```

--------------------------------

### Initialize Connection for MemoryBackend

Source: https://docs.openstack.org/taskflow/latest/_modules/taskflow/persistence/backends/impl_memory

Initializes the Connection for the MemoryBackend, ensuring it's ready for use by calling the `upgrade` method.
```python
class Connection(path_based.PathBasedConnection):
    def __init__(self, backend):
        super().__init__(backend)
        self.upgrade()
```

--------------------------------

### Start Worker Executor in TaskFlow

Source: https://docs.openstack.org/taskflow/latest/_modules/taskflow/engines/worker_based/executor

Starts the message processing thread for the worker executor. The executor must be stopped before it can be started: if a helper thread already exists, the method raises a `RuntimeError`. Otherwise it creates and starts a daemon thread that runs the proxy's `start` method, then calls `self._proxy.wait()` before returning.

```python
def start(self):
    """Starts message processing thread."""
    if self._helper is not None:
        raise RuntimeError("Worker executor must be stopped before"
                           " it can be started")
    self._helper = tu.daemon_thread(self._proxy.start)
    self._helper.start()
    self._proxy.wait()
```
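
--------------------------------

### Sketch: Start/Stop Guard Around a Helper Thread (standard library only)

The start logic above follows a common pattern: refuse to start twice, launch a daemon helper thread, and block until that thread signals it is ready. The following is a minimal sketch of that pattern using only the standard `threading` module. It is not TaskFlow code: `GuardedRunner`, `_loop`, `_ready`, and `_stop_requested` are hypothetical names introduced for illustration, and the `stop()` method is an assumption about how a matching shutdown could look.

```python
import threading


class GuardedRunner:
    """Hypothetical stand-in for an object that owns one helper thread."""

    def __init__(self):
        self._helper = None
        self._ready = threading.Event()
        self._stop_requested = threading.Event()

    def _loop(self):
        # Signal readiness, then idle until a stop is requested.
        self._ready.set()
        self._stop_requested.wait()

    def start(self):
        # Same guard as above: a second start without a stop is an error.
        if self._helper is not None:
            raise RuntimeError("Runner must be stopped before"
                               " it can be started")
        self._stop_requested.clear()
        self._helper = threading.Thread(target=self._loop, daemon=True)
        self._helper.start()
        # Block until the helper thread reports that it is running.
        self._ready.wait()

    def stop(self):
        # Assumed counterpart: ask the helper to exit and reset the state.
        if self._helper is not None:
            self._stop_requested.set()
            self._helper.join()
            self._helper = None
            self._ready.clear()
```

Calling `start()` twice in a row raises `RuntimeError`, while `start()`, `stop()`, `start()` succeeds because `stop()` resets `_helper` to `None`.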