https://github.com/prefecthq/prefect
# Prefect

Prefect is a workflow orchestration framework for building resilient, observable data pipelines in Python. It transforms ordinary Python scripts into production-ready workflows by adding scheduling, retries, caching, and event-based automations with minimal code changes. Prefect tracks execution metadata, manages dependencies between tasks, and provides both a self-hosted server and a managed cloud option for monitoring workflow activity through a comprehensive dashboard UI.

The framework follows a hybrid model where your proprietary flow code stays within your infrastructure while Prefect handles orchestration metadata. By decorating Python functions with `@flow` and `@task`, developers gain automatic state tracking, concurrent execution capabilities, and the ability to deploy workflows that can be triggered remotely via API, schedules, or events. Prefect supports various infrastructure backends, including Docker, Kubernetes, and serverless platforms such as AWS ECS, Azure ACI, and GCP Cloud Run.

## Installation

Install Prefect using the pip or uv package managers.

```bash
# Install with pip
pip install -U prefect

# Or install with uv
uv add prefect
```

## Core API: @flow Decorator

The `@flow` decorator transforms a Python function into a Prefect flow, enabling automatic state tracking, retries, timeouts, and deployment capabilities. Flows are the primary entry points for workflow orchestration.
```python
import httpx
from prefect import flow, task

@task(log_prints=True)
def fetch_weather(city: str) -> dict:
    """Fetch weather data for a city."""
    url = f"https://wttr.in/{city}?format=j1"
    response = httpx.get(url)
    response.raise_for_status()
    return response.json()

@task
def extract_temperature(weather_data: dict) -> float:
    """Extract current temperature from weather data."""
    return float(weather_data["current_condition"][0]["temp_C"])

@flow(
    name="Weather Check",
    retries=3,
    retry_delay_seconds=10,
    log_prints=True
)
def weather_pipeline(cities: list[str]) -> dict[str, float]:
    """Check temperatures for multiple cities."""
    results = {}
    for city in cities:
        data = fetch_weather(city)
        temp = extract_temperature(data)
        results[city] = temp
        print(f"{city}: {temp}°C")
    return results

if __name__ == "__main__":
    temps = weather_pipeline(["London", "Paris", "Tokyo"])
    print(f"Results: {temps}")
```

## Core API: @task Decorator

Tasks are atomic units of work within flows that support caching, retries, concurrent execution, and transactional semantics. Tasks can be called directly, submitted for concurrent execution with `.submit()`, or mapped over iterables with `.map()`.
```python
from datetime import timedelta

from prefect import flow, task
from prefect.cache_policies import INPUTS

@task(
    retries=4,
    retry_delay_seconds=[1, 2, 4, 8],  # Exponential backoff
    cache_policy=INPUTS,
    cache_expiration=timedelta(hours=1),
    log_prints=True
)
def process_record(record_id: int) -> dict:
    """Process a single record with caching and retries."""
    print(f"Processing record {record_id}")
    # Simulate processing
    return {"id": record_id, "status": "processed"}

@task
def aggregate_results(results: list[dict]) -> dict:
    """Aggregate all processed results."""
    return {
        "total": len(results),
        "processed": [r["id"] for r in results]
    }

@flow
def batch_processor(record_ids: list[int]) -> dict:
    # Process records concurrently using .map()
    futures = process_record.map(record_ids)
    results = [f.result() for f in futures]
    return aggregate_results(results)

if __name__ == "__main__":
    summary = batch_processor([1, 2, 3, 4, 5])
    print(summary)
```

## Concurrent Execution with .submit() and .map()

Tasks can run concurrently using `.submit()` for individual tasks or `.map()` for iterating over collections. The `wait()` utility handles synchronization of multiple futures.
```python
import time

from prefect import flow, task
from prefect.futures import wait

@task
def slow_task(item: int) -> int:
    """Simulate a slow operation."""
    time.sleep(1)
    return item * 2

@task
def combine_results(a: int, b: int) -> int:
    """Combine two results."""
    return a + b

@flow
def concurrent_workflow():
    # Submit individual tasks for concurrent execution
    future_a = slow_task.submit(5)
    future_b = slow_task.submit(10)

    # Tasks run concurrently - total time ~1 second, not 2
    result = combine_results(future_a.result(), future_b.result())
    print(f"Combined result: {result}")

    # Map over a collection for bulk concurrent processing
    items = [1, 2, 3, 4, 5]
    mapped_futures = slow_task.map(items)

    # Wait for all futures to complete
    done, not_done = wait(mapped_futures)

    # Collect results (done is a set, so order is not guaranteed)
    mapped_results = [f.result() for f in done]
    print(f"Mapped results: {mapped_results}")

    return result, mapped_results

if __name__ == "__main__":
    concurrent_workflow()
```

## Deployments and Serving Flows

Deployments allow flows to run on schedules, be triggered remotely via API, or respond to events. The `.serve()` method creates a long-running process that executes scheduled flow runs.
```python
from prefect import flow, task

@task
def extract_data(source: str) -> list:
    """Extract data from source."""
    print(f"Extracting from {source}")
    return [{"id": i, "value": i * 10} for i in range(5)]

@task
def transform_data(data: list) -> list:
    """Transform extracted data."""
    return [{"id": d["id"], "value": d["value"] * 2} for d in data]

@task
def load_data(data: list, destination: str) -> int:
    """Load data to destination."""
    print(f"Loading {len(data)} records to {destination}")
    return len(data)

@flow(log_prints=True)
def etl_pipeline(source: str = "api", destination: str = "warehouse") -> int:
    """Complete ETL pipeline."""
    raw_data = extract_data(source)
    transformed = transform_data(raw_data)
    count = load_data(transformed, destination)
    return count

if __name__ == "__main__":
    # Serve the flow with a cron schedule (runs every day at 8 AM)
    etl_pipeline.serve(
        name="daily-etl",
        cron="0 8 * * *",
        parameters={"source": "production-api", "destination": "data-warehouse"},
        tags=["etl", "production"]
    )
```

## Deploying to Work Pools

For dynamic infrastructure provisioning, use `.deploy()` with work pools. This enables running flows on Docker, Kubernetes, or serverless platforms.

```python
from prefect import flow

@flow(log_prints=True)
def ml_training_pipeline(
    dataset_path: str,
    model_type: str = "random_forest",
    epochs: int = 100
) -> dict:
    """Train an ML model."""
    print(f"Training {model_type} on {dataset_path} for {epochs} epochs")
    return {"model": model_type, "accuracy": 0.95}

if __name__ == "__main__":
    # Deploy to a Docker work pool
    ml_training_pipeline.deploy(
        name="ml-training-docker",
        work_pool_name="docker-pool",
        image="my-registry/ml-image:latest",
        job_variables={"env": {"CUDA_VISIBLE_DEVICES": "0"}},
        cron="0 2 * * *",  # Run at 2 AM daily
        tags=["ml", "training"]
    )
```

## Running Deployments Programmatically

Trigger deployment runs from Python code using the `run_deployment` function.
```python
import asyncio

from prefect.deployments import run_deployment

async def trigger_pipeline():
    """Trigger a deployment without waiting for completion."""
    flow_run = await run_deployment(
        name="etl-pipeline/daily-etl",
        parameters={
            "source": "staging-api",
            "destination": "staging-warehouse"
        },
        timeout=0  # Don't wait for the run to finish
    )
    print(f"Started flow run: {flow_run.id}")
    return flow_run

def trigger_pipeline_sync():
    """Synchronous alternative - run_deployment also works in sync code."""
    flow_run = run_deployment(
        name="etl-pipeline/daily-etl",
        parameters={"source": "test"},
        tags=["manual-trigger"]
    )
    print(f"Flow run ID: {flow_run.id}")

if __name__ == "__main__":
    asyncio.run(trigger_pipeline())
```

## CLI Commands

Common Prefect CLI commands for managing flows, deployments, and the server.

```bash
# Start the Prefect server
prefect server start

# Start the server in Docker
docker run -p 4200:4200 -d --rm prefecthq/prefect:3-python3.12 \
  prefect server start --host 0.0.0.0

# Connect to Prefect Cloud
prefect cloud login

# List flows
prefect flow ls

# Run a deployment
prefect deployment run my-flow/my-deployment

# Create a deployment from the command line
prefect deploy my_script.py:my_flow \
  --name my-deployment \
  --pool my-work-pool \
  --cron "0 * * * *"

# List deployments
prefect deployment ls

# Start a worker for a work pool
prefect worker start --pool my-work-pool

# View and set configuration
prefect config view
prefect config set PREFECT_API_URL="http://localhost:4200/api"

# Manage schedules
prefect deployment schedule create my-flow/my-deployment --cron "0 8 * * *"
prefect deployment schedule delete my-flow/my-deployment --schedule-id abc123
```

## REST API: Creating Flow Runs

The Prefect REST API allows programmatic interaction with deployments and flow runs from any language.
```bash
# Set environment variables
PREFECT_API_URL="http://localhost:4200/api"
DEPLOYMENT_ID="your-deployment-id"

# Create a flow run from a deployment
curl --location --request POST "$PREFECT_API_URL/deployments/$DEPLOYMENT_ID/create_flow_run" \
  --header "Content-Type: application/json" \
  --data-raw '{
    "parameters": {"param1": "value1"},
    "tags": ["manual", "api-triggered"]
  }'

# Filter flow runs
curl --location --request POST "$PREFECT_API_URL/flow_runs/filter" \
  --header "Content-Type: application/json" \
  --data-raw '{
    "sort": "CREATED_DESC",
    "limit": 10,
    "flow_runs": {
      "state": {
        "type": {"any_": ["COMPLETED", "FAILED"]}
      }
    }
  }'
```

## Python REST Client

Use the `PrefectClient` (obtained via `get_client()`) for programmatic API access from Python. Filter arguments take the typed filter models from `prefect.client.schemas.filters` rather than plain dicts.

```python
import asyncio
from uuid import UUID

from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import (
    FlowFilter,
    FlowFilterId,
    LogFilter,
    LogFilterFlowRunId,
)

async def list_recent_flows():
    """List recent flows using the Prefect client."""
    async with get_client() as client:
        # Read flows with pagination
        flows = await client.read_flows(limit=10)
        for flow in flows:
            print(f"Flow: {flow.name} (ID: {flow.id})")
            # Get deployments for this flow
            deployments = await client.read_deployments(
                flow_filter=FlowFilter(id=FlowFilterId(any_=[flow.id]))
            )
            for dep in deployments:
                print(f"  - Deployment: {dep.name}")
        return flows

async def get_flow_run_logs(flow_run_id: UUID):
    """Retrieve logs for a specific flow run."""
    async with get_client() as client:
        logs = await client.read_logs(
            log_filter=LogFilter(
                flow_run_id=LogFilterFlowRunId(any_=[flow_run_id])
            )
        )
        for log in logs:
            print(f"[{log.level}] {log.message}")

if __name__ == "__main__":
    asyncio.run(list_recent_flows())
```

## Blocks for Configuration Storage

Blocks store typed configuration like credentials that can be reused across flows and deployments.
```python
from prefect import flow, task
from prefect.blocks.system import Secret
from prefect.cache_policies import INPUTS
from prefect_aws import AwsCredentials, S3Bucket

# Create and save a secret block
api_key = Secret(value="sk-my-secret-api-key")
api_key.save("openai-api-key", overwrite=True)

# Load and use a secret in a flow
@flow
def secure_api_call():
    secret_block = Secret.load("openai-api-key")
    api_key = secret_block.get()
    # Use api_key for API calls...

# Create AWS credentials block
aws_creds = AwsCredentials(
    aws_access_key_id="AKIAIOSFODNN7EXAMPLE",
    aws_secret_access_key="wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
    region_name="us-east-1"
)
aws_creds.save("production-aws", overwrite=True)

# Create S3 bucket block for result storage
s3_bucket = S3Bucket(
    credentials=AwsCredentials.load("production-aws"),
    bucket_name="my-prefect-results"
)
s3_bucket.save("result-storage", overwrite=True)

# Use blocks for task result storage and caching
@task(
    cache_policy=INPUTS,
    result_storage=S3Bucket.load("result-storage")
)
def cached_expensive_operation(data: dict) -> dict:
    """Results cached to S3 for distributed access."""
    # Expensive computation...
    return {"processed": data}
```

## Custom Retry Logic

Configure sophisticated retry behavior with conditional retries, exponential backoff, and jitter.
```python
import httpx
from prefect import flow, task
from prefect.tasks import exponential_backoff

def should_retry(task, task_run, state) -> bool:
    """Custom retry condition - don't retry on auth errors."""
    try:
        state.result()
    except httpx.HTTPStatusError as exc:
        # Don't retry on 401 Unauthorized or 404 Not Found
        return exc.response.status_code not in [401, 404]
    except httpx.ConnectError:
        return True  # Retry connection errors
    return True

@task(
    retries=5,
    retry_delay_seconds=exponential_backoff(backoff_factor=2),
    retry_jitter_factor=0.5,  # Add randomness to prevent thundering herd
    retry_condition_fn=should_retry
)
def resilient_api_call(endpoint: str) -> dict:
    """API call with sophisticated retry handling."""
    response = httpx.get(endpoint, timeout=30)
    response.raise_for_status()
    return response.json()

@flow
def api_workflow():
    # Will retry with delays: ~2s, ~4s, ~8s, ~16s, ~32s (plus jitter)
    data = resilient_api_call("https://api.example.com/data")
    return data
```

## Scheduling Options

Prefect supports cron, interval, and RRule schedules for automated flow execution. Timezone-aware schedules are built with the `Cron`, `Interval`, and `RRule` classes from `prefect.schedules`.

```python
from datetime import timedelta

from prefect import flow
from prefect.schedules import Cron, Interval, RRule

@flow
def scheduled_job():
    print("Running scheduled job")

# Each `.serve()` call blocks the process, so run only one per process.

# Cron schedule - every weekday at 9 AM Eastern
scheduled_job.serve(
    name="weekday-morning-job",
    schedules=[Cron("0 9 * * MON-FRI", timezone="America/New_York")],
)

# Interval schedule - every 30 minutes
scheduled_job.serve(
    name="frequent-job",
    schedules=[Interval(timedelta(minutes=30))],
)

# RRule for complex schedules - last Friday of each month at 5 PM Pacific
scheduled_job.serve(
    name="monthly-report",
    schedules=[RRule(
        "FREQ=MONTHLY;BYDAY=-1FR;BYHOUR=17;BYMINUTE=0",
        timezone="America/Los_Angeles",
    )],
)
```

## State Change Hooks

Execute custom logic when flows or tasks change state using hooks.
```python
from prefect import flow, task
from prefect.states import State

def on_failure(flow, flow_run, state: State):
    """Send alert when flow fails."""
    print(f"Flow {flow.name} failed!")
    print(f"Error: {state.message}")
    # Send notification via Slack, email, etc.

def on_completion(flow, flow_run, state: State):
    """Log success metrics."""
    print(f"Flow {flow.name} completed successfully")
    print(f"Duration: {flow_run.total_run_time}")

@task
def process_data():
    return {"processed": True}

@flow(
    on_failure=[on_failure],
    on_completion=[on_completion]
)
def monitored_pipeline():
    """Pipeline with state change hooks."""
    result = process_data()
    return result
```

## Artifacts for Data Persistence

Create and persist artifacts like tables, markdown reports, and links for visibility in the UI.

```python
from prefect import flow, task
from prefect.artifacts import create_markdown_artifact, create_table_artifact

@task
def generate_report(data: list[dict]) -> str:
    """Generate a markdown report artifact."""
    report = f"""
# Data Processing Report

## Summary
- Total records processed: {len(data)}
- Success rate: 100%

## Sample Data
| ID | Value | Status |
|----|-------|--------|
"""
    for item in data[:5]:
        report += f"| {item['id']} | {item['value']} | {item['status']} |\n"

    create_markdown_artifact(
        key="processing-report",
        markdown=report,
        description="Daily processing report"
    )
    return report

@task
def save_results_table(results: list[dict]):
    """Create a table artifact."""
    create_table_artifact(
        key="results-table",
        table=results,
        description="Processing results"
    )

@flow
def reporting_pipeline():
    data = [{"id": i, "value": i * 10, "status": "ok"} for i in range(100)]
    generate_report(data)
    save_results_table(data[:10])
```

## Background Tasks with .delay()

Use `.delay()` for fire-and-forget task execution on separate infrastructure, ideal for web applications.
```python
from prefect import task

@task
def send_welcome_email(user_id: str, email: str):
    """Send welcome email - runs in the background."""
    print(f"Sending welcome email to {email}")
    # Email sending logic...

@task
def process_signup_analytics(user_id: str, metadata: dict):
    """Process analytics - runs in the background."""
    print(f"Processing analytics for user {user_id}")
    # Analytics processing...

# In a web application endpoint:
def handle_user_signup(user_data: dict):
    """Handle user signup - returns immediately."""
    user_id = user_data["id"]

    # These tasks run in the background and don't block the response
    send_welcome_email.delay(user_id, user_data["email"])
    process_signup_analytics.delay(user_id, user_data.get("metadata", {}))

    # Return immediately to the user
    return {"status": "success", "user_id": user_id}

# Delayed tasks are executed by a task worker, started from Python:
#   from prefect.task_worker import serve
#   serve(send_welcome_email, process_signup_analytics)
```

## Concurrency Limits

Control concurrent execution at the deployment or global level to manage resource usage.
```python
import httpx
from prefect import flow, task
from prefect.client.schemas.objects import ConcurrencyLimitConfig
from prefect.concurrency.sync import concurrency

@task
def rate_limited_api_call(item_id: int) -> dict:
    """API call guarded by a global concurrency limit."""
    # Occupy one slot of the "api-rate-limit" global concurrency limit
    # (the limit itself, e.g. 5 slots, is created via the UI or CLI)
    with concurrency("api-rate-limit", occupy=1):
        response = httpx.get(f"https://api.example.com/items/{item_id}")
        return response.json()

@flow
def rate_limited_pipeline(item_ids: list[int]):
    """Pipeline respecting rate limits."""
    results = []
    for item_id in item_ids:
        result = rate_limited_api_call(item_id)
        results.append(result)
    return results

if __name__ == "__main__":
    # Deploy with a deployment-level concurrency limit
    rate_limited_pipeline.deploy(
        name="rate-limited-deployment",
        work_pool_name="default",
        concurrency_limit=ConcurrencyLimitConfig(
            limit=3,  # Max 3 concurrent runs of this deployment
            collision_strategy="ENQUEUE"  # Queue new runs instead of canceling
        )
    )
```

## Summary

Prefect excels at transforming Python scripts into production-grade workflows with minimal code changes. The `@flow` and `@task` decorators provide automatic observability, retries, and caching. For simple deployments, `.serve()` creates a long-running process that handles scheduled executions. For dynamic infrastructure needs, `.deploy()` with work pools enables execution on Docker, Kubernetes, or serverless platforms. The framework's hybrid architecture keeps your code secure while providing comprehensive orchestration through either a self-hosted server or Prefect Cloud.

Common integration patterns include using Blocks for credential management across workflows, leveraging the REST API or Python client for programmatic control, and combining concurrent execution methods (`.submit()`, `.map()`, `.delay()`) based on use case requirements.
Prefect integrates with major cloud providers and data tools through official integration libraries (prefect-aws, prefect-gcp, prefect-dbt, etc.), enabling workflows to interact with S3, BigQuery, Snowflake, and other services with pre-built, credential-aware blocks.