### Complete Operation Example: IoT Data System Setup Source: https://github.com/robustmq/robustmq/blob/main/docs/en/RobustMQ-MQTT/Bridge/Elasticsearch.md A multi-step example demonstrating the creation of multiple Elasticsearch connectors for different IoT data streams, followed by listing connectors and publishing a test message. ```bash # 1. Create sensor data Elasticsearch connector robust-ctl mqtt connector create \ --connector-name "iot_sensor_es" \ --connector-type "Elasticsearch" \ --config '{"url": "http://localhost:9200", "index": "iot_sensors", "auth_type": "basic", "username": "elastic", "password": "elastic123"}' \ --topic-id "iot/sensors/+/data" ``` ```bash # 2. Create device status Elasticsearch connector robust-ctl mqtt connector create \ --connector-name "device_status_es" \ --connector-type "Elasticsearch" \ --config '{"url": "http://localhost:9200", "index": "device_status", "auth_type": "basic", "username": "elastic", "password": "elastic123"}' \ --topic-id "iot/devices/+/status" ``` ```bash # 3. Create alarm message Elasticsearch connector robust-ctl mqtt connector create \ --connector-name "alarm_es" \ --connector-type "Elasticsearch" \ --config '{"url": "http://localhost:9200", "index": "alarms", "auth_type": "basic", "username": "elastic", "password": "elastic123", "timeout_secs": 60}' \ --topic-id "iot/alarms/#" ``` ```bash # 4. View created connectors robust-ctl mqtt connector list ``` ```bash # 5. Test connector (publish test message) robust-ctl mqtt publish \ --username "test_user" \ --password "test_pass" \ --topic "iot/sensors/temp_001/data" \ --qos 1 \ --message '{"temperature": 25.5, "humidity": 60, "timestamp": 1640995200}' ``` -------------------------------- ### Install Go MQTT Client Source: https://github.com/robustmq/robustmq/blob/main/docs/en/RobustMQ-MQTT/SDK/go-sdk.md Install the Eclipse Paho MQTT Go Client library using go get. This is the first step to using the SDK. ```bash go get github.com/eclipse/paho.mqtt.golang ``` -------------------------------- ### Go NATS Quick Start Source: https://github.com/robustmq/robustmq/blob/main/docs/en/nats/QuickStart.md Demonstrates subscribing, publishing, and request-reply patterns using the nats.go library. Ensure the nats.go library is installed. ```bash go get github.com/nats-io/nats.go ``` ```go package main import ( "fmt" "time" "github.com/nats-io/nats.go" ) func main() { nc, _ := nats.Connect("nats://localhost:4222") defer nc.Close() // Subscribe nc.Subscribe("hello.ச்செய", func(m *nats.Msg) { fmt.Printf("Received: %s\n", m.Data) }) // Publish nc.Publish("hello.world", []byte("Hello RobustMQ!")) // Request-Reply msg, _ := nc.Request("hello.query", []byte("ping"), 2*time.Second) fmt.Printf("Reply: %s\n", msg.Data) time.Sleep(time.Second) } ``` -------------------------------- ### Complete IoT Data Storage System Setup Source: https://github.com/robustmq/robustmq/blob/main/docs/en/RobustMQ-MQTT/Bridge/PostgreSQL.md A multi-step example demonstrating the creation of multiple PostgreSQL connectors for different data types (sensor readings, device status, alarms) and a test message publication. ```bash # 1. Create sensor data PostgreSQL connector robust-ctl mqtt connector create \ --connector-name "iot_sensor_postgres" \ --connector-type "Postgres" \ --config '{"host": "postgres.iot.local", "port": 5432, "database": "iot_platform", "username": "iot_writer", "password": "iot_pass_2023", "table": "sensor_readings", "pool_size": 25, "enable_batch_insert": true}' \ --topic-id "iot/sensors/+/readings" # 2. Create device status PostgreSQL connector robust-ctl mqtt connector create \ --connector-name "device_status_postgres" \ --connector-type "Postgres" \ --config '{"host": "postgres.iot.local", "port": 5432, "database": "iot_platform", "username": "iot_writer", "password": "iot_pass_2023", "table": "device_status", "enable_upsert": true, "conflict_columns": "client_id"}' \ --topic-id "iot/devices/+/status" # 3. Create alarm message PostgreSQL connector robust-ctl mqtt connector create \ --connector-name "alarm_postgres" \ --connector-type "Postgres" \ --config '{"host": "postgres.iot.local", "port": 5432, "database": "iot_platform", "username": "iot_writer", "password": "iot_pass_2023", "table": "alarm_logs", "pool_size": 15}' \ --topic-id "iot/alarms/#" # 4. View created connectors robust-ctl mqtt connector list # 5. Test connector (publish test message) robust-ctl mqtt publish \ --username "test_user" \ --password "test_pass" \ --topic "iot/sensors/temp_001/readings" \ --qos 1 \ --message '{"temperature": 25.5, "humidity": 60, "timestamp": 1640995200}' ``` -------------------------------- ### Go Project Setup and Execution Source: https://github.com/robustmq/robustmq/blob/main/docs/en/RobustMQ-MQTT/SDK/go-sdk.md Commands for initializing a Go module, installing dependencies, running the program, building a binary, and executing the compiled program. Essential for setting up and running your Go MQTT client project. ```bash # Initialize Go module go mod init robustmq-go-client # Install dependencies go mod tidy # Run program go run main.go # Build binary go build -o robustmq-client main.go # Run compiled program ./robustmq-client ``` -------------------------------- ### RobustMQ Installation and Basic Usage Source: https://github.com/robustmq/robustmq/blob/main/docs/en/OverView/What-is-RobustMQ.md Installs RobustMQ using a curl script, starts the server, and demonstrates publishing a message via MQTT and consuming it via Kafka and NATS. ```bash # One-line install curl -fsSL https://raw.githubusercontent.com/robustmq/robustmq/main/scripts/install.sh | bash # Start service robust-server start # Publish via MQTT mqttx pub -h localhost -p 1883 -t "robustmq.multi.protocol" -m "Hello RobustMQ!" # Consume the same message via Kafka kafka-console-consumer.sh --bootstrap-server localhost:9092 \ --topic robustmq.multi.protocol --from-beginning # Consume the same message via NATS nats sub "robustmq.multi.protocol" ``` -------------------------------- ### NATS CLI Quickstart: Subscribe and Publish Source: https://context7.com/robustmq/robustmq/llms.txt Demonstrates subscribing to a topic and publishing a message using the NATS CLI. Ensure the NATS CLI is installed and RobustMQ is running with NATS enabled. ```bash # CLI quickstart (NATS CLI) nats sub "hello.விடும்" --server nats://localhost:4222 & nats pub "hello.world" "Hello RobustMQ!" --server nats://localhost:4222 # Terminal 1 receives: [hello.world] Hello RobustMQ! ``` -------------------------------- ### Complete IoT Sensor Data Logging System Setup Source: https://github.com/robustmq/robustmq/blob/main/docs/en/RobustMQ-MQTT/Bridge/LocalFile.md This comprehensive example demonstrates setting up multiple LocalFile connectors for different IoT data streams (sensors, device status, alarms), listing them, and publishing a test message. It covers the full lifecycle from creation to testing. ```bash # 1. Create sensor data connector robust-ctl mqtt connector create \ --connector-name "iot_sensor_logger" \ --connector-type "LocalFile" \ --config '{"local_file_path": "/var/log/robustmq/iot_sensors.log"}' \ --topic-id "iot/sensors/+/data" # 2. Create device status connector robust-ctl mqtt connector create \ --connector-name "device_status_logger" \ --connector-type "LocalFile" \ --config '{"local_file_path": "/var/log/robustmq/device_status.log"}' \ --topic-id "iot/devices/+/status" # 3. Create alarm message connector robust-ctl mqtt connector create \ --connector-name "alarm_logger" \ --connector-type "LocalFile" \ --config '{"local_file_path": "/var/log/robustmq/alarms.log"}' \ --topic-id "iot/alarms/#" # 4. View created connectors robust-ctl mqtt connector list # 5. Test connector (publish test message) robust-ctl mqtt publish \ --username "test_user" \ --password "test_pass" \ --topic "iot/sensors/temp_001/data" \ --qos 1 \ --message '{"temperature": 25.5, "humidity": 60, "timestamp": 1640995200}' ``` -------------------------------- ### Install and Start RobustMQ Broker Source: https://github.com/robustmq/robustmq/blob/main/README.md Installs RobustMQ using a curl script and starts the broker server. Ensure you have necessary permissions. ```bash curl -fsSL https://raw.githubusercontent.com/robustmq/robustmq/main/scripts/install.sh | bash broker-server start ``` -------------------------------- ### Quick Start RobustMQ Server Source: https://github.com/robustmq/robustmq/blob/main/docs/en/Blogs/08.md Download, extract, and run the RobustMQ server binary to get started. Ensure you have wget and tar installed. ```bash # Download binary wget https://github.com/robustmq/robustmq/releases/download/v0.2.0/robustmq-0.2.0-linux-amd64.tar.gz ``` ```bash # Extract and run tar -xzf robustmq-0.2.0-linux-amd64.tar.gz cd robustmq-0.2.0-linux-amd64 ./bin/robust-server start ``` -------------------------------- ### JavaScript/Node.js NATS Quick Start Source: https://github.com/robustmq/robustmq/blob/main/docs/en/nats/QuickStart.md Demonstrates NATS publish, subscribe, and request-reply using the nats.js library. Install via npm. ```bash npm install nats ``` ```javascript import { connect, StringCodec } from "nats"; const nc = await connect({ servers: "nats://localhost:4222" }); const sc = StringCodec(); // Subscribe const sub = nc.subscribe("hello.ச்செய"); (async () => { for await (const msg of sub) { console.log(`Received: ${sc.decode(msg.data)}`); } })(); // Publish nc.publish("hello.world", sc.encode("Hello RobustMQ!")); // Request-Reply const reply = await nc.request("hello.query", sc.encode("ping"), { timeout: 2000 }); console.log(`Reply: ${sc.decode(reply.data)}`); await nc.close(); ``` -------------------------------- ### Complete IoT Message Routing System Setup Source: https://github.com/robustmq/robustmq/blob/main/docs/en/RobustMQ-MQTT/Bridge/RabbitMQ.md A comprehensive example demonstrating the creation of multiple RabbitMQ connectors for an IoT system. It includes connectors for sensor data, device status, and critical alarms, followed by listing connectors and publishing a test message. ```bash # 1. Create sensor data RabbitMQ connector (Topic Exchange) robust-ctl mqtt connector create \ --connector-name "iot_sensor_rabbitmq" \ --connector-type "RabbitMQ" \ --config '{"server": "rabbitmq.iot.local", "port": 5672, "username": "iot_user", "password": "iot_pass", "virtual_host": "/iot", "exchange": "iot_topic_exchange", "routing_key": "sensor.temperature.data", "delivery_mode": "Persistent", "enable_tls": false}' \ --topic-id "iot/sensors/temperature/+" # 2. Create device status RabbitMQ connector robust-ctl mqtt connector create \ --connector-name "device_status_rabbitmq" \ --connector-type "RabbitMQ" \ --config '{"server": "rabbitmq.iot.local", "port": 5672, "username": "iot_user", "password": "iot_pass", "virtual_host": "/iot", "exchange": "iot_topic_exchange", "routing_key": "device.status", "delivery_mode": "Persistent", "enable_tls": false}' \ --topic-id "iot/devices/+/status" # 3. Create alarm message RabbitMQ connector (high priority) robust-ctl mqtt connector create \ --connector-name "alarm_rabbitmq" \ --connector-type "RabbitMQ" \ --config '{"server": "rabbitmq.iot.local", "port": 5672, "username": "iot_user", "password": "iot_pass", "virtual_host": "/iot", "exchange": "iot_alarms", "routing_key": "alarm.critical", "delivery_mode": "Persistent", "enable_tls": false}' \ --topic-id "iot/alarms/#" # 4. View created connectors robust-ctl mqtt connector list # 5. Test connector (publish test message) robust-ctl mqtt publish \ --username "test_user" \ --password "test_pass" \ --topic "iot/sensors/temperature/001" \ --qos 1 \ --message '{"temperature": 25.5, "humidity": 60, "timestamp": 1640995200}' ``` -------------------------------- ### Python NATS Quick Start Source: https://github.com/robustmq/robustmq/blob/main/docs/en/nats/QuickStart.md Shows how to subscribe, publish, and perform request-reply operations with nats-py. Install the library using pip. ```bash pip install nats-py ``` ```python import asyncio import nats async def main(): nc = await nats.connect("nats://localhost:4222") # Subscribe async def handler(msg): print(f"Received: {msg.data.decode()}") await nc.subscribe("hello.ச்செய", cb=handler) # Publish await nc.publish("hello.world", b"Hello RobustMQ!") # Request-Reply reply = await nc.request("hello.query", b"ping", timeout=2) print(f"Reply: {reply.data.decode()}") await asyncio.sleep(1) await nc.close() asyncio.run(main()) ``` -------------------------------- ### Run Example with Maven Source: https://github.com/robustmq/robustmq/blob/main/docs/en/RobustMQ-MQTT/SDK/java-sdk.md Execute the main class using Maven for running examples. ```bash mvn exec:java -Dexec.mainClass="io.robustmq.App" ``` -------------------------------- ### Complete IoT Data Storage System Setup with robust-ctl Source: https://github.com/robustmq/robustmq/blob/main/docs/en/RobustMQ-MQTT/Bridge/MySQL.md A comprehensive example demonstrating the creation of multiple MySQL connectors for different IoT data types (sensor readings, device status, alarm logs) and a test message publication. ```bash # 1. Create sensor data MySQL connector robust-ctl mqtt connector create \ --connector-name "iot_sensor_mysql" \ --connector-type "MySQL" \ --config '{"host": "mysql.iot.local", "port": 3306, "database": "iot_platform", "username": "iot_writer", "password": "iot_pass_2023", "table": "sensor_readings", "pool_size": 25, "enable_batch_insert": true}' \ --topic-id "iot/sensors/+/readings" # 2. Create device status MySQL connector robust-ctl mqtt connector create \ --connector-name "device_status_mysql" \ --connector-type "MySQL" \ --config '{"host": "mysql.iot.local", "port": 3306, "database": "iot_platform", "username": "iot_writer", "password": "iot_pass_2023", "table": "device_status", "enable_upsert": true}' \ --topic-id "iot/devices/+/status" # 3. Create alarm message MySQL connector robust-ctl mqtt connector create \ --connector-name "alarm_mysql" \ --connector-type "MySQL" \ --config '{"host": "mysql.iot.local", "port": 3306, "database": "iot_platform", "username": "iot_writer", "password": "iot_pass_2023", "table": "alarm_logs", "pool_size": 15}' \ --topic-id "iot/alarms/#" # 4. View created connectors robust-ctl mqtt connector list # 5. Test connector (publish test message) robust-ctl mqtt publish \ --username "test_user" \ --password "test_pass" \ --topic "iot/sensors/temp_001/readings" \ --qos 1 \ --message '{"temperature": 25.5, "humidity": 60, "timestamp": 1640995200}' ``` -------------------------------- ### Start Local Documentation Development Server Source: https://github.com/robustmq/robustmq/blob/main/docs/en/ContributionGuide/ContributingDoc/Build-Doc-Env.md Run this command to start a local development server for the documentation. ```shell npm run docs:dev ``` -------------------------------- ### Run Async MQTT Example Source: https://github.com/robustmq/robustmq/blob/main/docs/en/RobustMQ-MQTT/SDK/python-sdk.md Execute the asynchronous MQTT client example script. ```bash python async_client.py ``` -------------------------------- ### Rust NATS Quick Start Source: https://github.com/robustmq/robustmq/blob/main/docs/en/nats/QuickStart.md A Rust example demonstrating NATS subscribe, publish, and receive functionality using the async-nats crate. Add dependencies to Cargo.toml. ```toml [dependencies] async-nats = "0.37" tokio = { version = "1", features = ["full"] } ``` ```rust #[tokio::main] async fn main() -> Result<(), async_nats::Error> { let client = async_nats::connect("nats://localhost:4222").await?; // Subscribe let mut sub = client.subscribe("hello.ச்செய").await?; // Publish client.publish("hello.world", "Hello RobustMQ!".into()).await?; // Receive if let Some(msg) = sub.next().await { println!("Received: {:?}", msg.payload); } Ok(()) } ``` -------------------------------- ### Install MQTTX CLI on Windows Source: https://github.com/robustmq/robustmq/blob/main/docs/en/RobustMQ-MQTT/MQTTX-Guide.md Install MQTTX CLI using npm or Chocolatey. Ensure Node.js and npm or Chocolatey are installed. ```bash # Using npm npm install -g @emqx/mqttx-cli # Or using Chocolatey choco install mqttx-cli ``` -------------------------------- ### Install RobustMQ with One-Line Installer Source: https://github.com/robustmq/robustmq/blob/main/docs/en/InstallationDeployment/Binary-Deployment.md Use this command to quickly install RobustMQ. It downloads and executes an installation script. ```bash curl -fsSL https://raw.githubusercontent.com/robustmq/robustmq/main/scripts/install.sh | bash ``` -------------------------------- ### Run Data Pipeline Example Source: https://github.com/robustmq/robustmq/blob/main/docs/en/RobustMQ-MQTT/SDK/python-sdk.md Execute the Python script for the data pipeline example. ```bash python data_pipeline.py ``` -------------------------------- ### Start RobustMQ Admin Server Source: https://github.com/robustmq/robustmq/blob/main/docs/en/Api/COMMON.md Instructions for starting the admin-server service using Cargo or a pre-compiled binary. ```bash # Start admin-server cargo run --bin admin-server ``` ```bash # Or use compiled binary ./target/release/admin-server ``` -------------------------------- ### Complete IoT Data Storage System Example Source: https://github.com/robustmq/robustmq/blob/main/docs/en/RobustMQ-MQTT/Bridge/GreptimeDB.md This example demonstrates the full process of setting up multiple GreptimeDB connectors for different data types (sensor data, device status, alarms), viewing them, and publishing a test message. ```bash # 1. Create sensor data GreptimeDB connector robust-ctl mqtt connector create \ --connector-name "iot_sensor_greptimedb" \ --connector-type "GreptimeDB" \ --config '{"server_addr": "greptimedb:4000", "database": "iot_sensors", "user": "sensor_user", "password": "sensor_pass", "precision": "ms"}' \ --topic-id "iot/sensors/+/data" # 2. Create device status GreptimeDB connector robust-ctl mqtt connector create \ --connector-name "device_status_greptimedb" \ --connector-type "GreptimeDB" \ --config '{"server_addr": "greptimedb:4000", "database": "device_status", "user": "device_user", "password": "device_pass", "precision": "s"}' \ --topic-id "iot/devices/+/status" # 3. Create alarm message GreptimeDB connector robust-ctl mqtt connector create \ --connector-name "alarm_greptimedb" \ --connector-type "GreptimeDB" \ --config '{"server_addr": "greptimedb:4000", "database": "alarms", "user": "alarm_user", "password": "alarm_pass", "precision": "s"}' \ --topic-id "iot/alarms/#" # 4. View created connectors robust-ctl mqtt connector list # 5. Test connector (publish test message) robust-ctl mqtt publish \ --username "test_user" \ --password "test_pass" \ --topic "iot/sensors/temp_001/data" \ --qos 1 \ --message '{"temperature": 25.5, "humidity": 60, "timestamp": 1640995200}' ``` -------------------------------- ### Start RobustMQ Server Source: https://github.com/robustmq/robustmq/blob/main/docs/en/nats/QuickStart.md Installs and starts the RobustMQ server. Ensure NATS port 4222 is available. ```bash curl -fsSL https://raw.githubusercontent.com/robustmq/robustmq/main/scripts/install.sh | bash robust-server start ``` -------------------------------- ### Smart Home Monitoring: Subscribe and Publish Examples Source: https://github.com/robustmq/robustmq/blob/main/docs/en/RobustMQ-MQTT/WildcardSubscription.md This example demonstrates a practical application for smart home monitoring using wildcard subscriptions. It shows how to subscribe to all sensor data within different rooms and how to publish data from various sensors. ```bash # Subscribe to sensor data from all rooms mqttx sub -t 'home/+/sensor/#' -h '117.72.92.117' -p 1883 -v # Publish sensor data from different rooms mqttx pub -t 'home/livingroom/sensor/temperature' -m '{"value":22.5}' -h '117.72.92.117' -p 1883 mqttx pub -t 'home/bedroom/sensor/humidity' -m '{"value":55}' -h '117.72.92.117' -p 1883 mqttx pub -t 'home/kitchen/sensor/light' -m '{"value":300}' -h '117.72.92.117' -p 1883 ``` -------------------------------- ### MQTT Topic Rewrite Rules Example Source: https://github.com/robustmq/robustmq/blob/main/docs/en/RobustMQ-MQTT/TopicRewrite.md These are example rewrite rules. Rule 1 rewrites topics starting with 'y/', followed by any character, then 'z/', and any characters, to 'y/z/' followed by the second captured group. Rule 2 rewrites topics starting with 'x/y/' followed by any characters to 'z/y/x/' followed by the first captured group. Rule 3 rewrites topics starting with 'x/y/' followed by digits to 'z/y/' followed by the captured digits. ```text Rule 1: y/+/z/# → y/z/$2 (regex: ^y/(.+)/z/(.+)$) Rule 2: x/# → z/y/x/$1 (regex: ^x/y/(.+)$) Rule 3: x/y/+ → z/y/$1 (regex: ^x/y/(\d+)$) ``` -------------------------------- ### Start RobustMQ Server in Single-Machine Cluster Mode Source: https://github.com/robustmq/robustmq/blob/main/docs/en/InstallationDeployment/Binary-Deployment.md Starts three RobustMQ nodes for a single-machine cluster setup, suitable for development and testing. Each node uses a different configuration file and port. ```bash # Run each in a separate terminal robust-server start config/cluster/server-1.toml ``` ```bash robust-server start config/cluster/server-2.toml ``` ```bash robust-server start config/cluster/server-3.toml ``` -------------------------------- ### Download and Extract RobustMQ Binary Source: https://github.com/robustmq/robustmq/blob/main/docs/en/Blogs/03.md Download the RobustMQ binary installation package and extract it to start a cluster. ```bash $ wget https://github.com/robustmq/robustmq/releases/download/v0.1.33/robustmq-v0.1.33-linux-amd64.tar.gz $ tar -xzvf robustmq-v0.1.33-linux-amd64.tar.gz $ cd robustmq-v0.1.33-linux-amd64 $ tree . ``` ```bash . ├── bin │ ├── robust-bench │ ├── robust-ctl │ └── robust-server ├── config │ ├── certs │ │ ├── ca.pem │ │ ├── cert.pem │ │ └── key.pem │ ├── logger.toml │ ├── server.toml │ ├── server.toml.template │ ├── version.ini │ └── version.txt ├── docs ├── libs │ ├── broker-server │ ├── cli-bench │ └── cli-command └── package-info.txt ``` -------------------------------- ### Complete Example: Subscribe and Publish Source: https://github.com/robustmq/robustmq/blob/main/docs/en/QuickGuide/PublicMqttServer.md This example demonstrates a full cycle of subscribing to a topic and then publishing a message to it, simulating a data flow. It requires two terminals. ```bash # Terminal 1: Subscribe to temperature sensor data mqttx sub -h 117.72.92.117 -p 1883 -u admin -P robustmq -t "sensors/temperature" --verbose # Terminal 2: Send temperature data mqttx pub -h 117.72.92.117 -p 1883 -u admin -P robustmq -t "sensors/temperature" -m '{"sensor": "temp-001", "value": 23.5, "unit": "celsius", "timestamp": "2024-01-01T12:00:00Z"}' ``` -------------------------------- ### Kafka Connector JSON Configuration Source: https://github.com/robustmq/robustmq/blob/main/docs/en/RobustMQ-MQTT/Bridge/Kafka.md Example of Kafka connector configuration in JSON format. Use this for basic setup. ```json { "bootstrap_servers": "localhost:9092", "topic": "mqtt_messages", "key": "sensor_data" } ``` -------------------------------- ### Basic API Requests Source: https://github.com/robustmq/robustmq/blob/main/docs/en/Api/COMMON.md Examples of making GET requests to retrieve service version, cluster status, and paginated lists. ```bash # Get service version curl -X GET http://localhost:8080/ ``` ```bash # Get cluster status curl -X GET http://localhost:8080/api/status ``` ```bash # List query with pagination curl "http://localhost:8080/api/cluster/user/list?limit=10&page=1&sort_field=username&sort_by=asc" ``` -------------------------------- ### Get Cluster Status Source: https://github.com/robustmq/robustmq/blob/main/chaos-test/SKILL.md Checks the current status of the RobustMQ cluster. Used to ensure a clean state before starting tests. ```shell cluster_manage(action=status) ``` -------------------------------- ### Sensor Data Processing Example with MQTTX CLI Source: https://github.com/robustmq/robustmq/blob/main/docs/zh/RobustMQ-MQTT/SharedSubscription.md Demonstrates setting up two shared subscription groups for sensor temperature data processing and publishing sample data. ```bash # 数据处理组1 mqttx sub -t '$share/processor1/sensor/temperature' -h '117.72.92.117' -p 1883 -v mqttx sub -t '$share/processor1/sensor/temperature' -h '117.72.92.117' -p 1883 -v # 数据处理组2 mqttx sub -t '$share/processor2/sensor/temperature' -h '117.72.92.117' -p 1883 -v mqttx sub -t '$share/processor2/sensor/temperature' -h '117.72.92.117' -p 1883 -v # 发布传感器数据 mqttx pub -t 'sensor/temperature' -m '{"value":25.5,"timestamp":"2024-01-01T12:00:00Z"}' -h '117.72.92.117' -p 1883 ``` -------------------------------- ### MQTT Shared Subscription Example Source: https://context7.com/robustmq/robustmq/llms.txt Demonstrates setting up and using MQTT shared subscriptions for load balancing messages across multiple subscribers within a group. ```bash # Pattern: $share/{group_name}/{topic} or $queue/{topic} # 3 competing workers in "processors" group — each message goes to exactly one worker mqttx sub -t '$share/processors/sensor/data' -h localhost -p 1883 -u admin -P robustmq & mqttx sub -t '$share/processors/sensor/data' -h localhost -p 1883 -u admin -P robustmq & mqttx sub -t '$share/processors/sensor/data' -h localhost -p 1883 -u admin -P robustmq & # A second independent group receives ALL messages too (group isolation) mqttx sub -t '$share/analytics/sensor/data' -h localhost -p 1883 -u admin -P robustmq & # Queue subscription (no explicit group name — all subs share one group) mqttx sub -t '$queue/job/queue' -h localhost -p 1883 & mqttx sub -t '$queue/job/queue' -h localhost -p 1883 & # Publish to the original topic — shared subscription routing is broker-side mqttx pub -t 'sensor/data' -m '{"temp": 25.5}' -h localhost -p 1883 ``` -------------------------------- ### Log Processing System Example with MQTTX CLI Source: https://github.com/robustmq/robustmq/blob/main/docs/zh/RobustMQ-MQTT/SharedSubscription.md Sets up a shared subscription group for application logs and publishes sample INFO and ERROR log messages. ```bash # 日志处理组 mqttx sub -t '$share/log_processor/application/logs' -h '117.72.92.117' -p 1883 -v mqttx sub -t '$share/log_processor/application/logs' -h '117.72.92.117' -p 1883 -v mqttx sub -t '$share/log_processor/application/logs' -h '117.72.92.117' -p 1883 -v # 发布日志消息 mqttx pub -t 'application/logs' -m '{"level":"INFO","message":"User login successful","user_id":"123"}' -h '117.72.92.117' -p 1883 mqttx pub -t 'application/logs' -m '{"level":"ERROR","message":"Database connection failed","error":"timeout"}' -h '117.72.92.117' -p 1883 ``` -------------------------------- ### Query Blacklist using HTTP API Source: https://github.com/robustmq/robustmq/blob/main/docs/en/RobustMQ-MQTT/Security/Blacklist.md Examples of querying blacklists using HTTP GET requests to the /api/mqtt/blacklist/list endpoint, with options for pagination and filtering. ```APIDOC ## Query Blacklist ```bash # Query all blacklists with pagination curl "http://localhost:8080/api/mqtt/blacklist/list?limit=10&page=1" # Query specific type blacklist with filtering curl "http://localhost:8080/api/mqtt/blacklist/list?limit=10&page=1&filter_field=blacklist_type&filter_values=User&exact_match=true" ``` ``` -------------------------------- ### Get Cluster Status Response Source: https://github.com/robustmq/robustmq/blob/main/docs/en/Api/CLUSTER.md Example JSON response when querying the cluster status. It includes version, node list, and Raft group status. ```json { "code": 0, "data": "success", "error": null } ``` -------------------------------- ### Smart Home Device Control Example Source: https://github.com/robustmq/robustmq/blob/main/docs/en/RobustMQ-MQTT/AutoSubscription.md This example illustrates smart home device control. A smart light connects and is auto-subscribed to a control topic. A home automation system then sends control commands to the light. ```bash # Smart light connects and is auto-subscribed to control topic mqttx conn -i light001 -h '117.72.92.117' -p 1883 # Home automation system sends control commands mqttx pub -t 'home/light001/control' -m '{"action":"turn_on","brightness":80,"color":"white"}' -h '117.72.92.117' -p 1883 # Light automatically receives and processes the command ``` -------------------------------- ### Java NATS Quick Start Source: https://github.com/robustmq/robustmq/blob/main/docs/en/nats/QuickStart.md Provides a Java example for NATS operations including subscribe, publish, and request-reply using the jnats library. Add the dependency to your pom.xml. ```xml io.nats jnats 2.20.5 ``` ```java Connection nc = Nats.connect("nats://localhost:4222"); // Subscribe Dispatcher d = nc.createDispatcher((msg) -> { System.out.println("Received: " + new String(msg.getData())); }); d.subscribe("hello.ச்செய"); // Publish nc.publish("hello.world", "Hello RobustMQ!".getBytes()); // Request-Reply Message reply = nc.request("hello.query", "ping".getBytes(), Duration.ofSeconds(2)); System.out.println("Reply: " + new String(reply.getData())); nc.close(); ``` -------------------------------- ### Connection Benchmark (Create Mode) Source: https://github.com/robustmq/robustmq/blob/main/docs/en/Bench/Bench-CLI.md Example of running a connection benchmark for MQTT in create mode. ```bash robust-bench mqtt conn \ --host 127.0.0.1 \ --port 1883 \ --count 10000 \ --concurrency 1000 \ --mode create ``` -------------------------------- ### Task Queue Processing Example with MQTTX CLI Source: https://github.com/robustmq/robustmq/blob/main/docs/zh/RobustMQ-MQTT/SharedSubscription.md Sets up a queue-based subscription for task processing and publishes sample tasks to the queue. ```bash # 工作节点订阅任务队列 mqttx sub -t '$queue/job/queue' -h '117.72.92.117' -p 1883 -v mqttx sub -t '$queue/job/queue' -h '117.72.92.117' -p 1883 -v mqttx sub -t '$queue/job/queue' -h '117.72.92.117' -p 1883 -v # 发布任务到队列 mqttx pub -t 'job/queue' -m '{"job_id":"J001","type":"image_processing","data":"base64..."}' -h '117.72.92.117' -p 1883 mqttx pub -t 'job/queue' -m '{"job_id":"J002","type":"data_analysis","data":"csv_data"}' -h '117.72.92.117' -p 1883 ``` -------------------------------- ### NATS Request-Reply Example Source: https://github.com/robustmq/robustmq/blob/main/docs/en/QuickGuide/Experience-NATS.md Implement synchronous request-response communication. Terminal 1 starts a service that replies to requests, and Terminal 2 sends a request and waits for a reply. ```bash # Terminal 1: start a service that replies to requests nats reply orders.query '{"status":"ok"}' # Terminal 2: send a request and wait for the reply nats request orders.query '{"id":"001"}' ``` -------------------------------- ### Install MQTTX CLI on Linux Source: https://github.com/robustmq/robustmq/blob/main/docs/en/RobustMQ-MQTT/MQTTX-Guide.md Install MQTTX CLI using npm or by downloading the binary. If using the binary, ensure it's executable and moved to your PATH. ```bash # Using npm npm install -g @emqx/mqttx-cli # Or download binary wget https://github.com/emqx/MQTTX/releases/latest/download/mqttx-cli-linux-x64 chmod +x mqttx-cli-linux-x64 sudo mv mqttx-cli-linux-x64 /usr/local/bin/mqttx ``` -------------------------------- ### Deploy RobustMQ with Docker Compose Source: https://github.com/robustmq/robustmq/blob/main/docs/en/InstallationDeployment/Docker-Deployment.md Clone the RobustMQ repository and use Docker Compose to start, check the status of, or stop the RobustMQ service. This method is recommended for more complex setups. ```bash # Clone the repo git clone https://github.com/robustmq/robustmq.git cd robustmq # Start docker compose -f docker/robustmq/docker-compose.yml up -d robustmq # Check status docker compose -f docker/robustmq/docker-compose.yml ps # Stop docker compose -f docker/robustmq/docker-compose.yml down ``` -------------------------------- ### Run Basic MQTT Example Source: https://github.com/robustmq/robustmq/blob/main/docs/en/RobustMQ-MQTT/SDK/python-sdk.md Execute the main Python script to run a basic MQTT client example. ```bash python main.py ``` -------------------------------- ### Example: Basic Webhook Connector Creation with robust-ctl Source: https://github.com/robustmq/robustmq/blob/main/docs/en/RobustMQ-MQTT/Bridge/Webhook.md Demonstrates a practical example of creating a Webhook connector using robust-ctl, including essential parameters like name, type, configuration URL, and topic. ```bash robust-ctl mqtt connector create \ --connector-name "webhook_connector_01" \ --connector-type "webhook" \ --config '{"url": "http://localhost:8080/webhook"}' \ --topic-id "sensor/data" ``` -------------------------------- ### Basic MQTT Connection, Publish, and Subscribe with Go Source: https://github.com/robustmq/robustmq/blob/main/docs/en/RobustMQ-MQTT/SDK/go-sdk.md Demonstrates establishing a basic connection to RobustMQ, setting up message handlers, subscribing to a topic, publishing a message, and then unsubscribing and disconnecting. Configure connection options like broker address, client ID, username, password, and keep-alive intervals. ```go package main import ( "fmt" "log" "os" "time" "github.com/eclipse/paho.mqtt.golang" ) // Message handler callback function var messageHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { fmt.Printf("Message received - Topic: %s\n", msg.Topic()) fmt.Printf("Message content: %s\n", msg.Payload()) fmt.Printf("QoS: %d\n", msg.Qos()) fmt.Printf("Retained: %t\n", msg.Retained()) fmt.Println("==================") } // Connection lost callback function var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) { fmt.Printf("Connection lost: %v\n", err) fmt.Println("Attempting to reconnect...") } // Connection success callback function var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) { fmt.Println("Successfully connected to RobustMQ") } func main() { // Enable debug logging mqtt.DEBUG = log.New(os.Stdout, "[DEBUG] ", 0) mqtt.ERROR = log.New(os.Stdout, "[ERROR] ", 0) // Create client options opts := mqtt.NewClientOptions() opts.AddBroker("tcp://localhost:1883") opts.SetClientID("robustmq_go_client") opts.SetUsername("your_username") opts.SetPassword("your_password") // Set connection parameters opts.SetKeepAlive(60 * time.Second) opts.SetDefaultPublishHandler(messageHandler) opts.SetPingTimeout(1 * time.Second) opts.SetConnectTimeout(5 * time.Second) opts.SetAutoReconnect(true) opts.SetMaxReconnectInterval(1 * time.Minute) // Set callback functions opts.SetConnectionLostHandler(connectLostHandler) opts.SetOnConnectHandler(connectHandler) // Create client client := mqtt.NewClient(opts) // Connect to RobustMQ Broker if token := client.Connect(); token.Wait() && token.Error() != nil { panic(token.Error()) } // Subscribe to topic topic := "robustmq/go/test/#" if token := client.Subscribe(topic, 1, nil); token.Wait() && token.Error() != nil { fmt.Println("Subscription failed:", token.Error()) os.Exit(1) } fmt.Printf("Successfully subscribed to topic: %s\n", topic) // Publish message pubTopic := "robustmq/go/test/hello" payload := "Hello RobustMQ from Go client!" token := client.Publish(pubTopic, 1, false, payload) token.Wait() fmt.Printf("Message published to topic: %s\n", pubTopic) // Wait to receive messages time.Sleep(3 * time.Second) // Unsubscribe if token := client.Unsubscribe(topic); token.Wait() && token.Error() != nil { fmt.Println("Unsubscribe failed:", token.Error()) os.Exit(1) } fmt.Printf("Unsubscribed from topic: %s\n", topic) // Disconnect client.Disconnect(250) fmt.Println("Disconnected from RobustMQ") } ``` -------------------------------- ### Start File Write Executor with Compio and Thread Affinity Source: https://github.com/robustmq/robustmq/blob/main/docs/en/Blogs/58.md Initializes a file write executor using compio for asynchronous I/O and pins each thread to a specific CPU core for performance. This setup is beneficial for reducing cache misses and system calls. ```rust pub struct FileWriteExecutor { shards: Vec>, } impl FileWriteExecutor { pub fn start(num_threads: usize) -> Arc { let mut shards = Vec::with_capacity(num_threads); for core_id in 0..num_threads { let (tx, rx) = std::sync::mpsc::sync_channel::(8192); shards.push(tx); std::thread::Builder::new() .name(format!("storage-io-{}", core_id)) .spawn(move || { // Pin to a fixed CPU core to keep page cache and TLB warm core_affinity::set_for_current(CoreId { id: core_id }).ok(); // compio runtime = io_uring event loop compio::runtime::Runtime::new() .unwrap() .block_on(file_write_thread_main(rx)); }) .unwrap(); } Arc::new(Self { shards }) } /// partition_id determines which shard to use; writes to the same partition are ordered pub async fn append( &self, partition_id: u64, path: PathBuf, data: Bytes, ) -> Result { let shard = (partition_id as usize) % self.shards.len(); let (reply_tx, reply_rx) = oneshot::channel(); self.shards[shard] .send(FileTask::Append { path, data, reply: reply_tx }) .map_err(|_| IoError::ExecutorShutdown)?; reply_rx.await.map_err(|_| IoError::ReplyDropped)? } } async fn file_write_thread_main(rx: std::sync::mpsc::Receiver) { loop { // Receive tasks within the compio context let task = compio::runtime::spawn_blocking(move || rx.recv()) .await .unwrap(); match task { Ok(FileTask::Append { path, data, reply }) => { // io_uring asynchronous file append // The kernel holds the buffer until the write completes; application awaits completion let result = async { let file = compio::fs::OpenOptions::new() .append(true) .open(&path) .await?; let (_, offset) = file.write_all_at(data, u64::MAX).await?; Ok::(offset) } .await .map_err(IoError::Io); let _ = reply.send(result); } Err(_) => return, } } } ``` -------------------------------- ### Full Operation Example - IoT Data System Source: https://github.com/robustmq/robustmq/blob/main/docs/zh/RobustMQ-MQTT/Bridge/Kafka.md A comprehensive example demonstrating the creation of multiple Kafka connectors for different IoT data streams (sensors, device status, alarms) and publishing a test message. ```bash # 1. 创建传感器数据 Kafka 连接器 robust-ctl mqtt connector create \ --connector-name "iot_sensor_kafka" \ --connector-type "Kafka" \ --config '{"bootstrap_servers": "kafka1:9092,kafka2:9092", "topic": "iot_sensors", "key": "sensor_key"}' \ --topic-id "iot/sensors/+/data" # 2. 创建设备状态 Kafka 连接器 robust-ctl mqtt connector create \ --connector-name "device_status_kafka" \ --connector-type "Kafka" \ --config '{"bootstrap_servers": "kafka1:9092,kafka2:9092", "topic": "device_status", "key": "device_key"}' \ --topic-id "iot/devices/+/status" # 3. 创建告警消息 Kafka 连接器 robust-ctl mqtt connector create \ --connector-name "alarm_kafka" \ --connector-type "Kafka" \ --config '{"bootstrap_servers": "kafka1:9092,kafka2:9092", "topic": "alarms", "key": "alarm_key"}' \ --topic-id "iot/alarms/#" # 4. 查看创建的连接器 robust-ctl mqtt connector list # 5. 测试连接器(发布测试消息) robust-ctl mqtt publish \ --username "test_user" \ --password "test_pass" \ --topic "iot/sensors/temp_001/data" \ --qos 1 \ --message '{"temperature": 25.5, "humidity": 60, "timestamp": 1640995200}' ```