### Get Package Prefix and Share Directory

Source: https://github.com/ros2/ros2cli/blob/rolling/ros2pkg/CHANGELOG.rst

Demonstrates how to retrieve the installation prefix of a ROS 2 package and its share directory using the 'ros2 pkg prefix' command. Includes usage examples and help output.

```bash
$ ros2 pkg prefix ament_flake8
/home/nick/ros2_ws/install
$ ros2 pkg prefix --share ament_flake8
/home/nick/ros2_ws/install
Share dir: /home/nick/ros2_ws/install/share/ament_flake8
$ ros2 pkg prefix -h
usage: ros2 pkg prefix [-h] [--share] package_name

Output the prefix path of a package

positional arguments:
  package_name  The package name

optional arguments:
  -h, --help    show this help message and exit
  --share       show share directory for the package
```

--------------------------------

### Start, Stop, and Status for ROS 2 Daemon

Source: https://github.com/ros2/ros2cli/blob/rolling/_autodocs/api-reference/commands.md

Use these commands to manage the ROS 2 daemon process. The 'start' command initiates the daemon, 'stop' terminates it, and 'status' checks its current state. Use '--help' to see all available daemon sub-commands.

```bash
ros2 daemon start    # Starts the daemon
ros2 daemon stop     # Stops the daemon
ros2 daemon status   # Checks daemon status
ros2 daemon --help   # Shows available verbs
```

--------------------------------

### Start ROS 2 Daemon

Source: https://github.com/ros2/ros2cli/blob/rolling/_autodocs/api-reference/daemon.md

Entry point for the `_ros2_daemon` command. Starts the daemon process by parsing arguments, validating environment, and creating an XMLRPC server.

```python
from ros2cli.daemon import main

main()
```

--------------------------------

### Get All Registered Entry Points

Source: https://github.com/ros2/ros2cli/blob/rolling/_autodocs/api-reference/entry_points.md

Discovers and returns all entry points across all installed distributions for registered extension point groups. Provides a comprehensive view of available extensions.
```python
from ros2cli.entry_points import get_all_entry_points

all_eps = get_all_entry_points()
for group_name, entries in all_eps.items():
    print(f'{group_name}:')
    for name, (dist, ep) in entries.items():
        print(f'  {name} from {dist.name}')
```

--------------------------------

### Daemon Process Command Line Example

Source: https://github.com/ros2/ros2cli/blob/rolling/_autodocs/configuration.md

Example of how daemon process tags are represented on the command line. This shows the equivalent command-line arguments for the Python dictionary tags.

```bash
python -c ... --name ros2-daemon --ros-domain-id 0 --rmw-implementation rmw_fastrtps_cpp
```

--------------------------------

### Publish with QoS Profile

Source: https://github.com/ros2/ros2cli/blob/rolling/_autodocs/configuration.md

Example of publishing a message with a specific QoS profile preset. This is useful for controlling communication reliability and history.

```bash
ros2 topic pub /my_topic std_msgs/String "data: hello" --qos-profile sensor_data
```

--------------------------------

### Example Usage of DirectNode

Source: https://github.com/ros2/ros2cli/blob/rolling/_autodocs/api-reference/node_strategy.md

Demonstrates how to use DirectNode to connect to a local node and retrieve topic information. Requires importing DirectNode and argparse.

```python
from ros2cli.node.direct import DirectNode
import argparse

args = argparse.Namespace(
    spin_time=0.5,
    use_sim_time=False,
    start_parameter_services=False,
    node_name_suffix='_test'
)

with DirectNode(args, node_name='inspector') as node:
    names = node.get_topic_names_and_types()
    for topic, types in names:
        print(f'{topic}: {types}')
```

--------------------------------

### List Installed ROS 2 Extensions

Source: https://github.com/ros2/ros2cli/blob/rolling/_autodocs/api-reference/commands.md

This command lists all installed ROS 2 extensions, such as commands and verbs.
Use the '--all' flag to show extensions that failed to load or instantiate, and '--verbose' for detailed information including module and distribution.

```bash
ros2 extensions
ros2 extensions -a
ros2 extensions -v
```

--------------------------------

### Register ROS 2 CLI Entry Points in setup.py

Source: https://github.com/ros2/ros2cli/blob/rolling/_autodocs/README.md

Example of how to register custom commands and verbs for the ROS 2 CLI by defining entry points in the setup.py file.

```python
entry_points={
    'ros2cli.command': [
        'mycommand = mypackage.command:MyCommand',
    ],
    'mycommand.verb': [
        'myverb = mypackage.verb:MyVerb',
    ],
}
```

--------------------------------

### Subscribe with QoS Overrides

Source: https://github.com/ros2/ros2cli/blob/rolling/_autodocs/configuration.md

Example of subscribing to a topic with explicit QoS overrides for depth, reliability, and profile. Use this to fine-tune subscription behavior beyond presets.

```bash
ros2 topic sub /my_topic --qos-profile sensor_data --qos-depth 10 --qos-reliability reliable
```

--------------------------------

### Add QoS Arguments and Choose QoS Profile

Source: https://github.com/ros2/ros2cli/blob/rolling/_autodocs/README.md

Example showing how to add Quality of Service (QoS) arguments to a parser and then use them to choose an appropriate QoS profile for a ROS 2 node.

```python
import argparse
from ros2cli.qos import add_qos_arguments, choose_qos
from ros2cli.node.strategy import NodeStrategy

parser = argparse.ArgumentParser()
add_qos_arguments(parser, 'publish')
args = parser.parse_args()

with NodeStrategy(args) as node:
    qos = choose_qos(node, args.topic, args)
```

--------------------------------

### Example Usage of daemonize Function

Source: https://github.com/ros2/ros2cli/blob/rolling/_autodocs/api-reference/daemonize.md

Demonstrates how to use the `daemonize` function to spawn a background process.
Includes setting up a callable, defining tags for identification, and handling potential runtime errors during daemon startup.

```python
from ros2cli.daemon.daemonize import daemonize

def my_daemon():
    import time
    print('Daemon started')
    time.sleep(100)
    print('Daemon exiting')

tags = {
    'name': 'my-daemon',
    'ros_domain_id': 0,
}

try:
    daemonize(my_daemon, tags=tags, timeout=10.0, debug=False)
    print('Daemon spawned successfully')
except RuntimeError as e:
    print(f'Failed to spawn daemon: {e}')
```

--------------------------------

### Example VerbExtension Implementation

Source: https://github.com/ros2/ros2cli/blob/rolling/_autodocs/api-reference/verb.md

This snippet shows a basic implementation of the VerbExtension class, including adding arguments and the main execution logic.

```python
from ros2cli.verb import VerbExtension

class MyVerb(VerbExtension):
    def add_arguments(self, parser, cli_name):
        parser.add_argument('--verbose', action='store_true')

    def main(self, *, args):
        if args.verbose:
            print('Executing my verb with verbose output')
        return 0
```

--------------------------------

### Add Custom Checks with Python Entry Points

Source: https://github.com/ros2/ros2cli/blob/rolling/ros2doctor/README.md

Example of how to define entry points in setup.py to add custom checks and reports to ros2doctor.

```python
entry_points={
    'ros2doctor.checks': [
        'PlatformCheck = ros2doctor.api.platform:PlatformCheck',
        'NetworkCheck = ros2doctor.api.network:NetworkCheck',
    ],
    'ros2doctor.report': [
        'PlatformReport = ros2doctor.api.platform:PlatformReport',
        'NetworkReport = ros2doctor.api.network:NetworkReport',
    ],
}
```

--------------------------------

### Start ROS 2 Daemon

Source: https://github.com/ros2/ros2cli/blob/rolling/_autodocs/api-reference/daemon_verbs.md

Starts the ROS 2 daemon process. Use the --debug or -d flag to print debug messages.
```bash
ros2 daemon start
ros2 daemon start --debug
ros2 daemon start -d
```

--------------------------------

### Python Example of Automatic QoS Selection Logic

Source: https://github.com/ros2/ros2cli/blob/rolling/_autodocs/configuration.md

Illustrates the logic for automatic QoS profile selection when explicit overrides are not provided. It shows how the system chooses a compatible profile based on existing publishers.

```python
# Topic has 2 publishers:
# - Publisher A: RELIABLE + TRANSIENT_LOCAL
# - Publisher B: BEST_EFFORT + VOLATILE
# Chosen profile: BEST_EFFORT + VOLATILE (compatible with both)
# Prints: "Some, but not all, publishers are offering RELIABLE..."
```

--------------------------------

### Example Usage of DaemonNode

Source: https://github.com/ros2/ros2cli/blob/rolling/_autodocs/api-reference/node_strategy.md

Shows how to use DaemonNode to connect to the ROS 2 daemon and retrieve a list of nodes. Includes a check for daemon connectivity.

```python
from ros2cli.node.daemon import DaemonNode
import argparse

args = argparse.Namespace()

with DaemonNode(args) as node:
    if node.connected:
        names = node.get_node_names_and_namespaces()
        print(f'Nodes: {names}')
    else:
        print('Daemon not available')
```

--------------------------------

### ros2 daemon start

Source: https://github.com/ros2/ros2cli/blob/rolling/_autodocs/api-reference/daemon_verbs.md

Starts the ros2 daemon process. It can optionally print debug messages.

```APIDOC
## POST /daemon/start

### Description
Verb to start the ros2 daemon process. It prints a message indicating whether the daemon was started or is already running.

### Method
POST

### Endpoint
/daemon/start

### Parameters
#### Query Parameters
- **--debug, -d** (boolean) - Optional - Print debug messages (daemon output visible)

### Request Example
```bash
ros2 daemon start
ros2 daemon start --debug
ros2 daemon start -d
```

### Response
#### Success Response (200)
- **message** (string) - Indicates the status of the daemon start operation (e.g., "The daemon has been started" or "The daemon is already running")

#### Response Example
```json
{
  "message": "The daemon has been started"
}
```
```

--------------------------------

### Example Implementation of CommandExtension

Source: https://github.com/ros2/ros2cli/blob/rolling/_autodocs/api-reference/command.md

This snippet shows how to implement a custom command by inheriting from `CommandExtension`. It demonstrates adding subparsers on demand and handling the execution of the selected verb.

```python
from ros2cli.command import CommandExtension, add_subparsers_on_demand

class MyCommand(CommandExtension):
    def add_arguments(self, parser, cli_name):
        add_subparsers_on_demand(
            parser, cli_name, '_verb', 'my_command.verb'
        )

    def main(self, *, parser, args):
        extension = getattr(args, '_verb', None)
        if extension is None:
            parser.print_help()
            return 0
        return extension.main(args=args)
```

--------------------------------

### Daemon Process Tags Example

Source: https://github.com/ros2/ros2cli/blob/rolling/_autodocs/configuration.md

Example of Python dictionary defining tags for a ROS 2 daemon process. These tags help identify the process in system monitoring tools.

```python
{
    'name': 'ros2-daemon',
    'ros_domain_id': 0,
    'rmw_implementation': 'rmw_fastrtps_cpp'
}
```

--------------------------------

### Daemon Management: Spawn and Shutdown

Source: https://github.com/ros2/ros2cli/blob/rolling/_autodocs/INDEX.md

Provides examples for spawning and shutting down the ROS 2 daemon process. Both functions accept arguments and a timeout.
```python
from ros2cli.node.daemon import spawn_daemon, shutdown_daemon

spawn_daemon(args, timeout=10)
shutdown_daemon(args, timeout=10)
```

--------------------------------

### List Installed Extensions

Source: https://github.com/ros2/ros2cli/blob/rolling/_autodocs/api-reference/commands.md

This snippet shows the basic usage of the 'ros2 extensions' command to list all installed ROS 2 CLI extensions. It iterates through extension point groups and prints each entry point.

```python
def main(self, *, parser, args):
    # Gets all entry points across all extension point groups
    # Iterates through each group (sorted)
    # Within each group, prints each entry point (sorted)
    # Calls print_entry_point() for each
    pass
```

```bash
ros2 extensions
```

--------------------------------

### Custom Command Extension

Source: https://github.com/ros2/ros2cli/blob/rolling/_autodocs/INDEX.md

Example of creating a custom command by extending CommandExtension. The main method should return an exit code.

```python
from ros2cli.command import CommandExtension

class MyCommand(CommandExtension):
    def main(self, *, parser, args):
        return 0
```

--------------------------------

### Basic CLI Entry Point

Source: https://github.com/ros2/ros2cli/blob/rolling/_autodocs/INDEX.md

This snippet shows the basic usage of the ros2cli main function to start the CLI. It returns an exit code.

```python
from ros2cli.cli import main

exit_code = main()
```

--------------------------------

### Usage of add_subparsers_on_demand

Source: https://github.com/ros2/ros2cli/blob/rolling/_autodocs/api-reference/command.md

This example demonstrates how to use `add_subparsers_on_demand` to create argparse subparsers for extensions. It shows how to specify the CLI name, destination attribute, entry point group, and how to hide specific extensions from help messages.
```python
from ros2cli.command import add_subparsers_on_demand
import argparse

parser = argparse.ArgumentParser()
add_subparsers_on_demand(
    parser, 'ros2 mycommand', '_verb', 'mycommand.verb',
    hide_extensions=['internal']
)
args = parser.parse_args(['myverb'])
```

--------------------------------

### ros2 daemon main entry point

Source: https://github.com/ros2/ros2cli/blob/rolling/_autodocs/api-reference/daemon.md

The `main` function serves as the entry point for the `_ros2_daemon` command. It is responsible for parsing arguments, validating the environment, setting up an XMLRPC server, and starting the daemon process.

```APIDOC
## Function: main

### Description
Entry point for the `_ros2_daemon` command. Starts the daemon process.

### Parameters
#### Path Parameters
None

#### Query Parameters
None

#### Request Body
None

### Arguments
- **--rmw-implementation** (str) - Required - RMW implementation name (must match rclpy)
- **--ros-domain-id** (int) - Required - ROS domain ID (must match ROS_DOMAIN_ID env var)
- **--timeout** (int) - Optional - Daemon inactivity timeout in seconds (default: 7200)

### Behavior
1. Parses arguments
2. Validates rmw_implementation and ros_domain_id match current environment
3. Creates XMLRPC server
4. Starts serving (blocks until timeout or shutdown)

### Throws
- **AssertionError**: If rmw_implementation or ros_domain_id don't match environment

### Example Usage:
```bash
# Called by daemon spawning process
python -c "from ros2cli.daemon import main; main()"
```
```

--------------------------------

### Set ROS_DOMAIN_ID and Start Daemon

Source: https://github.com/ros2/ros2cli/blob/rolling/_autodocs/configuration.md

Sets the ROS_DOMAIN_ID environment variable to 5 and starts the ROS 2 daemon. The daemon will listen on port 11516 (11511 + 5).
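
The port arithmetic in this entry can be sketched in plain Python. This is only an illustration that assumes the base port 11511 quoted in the description; `BASE_PORT` and `daemon_port` are hypothetical names chosen here and are not part of the ros2cli API.

```python
# Hypothetical helper, not a ros2cli function: it mirrors the
# "base port + domain ID" arithmetic stated in the description.
BASE_PORT = 11511  # base port as quoted in this entry (assumption)


def daemon_port(domain_id: int) -> int:
    """Return the port implied by the description for a given domain ID."""
    if domain_id < 0:
        raise ValueError('domain_id must be non-negative')
    return BASE_PORT + domain_id


print(daemon_port(5))
```

With `ROS_DOMAIN_ID=5` the sketch yields the same 11516 stated in the description. The shell commands for this entry follow.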
```bash
export ROS_DOMAIN_ID=5
ros2 daemon start
# Daemon listens on port 11516 (11511 + 5)
```

--------------------------------

### serve

Source: https://github.com/ros2/ros2cli/blob/rolling/_autodocs/api-reference/daemon.md

Starts serving XMLRPC requests for a given daemon server instance. It handles introspection methods and provides a shutdown mechanism.

```APIDOC
## Function: serve

### Description
Serve XMLRPC requests from a daemon server until timeout or shutdown.

### Parameters
#### Path Parameters
- **server** (LocalXMLRPCServer) - Required - Configured XMLRPC server instance
- **timeout** (int) - Optional - Inactivity timeout in seconds (2 hours default)

### Behavior
1. Creates a NetworkAwareNode for introspection
2. Registers all node graph inspection methods as XMLRPC functions
3. Sets up timeout handler (checks inactivity every 0.2s)
4. Registers `system.shutdown()` for remote shutdown
5. Handles requests in a loop until timeout or shutdown signal
6. Catches KeyboardInterrupt gracefully

### Registered Methods (XMLRPC callable from clients)
- `get_name()`
- `get_namespace()`
- `get_node_names_and_namespaces()`
- `get_node_names_and_namespaces_with_enclaves()`
- `get_topic_names_and_types()`
- `get_service_names_and_types()`
- `get_action_names_and_types()`
- `get_publisher_names_and_types_by_node()`
- `get_publishers_info_by_topic()`
- `get_subscriber_names_and_types_by_node()`
- `get_subscriptions_info_by_topic()`
- `get_service_names_and_types_by_node()`
- `get_servers_info_by_service()`
- `get_clients_info_by_service()`
- `get_client_names_and_types_by_node()`
- `get_action_server_names_and_types_by_node()`
- `get_action_client_names_and_types_by_node()`
- `count_publishers()`
- `count_subscribers()`
- `count_clients()`
- `count_services()`
- `system.shutdown()` - shut down the daemon

### Example Usage
```python
from ros2cli.daemon import make_xmlrpc_server, serve

with make_xmlrpc_server() as server:
    serve(server, timeout=300)  # 5 minute timeout
```
```

--------------------------------

### get_all_entry_points

Source: https://github.com/ros2/ros2cli/blob/rolling/_autodocs/api-reference/entry_points.md

Retrieves all entry points registered across all installed ROS distributions. It provides a comprehensive view of available extensions.

```APIDOC
## Function: get_all_entry_points

### Description
Get all entry points registered across all installed distributions.

### Signature
```python
def get_all_entry_points()
```

### Return Type
`dict[str, dict[str, tuple[Distribution, EntryPoint]]]`

Two-level mapping: group_name → name → (distribution, entry_point).

### Example Usage
```python
from ros2cli.entry_points import get_all_entry_points

all_eps = get_all_entry_points()
for group_name, entries in all_eps.items():
    print(f'{group_name}:')
    for name, (dist, ep) in entries.items():
        print(f'  {name} from {dist.name}')
```
```

--------------------------------

### List Extensions with Verbose Output

Source: https://github.com/ros2/ros2cli/blob/rolling/_autodocs/api-reference/commands.md

Use the --verbose flag to get detailed information about each extension, including its module name, attributes, and distribution.

```bash
ros2 extensions --verbose
```

--------------------------------

### QoS Durability Warning Example

Source: https://github.com/ros2/ros2cli/blob/rolling/_autodocs/errors.md

This message indicates that some publishers are offering TRANSIENT_LOCAL QoS, but to ensure connection to all publishers, the system falls back to VOLATILE.

```text
Some, but not all, publishers are offering QoSDurabilityPolicy.TRANSIENT_LOCAL. Falling back to QoSDurabilityPolicy.VOLATILE as it will connect to all publishers
```

--------------------------------

### QoS Reliability Warning Example

Source: https://github.com/ros2/ros2cli/blob/rolling/_autodocs/errors.md

This message indicates that some publishers are offering RELIABLE QoS, but to ensure connection to all publishers, the system falls back to BEST_EFFORT.
```text
Some, but not all, publishers are offering QoSReliabilityPolicy.RELIABLE. Falling back to QoSReliabilityPolicy.BEST_EFFORT as it will connect to all publishers
```

--------------------------------

### Example Usage of NetworkAwareNode

Source: https://github.com/ros2/ros2cli/blob/rolling/_autodocs/api-reference/node_strategy.md

Illustrates how NetworkAwareNode automatically handles network interface changes by resetting the underlying node. Used for maintaining connectivity.

```python
from ros2cli.node.network_aware import NetworkAwareNode
import argparse

args = argparse.Namespace(spin_time=0.5)

with NetworkAwareNode(args) as node:
    # Node automatically resets if network changes
    topics = node.get_topic_names_and_types()
```

--------------------------------

### Get Entry Points by Group Name

Source: https://github.com/ros2/ros2cli/blob/rolling/_autodocs/api-reference/entry_points.md

Retrieves entry points for a specified group without loading them. Useful for inspecting available extensions.

```python
from ros2cli.entry_points import get_entry_points

# Bind the result to the same name the loop iterates over
eps = get_entry_points('ros2cli.command')
for name, ep in eps.items():
    print(f'{name}: {ep.value}')
```

--------------------------------

### Create a Custom ROS 2 CLI Command

Source: https://github.com/ros2/ros2cli/blob/rolling/_autodocs/README.md

Example of how to create a custom command extension for the ROS 2 CLI. This involves subclassing `CommandExtension` and defining how subcommands (verbs) are handled.
```python
from ros2cli.command import CommandExtension, add_subparsers_on_demand

class MyCommand(CommandExtension):
    def add_arguments(self, parser, cli_name):
        add_subparsers_on_demand(parser, cli_name, '_verb', 'mycommand.verb')

    def main(self, *, parser, args):
        verb = getattr(args, '_verb', None)
        if verb is None:
            parser.print_help()
            return 0
        return verb.main(args=args)
```

--------------------------------

### Node Graph Access with NodeStrategy

Source: https://github.com/ros2/ros2cli/blob/rolling/_autodocs/INDEX.md

Demonstrates how to access the ROS 2 graph using NodeStrategy. It shows how to get topic names and types within a context manager.

```python
from ros2cli.node.strategy import NodeStrategy

with NodeStrategy(args) as strategy:
    topics = strategy.get_topic_names_and_types()
```

--------------------------------

### Interactive Fuzzy Selection with fzf

Source: https://github.com/ros2/ros2cli/blob/rolling/_autodocs/api-reference/helpers.md

Launches an interactive fuzzy selection interface using 'fzf'. Requires a TTY and 'fzf' to be installed. Returns the selected item or None if cancelled.

```Python
from ros2cli.helpers import interactive_select

options = ['option1', 'option2', 'option3']
selection = interactive_select(options, 'Pick one: ')
if selection:
    print(f'You selected: {selection}')
```

--------------------------------

### Create a Custom ROS 2 CLI Verb

Source: https://github.com/ros2/ros2cli/blob/rolling/_autodocs/README.md

Example demonstrating how to create a custom verb extension for a ROS 2 CLI command. This involves subclassing `VerbExtension` and defining its arguments and main logic.
```python
from ros2cli.verb import VerbExtension

class MyVerb(VerbExtension):
    def add_arguments(self, parser, cli_name):
        parser.add_argument('--verbose', action='store_true')

    def main(self, *, args):
        # Access node graph through args.node if provided
        return 0
```

--------------------------------

### Serve Daemon XMLRPC Requests

Source: https://github.com/ros2/ros2cli/blob/rolling/_autodocs/api-reference/daemon.md

Starts serving XMLRPC requests from a daemon server. The server will continue until the specified timeout is reached or a shutdown signal is received. It gracefully handles KeyboardInterrupt.

```python
from ros2cli.daemon import make_xmlrpc_server, serve

with make_xmlrpc_server() as server:
    serve(server, timeout=300)  # 5 minute timeout
```

--------------------------------

### Customizing ros2cli.cli.main with a Custom Description

Source: https://github.com/ros2/ros2cli/blob/rolling/_autodocs/api-reference/cli.md

Illustrates how to provide a custom description that will be displayed in the CLI's help messages.

```python
from ros2cli.cli import main

# With custom description
result = main(
    script_name='ros2',
    description='Custom ROS 2 CLI tool'
)
```

--------------------------------

### Create QoS Profile from Short Keys and Overrides

Source: https://github.com/ros2/ros2cli/blob/rolling/_autodocs/api-reference/qos.md

Generate a new QoSProfile based on a preset name, applying overrides for reliability, depth, and other settings using short string keys. Returns a new QoSProfile object.

```python
from ros2cli.qos import qos_profile_from_short_keys

# Create from preset with overrides
profile = qos_profile_from_short_keys(
    'default',
    reliability='reliable',
    depth=20
)
```

--------------------------------

### Get ROS Domain ID

Source: https://github.com/ros2/ros2cli/blob/rolling/_autodocs/api-reference/helpers.md

Retrieves the ROS domain ID from the environment. Defaults to 0 if not set.
```python
from ros2cli.helpers import get_ros_domain_id

domain_id = get_ros_domain_id()
print(f'ROS domain: {domain_id}')
```

--------------------------------

### DaemonNode Context Manager

Source: https://github.com/ros2/ros2cli/blob/rolling/_autodocs/api-reference/node_strategy.md

Manages the XMLRPC connection lifecycle for the DaemonNode, ensuring proper setup and teardown.

```python
def __enter__(self) -> DaemonNode: ...
def __exit__(self, exc_type, exc_value, traceback): ...
```

--------------------------------

### ros2 extensions

Source: https://github.com/ros2/ros2cli/blob/rolling/_autodocs/api-reference/commands.md

Lists all installed extensions within the ROS 2 system, including commands, verbs, and other components.

```APIDOC
## ros2 extensions

### Description
Built-in command to list all installed extensions (commands, verbs, etc.).

### Method
Not applicable (CLI command)

### Endpoint
Not applicable (CLI command)

### Parameters
#### Query Parameters
- **--all, -a** (flag) - Optional - Show extensions that failed to load or instantiate (prefixed with '- ')
- **--verbose, -v** (flag) - Optional - Show detailed info (module, attributes, distribution)

### Behavior
Lists all installed extensions, with options for showing failed loads and verbose details.

### Example Usage:
```bash
ros2 extensions
ros2 extensions --verbose
ros2 extensions -a
```

### Response
Not applicable (CLI command execution)

### Error Handling:
Not explicitly documented.
```

--------------------------------

### Initialize and Use NodeStrategy

Source: https://github.com/ros2/ros2cli/blob/rolling/_autodocs/api-reference/node_strategy.md

Demonstrates how to initialize the NodeStrategy with specific arguments and use it as a context manager to retrieve node names and namespaces. This is useful for interacting with the ROS 2 graph.
```python
from ros2cli.node.strategy import NodeStrategy
import argparse

args = argparse.Namespace(no_daemon=False, spin_time=0.5)

with NodeStrategy(args, node_name='my_node') as strategy:
    nodes = strategy.get_node_names_and_namespaces()
    for name, ns in nodes:
        print(f'{ns}/{name}')
```

--------------------------------

### Basic Usage of ros2cli.cli.main

Source: https://github.com/ros2/ros2cli/blob/rolling/_autodocs/api-reference/cli.md

Demonstrates the simplest way to invoke the main function with default arguments. This will parse `sys.argv[1:]` and execute the default ROS 2 CLI behavior.

```python
from ros2cli.cli import main

# Basic usage with default arguments
result = main()
```

--------------------------------

### ros2 daemon

Source: https://github.com/ros2/ros2cli/blob/rolling/_autodocs/api-reference/commands.md

Manages the ROS 2 daemon, which is responsible for background services. It allows starting, stopping, and checking the status of the daemon.

```APIDOC
## ros2 daemon

### Description
Built-in command for daemon management (start, stop, status).

### Method
Not applicable (CLI command)

### Endpoint
Not applicable (CLI command)

### Parameters
This command uses subparsers for its operations. See example usage.

### Behavior
Calls `add_subparsers_on_demand()` to load daemon verb extensions from `ros2cli.daemon.verb` entry point group.

### Example Usage:
```bash
ros2 daemon start
ros2 daemon stop
ros2 daemon status
ros2 daemon --help
```

### Response
Not applicable (CLI command execution)

### Error Handling:
Not explicitly documented, but typically returns non-zero exit codes for errors.
```

--------------------------------

### instantiate_extensions

Source: https://github.com/ros2/ros2cli/blob/rolling/_autodocs/api-reference/plugin_system.md

Loads and instantiates all extensions within a specified entry point group. It handles caching of instances and logs any errors encountered during the instantiation process.
```APIDOC
## instantiate_extensions

### Description
Loads and instantiates all extensions in a group. Extensions that fail to instantiate are logged and excluded.

### Method
`instantiate_extensions(group_name, *, exclude_names=None, unique_instance=False)`

### Parameters
#### Path Parameters
- **group_name** (str) - Required - Entry point group name
- **exclude_names** (set[str] | None) - Optional - Names to exclude from instantiation
- **unique_instance** (bool) - Optional - If True, create new instance each call (don't cache)

### Return Type
`dict[str, object]`

Mapping of extension names to instantiated extension objects.

### Example Usage
```python
from ros2cli.plugin_system import instantiate_extensions

extensions = instantiate_extensions('ros2cli.command')
for name, ext in extensions.items():
    print(f'{name}: {ext}')
```
```

--------------------------------

### Get RMW Additional Environment Variables

Source: https://github.com/ros2/ros2cli/blob/rolling/_autodocs/api-reference/helpers.md

Retrieves specific environment variables required for RMW implementations. For 'rmw_zenoh_cpp', it includes 'RUST_LOG'.

```Python
from ros2cli.helpers import get_rmw_additional_env

env = get_rmw_additional_env('rmw_zenoh_cpp')
# {'RMW_IMPLEMENTATION': 'rmw_zenoh_cpp', 'RUST_LOG': 'z=error'}
```

--------------------------------

### Get Daemon XMLRPC Server Address

Source: https://github.com/ros2/ros2cli/blob/rolling/_autodocs/api-reference/daemon.md

Retrieves the host and port for the daemon's XMLRPC server. The host is always '127.0.0.1'.

```python
from ros2cli.daemon import get_address

host, port = get_address()
print(f'Daemon at {host}:{port}')
```

--------------------------------

### Get Local IP Addresses

Source: https://github.com/ros2/ros2cli/blob/rolling/_autodocs/api-reference/xmlrpc.md

Retrieve a list of all local IPv4 addresses configured on the machine. This is useful for network configuration and diagnostics.
```python
from ros2cli.xmlrpc.local_server import get_local_ipaddrs

addrs = get_local_ipaddrs()
for addr in addrs:
    print(f'Local address: {addr}')
```

--------------------------------

### qos_profile_from_short_keys

Source: https://github.com/ros2/ros2cli/blob/rolling/_autodocs/api-reference/qos.md

Creates a new QoSProfile from a preset name and applies optional overrides for reliability, durability, depth, history, liveliness, and liveliness lease duration using short key strings.

```APIDOC
## Function: qos_profile_from_short_keys

### Description
Create a QoSProfile from a preset name and optional overrides.

### Signature
```python
def qos_profile_from_short_keys(
    preset_profile: str,
    reliability: Optional[str] = None,
    durability: Optional[str] = None,
    depth: Optional[int] = None,
    history: Optional[str] = None,
    liveliness: Optional[str] = None,
    liveliness_lease_duration_s: Optional[float] = None,
) -> rclpy.qos.QoSProfile
```

### Parameters
#### Path Parameters
- **preset_profile** (str) - Required - Preset name ('default', 'sensor_data', 'parameters', etc.)
- **reliability** (str | None) - Optional - Override: 'reliable' or 'best_effort'
- **durability** (str | None) - Optional - Override: 'volatile' or 'transient_local'
- **depth** (int | None) - Optional - Override: queue depth
- **history** (str | None) - Optional - Override: 'keep_last' or 'keep_all'
- **liveliness** (str | None) - Optional - Override: 'automatic', 'manual_by_topic', 'manual_by_node'
- **liveliness_lease_duration_s** (float | None) - Optional - Override: lease duration in seconds

### Return Type
`rclpy.qos.QoSProfile`

New QoSProfile with settings from preset and applied overrides.

### Example Usage
```python
from ros2cli.qos import qos_profile_from_short_keys

# Create from preset with overrides
profile = qos_profile_from_short_keys(
    'default',
    reliability='reliable',
    depth=20
)
```
```

--------------------------------

### ros2cli.cli.main

Source: https://github.com/ros2/ros2cli/blob/rolling/_autodocs/api-reference/cli.md

The main function serves as the entry point for the ROS 2 CLI. It initializes the argument parser, loads command extensions, and executes the selected command.

```APIDOC
## Function: main

### Description
This function is the primary entry point for the ROS 2 CLI. It sets up the argument parsing, handles command extensions, and executes the user-specified command.

### Signature
```python
def main(*, script_name='ros2', argv=None, description=None, extension=None)
```

### Parameters
#### Path Parameters
None

#### Query Parameters
None

#### Request Body
None

### Parameters
- **script_name** (str) - Optional - Name of the command line tool, used in help messages and parsing. Defaults to 'ros2'.
- **argv** (list[str] | None) - Optional - Command line arguments to parse. If None, uses sys.argv[1:]. Defaults to None.
- **description** (str | None) - Optional - Description shown in help text. Auto-generated if None. Defaults to None.
- **extension** (CommandExtension | None) - Optional - Pre-instantiated command extension to use. If None, loads from entry points. Defaults to None.
### Return Type `int` or `str` ### Behavior - Creates top-level ArgumentParser with subparser support for commands - Adds `--use-python-default-buffering` flag to control stdout buffering behavior - Configures line buffering for stdout by default (unless flag is set) - Loads command extensions on-demand from `ros2cli.command` entry point group - Hides special commands ('extension_points', 'extensions') from help output - Integrates argcomplete for shell completion if available - Calls `extension.main()` on the selected command - Handles KeyboardInterrupt (returns signal.SIGINT), ExternalShutdownException (returns signal.SIGTERM), and RuntimeError (returns the error message as a string) ### Throws - **SystemExit**: When parsing fails or help is requested. ### Example Usage ```python from ros2cli.cli import main # Basic usage with default arguments result = main() # Custom script name result = main(script_name='myros2tool', argv=['topic', 'list']) # With custom description result = main( script_name='ros2', description='Custom ROS 2 CLI tool' ) ``` ``` -------------------------------- ### Entry Points Flow Return Types Source: https://github.com/ros2/ros2cli/blob/rolling/_autodocs/types.md Illustrates the return types for get_entry_points(), load_entry_points(), and instantiate_extensions() functions, showing data flow for extension loading. ```python dict[str, importlib.metadata.EntryPoint] # get_entry_points() dict[str, type] # load_entry_points() dict[str, Any] # instantiate_extensions() ``` -------------------------------- ### Configure QoS Profile with Short Keys Source: https://github.com/ros2/ros2cli/blob/rolling/_autodocs/api-reference/qos.md Modify an existing QoSProfile in-place using short string keys for reliability, durability, depth, history, and liveliness settings. Arguments left as None are ignored, and if durability is transient local with a depth of 0, the depth is raised to 1.
```python from ros2cli.qos import profile_configure_short_keys from rclpy.qos import QoSPresetProfiles profile = QoSPresetProfiles.get_from_short_key('default') profile_configure_short_keys( profile, reliability='reliable', durability='transient_local', depth=10 ) ``` -------------------------------- ### Configure Daemon Timeout Source: https://github.com/ros2/ros2cli/blob/rolling/_autodocs/configuration.md Starts the ROS 2 daemon with a custom timeout of 300 seconds (5 minutes). The daemon will shut down after this period of inactivity. ```bash python -m ros2cli.daemon --ros-domain-id 0 --rmw-implementation rmw_fastrtps_cpp --timeout 300 # Daemon will shut down after 5 minutes of inactivity ``` -------------------------------- ### VerbExtension Class (Daemon) Source: https://github.com/ros2/ros2cli/blob/rolling/_autodocs/api-reference/verb.md Specific extension point for daemon verb extensions, used for verbs related to daemon management (start, stop, status). ```APIDOC ## Class: VerbExtension (Daemon) **Module:** `ros2cli.verb.daemon` **Signature:** ```python class VerbExtension: ``` **Description:** Extension point for daemon verb extensions. Specific to daemon-related verbs like start, stop, status. **Class Attributes:** | Attribute | Type | Default | Description | |-----------|------|---------|-------------| | NAME | str | None | Entry point name (set automatically) | | EXTENSION_POINT_VERSION | str | '0.1' | Plugin system version supported | **Constructor:** ```python def __init__(self): ``` Validates version compatibility. **Methods:** ### add_arguments ```python def add_arguments(self, parser, cli_name): pass ``` Optional method to add verb-specific arguments. 
**Parameters:** | Parameter | Type | Description | |-----------|------|-------------| | parser | argparse.ArgumentParser | Parser to add arguments to | | cli_name | str | CLI command name (e.g., 'ros2 daemon start') | ### main ```python def main(self, *, args): raise NotImplementedError() ``` Required method to execute the verb. **Parameters:** | Parameter | Type | Description | |-----------|------|-------------| | args | argparse.Namespace | Parsed arguments | **Return Type:** `int` Exit code (0 for success). ``` -------------------------------- ### Create and Use LocalXMLRPCServer Source: https://github.com/ros2/ros2cli/blob/rolling/_autodocs/api-reference/xmlrpc.md Instantiate a LocalXMLRPCServer restricted to local connections, register a method, and handle a single request. Ensure the server is closed after use. ```python from ros2cli.xmlrpc.local_server import LocalXMLRPCServer server = LocalXMLRPCServer(('127.0.0.1', 11511)) def my_method(x): return x * 2 server.register_function(my_method) server.handle_request() # Handle one request server.server_close() ``` -------------------------------- ### Daemon StartVerb Args Class Source: https://github.com/ros2/ros2cli/blob/rolling/_autodocs/types.md Defines arguments specific to the Daemon StartVerb, including a debug flag. ```python class DaemonStartArgs: debug: bool = False ``` -------------------------------- ### Disable Plugin System Logger Source: https://github.com/ros2/ros2cli/blob/rolling/_autodocs/configuration.md Example of how to disable logs from the 'ros2cli.plugin_system' logger by setting its level to CRITICAL. This is useful for reducing noise during plugin loading. 
```python import logging logging.getLogger('ros2cli.plugin_system').setLevel(logging.CRITICAL) ``` -------------------------------- ### QoS Utilities Functions Source: https://github.com/ros2/ros2cli/blob/rolling/_autodocs/README.md Helper functions for managing Quality of Service (QoS) configurations, including adding QoS arguments, choosing QoS profiles, and creating QoS profiles from short keys. ```python def add_qos_arguments(parser, entity_type, default_profile='default'): ... def choose_qos(node, topic_name, qos_args) -> QoSProfile: ... def qos_profile_from_short_keys(preset_profile, **overrides) -> QoSProfile: ... ``` -------------------------------- ### List Extensions with Verbose Output and Including Failed Loads Source: https://github.com/ros2/ros2cli/blob/rolling/_autodocs/api-reference/commands.md Combines the --verbose and -a flags to show detailed information for all extensions, including those that failed to load. ```bash ros2 extensions -a --verbose ``` -------------------------------- ### Load Entry Point Classes Source: https://github.com/ros2/ros2cli/blob/rolling/_autodocs/api-reference/entry_points.md Loads and returns entry point classes for a given group, optionally excluding specific names. Useful for dynamically loading command classes. ```python from ros2cli.entry_points import load_entry_points commands = load_entry_points('ros2cli.command', exclude_names={'internal'}) for name, cmd_class in commands.items(): print(f'{name}: {cmd_class.__name__}') ``` -------------------------------- ### Instantiate Extensions Source: https://github.com/ros2/ros2cli/blob/rolling/_autodocs/api-reference/plugin_system.md Loads and instantiates all extensions within a specified group. Caches instances by default, but can be configured to create a new instance for each call. Logs warnings for instantiation failures. 
```python from ros2cli.plugin_system import instantiate_extensions extensions = instantiate_extensions('ros2cli.command') for name, ext in extensions.items(): print(f'{name}: {ext}') ``` -------------------------------- ### List Extensions Including Failed Loads Source: https://github.com/ros2/ros2cli/blob/rolling/_autodocs/api-reference/commands.md The -a or --all flag includes extensions that failed to load, displaying the command name and the reason for the failure (e.g., ImportError message). ```bash ros2 extensions -a # Include failed loads ``` -------------------------------- ### Choose QoS Profile for Subscription Source: https://github.com/ros2/ros2cli/blob/rolling/_autodocs/api-reference/qos.md This function selects an appropriate QoS profile for a subscription. It prioritizes explicit command-line overrides and falls back to analyzing existing publishers on the topic or a default preset profile. ```python from ros2cli.qos import choose_qos, add_qos_arguments import argparse import rclpy rclpy.init() node = rclpy.create_node('chooser') parser = argparse.ArgumentParser() add_qos_arguments(parser, 'subscribe') args = parser.parse_args([]) profile = choose_qos(node, '/my_topic', args) print(f'Chosen reliability: {profile.reliability}') node.destroy_node() rclpy.shutdown() ``` -------------------------------- ### Customizing ros2cli.cli.main with Script Name and Arguments Source: https://github.com/ros2/ros2cli/blob/rolling/_autodocs/api-reference/cli.md Shows how to specify a custom script name for help messages and provide a specific list of arguments to be parsed, overriding `sys.argv`. ```python from ros2cli.cli import main # Custom script name result = main(script_name='myros2tool', argv=['topic', 'list']) ``` -------------------------------- ### Get Daemon Server Address Source: https://github.com/ros2/ros2cli/blob/rolling/_autodocs/configuration.md Demonstrates how to retrieve the host and port for the ROS 2 daemon server using the get_address function. 
The port is calculated based on the ROS_DOMAIN_ID environment variable. ```python from ros2cli.daemon import get_address host, port = get_address() # Returns ('127.0.0.1', 11511 + int(os.environ.get('ROS_DOMAIN_ID', 0))) ``` -------------------------------- ### Create ServerProxy Client Source: https://github.com/ros2/ros2cli/blob/rolling/_autodocs/api-reference/xmlrpc.md Establish an XMLRPC client proxy to a remote server and call a method, such as 'system.listMethods', to introspect the available services. ```python from ros2cli.xmlrpc.client import ServerProxy proxy = ServerProxy('http://127.0.0.1:11511/ros2cli/') result = proxy.system.listMethods() ``` -------------------------------- ### List ROS 2 Extension Points Source: https://github.com/ros2/ros2cli/blob/rolling/_autodocs/api-reference/commands.md This command lists all available extension points in the ROS 2 system. Use the '--verbose' flag for detailed information and the '--all' flag to include extension points that failed to import. ```bash ros2 extension_points ros2 extension_points --verbose ros2 extension_points -a # Include failed loads ``` -------------------------------- ### choose_qos Source: https://github.com/ros2/ros2cli/blob/rolling/_autodocs/api-reference/qos.md Selects an appropriate QoS profile based on explicit command-line arguments or by analyzing existing publishers on a topic. This function is crucial for ensuring compatible QoS settings between ROS 2 nodes. ```APIDOC ## Function: choose_qos ### Description Selects an appropriate QoS profile based on explicit arguments or topic publishers. It prioritizes user-specified overrides and otherwise falls back to matching the QoS settings of the publishers already on the topic. ### Signature ```python def choose_qos(node, topic_name: str, qos_args) ``` ### Parameters * **node** (rclpy.node.Node) - Required - ROS 2 node for discovery. * **topic_name** (str) - Required - Topic name to check publishers for.
* **qos_args** (argparse.Namespace) - Required - Parsed QoS arguments from `add_qos_arguments()`. ### Return Type `rclpy.qos.QoSProfile` - Chosen QoS profile. ### Behavior 1. **Override Check**: If any QoS override arguments (e.g., `qos_reliability`, `qos_durability`) are provided in `qos_args`, it uses `qos_profile_from_short_keys()` with these overrides and returns immediately. 2. **Publisher Analysis**: If no overrides are specified, it analyzes existing publishers on the `topic_name`: - Discovers all publishers for the topic. - Counts publishers with 'RELIABLE' reliability and 'TRANSIENT_LOCAL' durability. - If all publishers share the same reliability and durability settings, it matches those settings. - If publishers have mixed settings: - For reliability, it defaults to 'BEST_EFFORT' (compatible with all). - For durability, it defaults to 'VOLATILE' (compatible with all). - A warning message is printed to stdout indicating the compatibility choices. 3. **Fallback**: If no publishers are found on the topic, it falls back to using a preset QoS profile. ### Example Usage ```python from ros2cli.qos import choose_qos, add_qos_arguments import argparse import rclpy rclpy.init() node = rclpy.create_node('chooser') parser = argparse.ArgumentParser() add_qos_arguments(parser, 'subscribe') args = parser.parse_args([]) profile = choose_qos(node, '/my_topic', args) print(f'Chosen reliability: {profile.reliability}') node.destroy_node() rclpy.shutdown() ``` ``` -------------------------------- ### profile_configure_short_keys Source: https://github.com/ros2/ros2cli/blob/rolling/_autodocs/api-reference/qos.md Configures an existing QoSProfile in-place by overriding specific settings using short key strings for reliability, durability, depth, history, liveliness, and liveliness lease duration. ```APIDOC ## Function: profile_configure_short_keys ### Description Configures a QoSProfile by overriding specific settings using short key strings. 
### Signature ```python def profile_configure_short_keys( profile: rclpy.qos.QoSProfile = None, reliability: Optional[str] = None, durability: Optional[str] = None, depth: Optional[int] = None, history: Optional[str] = None, liveliness: Optional[str] = None, liveliness_lease_duration_s: Optional[int] = None, ) -> None ``` ### Parameters - **profile** (rclpy.qos.QoSProfile) - Optional - Profile to modify in-place - **reliability** (str | None) - Optional - 'reliable' or 'best_effort' (short keys) - **durability** (str | None) - Optional - 'volatile' or 'transient_local' (short keys) - **depth** (int | None) - Optional - Queue depth (must be >=0 if set) - **history** (str | None) - Optional - 'keep_last' or 'keep_all' (short keys) - **liveliness** (str | None) - Optional - 'automatic', 'manual_by_topic', 'manual_by_node' (short keys) - **liveliness_lease_duration_s** (int | None) - Optional - Lease duration in seconds (must be >=0 if set) ### Behavior - Modifies profile in-place - Uses `get_from_short_key()` to convert string keys to enum values - Special case: if durability is TRANSIENT_LOCAL and depth==0, sets depth to 1 - Ignores None values (no override applied) ### Example Usage ```python from ros2cli.qos import profile_configure_short_keys from rclpy.qos import QoSPresetProfiles profile = QoSPresetProfiles.get_from_short_key('default') profile_configure_short_keys( profile, reliability='reliable', durability='transient_local', depth=10 ) ``` ``` -------------------------------- ### Common QoS Args Class Source: https://github.com/ros2/ros2cli/blob/rolling/_autodocs/types.md Defines common arguments for Quality of Service (QoS) profiles, including profile name, depth, history, reliability, durability, and liveliness settings.
```python class QoSArgs: qos_profile: str = 'default' qos_depth: int | None = None qos_history: str | None = None qos_reliability: str | None = None qos_durability: str | None = None qos_liveliness: str | None = None qos_liveliness_lease_duration_seconds: float | None = None ``` -------------------------------- ### ROS 2 CLI Helper Functions Source: https://github.com/ros2/ros2cli/blob/rolling/_autodocs/README.md Utility functions for common ROS 2 CLI operations, such as getting the ROS domain ID, waiting for specific conditions, checking discovery configuration, and interactive selection. ```python def get_ros_domain_id() -> int: ... def wait_for(predicate, timeout, period=0.1) -> bool: ... def check_discovery_configuration(): ... def interactive_select(items, prompt='Select an item:') -> str | None: ... def before_invocation(func, hook): ... ``` -------------------------------- ### Manage ROS 2 CLI Entry Points Source: https://github.com/ros2/ros2cli/blob/rolling/_autodocs/README.md Functions for retrieving and loading entry points for ROS 2 CLI plugins. These are essential for discovering and activating available commands and extensions. ```python def get_entry_points(group_name) -> dict: ... def load_entry_points(group_name, *, exclude_names=None) -> dict: ... def get_all_entry_points() -> dict: ... def get_first_line_doc(any_type) -> str: ... ``` -------------------------------- ### Add Node Strategy Arguments to Parser Source: https://github.com/ros2/ros2cli/blob/rolling/_autodocs/api-reference/node_strategy.md Shows how to add command-line arguments for node strategy configuration, such as disabling the daemon or enabling simulation time, to an argument parser. ```python from ros2cli.node.strategy import add_arguments import argparse parser = argparse.ArgumentParser() add_arguments(parser) args = parser.parse_args(['--no-daemon', '--use-sim-time']) ```
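As a rough illustration of what the parsed namespace from the node strategy example might look like, the sketch below swaps in a hypothetical stand-in, `add_strategy_flags_standin`, for `ros2cli.node.strategy.add_arguments` so that it runs without a ROS 2 installation. It assumes both flags are plain boolean switches. The real function may register additional options, help text, and defaults, so treat this as a sketch of the argparse mechanics only.

```python
import argparse


def add_strategy_flags_standin(parser: argparse.ArgumentParser) -> None:
    """Hypothetical stand-in, NOT the ros2cli implementation.

    Registers only the two flags that appear in the example above,
    assuming each one is a simple boolean switch.
    """
    parser.add_argument('--no-daemon', action='store_true',
                        help='stand-in: do not use the daemon')
    parser.add_argument('--use-sim-time', action='store_true',
                        help='stand-in: enable simulation time')


parser = argparse.ArgumentParser()
add_strategy_flags_standin(parser)

# argparse converts '--no-daemon' into the attribute name 'no_daemon'.
args = parser.parse_args(['--no-daemon', '--use-sim-time'])
print(f'no_daemon={args.no_daemon}, use_sim_time={args.use_sim_time}')

# With no flags on the command line, store_true options default to False.
defaults = parser.parse_args([])
print(f'no_daemon={defaults.no_daemon}, use_sim_time={defaults.use_sim_time}')
```

Code that receives the namespace can then branch on `args.no_daemon` or `args.use_sim_time` in the usual argparse fashion.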