### Local Kafka Cluster Setup (docker-compose)
Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/example-docker-compose.md
This docker-compose configuration sets up a complete local Kafka environment. It includes services for Zookeeper, Kafka broker, Schema Registry, Kafka Connect, Confluent Control Center, and ksqlDB server. This setup is ideal for development and testing purposes.
```yaml
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.2
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-server:7.3.2
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

  schema-registry:
    image: confluentinc/cp-schema-registry:7.3.2
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

  connect:
    image: cnfldemos/cp-server-connect-datagen:0.5.3-7.1.0
    hostname: connect
    container_name: connect
    depends_on:
      - broker
      - schema-registry
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      # CLASSPATH required due to CC-2422
      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-7.3.2.jar
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR

  control-center:
    image: confluentinc/cp-enterprise-control-center:7.3.2
    hostname: control-center
    container_name: control-center
    depends_on:
      - broker
      - schema-registry
      - connect
      - ksqldb-server
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
      CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'connect:8083'
      CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088"
      CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088"
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021

  ksqldb-server:
    image: confluentinc/cp-ksqldb-server:7.3.2
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - broker
      - connect
    ports:
      - "8088:8088"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      KSQL_BOOTSTRAP_SERVERS: "broker:29092"
      KSQL_HOST_NAME: ksqldb-server
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_CACHE_MAX_BYTES_BUFFERING: 0
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
```
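With the stack running (for example via `docker compose up -d`), a Laravel application on the host reaches the broker through the advertised `PLAINTEXT_HOST` listener on port 9092. A minimal `.env` fragment for laravel-kafka might look like the following sketch; the variable names come from the package's default configuration shown later in this document, and the group id is only an example:

```ini
# Broker address exposed to the host by the compose file above
KAFKA_BROKERS=localhost:9092

# Example consumer group id for your application
KAFKA_CONSUMER_GROUP_ID=group
```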
--------------------------------
### Install Laravel Kafka with Composer
Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/installation-and-setup.md
Installs the laravel-kafka package into your Laravel project using Composer. This is the standard method for adding the package as a dependency.
```bash
composer require mateusjunges/laravel-kafka
```
--------------------------------
### Publish Laravel Kafka Configuration
Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/installation-and-setup.md
Publishes the configuration file for the laravel-kafka package to your Laravel project's config directory. This allows you to customize Kafka settings.
```bash
php artisan vendor:publish --tag=laravel-kafka-config
```
--------------------------------
### Create Basic Kafka Consumer
Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/consuming-messages/creating-consumer.md
Demonstrates how to create a basic Kafka consumer instance using the Kafka facade. This is the starting point for consuming messages.
```PHP
use Junges\Kafka\Facades\Kafka;
$consumer = Kafka::consumer();
```
--------------------------------
### Default Laravel Kafka Configuration
Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/installation-and-setup.md
The default configuration file for laravel-kafka, defining essential Kafka parameters such as broker URLs, consumer group IDs, offset reset policies, auto-commit settings, and compression codecs.
```php
<?php

return [
    'brokers' => env('KAFKA_BROKERS', 'localhost:9092'),

    /*
     | Kafka consumers belonging to the same consumer group share a group id.
     | The consumers in a group divide the topic partitions as fairly among themselves as possible,
     | so that each partition is consumed by exactly one consumer from the group.
     | This config defines the consumer group id you want to use for your project.
     */
    'consumer_group_id' => env('KAFKA_CONSUMER_GROUP_ID', 'group'),

    'consumer_timeout_ms' => env('KAFKA_CONSUMER_DEFAULT_TIMEOUT', 2000),

    /*
     | After the consumer receives its assignment from the coordinator,
     | it must determine the initial position for each assigned partition.
     | When the group is first created, before any messages have been consumed, the position is set
     | according to a configurable offset reset policy (auto.offset.reset). Typically, consumption
     | starts either at the earliest offset or the latest offset.
     | You can choose between "latest", "earliest" or "none".
     */
    'offset_reset' => env('KAFKA_OFFSET_RESET', 'latest'),

    /*
     | If enable.auto.commit is set (the default), the consumer automatically commits offsets
     | periodically at the interval set by auto.commit.interval.ms.
     */
    'auto_commit' => env('KAFKA_AUTO_COMMIT', true),

    'sleep_on_error' => env('KAFKA_ERROR_SLEEP', 5),

    'partition' => env('KAFKA_PARTITION', 0),

    /*
     | Kafka supports four compression codecs: none, gzip, lz4 and snappy.
     */
    'compression' => env('KAFKA_COMPRESSION_TYPE', 'snappy'),

    /*
     | Choose whether debug is enabled.
     */
    'debug' => env('KAFKA_DEBUG', false),

    /*
     | Repository used to batch messages together.
     | Implement BatchRepositoryInterface to save batches in a different storage.
     */
    'batch_repository' => env('KAFKA_BATCH_REPOSITORY', \Junges\Kafka\BatchRepositories\InMemoryBatchRepository::class),

    /*
     | The sleep time, in milliseconds, used when retrying a flush.
     */
    'flush_retry_sleep_in_ms' => 100,

    /*
     | The cache driver that will be used.
     */
    'cache_driver' => env('KAFKA_CACHE_DRIVER', env('CACHE_DRIVER', 'file')),
];
```
--------------------------------
### Install Supervisor on Ubuntu/macOS (Bash)
Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/consuming-messages/class-structure.md
Provides commands to install the Supervisor process control system on Ubuntu and macOS. Supervisor is essential for keeping background consumer processes running reliably. It ensures that if a consumer crashes, it is automatically restarted.
```bash
sudo apt-get install supervisor
```
```bash
brew install supervisor
```
--------------------------------
### Start and Manage Supervisor Processes (Bash)
Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/consuming-messages/class-structure.md
Lists the necessary commands to reload Supervisor's configuration, update its process list, and start the defined consumer processes. These commands are crucial for deploying and managing the background consumer service.
```bash
sudo supervisorctl reread
```
```bash
sudo supervisorctl update
```
```bash
sudo supervisorctl start my-topic-consumer:*
```
--------------------------------
### Docker Compose for Kafka and ksqlDB Services
Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/example-docker-compose.md
Defines additional Docker Compose services for the Kafka and ksqlDB environment: the ksqlDB CLI, ksql-datagen, and the Kafka REST proxy, specifying images, dependencies, ports, and environment variables. The leading environment variables continue the `ksqldb-server` service from the previous snippet.
```yaml
      KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      KSQL_KSQL_CONNECT_URL: "http://connect:8083"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 1
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: 'true'
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: 'true'

  ksqldb-cli:
    image: confluentinc/cp-ksqldb-cli:7.3.2
    container_name: ksqldb-cli
    depends_on:
      - broker
      - connect
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true

  ksql-datagen:
    image: confluentinc/ksqldb-examples:7.3.2
    hostname: ksql-datagen
    container_name: ksql-datagen
    depends_on:
      - ksqldb-server
      - broker
      - schema-registry
      - connect
    command: "bash -c 'echo Waiting for Kafka to be ready... && \
                       cub kafka-ready -b broker:29092 1 40 && \
                       echo Waiting for Confluent Schema Registry to be ready... && \
                       cub sr-ready schema-registry 8081 40 && \
                       echo Waiting a few seconds for topic creation to finish... && \
                       sleep 11 && \
                       tail -f /dev/null'"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      STREAMS_BOOTSTRAP_SERVERS: broker:29092
      STREAMS_SCHEMA_REGISTRY_HOST: schema-registry
      STREAMS_SCHEMA_REGISTRY_PORT: 8081

  rest-proxy:
    image: confluentinc/cp-kafka-rest:7.3.2
    depends_on:
      - broker
      - schema-registry
    ports:
      - "8082:8082"
    hostname: rest-proxy
    container_name: rest-proxy
    environment:
      KAFKA_REST_HOST_NAME: rest-proxy
      KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092'
      KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
      KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
```
--------------------------------
### Build and Consume Kafka Messages in PHP
Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/consuming-messages/consuming-messages.md
This snippet demonstrates how to build a Kafka consumer and then consume messages using the laravel-kafka package. It requires the package to be installed and configured.
```php
$consumer = \Junges\Kafka\Facades\Kafka::consumer()->build();
$consumer->consume();
```
--------------------------------
### Subscribe to Kafka Topics with Regex (PHP)
Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/consuming-messages/using-regex-to-subscribe-to-kafka-topics.md
This snippet shows how to subscribe a Kafka consumer to topics using a regular expression pattern. The pattern `^myPfx_.*` matches any topic starting with `myPfx_`. The consumer will see the new topics on its next periodic metadata refresh.
```php
\Junges\Kafka\Facades\Kafka::consumer()
    ->subscribe('^myPfx_.*')
    ->withHandler(...);
```
--------------------------------
### Update publishOn Method to publish and onTopic
Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/upgrade-guide.md
The `publishOn` method has been renamed to `publish`. It no longer accepts a `$topics` parameter directly. Instead, specify the topic by chaining a call to `onTopic`.
```php
\Junges\Kafka\Facades\Kafka::publish('broker')->onTopic('topic-name');
```
--------------------------------
### Mocking Kafka Consumers in Laravel
Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/testing/mocking-your-kafka-consumer.md
This example demonstrates how to mock Kafka messages and execute a consumer within a Laravel test environment. It shows how to use the `fake()` and `shouldReceiveMessages()` methods to control the messages processed by your consumer, allowing for thorough testing of your message handling logic.
```php
public function test_post_is_marked_as_published()
{
    // First, use the fake method:
    \Junges\Kafka\Facades\Kafka::fake();

    // Then, tell Kafka what messages the consumer should receive:
    \Junges\Kafka\Facades\Kafka::shouldReceiveMessages([
        new \Junges\Kafka\Message\ConsumedMessage(
            topicName: 'mark-post-as-published-topic',
            partition: 0,
            headers: [],
            body: ['post_id' => 1],
            key: null,
            offset: 0,
            timestamp: 0
        ),
    ]);

    // Now, instantiate your consumer and start consuming messages. It will consume only
    // the messages specified in the `shouldReceiveMessages` method:
    $consumer = \Junges\Kafka\Facades\Kafka::consumer(['mark-post-as-published-topic'])
        ->withHandler(function (\Junges\Kafka\Contracts\ConsumerMessage $message) use (&$post) {
            $post = Post::find($message->getBody()['post_id']);
            $post->update(['published_at' => now()->format('Y-m-d H:i:s')]);

            return 0;
        })->build();

    $consumer->consume();

    // Finally, you can assert that the post's published_at field is not empty,
    // or anything else you want to test:
    $this->assertNotNull($post->refresh()->published_at);
}
```
--------------------------------
### AVRO Serializer Configuration
Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/producing-messages/custom-serializers.md
Provides a detailed example of configuring the AVRO serializer for Laravel Kafka. This includes setting up the schema registry with caching, mapping schemas for topics, and initializing the serializer with the registry and record serializer.
```PHP
use FlixTech\AvroSerializer\Objects\RecordSerializer;
use FlixTech\SchemaRegistryApi\Registry\CachedRegistry;
use FlixTech\SchemaRegistryApi\Registry\BlockingRegistry;
use FlixTech\SchemaRegistryApi\Registry\PromisingRegistry;
use FlixTech\SchemaRegistryApi\Registry\Cache\AvroObjectCacheAdapter;
use GuzzleHttp\Client;
$cachedRegistry = new CachedRegistry(
    new BlockingRegistry(
        new PromisingRegistry(
            new Client(['base_uri' => 'kafka-schema-registry:9081'])
        )
    ),
    new AvroObjectCacheAdapter()
);

$registry = new \Junges\Kafka\Message\Registry\AvroSchemaRegistry($cachedRegistry);
$recordSerializer = new RecordSerializer($cachedRegistry);

// If no version is defined, the latest version will be used.
// If no schema definition is defined, the appropriate version will be fetched from the registry.
$registry->addBodySchemaMappingForTopic(
    'test-topic',
    new \Junges\Kafka\Message\KafkaAvroSchema('bodySchemaName' /*, int $version, AvroSchema $definition */)
);
$registry->addKeySchemaMappingForTopic(
    'test-topic',
    new \Junges\Kafka\Message\KafkaAvroSchema('keySchemaName' /*, int $version, AvroSchema $definition */)
);

$serializer = new \Junges\Kafka\Message\Serializers\AvroSerializer($registry, $recordSerializer /*, AvroEncoderInterface::ENCODE_BODY */);

$producer = \Junges\Kafka\Facades\Kafka::publish('broker')->onTopic('topic')->usingSerializer($serializer);
```
--------------------------------
### Set onStopConsuming Callbacks During Consumer Build
Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/upgrade-guide.md
Callbacks for `onStopConsuming` must now be defined during the consumer's build process, rather than after calling the `build` method. This ensures callbacks are registered before consumption begins.
```php
$consumer = Kafka::consumer(['topic'])
    ->withConsumerGroupId('group')
    ->withHandler(new Handler)
    ->onStopConsuming(static function () {
        // Do something when the consumer stops consuming messages
    })
    ->build();
```
--------------------------------
### Use AVRO Deserializer with Kafka PHP
Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/consuming-messages/custom-deserializers.md
Provides a detailed example of setting up and using the AVRO deserializer for Kafka consumers. This includes configuring the schema registry, mapping schemas to topics for both keys and bodies, and initializing the AVRO deserializer with the necessary components. It also shows how to specify decoding modes for key or body.
```php
use FlixTech\AvroSerializer\Objects\RecordSerializer;
use FlixTech\SchemaRegistryApi\Registry\CachedRegistry;
use FlixTech\SchemaRegistryApi\Registry\BlockingRegistry;
use FlixTech\SchemaRegistryApi\Registry\PromisingRegistry;
use FlixTech\SchemaRegistryApi\Registry\Cache\AvroObjectCacheAdapter;
use GuzzleHttp\Client;
$cachedRegistry = new CachedRegistry(
    new BlockingRegistry(
        new PromisingRegistry(
            new Client(['base_uri' => 'kafka-schema-registry:9081'])
        )
    ),
    new AvroObjectCacheAdapter()
);

$registry = new \Junges\Kafka\Message\Registry\AvroSchemaRegistry($cachedRegistry);
$recordSerializer = new RecordSerializer($cachedRegistry);

// If no version is defined, the latest version will be used.
// If no schema definition is defined, the appropriate version will be fetched from the registry.
$registry->addBodySchemaMappingForTopic(
    'test-topic',
    new \Junges\Kafka\Message\KafkaAvroSchema('bodySchema', 9 /* , AvroSchema $definition */)
);
$registry->addKeySchemaMappingForTopic(
    'test-topic',
    new \Junges\Kafka\Message\KafkaAvroSchema('keySchema', 9 /* , AvroSchema $definition */)
);

// If you are only decoding the key or the value, you can pass that mode as an additional
// third argument. By default, both key and body get decoded.
$deserializer = new \Junges\Kafka\Message\Deserializers\AvroDeserializer($registry, $recordSerializer /*, AvroDecoderInterface::DECODE_BODY */);

$consumer = \Junges\Kafka\Facades\Kafka::consumer()->usingDeserializer($deserializer);
```
--------------------------------
### Define and Use Custom Committer Logic
Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/advanced-usage/custom-committers.md
Illustrates creating a `CustomCommitter` that only commits successful messages and a `CustomCommitterFactory` to provide this logic to the Kafka consumer. The example shows how to build the consumer with the custom factory.
```php
class CustomCommitter implements CommitterContract
{
public function __construct(private KafkaConsumer $consumer) {}
public function commitMessage(Message $message, bool $success): void
{
if (! $success) {
return;
}
$this->consumer->commit($message);
}
public function commitDlq(Message $message): void
{
$this->consumer->commit($message);
}
}
class CustomCommitterFactory implements CommitterFactory
{
public function make(KafkaConsumer $kafkaConsumer, Config $config): CommitterContract
{
return new RetryableCommitter(
new SuccessCommitter($kafkaConsumer),
new NativeSleeper(),
$config->getMaxCommitRetries()
);
}
}
use Junges\Kafka\Facades\Kafka;
$consumer = Kafka::consumer()
->usingCommitterFactory(new CustomCommitterFactory())
->build();
```
--------------------------------
### Update withSasl Method Signature
Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/upgrade-guide.md
The `withSasl` method signature has been updated to accept individual SASL parameters instead of a `Sasl` object. This change simplifies the configuration of SASL authentication.
```php
public function withSasl(string $username, string $password, string $mechanisms, string $securityProtocol = 'SASL_PLAINTEXT');
```
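As an illustration, the new signature can be called with the credentials passed inline rather than wrapped in a `Sasl` object. This is a sketch based on the signature above; the credential values and the chosen mechanism are placeholders:

```php
use Junges\Kafka\Facades\Kafka;

$consumer = Kafka::consumer(['topic'])
    ->withSasl(
        username: 'username',        // placeholder credentials
        password: 'password',
        mechanisms: 'SCRAM-SHA-256', // any SASL mechanism supported by your broker
        securityProtocol: 'SASL_SSL' // defaults to SASL_PLAINTEXT when omitted
    );
```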
--------------------------------
### Handler Functions Require MessageConsumer Parameter
Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/upgrade-guide.md
In v2.x, handler functions and classes now require a `Junges\Kafka\Contracts\MessageConsumer` instance as the second argument. This provides access to consumer-specific methods within the handler.
```php
$consumer = Kafka::consumer(['topic'])
    ->withConsumerGroupId('group')
    ->withHandler(function(ConsumerMessage $message, MessageConsumer $consumer) {
        //
    });
```
--------------------------------
### Get a Fresh Producer Instance
Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/producing-messages/producing-messages.md
When using asynchronous publishing, the builder is stored in memory. The `fresh` method on the Kafka facade allows you to obtain a new Kafka Manager with a fresh producer, ensuring isolation if needed.
```php
use Junges\Kafka\Facades\Kafka;

Kafka::fresh()
    ->asyncPublish('broker')
    ->onTopic('topic-name');
```
--------------------------------
### Run Package Tests
Source: https://github.com/mateusjunges/laravel-kafka/blob/master/README.md
Execute the test suite for the Laravel Kafka package using Composer. This command verifies the package's functionality and stability.
```bash
composer test
```
--------------------------------
### Configure Kafka Producer Options
Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/producing-messages/configuring-producers.md
Demonstrates how to set Kafka producer configuration options using `withConfigOption` for single options and `withConfigOptions` for multiple options. It references external documentation for available properties.
```php
use Junges\Kafka\Facades\Kafka;

Kafka::publish('broker')
    ->onTopic('topic')
    ->withConfigOption('property-name', 'property-value')
    ->withConfigOptions([
        'property-name' => 'property-value'
    ]);
```
--------------------------------
### Create Kafka Consumer with Topics and Broker
Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/consuming-messages/creating-consumer.md
Shows how to create a Kafka consumer and specify the topics to consume, the broker address, and the consumer group ID. This configures the consumer for specific Kafka interactions.
```PHP
use Junges\Kafka\Facades\Kafka;
$consumer = Kafka::consumer(['topic-1', 'topic-2'], 'group-id', 'broker');
```
--------------------------------
### Define Custom Producer Macro
Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/advanced-usage/setting-global-configuration.md
Demonstrates how to use Laravel's macro functionality to create a custom producer method that applies specific configuration options globally. This works around the package's current lack of direct global configuration settings.
```PHP
// In a service provider:
\Junges\Kafka\Facades\Kafka::macro('myProducer', function () {
    return $this->publish('broker')
        ->onTopic('my-awesome-topic')
        ->withConfigOption('key', 'value');
});
```
--------------------------------
### Create Laravel Kafka Consumer Class (PHP)
Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/consuming-messages/class-structure.md
Demonstrates creating a basic Kafka consumer class in Laravel using the `laravel-kafka` package. It shows how to define the command signature, description, and handle message consumption with a callback. Requires `laravel-kafka` package and a running Kafka broker.
```php
use Junges\Kafka\Contracts\ConsumerMessage;
use Junges\Kafka\Contracts\MessageConsumer;
use Junges\Kafka\Facades\Kafka;

// handle() method of the Artisan command (signature: consume:my-topic):
public function handle()
{
    $consumer = Kafka::consumer(['my-topic'])
        ->withBrokers('localhost:8092')
        ->withAutoCommit()
        ->withHandler(function (ConsumerMessage $message, MessageConsumer $consumer) {
            // Handle your message here
        })
        ->build();

    $consumer->consume();
}
```
--------------------------------
### Publish Single Message to Kafka
Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/producing-messages/publishing-to-kafka.md
Demonstrates how to publish a single message to a Kafka topic. It shows how to configure the producer with topic, configuration options, Kafka key, and headers before sending the message.
```php
use Junges\Kafka\Facades\Kafka;

/** @var \Junges\Kafka\Producers\Builder $producer */
$producer = Kafka::publish('broker')
    ->onTopic('topic')
    ->withConfigOptions(['key' => 'value'])
    ->withKafkaKey('kafka-key')
    ->withHeaders(['header-key' => 'header-value']);

$producer->send();
```
--------------------------------
### Set Kafka Options using withOptions/withOption in PHP
Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/consuming-messages/configuring-consumer-options.md
Allows setting Kafka configuration options for the consumer. Options can be provided as an associative array using `withOptions` or individually using the `withOption` method.
```php
$consumer = \Junges\Kafka\Facades\Kafka::consumer()
    ->withOptions([
        'option-name' => 'option-value'
    ]);

// Or:
$consumer = \Junges\Kafka\Facades\Kafka::consumer()
    ->withOption('option-name', 'option-value');
```
--------------------------------
### Send Kafka Message Batches with PHP
Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/producing-messages/producing-message-batch-to-kafka.md
Demonstrates how to create a `MessageBatch` instance, push multiple `Message` objects into it, and then send the entire batch using the `sendBatch` method on a Kafka producer. This method is optimized for high throughput scenarios, sending messages asynchronously.
```php
use Junges\Kafka\Facades\Kafka;
use Junges\Kafka\Producers\MessageBatch;
use Junges\Kafka\Message\Message;

$message = new Message(
    headers: ['header-key' => 'header-value'],
    body: ['key' => 'value'],
    key: 'kafka key here',
    topicName: 'my_topic'
);

$messageBatch = new MessageBatch();
$messageBatch->push($message);
$messageBatch->push($message);
$messageBatch->push($message);
$messageBatch->push($message);

/** @var \Junges\Kafka\Producers\Builder $producer */
$producer = Kafka::publish('broker')
    ->onTopic('topic')
    ->withConfigOptions(['key' => 'value']);

$producer->sendBatch($messageBatch);
```
--------------------------------
### Assign Multiple Partitions to Consumer
Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/consuming-messages/assigning-partitions.md
Assigns multiple partitions of a Kafka topic to a consumer by providing an array of `\RdKafka\TopicPartition` objects to the `assignPartitions` method.
```php
$consumer = \Junges\Kafka\Facades\Kafka::consumer()
    ->assignPartitions([
        new \RdKafka\TopicPartition('your-topic-name', 1),
        new \RdKafka\TopicPartition('your-topic-name', 2),
        new \RdKafka\TopicPartition('your-topic-name', 3),
    ]);
```
--------------------------------
### Consume Kafka Messages from Specific Partition Offset
Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/consuming-messages/consuming-from-specific-offsets.md
Demonstrates how to use the `assignPartitions` method on the `ConsumerBuilder` to consume messages from a specific partition and offset within a Kafka topic. This requires creating an instance of `RdKafka\TopicPartition`.
```php
$partition = 1; // The partition number you want to assign.
$offset = 0;    // The offset you want to start consuming messages from.

$consumer = \Junges\Kafka\Facades\Kafka::consumer()
    ->assignPartitions([
        new \RdKafka\TopicPartition('your-topic-name', $partition, $offset)
    ]);
```
--------------------------------
### Configure Supervisor for Laravel Kafka Consumer (Text)
Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/consuming-messages/class-structure.md
Details the configuration file structure for Supervisor to manage Laravel Kafka consumer processes. It specifies the program name, working directory, command to execute, and logging settings. This configuration ensures the consumer runs as a background service.
```text
[program:my-topic-consumer]
directory=/var/www/html
process_name=%(program_name)s_%(process_num)02d
command=php artisan consume:my-topic
autostart=true
autorestart=true
redirect_stderr=true
stdout_logfile=/var/log/supervisor-laravel-worker.log
stopwaitsecs=3600
```
--------------------------------
### Kafka Producer API
Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/producing-messages/producing-messages.md
Provides methods for interacting with Kafka producers. Includes options for synchronous and asynchronous publishing, and obtaining fresh producer instances.
```APIDOC
Kafka::publish(string $brokerName)
- Publishes messages synchronously.
- Returns a ProducerBuilder instance.
- Parameters:
- brokerName: The name of the Kafka broker configuration.
Kafka::asyncPublish(string $brokerName)
- Publishes messages asynchronously.
- Returns a ProducerBuilder instance.
- The producer is a singleton and flushes on application shutdown.
- Parameters:
- brokerName: The name of the Kafka broker configuration.
Kafka::fresh()
- Returns a new Kafka Manager instance with a newly created producer builder.
- Useful for obtaining a fresh producer, especially with async publishing.
- Added in v2.2.0.
```
--------------------------------
### Enable and Configure Kafka Message Batching
Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/consuming-messages/handling-message-batch.md
Demonstrates enabling Kafka message batching with a specified size limit and release interval. This allows for efficient handling of multiple messages, particularly useful for bulk database inserts. The provided handler function processes the collected batch of messages.
```php
$consumer = \Junges\Kafka\Facades\Kafka::consumer()
    ->enableBatching()
    ->withBatchSizeLimit(1000)
    ->withBatchReleaseInterval(1500)
    ->withHandler(function (\Illuminate\Support\Collection $collection, \Junges\Kafka\Contracts\MessageConsumer $consumer) {
        // Handle the batch of messages here
    })
    ->build();

$consumer->consume();
```
--------------------------------
### Basic Queueable Handler in PHP
Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/consuming-messages/queueable-handlers.md
Demonstrates a basic queueable handler by implementing the `ShouldQueue` interface. This allows Kafka messages to be processed asynchronously via the Laravel queue system. The handler's `__invoke` method receives the Kafka message.
```php
use Illuminate\Contracts\Queue\ShouldQueue;
use Junges\Kafka\Contracts\Handler as HandlerContract;
use Junges\Kafka\Contracts\KafkaConsumerMessage;

class Handler implements HandlerContract, ShouldQueue
{
    public function __invoke(KafkaConsumerMessage $message): void
    {
        // Handle the consumed message.
    }
}
```
--------------------------------
### Assign Single Partition to Consumer
Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/consuming-messages/assigning-partitions.md
Assigns a single partition of a Kafka topic to a consumer using the `assignPartitions` method with a `\RdKafka\TopicPartition` object.
```php
$partition = 1; // The partition number you want to assign

$consumer = \Junges\Kafka\Facades\Kafka::consumer()
    ->assignPartitions([
        new \RdKafka\TopicPartition('your-topic-name', $partition)
    ]);
```
--------------------------------
### Add Middleware to Consumer
Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/advanced-usage/middlewares.md
Demonstrates how to add a custom middleware to a Kafka consumer using the `withMiddleware` method. The middleware receives the consumed message and a `next` callable, allowing for pre-processing or post-processing of messages before they are passed to the next handler in the chain.
```php
$consumer = \Junges\Kafka\Facades\Kafka::consumer()
    ->withMiddleware(function (\Junges\Kafka\Message\ConsumedMessage $message, callable $next) {
        // Perform some work here
        return $next($message);
    });
```
--------------------------------
### Publish Messages Synchronously
Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/producing-messages/producing-messages.md
Use the `publish` method to send messages to Kafka. This method returns a `ProducerBuilder` instance, allowing further configuration of the producer before sending.
```php
use Junges\Kafka\Facades\Kafka;
Kafka::publish('broker')->onTopic('topic-name');
```
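The snippet above only builds the producer. A fuller sketch of the synchronous flow, using the `withBodyKey` and `send` methods shown elsewhere in these docs (the broker, topic, and body values are placeholders):

```php
use Junges\Kafka\Facades\Kafka;

// Build the producer, attach a payload, and dispatch it synchronously.
Kafka::publish('broker')
    ->onTopic('topic-name')
    ->withBodyKey('key', 'value')
    ->send();
```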
--------------------------------
### Publish Messages Asynchronously
Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/producing-messages/producing-messages.md
The `asyncPublish` method is recommended for sending a large volume of messages. It utilizes a singleton producer that flushes messages only when the application shuts down, reducing overhead.
```php
use Junges\Kafka\Facades\Kafka;
Kafka::asyncPublish('broker')->onTopic('topic-name');
```
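Because the async producer is a singleton that flushes only at shutdown, it suits loops that emit many messages. A sketch under that assumption (loop bounds and body key are illustrative):

```php
use Junges\Kafka\Facades\Kafka;

// Each send() enqueues on the shared async producer; all queued
// messages are flushed together when the application terminates.
foreach (range(1, 1000) as $i) {
    Kafka::asyncPublish('broker')
        ->onTopic('topic-name')
        ->withBodyKey('sequence', $i)
        ->send();
}
```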
--------------------------------
### Configurable Queueable Handler in PHP
Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/consuming-messages/queueable-handlers.md
Shows how to configure the queue connection and name for a queueable Kafka handler. By implementing `onConnection` and `onQueue` methods, you can direct the handler jobs to specific Laravel queue configurations.
```php
use Illuminate\Contracts\Queue\ShouldQueue;
use Junges\Kafka\Contracts\Handler as HandlerContract;
use Junges\Kafka\Contracts\KafkaConsumerMessage;
class Handler implements HandlerContract, ShouldQueue
{
    public function __invoke(KafkaConsumerMessage $message): void
    {
        // Handle the consumed message.
    }

    public function onConnection(): string
    {
        return 'sqs'; // Specify your queue connection
    }

    public function onQueue(): string
    {
        return 'kafka-handlers'; // Specify your queue name
    }
}
```
--------------------------------
### Subscribe to Multiple Kafka Topics (PHP)
Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/consuming-messages/subscribing-to-kafka-topics.md
Subscribes a Kafka consumer instance to multiple Kafka topics by passing topic names as separate arguments. This allows for flexible subscription to several topics simultaneously.
```php
use Junges\Kafka\Facades\Kafka;
$consumer = Kafka::consumer()->subscribe('topic-1', 'topic-2', 'topic-n');
```
--------------------------------
### Implement Custom Committer and Factory
Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/advanced-usage/custom-committers.md
Defines custom `MyCommitter` and `MyCommitterFactory` classes implementing the `Committer` and `CommitterFactory` interfaces respectively, and shows how to integrate them with the Kafka consumer.
```php
use Junges\Kafka\Config\Config;
use Junges\Kafka\Contracts\Committer;
use Junges\Kafka\Contracts\CommitterFactory;
use RdKafka\KafkaConsumer;
use RdKafka\Message;
class MyCommitter implements Committer
{
    public function commitMessage(Message $message, bool $success): void
    {
        // ...
    }

    public function commitDlq(Message $message): void
    {
        // ...
    }
}

class MyCommitterFactory implements CommitterFactory
{
    public function make(KafkaConsumer $kafkaConsumer, Config $config): Committer
    {
        // ...
    }
}

$consumer = \Junges\Kafka\Facades\Kafka::consumer()
    ->usingCommitterFactory(new MyCommitterFactory())
    ->build();
```
--------------------------------
### Subscribe to Multiple Kafka Topics via Array (PHP)
Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/consuming-messages/subscribing-to-kafka-topics.md
Subscribes a Kafka consumer instance to multiple Kafka topics by providing an array of topic names. This is a convenient way to manage subscriptions for a larger number of topics.
```php
use Junges\Kafka\Facades\Kafka;
$consumer = Kafka::consumer()->subscribe([
    'topic-1',
    'topic-2',
    'topic-n',
]);
```
--------------------------------
### Assert Nothing Published in PHP
Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/testing/assert-nothing-published.md
This snippet demonstrates how to use the `assertNothingPublished` method to verify that no Kafka messages have been published. It sets up a fake Kafka producer and then asserts that no messages were sent.
```php
use PHPUnit\Framework\TestCase;
use Junges\Kafka\Facades\Kafka;
use Junges\Kafka\Message\Message;
class MyTest extends TestCase
{
    public function testWithSpecificTopic()
    {
        Kafka::fake();

        // This branch never runs, so no message is ever published.
        if (false) {
            $producer = Kafka::publish('broker')
                ->onTopic('some-kafka-topic')
                ->withHeaders(['key' => 'value'])
                ->withBodyKey('key', 'value');

            $producer->send();
        }

        Kafka::assertNothingPublished();
    }
}
```
--------------------------------
### Define Kafka Producer Macro in PHP
Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/advanced-usage/sending-multiple-messages-with-the-same-producer.md
This snippet shows how to define a custom macro for the Kafka facade in a Laravel service provider. The macro encapsulates common publishing configurations, such as the broker, topic, and specific options, making it reusable for sending messages with consistent settings.
```php
// In a service provider:
\Junges\Kafka\Facades\Kafka::macro('myProducer', function () {
    return $this->publish('broker')
        ->onTopic('my-awesome-topic')
        ->withConfigOption('key', 'value');
});
// Usage:
// \Junges\Kafka\Facades\Kafka::myProducer();
```
--------------------------------
### PHP Kafka Consumer with Callback Handler
Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/consuming-messages/message-handlers.md
Demonstrates how to set a callback function as a message handler for a Kafka consumer using the `withHandler` method.
```php
$consumer = \Junges\Kafka\Facades\Kafka::consumer();

// Using callback:
$consumer->withHandler(function (\Junges\Kafka\Contracts\ConsumerMessage $message, \Junges\Kafka\Contracts\MessageConsumer $consumer) {
    // Handle your message here
});
```
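Besides closures, the queueable-handler examples in these docs pass class instances to the consumer; a hedged sketch of an invokable class handler, assuming `withHandler` accepts any invokable (`MyHandler` is an illustrative name):

```php
use Junges\Kafka\Contracts\ConsumerMessage;
use Junges\Kafka\Contracts\MessageConsumer;

class MyHandler
{
    public function __invoke(ConsumerMessage $message, MessageConsumer $consumer): void
    {
        // Handle the consumed message.
    }
}

// Pass an instance instead of a closure.
$consumer = \Junges\Kafka\Facades\Kafka::consumer()->withHandler(new MyHandler());
```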
--------------------------------
### Configure Dead Letter Queue (DLQ) in PHP
Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/consuming-messages/configuring-consumer-options.md
Configures a Dead Letter Queue (DLQ) for messages that fail processing. If no DLQ topic name is specified, it defaults to the original topic name with a '-dlq' suffix. Three header keys (`kafka_throwable_message`, `kafka_throwable_code`, `kafka_throwable_class_name`) are added to messages sent to the DLQ.
```php
$consumer = \Junges\Kafka\Facades\Kafka::consumer()->subscribe('topic')->withDlq();

// Or, specifying the DLQ topic name:
$consumer = \Junges\Kafka\Facades\Kafka::consumer()->subscribe('topic')->withDlq('your-dlq-topic-name');
```
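On the consuming side, the three throwable headers listed above can be inspected by a consumer subscribed to the DLQ topic. A sketch, assuming the default `-dlq` suffix and that headers are exposed via the consumed message's `getHeaders()` accessor:

```php
$consumer = \Junges\Kafka\Facades\Kafka::consumer()
    ->subscribe('topic-dlq') // default DLQ name: original topic + '-dlq'
    ->withHandler(function (\Junges\Kafka\Contracts\ConsumerMessage $message) {
        $headers = $message->getHeaders();

        // Why the original handler failed:
        $reason = $headers['kafka_throwable_message'] ?? null;
        $class  = $headers['kafka_throwable_class_name'] ?? null;

        // Log, alert, or re-process as appropriate.
    });
```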
--------------------------------
### Set Kafka Partition Key in PHP
Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/producing-messages/configuring-message-payload.md
Sets the Kafka key for a message using the `withKafkaKey` method. This key is crucial for determining which partition a message is appended to within a Kafka log.
```php
use Junges\Kafka\Facades\Kafka;
Kafka::publish('broker')->onTopic('topic')->withKafkaKey('your-kafka-key');
```
--------------------------------
### Set Max Messages for Consumer in PHP
Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/consuming-messages/configuring-consumer-options.md
Limits the total number of messages a Kafka consumer will process. Use the `withMaxMessages` method to specify this limit.
```php
$consumer = \Junges\Kafka\Facades\Kafka::consumer()->withMaxMessages(2);
```
--------------------------------
### Set Entire Message Body with Message Object in PHP
Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/producing-messages/configuring-message-payload.md
Sets the entire message body using a `Junges\Kafka\Message\Message` object via the `withMessage` method. This provides a structured way to define message content, headers, and Kafka keys.
```php
use Junges\Kafka\Facades\Kafka;
use Junges\Kafka\Message\Message;
$message = new Message(
    headers: ['header-key' => 'header-value'],
    body: ['key' => 'value'],
    key: 'kafka key here',
);

Kafka::publish('broker')->onTopic('topic')->withMessage($message);
```
--------------------------------
### Set Max Processing Time for Consumer in PHP
Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/consuming-messages/configuring-consumer-options.md
Sets a maximum duration, in seconds, for which a Kafka consumer can actively process messages. Use the `withMaxTime` method to define this time limit.
```php
$consumer = \Junges\Kafka\Facades\Kafka::consumer()->withMaxTime(3600);
```
--------------------------------
### Configure Message Headers in PHP
Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/producing-messages/configuring-message-payload.md
Configures message headers using the `withHeaders` method. This allows you to attach custom key-value pairs to your Kafka messages, which can be useful for metadata or routing.
```php
use Junges\Kafka\Facades\Kafka;
Kafka::publish('broker')
    ->onTopic('topic')
    ->withHeaders([
        'header-key' => 'header-value',
    ]);
```
--------------------------------
### Using Custom Serializer
Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/producing-messages/custom-serializers.md
Demonstrates how to use a custom message serializer with the Kafka producer. This involves creating a class that implements the `\Junges\Kafka\Contracts\MessageSerializer` contract and passing an instance to the `usingSerializer` method.
```PHP
$producer = \Junges\Kafka\Facades\Kafka::publish('broker')->onTopic('topic')->usingSerializer(new MyCustomSerializer());
```
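The `MyCustomSerializer` class referenced above is user-defined. A minimal sketch, assuming the `MessageSerializer` contract exposes a single `serialize` method that receives and returns the producer message (the parameter and return types below are assumptions; check the contract shipped with your installed version for the exact signature):

```php
use Junges\Kafka\Contracts\MessageSerializer;

class MyCustomSerializer implements MessageSerializer
{
    // NOTE: type-hints are assumed; verify them against the
    // MessageSerializer contract in your package version.
    public function serialize(\Junges\Kafka\Contracts\ProducerMessage $message): \Junges\Kafka\Contracts\ProducerMessage
    {
        // Encode the message body however your consumers expect.
        return $message;
    }
}
```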
--------------------------------
### Configure SASL Authentication with Laravel Kafka
Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/advanced-usage/sasl-authentication.md
Enables SASL authentication for Kafka clients by using the `withSasl` method, passing the username, password, and authentication mechanism; depending on the package version, this method may instead take a `Junges\Kafka\Config\Sasl` instance carrying the same credentials.
```php
$consumer = \Junges\Kafka\Facades\Kafka::consumer()
    ->withSasl(
        password: 'password',
        username: 'username',
        mechanisms: 'authentication mechanism'
    );
```
--------------------------------
### Subscribe to Single Kafka Topic (PHP)
Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/consuming-messages/subscribing-to-kafka-topics.md
Subscribes a Kafka consumer instance to a single Kafka topic. This method is part of the consumer builder pattern provided by the library.
```php
use Junges\Kafka\Facades\Kafka;
$consumer = Kafka::consumer()->subscribe('topic');
```
--------------------------------
### Configure TLS Authentication with Laravel Kafka
Source: https://github.com/mateusjunges/laravel-kafka/blob/master/docs/advanced-usage/sasl-authentication.md
Sets up TLS authentication for Kafka clients by providing SSL-related configuration options via the `withOptions` method. This includes specifying certificate locations and endpoint identification algorithms.
```php
$consumer = \Junges\Kafka\Facades\Kafka::consumer()
    ->withOptions([
        'ssl.ca.location' => '/some/location/kafka.crt',
        'ssl.certificate.location' => '/some/location/client.crt',
        'ssl.key.location' => '/some/location/client.key',
        'ssl.endpoint.identification.algorithm' => 'none',
    ]);
```