### Local Kafka Cluster Setup (docker-compose) Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/example-docker-compose.md This docker-compose configuration sets up a complete local Kafka environment. It includes services for Zookeeper, Kafka broker, Schema Registry, Kafka Connect, Confluent Control Center, and ksqlDB server. This setup is ideal for development and testing purposes. ```yaml version: '2' services: zookeeper: image: confluentinc/cp-zookeeper:7.3.2 hostname: zookeeper container_name: zookeeper ports: - "2181:2181" environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 broker: image: confluentinc/cp-server:7.3.2 hostname: broker container_name: broker depends_on: - zookeeper ports: - "9092:9092" - "9101:9101" environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092 KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1 KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 KAFKA_JMX_PORT: 9101 KAFKA_JMX_HOSTNAME: localhost KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081 CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092 CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1 CONFLUENT_METRICS_ENABLE: 'true' CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous' schema-registry: image: confluentinc/cp-schema-registry:7.3.2 hostname: schema-registry container_name: schema-registry depends_on: - broker ports: - "8081:8081" environment: SCHEMA_REGISTRY_HOST_NAME: schema-registry SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092' SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081 connect: image: cnfldemos/cp-server-connect-datagen:0.5.3-7.1.0 hostname: connect container_name: connect depends_on: - broker - schema-registry ports: - "8083:8083" environment: CONNECT_BOOTSTRAP_SERVERS: 'broker:29092' CONNECT_REST_ADVERTISED_HOST_NAME: connect CONNECT_GROUP_ID: compose-connect-group CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1 CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000 CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1 CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1 CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081 # CLASSPATH required due to CC-2422 CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-7.3.2.jar CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor" CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor" CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components" CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR control-center: image: confluentinc/cp-enterprise-control-center:7.3.2 hostname: control-center container_name: control-center depends_on: - broker - schema-registry - connect - ksqldb-server ports: - "9021:9021" environment: CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092' CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'connect:8083' CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088" CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088" CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081" CONTROL_CENTER_REPLICATION_FACTOR: 1 CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1 CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1 CONFLUENT_METRICS_TOPIC_REPLICATION: 1 PORT: 9021 ksqldb-server: image: confluentinc/cp-ksqldb-server:7.3.2 hostname: ksqldb-server container_name: ksqldb-server depends_on: - broker - connect ports: - "8088:8088" environment: KSQL_CONFIG_DIR: "/etc/ksql" KSQL_BOOTSTRAP_SERVERS: "broker:29092" KSQL_HOST_NAME: ksqldb-server KSQL_LISTENERS: "http://0.0.0.0:8088" KSQL_CACHE_MAX_BYTES_BUFFERING: 0 KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081" ``` -------------------------------- ### Install Laravel Kafka with Composer Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/installation-and-setup.md Installs the laravel-kafka package into your Laravel project using Composer. This is the standard method for adding the package as a dependency. ```bash composer require mateusjunges/laravel-kafka ``` -------------------------------- ### Publish Laravel Kafka Configuration Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/installation-and-setup.md Publishes the configuration file for the laravel-kafka package to your Laravel project's config directory. This allows you to customize Kafka settings. ```bash php artisan vendor:publish --tag=laravel-kafka-config ``` -------------------------------- ### Create Basic Kafka Consumer Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/consuming-messages/creating-consumer.md Demonstrates how to create a basic Kafka consumer instance using the Kafka facade. This is the starting point for consuming messages. ```PHP use Junges\Kafka\Facades\Kafka; $consumer = Kafka::consumer(); ``` -------------------------------- ### Default Laravel Kafka Configuration Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/installation-and-setup.md The default configuration file for laravel-kafka, defining essential Kafka parameters such as broker URLs, consumer group IDs, offset reset policies, auto-commit settings, and compression codecs. ```php env('KAFKA_BROKERS', 'localhost:9092'), /* | Kafka consumers belonging to the same consumer group share a group id. | The consumers in a group then divides the topic partitions as fairly amongst themselves as possible by | establishing that each partition is only consumed by a single consumer from the group. | This config defines the consumer group id you want to use for your project. */ 'consumer_group_id' => env('KAFKA_CONSUMER_GROUP_ID', 'group'), 'consumer_timeout_ms' => env("KAFKA_CONSUMER_DEFAULT_TIMEOUT", 2000), /* | After the consumer receives its assignment from the coordinator, | it must determine the initial position for each assigned partition. | When the group is first created, before any messages have been consumed, the position is set according to a configurable | offset reset policy (auto.offset.reset). Typically, consumption starts either at the earliest offset or the latest offset. | You can choose between "latest", "earliest" or "none". */ 'offset_reset' => env('KAFKA_OFFSET_RESET', 'latest'), /* | If you set enable.auto.commit (which is the default), then the consumer will automatically commit offsets periodically at the | interval set by auto.commit.interval.ms. */ 'auto_commit' => env('KAFKA_AUTO_COMMIT', true), 'sleep_on_error' => env('KAFKA_ERROR_SLEEP', 5), 'partition' => env('KAFKA_PARTITION', 0), /* | Kafka supports 4 compression codecs: none , gzip , lz4 and snappy */ 'compression' => env('KAFKA_COMPRESSION_TYPE', 'snappy'), /* | Choose if debug is enabled or not. */ 'debug' => env('KAFKA_DEBUG', false), /* | Repository for batching messages together | Implement BatchRepositoryInterface to save batches in different storage */ 'batch_repository' => env('KAFKA_BATCH_REPOSITORY', \Junges\Kafka\BatchRepositories\InMemoryBatchRepository::class), /* | The sleep time in milliseconds that will be used when retrying flush */ 'flush_retry_sleep_in_ms' => 100, /* | The cache driver that will be used */ 'cache_driver' => env('KAFKA_CACHE_DRIVER', env('CACHE_DRIVER', 'file')), ]; ``` -------------------------------- ### Install Supervisor on Ubuntu/macOS (Bash) Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/consuming-messages/class-structure.md Provides commands to install the Supervisor process control system on Ubuntu and macOS. Supervisor is essential for keeping background consumer processes running reliably. It ensures that if a consumer crashes, it is automatically restarted. ```bash sudo apt-get install supervisor ``` ```bash brew install supervisor ``` -------------------------------- ### Start and Manage Supervisor Processes (Bash) Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/consuming-messages/class-structure.md Lists the necessary commands to reload Supervisor's configuration, update its process list, and start the defined consumer processes. These commands are crucial for deploying and managing the background consumer service. ```bash sudo supervisorctl reread ``` ```bash sudo supervisorctl update ``` ```bash sudo supervisorctl start my-topic-consumer:* ``` -------------------------------- ### Docker Compose for Kafka and ksqlDB Services Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/example-docker-compose.md Defines the Docker Compose services for a Kafka and ksqlDB environment. Includes configurations for ksqlDB CLI, ksql-datagen, and the REST proxy, specifying images, dependencies, ports, and environment variables. ```yaml KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor" KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor" KSQL_KSQL_CONNECT_URL: "http://connect:8083" KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 1 KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: 'true' KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: 'true' ksqldb-cli: image: confluentinc/cp-ksqldb-cli:7.3.2 container_name: ksqldb-cli depends_on: - broker - connect - ksqldb-server entrypoint: /bin/sh tty: true ksql-datagen: image: confluentinc/ksqldb-examples:7.3.2 hostname: ksql-datagen container_name: ksql-datagen depends_on: - ksqldb-server - broker - schema-registry - connect command: "bash -c 'echo Waiting for Kafka to be ready... && \ cub kafka-ready -b broker:29092 1 40 && \ echo Waiting for Confluent Schema Registry to be ready... && \ cub sr-ready schema-registry 8081 40 && \ echo Waiting a few seconds for topic creation to finish... && \ sleep 11 && \ tail -f /dev/null'" environment: KSQL_CONFIG_DIR: "/etc/ksql" STREAMS_BOOTSTRAP_SERVERS: broker:29092 STREAMS_SCHEMA_REGISTRY_HOST: schema-registry STREAMS_SCHEMA_REGISTRY_PORT: 8081 rest-proxy: image: confluentinc/cp-kafka-rest:7.3.2 depends_on: - broker - schema-registry ports: - 8082:8082 hostname: rest-proxy container_name: rest-proxy environment: KAFKA_REST_HOST_NAME: rest-proxy KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092' KAFKA_REST_LISTENERS: "http://0.0.0.0:8082" KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081' ``` -------------------------------- ### Build and Consume Kafka Messages in PHP Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/consuming-messages/consuming-messages.md This snippet demonstrates how to build a Kafka consumer and then consume messages using the laravel-kafka package. It requires the package to be installed and configured. ```php $consumer = \Junges\Kafka\Facades\Kafka::consumer()->build(); $consumer->consume(); ``` -------------------------------- ### Subscribe to Kafka Topics with Regex (PHP) Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/consuming-messages/using-regex-to-subscribe-to-kafka-topics.md This snippet shows how to subscribe a Kafka consumer to topics using a regular expression pattern. The pattern `^myPfx_.*` matches any topic starting with `myPfx_`. The consumer will see the new topics on its next periodic metadata refresh. ```php \Junges\Kafka\Facades\Kafka::consumer() ->subscribe('^myPfx_.*') ->withHandler(...); ``` -------------------------------- ### Update publishOn Method to publish and onTopic Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/upgrade-guide.md The `publishOn` method has been renamed to `publish`. It no longer accepts a `$topics` parameter directly. Instead, specify the topic by chaining a call to `onTopic`. ```php \Junges\Kafka\Facades\Kafka::publish('broker')->onTopic('topic-name'); ``` -------------------------------- ### Mocking Kafka Consumers in Laravel Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/testing/mocking-your-kafka-consumer.md This example demonstrates how to mock Kafka messages and execute a consumer within a Laravel test environment. It shows how to use the `fake()` and `shouldReceiveMessages()` methods to control the messages processed by your consumer, allowing for thorough testing of your message handling logic. ```php public function test_post_is_marked_as_published() { // First, you use the fake method: \Junges\Kafka\Facades\Kafka::fake(); // Then, tells Kafka what messages the consumer should receive: \Junges\Kafka\Facades\Kafka::shouldReceiveMessages([ new \Junges\Kafka\Message\ConsumedMessage( topicName: 'mark-post-as-published-topic', partition: 0, headers: [], body: ['post_id' => 1], key: null, offset: 0, timestamp: 0 ), ]); // Now, instantiate your consumer and start consuming messages. It will consume only the messages // specified in `shouldReceiveMessages` method: $consumer = \Junges\Kafka\Facades\Kafka::consumer(['mark-post-as-published-topic']) ->withHandler(function (\Junges\Kafka\Contracts\ConsumerMessage $message) use (&$posts) { $post = Post::find($message->getBody()['post_id']); $post->update(['published_at' => now()->format("Y-m-d H:i:s")]); return 0; })->build(); $consumer->consume(); // Now, you can test if the post published_at field is not empty, or anything else you want to test: $this->assertNotNull($post->refresh()->published_at); } ``` -------------------------------- ### AVRO Serializer Configuration Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/producing-messages/custom-serializers.md Provides a detailed example of configuring the AVRO serializer for Laravel Kafka. This includes setting up the schema registry with caching, mapping schemas for topics, and initializing the serializer with the registry and record serializer. ```PHP use FlixTech\AvroSerializer\Objects\RecordSerializer; use FlixTech\SchemaRegistryApi\Registry\CachedRegistry; use FlixTech\SchemaRegistryApi\Registry\BlockingRegistry; use FlixTech\SchemaRegistryApi\Registry\PromisingRegistry; use FlixTech\SchemaRegistryApi\Registry\Cache\AvroObjectCacheAdapter; use GuzzleHttp\Client; $cachedRegistry = new CachedRegistry( new BlockingRegistry( new PromisingRegistry( new Client(['base_uri' => 'kafka-schema-registry:9081']) ) ), new AvroObjectCacheAdapter() ); $registry = new AvroSchemaRegistry($cachedRegistry); $recordSerializer = new RecordSerializer($cachedRegistry); //if no version is defined, latest version will be used //if no schema definition is defined, the appropriate version will be fetched form the registry $registry->addBodySchemaMappingForTopic( 'test-topic', new \Junges\Kafka\Message\KafkaAvroSchema('bodySchemaName' /*, int $version, AvroSchema $definition */) ); $registry->addKeySchemaMappingForTopic( 'test-topic', new \Junges\Kafka\Message\KafkaAvroSchema('keySchemaName' /*, int $version, AvroSchema $definition */) ); $serializer = new \Junges\Kafka\Message\Serializers\AvroSerializer($registry, $recordSerializer /*, AvroEncoderInterface::ENCODE_BODY */); $producer = \Junges\Kafka\Facades\Kafka::publish('broker')->onTopic('topic')->usingSerializer($serializer); ``` -------------------------------- ### Set onStopConsuming Callbacks During Consumer Build Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/upgrade-guide.md Callbacks for `onStopConsuming` must now be defined during the consumer's build process, rather than after calling the `build` method. This ensures callbacks are registered before consumption begins. ```php $consumer = Kafka::consumer(['topic']) ->withConsumerGroupId('group') ->withHandler(new Handler) ->onStopConsuming(static function () { // Do something when the consumer stop consuming messages }) ->build() ``` -------------------------------- ### Use AVRO Deserializer with Kafka PHP Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/consuming-messages/custom-deserializers.md Provides a detailed example of setting up and using the AVRO deserializer for Kafka consumers. This includes configuring the schema registry, mapping schemas to topics for both keys and bodies, and initializing the AVRO deserializer with the necessary components. It also shows how to specify decoding modes for key or body. ```php use FlixTech\AvroSerializer\Objects\RecordSerializer; use FlixTech\SchemaRegistryApi\Registry\CachedRegistry; use FlixTech\SchemaRegistryApi\Registry\BlockingRegistry; use FlixTech\SchemaRegistryApi\Registry\PromisingRegistry; use FlixTech\SchemaRegistryApi\Registry\Cache\AvroObjectCacheAdapter; use GuzzleHttp\Client; $cachedRegistry = new CachedRegistry( new BlockingRegistry( new PromisingRegistry( new Client(['base_uri' => 'kafka-schema-registry:9081']) ) ), new AvroObjectCacheAdapter() ); $registry = new \Junges\Kafka\Message\Registry\AvroSchemaRegistry($cachedRegistry); $recordSerializer = new RecordSerializer($cachedRegistry); //if no version is defined, latest version will be used //if no schema definition is defined, the appropriate version will be fetched form the registry $registry->addBodySchemaMappingForTopic( 'test-topic', new \Junges\Kafka\Message\KafkaAvroSchema('bodySchema' , 9 /* , AvroSchema $definition */) ); $registry->addKeySchemaMappingForTopic( 'test-topic', new \Junges\Kafka\Message\KafkaAvroSchema('keySchema' , 9 /* , AvroSchema $definition */) ); // if you are only decoding key or value, you can pass that mode as additional third argument // per default both key and body will get decoded $deserializer = new \Junges\Kafka\Message\Deserializers\AvroDeserializer($registry, $recordSerializer /*, AvroDecoderInterface::DECODE_BODY */); $consumer = \Junges\Kafka\Facades\Kafka::consumer()->usingDeserializer($deserializer); ``` -------------------------------- ### Define and Use Custom Committer Logic Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/advanced-usage/custom-committers.md Illustrates creating a `CustomCommitter` that only commits successful messages and a `CustomCommitterFactory` to provide this logic to the Kafka consumer. The example shows how to build the consumer with the custom factory. ```php class CustomCommitter implements CommitterContract { public function __construct(private KafkaConsumer $consumer) {} public function commitMessage(Message $message, bool $success): void { if (! $success) { return; } $this->consumer->commit($message); } public function commitDlq(Message $message): void { $this->consumer->commit($message); } } class CustomCommitterFactory implements CommitterFactory { public function make(KafkaConsumer $kafkaConsumer, Config $config): CommitterContract { return new RetryableCommitter( new SuccessCommitter($kafkaConsumer), new NativeSleeper(), $config->getMaxCommitRetries() ); } } use Junges\Kafka\Facades\Kafka; $consumer = Kafka::consumer() ->usingCommitterFactory(new CustomCommitterFactory()) ->build(); ``` -------------------------------- ### Update withSasl Method Signature Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/upgrade-guide.md The `withSasl` method signature has been updated to accept individual SASL parameters instead of a `Sasl` object. This change simplifies the configuration of SASL authentication. ```php public function withSasl(string $username, string $password, string $mechanisms, string $securityProtocol = 'SASL_PLAINTEXT'); ``` -------------------------------- ### Handler Functions Require MessageConsumer Parameter Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/upgrade-guide.md In v2.x, handler functions and classes now require a `JungesKafkaContractsMessageConsumer` instance as the second argument. This provides access to consumer-specific methods within the handler. ```php $consumer = Kafka::consumer(['topic']) ->withConsumerGroupId('group') ->withHandler(function(ConsumerMessage $message, MessageConsumer $consumer) { // }) ``` -------------------------------- ### Get a Fresh Producer Instance Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/producing-messages/producing-messages.md When using asynchronous publishing, the builder is stored in memory. The `fresh` method on the Kafka facade allows you to obtain a new Kafka Manager with a fresh producer, ensuring isolation if needed. ```php use Junges\Kafka\Facades\Kafka; Kafka::fresh() ->asyncPublish('broker') ->onTopic('topic-name') ``` -------------------------------- ### Run Package Tests Source: https://github.com/mateusjunges/laravel-kafka/blob/master/README.md Execute the test suite for the Laravel Kafka package using Composer. This command verifies the package's functionality and stability. ```bash composer test ``` -------------------------------- ### Render Sponsor Request Component Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/introduction.md This snippet renders a custom Blade component for sponsor requests. It's a placeholder for dynamic content related to project sponsorship. ```blade ``` -------------------------------- ### Configure Kafka Producer Options Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/producing-messages/configuring-producers.md Demonstrates how to set Kafka producer configuration options using `withConfigOption` for single options and `withConfigOptions` for multiple options. It references external documentation for available properties. ```php use Junges\Kafka\Facades\Kafka; Kafka::publish('broker') ->onTopic('topic') ->withConfigOption('property-name', 'property-value') ->withConfigOptions([ 'property-name' => 'property-value' ]); ``` -------------------------------- ### Create Kafka Consumer with Topics and Broker Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/consuming-messages/creating-consumer.md Shows how to create a Kafka consumer and specify the topics to consume, the broker address, and the consumer group ID. This configures the consumer for specific Kafka interactions. ```PHP use Junges\Kafka\Facades\Kafka; $consumer = Kafka::consumer(['topic-1', 'topic-2'], 'group-id', 'broker'); ``` -------------------------------- ### Define Custom Producer Macro Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/advanced-usage/setting-global-configuration.md Demonstrates how to use Laravel's macro functionality to create a custom producer method that applies specific configuration options globally. This bypasses the current limitation of not having direct global configuration settings. ```PHP // In a service provider: \Junges\Kafka\Facades\Kafka::macro('myProducer', function () { return $this->publish('broker') ->onTopic('my-awesome-topic') ->withConfigOption('key', 'value'); }); ``` -------------------------------- ### Create Laravel Kafka Consumer Class (PHP) Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/consuming-messages/class-structure.md Demonstrates creating a basic Kafka consumer class in Laravel using the `laravel-kafka` package. It shows how to define the command signature, description, and handle message consumption with a callback. Requires `laravel-kafka` package and a running Kafka broker. ```php withBrokers('localhost:8092') ->withAutoCommit() ->withHandler(function(ConsumerMessage $message, MessageConsumer $consumer) { // Handle your message here }) ->build(); $consumer->consume(); } } ``` -------------------------------- ### Publish Single Message to Kafka Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/producing-messages/publishing-to-kafka.md Demonstrates how to publish a single message to a Kafka topic. It shows how to configure the producer with topic, configuration options, Kafka key, and headers before sending the message. ```php use Junges\Kafka\Facades\Kafka; /** @var \Junges\Kafka\Producers\Builder $producer */ $producer = Kafka::publish('broker') ->onTopic('topic') ->withConfigOptions(['key' => 'value']) ->withKafkaKey('kafka-key') ->withHeaders(['header-key' => 'header-value']); $producer->send(); ``` -------------------------------- ### Set Kafka Options using withOptions/withOption in PHP Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/consuming-messages/configuring-consumer-options.md Allows setting Kafka configuration options for the consumer. Options can be provided as an associative array using `withOptions` or individually using the `withOption` method. ```php $consumer = \Junges\Kafka\Facades\Kafka::consumer() ->withOptions([ 'option-name' => 'option-value' ]); // Or: $consumer = \Junges\Kafka\Facades\Kafka::consumer() ->withOption('option-name', 'option-value'); ``` -------------------------------- ### Send Kafka Message Batches with PHP Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/producing-messages/producing-message-batch-to-kafka.md Demonstrates how to create a `MessageBatch` instance, push multiple `Message` objects into it, and then send the entire batch using the `sendBatch` method on a Kafka producer. This method is optimized for high throughput scenarios, sending messages asynchronously. ```php use Junges\Kafka\Facades\Kafka; use Junges\Kafka\Producers\MessageBatch; use Junges\Kafka\Message\Message; $message = new Message( headers: ['header-key' => 'header-value'], body: ['key' => 'value'], key: 'kafka key here', topicName: 'my_topic' ); $messageBatch = new MessageBatch(); $messageBatch->push($message); $messageBatch->push($message); $messageBatch->push($message); $messageBatch->push($message); /** @var \Junges\Kafka\Producers\Builder $producer */ $producer = Kafka::publish('broker') ->onTopic('topic') ->withConfigOptions(['key' => 'value']); $producer->sendBatch($messageBatch); ``` -------------------------------- ### Assign Multiple Partitions to Consumer Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/consuming-messages/assigning-partitions.md Assigns multiple partitions of a Kafka topic to a consumer by providing an array of `\RdKafka\TopicPartition` objects to the `assignPartitions` method. ```php $consumer = \Junges\Kafka\Facades\Kafka::consumer() ->assignPartitions([ new \RdKafka\TopicPartition('your-topic-name', 1), new \RdKafka\TopicPartition('your-topic-name', 2), new \RdKafka\TopicPartition('your-topic-name', 3) ]); ``` -------------------------------- ### Consume Kafka Messages from Specific Partition Offset Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/consuming-messages/consuming-from-specific-offsets.md Demonstrates how to use the `assignPartitions` method on the `ConsumerBuilder` to consume messages from a specific partition and offset within a Kafka topic. This requires creating an instance of `RdKafka\TopicPartition`. ```php $partition = 1; // The partition number you want to assign. $offset = 0; // The offset you want to start consuming messages from. $consumer = \Junges\Kafka\Facades\Kafka::consumer() ->assignPartitions([ new \RdKafka\TopicPartition('your-topic-name', $partition, $offset) ]); ``` -------------------------------- ### Configure Supervisor for Laravel Kafka Consumer (Text) Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/consuming-messages/class-structure.md Details the configuration file structure for Supervisor to manage Laravel Kafka consumer processes. It specifies the program name, working directory, command to execute, and logging settings. This configuration ensures the consumer runs as a background service. ```text [program:my-topic-consumer] directory=/var/www/html process_name=%(program_name)_%(process_num)02d command=php artisan consume:my-topic autostart=true autorestart=true redirect_stderr=true stdout_logfile=/var/log/supervisor-laravel-worker.log stopwaitsecs=3600 ``` -------------------------------- ### Kafka Producer API Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/producing-messages/producing-messages.md Provides methods for interacting with Kafka producers. Includes options for synchronous and asynchronous publishing, and obtaining fresh producer instances. ```APIDOC Kafka::publish(string $brokerName) - Publishes messages synchronously. - Returns a ProducerBuilder instance. - Parameters: - brokerName: The name of the Kafka broker configuration. Kafka::asyncPublish(string $brokerName) - Publishes messages asynchronously. - Returns a ProducerBuilder instance. - The producer is a singleton and flushes on application shutdown. - Parameters: - brokerName: The name of the Kafka broker configuration. Kafka::fresh() - Returns a new Kafka Manager instance with a newly created producer builder. - Useful for obtaining a fresh producer, especially with async publishing. - Added in v2.2.0. ``` -------------------------------- ### Enable and Configure Kafka Message Batching Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/consuming-messages/handling-message-batch.md Demonstrates enabling Kafka message batching with a specified size limit and release interval. This allows for efficient handling of multiple messages, particularly useful for bulk database inserts. The provided handler function processes the collected batch of messages. ```php $consumer = \Junges\Kafka\Facades\Kafka::consumer() ->enableBatching() ->withBatchSizeLimit(1000) ->withBatchReleaseInterval(1500) ->withHandler(function (\Illuminate\Support\Collection $collection, \Junges\Kafka\Contracts\MessageConsumer $consumer) { // Handle batch }) ->build(); $consumer->consume(); ``` -------------------------------- ### Basic Queueable Handler in PHP Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/consuming-messages/queueable-handlers.md Demonstrates a basic queueable handler by implementing the `ShouldQueue` interface. This allows Kafka messages to be processed asynchronously via the Laravel queue system. The handler's `__invoke` method receives the Kafka message. ```php use Illuminate\Contracts\Queue\ShouldQueue; use Junges\Kafka\Contracts\Handler as HandlerContract; use Junges\Kafka\Contracts\KafkaConsumerMessage; class Handler implements HandlerContract, ShouldQueue { public function __invoke(KafkaConsumerMessage $message): void { // Handle the consumed message. } } ``` -------------------------------- ### Request Sponsor Component Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/requirements.md This component is used to display a sponsor request, likely within a templating engine for the Laravel framework. ```template ``` -------------------------------- ### Assign Single Partition to Consumer Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/consuming-messages/assigning-partitions.md Assigns a single partition of a Kafka topic to a consumer using the `assignPartitions` method with a `\RdKafka\TopicPartition` object. ```php $partition = 1; // The partition number you want to assign $consumer = \Junges\Kafka\Facades\Kafka::consumer() ->assignPartitions([ new \RdKafka\TopicPartition('your-topic-name', $partition) ]); ``` -------------------------------- ### Add Middleware to Consumer Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/advanced-usage/middlewares.md Demonstrates how to add a custom middleware to a Kafka consumer using the `withMiddleware` method. The middleware receives the consumed message and a `next` callable, allowing for pre-processing or post-processing of messages before they are passed to the next handler in the chain. ```php $consumer = \Junges\Kafka\Facades\Kafka::consumer() ->withMiddleware(function(\Junges\Kafka\Message\ConsumedMessage $message, callable $next) { // Perform some work here return $next($message); }); ``` -------------------------------- ### Publish Messages Synchronously Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/producing-messages/producing-messages.md Use the `publish` method to send messages to Kafka. This method returns a `ProducerBuilder` instance, allowing further configuration of the producer before sending. ```php use Junges\Kafka\Facades\Kafka; Kafka::publish('broker')->onTopic('topic-name') ``` -------------------------------- ### Publish Messages Asynchronously Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/producing-messages/producing-messages.md The `asyncPublish` method is recommended for sending a large volume of messages. It utilizes a singleton producer that flushes messages only when the application shuts down, reducing overhead. ```php use Junges\Kafka\Facades\Kafka; Kafka::asyncPublish('broker')->onTopic('topic-name') ``` -------------------------------- ### Configurable Queueable Handler in PHP Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/consuming-messages/queueable-handlers.md Shows how to configure the queue connection and name for a queueable Kafka handler. By implementing `onConnection` and `onQueue` methods, you can direct the handler jobs to specific Laravel queue configurations. ```php use Illuminate\Contracts\Queue\ShouldQueue; use Junges\Kafka\Contracts\Handler as HandlerContract; use Junges\Kafka\Contracts\KafkaConsumerMessage; class Handler implements HandlerContract, ShouldQueue { public function __invoke(KafkaConsumerMessage $message): void { // Handle the consumed message. } public function onConnection(): string { return 'sqs'; // Specify your queue connection } public function onQueue(): string { return 'kafka-handlers'; // Specify your queue name } } ``` -------------------------------- ### Subscribe to Multiple Kafka Topics (PHP) Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/consuming-messages/subscribing-to-kafka-topics.md Subscribes a Kafka consumer instance to multiple Kafka topics by passing topic names as separate arguments. This allows for flexible subscription to several topics simultaneously. ```php use Junges\Kafka\Facades\Kafka; $consumer = Kafka::consumer()->subscribe('topic-1', 'topic-2', 'topic-n'); ``` -------------------------------- ### Implement Custom Committer and Factory Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/advanced-usage/custom-committers.md Defines custom `MyCommitter` and `MyCommitterFactory` classes implementing the `Committer` and `CommitterFactory` interfaces respectively, and shows how to integrate them with the Kafka consumer. ```php use Junges\Kafka\Config\Config; use Junges\Kafka\Contracts\Committer; use Junges\Kafka\Contracts\CommitterFactory; use RdKafka\KafkaConsumer; use RdKafka\Message; class MyCommitter implements Committer { public function commitMessage(Message $message, bool $success) : void { // ... } public function commitDlq(Message $message) : void { // ... } } class MyCommitterFactory implements CommitterFactory { public function make(KafkaConsumer $kafkaConsumer, Config $config) : Committer { // ... } } $consumer = \Junges\Kafka\Facades\Kafka::consumer() ->usingCommitterFactory(new MyCommitterFactory()) ->build(); ``` -------------------------------- ### Subscribe to Multiple Kafka Topics via Array (PHP) Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/consuming-messages/subscribing-to-kafka-topics.md Subscribes a Kafka consumer instance to multiple Kafka topics by providing an array of topic names. This is a convenient way to manage subscriptions for a larger number of topics. ```php use Junges\Kafka\Facades\Kafka; $consumer = Kafka::consumer()->subscribe([ 'topic-1', 'topic-2', 'topic-n' ]); ``` -------------------------------- ### Assert Nothing Published in PHP Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/testing/assert-nothing-published.md This snippet demonstrates how to use the `assertNothingPublished` method to verify that no Kafka messages have been published. It sets up a fake Kafka producer and then asserts that no messages were sent. ```php use PHPUnit\Framework\TestCase; use Junges\Kafka\Facades\Kafka; use Junges\Kafka\Message\Message; class MyTest extends TestCase { public function testWithSpecificTopic() { Kafka::fake(); if (false) { $producer = Kafka::publish('broker') ->onTopic('some-kafka-topic') ->withHeaders(['key' => 'value']) ->withBodyKey('key', 'value'); $producer->send(); } Kafka::assertNothingPublished(); } } ``` -------------------------------- ### Define Kafka Producer Macro in PHP Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/advanced-usage/sending-multiple-messages-with-the-same-producer.md This snippet shows how to define a custom macro for the Kafka facade in a Laravel service provider. The macro encapsulates common publishing configurations, such as the broker, topic, and specific options, making it reusable for sending messages with consistent settings. ```php // In a service provider: \Junges\Kafka\Facades\Kafka::macro('myProducer', function () { return $this->publish('broker') ->onTopic('my-awesome-topic') ->withConfigOption('key', 'value'); }); // Usage: // \Junges\Kafka\Facades\Kafka::myProducer(); ``` -------------------------------- ### PHP Kafka Consumer with Callback Handler Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/consuming-messages/message-handlers.md Demonstrates how to set a callback function as a message handler for a Kafka consumer using the `withHandler` method. ```php $consumer = \Junges\Kafka\Facades\Kafka::consumer(); // Using callback: $consumer->withHandler(function(\Junges\Kafka\Contracts\ConsumerMessage $message, \Junges\Kafka\Contracts\MessageConsumer $consumer) { // Handle your message here }); ``` -------------------------------- ### Configure Dead Letter Queue (DLQ) in PHP Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/consuming-messages/configuring-consumer-options.md Configures a Dead Letter Queue (DLQ) for messages that fail processing. If no DLQ topic name is specified, it defaults to the original topic name with a '-dlq' suffix. Three header keys (`kafka_throwable_message`, `kafka_throwable_code`, `kafka_throwable_class_name`) are added to messages sent to the DLQ. ```php $consumer = \Junges\Kafka\Facades\Kafka::consumer()->subscribe('topic')->withDlq(); //Or, specifying the dlq topic name: $consumer = \Junges\Kafka\Facades\Kafka::consumer()->subscribe('topic')->withDlq('your-dlq-topic-name') ``` -------------------------------- ### Set Kafka Partition Key in PHP Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/producing-messages/configuring-message-payload.md Sets the Kafka key for a message using the `withKafkaKey` method. This key is crucial for determining which partition a message is appended to within a Kafka log. ```php use Junges\Kafka\Facades\Kafka; Kafka::publish('broker')->onTopic('topic')->withKafkaKey('your-kafka-key'); ``` -------------------------------- ### Set Max Messages for Consumer in PHP Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/consuming-messages/configuring-consumer-options.md Limits the total number of messages a Kafka consumer will process. Use the `withMaxMessages` method to specify this limit. ```php $consumer = \Junges\Kafka\Facades\Kafka::consumer()->withMaxMessages(2); ``` -------------------------------- ### Set Entire Message Body with Message Object in PHP Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/producing-messages/configuring-message-payload.md Sets the entire message body using a `Junges\Kafka\Message\Message` object via the `withMessage` method. This provides a structured way to define message content, headers, and Kafka keys. ```php use Junges\Kafka\Facades\Kafka; use Junges\Kafka\Message\Message; $message = new Message( headers: ['header-key' => 'header-value'], body: ['key' => 'value'], key: 'kafka key here' ) Kafka::publish('broker')->onTopic('topic')->withMessage($message); ``` -------------------------------- ### Request Sponsor Component Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/advanced-usage/custom-committers.md A custom component or tag used for requesting a sponsor, likely within a templating system. ```+parse ``` -------------------------------- ### Set Max Processing Time for Consumer in PHP Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/consuming-messages/configuring-consumer-options.md Sets a maximum duration, in seconds, for which a Kafka consumer can actively process messages. Use the `withMaxTime` method to define this time limit. ```php $consumer = \Junges\Kafka\Facades\Kafka::consumer()->withMaxTime(3600); ``` -------------------------------- ### Configure Message Headers in PHP Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/producing-messages/configuring-message-payload.md Configures message headers using the `withHeaders` method. This allows you to attach custom key-value pairs to your Kafka messages, which can be useful for metadata or routing. ```php use Junges\Kafka\Facades\Kafka; Kafka::publish('broker') ->onTopic('topic') ->withHeaders([ 'header-key' => 'header-value' ]) ``` -------------------------------- ### Using Custom Serializer Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/producing-messages/custom-serializers.md Demonstrates how to use a custom message serializer with the Kafka producer. This involves creating a class that implements the `\Junges\Kafka\Contracts\MessageSerializer` contract and passing an instance to the `usingSerializer` method. ```PHP $producer = \Junges\Kafka\Facades\Kafka::publish('broker')->onTopic('topic')->usingSerializer(new MyCustomSerializer()); ``` -------------------------------- ### Configure SASL Authentication with Laravel Kafka Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/advanced-usage/sasl-authentication.md Enables SASL authentication for Kafka clients by using the `withSasl` method. This method requires a `Junges\Kafka\Config\Sasl` instance to provide credentials like username, password, and authentication mechanism. ```php $consumer = \Junges\Kafka\Facades\Kafka::consumer() ->withSasl( password: 'password', username: 'username', mechanisms: 'authentication mechanism' ); ``` -------------------------------- ### Subscribe to Single Kafka Topic (PHP) Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/consuming-messages/subscribing-to-kafka-topics.md Subscribes a Kafka consumer instance to a single Kafka topic. This method is part of the consumer builder pattern provided by the library. ```php use Junges\Kafka\Facades\Kafka; $consumer = Kafka::consumer()->subscribe('topic'); ``` -------------------------------- ### Configure TLS Authentication with Laravel Kafka Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/advanced-usage/sasl-authentication.md Sets up TLS authentication for Kafka clients by providing SSL-related configuration options via the `withOptions` method. This includes specifying certificate locations and endpoint identification algorithms. ```php $consumer = \Junges\Kafka\Facades\Kafka::consumer() ->withOptions([ 'ssl.ca.location' => '/some/location/kafka.crt', 'ssl.certificate.location' => '/some/location/client.crt', 'ssl.key.location' => '/some/location/client.key', 'ssl.endpoint.identification.algorithm' => 'none' ]); ```