Try Live
Add Docs
Rankings
Pricing
Enterprise
Docs
Install
Install
Docs
Pricing
Enterprise
More...
More...
Try Live
Rankings
Add Docs
Crazyflie Python Library
https://github.com/bitcraze/crazyflie-lib-python
Admin
cflib is a Python API designed to communicate with and control Crazyflie and Crazyflie 2.0
...
Tokens:
35,807
Snippets:
337
Trust Score:
8.4
Update:
2 weeks ago
Context
Skills
Chat
Benchmark
87.1
Suggestions
Latest
Show doc for...
Code
Info
Show Results
Context Summary (auto-generated)
Raw
Copy
Link
# cflib - Crazyflie Python Library cflib is a Python library for communicating with and controlling Crazyflie quadcopters, including the Crazyflie 2.1(+), Crazyflie 2.1 Brushless, and Crazyflie Bolt. The library provides a comprehensive Python API for direct programmatic control through Python scripts and serves as the backend communication layer for applications like the Crazyflie PC client (cfclient). The library supports multiple communication interfaces including radio (via Crazyradio PA), USB, TCP, and UDP. It handles scanning for available Crazyflies, establishing connections, downloading Table of Contents (TOC) for logging and parameters, sending flight commands, and receiving telemetry data. The architecture is event-driven with callback-based asynchronous operations, but also provides synchronous wrapper classes for simpler scripting. ## Initializing Drivers and Scanning for Crazyflies The `cflib.crtp` module provides functions to initialize communication drivers and scan for available Crazyflie devices on all supported interfaces. ```python import cflib.crtp # Initialize the low-level drivers (must be called before any communication) cflib.crtp.init_drivers() # Optionally enable serial driver for UART communication cflib.crtp.init_drivers(enable_serial_driver=True) # Scan all interfaces for available Crazyflies available = cflib.crtp.scan_interfaces() for uri, comment in available: print(f'Found Crazyflie at {uri}: {comment}') # Example output: # Found Crazyflie at radio://0/80/2M/E7E7E7E7E7: Crazyflie 2.1 # Get status of all interfaces status = cflib.crtp.get_interfaces_status() for interface, interface_status in status.items(): print(f'{interface}: {interface_status}') ``` ## Connecting to a Crazyflie (Asynchronous API) The `Crazyflie` class provides the core asynchronous API with callback-based connection handling. This is useful for event-driven applications that need fine-grained control over the connection lifecycle. ```python import time import cflib.crtp from cflib.crazyflie import Crazyflie cflib.crtp.init_drivers() cf = Crazyflie(rw_cache='./cache') # Enable TOC caching for faster reconnection def connected_callback(link_uri): print(f'Connected to {link_uri}') def connection_failed_callback(link_uri, msg): print(f'Connection to {link_uri} failed: {msg}') def disconnected_callback(link_uri): print(f'Disconnected from {link_uri}') # Register callbacks cf.connected.add_callback(connected_callback) cf.connection_failed.add_callback(connection_failed_callback) cf.disconnected.add_callback(disconnected_callback) # Connect to the Crazyflie uri = 'radio://0/80/2M/E7E7E7E7E7' cf.open_link(uri) # Keep the script running while connected try: while cf.is_connected(): time.sleep(1) except KeyboardInterrupt: pass # Clean up cf.close_link() ``` ## Connecting with SyncCrazyflie (Synchronous API) The `SyncCrazyflie` class wraps the asynchronous `Crazyflie` class to provide blocking operations, making it easier to write sequential scripts. It can be used as a context manager for automatic connection handling. ```python import cflib.crtp from cflib.crazyflie import Crazyflie from cflib.crazyflie.syncCrazyflie import SyncCrazyflie cflib.crtp.init_drivers() uri = 'radio://0/80/2M/E7E7E7E7E7' # Using context manager for automatic connection/disconnection with SyncCrazyflie(uri, cf=Crazyflie(rw_cache='./cache')) as scf: print(f'Connected to {uri}') # Wait for all parameters to be downloaded (optional) scf.wait_for_params() print('All parameters downloaded') # Access the underlying Crazyflie object cf = scf.cf print(f'Protocol version: {cf.platform.get_protocol_version()}') # Automatically disconnected when context exits print('Disconnected') ``` ## Sending Flight Commands with Commander The `Commander` class provides low-level setpoint commands for controlling the Crazyflie's flight. These require continuous sending (typically at 10-100Hz) to maintain control. ```python import time import cflib.crtp from cflib.crazyflie import Crazyflie from cflib.crazyflie.syncCrazyflie import SyncCrazyflie cflib.crtp.init_drivers() uri = 'radio://0/80/2M/E7E7E7E7E7' with SyncCrazyflie(uri, cf=Crazyflie(rw_cache='./cache')) as scf: cf = scf.cf # Send roll/pitch/yaw/thrust setpoint # roll, pitch: degrees | yawrate: degrees/s | thrust: 10001-60000 for _ in range(50): # Send for ~1 second at 50Hz cf.commander.send_setpoint(0, 0, 0, 35000) # Hover-ish thrust time.sleep(0.02) # Stop motors cf.commander.send_stop_setpoint() # Send velocity setpoint in world frame # vx, vy, vz: m/s | yawrate: degrees/s cf.commander.send_velocity_world_setpoint(0.1, 0, 0, 0) # Move forward at 0.1 m/s # Send hover setpoint (body-frame velocity + absolute height) # vx, vy: m/s | yawrate: degrees/s | zdistance: meters cf.commander.send_hover_setpoint(0.1, 0, 0, 0.5) # Forward at 0.5m height # Send absolute position setpoint # x, y, z: meters | yaw: degrees cf.commander.send_position_setpoint(1.0, 0.5, 0.5, 0) # Send full state setpoint (for advanced control) pos = [1.0, 0.5, 0.5] # x, y, z in meters vel = [0, 0, 0] # vx, vy, vz in m/s acc = [0, 0, 0] # ax, ay, az in m/s^2 orientation = [0, 0, 0, 1] # quaternion [qx, qy, qz, qw] cf.commander.send_full_state_setpoint(pos, vel, acc, orientation, 0, 0, 0) # Notify that setpoints will stop (allows other commanders to take over) cf.commander.send_notify_setpoint_stop() ``` ## High-Level Commander for Autonomous Flight The `HighLevelCommander` generates smooth trajectories onboard the Crazyflie for autonomous flight maneuvers like takeoff, landing, and go-to commands. Requires a positioning system (e.g., LPS, Flow deck). ```python import time import cflib.crtp from cflib.crazyflie import Crazyflie from cflib.crazyflie.syncCrazyflie import SyncCrazyflie cflib.crtp.init_drivers() uri = 'radio://0/80/2M/E7E7E7E7E7' with SyncCrazyflie(uri, cf=Crazyflie(rw_cache='./cache')) as scf: cf = scf.cf commander = cf.high_level_commander # Arm the Crazyflie (required before flight) cf.supervisor.send_arming_request(True) time.sleep(1.0) # Take off to 1.0m height over 2.0 seconds commander.takeoff(absolute_height_m=1.0, duration_s=2.0) time.sleep(3.0) # Go to absolute position (x=1, y=0, z=1, yaw=0 rad) in 3 seconds commander.go_to(x=1.0, y=0, z=1.0, yaw=0, duration_s=3.0) time.sleep(3.5) # Go to relative position (move 0.5m forward) in 2 seconds commander.go_to(x=0.5, y=0, z=0, yaw=0, duration_s=2.0, relative=True) time.sleep(2.5) # Spiral maneuver: 90 degrees, 0.5m initial radius, 1.0m final radius, 0.3m ascent commander.spiral(angle=1.57, r0=0.5, rF=1.0, ascent=0.3, duration_s=4.0) time.sleep(4.5) # Land to 0.0m over 2.0 seconds commander.land(absolute_height_m=0.0, duration_s=2.0) time.sleep(2.5) # Stop all motors commander.stop() ``` ## MotionCommander for Easy Velocity Control The `MotionCommander` provides intuitive movement primitives using velocity setpoints. It requires a positioning system (e.g., Flow deck) and can be used as a context manager for automatic takeoff/landing. ```python import cflib.crtp from cflib.crazyflie import Crazyflie from cflib.crazyflie.syncCrazyflie import SyncCrazyflie from cflib.positioning.motion_commander import MotionCommander cflib.crtp.init_drivers() uri = 'radio://0/80/2M/E7E7E7E7E7' with SyncCrazyflie(uri, cf=Crazyflie(rw_cache='./cache')) as scf: # Context manager handles takeoff and landing automatically with MotionCommander(scf, default_height=0.5) as mc: # Blocking movement commands mc.forward(0.5) # Move forward 0.5m mc.back(0.5) # Move back 0.5m mc.left(0.3) # Move left 0.3m mc.right(0.3) # Move right 0.3m mc.up(0.2) # Move up 0.2m mc.down(0.2) # Move down 0.2m # Custom velocity and distance mc.forward(1.0, velocity=0.5) # Forward 1m at 0.5 m/s # Turn commands mc.turn_left(90) # Turn 90 degrees left mc.turn_right(45) # Turn 45 degrees right # Circle maneuvers mc.circle_left(0.5, velocity=0.3) # Circle left with 0.5m radius mc.circle_right(0.3, angle_degrees=180) # Half circle right # Move in 3D mc.move_distance(0.5, 0.3, 0.2) # Move diagonally # Non-blocking commands (returns immediately) mc.start_forward(velocity=0.2) time.sleep(1) mc.start_turn_left(rate=45) # Turn while moving forward time.sleep(2) mc.stop() # Stop and hover # Linear motion with yaw rate mc.start_linear_motion(0.2, 0, 0, rate_yaw=30) # Forward while rotating time.sleep(3) mc.stop() # Automatic landing when context exits ``` ## PositionHlCommander for Position-Based Control The `PositionHlCommander` uses the high-level commander with position setpoints. It tracks position internally and provides blocking movement commands with automatic timing. ```python import cflib.crtp from cflib.crazyflie import Crazyflie from cflib.crazyflie.syncCrazyflie import SyncCrazyflie from cflib.positioning.position_hl_commander import PositionHlCommander cflib.crtp.init_drivers() uri = 'radio://0/80/2M/E7E7E7E7E7' with SyncCrazyflie(uri, cf=Crazyflie(rw_cache='./cache')) as scf: # Set initial position and default parameters with PositionHlCommander( scf, x=0.0, y=0.0, z=0.0, default_velocity=0.3, default_height=0.5, controller=PositionHlCommander.CONTROLLER_PID, default_landing_height=0.0 ) as pc: # Movement commands (blocking) pc.forward(1.0) # Forward 1m pc.left(0.5) # Left 0.5m pc.up(0.3) # Up 0.3m pc.back(1.0) # Back 1m pc.right(0.5) # Right 0.5m pc.down(0.3) # Down 0.3m # Go to absolute position pc.go_to(1.0, 1.0, 0.8) # Go to (1, 1, 0.8) pc.go_to(0, 0) # Return to (0, 0, default_height) # Move with custom velocity pc.forward(0.5, velocity=0.5) # Get current position x, y, z = pc.get_position() print(f'Current position: ({x}, {y}, {z})') # Update default parameters pc.set_default_velocity(0.4) pc.set_default_height(0.6) # Set landing height for landing on elevated surfaces pc.set_landing_height(0.2) # Automatic landing when context exits ``` ## Logging Data from the Crazyflie The logging system allows subscribing to variables from the Crazyflie's Table of Contents (TOC). Variables are grouped and can be fetched at configurable rates. ```python import time import cflib.crtp from cflib.crazyflie import Crazyflie from cflib.crazyflie.log import LogConfig from cflib.crazyflie.syncCrazyflie import SyncCrazyflie cflib.crtp.init_drivers() uri = 'radio://0/80/2M/E7E7E7E7E7' # Callback for received log data def log_callback(timestamp, data, logconf): print(f'[{timestamp}] {logconf.name}:') for name, value in data.items(): print(f' {name}: {value:.3f}') # Callback for log errors def log_error_callback(logconf, msg): print(f'Error in {logconf.name}: {msg}') with SyncCrazyflie(uri, cf=Crazyflie(rw_cache='./cache')) as scf: cf = scf.cf # Create log configuration log_config = LogConfig(name='StateEstimate', period_in_ms=100) log_config.add_variable('stateEstimate.x', 'float') log_config.add_variable('stateEstimate.y', 'float') log_config.add_variable('stateEstimate.z', 'float') log_config.add_variable('stabilizer.roll', 'float') log_config.add_variable('stabilizer.pitch', 'float') log_config.add_variable('stabilizer.yaw', 'float') log_config.add_variable('pm.vbat', 'FP16') # Use FP16 to save bandwidth # Add configuration to the Crazyflie cf.log.add_config(log_config) # Register callbacks log_config.data_received_cb.add_callback(log_callback) log_config.error_cb.add_callback(log_error_callback) # Start logging log_config.start() # Let it run for 5 seconds time.sleep(5) # Stop and delete the log configuration log_config.stop() log_config.delete() ``` ## SyncLogger for Simplified Logging The `SyncLogger` provides an iterator-based interface for synchronous log data access, useful for sequential scripts. ```python import cflib.crtp from cflib.crazyflie import Crazyflie from cflib.crazyflie.log import LogConfig from cflib.crazyflie.syncCrazyflie import SyncCrazyflie from cflib.crazyflie.syncLogger import SyncLogger cflib.crtp.init_drivers() uri = 'radio://0/80/2M/E7E7E7E7E7' with SyncCrazyflie(uri, cf=Crazyflie(rw_cache='./cache')) as scf: log_config = LogConfig(name='Position', period_in_ms=100) log_config.add_variable('stateEstimate.x', 'float') log_config.add_variable('stateEstimate.y', 'float') log_config.add_variable('stateEstimate.z', 'float') # Use SyncLogger as context manager with SyncLogger(scf, log_config) as logger: count = 0 for timestamp, data, logconf in logger: x = data['stateEstimate.x'] y = data['stateEstimate.y'] z = data['stateEstimate.z'] print(f'[{timestamp}] Position: ({x:.2f}, {y:.2f}, {z:.2f})') count += 1 if count >= 50: # Get 50 samples then exit break ``` ## Reading and Writing Parameters The parameter system allows reading and writing configuration values on the Crazyflie. Parameters are organized in groups and can be persistent (stored in EEPROM). ```python import time import cflib.crtp from cflib.crazyflie import Crazyflie from cflib.crazyflie.syncCrazyflie import SyncCrazyflie cflib.crtp.init_drivers() uri = 'radio://0/80/2M/E7E7E7E7E7' # Callback for parameter updates def param_callback(name, value): print(f'Parameter {name} updated to {value}') with SyncCrazyflie(uri, cf=Crazyflie(rw_cache='./cache')) as scf: cf = scf.cf # Wait for all parameters to be downloaded scf.wait_for_params() # Read a parameter value led_bitmask = cf.param.get_value('led.bitmask') print(f'Current LED bitmask: {led_bitmask}') # Add callback for specific parameter cf.param.add_update_callback(group='led', name='bitmask', cb=param_callback) # Set a parameter value cf.param.set_value('led.bitmask', '255') # Turn on all LEDs time.sleep(1) cf.param.set_value('led.bitmask', '0') # Turn off all LEDs # Add callback for entire group cf.param.add_update_callback(group='stabilizer', cb=param_callback) # Set controller type cf.param.set_value('stabilizer.controller', '1') # PID controller # For persistent parameters (stored in EEPROM) # Store current value to EEPROM def store_callback(name, success): print(f'Store {name}: {"Success" if success else "Failed"}') cf.param.persistent_store('sound.effect', callback=store_callback) # Get persistent parameter state def state_callback(name, state): if state: print(f'{name}: stored={state.is_stored}, default={state.default_value}') cf.param.persistent_get_state('sound.effect', callback=state_callback) # Clear stored value (revert to default) cf.param.persistent_clear('sound.effect', callback=store_callback) time.sleep(1) ``` ## Swarm Control The `Swarm` class enables controlling multiple Crazyflies simultaneously with parallel or sequential execution of functions. ```python import time import cflib.crtp from cflib.crazyflie.swarm import Swarm, CachedCfFactory from cflib.crazyflie.syncCrazyflie import SyncCrazyflie cflib.crtp.init_drivers() # Define URIs for swarm members (same radio channel) uris = [ 'radio://0/80/2M/E7E7E7E701', 'radio://0/80/2M/E7E7E7E702', 'radio://0/80/2M/E7E7E7E703', ] # Define functions to execute on each Crazyflie def arm(scf: SyncCrazyflie): scf.cf.supervisor.send_arming_request(True) time.sleep(1.0) def take_off(scf: SyncCrazyflie): scf.cf.high_level_commander.takeoff(1.0, 2.0) time.sleep(3.0) def land(scf: SyncCrazyflie): scf.cf.high_level_commander.land(0.0, 2.0) time.sleep(2.5) scf.cf.high_level_commander.stop() def run_sequence(scf: SyncCrazyflie, sequence): """Execute a sequence of positions for one Crazyflie.""" commander = scf.cf.high_level_commander for x, y, z, duration in sequence: commander.go_to(x, y, z, 0, duration, relative=True) time.sleep(duration) # Per-Crazyflie sequences (args_dict maps URI to function arguments) sequences = { uris[0]: [[(0.5, 0, 0, 2.0), (-0.5, 0, 0, 2.0)]], uris[1]: [[(0, 0.5, 0, 2.0), (0, -0.5, 0, 2.0)]], uris[2]: [[(0, 0, 0.3, 2.0), (0, 0, -0.3, 2.0)]], } # Use CachedCfFactory for faster connections with TOC caching factory = CachedCfFactory(rw_cache='./cache') with Swarm(uris, factory=factory) as swarm: print('Connected to all Crazyflies') # Reset position estimators and wait for stable positions swarm.reset_estimators() print('Estimators ready') # Get estimated positions positions = swarm.get_estimated_positions() for uri, pos in positions.items(): print(f'{uri}: ({pos.x:.2f}, {pos.y:.2f}, {pos.z:.2f})') # Execute functions in parallel on all Crazyflies swarm.parallel_safe(arm) swarm.parallel_safe(take_off) # Execute with per-Crazyflie arguments swarm.parallel_safe(run_sequence, args_dict=sequences) swarm.parallel_safe(land) # Alternative: sequential execution # swarm.sequential(land) ``` ## Uploading and Executing Trajectories Trajectories defined as polynomial segments can be uploaded to the Crazyflie's memory and executed autonomously using the high-level commander. ```python import sys import cflib.crtp from cflib.crazyflie import Crazyflie from cflib.crazyflie.mem import MemoryElement, Poly4D from cflib.crazyflie.syncCrazyflie import SyncCrazyflie import time cflib.crtp.init_drivers() uri = 'radio://0/80/2M/E7E7E7E7E7' # Trajectory as list of polynomial coefficients # [duration, x^0..x^7, y^0..y^7, z^0..z^7, yaw^0..yaw^7] trajectory = [ [1.0, 0, 0, 0, 0, 0.5, -0.2, 0.1, 0, # X coefficients 0, 0, 0, 0, 0, 0, 0, 0, # Y coefficients 0, 0, 0, 0, 0, 0, 0, 0, # Z coefficients 0, 0, 0, 0, 0, 0, 0, 0], # Yaw coefficients [1.0, 0.5, 0.3, -0.1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], ] def upload_trajectory(cf, trajectory_id, trajectory_data): """Upload trajectory to Crazyflie memory.""" trajectory_mem = cf.mem.get_mems(MemoryElement.TYPE_TRAJ)[0] trajectory_mem.trajectory = [] total_duration = 0 for row in trajectory_data: duration = row[0] x = Poly4D.Poly(row[1:9]) y = Poly4D.Poly(row[9:17]) z = Poly4D.Poly(row[17:25]) yaw = Poly4D.Poly(row[25:33]) trajectory_mem.trajectory.append(Poly4D(duration, x, y, z, yaw)) total_duration += duration # Upload synchronously success = trajectory_mem.write_data_sync() if not success: raise RuntimeError('Trajectory upload failed') # Define trajectory in firmware cf.high_level_commander.define_trajectory( trajectory_id, offset=0, n_pieces=len(trajectory_mem.trajectory) ) return total_duration with SyncCrazyflie(uri, cf=Crazyflie(rw_cache='./cache')) as scf: cf = scf.cf commander = cf.high_level_commander # Upload trajectory trajectory_id = 1 duration = upload_trajectory(cf, trajectory_id, trajectory) print(f'Uploaded trajectory (duration: {duration}s)') # Arm and take off cf.supervisor.send_arming_request(True) time.sleep(1.0) commander.takeoff(1.0, 2.0) time.sleep(3.0) # Execute trajectory # time_scale: 1.0=normal, >1.0=slower, <1.0=faster commander.start_trajectory( trajectory_id, time_scale=1.0, relative_position=True, # Start from current position relative_yaw=False, reversed=False ) time.sleep(duration + 0.5) # Land commander.land(0.0, 2.0) time.sleep(2.5) commander.stop() ``` ## Summary cflib provides a comprehensive toolkit for Crazyflie quadcopter development, ranging from low-level packet communication to high-level autonomous flight control. The primary use cases include rapid prototyping of autonomous flight behaviors, educational robotics projects, swarm coordination research, and integration with external positioning systems. The library supports multiple control paradigms: asynchronous event-driven programming for responsive applications, synchronous blocking operations for simple scripts, and both velocity-based and position-based flight control. The typical integration pattern involves initializing drivers, establishing a connection (optionally with TOC caching), configuring logging for telemetry, setting parameters as needed, and then using either the low-level Commander for manual control or the HighLevelCommander/MotionCommander/PositionHlCommander for autonomous maneuvers. For multi-robot applications, the Swarm class simplifies parallel operations across multiple Crazyflies while handling connection management and error reporting. All components support context managers for safe resource cleanup, and the callback system enables reactive applications that respond to connection events, parameter changes, and incoming log data.