### Start RocketMQ Producer and Consumer Examples (Shell) Source: https://github.com/apache/rocketmq-site/blob/new-official-website/versioned_docs/version-5.0/10-connect/08RocketMQ Connect In Action5-ES.md These commands demonstrate how to start the RocketMQ producer and consumer examples using the provided tools script. They set the NameServer address via an environment variable and execute the respective Java classes. ```shell cd distribution/target/rocketmq-5.1.4/rocketmq-5.1.4 export NAMESRV_ADDR=localhost:9876 sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer ``` -------------------------------- ### MQTT Client Examples Source: https://github.com/apache/rocketmq-site/blob/new-official-website/versioned_docs/version-5.0/08-mqtt/02RocketMQMQTTQuickStart.md Java code examples demonstrating how to implement MQTT clients for publishing and subscribing to messages using RocketMQ MQTT. ```java MqttConsumer.java // MQTT客户端启动订阅消息 MqttProducer.java // MQTT客户端启动发布消息 ``` -------------------------------- ### PushConsumer Example C++ Source: https://github.com/apache/rocketmq-site/blob/new-official-website/i18n/en/docusaurus-plugin-content-docs/version-5.0/13-sdk/03cplusplus.md Example demonstrating message consumption using the PushConsumer with the C++ gRPC SDK. Refer to the rocketmq-clients repository for full project details and runtime environment setup. ```cpp #include "../src/Producer.h" #include "../src/Message.h" #include "../src/MQAdminExt.h" #include "../src/MQClient.h" #include "../src/MQProducer.h" #include "../src/MQConsumer.h" #include "../src/DefaultMQProducer.h" #include "../src/DefaultMQPushConsumer.h" #include "../src/DefaultMQSimpleConsumer.h" #include "../src/MessageQueue.h" #include "../src/SendResult.h" #include "../src/TransactionResolution.h" #include "../src/MQAdminExtImpl.h" #include "../src/MQClientImpl.h" #include "../src/MQProducerImpl.h" #include "../src/MQConsumerImpl.h" #include "../src/DefaultMQProducerImpl.h" #include "../src/DefaultMQPushConsumerImpl.h" #include "../src/DefaultMQSimpleConsumerImpl.h" #include "../src/MessageQueueImpl.h" #include "../src/SendResultImpl.h" #include "../src/TransactionResolutionImpl.h" #include "../src/MQAdminExt.h" #include "../src/MQClient.h" #include "../src/MQProducer.h" #include "../src/MQConsumer.h" #include "../src/DefaultMQProducer.h" #include "../src/DefaultMQPushConsumer.h" #include "../src/DefaultMQSimpleConsumer.h" #include "../src/MessageQueue.h" #include "../src/SendResult.h" #include "../src/TransactionResolution.h" #include "../src/MQAdminExtImpl.h" #include "../src/MQClientImpl.h" #include "../src/MQProducerImpl.h" #include "../src/MQConsumerImpl.h" #include "../src/DefaultMQProducerImpl.h" #include "../src/DefaultMQPushConsumerImpl.h" #include "../src/DefaultMQSimpleConsumerImpl.h" #include "../src/MessageQueueImpl.h" #include "../src/SendResultImpl.h" #include "../src/TransactionResolutionImpl.h" #include #include #include #include #include using namespace std; using namespace rocketmq; int main() { // Please refer to https://github.com/apache/rocketmq-clients/blob/master/cpp/examples/ExamplePushConsumer.cpp return 0; } ``` -------------------------------- ### Start RocketMQ Broker (Single Node, Single Replica) Source: https://github.com/apache/rocketmq-site/blob/new-official-website/i18n/en/docusaurus-plugin-content-docs/version-5.0/05-deploymentOperations/01deploy.md Starts a single RocketMQ broker in a single-node, single-replica mode. This setup is not recommended for production due to its high risk of service unavailability if the broker restarts or goes down. It is primarily intended for local testing purposes. ```bash nohup sh bin/mqbroker -n 192.168.1.1:9876 & ``` -------------------------------- ### SimpleConsumer Example C++ Source: https://github.com/apache/rocketmq-site/blob/new-official-website/i18n/en/docusaurus-plugin-content-docs/version-5.0/13-sdk/03cplusplus.md Example demonstrating message consumption using the SimpleConsumer with the C++ gRPC SDK. Refer to the rocketmq-clients repository for full project details and runtime environment setup. ```cpp #include "../src/Producer.h" #include "../src/Message.h" #include "../src/MQAdminExt.h" #include "../src/MQClient.h" #include "../src/MQProducer.h" #include "../src/MQConsumer.h" #include "../src/DefaultMQProducer.h" #include "../src/DefaultMQPushConsumer.h" #include "../src/DefaultMQSimpleConsumer.h" #include "../src/MessageQueue.h" #include "../src/SendResult.h" #include "../src/TransactionResolution.h" #include "../src/MQAdminExtImpl.h" #include "../src/MQClientImpl.h" #include "../src/MQProducerImpl.h" #include "../src/MQConsumerImpl.h" #include "../src/DefaultMQProducerImpl.h" #include "../src/DefaultMQPushConsumerImpl.h" #include "../src/DefaultMQSimpleConsumerImpl.h" #include "../src/MessageQueueImpl.h" #include "../src/SendResultImpl.h" #include "../src/TransactionResolutionImpl.h" #include "../src/MQAdminExt.h" #include "../src/MQClient.h" #include "../src/MQProducer.h" #include "../src/MQConsumer.h" #include "../src/DefaultMQProducer.h" #include "../src/DefaultMQPushConsumer.h" #include "../src/DefaultMQSimpleConsumer.h" #include "../src/MessageQueue.h" #include "../src/SendResult.h" #include "../src/TransactionResolution.h" #include "../src/MQAdminExtImpl.h" #include "../src/MQClientImpl.h" #include "../src/MQProducerImpl.h" #include "../src/MQConsumerImpl.h" #include "../src/DefaultMQProducerImpl.h" #include "../src/DefaultMQPushConsumerImpl.h" #include "../src/DefaultMQSimpleConsumerImpl.h" #include "../src/MessageQueueImpl.h" #include "../src/SendResultImpl.h" #include "../src/TransactionResolutionImpl.h" #include #include #include #include #include using namespace std; using namespace rocketmq; int main() { // Please refer to https://github.com/apache/rocketmq-clients/blob/master/cpp/examples/ExampleSimpleConsumer.cpp return 0; } ``` -------------------------------- ### Start RocketMQ Components Source: https://github.com/apache/rocketmq-site/blob/new-official-website/versioned_docs/version-5.0/05-deploymentOperations/03autofailover.md Shell commands to start NameServer, Controller, and Broker components using configuration files. ```shell $ nohup sh bin/mqnamesrv -c namesrv.conf & $ nohup sh bin/mqcontroller -c controller.conf & $ nohup sh bin/mqbroker -c broker.conf & ``` -------------------------------- ### RocketMQ Client Examples Source: https://github.com/apache/rocketmq-site/blob/new-official-website/versioned_docs/version-5.0/08-mqtt/02RocketMQMQTTQuickStart.md Java code examples showing how to implement RocketMQ clients for publishing and subscribing to messages directly via RocketMQ. ```java RocketMQConsumer.java //RocketMQ客户端启动订阅消息 RocketMQProducer.java // RocketMQ客户端启动发布消息 ``` -------------------------------- ### Sync Send Example Producer C++ Source: https://github.com/apache/rocketmq-site/blob/new-official-website/i18n/en/docusaurus-plugin-content-docs/version-5.0/13-sdk/03cplusplus.md Example demonstrating synchronous message sending using the C++ gRPC SDK. Refer to the rocketmq-clients repository for full project details and runtime environment setup. ```cpp #include "../src/Producer.h" #include "../src/Message.h" #include "../src/MQAdminExt.h" #include "../src/MQClient.h" #include "../src/MQProducer.h" #include "../src/MQConsumer.h" #include "../src/DefaultMQProducer.h" #include "../src/DefaultMQPushConsumer.h" #include "../src/DefaultMQSimpleConsumer.h" #include "../src/MessageQueue.h" #include "../src/SendResult.h" #include "../src/TransactionResolution.h" #include "../src/MQAdminExtImpl.h" #include "../src/MQClientImpl.h" #include "../src/MQProducerImpl.h" #include "../src/MQConsumerImpl.h" #include "../src/DefaultMQProducerImpl.h" #include "../src/DefaultMQPushConsumerImpl.h" #include "../src/DefaultMQSimpleConsumerImpl.h" #include "../src/MessageQueueImpl.h" #include "../src/SendResultImpl.h" #include "../src/TransactionResolutionImpl.h" #include "../src/MQAdminExt.h" #include "../src/MQClient.h" #include "../src/MQProducer.h" #include "../src/MQConsumer.h" #include "../src/DefaultMQProducer.h" #include "../src/DefaultMQPushConsumer.h" #include "../src/DefaultMQSimpleConsumer.h" #include "../src/MessageQueue.h" #include "../src/SendResult.h" #include "../src/TransactionResolution.h" #include "../src/MQAdminExtImpl.h" #include "../src/MQClientImpl.h" #include "../src/MQProducerImpl.h" #include "../src/MQConsumerImpl.h" #include "../src/DefaultMQProducerImpl.h" #include "../src/DefaultMQPushConsumerImpl.h" #include "../src/DefaultMQSimpleConsumerImpl.h" #include "../src/MessageQueueImpl.h" #include "../src/SendResultImpl.h" #include "../src/TransactionResolutionImpl.h" #include #include #include #include #include using namespace std; using namespace rocketmq; int main() { // Please refer to https://github.com/apache/rocketmq-clients/blob/master/cpp/examples/ExampleProducer.cpp return 0; } ``` -------------------------------- ### Start RocketMQ Broker (Single Master) Source: https://github.com/apache/rocketmq-site/blob/new-official-website/versioned_docs/version-5.0/05-deploymentOperations/01deploy.md This command starts a single RocketMQ broker instance. It requires the NameServer address and a configuration file path. Ensure the ROCKETMQ_HOME environment variable is set. ```bash nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-noslave/broker-a.properties & ``` ```bash nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-noslave/broker-b.properties & ``` -------------------------------- ### Start RocketMQ MQTT Services Source: https://github.com/apache/rocketmq-site/blob/new-official-website/versioned_docs/version-5.0/08-mqtt/02RocketMQMQTTQuickStart.md Commands to start the meta service (metadata center) and the MQTT broker. These services are crucial for the MQTT functionality. ```bash sh meta.sh start sh mqtt.sh start ``` -------------------------------- ### Producer with FIFO Message Example C++ Source: https://github.com/apache/rocketmq-site/blob/new-official-website/i18n/en/docusaurus-plugin-content-docs/version-5.0/13-sdk/03cplusplus.md Example demonstrating sending FIFO (First-In, First-Out) messages using the C++ gRPC SDK. Refer to the rocketmq-clients repository for full project details and runtime environment setup. ```cpp #include "../src/Producer.h" #include "../src/Message.h" #include "../src/MQAdminExt.h" #include "../src/MQClient.h" #include "../src/MQProducer.h" #include "../src/MQConsumer.h" #include "../src/DefaultMQProducer.h" #include "../src/DefaultMQPushConsumer.h" #include "../src/DefaultMQSimpleConsumer.h" #include "../src/MessageQueue.h" #include "../src/SendResult.h" #include "../src/TransactionResolution.h" #include "../src/MQAdminExtImpl.h" #include "../src/MQClientImpl.h" #include "../src/MQProducerImpl.h" #include "../src/MQConsumerImpl.h" #include "../src/DefaultMQProducerImpl.h" #include "../src/DefaultMQPushConsumerImpl.h" #include "../src/DefaultMQSimpleConsumerImpl.h" #include "../src/MessageQueueImpl.h" #include "../src/SendResultImpl.h" #include "../src/TransactionResolutionImpl.h" #include "../src/MQAdminExt.h" #include "../src/MQClient.h" #include "../src/MQProducer.h" #include "../src/MQConsumer.h" #include "../src/DefaultMQProducer.h" #include "../src/DefaultMQPushConsumer.h" #include "../src/DefaultMQSimpleConsumer.h" #include "../src/MessageQueue.h" #include "../src/SendResult.h" #include "../src/TransactionResolution.h" #include "../src/MQAdminExtImpl.h" #include "../src/MQClientImpl.h" #include "../src/MQProducerImpl.h" #include "../src/MQConsumerImpl.h" #include "../src/DefaultMQProducerImpl.h" #include "../src/DefaultMQPushConsumerImpl.h" #include "../src/DefaultMQSimpleConsumerImpl.h" #include "../src/MessageQueueImpl.h" #include "../src/SendResultImpl.h" #include "../src/TransactionResolutionImpl.h" #include #include #include #include #include using namespace std; using namespace rocketmq; int main() { // Please refer to https://github.com/apache/rocketmq-clients/blob/master/cpp/examples/ExampleProducerWithFifoMessage.cpp return 0; } ``` -------------------------------- ### Start RocketMQ Proxy Source: https://github.com/apache/rocketmq-site/blob/new-official-website/versioned_docs/version-5.0/05-deploymentOperations/01deploy.md This command starts a RocketMQ proxy instance. It requires the NameServer address. Multiple proxy instances can be started on different machines for scalability. ```shell nohup sh bin/mqproxy -n 192.168.1.1:9876 & ``` ```shell nohup sh bin/mqproxy -n 192.168.1.1:9876 & ``` ```shell nohup sh bin/mqproxy -n 192.168.1.1:9876 & ``` -------------------------------- ### Start RocketMQ Broker (Multi-Master Sync Dual-Write) Source: https://github.com/apache/rocketmq-site/blob/new-official-website/versioned_docs/version-5.0/05-deploymentOperations/01deploy.md This command starts RocketMQ brokers in a multi-master cluster using synchronous dual-write for HA. Each master has a slave, and a successful write requires confirmation from both. The NameServer address and configuration files are specified. ```bash nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a.properties & ``` ```bash nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-b.properties & ``` ```bash nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a-s.properties & ``` ```bash nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-b-s.properties & ``` -------------------------------- ### Start RocketMQ Broker (Multi-Master Async Replication) Source: https://github.com/apache/rocketmq-site/blob/new-official-website/versioned_docs/version-5.0/05-deploymentOperations/01deploy.md This command starts RocketMQ brokers in a multi-master cluster with asynchronous replication. Each master is paired with a slave. The NameServer address is provided, along with specific configuration files for master and slave instances. ```bash nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-a.properties & ``` ```bash nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-b.properties & ``` ```bash nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-a-s.properties & ``` ```bash nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-b-s.properties & ``` -------------------------------- ### Start RocketMQ NameServer Source: https://github.com/apache/rocketmq-site/blob/new-official-website/i18n/en/docusaurus-plugin-content-docs/version-5.0/05-deploymentOperations/01deploy.md Starts the RocketMQ NameServer, which is the entry point for all brokers and clients. It manages the metadata of the cluster. The command uses 'nohup' to run in the background and redirects output to a log file for verification. ```bash nohup sh mqnamesrv & tail -f ~/logs/rocketmqlogs/namesrv.log ``` -------------------------------- ### Start and verify NameServer Source: https://github.com/apache/rocketmq-site/blob/new-official-website/i18n/en/docusaurus-plugin-content-docs/version-5.0/02-quickStart/01quickstart.md Launch the NameServer process and monitor the logs to confirm successful startup. ```shell ### start namesrv $ nohup sh bin/mqnamesrv & ### verify namesrv $ tail -f ~/logs/rocketmqlogs/namesrv.log The Name Server boot success... ``` -------------------------------- ### Producer with Transactional Message Example C++ Source: https://github.com/apache/rocketmq-site/blob/new-official-website/i18n/en/docusaurus-plugin-content-docs/version-5.0/13-sdk/03cplusplus.md Example demonstrating sending transactional messages using the C++ gRPC SDK. Refer to the rocketmq-clients repository for full project details and runtime environment setup. ```cpp #include "../src/Producer.h" #include "../src/Message.h" #include "../src/MQAdminExt.h" #include "../src/MQClient.h" #include "../src/MQProducer.h" #include "../src/MQConsumer.h" #include "../src/DefaultMQProducer.h" #include "../src/DefaultMQPushConsumer.h" #include "../src/DefaultMQSimpleConsumer.h" #include "../src/MessageQueue.h" #include "../src/SendResult.h" #include "../src/TransactionResolution.h" #include "../src/MQAdminExtImpl.h" #include "../src/MQClientImpl.h" #include "../src/MQProducerImpl.h" #include "../src/MQConsumerImpl.h" #include "../src/DefaultMQProducerImpl.h" #include "../src/DefaultMQPushConsumerImpl.h" #include "../src/DefaultMQSimpleConsumerImpl.h" #include "../src/MessageQueueImpl.h" #include "../src/SendResultImpl.h" #include "../src/TransactionResolutionImpl.h" #include "../src/MQAdminExt.h" #include "../src/MQClient.h" #include "../src/MQProducer.h" #include "../src/MQConsumer.h" #include "../src/DefaultMQProducer.h" #include "../src/DefaultMQPushConsumer.h" #include "../src/DefaultMQSimpleConsumer.h" #include "../src/MessageQueue.h" #include "../src/SendResult.h" #include "../src/TransactionResolution.h" #include "../src/MQAdminExtImpl.h" #include "../src/MQClientImpl.h" #include "../src/MQProducerImpl.h" #include "../src/MQConsumerImpl.h" #include "../src/DefaultMQProducerImpl.h" #include "../src/DefaultMQPushConsumerImpl.h" #include "../src/DefaultMQSimpleConsumerImpl.h" #include "../src/MessageQueueImpl.h" #include "../src/SendResultImpl.h" #include "../src/TransactionResolutionImpl.h" #include #include #include #include #include using namespace std; using namespace rocketmq; int main() { // Please refer to https://github.com/apache/rocketmq-clients/blob/master/cpp/examples/ExampleProducerWithTransactionalMessage.cpp return 0; } ``` -------------------------------- ### Configure and Start RocketMQ Connect Runtime Source: https://github.com/apache/rocketmq-site/blob/new-official-website/versioned_docs/version-5.0/10-connect/06RocketMQ Connect In Action3.md Steps to build the runtime, configure the standalone properties, and launch the service. ```bash cd rocketmq-connect mvn -Prelease-connect -DskipTests clean install -U # Edit conf/connect-standalone.conf # Set pluginPaths=/usr/local/connector-plugins cd distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT sh bin/connect-standalone.sh -c conf/connect-standalone.conf & ``` -------------------------------- ### Start and verify Broker and Proxy Source: https://github.com/apache/rocketmq-site/blob/new-official-website/i18n/en/docusaurus-plugin-content-docs/version-5.0/02-quickStart/01quickstart.md Launch the Broker and Proxy in local mode and verify the startup via proxy logs. ```shell ### start broker $ nohup sh bin/mqbroker -n localhost:9876 --enable-proxy & ### verify broker $ tail -f ~/logs/rocketmqlogs/proxy.log The broker[broker-a,192.169.1.2:10911] boot success... ``` -------------------------------- ### Producer with Delay Message Example C++ Source: https://github.com/apache/rocketmq-site/blob/new-official-website/i18n/en/docusaurus-plugin-content-docs/version-5.0/13-sdk/03cplusplus.md Example demonstrating sending delay messages using the C++ gRPC SDK. Refer to the rocketmq-clients repository for full project details and runtime environment setup. ```cpp #include "../src/Producer.h" #include "../src/Message.h" #include "../src/MQAdminExt.h" #include "../src/MQClient.h" #include "../src/MQProducer.h" #include "../src/MQConsumer.h" #include "../src/DefaultMQProducer.h" #include "../src/DefaultMQPushConsumer.h" #include "../src/DefaultMQSimpleConsumer.h" #include "../src/MessageQueue.h" #include "../src/SendResult.h" #include "../src/TransactionResolution.h" #include "../src/MQAdminExtImpl.h" #include "../src/MQClientImpl.h" #include "../src/MQProducerImpl.h" #include "../src/MQConsumerImpl.h" #include "../src/DefaultMQProducerImpl.h" #include "../src/DefaultMQPushConsumerImpl.h" #include "../src/DefaultMQSimpleConsumerImpl.h" #include "../src/MessageQueueImpl.h" #include "../src/SendResultImpl.h" #include "../src/TransactionResolutionImpl.h" #include "../src/MQAdminExt.h" #include "../src/MQClient.h" #include "../src/MQProducer.h" #include "../src/MQConsumer.h" #include "../src/DefaultMQProducer.h" #include "../src/DefaultMQPushConsumer.h" #include "../src/DefaultMQSimpleConsumer.h" #include "../src/MessageQueue.h" #include "../src/SendResult.h" #include "../src/TransactionResolution.h" #include "../src/MQAdminExtImpl.h" #include "../src/MQClientImpl.h" #include "../src/MQProducerImpl.h" #include "../src/MQConsumerImpl.h" #include "../src/DefaultMQProducerImpl.h" #include "../src/DefaultMQPushConsumerImpl.h" #include "../src/DefaultMQSimpleConsumerImpl.h" #include "../src/MessageQueueImpl.h" #include "../src/SendResultImpl.h" #include "../src/TransactionResolutionImpl.h" #include #include #include #include #include using namespace std; using namespace rocketmq; int main() { // Please refer to https://github.com/apache/rocketmq-clients/blob/master/cpp/examples/ExampleProducerWithTimedMessage.cpp return 0; } ``` -------------------------------- ### Async Send Example Producer C++ Source: https://github.com/apache/rocketmq-site/blob/new-official-website/i18n/en/docusaurus-plugin-content-docs/version-5.0/13-sdk/03cplusplus.md Example demonstrating asynchronous message sending using the C++ gRPC SDK. Refer to the rocketmq-clients repository for full project details and runtime environment setup. ```cpp #include "../src/Producer.h" #include "../src/Message.h" #include "../src/MQAdminExt.h" #include "../src/MQClient.h" #include "../src/MQProducer.h" #include "../src/MQConsumer.h" #include "../src/DefaultMQProducer.h" #include "../src/DefaultMQPushConsumer.h" #include "../src/DefaultMQSimpleConsumer.h" #include "../src/MessageQueue.h" #include "../src/SendResult.h" #include "../src/TransactionResolution.h" #include "../src/MQAdminExtImpl.h" #include "../src/MQClientImpl.h" #include "../src/MQProducerImpl.h" #include "../src/MQConsumerImpl.h" #include "../src/DefaultMQProducerImpl.h" #include "../src/DefaultMQPushConsumerImpl.h" #include "../src/DefaultMQSimpleConsumerImpl.h" #include "../src/MessageQueueImpl.h" #include "../src/SendResultImpl.h" #include "../src/TransactionResolutionImpl.h" #include "../src/MQAdminExt.h" #include "../src/MQClient.h" #include "../src/MQProducer.h" #include "../src/MQConsumer.h" #include "../src/DefaultMQProducer.h" #include "../src/DefaultMQPushConsumer.h" #include "../src/DefaultMQSimpleConsumer.h" #include "../src/MessageQueue.h" #include "../src/SendResult.h" #include "../src/TransactionResolution.h" #include "../src/MQAdminExtImpl.h" #include "../src/MQClientImpl.h" #include "../src/MQProducerImpl.h" #include "../src/MQConsumerImpl.h" #include "../src/DefaultMQProducerImpl.h" #include "../src/DefaultMQPushConsumerImpl.h" #include "../src/DefaultMQSimpleConsumerImpl.h" #include "../src/MessageQueueImpl.h" #include "../src/SendResultImpl.h" #include "../src/TransactionResolutionImpl.h" #include #include #include #include #include using namespace std; using namespace rocketmq; int main() { // Please refer to https://github.com/apache/rocketmq-clients/blob/master/cpp/examples/ExampleProducerWithAsync.cpp return 0; } ``` -------------------------------- ### Initialize Database Environments with Docker Source: https://github.com/apache/rocketmq-site/blob/new-official-website/versioned_docs/version-5.0/10-connect/06RocketMQ Connect In Action3.md Commands to spin up PostgreSQL and MySQL instances using Docker containers for testing. ```bash docker run -d --name postgres -p 5432:5432 -e POSTGRES_USER=start_data_engineer -e POSTGRES_PASSWORD=password debezium/postgres:14 docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw quay.io/debezium/example-mysql:1.9 ``` -------------------------------- ### Configure and Start RocketMQ Connect Runtime Source: https://github.com/apache/rocketmq-site/blob/new-official-website/versioned_docs/version-5.0/10-connect/04RocketMQ Connect In Action1.md Steps to build the runtime, configure the standalone worker, and launch the service. ```bash cd rocketmq-connect mvn -Prelease-connect -DskipTests clean install -U # After editing conf/connect-standalone.conf sh bin/connect-standalone.sh -c conf/connect-standalone.conf & ``` -------------------------------- ### Start JDBC Sink Connector Source: https://github.com/apache/rocketmq-site/blob/new-official-website/i18n/en/docusaurus-plugin-content-docs/version-5.0/10-connect/06RocketMQ Connect In Action3.md This endpoint allows you to start a JDBC sink connector. It consumes data from a specified RocketMQ topic and writes it to a target table via JDBC. The example configures a MySQL sink connector. ```APIDOC ## POST /connectors/ ### Description Starts a JDBC sink connector to synchronize data from a RocketMQ topic to a target database table. ### Method POST ### Endpoint /connectors/ ### Parameters #### Path Parameters - **connector_name** (string) - Required - The name of the connector to be created. #### Request Body - **connector.class** (string) - Required - The connector class, e.g., `org.apache.rocketmq.connect.jdbc.connector.JdbcSinkConnector`. - **max.task** (integer) - Optional - The maximum number of tasks for the connector. - **connect.topicnames** (string) - Required - The name of the RocketMQ topic to consume data from. - **connection.url** (string) - Required - The JDBC connection URL for the target database. - **connection.user** (string) - Required - The username for the database connection. - **connection.password** (string) - Required - The password for the database connection. - **pk.fields** (string) - Optional - Specifies the primary key fields. - **table.name.from.header** (boolean) - Optional - If true, the table name is derived from the message header. - **pk.mode** (string) - Optional - Defines the primary key mode (e.g., `record_key`). - **insert.mode** (string) - Optional - Defines the insert mode (e.g., `UPSERT`). - **db.timezone** (string) - Optional - The database timezone. - **table.types** (string) - Optional - The types of tables to include (e.g., `TABLE`). - **errors.deadletterqueue.topic.name** (string) - Optional - The name of the dead-letter queue topic. - **errors.log.enable** (boolean) - Optional - If true, errors will be logged. - **errors.tolerance** (string) - Optional - Error tolerance strategy (e.g., `ALL`). - **delete.enabled** (boolean) - Optional - If true, delete operations are enabled. - **key.converter** (string) - Optional - The converter class for keys (e.g., `org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter`). - **value.converter** (string) - Optional - The converter class for values (e.g., `org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter`). ### Request Example ```json { "connector.class": "org.apache.rocketmq.connect.jdbc.connector.JdbcSinkConnector", "max.task": "2", "connect.topicnames": "debezium-source-topic1000", "connection.url": "jdbc:mysql://database ip:3306/bank1", "connection.user": "root", "connection.password": "debezium", "pk.fields": "id", "table.name.from.header": "true", "pk.mode": "record_key", "insert.mode": "UPSERT", "db.timezone": "UTC", "table.types": "TABLE", "errors.deadletterqueue.topic.name": "dlq-topic", "errors.log.enable": "true", "errors.tolerance": "ALL", "delete.enabled": "true", "key.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter", "value.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter" } ``` ### Response #### Success Response (200) - **status** (string) - Indicates the status of the connector creation. #### Response Example ```json { "status": "success" } ``` ``` -------------------------------- ### Deploy Apache RocketMQ Locally Source: https://context7.com/apache/rocketmq-site/llms.txt Instructions for downloading, compiling, and starting the NameServer and Broker/Proxy components on a local environment using shell commands. ```bash unzip rocketmq-all-5.3.2-source-release.zip cd rocketmq-all-5.3.2-source-release/ mvn -Prelease-all -DskipTests -Dspotbugs.skip=true clean install -U cd distribution/target/rocketmq-5.3.2/rocketmq-5.3.2 nohup sh bin/mqnamesrv & nohup sh bin/mqbroker -n localhost:9876 --enable-proxy & export NAMESRV_ADDR=localhost:9876 ``` -------------------------------- ### RocketMQ Service Startup Sequence Source: https://github.com/apache/rocketmq-site/blob/new-official-website/versioned_docs/version-5.0/06-bestPractice/03access.md Commands to start the NameServer, Broker, and Proxy services in the correct order. Ensure NameServer is running before Broker and Proxy. ```bash # 1. 启动NameServer nohup sh bin/mqnamesrv & # 2. 启动Broker(存储节点) nohup sh bin/mqbroker -n localhost:9876 -c conf/broker.conf & # 3. 启动Proxy(计算节点) nohup sh bin/mqproxy -n localhost:9876 -pc conf/rmq-proxy.json & ``` -------------------------------- ### FIFO Consumer: Consume Messages in Order (Bash & Java) Source: https://context7.com/apache/rocketmq-site/llms.txt Explains how to consume messages in the order they were stored using a FIFO (First-In, First-Out) consumer group. It includes a bash command to create the necessary consumer group and a Java example demonstrating the PushConsumer setup with a message listener that processes messages sequentially. This is crucial for applications where message order is critical. ```bash # 创建顺序消费组 sh bin/mqadmin updateSubGroup -c DefaultCluster -g FIFOConsumerGroup -n localhost:9876 -o true ``` ```java import org.apache.rocketmq.client.apis.*; import org.apache.rocketmq.client.apis.consumer.*; import java.util.Collections; public class FIFOConsumerExample { public static void main(String[] args) throws ClientException, InterruptedException { String endpoint = "localhost:8081"; String topic = "FIFOTopic"; String consumerGroup = "FIFOConsumerGroup"; // 必须是顺序消费组 ClientServiceProvider provider = ClientServiceProvider.loadService(); ClientConfiguration configuration = ClientConfiguration.newBuilder() .setEndpoints(endpoint) .build(); FilterExpression filterExpression = new FilterExpression("*", FilterExpressionType.TAG); // 顺序消费时,PushConsumer 会按消息存储顺序逐条投递 PushConsumer pushConsumer = provider.newPushConsumerBuilder() .setClientConfiguration(configuration) .setConsumerGroup(consumerGroup) .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)) .setMessageListener(messageView -> { // 顺序消费:同一消息组的消息会按顺序到达 System.out.printf("[%s] Received: %s%n", messageView.getMessageGroup().orElse("N/A"), new String(messageView.getBody())); // 重要:顺序消息必须同步处理,不能异步分发 return ConsumeResult.SUCCESS; }) .build(); Thread.sleep(Long.MAX_VALUE); pushConsumer.close(); } } ``` -------------------------------- ### Start RocketMQ Broker and Proxy (Windows) Source: https://github.com/apache/rocketmq-site/blob/new-official-website/i18n/en/docusaurus-plugin-content-docs/version-5.0/02-quickStart/02quickstartWithDocker.md Starts the RocketMQ Broker and Proxy within a Docker container on a Windows system. This command is similar to the Linux version but uses Windows-specific syntax for paths and line continuation. It configures the broker's IP, maps ports, sets the NameServer address, and mounts a local configuration file. Includes a command to verify the Broker's startup. ```shell # Configure the broker's IP address echo "brokerIP1=127.0.0.1" > broker.conf # Start the Broker and Proxy docker run -d ^ --name rmqbroker ^ --net rocketmq ^ -p 10912:10912 -p 10911:10911 -p 10909:10909 ^ -p 8080:8080 -p 8081:8081 \ -e "NAMESRV_ADDR=rmqnamesrv:9876" ^ -v %cd%\broker.conf:/home/rocketmq/rocketmq-5.3.2/conf/broker.conf ^ apache/rocketmq:5.3.2 sh mqbroker --enable-proxy \ -c /home/rocketmq/rocketmq-5.3.2/conf/broker.conf # Verify if Broker started successfully docker exec -it rmqbroker bash -c "tail -n 10 /home/rocketmq/logs/rocketmqlogs/proxy.log" ``` -------------------------------- ### Install Helm Source: https://github.com/apache/rocketmq-site/blob/new-official-website/versioned_docs/version-5.0/02-quickStart/04quickstartWithHelmInKubernetes.md Installs Helm version 3.7.0+ on the system. This is a prerequisite for using Helm charts to deploy applications. ```bash helm version curl https://raw.githubusercontent.com/helm/helm/main/scripts/get-helm-3 | bash ``` -------------------------------- ### Implement RocketMQ Producer Source: https://github.com/apache/rocketmq-site/blob/new-official-website/versioned_docs/version-5.0/02-quickStart/03quickstartWithDockercompose.md A Java example demonstrating how to initialize a producer, construct a message with keys and tags, and send it to a specific topic. ```java import org.apache.rocketmq.client.apis.ClientConfiguration; import org.apache.rocketmq.client.apis.ClientConfigurationBuilder; import org.apache.rocketmq.client.apis.ClientException; import org.apache.rocketmq.client.apis.ClientServiceProvider; import org.apache.rocketmq.client.apis.message.Message; import org.apache.rocketmq.client.apis.producer.Producer; import org.apache.rocketmq.client.apis.producer.SendReceipt; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ProducerExample { private static final Logger logger = LoggerFactory.getLogger(ProducerExample.class); public static void main(String[] args) throws ClientException { String endpoint = "localhost:8081"; String topic = "TestTopic"; ClientServiceProvider provider = ClientServiceProvider.loadService(); ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint); ClientConfiguration configuration = builder.build(); Producer producer = provider.newProducerBuilder() .setTopics(topic) .setClientConfiguration(configuration) .build(); Message message = provider.newMessageBuilder() .setTopic(topic) .setKeys("messageKey") .setTag("messageTag") .setBody("messageBody".getBytes()) .build(); try { SendReceipt sendReceipt = producer.send(message); logger.info("Send message successfully, messageId={}", sendReceipt.getMessageId()); } catch (ClientException e) { logger.error("Failed to send message", e); } } } ``` -------------------------------- ### Correct Consumer Initialization and Usage in Java Source: https://github.com/apache/rocketmq-site/blob/new-official-website/versioned_docs/version-5.0/03-domainModel/08consumer.md Demonstrates the recommended way to initialize, use, and shut down a consumer in a single process to avoid performance degradation. This pattern ensures efficient resource utilization by reusing the consumer instance. ```java Consumer c = ConsumerBuilder.build(); for (int i =0;i broker.conf # Start the Broker and Proxy docker run -d \ --name rmqbroker \ --network rocketmq \ -p 10912:10912 -p 10911:10911 -p 10909:10909 \ -p 8080:8080 -p 8081:8081 \ -e "NAMESRV_ADDR=rmqnamesrv:9876" \ # In PowerShell, replace %cd% with $pwd -v ./broker.conf:/home/rocketmq/rocketmq-5.3.2/conf/broker.conf \ apache/rocketmq:5.3.2 sh mqbroker --enable-proxy \ -c /home/rocketmq/rocketmq-5.3.2/conf/broker.conf # Verify if Broker started successfully docker exec -it rmqbroker bash -c "tail -n 10 /home/rocketmq/logs/rocketmqlogs/proxy.log" ``` -------------------------------- ### Send and receive messages with tools Source: https://github.com/apache/rocketmq-site/blob/new-official-website/i18n/en/docusaurus-plugin-content-docs/version-5.0/02-quickStart/01quickstart.md Set the NAMESRV_ADDR environment variable and run the built-in producer and consumer examples. ```shell $ export NAMESRV_ADDR=localhost:9876 $ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer SendResult [sendStatus=SEND_OK, msgId= ... $ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer ConsumeMessageThread_%d Receive New Messages: [MessageExt... ``` -------------------------------- ### Start RocketMQ Cluster with Docker Compose Source: https://github.com/apache/rocketmq-site/blob/new-official-website/i18n/en/docusaurus-plugin-content-docs/version-5.0/02-quickStart/03quickstartWithDockercompose.md Commands to start the RocketMQ cluster using Docker Compose. The commands differ slightly for Linux and Windows environments. ```bash docker-compose up -d ``` ```bash docker-compose -p rockermq_project up -d ``` -------------------------------- ### 启动 Broker+Proxy (Local模式单副本) Source: https://github.com/apache/rocketmq-site/blob/new-official-website/versioned_docs/version-5.0/05-deploymentOperations/01deploy.md 在 Local 模式下,启动同时包含 Broker 和 Proxy 的单副本节点。此配置风险较高,不建议在线上环境使用,仅适用于本地测试。命令中启用了 Proxy 功能。 ```bash nohup sh bin/mqbroker -n localhost:9876 --enable-proxy & tail -f ~/logs/rocketmqlogs/broker_default.log ``` -------------------------------- ### Start RocketMQ Broker with DLedger Configuration Source: https://github.com/apache/rocketmq-site/blob/new-official-website/versioned_docs/version-5.0/06-bestPractice/02dledger.md Starts individual RocketMQ brokers using specified DLedger configuration files. This is part of setting up a new DLedger cluster. ```shell nohup sh bin/mqbroker -c conf/dledger/xxx-n0.conf & nohup sh bin/mqbroker -c conf/dledger/xxx-n1.conf & nohup sh bin/mqbroker -c conf/dledger/xxx-n2.conf & ``` -------------------------------- ### Configure and Start File Source Connector Source: https://github.com/apache/rocketmq-site/blob/new-official-website/versioned_docs/version-5.0/10-connect/03RocketMQ Connect Quick Start.md Create a source file and use the REST API to register a FileSourceConnector, which reads data from a local file and publishes it to a RocketMQ topic. ```shell mkdir -p /Users/YourUsername/rocketmqconnect/ touch /Users/YourUsername/rocketmqconnect/test-source-file.txt echo "Hello \r\nRocketMQ\r\n Connect" >> /Users/YourUsername/rocketmqconnect/test-source-file.txt curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/fileSourceConnector -d '{ "connector.class": "org.apache.rocketmq.connect.file.FileSourceConnector", "filename": "/Users/YourUsername/rocketmqconnect/test-source-file.txt", "connect.topicname": "fileTopic" }' ``` -------------------------------- ### Start RocketMQ Connect Standalone Worker (Shell) Source: https://github.com/apache/rocketmq-site/blob/new-official-website/versioned_docs/version-5.0/10-connect/08RocketMQ Connect In Action5-ES.md This command starts the RocketMQ Connect worker in standalone mode using the specified configuration file. The '&' symbol runs the process in the background. ```shell sh bin/connect-standalone.sh -c conf/connect-standalone.conf & ``` -------------------------------- ### Start SFTP Sink Connector Source: https://github.com/apache/rocketmq-site/blob/new-official-website/versioned_docs/version-5.0/10-connect/07RocketMQ Connect In Action4.md This section describes how to start the SFTP sink connector using a curl command. The connector consumes data from a RocketMQ topic and writes it to a specified file via SFTP. ```APIDOC ## POST /connectors/SftpSinkConnector ### Description Starts the SFTP sink connector, which subscribes to RocketMQ Topic data, converts each message to a line of text, and writes it to a sink.txt file using the SFTP protocol. ### Method POST ### Endpoint http://localhost:8082/connectors/SftpSinkConnector ### Parameters #### Request Body - **connector.class** (string) - Required - The connector class name, e.g., "org.apache.rocketmq.connect.http.sink.SftpSinkConnector". - **host** (string) - Required - The SFTP server host. - **port** (integer) - Required - The SFTP server port. - **username** (string) - Required - The SFTP username. - **password** (string) - Required - The SFTP password. - **filePath** (string) - Required - The full path to the sink file on the SFTP server. - **connect.topicnames** (string) - Required - The RocketMQ topic name to subscribe to. - **fieldSeparator** (string) - Optional - The separator used for fields in the data. - **fieldSchema** (string) - Optional - The schema defining the fields and their order. ### Request Example ```json { "connector.class": "org.apache.rocketmq.connect.http.sink.SftpSinkConnector", "host": "127.0.0.1", "port": 22, "username": "YourUsername", "password": "yourPassword", "filePath": "/Users/YourUsername/rocketmqconnect/sftp-test/sink.txt", "connect.topicnames": "sftpTopic", "fieldSeparator": "|", "fieldSchema": "username|idCardNo|orderNo|orderAmount|trxDate|trxTime|profit" } ``` ### Response #### Success Response (200) - **status** (integer) - Indicates the status of the request, 200 for success. - **body** (object) - Contains details about the created connector. #### Response Example ```json { "status": 200, "body": { "connector.class": "org.apache.rocketmq.connect.http.sink.SftpSinkConnector", "host": "127.0.0.1", "port": 22, "username": "YourUsername", "password": "yourPassword", "filePath": "/Users/YourUsername/rocketmqconnect/sftp-test/sink.txt", "connect.topicnames": "sftpTopic", "fieldSeparator": "|", "fieldSchema": "username|idCardNo|orderNo|orderAmount|trxDate|trxTime|profit" } } ``` ```