### Run Spring Boot Application

Source: https://github.com/sonus21/rqueue/blob/master/rqueue-spring-boot-nats-example/README.md

Builds and runs the Rqueue Spring Boot NATS example application using Gradle.

```sh
./gradlew :rqueue-spring-boot-nats-example:bootRun
```

--------------------------------

### Start NATS Server with JetStream

Source: https://github.com/sonus21/rqueue/blob/master/rqueue-spring-boot-nats-example/README.md

Starts a NATS server with JetStream enabled, which is required for Rqueue's NATS backend. This can be done using the native binary or Docker.

```sh
# native binary
nats-server -js

# docker
docker run -p 4222:4222 nats:latest -js
```

--------------------------------

### Disable Auto-Startup for RQueue Container

Source: https://github.com/sonus21/rqueue/wiki/Configuration

Set `autoStartup` to false to manually control the RQueue container's start and stop lifecycle. The application must explicitly call start(), stop(), and destroy() methods.

```java
class RqueueConfiguration {
  @Bean
  public SimpleRqueueListenerContainerFactory simpleRqueueListenerContainerFactory() {
    SimpleRqueueListenerContainerFactory factory = new SimpleRqueueListenerContainerFactory();
    //...
    factory.setAutoStartup(false);
    return factory;
  }
}
```

--------------------------------

### Register Queues on Bootstrap

Source: https://github.com/sonus21/rqueue/blob/master/docs/message-handling/producer-consumer.md

Register queues when the Rqueue bootstrap process is complete to avoid 'QueueDoesNotExist' errors. This example demonstrates listening for RqueueBootstrapEvent and registering queues.
```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import com.github.sonus21.rqueue.event.RqueueBootstrapEvent;
import com.github.sonus21.rqueue.manager.RqueueEndpointManager;

@Component
class AppMessageSender implements ApplicationListener<RqueueBootstrapEvent> {
  @Autowired private RqueueEndpointManager rqueueEndpointManager;

  @Override
  public void onApplicationEvent(RqueueBootstrapEvent event) {
    // register queues only on the startup event
    if (!event.isStartup()) {
      return;
    }
    // `queues` is assumed to be a collection of queue names defined by the application
    for (String queue : queues) {
      String[] priorities = getPriority(queue);
      rqueueEndpointManager.registerQueue(queue, priorities);
    }
  }

  private String[] getPriority(String queue) {
    return new String[]{};
  }
}
```

--------------------------------

### Configure Logging Middleware in RQueue

Source: https://github.com/sonus21/rqueue/wiki/Middleware

Add a `LoggingMiddleware` to the `SimpleRqueueListenerContainerFactory` to log job details before processing. This is a common setup for monitoring and debugging.

```java
public class RqueueConfiguration {
  @Bean
  public SimpleRqueueListenerContainerFactory simpleRqueueListenerContainerFactory() {
    SimpleRqueueListenerContainerFactory factory = new SimpleRqueueListenerContainerFactory();
    factory.useMiddleware(new LoggingMiddleware());
    // add other middlewares here
    return factory;
  }
}
```

--------------------------------

### Manual Control of RQueue Container Lifecycle

Source: https://github.com/sonus21/rqueue/wiki/Configuration

Implement methods to manually start, stop, and destroy the RQueue message listener container when auto-startup is disabled.

```java
public class BootstrapController {
  @Autowired private RqueueMessageListenerContainer rqueueMessageListenerContainer;

  // ...
  public void start() {
    // ...
    rqueueMessageListenerContainer.start();
  }

  public void stop() {
    // ...
    rqueueMessageListenerContainer.stop();
  }

  public void destroy() {
    // ...
    rqueueMessageListenerContainer.destroy();
  }
  //...
}
```

--------------------------------

### Start NATS Server with JetStream Enabled

Source: https://github.com/sonus21/rqueue/blob/master/docs/configuration/nats-configuration.md

Run the NATS server with the JetStream enabled flag.

```sh
# native binary
nats-server -js

# Docker
docker run -p 4222:4222 nats:latest -js
```

--------------------------------

### Manual Rqueue Container Lifecycle Management

Source: https://github.com/sonus21/rqueue/blob/master/docs/configuration/configuration.md

Manually start, stop, and destroy the Rqueue message listener container when auto-startup is disabled.

```java
public class BootstrapController {
  @Autowired private RqueueMessageListenerContainer rqueueMessageListenerContainer;

  // ...
  public void start() {
    // ...
    rqueueMessageListenerContainer.start();
  }

  public void stop() {
    // ...
    rqueueMessageListenerContainer.stop();
  }

  public void destroy() {
    // ...
    rqueueMessageListenerContainer.destroy();
  }
  //...
}
```

--------------------------------

### Configure Redis Cluster Connection Factory

Source: https://github.com/sonus21/rqueue/blob/master/docs/configuration/redis-configuration.md

Configure Rqueue to use a Redis Cluster with the Lettuce client, which supports essential `EVALSHA` requests. This setup is recommended for clustered environments and requires specifying all cluster nodes. Ensure `readFrom` is set to `MASTER_PREFERRED`.
```java
import java.util.ArrayList;
import java.util.List;
import io.lettuce.core.ReadFrom;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisClusterConfiguration;
import org.springframework.data.redis.connection.RedisNode;
import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import com.github.sonus21.rqueue.config.SimpleRqueueListenerContainerFactory;
import com.github.sonus21.rqueue.utils.Constants;

@Configuration
public class RqueueConfiguration {
  // this property must be set to true if you're using webflux or reactive redis
  @Value("${rqueue.reactive.enabled:false}")
  private boolean reactiveEnabled;

  @Bean
  public SimpleRqueueListenerContainerFactory simpleRqueueListenerContainerFactory() {
    // here always use MASTER_PREFERRED otherwise it will fail to start
    LettuceClientConfiguration lettuceClientConfiguration =
        LettuceClientConfiguration.builder().readFrom(ReadFrom.MASTER_PREFERRED).build();

    // add all nodes
    RedisClusterConfiguration redisClusterConfiguration = new RedisClusterConfiguration();
    List<RedisNode> redisNodes = new ArrayList<>();
    redisNodes.add(new RedisNode("127.0.0.1", 9000));
    redisNodes.add(new RedisNode("127.0.0.1", 9001));
    redisNodes.add(new RedisNode("127.0.0.1", 9002));
    redisNodes.add(new RedisNode("127.0.0.1", 9003));
    redisNodes.add(new RedisNode("127.0.0.1", 9004));
    redisNodes.add(new RedisNode("127.0.0.1", 9005));
    redisClusterConfiguration.setClusterNodes(redisNodes);

    // create lettuce connection factory
    LettuceConnectionFactory lettuceConnectionFactory =
        new LettuceConnectionFactory(redisClusterConfiguration, lettuceClientConfiguration);
    lettuceConnectionFactory.afterPropertiesSet();

    SimpleRqueueListenerContainerFactory simpleRqueueListenerContainerFactory =
        new SimpleRqueueListenerContainerFactory();
    simpleRqueueListenerContainerFactory.setRedisConnectionFactory(lettuceConnectionFactory);
    // set polling interval, by default it's 5 seconds
    simpleRqueueListenerContainerFactory.setPollingInterval(Constants.ONE_MILLI);
    // if reactive redis is enabled set the correct connection factory
    if (reactiveEnabled) {
      simpleRqueueListenerContainerFactory.setReactiveRedisConnectionFactory(
          lettuceConnectionFactory);
    }
    // set any other property if you need

    // return the listener container factory
    return simpleRqueueListenerContainerFactory;
  }
}
```

--------------------------------

### Implement Rate Limiting Middleware

Source: https://github.com/sonus21/rqueue/wiki/FAQ
Create a custom middleware implementing RateLimiterMiddleware to control message processing rates. This example uses Guava's RateLimiter and checks queue names or message types for throttling. A job counts as throttled when no permit can be acquired.

```java
class MyRateLimiter implements RateLimiterMiddleware {
  // Guava rate limiter, you can use any other rate limiter
  final RateLimiter rateLimiter;

  MyRateLimiter(RateLimiter rateLimiter) {
    this.rateLimiter = rateLimiter;
  }

  @Override
  public boolean isThrottled(Job job) {
    // here you can check queue and any other details for rate limiting
    RqueueMessage rqueueMessage = job.getRqueueMessage();
    // check for rate-limited-queue: throttled when no permit is available
    if (rqueueMessage.getQueueName().equals("rate-limited-queue")) {
      return !rateLimiter.tryAcquire();
    }
    // checking message object type, rate limiting is enabled for RateLimitedMessage
    Object message = job.getMessage();
    if (message instanceof RateLimitedMessage) {
      return !rateLimiter.tryAcquire();
    }
    // all other jobs are not rate limited
    return false;
  }
}
```

--------------------------------

### Configure Scheduler Auto Start

Source: https://github.com/sonus21/rqueue/blob/master/docs/message-handling/producer-consumer.md

If set to 'false', the scheduler will only use Redis events for message movement instead of dedicated polling threads.

```properties
rqueue.scheduler.auto.start=true
```

--------------------------------

### Configure RQueue Listener with Job Check-in

Source: https://github.com/sonus21/rqueue/blob/master/README.md

Example of a listener for a weekly chat indexing job that also performs a check-in operation on the job. It requires injecting the Rqueue Job object.
```java
// checkin job example
@RqueueListener(value = "chat-indexing-weekly", priority = "5", priorityGroup = "chat")
public void onMessage(
    ChatIndexing chatIndexing,
    @Header(RqueueMessageHeaders.JOB) com.github.sonus21.rqueue.core.Job job) {
  log.info("ChatIndexing message: {}", chatIndexing);
  job.checkIn("Chat indexing...");
}
```

--------------------------------

### Configure RQueue Listener for Simple Messages

Source: https://github.com/sonus21/rqueue/blob/master/README.md

Use @RqueueListener to mark a method as a listener for a simple queue. This example shows a basic message handler.

```java
import org.springframework.stereotype.Component;
import org.springframework.beans.factory.annotation.Autowired;
import com.github.sonus21.rqueue.annotation.RqueueListener;
import lombok.extern.slf4j.Slf4j;

@Component
@Slf4j
public class MessageListener {

  @RqueueListener(value = "simple-queue")
  public void simpleMessage(String message) {
    log.info("simple-queue: {}", message);
  }
}
```

--------------------------------

### Three-Replica Production NATS Stream Setup

Source: https://github.com/sonus21/rqueue/blob/master/docs/configuration/nats-configuration.md

Configure NATS JetStream stream properties for a production environment with three replicas, file storage, and a 7-day message age limit.

```properties
rqueue.nats.stream.replicas=3
rqueue.nats.stream.storage=FILE
rqueue.nats.stream.max-age=7d
```

--------------------------------

### Create Rqueue-Queue-Config KV Bucket

Source: https://github.com/sonus21/rqueue/blob/master/README.md

Manually create the 'rqueue-queue-config' KV bucket with specified replicas and storage. This bucket stores persistent queue configuration.
```bash
nats kv add rqueue-queue-config --replicas=3 --storage=file
```

--------------------------------

### List All Rqueue KV Buckets

Source: https://github.com/sonus21/rqueue/blob/master/docs/configuration/nats-configuration.md

Use this NATS CLI command to list all KV buckets created by Rqueue, identified by the 'rqueue-' prefix. This helps in verifying bucket creation.

```sh
nats kv ls | grep rqueue-
```

--------------------------------

### Configure Rqueue with Standalone or Clustered Redis Connection Factory

Source: https://github.com/sonus21/rqueue/wiki/Redis-Configuration

This snippet shows how to set the Redis connection factory for Rqueue's listener container. It demonstrates creating a configuration for either a standalone or a clustered Redis instance and initializing a Lettuce connection factory from it.

```java
class RqueueConfiguration {
  @Bean
  public SimpleRqueueListenerContainerFactory simpleRqueueListenerContainerFactory() {
    SimpleRqueueListenerContainerFactory factory = new SimpleRqueueListenerContainerFactory();

    // Standalone Redis can use the Jedis connection factory, but clustered Redis requires
    // Lettuce because EVALSHA will not work with Jedis.

    // Standalone Redis configuration: set the fields of the Redis configuration
    RedisStandaloneConfiguration redisConfiguration = new RedisStandaloneConfiguration();

    // Redis cluster: use this declaration instead and set the required fields
    // RedisClusterConfiguration redisConfiguration = new RedisClusterConfiguration();

    // Create lettuce connection factory
    LettuceConnectionFactory redisConnectionFactory =
        new LettuceConnectionFactory(redisConfiguration);
    redisConnectionFactory.afterPropertiesSet();
    factory.setRedisConnectionFactory(redisConnectionFactory);
    return factory;
  }
}
```

--------------------------------

### Pre-create NATS Streams for Rqueue

Source: https://github.com/sonus21/rqueue/blob/master/README.md

Use these `nats stream add` commands to pre-create streams when `rqueue.nats.autoCreateStreams` is set to false.
This is necessary for deployments where application credentials lack the permission to add streams at runtime.

```sh
nats stream add rqueue-js-orders --subjects rqueue.js.orders ...
nats stream add rqueue-js-orders-high --subjects rqueue.js.orders.high ...
nats stream add rqueue-js-orders-low --subjects rqueue.js.orders.low ...
nats stream add rqueue-js-orders-dlq --subjects rqueue.js.orders.dlq ...
```

--------------------------------

### Enqueue Job Object

Source: https://github.com/sonus21/rqueue/blob/master/rqueue-spring-boot-nats-example/README.md

Enqueues a Job object to the 'job-queue'. This example also shows how a Dead Letter Queue (DLQ) can be wired to 'job-morgue'.

```sh
# enqueue a Job object to job-queue (with DLQ wired to job-morgue)
curl http://localhost:8080/job
```

--------------------------------

### Create Rqueue-Message-Metadata KV Bucket

Source: https://github.com/sonus21/rqueue/blob/master/README.md

Manually create the 'rqueue-message-metadata' KV bucket with specified replicas and storage. This bucket is used for message metadata.

```bash
nats kv add rqueue-message-metadata --replicas=3 --storage=file
```

--------------------------------

### Configure Auto-Create NATS KV Buckets

Source: https://github.com/sonus21/rqueue/blob/master/nats-task.md

Controls whether rqueue should automatically create NATS KV buckets if they do not exist. The default is true.

```properties
rqueue.nats.autoCreateKvBuckets=true
```

--------------------------------

### Get Pending and Scheduled Message Count

Source: https://github.com/sonus21/rqueue/wiki/FAQ

Retrieves the total count of pending and scheduled messages for a given queue. This helps approximate the job position in the queue.
```java
class TestJobPosition {
  @Autowired private RqueueQueueMetrics rqueueQueueMetrics;

  public long getTestQueueSize() {
    // not considering processing queue as they are currently being processed
    return rqueueQueueMetrics.getPendingMessageCount("test-queue")
        + rqueueQueueMetrics.getScheduledMessageCount("test-queue");
  }
}
```

--------------------------------

### Add Rqueue Spring Boot Starter Dependency (Maven)

Source: https://github.com/sonus21/rqueue/wiki/Home

Add the Rqueue Spring Boot starter dependency to your Maven project to integrate Rqueue.

```xml
<dependency>
  <groupId>com.github.sonus21</groupId>
  <artifactId>rqueue-spring-boot-starter</artifactId>
  <version>2.9.0-RELEASE</version>
</dependency>
```

--------------------------------

### Get Rqueue Queue Configuration from KV

Source: https://github.com/sonus21/rqueue/blob/master/docs/configuration/nats-configuration.md

Retrieve the queue configuration stored in the 'rqueue-queue-config' KV bucket using this NATS CLI command. This allows inspection of registered queue settings.

```sh
nats kv get rqueue-queue-config orders
```

--------------------------------

### Run Core, Redis, and NATS Unit Tests

Source: https://github.com/sonus21/rqueue/blob/master/nats-task.md

Execute unit tests for the rqueue-core, rqueue-redis, and rqueue-nats modules, filtered by the 'unit' tag.

```bash
./gradlew :rqueue-core:test :rqueue-redis:test :rqueue-nats:test -DincludeTags=unit
```

--------------------------------

### Generic Envelope Type with Rqueue

Source: https://github.com/sonus21/rqueue/blob/master/docs/configuration/configuration.md

Example of using a generic envelope type like `Event<T>` for enqueuing and consuming messages with Rqueue. The serialized form includes envelope and type parameter information.

```java
// A generic envelope type
public class Event<T> {
  private String id;
  private T payload;
  // getters/setters ...
}

// Enqueue (Order is assumed to be the payload type of `order`)
Event<Order> event = new Event<>("evt-123", order);
rqueueMessageEnqueuer.enqueue("order-queue", event);

// Consume
@RqueueListener(value = "order-queue")
public void onEvent(Event<Order> event) { ... }
```

--------------------------------

### Run Unit Tests

Source: https://github.com/sonus21/rqueue/blob/master/nats-task-v2.md

Execute unit tests for core, redis, web, and nats modules. Include tests tagged with 'unit'.

```bash
./gradlew :rqueue-core:test :rqueue-redis:test :rqueue-web:test :rqueue-nats:test -DincludeTags=unit
```

--------------------------------

### Spring Boot Starter Dependency (Maven)

Source: https://github.com/sonus21/rqueue/blob/master/docs/index.md

Add this dependency to your Maven pom.xml to use Rqueue with Spring Framework.

```xml
<dependency>
  <groupId>com.github.sonus21</groupId>
  <artifactId>rqueue-spring</artifactId>
  <version>4.0.0-RELEASE</version>
</dependency>
```

--------------------------------

### Get Rqueue Message Metadata from KV

Source: https://github.com/sonus21/rqueue/blob/master/docs/configuration/nats-configuration.md

Retrieve message metadata, such as delivery status and retry count, from the 'rqueue-message-metadata' KV bucket using this NATS CLI command. Replace `<message-id>` with the actual message ID.

```sh
nats kv get rqueue-message-metadata <message-id>
```

--------------------------------

### Configure RQueue Listener with Priority Group and Level

Source: https://github.com/sonus21/rqueue/blob/master/README.md

Set up a listener for chat indexing messages, specifying a priority group and a specific priority level.

```java
@RqueueListener(value = "chat-indexing", priority = "20", priorityGroup = "chat")
public void onMessage(ChatIndexing chatIndexing) {
  log.info("ChatIndexing message: {}", chatIndexing);
}
```

--------------------------------

### Define Listener for Dead Letter Queue

Source: https://github.com/sonus21/rqueue/blob/master/docs/faq.md

Configure a separate listener for the dead letter queue by specifying its name and enabling the listener.
This example shows how to handle messages from both the main queue and its corresponding dead letter queue. The `deadLetterQueue` value on the main listener must match the queue name of the dead letter listener.

```java
@Component
class ReservationRequestMessageConsumer {

  @RqueueListener(
      value = "reservation.request.queue",
      deadLetterQueue = "reservation.request.dead.letter.queue",
      deadLetterQueueListenerEnabled = "true",
      numRetries = "3")
  public void onMessageReservationRequest(ReservationRequest request) throws Exception {
    // Handle messages from main queue
  }

  @RqueueListener(value = "reservation.request.dead.letter.queue", numRetries = "1")
  public void onMessageReservationRequestDeadLetterQueue(
      ReservationRequest request,
      @Header(RqueueMessageHeaders.MESSAGE) RqueueMessage rqueueMessage) throws Exception {
    // Handle messages from dead letter queue
  }
}
```

--------------------------------

### Configure Static Resource Handler with Path Prefix

Source: https://github.com/sonus21/rqueue/wiki/Dashboard

Configures the static resource handler to accommodate a URL path prefix, essential when Rqueue is deployed behind a proxy or application server with a custom base path. It uses the 'rqueue.web.url.prefix' property.

```java
public class MvcConfig implements WebMvcConfigurer {

  @Value("${rqueue.web.url.prefix:}")
  private String rqueueWebUrlPrefix;

  @Override
  public void addResourceHandlers(ResourceHandlerRegistry registry) {
    if (!StringUtils.isEmpty(rqueueWebUrlPrefix)) {
      // serve dashboard resources under the configured prefix
      registry
          .addResourceHandler(rqueueWebUrlPrefix + "/**")
          .addResourceLocations("classpath:/public/");
    } else if (!registry.hasMappingForPattern("/**")) {
      registry.addResourceHandler("/**").addResourceLocations("classpath:/public/");
    }
  }
}
```

--------------------------------

### Add Rqueue Spring Boot Starter Dependency (Gradle)

Source: https://github.com/sonus21/rqueue/wiki/Home

Add the Rqueue Spring Boot starter dependency to your Gradle project to integrate Rqueue.
```groovy
implementation 'com.github.sonus21:rqueue-spring-boot-starter:2.9.0-RELEASE'
```

--------------------------------

### Run NATS Rqueue System Config DAO Integration Test

Source: https://github.com/sonus21/rqueue/blob/master/nats-task.md

Execute integration tests for the NATS Rqueue System Configuration DAO.

```bash
./gradlew :rqueue-nats:test --tests "com.github.sonus21.rqueue.nats.dao.NatsRqueueSystemConfigDaoIT"
```

--------------------------------

### Create Rqueue-Jobs KV Bucket

Source: https://github.com/sonus21/rqueue/blob/master/README.md

Manually create the 'rqueue-jobs' KV bucket with specified replicas, storage, and TTL. This bucket stores job history.

```bash
nats kv add rqueue-jobs --replicas=3 --storage=file --ttl=7d
```

--------------------------------

### Configure RQueue Listener for Push Notifications

Source: https://github.com/sonus21/rqueue/blob/master/README.md

Set up a listener for push notifications with retry and dead-letter queue configurations.

```java
@RqueueListener(
    value = "push-notification-queue",
    numRetries = "3",
    deadLetterQueue = "failed-notification-queue")
public void onMessage(Notification notification) {
  log.info("Push notification: {}", notification);
}
```

--------------------------------

### Configure Static Resource Handler

Source: https://github.com/sonus21/rqueue/wiki/Dashboard

Adds a resource handler to serve static resources for the Rqueue dashboard. Ensure this is configured if Rqueue endpoints are not directly accessible.

```java
public class MvcConfig implements WebMvcConfigurer {

  @Override
  public void addResourceHandlers(ResourceHandlerRegistry registry) {
    //...
    if (!registry.hasMappingForPattern("/**")) {
      registry.addResourceHandler("/**").addResourceLocations("classpath:/public/");
    }
  }
}
```

--------------------------------

### Pre-create NATS KV Buckets

Source: https://github.com/sonus21/rqueue/blob/master/docs/configuration/nats-configuration.md

Manually create NATS KV buckets with appropriate TTLs to store Rqueue's persistent state, job history, locks, and worker registry information. Match TTL values to your Rqueue settings.

```sh
# Persistent state: no TTL
nats kv add rqueue-queue-config --replicas=3 --storage=file
nats kv add rqueue-message-metadata --replicas=3 --storage=file

# Job history: match rqueue.message.durability (default 7 days)
nats kv add rqueue-jobs --replicas=3 --storage=file --ttl=7d

# Locks: cover your longest expected lock hold
nats kv add rqueue-locks --replicas=3 --storage=file --ttl=10m

# Worker registry: match rqueue.worker.registry.worker.ttl (default 300 s)
nats kv add rqueue-workers --replicas=3 --storage=file --ttl=5m

# Queue heartbeats: match rqueue.worker.registry.queue.ttl (default 3600 s)
nats kv add rqueue-worker-heartbeats --replicas=3 --storage=file --ttl=1h
```

--------------------------------

### Check RQueue Queue Sizes (1.x to 2.x Migration)

Source: https://github.com/sonus21/rqueue/wiki/Migrations

Use these Redis commands to check the current state of your queues when migrating from RQueue 1.x to 2.x. Ensure all queues are empty before proceeding if you are not using version 2.x for existing data. Replace `<queueName>` with the name of the queue to inspect.

```redis
LLEN <queueName>
```

```redis
ZCARD rqueue-delay::<queueName>
```

```redis
ZCARD rqueue-processing::<queueName>
```

--------------------------------

### Configure Redis Sentinel Connection Factory

Source: https://github.com/sonus21/rqueue/blob/master/docs/configuration/redis-configuration.md

Sets up a Lettuce connection factory for Redis Sentinel, prioritizing master connections. This is used to create the Rqueue listener container factory.
```java
import io.lettuce.core.ReadFrom;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisSentinelConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import com.github.sonus21.rqueue.config.SimpleRqueueListenerContainerFactory;
import com.github.sonus21.rqueue.utils.Constants;

@Configuration
public class RedisClusterBaseApplication {

  private boolean reactiveEnabled = false; // Assuming this is set elsewhere

  @Bean
  public SimpleRqueueListenerContainerFactory simpleRqueueListenerContainerFactory() {
    LettuceClientConfiguration lettuceClientConfiguration =
        LettuceClientConfiguration.builder().readFrom(ReadFrom.MASTER_PREFERRED).build();

    // Sentinel redis configuration, Set fields of redis configuration
    RedisSentinelConfiguration redisConfiguration = new RedisSentinelConfiguration();

    // Create lettuce connection factory
    LettuceConnectionFactory lettuceConnectionFactory =
        new LettuceConnectionFactory(redisConfiguration, lettuceClientConfiguration);
    lettuceConnectionFactory.afterPropertiesSet();

    SimpleRqueueListenerContainerFactory simpleRqueueListenerContainerFactory =
        new SimpleRqueueListenerContainerFactory();
    simpleRqueueListenerContainerFactory.setRedisConnectionFactory(lettuceConnectionFactory);
    simpleRqueueListenerContainerFactory.setPollingInterval(Constants.ONE_MILLI);
    // if reactive redis is enabled, reuse the same Lettuce connection factory
    if (reactiveEnabled) {
      simpleRqueueListenerContainerFactory.setReactiveRedisConnectionFactory(
          lettuceConnectionFactory);
    }
    return simpleRqueueListenerContainerFactory;
  }
}
```

--------------------------------

### Listening to Weekly Chat Indexing with Job Check-in

Source: https://github.com/sonus21/rqueue/wiki/Home

Configure a listener for weekly chat indexing messages that requires job check-in.
```java
@RqueueListener(value = "chat-indexing-weekly", priority = "5", priorityGroup = "chat")
public void onMessage(
    ChatIndexing chatIndexing,
    @Header(RqueueMessageHeaders.JOB) com.github.sonus21.rqueue.core.Job job) {
  log.info("ChatIndexing message: {}", chatIndexing);
  job.checkIn("Chat indexing...");
}
```

--------------------------------

### Enable Rqueue Scheduler

Source: https://github.com/sonus21/rqueue/blob/master/docs/message-handling/producer-consumer.md

Globally enable or disable the Rqueue scheduler using this property.

```properties
rqueue.scheduler.enabled=true
```

--------------------------------

### Listening to Jobs with Retries and DLQ

Source: https://github.com/sonus21/rqueue/wiki/Home

Configure a listener for jobs with retry settings and a dead-letter queue.

```java
@RqueueListener(
    value = "job-queue",
    numRetries = "3",
    deadLetterQueue = "failed-job-queue",
    concurrency = "5-10")
public void onMessage(Job job) {
  log.info("Job alert: {}", job);
}
```

--------------------------------

### Configure Rqueue Metrics Prefix

Source: https://github.com/sonus21/rqueue/blob/master/docs/monitoring.md

Set a custom prefix for Rqueue metrics to organize them within your monitoring system. This helps avoid naming collisions with other components.

```properties
rqueue.metrics.prefix=rq.
```

--------------------------------

### Enqueueing a Simple Message

Source: https://github.com/sonus21/rqueue/wiki/Message-Deduplication

This snippet shows how to enqueue a simple message and record its enqueue time using a `MessageRepository`. It uses `RqueueMessageEnqueuer` to add the message to a queue.
```java
interface MessageRepository {
  Long getLatestEnqueueAt(String messageId);

  void addEnqueueAt(String messageId, Long time);
}

class SimpleMessage {
  private String id;

  public String getId() {
    return id;
  }
}

class MessageSender {
  @Autowired private MessageRepository messageRepository;
  @Autowired private RqueueMessageEnqueuer rqueueMessageEnqueuer;

  public void sendMessage(SimpleMessage message) {
    String id = message.getId();
    //TODO handle error case
    messageRepository.addEnqueueAt(id, System.currentTimeMillis());
    rqueueMessageEnqueuer.enqueueIn("simple-queue", message, Duration.ofMinutes(10));
  }
}
```

--------------------------------

### Pre-create NATS Streams for a Queue and its DLQ

Source: https://github.com/sonus21/rqueue/blob/master/docs/configuration/nats-configuration.md

Use these NATS CLI commands to manually create streams for a queue and its corresponding dead-letter queue. Ensure stream names and subjects align with your Rqueue configuration.

```sh
nats stream add rqueue-js-orders \
  --subjects "rqueue.js.orders" \
  --storage file --replicas 3 --retention limits

nats stream add rqueue-js-orders-dlq \
  --subjects "rqueue.js.orders.dlq" \
  --storage file --replicas 3 --retention limits
```

--------------------------------

### Configure Redis DB Version via Application Property

Source: https://github.com/sonus21/rqueue/blob/master/docs/migrations.md

Add this property to your application configuration when upgrading from Rqueue 1.x to 2.x. This helps Rqueue correctly interpret existing queue data. This is not required if your queues are empty.

```properties
rqueue.db.version=1
```

--------------------------------

### List All Rqueue Streams

Source: https://github.com/sonus21/rqueue/blob/master/docs/configuration/nats-configuration.md

Use this NATS CLI command to list all streams managed by Rqueue, identified by the 'rqueue-js-' prefix. This helps in verifying stream creation.
```sh
nats stream ls | grep rqueue-js-
```

--------------------------------

### Configure Rqueue Listener Container Factory

Source: https://github.com/sonus21/rqueue/blob/master/docs/configuration/configuration.md

This snippet shows the basic structure for configuring the SimpleRqueueListenerContainerFactory bean in your Rqueue application.

```java
@Configuration
public class RqueueConfiguration {
  @Bean
  public SimpleRqueueListenerContainerFactory simpleRqueueListenerContainerFactory() {
    // return SimpleRqueueListenerContainerFactory object
  }
}
```

--------------------------------

### Configure Pre Execution Message Processor

Source: https://github.com/sonus21/rqueue/wiki/Callback-Events

Set up a pre-execution message processor to intercept messages before they are handled by the message listener. If the processor returns false, the message handler will not be invoked.

```java
class RqueueConfiguration {

  private MessageProcessor preExecutorMessageProcessor() {
    // return message processor object
  }

  @Bean
  public SimpleRqueueListenerContainerFactory simpleRqueueListenerContainerFactory() {
    SimpleRqueueListenerContainerFactory factory = new SimpleRqueueListenerContainerFactory();
    MessageProcessor preExecutorMessageProcessor = preExecutorMessageProcessor();
    factory.setPreExecutionMessageProcessor(preExecutorMessageProcessor);
    return factory;
  }
}
```

--------------------------------

### Enqueueing a Simple Message (Alternative Scenario)

Source: https://github.com/sonus21/rqueue/wiki/Message-Deduplication

This snippet demonstrates enqueueing a message without immediately recording its enqueue time in the repository. The `MessageSender` relies on the `RqueueMessageEnqueuer` for scheduling.
```java
interface MessageRepository {
  Long getEnqueueAt(String messageId);

  void saveEnqueueAt(String messageId, Long time);
}

class MessageSender {
  @Autowired private RqueueMessageEnqueuer rqueueMessageEnqueuer;

  public void sendMessage(SimpleMessage message) {
    rqueueMessageEnqueuer.enqueueIn("simple-queue", message, Duration.ofMinutes(10));
  }
}
```

--------------------------------

### Run NATS Backend End-to-End Integration Tests

Source: https://github.com/sonus21/rqueue/blob/master/nats-task-v2.md

Execute end-to-end integration tests specifically for the NATS backend within the Spring Boot starter.

```bash
./gradlew :rqueue-spring-boot-starter:test --tests "com.github.sonus21.rqueue.spring.boot.integration.NatsBackendEndToEndIT"
```

--------------------------------

### Run NATS Rqueue Job DAO Integration Test

Source: https://github.com/sonus21/rqueue/blob/master/nats-task.md

Execute integration tests for the NATS Rqueue Job DAO.

```bash
./gradlew :rqueue-nats:test --tests "com.github.sonus21.rqueue.nats.dao.NatsRqueueJobDaoIT"
```

--------------------------------

### Spring Application Configuration

Source: https://github.com/sonus21/rqueue/blob/master/docs/index.md

Enable Rqueue and define a RedisConnectionFactory bean in your Spring configuration class.

```java
@EnableRqueue
public class Application {
  @Bean
  public RedisConnectionFactory redisConnectionFactory() {
    // return a Redis connection factory
  }
}
```

--------------------------------

### Listening to Chat Indexing with Priority Group

Source: https://github.com/sonus21/rqueue/wiki/Home

Configure a listener for chat indexing messages with a specific priority group.
```java
@RqueueListener(value = "chat-indexing", priority = "20", priorityGroup = "chat")
public void onMessage(ChatIndexing chatIndexing) {
  log.info("ChatIndexing message: {}", chatIndexing);
}
```

--------------------------------

### Create Rqueue-Worker-Heartbeats KV Bucket

Source: https://github.com/sonus21/rqueue/blob/master/README.md

Manually create the 'rqueue-worker-heartbeats' KV bucket with specified replicas, storage, and TTL. This bucket stores worker heartbeat information.

```bash
nats kv add rqueue-worker-heartbeats --replicas=3 --storage=file --ttl=10m
```

--------------------------------

### Listening to Notifications with Retries and DLQ

Source: https://github.com/sonus21/rqueue/wiki/Home

Configure a listener for notifications with retry settings and a dead-letter queue.

```java
@RqueueListener(
    value = "push-notification-queue",
    numRetries = "3",
    deadLetterQueue = "failed-notification-queue")
public void onMessage(Notification notification) {
  log.info("Push notification: {}", notification);
}
```

--------------------------------

### Listening to Daily Chat Indexing with Priority Group

Source: https://github.com/sonus21/rqueue/wiki/Home

Configure a listener for daily chat indexing messages with a specific priority group.

```java
@RqueueListener(value = "chat-indexing-daily", priority = "10", priorityGroup = "chat")
public void onMessage(ChatIndexing chatIndexing) {
  log.info("ChatIndexing message: {}", chatIndexing);
}
```

--------------------------------

### Publishing Simple Messages

Source: https://github.com/sonus21/rqueue/wiki/Home

Use enqueue to send a simple message to a queue.

```java
rqueueMessageEnqueuer.enqueue("simple-queue", "Rqueue is configured");
```

--------------------------------

### Create Rqueue-Locks KV Bucket

Source: https://github.com/sonus21/rqueue/blob/master/README.md

Manually create the 'rqueue-locks' KV bucket with specified replicas, storage, and TTL. This bucket is used for distributed locks.
```bash
nats kv add rqueue-locks --replicas=3 --storage=file --ttl=10m
```

--------------------------------

### Create Rqueue-Workers KV Bucket

Source: https://github.com/sonus21/rqueue/blob/master/README.md

Manually create the 'rqueue-workers' KV bucket with specified replicas, storage, and TTL. This bucket is used for worker registry information.

```bash
nats kv add rqueue-workers --replicas=3 --storage=file --ttl=5m
```

--------------------------------

### Listening to Simple Messages

Source: https://github.com/sonus21/rqueue/wiki/Home

Use RqueueListener to define a method that listens to messages on a specific queue.

```java
@RqueueListener(value = "simple-queue")
public void simpleMessage(String message) {
  log.info("simple-queue: {}", message);
}
```

--------------------------------

### Configure Post Execution Message Processor

Source: https://github.com/sonus21/rqueue/wiki/Callback-Events

Set up a post-execution message processor that is called after a message has been successfully consumed.

```java
class RqueueConfiguration {
  private MessageProcessor postExecutionMessageProcessor() {
    // return message processor object
  }

  @Bean
  public SimpleRqueueListenerContainerFactory simpleRqueueListenerContainerFactory() {
    SimpleRqueueListenerContainerFactory factory = new SimpleRqueueListenerContainerFactory();
    MessageProcessor postExecutionMessageProcessor = postExecutionMessageProcessor();
    factory.setPostExecutionMessageProcessor(postExecutionMessageProcessor);
    return factory;
  }
}
```

--------------------------------

### Run Gradle Tests

Source: https://github.com/sonus21/rqueue/blob/master/nats-task.md

Runs the unit-tagged tests of the core, Redis, web, and NATS modules. A second command runs a specific end-to-end integration test for the NATS backend.
```bash
./gradlew :rqueue-core:test :rqueue-redis:test :rqueue-web:test :rqueue-nats:test -DincludeTags=unit
```

```bash
./gradlew :rqueue-spring-boot-starter:test --tests "com.github.sonus21.rqueue.spring.boot.integration.NatsBackendEndToEndIT"
```

--------------------------------

### Configure Multicast Message Listener

Source: https://github.com/sonus21/rqueue/blob/master/docs/message-handling/message-handling.md

Annotate a class with @RqueueListener and register it as a Spring bean. Then annotate each handler method with @RqueueHandler, designating one as primary using 'primary = true'.

```java
import org.springframework.stereotype.Component;
import org.springframework.beans.factory.annotation.Value;
import com.fasterxml.jackson.core.JsonProcessingException;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import com.github.sonus21.rqueue.annotation.RqueueHandler;
import com.github.sonus21.rqueue.annotation.RqueueListener;

@RqueueListener(value = "${user.banned.queue.name}", active = "${user.banned.queue.active}")
@Slf4j
@Component
@RequiredArgsConstructor
public class UserBannedMessageListener {
  private final ConsumedMessageStore consumedMessageStore;

  @Value("${user.banned.queue.name}")
  private String userBannedQueue;

  @RqueueHandler
  public void handleMessage1(UserBanned userBanned) throws JsonProcessingException {
    consumedMessageStore.save(userBanned, "handleMessage1", userBannedQueue);
    log.info("handleMessage1 {}", userBanned);
  }

  @RqueueHandler
  public void handleMessage2(UserBanned userBanned) throws JsonProcessingException {
    consumedMessageStore.save(userBanned, "handleMessage2", userBannedQueue);
    log.info("handleMessage2 {}", userBanned);
  }

  // Exactly one handler is marked as the primary handler.
  @RqueueHandler(primary = true)
  public void handleMessagePrimary(UserBanned userBanned) throws JsonProcessingException {
    consumedMessageStore.save(userBanned, "handleMessagePrimary", userBannedQueue);
    log.info("handleMessagePrimary {}", userBanned);
  }

  @RqueueHandler
  public void handleUserBanned(UserBanned userBanned) throws JsonProcessingException {
    consumedMessageStore.save(userBanned, "handleUserBanned", userBannedQueue);
    log.info("handleUserBanned {}", userBanned);
  }
}
```

--------------------------------

### Registering Queues on Bootstrap Event

Source: https://github.com/sonus21/rqueue/wiki/Producer-Consumer

Register queues when the application receives a RqueueBootstrapEvent to avoid QueueDoesNotExist errors. Ensure the event is for startup.

```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import com.github.sonus21.rqueue.event.RqueueBootstrapEvent;
import com.github.sonus21.rqueue.manager.RqueueEndpointManager;

@Component
class AppMessageSender implements ApplicationListener<RqueueBootstrapEvent> {
  @Autowired
  private RqueueEndpointManager rqueueEndpointManager;

  @Override
  public void onApplicationEvent(RqueueBootstrapEvent event) {
    // Only register queues on the startup event, not on shutdown.
    if (!event.isStartup()) {
      return;
    }
    // 'queues' is assumed to be a collection of queue names defined elsewhere.
    for (String queue : queues) {
      String[] priorities = getPriority(queue);
      rqueueEndpointManager.registerQueue(queue, priorities);
    }
  }

  private String[] getPriority(String queue) {
    return new String[]{};
  }
}
```

--------------------------------

### Enqueueing a Simple Message

Source: https://github.com/sonus21/rqueue/blob/master/docs/message-handling/message-deduplication.md

This snippet shows how to enqueue a `SimpleMessage` with a defined ID and schedule it for processing. It also records the enqueue time in a repository.
```java
interface MessageRepository {
  Long getLatestEnqueueAt(String messageId);

  void addEnqueueAt(String messageId, Long time);
}

class SimpleMessage {
  private String id;

  public String getId() {
    return id;
  }
}

class MessageSender {
  @Autowired
  private MessageRepository messageRepository;

  @Autowired
  private RqueueMessageEnqueuer rqueueMessageEnqueuer;

  public void sendMessage(SimpleMessage message) {
    String id = message.getId();
    // TODO: handle the error case
    // Record when this message ID was last enqueued, then schedule it 10 minutes out.
    messageRepository.addEnqueueAt(id, System.currentTimeMillis());
    rqueueMessageEnqueuer.enqueueIn("simple-queue", message, Duration.ofMinutes(10));
  }
}
```

--------------------------------

### Listening to SMS with Priority

Source: https://github.com/sonus21/rqueue/wiki/Home

Configure a listener for SMS messages with defined priority levels.

```java
@RqueueListener(value = "sms", priority = "critical=10,high=8,medium=4,low=1")
public void onMessage(Sms sms) {
  log.info("Sms : {}", sms);
}
```

--------------------------------

### Configure Listener for Consumer Cluster

Source: https://github.com/sonus21/rqueue/blob/master/docs/message-handling/producer-consumer.md

Set 'active = "true"' in @RqueueListener to enable message consumption in the consumer cluster. The attribute takes a string, so it can also be a property placeholder.

```java
@RqueueListener(value = "my-queue", active = "true")
public void myMessageListener(String message) {
  // Process message
}
```

--------------------------------

### Run NATS Rqueue Message Metadata Service Integration Test

Source: https://github.com/sonus21/rqueue/blob/master/nats-task.md

Execute integration tests for the NATS Rqueue Message Metadata Service.

```bash
./gradlew :rqueue-nats:test --tests "com.github.sonus21.rqueue.nats.service.NatsRqueueMessageMetadataServiceIT"
```

--------------------------------

### Custom ThreadPoolTaskExecutor Configuration

Source: https://github.com/sonus21/rqueue/wiki/Configuration

Demonstrates the configuration of a custom ThreadPoolTaskExecutor with core pool size, max pool size, and queue capacity.
```java
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setThreadNamePrefix("ListenerExecutor");
threadPoolTaskExecutor.setCorePoolSize(corePoolSize);
threadPoolTaskExecutor.setMaxPoolSize(maxPoolSize);
threadPoolTaskExecutor.setQueueCapacity(queueCapacity);
threadPoolTaskExecutor.afterPropertiesSet();
factory.setTaskExecutor(threadPoolTaskExecutor);
```
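Spring's `ThreadPoolTaskExecutor` delegates to the JDK's `java.util.concurrent.ThreadPoolExecutor`, so the way the three sizing values interact can be sketched with the JDK class alone. The following is a minimal illustration and not Rqueue code: the class name `PoolSizingSketch`, the method `submitBlockingTasks`, and the numbers used are hypothetical. It shows that threads beyond the core size are created only once the bounded queue is full, and that further tasks are rejected under the JDK's default abort policy.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolSizingSketch {

  /**
   * Submits {@code tasks} tasks that block until released and returns
   * {pool size, queued tasks, rejected tasks} observed while they are blocked.
   */
  static int[] submitBlockingTasks(int corePoolSize, int maxPoolSize, int queueCapacity, int tasks)
      throws InterruptedException {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(
        corePoolSize, maxPoolSize, 60L, TimeUnit.SECONDS,
        new LinkedBlockingQueue<>(queueCapacity)); // bounded queue
    CountDownLatch release = new CountDownLatch(1);
    int rejected = 0;
    for (int i = 0; i < tasks; i++) {
      try {
        executor.execute(() -> {
          try {
            release.await(); // keep the worker busy until the snapshot is taken
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        });
      } catch (RejectedExecutionException e) {
        rejected++; // pool is at max size and the queue is full
      }
    }
    int[] snapshot = {executor.getPoolSize(), executor.getQueue().size(), rejected};
    release.countDown(); // let every task finish
    executor.shutdown();
    executor.awaitTermination(5, TimeUnit.SECONDS);
    return snapshot;
  }

  public static void main(String[] args) throws InterruptedException {
    int[] s = submitBlockingTasks(2, 4, 3, 10);
    // prints "threads=4 queued=3 rejected=3"
    System.out.println("threads=" + s[0] + " queued=" + s[1] + " rejected=" + s[2]);
  }
}
```

With a core size of 2, a maximum of 4, and a queue capacity of 3, ten blocking tasks leave 4 threads, 3 queued tasks, and 3 rejections. A larger queue capacity therefore delays growth toward the maximum pool size, which is worth keeping in mind when choosing `queueCapacity` for listener workloads.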