### Running the Complete Rate Limiter Example Source: https://github.com/cloudflare/pingora/blob/main/docs/user_guide/rate_limiter.md Execute the full rate limiter example code located in the Pingora repository. This command will build and run the example. ```bash cargo run --example rate_limiter ``` -------------------------------- ### Pingora server configuration with YAML Source: https://context7.com/cloudflare/pingora/llms.txt Illustrates a sample YAML configuration file for Pingora servers, covering settings for threads, daemonization, logging, upgrade sockets, user/group, and upstream connection pooling. Also shows command-line examples for starting, upgrading, and shutting down the server. ```yaml # conf.yaml --- version: 1 # required constant threads: 4 # worker threads per service daemon: true # run in background pid_file: /run/myproxy.pid error_log: /var/log/myproxy.err upgrade_sock: /tmp/myproxy_upgrade.sock user: nobody group: nogroup upstream_keepalive_pool_size: 512 # total pooled upstream connections work_stealing: true # tokio work-stealing runtime (default) # User-defined settings (ignored by Pingora, readable via your own config parsing) my_backend_host: "api.internal" my_backend_port: 8443 ``` ```bash # Start with config RUST_LOG=INFO cargo run -- -c conf.yaml # Graceful upgrade (zero-downtime binary swap) pkill -SIGQUIT myproxy && RUST_LOG=INFO ./myproxy-new -c conf.yaml -u # Graceful shutdown pkill -SIGTERM myproxy ``` -------------------------------- ### Add Logging and Metrics Source: https://github.com/cloudflare/pingora/blob/main/docs/user_guide/modify_filter.md Implement logging and Prometheus metrics in the `logging` phase. This example logs the response code and increments a request counter. It also shows how to start a Prometheus metrics server. ```Rust pub struct MyGateway { req_metric: prometheus::IntCounter, } #[async_trait] impl ProxyHttp for MyGateway { ... async fn logging( &self, session: &mut Session, _e: Option<&pingora::Error>, ctx: &mut Self::CTX, ) { let response_code = session .response_written() .map_or(0, |resp| resp.status.as_u16()); // access log info!( "{} response code: {response_code}", self.request_summary(session, ctx) ); self.req_metric.inc(); } fn main() { ... let mut prometheus_service_http = pingora_prometheus::prometheus_http_service(); prometheus_service_http.add_tcp("127.0.0.1:6192"); my_server.add_service(prometheus_service_http); my_server.run_forever(); } ``` -------------------------------- ### Load Balancer Output Example Source: https://github.com/cloudflare/pingora/blob/main/docs/quick_start.md Example output demonstrating the load balancer distributing traffic across backend peers. ```text upstream peer is: Backend { addr: Inet(1.0.0.1:443), weight: 1 } upstream peer is: Backend { addr: Inet(1.1.1.1:443), weight: 1 } upstream peer is: Backend { addr: Inet(1.0.0.1:443), weight: 1 } upstream peer is: Backend { addr: Inet(1.1.1.1:443), weight: 1 } upstream peer is: Backend { addr: Inet(1.0.0.1:443), weight: 1 } ... ``` -------------------------------- ### Initialize and Run Pingora Server Source: https://github.com/cloudflare/pingora/blob/main/docs/quick_start.md Set up a basic Pingora server instance in the main function and start it. This server handles configuration, signal management, and spawns runtime threads. ```rust use async_trait::async_trait; use pingora::prelude::*; use std::sync::Arc; fn main() { let mut my_server = Server::new(None).unwrap(); my_server.bootstrap(); my_server.run_forever(); } ``` -------------------------------- ### Run Load Balancer Source: https://github.com/cloudflare/pingora/blob/main/docs/quick_start.md Command to run the Pingora project. Use this to start your load balancer. ```bash cargo run ``` -------------------------------- ### Run Pingora Service with Configuration Source: https://github.com/cloudflare/pingora/blob/main/docs/quick_start.md Starts the Pingora service using the specified configuration file. `RUST_LOG=INFO` ensures logs are written to the error log. ```bash RUST_LOG=INFO cargo run -- -c conf.yaml -d ``` -------------------------------- ### Basic Pingora Configuration Example Source: https://github.com/cloudflare/pingora/blob/main/docs/user_guide/conf.md This is a standard Pingora configuration file in YAML format. It specifies essential settings like version, thread count, PID file, upgrade socket, user, and group. ```yaml --- version: 1 threads: 2 pid_file: /run/pingora.pid upgrade_sock: /tmp/pingora_upgrade.sock user: nobody group: webusers ``` -------------------------------- ### Create and Run Pingora Server Source: https://context7.com/cloudflare/pingora/llms.txt Initializes a Pingora server with optional CLI argument parsing and starts its event loop. The server handles daemonization and signal management for graceful shutdowns or upgrades. ```rust use pingora_core::server::configuration::Opt; use pingora_core::server::Server; fn main() { // Pass None to use defaults, or Some(Opt::parse_args()) to enable CLI flags // (-d daemon, -c config file, -u upgrade mode, -t test config) let mut my_server = Server::new(Some(Opt::parse_args())).unwrap(); my_server.bootstrap(); // Add services here, then: my_server.run_forever(); // blocks; exits on SIGTERM/SIGQUIT } ``` -------------------------------- ### Route Traffic Based on Request Path Source: https://github.com/cloudflare/pingora/blob/main/docs/user_guide/modify_filter.md Route traffic to different upstream peers based on the request path. This example directs requests starting with '/family/' to '1.0.0.1' and all others to '1.1.1.1'. ```Rust pub struct MyGateway; #[async_trait] impl ProxyHttp for MyGateway { type CTX = (); fn new_ctx(&self) -> Self::CTX {} async fn upstream_peer( &self, session: &mut Session, _ctx: &mut Self::CTX, ) -> Result> { let addr = if session.req_header().uri.path().starts_with("/family/") { ("1.0.0.1", 443) } else { ("1.1.1.1", 443) }; info!("connecting to {addr:?}"); let peer = Box::new(HttpPeer::new(addr, true, "one.one.one.one".to_string())); Ok(peer) } } ``` -------------------------------- ### Graceful Upgrade Process for Pingora Services Source: https://context7.com/cloudflare/pingora/llms.txt Perform a zero-downtime binary upgrade by configuring `upgrade_sock` in `conf.yaml`, signaling the old process to quit, and starting the new process in upgrade mode to acquire listening sockets. ```bash # 1. Ensure upgrade_sock is set in conf.yaml: # upgrade_sock: /tmp/myproxy_upgrade.sock # 2. Start initial server RUST_LOG=INFO ./myproxy -c conf.yaml -d # 3. Build new binary, then graceful upgrade: # a. Tell old process to hand over its sockets pkill -SIGQUIT myproxy # b. Start new process in upgrade mode (acquires the sockets) RUST_LOG=INFO ./myproxy-new -c conf.yaml -d --upgrade # The old process finishes in-flight requests then exits. # New process begins accepting connections immediately. # From clients: zero dropped connections. ``` -------------------------------- ### OpenSSL Version and Server Certificate Generation Source: https://github.com/cloudflare/pingora/blob/main/pingora-proxy/tests/utils/conf/keys/README.md Check the installed OpenSSL version and generate a server certificate with Subject Alternative Names (SANs). ```bash openssl version # OpenSSL 3.1.1 echo '[v3_req]' > openssl.cnf openssl req -config openssl.cnf -new -x509 -key key.pem -out server_rustls.crt -days 3650 -sha256 \ -subj '/C=US/ST=CA/L=San Francisco/O=Cloudflare, Inc/CN=openrusty.org' \ -addext "subjectAltName=DNS:*.openrusty.org,DNS:openrusty.org,DNS:cat.com,DNS:dog.com" ``` -------------------------------- ### Setup Prometheus Metrics Endpoint Source: https://github.com/cloudflare/pingora/blob/main/docs/user_guide/prom.md Configure and add a Prometheus HTTP metrics service to your Pingora server. This service will listen on a specified TCP address for Prometheus scraping. ```rust ... let mut prometheus_service_http = pingora_prometheus::prometheus_http_service(); prometheus_service_http.add_tcp("0.0.0.0:1234"); my_server.add_service(prometheus_service_http); my_server.run_forever(); ``` -------------------------------- ### Gracefully Upgrade Pingora Service Source: https://github.com/cloudflare/pingora/blob/main/docs/quick_start.md Upgrades a running Pingora service to a new version without losing requests. Sends SIGQUIT to the old server and starts a new one with the upgrade flag. ```bash pkill -SIGQUIT load_balancer &&\ RUST_LOG=INFO cargo run -- -c conf.yaml -d -u ``` -------------------------------- ### Implement Retry/Failover Logic Source: https://github.com/cloudflare/pingora/blob/main/docs/user_guide/failover.md This example demonstrates how to implement retry and failover logic by tracking connection attempts in the context. It configures Pingora to retry a connection to a different upstream peer if the initial connection fails, and sets the error as retryable. ```Rust pub struct MyProxy(); pub struct MyCtx { tries: usize, } #[async_trait] impl ProxyHttp for MyProxy { type CTX = MyCtx; fn new_ctx(&self) -> Self::CTX { MyCtx { tries: 0 } } fn fail_to_connect( &self, _session: &mut Session, _peer: &HttpPeer, ctx: &mut Self::CTX, mut e: Box, ) -> Box { if ctx.tries > 0 { return e; } ctx.tries += 1; e.set_retry(true); e } async fn upstream_peer( &self, _session: &mut Session, ctx: &mut Self::CTX, ) -> Result> { let addr = if ctx.tries < 1 { ("192.0.2.1", 443) } else { ("1.1.1.1", 443) }; let mut peer = Box::new(HttpPeer::new(addr, true, "one.one.one.one".to_string())); peer.options.connection_timeout = Some(Duration::from_millis(100)); Ok(peer) } } ``` -------------------------------- ### Create and use MemoryCache with TTL and eviction Source: https://context7.com/cloudflare/pingora/llms.txt Demonstrates creating a thread-safe in-memory cache with optional Time-To-Live (TTL) and various operations like put, get, multi_get, get_stale, and remove. Handles CacheStatus for hit, miss, expired, and stale entries. ```rust use pingora_memory_cache::{CacheStatus, MemoryCache}; use std::time::Duration; fn main() { // Create cache with capacity for 1000 items let cache: MemoryCache = MemoryCache::new(1000); // Insert with no TTL (lives until evicted) cache.put("user:42", "Alice".to_string(), None); // Insert with 60-second TTL cache.put("session:abc", "token_xyz".to_string(), Some(Duration::from_secs(60))); // Fetch let (val, status) = cache.get("user:42"); assert_eq!(status, CacheStatus::Hit); println!("user:42 = {:?}", val); // Some("Alice") let (val, status) = cache.get("nonexistent"); assert_eq!(status, CacheStatus::Miss); assert!(val.is_none()); // Batch get let keys = vec!["user:42".to_string(), "session:abc".to_string(), "missing".to_string()]; let results = cache.multi_get(keys.iter().map(|s| s.as_str())); for (v, s) in &results { println!("status={} value={:?}", s.as_str(), v); } // Get stale: returns value even if expired, with how-long-stale in CacheStatus let (stale_val, stale_status) = cache.get_stale("session:abc"); match stale_status { CacheStatus::Stale(age) => println!("stale by {:?}", age), CacheStatus::Hit => println!("still fresh: {:?}", stale_val), _ => {} } // Remove cache.remove("user:42"); assert_eq!(cache.get("user:42").1, CacheStatus::Miss); } ``` -------------------------------- ### Implement Rate Limiting in ProxyHttp Source: https://github.com/cloudflare/pingora/blob/main/docs/user_guide/rate_limiter.md Override the `request_filter` method to implement per-client rate limiting. This example retrieves the client's appid from a header, checks against a global rate limiter, and returns a 429 status if the limit is exceeded. ```rust use async_trait::async_trait; use once_cell::sync::Lazy; use pingora::prelude::*; use pingora_limits::rate::Rate; use std::sync::Arc; use std::time::Duration; fn main() { let mut server = Server::new(Some(Opt::default())).unwrap(); server.bootstrap(); let mut upstreams = LoadBalancer::try_from_iter(["1.1.1.1:443", "1.0.0.1:443"]).unwrap(); // Set health check let hc = TcpHealthCheck::new(); upstreams.set_health_check(hc); upstreams.health_check_frequency = Some(Duration::from_secs(1)); // Set background service let background = background_service("health check", upstreams); let upstreams = background.task(); // Set load balancer let mut lb = http_proxy_service(&server.configuration, LB(upstreams)); lb.add_tcp("0.0.0.0:6188"); // let rate = Rate server.add_service(background); server.add_service(lb); server.run_forever(); } pub struct LB(Arc>); impl LB { pub fn get_request_appid(&self, session: &mut Session) -> Option { match session .req_header() .headers .get("appid") .map(|v| v.to_str()) { None => None, Some(v) => match v { Ok(v) => Some(v.to_string()), Err(_) => None, }, } } } // Rate limiter static RATE_LIMITER: Lazy = Lazy::new(|| Rate::new(Duration::from_secs(1))); // max request per second per client static MAX_REQ_PER_SEC: isize = 1; #[async_trait] impl ProxyHttp for LB { type CTX = (); fn new_ctx(&self) {} async fn upstream_peer( &self, _session: &mut Session, _ctx: &mut Self::CTX, ) -> Result> { let upstream = self.0.select(b"", 256).unwrap(); // Set SNI let peer = Box::new(HttpPeer::new(upstream, true, "one.one.one.one".to_string())); Ok(peer) } async fn upstream_request_filter( &self, _session: &mut Session, upstream_request: &mut RequestHeader, _ctx: &mut Self::CTX, ) -> Result<()> where Self::CTX: Send + Sync, { upstream_request .insert_header("Host", "one.one.one.one") .unwrap(); Ok(()) } async fn request_filter(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result where Self::CTX: Send + Sync, { let appid = match self.get_request_appid(session) { None => return Ok(false), // no client appid found, skip rate limiting Some(addr) => addr, }; // retrieve the current window requests let curr_window_requests = RATE_LIMITER.observe(&appid, 1); if curr_window_requests > MAX_REQ_PER_SEC { // rate limited, return 429 let mut header = ResponseHeader::build(429, None).unwrap(); header .insert_header("X-Rate-Limit-Limit", MAX_REQ_PER_SEC.to_string()) .unwrap(); header.insert_header("X-Rate-Limit-Remaining", "0").unwrap(); header.insert_header("X-Rate-Limit-Reset", "1").unwrap(); session.set_keepalive(None); session .write_response_header(Box::new(header), true) .await?; return Ok(true); } Ok(false) } } ``` -------------------------------- ### Systemd Service Configuration for Pingora Source: https://github.com/cloudflare/pingora/blob/main/docs/user_guide/systemd.md This configuration sets up Pingora as a systemd service. It specifies the service type, PID file location, and commands for starting, reloading, and gracefully upgrading the service. ```ini [Service] Type=forking PIDFile=/run/pingora.pid ExecStart=/bin/pingora -d -c /etc/pingora.conf ExecReload=kill -QUIT $MAINPID ExecReload=/bin/pingora -u -d -c /etc/pingora.conf ``` -------------------------------- ### Return Custom Error Pages Source: https://github.com/cloudflare/pingora/blob/main/docs/user_guide/modify_filter.md Return custom error pages instead of proxying traffic. This example checks for a specific 'Authorization' header and returns a 403 Forbidden error if it's missing. ```Rust fn check_login(req: &pingora_http::RequestHeader) -> bool { // implement you logic check logic here req.headers.get("Authorization").map(|v| v.as_bytes()) == Some(b"password") } #[async_trait] impl ProxyHttp for MyGateway { ... async fn request_filter(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result { if session.req_header().uri.path().starts_with("/login") && !check_login(session.req_header()) { let _ = session.respond_error(403).await; // true: tell the proxy that the response is already written return Ok(true); } Ok(false) } } ``` -------------------------------- ### Prepare Nginx Configuration Source: https://github.com/cloudflare/pingora/blob/main/pingora-ketama/test-data/README.md Sets up the necessary directory and copies the nginx configuration file. It then tests the configuration to ensure it's valid. ```bash mkdir -p /tmp/nginx-ketama/logs cp nginx.conf /tmp/nginx-ketama nginx -t -c nginx.conf -p /tmp/nginx-ketama ``` -------------------------------- ### Modify Response Headers Source: https://github.com/cloudflare/pingora/blob/main/docs/user_guide/modify_filter.md Modify response headers in the `response_filter` phase. This example updates the 'Server' header and removes the 'alt-svc' header. ```Rust #[async_trait] impl ProxyHttp for MyGateway { ... async fn response_filter( &self, _session: &mut Session, upstream_response: &mut ResponseHeader, _ctx: &mut Self::CTX, ) -> Result<()> where Self::CTX: Send + Sync, { // replace existing header if any upstream_response .insert_header("Server", "MyGateway") .unwrap(); // because we don't support h3 upstream_response.remove_header("alt-svc"); Ok(()) } } ``` -------------------------------- ### Describe Certificate and Key Source: https://github.com/cloudflare/pingora/blob/main/pingora-proxy/tests/utils/conf/keys/README.md Use these commands to inspect the details of private keys and X.509 certificates. ```bash # Describe a pkey openssl [ec|rsa|...] -in key.pem -noout -text ``` ```bash # Describe a cert openssl x509 -in some_cert.crt -noout -text ``` -------------------------------- ### Define Load Balancer Structure Source: https://github.com/cloudflare/pingora/blob/main/docs/quick_start.md Define a struct to hold the LoadBalancer, which manages the selection of upstream peers. This example uses an Arc for shared ownership. ```rust pub struct LB(Arc>); ``` -------------------------------- ### Create New Cargo Project Source: https://github.com/cloudflare/pingora/blob/main/docs/quick_start.md Initialize a new Rust project for the load balancer. ```bash cargo new load_balancer ``` -------------------------------- ### Configure HttpPeer connection options Source: https://context7.com/cloudflare/pingora/llms.txt Shows how to configure upstream HTTP/S connection parameters using HttpPeer and PeerOptions. Includes setting various timeouts, TLS verification, ALPN preferences, and mTLS client certificates. ```rust use pingora_core::upstreams::peer::{HttpPeer, PeerOptions}; use std::time::Duration; // Basic HTTPS peer with SNI let peer = HttpPeer::new(("api.example.com", 443), true, "api.example.com".to_string()); // Peer with custom options let mut peer = HttpPeer::new(("10.0.0.5", 443), true, "internal.svc".to_string()); peer.options = PeerOptions { connection_timeout: Some(Duration::from_millis(200)), total_connection_timeout: Some(Duration::from_millis(500)), read_timeout: Some(Duration::from_secs(30)), write_timeout: Some(Duration::from_secs(10)), idle_timeout: Some(Duration::from_secs(90)), verify_cert: true, verify_hostname: true, alpn: pingora_core::protocols::ALPN::H2H1, // prefer HTTP/2, fall back to HTTP/1.1 ..Default::default() }; // Plain HTTP peer (no TLS) let http_peer = HttpPeer::new(("127.0.0.1", 8080), false, String::new()); // Usage inside upstream_peer(): async fn upstream_peer( &self, _session: &mut pingora_proxy::Session, _ctx: &mut (), ) -> pingora_core::Result> { let mut peer = Box::new(HttpPeer::new(("backend", 443), true, "backend".to_string())); peer.options.connection_timeout = Some(Duration::from_millis(100)); Ok(peer) } ``` -------------------------------- ### Create and Configure Pingora Proxy Service Source: https://github.com/cloudflare/pingora/blob/main/docs/quick_start.md Assemble the Pingora server, define the upstream backends using `LoadBalancer`, create an HTTP proxy service with the custom `LB` implementation, and bind it to a TCP endpoint. ```rust fn main() { let mut my_server = Server::new(None).unwrap(); my_server.bootstrap(); let upstreams = LoadBalancer::try_from_iter(["1.1.1.1:443", "1.0.0.1:443"]).unwrap(); let mut lb = http_proxy_service(&my_server.configuration, LB(Arc::new(upstreams))); lb.add_tcp("0.0.0.0:6188"); my_server.add_service(lb); my_server.run_forever(); } ``` -------------------------------- ### Basic LoadBalancer with RoundRobin and TCP Health Check Source: https://context7.com/cloudflare/pingora/llms.txt Demonstrates setting up a LoadBalancer with a static list of backends, a TCP health check, and using RoundRobin for selection. The health check is configured to run periodically and is managed by a background service. ```rust use async_trait::async_trait; use pingora_core::prelude::*; use pingora_core::services::background::background_service; use pingora_load_balancing::{health_check::TcpHealthCheck, selection::RoundRobin, LoadBalancer}; use pingora_proxy::{http_proxy_service, ProxyHttp, Session}; use std::{sync::Arc, time::Duration}; pub struct LB(Arc>); #[async_trait] impl ProxyHttp for LB { type CTX = (); fn new_ctx(&self) {} async fn upstream_peer(&self, _s: &mut Session, _ctx: &mut ()) -> Result> { // select() returns the next healthy backend; key is ignored for RoundRobin let upstream = self.0.select(b"", 256).unwrap(); Ok(Box::new(HttpPeer::new(upstream, true, "one.one.one.one".to_string()))) } async fn upstream_request_filter( &self, _s: &mut Session, req: &mut pingora_http::RequestHeader, _ctx: &mut (), ) -> Result<()> { req.insert_header("Host", "one.one.one.one").unwrap(); Ok(()) } } fn main() { let mut server = Server::new(None).unwrap(); server.bootstrap(); // Static list; use Backends + ServiceDiscovery trait for dynamic discovery let mut upstreams = LoadBalancer::try_from_iter(["1.1.1.1:443", "1.0.0.1:443", "127.0.0.1:9999"]).unwrap(); // Attach TCP health check (HTTP health check also available) let hc = TcpHealthCheck::new(); upstreams.set_health_check(hc); upstreams.health_check_frequency = Some(Duration::from_secs(1)); // Wrap in a background service so health checks run automatically let bg = background_service("health check", upstreams); let upstreams = bg.task(); // Arc> let mut lb = http_proxy_service(&server.configuration, LB(upstreams)); lb.add_tcp("0.0.0.0:6188"); server.add_service(bg); server.add_service(lb); server.run_forever(); } // Test: curl 127.0.0.1:6188 -H "Host: one.one.one.one" // Output: upstream peer is: Backend { addr: Inet(1.1.1.1:443), weight: 1 } // upstream peer is: Backend { addr: Inet(1.0.0.1:443), weight: 1 } (round-robin) ``` -------------------------------- ### HttpPeer and PeerOptions Configuration Source: https://context7.com/cloudflare/pingora/llms.txt Illustrates how to configure upstream HTTP/HTTPS connections using HttpPeer and PeerOptions, including timeouts, TLS settings, and ALPN preferences. ```APIDOC ## `HttpPeer` / `PeerOptions` — Configure upstream connection parameters `HttpPeer` specifies _where_ to connect; `PeerOptions` controls _how_: timeouts, TLS verification, ALPN protocol preference, client certificates for mTLS, TCP keepalive, and source IP binding. ```rust use pingora_core::upstreams::peer::{HttpPeer, PeerOptions}; use std::time::Duration; // Basic HTTPS peer with SNI let peer = HttpPeer::new(("api.example.com", 443), true, "api.example.com".to_string()); // Peer with custom options let mut peer = HttpPeer::new(("10.0.0.5", 443), true, "internal.svc".to_string()); peer.options = PeerOptions { connection_timeout: Some(Duration::from_millis(200)), total_connection_timeout: Some(Duration::from_millis(500)), read_timeout: Some(Duration::from_secs(30)), write_timeout: Some(Duration::from_secs(10)), idle_timeout: Some(Duration::from_secs(90)), verify_cert: true, verify_hostname: true, alpn: pingora_core::protocols::ALPN::H2H1, // prefer HTTP/2, fall back to HTTP/1.1 ..Default::default() }; // Plain HTTP peer (no TLS) let http_peer = HttpPeer::new(("127.0.0.1", 8080), false, String::new()); // Usage inside upstream_peer(): async fn upstream_peer( &self, _session: &mut pingora_proxy::Session, _ctx: &mut (), ) -> pingora_core::Result> { let mut peer = Box::new(HttpPeer::new(("backend", 443), true, "backend".to_string())); peer.options.connection_timeout = Some(Duration::from_millis(100)); Ok(peer) } ``` ``` -------------------------------- ### Define Custom CTX for Per-Request State in Pingora Source: https://context7.com/cloudflare/pingora/llms.txt Use a custom `CTX` struct to pass information between proxy filter phases. This example shows parsing a 'beta-flag' header in `request_filter` and using it in `upstream_peer` to route requests. ```rust use async_trait::async_trait; use log::info; use std::sync::Mutex; use pingora_core::upstreams::peer::HttpPeer; use pingora_core::Result; use pingora_proxy::{ProxyHttp, Session}; static TOTAL_REQUESTS: Mutex = Mutex::new(0); pub struct MyProxy { beta_counter: Mutex, } pub struct MyCtx { beta_user: bool, } #[async_trait] impl ProxyHttp for MyProxy { type CTX = MyCtx; fn new_ctx(&self) -> MyCtx { MyCtx { beta_user: false } } async fn request_filter(&self, session: &mut Session, ctx: &mut MyCtx) -> Result { // Parse once here, reuse in upstream_peer ctx.beta_user = session.req_header().headers.get("beta-flag").is_some(); Ok(false) } async fn upstream_peer(&self, _s: &mut Session, ctx: &mut MyCtx) -> Result> { *TOTAL_REQUESTS.lock().unwrap() += 1; let addr = if ctx.beta_user { *self.beta_counter.lock().unwrap() += 1; info!("Beta user #{}", self.beta_counter.lock().unwrap()); ("1.0.0.1", 443) } else { ("1.1.1.1", 443) }; Ok(Box::new(HttpPeer::new(addr, true, "one.one.one.one".to_string()))) } } fn main() { let mut server = pingora_core::server::Server::new(None).unwrap(); server.bootstrap(); let mut svc = pingora_proxy::http_proxy_service( &server.configuration, MyProxy { beta_counter: Mutex::new(0) }, ); svc.add_tcp("0.0.0.0:6190"); server.add_service(svc); server.run_forever(); } // curl 127.0.0.1:6190 -H "Host: one.one.one.one" -> routes to 1.1.1.1 // curl 127.0.0.1:6190 -H "beta-flag: 1" -H "Host: ..." -> routes to 1.0.0.1 ``` -------------------------------- ### MemoryCache Usage Source: https://context7.com/cloudflare/pingora/llms.txt Demonstrates how to create, insert, retrieve, and remove items from the MemoryCache, including handling different cache statuses and TTL. ```APIDOC ## `MemoryCache` — High-performance in-memory cache with TTL and S3-FIFO eviction `pingora-memory-cache` provides `MemoryCache` backed by TinyUFO (S3-FIFO + TinyLFU). It is thread-safe, supports optional TTL, returns `CacheStatus` indicating hit/miss/expired/stale, and includes a `RTCache` wrapper for async read-through with stampede protection. ```rust use pingora_memory_cache::{CacheStatus, MemoryCache}; use std::time::Duration; fn main() { // Create cache with capacity for 1000 items let cache: MemoryCache = MemoryCache::new(1000); // Insert with no TTL (lives until evicted) cache.put("user:42", "Alice".to_string(), None); // Insert with 60-second TTL cache.put("session:abc", "token_xyz".to_string(), Some(Duration::from_secs(60))); // Fetch let (val, status) = cache.get("user:42"); assert_eq!(status, CacheStatus::Hit); println!("user:42 = {:?}", val); // Some("Alice") let (val, status) = cache.get("nonexistent"); assert_eq!(status, CacheStatus::Miss); assert!(val.is_none()); // Batch get let keys = vec!["user:42".to_string(), "session:abc".to_string(), "missing".to_string()]; let results = cache.multi_get(keys.iter().map(|s| s.as_str())); for (v, s) in &results { println!("status={} value={:?}", s.as_str(), v); } // Get stale: returns value even if expired, with how-long-stale in CacheStatus let (stale_val, stale_status) = cache.get_stale("session:abc"); match stale_status { CacheStatus::Stale(age) => println!("stale by {:?}", age), CacheStatus::Hit => println!("still fresh: {:?}", stale_val), _ => {{}} } // Remove cache.remove("user:42"); assert_eq!(cache.get("user:42").1, CacheStatus::Miss); } ``` ``` -------------------------------- ### Pingora Configuration File Source: https://github.com/cloudflare/pingora/blob/main/docs/quick_start.md Defines service parameters like thread count, log file locations, and upgrade socket. Save this as `conf.yaml`. ```yaml --- version: 1 threads: 2 pid_file: /tmp/load_balancer.pid error_log: /tmp/load_balancer_err.log upgrade_sock: /tmp/load_balancer.sock ``` -------------------------------- ### Enable Command Line Argument Parsing Source: https://github.com/cloudflare/pingora/blob/main/docs/quick_start.md Integrate Pingora's command-line argument parsing by passing `Opt::parse_args()` to the `Server::new` constructor. This allows the server to consume command-line arguments. ```rust fn main() { let mut my_server = Server::new(Some(Opt::parse_args())).unwrap(); ... } ``` -------------------------------- ### Share State Across Requests with Counters Source: https://github.com/cloudflare/pingora/blob/main/docs/user_guide/ctx.md Implement shared state across requests using global static variables or struct members, protected by Mutex for concurrent access. This example tracks total and beta user counts. ```Rust // global counter static REQ_COUNTER: Mutex = Mutex::new(0); pub struct MyProxy { // counter for the service beta_counter: Mutex, // AtomicUsize works too } pub struct MyCtx { beta_user: bool, } fn check_beta_user(req: &pingora_http::RequestHeader) -> bool { // some simple logic to check if user is beta req.headers.get("beta-flag").is_some() } #[async_trait] impl ProxyHttp for MyProxy { type CTX = MyCtx; fn new_ctx(&self) -> Self::CTX { MyCtx { beta_user: false } } async fn request_filter(&self, session: &mut Session, ctx: &mut Self::CTX) -> Result { ctx.beta_user = check_beta_user(session.req_header()); Ok(false) } async fn upstream_peer( &self, _session: &mut Session, ctx: &mut Self::CTX, ) -> Result> { let mut req_counter = REQ_COUNTER.lock().unwrap(); *req_counter += 1; let addr = if ctx.beta_user { let mut beta_count = self.beta_counter.lock().unwrap(); *beta_count += 1; info!("I'm a beta user #{beta_count}"); ("1.0.0.1", 443) } else { info!("I'm an user #{req_counter}"); ("1.1.1.1", 443) }; let peer = Box::new(HttpPeer::new(addr, true, "one.one.one.one".to_string())); Ok(peer) } } ``` -------------------------------- ### Multi-cluster Routing with LoadBalancer by Request Path Source: https://context7.com/cloudflare/pingora/llms.txt Configures multiple LoadBalancer instances to handle different upstream clusters. Routing is determined by the request path, directing traffic to 'cluster_one' for paths starting with '/one/' and to 'cluster_two' otherwise. Each cluster uses TCP health checks managed by background services. ```rust use async_trait::async_trait; use pingora_core::prelude::*; use pingora_load_balancing::{ health_check::TcpHealthCheck, selection::{BackendIter, BackendSelection, RoundRobin}, LoadBalancer, }; use pingora_proxy::{http_proxy_service, ProxyHttp, Session}; use std::sync::Arc; struct Router { cluster_one: Arc>, cluster_two: Arc>, } #[async_trait] impl ProxyHttp for Router { type CTX = (); fn new_ctx(&self) {} async fn upstream_peer(&self, session: &mut Session, _ctx: &mut ()) -> Result> { let cluster = if session.req_header().uri.path().starts_with("/one/") { &self.cluster_one } else { &self.cluster_two }; let upstream = cluster.select(b"", 256).unwrap(); Ok(Box::new(HttpPeer::new(upstream, true, "one.one.one.one".to_string()))) } } fn build_cluster(upstreams: &[&str]) -> pingora_core::services::background::GenBackgroundService> { let mut cluster = LoadBalancer::try_from_iter(upstreams).unwrap(); cluster.set_health_check(TcpHealthCheck::new()); cluster.health_check_frequency = Some(std::time::Duration::from_secs(1)); background_service("cluster", cluster) } fn main() { let mut server = Server::new(None).unwrap(); server.bootstrap(); let svc_one = build_cluster(&["1.1.1.1:443", "127.0.0.1:9999"]); let svc_two = build_cluster(&["1.0.0.1:443", "127.0.0.2:9999"]); let router = Router { cluster_one: svc_one.task(), cluster_two: svc_two.task(), }; let mut proxy = http_proxy_service(&server.configuration, router); proxy.add_tcp("0.0.0.0:6188"); server.add_service(proxy); server.add_service(svc_one); server.add_service(svc_two); server.run_forever(); } // curl 127.0.0.1:6188/one/ -> routes to cluster_one // curl 127.0.0.1:6188/two/ -> routes to cluster_two ``` -------------------------------- ### Wrap ProxyHttp Implementation into a Runnable Service Source: https://context7.com/cloudflare/pingora/llms.txt Use `http_proxy_service` to wrap a `ProxyHttp` implementation into a `Service` that can be hosted by the `Server`. This allows the service to listen on TCP or TLS endpoints. ```rust use pingora_core::listeners::tls::TlsSettings; use pingora_core::server::Server; use pingora_proxy::http_proxy_service; fn main() { let mut server = Server::new(None).unwrap(); server.bootstrap(); // Create the proxy service let mut svc = http_proxy_service(&server.configuration, MyProxy); // Plain HTTP svc.add_tcp("0.0.0.0:8080"); // HTTPS with HTTP/2 enabled let mut tls = TlsSettings::intermediate("cert.pem", "key.pem").unwrap(); tls.enable_h2(); svc.add_tls_with_settings("0.0.0.0:8443", None, tls); server.add_service(svc); server.run_forever(); } ``` -------------------------------- ### Collect Trace Log Source: https://github.com/cloudflare/pingora/blob/main/pingora-ketama/test-data/README.md Copies the generated access log from the temporary nginx directory to the current directory, renaming it to 'sample-nginx-upstream.csv'. ```bash cp /tmp/nginx-ketama/logs/access.log ./sample-nginx-upstream.csv ``` -------------------------------- ### Pingora Proxying Workflow Layers Source: https://github.com/cloudflare/pingora/blob/main/docs/user_guide/internals.md Details the responsibilities of different components in the Pingora proxying workflow, from HttpProxy to Service. ```text ┌─────────────┐ ┌──────────────────────────────────────┐ │ HttpProxy │ │Handles high level Proxying workflow, │ │ (struct) │─ ─ ─ ─ │ customizable via ProxyHttp trait │ └──────┬──────┘ └──────────────────────────────────────┘ │ ┌──────▼──────┐ ┌──────────────────────────────────────┐ │HttpServerApp│ │ Handles selection of H1 vs H2 stream │ │ (trait) │─ ─ ─ ─ │ handling, incl H2 handshake │ └──────┬──────┘ └──────────────────────────────────────┘ │ ┌──────▼──────┐ ┌──────────────────────────────────────┐ │ ServerApp │ │ Handles dispatching of App instances │ │ (trait) │─ ─ ─ ─ │ as individual tasks, per Session │ └──────┬──────┘ └──────────────────────────────────────┘ │ ┌──────▼──────┐ ┌──────────────────────────────────────┐ │ Service │ │ Handles dispatching of App instances │ │ (struct) │─ ─ ─ ─ │ as individual tasks, per Listener │ └─────────────┘ └──────────────────────────────────────┘ ``` -------------------------------- ### Implement Per-Key Rate Limiting Source: https://context7.com/cloudflare/pingora/llms.txt Use `pingora-limits::rate::Rate` for sliding-window rate limiting. Call `observe(key, cost)` in `request_filter` and return a `429` response if the limit is exceeded. Limiting is skipped if the specified header is absent. ```rust use async_trait::async_trait; use once_cell::sync::Lazy; use pingora_core::prelude::*; use pingora_http::ResponseHeader; use pingora_limits::rate::Rate; use pingora_proxy::{ProxyHttp, Session}; use std::time::Duration; static RATE_LIMITER: Lazy = Lazy::new(|| Rate::new(Duration::from_secs(1))); const MAX_RPS: isize = 10; pub struct RateLimitedProxy; #[async_trait] impl ProxyHttp for RateLimitedProxy { type CTX = (); fn new_ctx(&self) {} async fn request_filter(&self, session: &mut Session, _ctx: &mut ()) -> Result { // Key on the "appid" header; skip limiting if absent let appid = match session.req_header().headers.get("appid").and_then(|v| v.to_str().ok()) { Some(id) => id.to_string(), None => return Ok(false), }; let current = RATE_LIMITER.observe(&appid, 1); if current > MAX_RPS { let mut resp = ResponseHeader::build(429, None).unwrap(); resp.insert_header("X-Rate-Limit-Limit", MAX_RPS.to_string()).unwrap(); resp.insert_header("X-Rate-Limit-Remaining", "0").unwrap(); resp.insert_header("X-Rate-Limit-Reset", "1").unwrap(); session.set_keepalive(None); session.write_response_header(Box::new(resp), true).await?; return Ok(true); // response written, short-circuit } Ok(false) } async fn upstream_peer(&self, _s: &mut Session, _ctx: &mut ()) -> Result> { Ok(Box::new(HttpPeer::new(("1.1.1.1", 443), true, "one.one.one.one".to_string()))) } } ``` -------------------------------- ### Implement ProxyHttp for Upstream Peer Selection Source: https://github.com/cloudflare/pingora/blob/main/docs/quick_start.md Implement the `ProxyHttp` trait to define how the proxy selects an upstream peer for each incoming request. This uses the `LoadBalancer`'s `select` method for round-robin distribution and configures TLS with SNI. ```rust #[async_trait] impl ProxyHttp for LB { /// For this small example, we don't need context storage type CTX = (); fn new_ctx(&self) -> () { () } async fn upstream_peer(&self, _session: &mut Session, _ctx: &mut ()) -> Result> { let upstream = self.0 .select(b"", 256) // hash doesn't matter for round robin .unwrap(); println!("upstream peer is: {upstream:?}"); // Set SNI to one.one.one.one let peer = Box::new(HttpPeer::new(upstream, true, "one.one.one.one".to_string())); Ok(peer) } } ``` -------------------------------- ### HTTP Proxy Request Lifecycle Source: https://github.com/cloudflare/pingora/blob/main/docs/user_guide/internals.md Visualizes the phases of an HTTP request lifecycle within the Pingora HttpProxy, including filtering, upstream communication, and response handling. ```mermaid graph TD; start("new request")-->request_filter; request_filter-->upstream_peer; upstream_peer-->Connect{{IO: connect to upstream}}; Connect--connection success-->connected_to_upstream; Connect--connection failure-->fail_to_connect; connected_to_upstream-->upstream_request_filter; upstream_request_filter --> SendReq{{IO: send request to upstream}}; SendReq-->RecvResp{{IO: read response from upstream}}; RecvResp-->upstream_response_filter-->response_filter-->upstream_response_body_filter-->response_body_filter-->logging-->endreq("request done"); fail_to_connect --can retry-->upstream_peer; fail_to_connect --can't retry-->fail_to_proxy--send error response-->logging; RecvResp--failure-->IOFailure; SendReq--failure-->IOFailure; error_while_proxy--can retry-->upstream_peer; error_while_proxy--can't retry-->fail_to_proxy; request_filter --send response-->logging Error>any response filter error]-->error_while_proxy IOFailure>IO error]-->error_while_proxy ``` -------------------------------- ### Basic Proxy Architecture Diagram Source: https://github.com/cloudflare/pingora/blob/main/docs/user_guide/internals.md A high-level diagram illustrating the fundamental interaction between a Downstream Client, a Proxy, and an Upstream Server. ```text ┌────────────┐ ┌─────────────┐ ┌────────────┐ │ Downstream │ │ Proxy │ │ Upstream │ │ Client │─────────>│ │────────>│ Server │ └────────────┘ └─────────────┘ └────────────┘ ``` -------------------------------- ### Test Load Balancer Source: https://github.com/cloudflare/pingora/blob/main/docs/quick_start.md Command to send requests to the running load balancer for testing. It sends requests to localhost on port 6188 and outputs verbose information, discarding the response body. ```bash curl 127.0.0.1:6188 -svo /dev/null ``` -------------------------------- ### Add RateLimiter Dependencies Source: https://github.com/cloudflare/pingora/blob/main/docs/user_guide/rate_limiter.md Add the necessary dependencies to your `Cargo.toml` file to use Pingora's rate limiting features. ```toml async-trait="0.1" pingora = { version = "0.3", features = [ "lb" ] } pingora-limits = "0.3.0" once_cell = "1.19.0" ```