### Serial Port Manager Example Setup

Source: https://github.com/techxartisanstudio/openterface_python/blob/main/src/notebook/example.ipynb

This snippet shows the initial imports and setup for using the SerialManager and Ch9329 classes, which are part of the serial port management utilities.

```python
#!/usr/bin/env python3
"""
Example usage of the SerialManager for Openterface Mini KVM
"""

import logging
import time
from serialPort.SerialManager import SerialManager
from serialPort.Ch9329 import CMD_GET_INFO, CMD_GET_PARA_CFG
```

--------------------------------

### Install Project Dependencies

Source: https://github.com/techxartisanstudio/openterface_python/blob/main/src/notebook/example.ipynb

Install the project locally using pip. This command ensures all project dependencies are correctly set up for development.

```bash
pip install -e .
```

--------------------------------

### Device Configuration and Callback Setup

Source: https://github.com/techxartisanstudio/openterface_python/blob/main/src/notebook/Testing.ipynb

Defines device VID/PID configurations and sets up a callback function to process device change events. The callback reports added, removed, and modified devices.

```python
# Device configuration - Openterface VID/PID values
HID_VID = "534D"
HID_PID = "2109"
Serial_port_VID = "1A86"
Serial_port_PID = "7523"

print(f"🔍 Device Configuration:")
print(f" Serial Port - VID:{Serial_port_VID} PID:{Serial_port_PID}")
print(f" HID Device - VID:{HID_VID} PID:{HID_PID}")

# Create cross-platform device manager and selector
# (DeviceFactory and DeviceSelector come from the "Import Cross-Platform Modules" cell)
device_manager = DeviceFactory.create_device_manager(Serial_port_VID, Serial_port_PID, HID_VID, HID_PID)
device_selector = DeviceSelector(device_manager)

print(f"✅ Device manager created for {DeviceFactory.get_current_platform()} platform")


def format_device_brief(device):
    """Format device info briefly for display.

    `device` is a dict; each non-empty `*_path` entry contributes one label.
    Returns the labels joined by " | ", or "Unknown device" when none are set.
    """
    parts = []
    if device.get('serial_port_path'):
        parts.append(f"Serial:{device['serial_port_path']}")
    if device.get('camera_path'):
        parts.append(f"Video")
    if device.get('audio_path'):
        parts.append(f"Audio")
    if device.get('HID_path'):
        parts.append(f"HID")
    return " | ".join(parts) if parts else "Unknown device"


def device_change_callback(event_data):
    """Callback function called when device changes are detected.

    Reads `timestamp`, `changes_from_last`, `changes_from_initial`,
    `current_devices` and `initial_snapshot` from `event_data` and prints
    a human-readable report.
    """
    print(f"\n🚨 === Device Change Detected at {event_data['timestamp']} ===")

    changes_from_last = event_data['changes_from_last']
    changes_from_initial = event_data['changes_from_initial']

    # Report changes from last scan
    if changes_from_last['added_devices']:
        print(f"📱 NEW DEVICES CONNECTED ({len(changes_from_last['added_devices'])}):")
        for device in changes_from_last['added_devices']:
            print(f" ➕ {format_device_brief(device)}")

    if changes_from_last['removed_devices']:
        print(f"🔌DEVICES DISCONNECTED ({len(changes_from_last['removed_devices'])}):")
        for device in changes_from_last['removed_devices']:
            print(f" ➖ {format_device_brief(device)}")

    if changes_from_last['modified_devices']:
        print(f"🔄DEVICES MODIFIED ({len(changes_from_last['modified_devices'])}):")
        for change in changes_from_last['modified_devices']:
            print(f" 🔀 {format_device_brief(change['new'])}")

    # Report overall state compared to initial
    current_count = len(event_data['current_devices'])
    initial_count = len(event_data['initial_snapshot'].devices)
    print(f"📊 Total devices: {current_count} (was {initial_count} initially)")

    if changes_from_initial['added_devices']:
        print(f"📈 Net new since start: {len(changes_from_initial['added_devices'])}")
    if changes_from_initial['removed_devices']:
        print(f"📉 Net removed since start: {len(changes_from_initial['removed_devices'])}")

    print("=" * 60)


print("✅ Configuration and callback function defined!")
```

--------------------------------

### Install Openterface Python

Source: https://github.com/techxartisanstudio/openterface_python/blob/main/README.md

Install the Openterface Python package and its dependencies. Ensure you have Python 3.7+.

```bash
pip install -r requirements.txt
pip install -e .
```

--------------------------------

### Start Video Stream on Windows

Source: https://github.com/techxartisanstudio/openterface_python/blob/main/src/notebook/example.ipynb

Initiates a video stream using FFmpeg on a Windows system. It requires device paths for camera and audio, along with a video URL for streaming.

```python
# stream example
from device import DeviceGroupsWin
from device import VideoFFmpeg

HID_VID = "534D"
HID_PID = "2109"
Serial_port_VID = "1A86"
Serial_port_PID = "7523"

# NOTE(review): the method name is spelled `search_phycial_device` in the original
# snippet - confirm against the library before "correcting" it.
device_info_list = DeviceGroupsWin.search_phycial_device(Serial_port_VID, Serial_port_PID, HID_VID, HID_PID)

# Fail with a clear message instead of an IndexError when no device matched
if not device_info_list:
    raise RuntimeError("No Openterface device found - check the USB connection and the VID/PID values")

video_path = device_info_list[0]['camera_path']
audio_path = device_info_list[0]['audio_path']
video_url = "rtp://192.168.100.66:5000"

VideoFFmpeg.start_stream(video_path, audio_path, video_url, "windows")
```

--------------------------------

### Start Hot-Plugging Monitoring

Source: https://github.com/techxartisanstudio/openterface_python/blob/main/src/notebook/example.ipynb

Initializes and starts a hot-plugging monitor for USB devices. This function requires device IDs and sets up a callback to detect device additions and removals.

```python
# Import the hot-plugging detection functionality
from device import DeviceGroupsWin
import time
from datetime import datetime

# Device IDs for Openterface
HID_VID = "534D"
HID_PID = "2109"
Serial_port_VID = "1A86"
Serial_port_PID = "7523"


def hotplug_callback(event_data):
    """Callback function for device changes.

    Prints the change timestamp, how many devices were added/removed since
    the last scan, and the current device count.
    """
    print(f"🔄 Device change detected at {event_data['timestamp']}")

    changes = event_data['changes_from_last']
    if changes['added_devices']:
        print(f" ➕ Added: {len(changes['added_devices'])} devices")
    if changes['removed_devices']:
        print(f" ➖ Removed: {len(changes['removed_devices'])} devices")

    current_count = len(event_data['current_devices'])
    print(f" 📊 Total devices now: {current_count}")


# Create hotplug monitor
monitor = DeviceGroupsWin.HotplugMonitor(
    serial_vid=Serial_port_VID,
    serial_pid=Serial_port_PID,
    hid_vid=HID_VID,
    hid_pid=HID_PID,
    poll_interval=3.0  # Check every 3 seconds
)

# Add callback and start monitoring
monitor.add_callback(hotplug_callback)
monitor.start_monitoring()

print("🟢 Hot-plug monitoring started!")
print("💡 Connect or disconnect Openterface devices to see real-time detection")
print("⏱️ Initial device scan completed")
```

--------------------------------

### Start Hotplug Monitoring

Source: https://github.com/techxartisanstudio/openterface_python/blob/main/src/notebook/Testing.ipynb

Starts the hotplug monitoring process and includes a short delay to allow the initial device scan to complete.

```python
# Start monitoring and capture initial device state
print("🚀 Starting hotplug monitoring...")

# Start the monitoring process
monitor.start_monitoring()

# Wait a moment for initial scan to complete
time.sleep(1)
```

--------------------------------

### Get Current Device State

Source: https://github.com/techxartisanstudio/openterface_python/blob/main/src/notebook/example.ipynb

Retrieves and prints the current state of connected devices, including the initial state captured when monitoring started. This helps in understanding the device inventory.

```python
# Check current device state
current_state = monitor.get_current_state()
initial_state = monitor.get_initial_state()

if initial_state:
    print(f"📋 Initial State (captured at {initial_state['timestamp']}):")
    print(f" Device count: {initial_state['device_count']}")

    for i, device in enumerate(initial_state['devices'], 1):
        print(f" Device {i}:")
        if device.get('serial_port_path'):
            print(f" Serial Port: {device['serial_port_path']}")
        if device.get('camera_path'):
            print(f" Camera: Available")
        if device.get('audio_path'):
            print(f" Audio: Available")
        if device.get('HID_path'):
            print(f" HID: Available")
else:
    print("⚠️ No devices detected in initial scan")

print(f"\n💡 To stop monitoring, run: monitor.stop_monitoring()")
```

--------------------------------

### Start Video Streaming in Jupyter Notebook

Source: https://github.com/techxartisanstudio/openterface_python/blob/main/README.md

Initiates video and audio streaming from the Openterface MiniKVM device. This snippet requires device information and specifies the video stream URL.

```python
from device import DeviceGroupsWin
from device import VideoFFmpeg

HID_VID = "534D"
HID_PID = "2109"
Serial_port_VID = "1A86"
Serial_port_PID = "7523"

device_info_list = DeviceGroupsWin.search_phycial_device(Serial_port_VID, Serial_port_PID, HID_VID, HID_PID)

# Fail with a clear message instead of an IndexError when no device matched
if not device_info_list:
    raise RuntimeError("No Openterface device found - check the USB connection and the VID/PID values")

video_path = device_info_list[0]['camera_path']
audio_path = device_info_list[0]['audio_path']
video_url = "rtp://192.168.100.66:5000"

VideoFFmpeg.start_stream(video_path, audio_path, video_url, "windows")
```

--------------------------------

### Display Initial Device State

Source: https://github.com/techxartisanstudio/openterface_python/blob/main/src/notebook/Testing.ipynb

Captures and prints the initial state of connected Openterface devices upon starting the monitor. It shows the number of devices found and their basic details like serial port and camera paths.

```python
# Display initial device state
initial_state = monitor.get_initial_state()

if initial_state:
    print(f"\n📋 Initial Device State (captured at {initial_state['timestamp']}):")
    print(f" Found {initial_state['device_count']} device(s)")

    for i, device in enumerate(initial_state['devices'], 1):
        print(f" {i}. {format_device_brief(device)}")
        print(f" Serial Port: {device.get('serial_port_path', 'N/A')}")
        print(f" HID Path: {device.get('HID_path', 'N/A')}")
        print(f" Camera: {device.get('camera_path', 'N/A')}")
        print(f" Audio: {device.get('audio_path', 'N/A')}")
        print()
else:
    print("⚠️ No initial devices found matching the specified VID/PID")

print("🟢 Monitoring is now active!")
print("💡 Connect or disconnect Openterface devices to see changes in real-time")
```

--------------------------------

### Configure Logging and Serial Manager with Callbacks

Source: https://github.com/techxartisanstudio/openterface_python/blob/main/src/notebook/example.ipynb

Sets up basic logging and a SerialManager instance with event and data-ready callbacks. This is useful for monitoring serial communication and handling incoming data.

```python
import logging
import time
from serialPort.SerialManager import SerialManager
# CMD_GET_INFO is used in main() below; without this import the example raises NameError
from serialPort.Ch9329 import CMD_GET_INFO

# Configure logging
logging.basicConfig(
    level=logging.DEBUG,  # Changed to DEBUG for more detailed output
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)


def event_callback(event_type, data):
    """Callback function for serial port events"""
    print(f"Event: {event_type}, Data: {data}")


def data_ready_callback(data):
    """Callback function for received data"""
    print(f"Received data: {data.hex(' ')}")


def main():
    """Connect to the serial port and print the lock-key status every 5 seconds until interrupted."""
    # Create SerialManager instance
    serial_manager = SerialManager()

    # Set callbacks
    serial_manager.set_event_callback(event_callback)
    serial_manager.set_data_ready_callback(data_ready_callback)

    # Specify the serial port path (modify this for your system)
    # Windows: "COM3", "COM4", etc.
    # Linux/Mac: "/dev/ttyUSB0", "/dev/ttyACM0", etc.
    port_path = "COM7"  # Change this to your actual port

    print(f"Attempting to connect to {port_path}...")

    # Connect to the device
    if serial_manager.connect(port_path):
        print(f"Successfully connected to: {serial_manager.get_port_name()}")

        try:
            # Keep the program running and periodically check device status
            while True:
                if serial_manager.is_ready():
                    print(f"Device Status:")
                    print(f" Num Lock: {serial_manager.get_num_lock_state()}")
                    print(f" Caps Lock: {serial_manager.get_caps_lock_state()}")
                    print(f" Scroll Lock: {serial_manager.get_scroll_lock_state()}")

                    # Send info command to refresh status
                    serial_manager.send_async_command(CMD_GET_INFO)

                    time.sleep(5)
                else:
                    print("Device connection lost!")
                    break

        except KeyboardInterrupt:
            print("\nShutting down...")
        finally:
            # Always release the port, whether the loop ended normally or by Ctrl+C
            serial_manager.disconnect()
    else:
        print(f"Failed to connect to {port_path}")
        print("Please check:")
        print("1. The device is connected")
        print("2. The port path is correct")
        print("3. No other application is using the port")


if __name__ == "__main__":
    main()
```

--------------------------------

### Run the Main Openterface Application

Source: https://github.com/techxartisanstudio/openterface_python/blob/main/README.md

Executes the primary Openterface Python application. This is the entry point for running the KVM server.

```bash
python src/main.py
```

--------------------------------

### Create and Configure Hotplug Monitor

Source: https://github.com/techxartisanstudio/openterface_python/blob/main/src/notebook/Testing.ipynb

Initializes a cross-platform hotplug monitor with specified device identifiers and a polling interval. The monitor is then configured to use the defined device change callback.

```python
# Create and configure the cross-platform hotplug monitor
print("🔧 Creating Cross-Platform Hotplug Monitor...")

monitor = DeviceFactory.create_hotplug_monitor(
    serial_vid=Serial_port_VID,
    serial_pid=Serial_port_PID,
    hid_vid=HID_VID,
    hid_pid=HID_PID,
    poll_interval=2.0  # Check every 2 seconds
)

# Add our callback function
monitor.add_callback(device_change_callback)

print("✅ Cross-platform hotplug monitor created and configured!")
print(f"⏰ Polling interval: {monitor.poll_interval} seconds")
print(f"🖥️ Platform: {DeviceFactory.get_current_platform()}")
```

--------------------------------

### Compare Device Snapshots

Source: https://github.com/techxartisanstudio/openterface_python/blob/main/src/notebook/example.ipynb

Takes two snapshots of connected devices with a time delay in between, then compares them to identify added, removed, or modified devices. This is useful for manual change detection.

```python
# Manual snapshot comparison example
# (relies on DeviceGroupsWin, time and the VID/PID constants from the earlier cells)
print("📸 Taking manual snapshots for comparison...")

# Take first snapshot
snapshot1 = DeviceGroupsWin.DeviceSnapshot(Serial_port_VID, Serial_port_PID, HID_VID, HID_PID)
print(f" Snapshot 1 taken at {snapshot1.timestamp} - Found {len(snapshot1.devices)} devices")

# Wait a moment (you can connect/disconnect devices during this time)
print("⏳ Waiting 10 seconds... (connect/disconnect devices now)")
time.sleep(10)

# Take second snapshot
snapshot2 = DeviceGroupsWin.DeviceSnapshot(Serial_port_VID, Serial_port_PID, HID_VID, HID_PID)
print(f" Snapshot 2 taken at {snapshot2.timestamp} - Found {len(snapshot2.devices)} devices")

# Compare snapshots (newer snapshot compared against the older one)
changes = snapshot2.compare_with(snapshot1)

print(f"\n🔍 Comparison Results:")
print(f" Added devices: {len(changes['added_devices'])}")
print(f" Removed devices: {len(changes['removed_devices'])}")
print(f" Modified devices: {len(changes['modified_devices'])}")

if changes['added_devices']:
    print(" ➕ New devices:")
    for device in changes['added_devices']:
        print(f" - Serial: {device.get('serial_port_path', 'N/A')}")

if changes['removed_devices']:
    print(" ➖ Removed devices:")
    for device in changes['removed_devices']:
        print(f" - Serial: {device.get('serial_port_path', 'N/A')}")
```

--------------------------------

### Check Current Monitoring Status and Device Selection

Source: https://github.com/techxartisanstudio/openterface_python/blob/main/src/notebook/Testing.ipynb

Retrieves and displays the current monitoring status, comparing it with the initial state to highlight any added or removed devices. It also lists available port chains for device selection.

```python
# Check current monitoring status and device selection by port chain
# Run this cell to see the current device state at any time
current_state = monitor.get_current_state()
initial_state = monitor.get_initial_state()

if current_state and initial_state:
    print(f"📊 Current Monitoring Status:")
    print(f" Initial devices: {initial_state['device_count']} (at {initial_state['timestamp']})")
    print(f" Current devices: {current_state['device_count']} (at {current_state['timestamp']})")

    # Compare current with initial
    current_snapshot = monitor.device_manager.create_snapshot()
    changes = current_snapshot.compare_with(monitor.initial_snapshot)

    if changes['added_devices'] or changes['removed_devices']:
        print(f"\n🔄 Changes since start:")
        if changes['added_devices']:
            print(f" ➕ Added: {len(changes['added_devices'])} devices")
        if changes['removed_devices']:
            print(f" ➖ Removed: {len(changes['removed_devices'])} devices")
    else:
        print(f"\n✅ No changes since monitoring started")

    print(f"\n🏃‍♂️ Monitor is {'running' if monitor.running else 'stopped'}")

    # Show available port chains for device selection
    print(f"\n🔗 Available Port Chains:")
    port_chains = device_selector.device_manager.list_available_port_chains()
    for i, port_chain in enumerate(port_chains, 1):
        devices = device_selector.device_manager.get_devices_by_port_chain(port_chain)
        print(f" {i}. {port_chain} ({len(devices)} device(s))")
        for device in devices:
            print(f" - {device}")
else:
    print("⚠️ Monitor not initialized or no data available")
```

--------------------------------

### Connect and Simulate Keyboard Commands

Source: https://github.com/techxartisanstudio/openterface_python/blob/main/src/notebook/example.ipynb

Connects to a serial port, automatically reconfiguring baudrate if necessary, and then simulates various keyboard commands like opening a terminal, typing text, and pressing Enter. Useful for automating tasks on a connected device.

```python
from serialPort.SerialManager import SerialManager
import time
import logging

# Enable INFO-level logging to see the connection process
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')


def data_ready_callback(data):
    """Callback function for received data"""
    print(f"Received data: {data.hex(' ')}")


def main():
    """Connect to the serial port, send a short scripted keyboard sequence, then disconnect."""
    serial_manager = SerialManager()
    serial_manager.set_data_ready_callback(data_ready_callback)

    # Note: The baudrate parameter is now ignored - always uses 115200
    # If device is at 9600, it will be automatically reconfigured to 115200
    port_path = "COM7"

    print(f"Connecting to {port_path}...")
    print("Note: Will always connect at 115200 baud. If device is at 9600, it will be reconfigured automatically.")

    if not serial_manager.connect(port_path):
        print(f"Failed to connect to {port_path}")
        print("Please check:")
        print("1. Device is connected")
        print("2. Port path is correct")
        print("3. No other application is using the port")
        return

    print(f"Successfully connected to: {serial_manager.get_port_name()}")
    print(f"Final baudrate: {serial_manager.ser_port.baudrate if serial_manager.ser_port else 'Unknown'}")

    try:
        # Test keyboard commands
        print("\nSending keyboard commands...")
        time.sleep(0.5)

        # Open a terminal with Ctrl+Alt+T (Linux desktop shortcut; a Windows
        # target would need a different combination such as Win+R)
        serial_manager.keyboard.send_key_combination("ctrl", "alt", "t")
        time.sleep(2)

        # Send simple command
        serial_manager.keyboard.send_text("ls")
        time.sleep(0.5)

        # Press Enter
        serial_manager.keyboard.send_key_press(0x28)  # Enter key code
        time.sleep(0.5)

        # Send echo command
        serial_manager.keyboard.send_text("echo 'Hello, Openterface!'")
        time.sleep(2)
        serial_manager.keyboard.send_key_press(0x28)
        time.sleep(0.5)

        # Clear screen
        serial_manager.keyboard.send_text("clear")
        time.sleep(0.5)

        # Press Enter again
        serial_manager.keyboard.send_key_press(0x28)  # Enter key code
        time.sleep(0.5)

        serial_manager.keyboard.send_text("exit")
        time.sleep(0.5)
        serial_manager.keyboard.send_key_press(0x28)  # Enter key code
        time.sleep(0.5)

        print("Keyboard test completed!")
    finally:
        # Always release the port, even if one of the keyboard commands raises
        serial_manager.disconnect()


if __name__ == "__main__":
    main()
```

--------------------------------

### Display Detailed Device Information and Selection

Source: https://github.com/techxartisanstudio/openterface_python/blob/main/src/notebook/Testing.ipynb

Provides a function to display detailed information for a list of devices, including their port chain, serial port, HID, camera, and audio paths. It then displays both initial and current device details and demonstrates selecting a device by its port chain.

```python
# Display detailed device information and device selection
# Run this cell to see detailed information about all currently detected devices

def display_detailed_device_info(devices, title="Device Information"):
    """Print every known field of each device dict in `devices` under a `title` banner."""
    print(f"📱 {title}")
    print("=" * 60)

    if not devices:
        print(" No devices found")
        return

    for i, device in enumerate(devices, 1):
        print(f"Device {i}:")
        print(f" Port Chain: {device.get('port_chain', 'Unknown')}")
        print(f" Serial Port ID: {device.get('serial_port', 'Not found')}")
        print(f" Serial Port Path: {device.get('serial_port_path', 'Not found')}")
        print(f" HID ID: {device.get('HID', 'Not found')}")
        print(f" HID Path: {device.get('HID_path', 'Not found')}")
        print(f" Camera ID: {device.get('camera', 'Not found')}")
        print(f" Camera Path: {device.get('camera_path', 'Not found')}")
        print(f" Audio ID: {device.get('audio', 'Not found')}")
        print(f" Audio Path: {device.get('audio_path', 'Not found')}")
        print()


# Display initial devices
if monitor.initial_snapshot:
    # Snapshot entries may be device objects or plain dicts; normalise to dicts
    initial_devices = [dev.to_dict() if hasattr(dev, 'to_dict') else dev for dev in monitor.initial_snapshot.devices]
    display_detailed_device_info(initial_devices, "Initial Devices")

# Display current devices
current_devices = monitor.device_manager.discover_devices()
current_devices_dict = [dev.to_dict() for dev in current_devices]
display_detailed_device_info(current_devices_dict, "Current Devices")

# Show device selection by port chain
print("🎯 Device Selection by Port Chain:")
print("=" * 60)

grouped_devices = device_selector.list_devices_grouped_by_port_chain()
if grouped_devices:
    for port_chain, devices in grouped_devices.items():
        print(f"Port Chain: {port_chain}")
        for device in devices:
            print(f" - {device}")
        print()

    # Example: Select first available device
    if grouped_devices:
        first_port_chain = list(grouped_devices.keys())[0]
        selected_device = device_selector.select_device_by_port_chain(first_port_chain)
        if selected_device:
            print(f"🎯 Example - Selected device from port chain '{first_port_chain}':")
            print(f" {selected_device}")
else:
    print(" No devices available for selection")
```

--------------------------------

### Stop Hotplug Monitoring and Display Summary

Source: https://github.com/techxartisanstudio/openterface_python/blob/main/src/notebook/Testing.ipynb

Use this code to stop the hotplug monitoring service and then display a summary of the monitoring session. It shows the platform, duration, initial and final device counts, and any added or removed devices.

```python
print("🛑 Stopping cross-platform hotplug monitoring...")
monitor.stop_monitoring()
print("✅ Monitoring stopped successfully!")

# Display final summary
final_state = monitor.get_current_state()
initial_state = monitor.get_initial_state()

if final_state and initial_state:
    print(f"\n📈 Final Summary:")
    print(f" Platform: {DeviceFactory.get_current_platform()}")
    # NOTE(review): assumes both timestamps support subtraction (e.g. datetime objects) - confirm
    print(f" Monitoring duration: {final_state['timestamp'] - initial_state['timestamp']}")
    print(f" Initial device count: {initial_state['device_count']}")
    print(f" Final device count: {final_state['device_count']}")

    # Calculate final changes
    final_snapshot = monitor.device_manager.create_snapshot()
    final_changes = final_snapshot.compare_with(monitor.initial_snapshot)

    if final_changes['added_devices'] or final_changes['removed_devices']:
        print(f" Net changes:")
        if final_changes['added_devices']:
            print(f" ➕ Added: {len(final_changes['added_devices'])} devices")
        if final_changes['removed_devices']:
            print(f" ➖ Removed: {len(final_changes['removed_devices'])} devices")
    else:
        print(f" ✅ No net changes detected")

print("\n🎯 Cross-platform hotplug detection demo completed!")
```

--------------------------------

### Import Cross-Platform Modules

Source: https://github.com/techxartisanstudio/openterface_python/blob/main/src/notebook/Testing.ipynb

Imports necessary system, time, and device-related modules for cross-platform compatibility. Ensures the parent directory is in the Python path for module imports.

```python
import sys
import os
import time
from datetime import datetime
import threading

# Add the parent directory to the path to import device modules
sys.path.insert(0, os.path.join(os.path.dirname(os.getcwd()), '..'))

# Import cross-platform device modules
from device import DeviceFactory
from device.AbstractDeviceManager import DeviceSelector
from utils import logger

# Configure logging
CoreLogger = logger.core_logger

print("✅ Cross-platform modules imported successfully!")
print(f"📁 Working directory: {os.getcwd()}")
print(f"🐍 Python path: {sys.path[0]}")
print(f"🖥️ Current platform: {DeviceFactory.get_current_platform()}")
print(f"✅ Platform supported: {DeviceFactory.is_platform_supported()}")
```

--------------------------------

### Stop Monitoring

Source: https://github.com/techxartisanstudio/openterface_python/blob/main/src/notebook/Testing.ipynb

This snippet is intended to stop the ongoing monitoring service. It's a crucial step for resource management and clean shutdown of the monitoring process.

```python
# Stop monitoring
# (the original notebook cell contains only the comment above; the call below is
# the same one used by the other "stop" snippets in this document)
monitor.stop_monitoring()
```

--------------------------------

### Stop Hot-Plugging Monitoring

Source: https://github.com/techxartisanstudio/openterface_python/blob/main/src/notebook/example.ipynb

Stops the active hot-plugging monitoring service. This should be called when the monitoring functionality is no longer needed to free up resources.

```python
# Stop the hot-plug monitoring
monitor.stop_monitoring()
print("🛑 Hot-plug monitoring stopped")
```

=== COMPLETE CONTENT ===

This response contains all available snippets from this library. No additional content exists. Do not make further requests.