# Prefect

Prefect is a workflow orchestration framework for building data pipelines in Python. It transforms scripts into production workflows by adding scheduling, caching, retries, and event-based automations with minimal code changes. The framework tracks workflow activity through either a self-hosted Prefect server or managed Prefect Cloud, providing observability into every flow and task run.

At its core, Prefect uses two primary decorators, `@flow` and `@task`, to define workflows and their atomic units of work. Flows compose work into observable, schedulable pipelines, while tasks provide granular control over execution with features like automatic retries, caching, and concurrent execution. Prefect supports deployments for remote execution, work pools for dynamic infrastructure provisioning, and integrations with major cloud providers (AWS, GCP, Azure) and data tools (dbt, Snowflake, Kubernetes).

## Installation

```bash
pip install -U prefect
```

## Core Concepts

### Creating a Basic Flow

A flow is defined by applying the `@flow` decorator to a Python function. Flows track execution metadata, support retries and timeouts, and can be deployed for remote orchestration.

```python
from prefect import flow, task
import httpx

@task(log_prints=True)
def get_stars(repo: str):
    url = f"https://api.github.com/repos/{repo}"
    count = httpx.get(url).json()["stargazers_count"]
    print(f"{repo} has {count} stars!")

@flow(name="GitHub Stars")
def github_stars(repos: list[str]):
    for repo in repos:
        get_stars(repo)

if __name__ == "__main__":
    github_stars(["PrefectHQ/prefect"])
```

### Creating Tasks with Retries

Tasks are atomic units of work that support caching, retries, and concurrent execution. Retries support fixed or exponential backoff delays and custom retry conditions.

```python
import httpx
from prefect import flow, task
from prefect.tasks import exponential_backoff

def retry_handler(task, task_run, state) -> bool:
    """Skip retries for 401/404 responses and connection errors."""
    try:
        # Re-raises the exception that failed the task
        state.result()
    except httpx.HTTPStatusError as exc:
        # Don't retry permanent auth/not-found failures
        return exc.response.status_code not in [401, 404]
    except httpx.ConnectError:
        return False
    except Exception:
        # Retry on any other exception
        return True
    return True

@task(
    retries=4,
    retry_delay_seconds=exponential_backoff(backoff_factor=2),
    retry_condition_fn=retry_handler
)
def fetch_data(url: str):
    response = httpx.get(url)
    response.raise_for_status()
    return response.json()

@flow
def data_pipeline(urls: list[str]):
    results = []
    for url in urls:
        results.append(fetch_data(url))
    return results
```

### Running Tasks Concurrently with Submit

Use `.submit()` to run tasks concurrently via task runners. Each call returns a `PrefectFuture` for tracking execution.

```python
import time
from prefect import flow, task
from prefect.futures import wait

@task
def process_item(item: int) -> int:
    time.sleep(1)
    return item * 2

@flow
def parallel_processing():
    items = list(range(10))
    futures = [process_item.submit(item) for item in items]

    # Wait for all futures to complete
    done, not_done = wait(futures)

    # Collect results (done is a set, so order is not guaranteed)
    results = [future.result() for future in done]
    print(f"Processed {len(results)} items: {results}")
    return results

if __name__ == "__main__":
    parallel_processing()
```
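By default, flows run submitted tasks on a thread pool. The task runner is configurable per flow; below is a minimal sketch capping concurrency at four worker threads with `ThreadPoolTaskRunner` (the flow and task names are illustrative).

```python
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner

@task
def fetch_page(page: int) -> int:
    # Placeholder for I/O-bound work
    return page

# At most 4 tasks run at once, regardless of how many are submitted
@flow(task_runner=ThreadPoolTaskRunner(max_workers=4))
def throttled_flow():
    futures = [fetch_page.submit(p) for p in range(20)]
    return [f.result() for f in futures]
```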
### Mapping Tasks Over Iterables

Use `.map()` to apply a task to each element of an iterable concurrently.

```python
from prefect import flow, task, unmapped
from prefect.futures import wait

@task
def add_together(x: int, y: int) -> int:
    return x + y

@task
def multiply(x: int, factor: list[int]) -> int:
    return x * sum(factor)

@flow
def batch_operations():
    numbers = [1, 2, 3, 4, 5]

    # Map over numbers with a static value
    sums = add_together.map(numbers, 10)

    # Use unmapped for iterable arguments that shouldn't be mapped
    products = multiply.map(numbers, unmapped([2, 3]))

    wait(sums)
    wait(products)

    print(f"Sums: {sums.result()}")          # [11, 12, 13, 14, 15]
    print(f"Products: {products.result()}")  # [5, 10, 15, 20, 25]

if __name__ == "__main__":
    batch_operations()
```

### Task Caching with Cache Policies

Enable caching to reuse results from expensive computations. Cache policies control how the cache key is computed.

```python
from datetime import timedelta
from prefect import flow, task
from prefect.cache_policies import INPUTS, TASK_SOURCE

@task(
    cache_policy=TASK_SOURCE + INPUTS,
    cache_expiration=timedelta(hours=1)
)
def expensive_computation(x: int, y: int) -> int:
    print(f"Computing {x} + {y}...")
    return x + y

# Ignore specific parameters in the cache key
custom_policy = INPUTS - "debug"

@task(cache_policy=custom_policy)
def fetch_with_debug(url: str, debug: bool = False) -> dict:
    print(f"Fetching {url}...")
    return {"url": url, "data": "sample"}

@flow
def cached_workflow():
    # First call computes and caches
    result1 = expensive_computation(1, 2)
    # Second call with the same inputs returns the cached result
    result2 = expensive_computation(1, 2)

    # The debug parameter is ignored in the cache key
    data1 = fetch_with_debug("https://api.example.com", debug=False)
    data2 = fetch_with_debug("https://api.example.com", debug=True)  # Still cached

    return result1, result2

if __name__ == "__main__":
    cached_workflow()
```

### Working with States

States contain information about the status of flow and task runs. Use them for conditional logic and custom state handling.

```python
from prefect import flow, task
from prefect.states import Completed, Failed

@task
def validate_data(data: dict) -> bool:
    return "required_field" in data

@task
def process_data(data: dict) -> dict:
    return {"processed": True, **data}

@flow
def stateful_workflow(data: dict):
    is_valid = validate_data(data)
    if not is_valid:
        return Failed(message="Validation failed: missing required_field")

    result = process_data(data)
    return Completed(message="Data processed successfully", data=result)

@flow
def check_task_states():
    # Get the state object instead of the result
    state = validate_data({"required_field": "value"}, return_state=True)

    if state.is_completed():
        print(f"Task completed with result: {state.result()}")
    elif state.is_failed():
        print(f"Task failed: {state.message}")

if __name__ == "__main__":
    result = stateful_workflow({"required_field": "test"})
    print(f"Flow result: {result}")
```
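Calling `state.result()` on a failed state re-raises the underlying exception. To inspect a failure without raising, pass `raise_on_failure=False`; a minimal sketch (the task name is illustrative):

```python
from prefect import flow, task

@task
def shaky_task():
    raise RuntimeError("boom")

@flow
def inspect_failure():
    # return_state=True prevents the failure from propagating
    state = shaky_task(return_state=True)
    if state.is_failed():
        # Returns the exception object instead of raising it
        exc = state.result(raise_on_failure=False)
        print(f"Task failed with: {exc!r}")

if __name__ == "__main__":
    inspect_failure()
```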
### State Change Hooks

Execute code in response to state transitions in flows or tasks.

```python
from prefect import flow, task
from prefect.states import State

def on_task_failure(task, task_run, state: State):
    print(f"Task {task.name} failed with message: {state.message}")
    # Send alert, log to external system, etc.

def on_task_success(task, task_run, state: State):
    print(f"Task {task.name} completed successfully")

def on_flow_completion(flow, flow_run, state: State):
    print(f"Flow {flow.name} finished with state: {state.name}")

@task(on_failure=[on_task_failure], on_completion=[on_task_success])
def risky_task(should_fail: bool = False):
    if should_fail:
        raise ValueError("Intentional failure")
    return "success"

@flow(on_completion=[on_flow_completion])
def monitored_workflow():
    risky_task(should_fail=False)
    # return_state=True keeps the failure from propagating, so the
    # flow still completes and its on_completion hook fires
    risky_task(should_fail=True, return_state=True)

if __name__ == "__main__":
    monitored_workflow()
```

### Creating Artifacts

Artifacts capture outputs like reports, tables, and links for display in the Prefect UI.

```python
import time

from prefect import flow, task
from prefect.artifacts import (
    create_markdown_artifact,
    create_table_artifact,
    create_link_artifact,
    create_progress_artifact,
    update_progress_artifact,
)

@task
def generate_report():
    markdown_report = """# Daily Report

## Summary

Processing completed successfully.

## Statistics

| Metric | Value |
|--------|-------|
| Records Processed | 1,000 |
| Success Rate | 99.5% |
| Duration | 45s |
"""
    create_markdown_artifact(
        key="daily-report",
        markdown=markdown_report,
        description="Daily processing report"
    )

@task
def track_customers():
    customers = [
        {"id": "001", "name": "Acme Corp", "status": "Active"},
        {"id": "002", "name": "TechStart", "status": "Pending"},
    ]
    create_table_artifact(
        key="customer-list",
        table=customers,
        description="Current customer status"
    )

@task
def save_output_link():
    create_link_artifact(
        key="output-data",
        link="https://storage.example.com/output.csv",
        link_text="Download Output",
        description="Link to processed data"
    )

@task
def batch_process_with_progress():
    progress_id = create_progress_artifact(
        progress=0.0,
        description="Batch processing progress"
    )
    for i in range(1, 11):
        time.sleep(0.5)  # Simulate work
        update_progress_artifact(artifact_id=progress_id, progress=i * 10)
    return "Processing complete"

@flow
def artifact_workflow():
    generate_report()
    track_customers()
    save_output_link()
    batch_process_with_progress()

if __name__ == "__main__":
    artifact_workflow()
```

### Creating Deployments with Serve

The simplest way to deploy a flow for scheduled execution on static infrastructure.

```python
from prefect import flow

@flow(log_prints=True)
def daily_etl(source: str = "staging"):
    print(f"Running daily ETL pipeline against {source}...")
    # ETL logic here
    return {"records_processed": 1000}

if __name__ == "__main__":
    # Create a deployment and start serving it
    daily_etl.serve(
        name="daily-etl-deployment",
        cron="0 6 * * *",  # Run daily at 6 AM
        parameters={"source": "production"},
        tags=["etl", "production"]
    )
```

### Creating Deployments with Work Pools

Deploy flows to dynamic infrastructure using work pools and the `.deploy()` method.

```python
from prefect import flow
from prefect.client.schemas.objects import ConcurrencyLimitConfig

@flow
def ml_training_pipeline(dataset: str, epochs: int = 10):
    print(f"Training model on {dataset} for {epochs} epochs")
    return {"accuracy": 0.95}

if __name__ == "__main__":
    ml_training_pipeline.deploy(
        name="ml-training",
        work_pool_name="kubernetes-pool",
        image="my-registry/ml-training:latest",
        push=False,
        cron="0 0 * * 0",  # Weekly on Sunday
        parameters={"dataset": "training_data_v2", "epochs": 50},
        tags=["ml", "training"],
        concurrency_limit=ConcurrencyLimitConfig(limit=3),
        job_variables={
            "cpu": "4",
            "memory": "16Gi",
            "env": {"MODEL_VERSION": "v2"}
        }
    )
```
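Flows can also be deployed directly from source control instead of baking code into an image; a worker pulls the repository at run time. A minimal sketch, assuming a hypothetical Git repository and entrypoint:

```python
from prefect import flow

if __name__ == "__main__":
    flow.from_source(
        source="https://github.com/example-org/pipelines",  # hypothetical repo
        entrypoint="flows/training.py:ml_training_pipeline",  # hypothetical path
    ).deploy(
        name="ml-training-from-git",
        work_pool_name="kubernetes-pool",
    )
```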
### Running Deployments Programmatically

Trigger deployment runs from Python code.

```python
from prefect.deployments import run_deployment

# Trigger a deployment run
flow_run = run_deployment(
    name="my-flow/my-deployment",
    parameters={"param1": "value1"},
    timeout=0,  # Don't wait for completion
)
print(f"Started flow run: {flow_run.id}")

# Trigger and wait for completion
flow_run = run_deployment(
    name="my-flow/my-deployment",
    parameters={"param1": "value1"},
    timeout=300,  # Wait up to 5 minutes
)
print(f"Flow run completed with state: {flow_run.state}")
```

### Scheduling with Cron, Interval, and RRule

Configure different schedule types for deployments. Because each `.serve()` call blocks, register multiple scheduled deployments with a single `serve()` process.

```python
from prefect import flow, serve
from prefect.schedules import Cron

@flow
def scheduled_flow():
    print("Running on schedule")

if __name__ == "__main__":
    # Cron schedule - runs at 9 AM on weekdays
    cron_deployment = scheduled_flow.to_deployment(
        name="cron-deployment",
        schedules=[Cron("0 9 * * MON-FRI", timezone="America/New_York")],
    )

    # Interval schedule - runs every 30 minutes
    # (accepts a number of seconds or a datetime.timedelta)
    interval_deployment = scheduled_flow.to_deployment(
        name="interval-deployment",
        interval=1800,
    )

    # RRule schedule - complex calendar logic:
    # the last weekday of each month
    rrule_deployment = scheduled_flow.to_deployment(
        name="rrule-deployment",
        rrule="FREQ=MONTHLY;BYDAY=MO,TU,WE,TH,FR;BYSETPOS=-1",
    )

    # Serve all three deployments from one long-lived process
    serve(cron_deployment, interval_deployment, rrule_deployment)
```

### Using Blocks for Configuration

Blocks store typed configuration that can be reused across workflows.

```python
from prefect import flow, task
from prefect.blocks.system import Secret

# Save a secret block (typically done once)
async def setup_blocks():
    secret = Secret(value="my-api-key-12345")
    await secret.save("api-key", overwrite=True)

# Load and use blocks in flows
@task
def fetch_with_api_key(endpoint: str):
    api_key = Secret.load("api-key")
    # Use api_key.get() to retrieve the secret value
    return f"Fetched from {endpoint} with key {api_key.get()[:5]}..."

@flow
def workflow_with_blocks():
    result = fetch_with_api_key("https://api.example.com/data")
    print(result)

if __name__ == "__main__":
    import asyncio
    asyncio.run(setup_blocks())
    workflow_with_blocks()
```

### Background Tasks with Delay

Use `.delay()` for fire-and-forget task execution on separate infrastructure.

```python
from prefect import task

@task
def send_notification(user_id: str, message: str):
    print(f"Sending to {user_id}: {message}")
    # Notification logic here

@task
def process_webhook(payload: dict):
    print(f"Processing webhook: {payload}")
    # Heavy processing logic here

# In a web endpoint handler (e.g., FastAPI)
def handle_user_signup(user_id: str):
    # Fire-and-forget - returns immediately
    send_notification.delay(user_id, "Welcome to our platform!")
    return {"status": "signup processed"}

def handle_webhook(payload: dict):
    # Queue for background processing
    future = process_webhook.delay(payload)
    # Response returns without waiting
    return {"status": "webhook received", "task_run_id": str(future.task_run_id)}
```
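Delayed tasks only execute once a task worker is subscribed to them. A minimal sketch of the worker process, run alongside the web app (the `myapp.tasks` module is hypothetical):

```python
from prefect.task_worker import serve

# Import the task definitions this worker should execute
from myapp.tasks import send_notification, process_webhook  # hypothetical module

if __name__ == "__main__":
    # Blocks and executes runs submitted via .delay()
    serve(send_notification, process_webhook)
```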
### Working with the REST API

Interact with Prefect Cloud or a self-hosted server via the REST API.

```python
import asyncio
import requests
from prefect.client.orchestration import get_client

# Using the Prefect client
async def list_flows():
    async with get_client() as client:
        flows = await client.read_flows(limit=5)
        for flow in flows:
            print(f"{flow.name}: {flow.id}")

# Using the requests library with Prefect Cloud
def get_recent_artifacts():
    PREFECT_API_URL = "https://api.prefect.cloud/api/accounts/{account_id}/workspaces/{workspace_id}"
    PREFECT_API_KEY = "your-api-key"

    response = requests.post(
        f"{PREFECT_API_URL}/artifacts/filter",
        headers={"Authorization": f"Bearer {PREFECT_API_KEY}"},
        json={
            "sort": "CREATED_DESC",
            "limit": 5,
            "artifacts": {"key": {"exists_": True}}
        }
    )
    return response.json()

# Create a flow run via curl
"""
curl --location --request POST "$PREFECT_API_URL/deployments/$DEPLOYMENT_ID/create_flow_run" \
  --header "Content-Type: application/json" \
  --header "Authorization: Bearer $PREFECT_API_KEY" \
  --data-raw '{"parameters": {"key": "value"}}'
"""

if __name__ == "__main__":
    asyncio.run(list_flows())
```

### Handling Failures in Concurrent Work

Process concurrent tasks and handle failures gracefully.

```python
from typing import Any

from prefect import flow, task
from prefect.futures import wait
from prefect.states import State

@task
def process_record(record_id: int) -> dict:
    if record_id % 3 == 0:
        raise ValueError(f"Cannot process record {record_id}")
    return {"id": record_id, "status": "processed"}

@flow
def resilient_batch_processing():
    record_ids = list(range(1, 11))

    # Submit all tasks
    futures = process_record.map(record_ids)

    # Wait for all to complete (success or failure)
    done, not_done = wait(futures)

    # Separate successful and failed results
    successful: list[Any] = []
    failed: list[State] = []

    for future in done:
        if future.state.is_completed():
            successful.append(future.result())
        else:
            failed.append(future.state)

    print(f"✓ Processed: {len(successful)} records")
    print(f"✗ Failed: {len(failed)} records")

    # Continue with successful results
    return successful

if __name__ == "__main__":
    results = resilient_batch_processing()
    print(f"Final results: {results}")
```

### Starting the Prefect Server

Run a local Prefect server for development and self-hosted deployments.

```bash
# Start the Prefect server
prefect server start

# Access the UI at http://localhost:4200

# Configure the client to use the local server
prefect config set PREFECT_API_URL=http://localhost:4200/api
```

### CLI Commands Reference

Common Prefect CLI commands for managing workflows.

```bash
# Deployments
prefect deployment run my-flow/my-deployment
prefect deployment ls
prefect deployment inspect my-flow/my-deployment

# Flow runs
prefect flow-run ls
prefect flow-run cancel <flow-run-id>

# Work pools
prefect work-pool create my-pool --type process
prefect work-pool ls
prefect worker start --pool my-pool

# Blocks
prefect block type ls
prefect block register -m prefect.blocks.notifications

# Artifacts
prefect artifact ls
prefect artifact inspect my-artifact-key
prefect artifact delete my-artifact-key

# Configuration
prefect config view --show-defaults
prefect config set PREFECT_RESULTS_PERSIST_BY_DEFAULT=true
prefect profile inspect
```

## Summary

Prefect excels at transforming Python code into production-ready workflows with minimal overhead. The primary use cases include ETL/ELT data pipelines, ML training and inference workflows, scheduled batch processing, event-driven automation, and any Python workload requiring observability, retries, and scheduling.
The framework's hybrid architecture keeps your code and data in your infrastructure while leveraging Prefect's orchestration capabilities. Integration patterns typically involve wrapping existing Python functions with `@flow` and `@task` decorators, deploying flows to work pools for dynamic infrastructure, using blocks for credential and configuration management, and leveraging the REST API for programmatic control.

Prefect integrates with cloud providers through dedicated integration libraries (prefect-aws, prefect-gcp, prefect-azure) and data tools (prefect-dbt, prefect-snowflake, prefect-kubernetes), making it adaptable to diverse data engineering and ML operations environments.
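As an illustration of the integration pattern, here is a minimal sketch using prefect-aws (it assumes `pip install prefect-aws` and an `S3Bucket` block saved beforehand; the block name is hypothetical):

```python
from prefect import flow
from prefect_aws.s3 import S3Bucket

@flow
def export_results():
    # Load bucket config/credentials from a previously saved block
    s3 = S3Bucket.load("results-bucket")  # hypothetical block name
    # Upload a local file into the bucket
    s3.upload_from_path("output.csv", "exports/output.csv")

if __name__ == "__main__":
    export_results()
```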