### Install kafka-python

Source: https://github.com/dpkp/kafka-python/blob/master/README.rst

Use pip to install the library.

```bash
pip install kafka-python
```

--------------------------------

### Install kafka-python from Source with Setuptools

Source: https://github.com/dpkp/kafka-python/wiki/Home

Clone the repository and install kafka-python using the easy_install command.

```bash
git clone https://github.com/mumrah/kafka-python
easy_install ./kafka-python
```

--------------------------------

### Install Snappy Development Libraries on Ubuntu

Source: https://github.com/dpkp/kafka-python/blob/master/docs/install.md

Install the necessary development libraries for Snappy on Ubuntu systems before installing the python-snappy module.

```bash
apt-get install libsnappy-dev
```

--------------------------------

### Install kafka-python using setup.py

Source: https://github.com/dpkp/kafka-python/wiki/Home

Clone the repository and install kafka-python by running the setup.py script directly.

```bash
git clone https://github.com/mumrah/kafka-python
cd kafka-python
python setup.py install
```

--------------------------------

### Install Snappy Python Extension

Source: https://github.com/dpkp/kafka-python/wiki/Home

Install the python-snappy library after building it from source.

```bash
python setup.py install
```

--------------------------------

### Install python-snappy from PyPI

Source: https://github.com/dpkp/kafka-python/wiki/Home

Install the python-snappy library directly from the Python Package Index using pip.

```bash
pip install python-snappy
```

--------------------------------

### Install test dependencies

Source: https://github.com/dpkp/kafka-python/blob/master/docs/tests.md

Install the required development dependencies before running the test suite.
```bash
pip install -r requirements-dev.txt
```

--------------------------------

### Install Snappy Development Libraries (APT)

Source: https://github.com/dpkp/kafka-python/wiki/Home

Install the Snappy C library development files on Debian/Ubuntu systems using apt-get.

```bash
sudo apt-get install libsnappy-dev
```

--------------------------------

### Install kafka-python from Source with Pip

Source: https://github.com/dpkp/kafka-python/wiki/Home

Clone the repository and install the bleeding-edge version of kafka-python using pip.

```bash
git clone https://github.com/mumrah/kafka-python
pip install ./kafka-python
```

--------------------------------

### Install Snappy Development Libraries (YUM)

Source: https://github.com/dpkp/kafka-python/wiki/Home

Install the Snappy C library development files on RPM-based systems using yum. On these systems the package is named `snappy-devel`.

```bash
sudo yum install snappy-devel
```

--------------------------------

### Install Snappy Development Libraries (Brew)

Source: https://github.com/dpkp/kafka-python/wiki/Home

Install the Snappy C library development files on macOS using Homebrew.

```bash
brew install snappy
```

--------------------------------

### Build Snappy from Source

Source: https://github.com/dpkp/kafka-python/blob/master/docs/install.md

Download, configure, build, and install Snappy from its source code. This is an alternative to using a package manager.

```bash
wget https://github.com/google/snappy/releases/download/1.1.3/snappy-1.1.3.tar.gz
tar xzvf snappy-1.1.3.tar.gz
cd snappy-1.1.3
./configure
make
sudo make install
```

--------------------------------

### Install kafka-python with Optional Snappy Support

Source: https://github.com/dpkp/kafka-python/blob/master/docs/install.md

Install kafka-python with the snappy extra to enable Snappy compression and decompression. Requires the python-snappy module and Snappy development libraries.
```bash
pip install 'kafka-python[snappy]'
```

--------------------------------

### Install Bleeding-Edge kafka-python from Source

Source: https://github.com/dpkp/kafka-python/blob/master/docs/install.md

Clone the repository and install directly from source to get the latest development version. This method is useful for testing unreleased features.

```bash
git clone https://github.com/dpkp/kafka-python
pip install ./kafka-python
```

--------------------------------

### Install kafka-python with Pip

Source: https://github.com/dpkp/kafka-python/wiki/Home

Use this command to install the latest stable release of kafka-python.

```bash
pip install kafka-python
```

--------------------------------

### Install kafka-python with Optional LZ4 Support

Source: https://github.com/dpkp/kafka-python/blob/master/docs/install.md

Install kafka-python with the lz4 extra to enable LZ4 compression and decompression. Requires the python-lz4 package.

```bash
pip install 'kafka-python[lz4]'
```

--------------------------------

### Install kafka-python with Optional ZSTD Support

Source: https://github.com/dpkp/kafka-python/blob/master/docs/install.md

Install kafka-python with the zstd extra to enable ZSTD compression and decompression. Requires the python-zstandard package.

```bash
pip install 'kafka-python[zstd]'
```

--------------------------------

### Get Partitions and Topics

Source: https://github.com/dpkp/kafka-python/blob/master/docs/usage.md

Fetch the partitions for a given topic and list all available topics in the Kafka cluster.

```python
# get all partitions of a topic
print(clusterMetadata.partitions_for_topic("topic"))

# list topics
print(clusterMetadata.topics())
```

--------------------------------

### Install kafka-python with Optional crc32c Support

Source: https://github.com/dpkp/kafka-python/blob/master/docs/install.md

Install kafka-python with the crc32c extra to enable faster crc32c calculations, recommended for Kafka 0.11+ brokers.
This avoids a slow pure-Python implementation.

```bash
pip install 'kafka-python[crc32c]'
```

--------------------------------

### Run integration tests

Source: https://github.com/dpkp/kafka-python/blob/master/docs/tests.md

Run the integration tests against a specific Kafka version. Fixture setup is handled automatically.

```bash
KAFKA_VERSION=4.0.0 make test
```

--------------------------------

### Configure Kafka API Version

Source: https://github.com/dpkp/kafka-python/blob/master/docs/apidoc/KafkaConsumer.md

Examples of tuple values used to specify the Kafka broker API version for feature compatibility.

```python
(3, 9)
(0, 11)
(0, 10, 0)
(0, 9)
(0, 8, 2)
(0, 8, 1)
(0, 8, 0)
```

--------------------------------

### GET /is_ready

Source: https://github.com/dpkp/kafka-python/blob/master/docs/apidoc/KafkaClient.md

Checks whether a node is ready to send more requests, taking metadata refresh status into account.

```APIDOC
## GET /is_ready

### Description
Check whether a node is ready to send more requests.

### Parameters
#### Query Parameters
- **node_id** (int) - Required - Id of the node to check.
- **metadata_priority** (bool) - Optional - Mark node as not-ready if a metadata refresh is required. Default: True.

### Response
- **returns** (bool) - True if the node is ready and metadata is not refreshing.
```

--------------------------------

### Produce with Msgpack Serializer

Source: https://github.com/dpkp/kafka-python/blob/master/docs/usage.md

Configure the producer to serialize message values automatically with msgpack. Requires the `msgpack` library.

```python
import msgpack
from kafka import KafkaProducer

producer = KafkaProducer(value_serializer=msgpack.dumps)
producer.send('msgpack-topic', {'key': 'value'})
```

--------------------------------

### Enable System Site Packages in venv

Source: https://github.com/dpkp/kafka-python/wiki/kafka-import-error

Modify the 'venv/pyvenv.cfg' file to include system site packages.
This is necessary when a package installed globally or in another environment is required within the virtual environment, such as when integrating Flask with Kafka.

```text
include-system-site-packages = true
```

--------------------------------

### KafkaProducer Basic Usage

Source: https://github.com/dpkp/kafka-python/blob/master/docs/usage.md

Sends a message to a Kafka topic. This example shows asynchronous sending of raw bytes. Ensure Kafka is running and the topic exists.

```python
from kafka import KafkaProducer
from kafka.errors import KafkaError
import msgpack
import json

producer = KafkaProducer(bootstrap_servers=['broker1:1234'])

# Asynchronous by default
future = producer.send('my-topic', b'raw_bytes')
```

--------------------------------

### Kafka Consumer CLI Help

Source: https://github.com/dpkp/kafka-python/blob/master/docs/usage.md

Displays help information for the kafka.consumer command-line interface. Use this to understand available options for consuming messages.

```bash
❯ python -m kafka.consumer --help
usage: python -m kafka.consumer [-h] -b BOOTSTRAP_SERVERS -t TOPICS -g GROUP [-c EXTRA_CONFIG] [-l LOG_LEVEL] [-f FORMAT] [--encoding ENCODING]

Kafka console consumer

options:
  -h, --help            show this help message and exit
  -b BOOTSTRAP_SERVERS, --bootstrap-servers BOOTSTRAP_SERVERS
                        host:port for cluster bootstrap servers
  -t TOPICS, --topic TOPICS
                        subscribe to topic
  -g GROUP, --group GROUP
                        consumer group
  -c EXTRA_CONFIG, --extra-config EXTRA_CONFIG
                        additional configuration properties for kafka consumer
  -l LOG_LEVEL, --log-level LOG_LEVEL
                        logging level, passed to logging.basicConfig
  -f FORMAT, --format FORMAT
                        output format: str|raw|full
  --encoding ENCODING   encoding to use for str output decode()
```

--------------------------------

### Create Kafka Topics (Simple)

Source: https://context7.com/dpkp/kafka-python/llms.txt

Create multiple Kafka topics with default configurations using KafkaAdminClient.
This is a quick way to provision topics when default settings are sufficient.

```python
from kafka import KafkaAdminClient

admin = KafkaAdminClient(
    bootstrap_servers=['localhost:9092'],
    client_id='admin-client'
)

# Create topics
# Simple: list of topic names (uses broker defaults for partitions/replication)
admin.create_topics(['new-topic-1', 'new-topic-2'])
```

--------------------------------

### Create Kafka Topics (Configured)

Source: https://context7.com/dpkp/kafka-python/llms.txt

Create Kafka topics with specific partition counts, replication factors, and retention policies using KafkaAdminClient. This allows fine-grained control over topic settings.

```python
from kafka import KafkaAdminClient

admin = KafkaAdminClient(
    bootstrap_servers=['localhost:9092'],
    client_id='admin-client'
)

# With configuration
admin.create_topics({
    'configured-topic': {
        'num_partitions': 6,
        'replication_factor': 3,
        'configs': {
            'retention.ms': '86400000',  # 1 day
            'cleanup.policy': 'delete'
        }
    },
    'another-topic': {
        'num_partitions': 3,
        'replication_factor': 2
    }
})
```

--------------------------------

### Kafka Producer CLI Help

Source: https://github.com/dpkp/kafka-python/blob/master/docs/usage.md

Displays help information for the kafka.producer command-line interface. Use this to understand available options for producing messages.
```bash
❯ python -m kafka.producer --help
usage: python -m kafka.producer [-h] -b BOOTSTRAP_SERVERS -t TOPIC [-c EXTRA_CONFIG] [-l LOG_LEVEL] [--encoding ENCODING]

Kafka console producer

options:
  -h, --help            show this help message and exit
  -b BOOTSTRAP_SERVERS, --bootstrap-servers BOOTSTRAP_SERVERS
                        host:port for cluster bootstrap servers
  -t TOPIC, --topic TOPIC
                        publish to topic
  -c EXTRA_CONFIG, --extra-config EXTRA_CONFIG
                        additional configuration properties for kafka producer
  -l LOG_LEVEL, --log-level LOG_LEVEL
                        logging level, passed to logging.basicConfig
  --encoding ENCODING   byte encoding for produced messages
```

--------------------------------

### subscription

Source: https://github.com/dpkp/kafka-python/blob/master/docs/apidoc/KafkaConsumer.md

Get the current topic subscription.

````APIDOC
## subscription()

### Description
Get the current topic subscription.

### Method
GET

### Endpoint
/subscription

### Parameters
None

### Response
#### Success Response (200)
* **topics** (set) - A set of currently subscribed topics.

### Response Example
```json
{
  "topics": ["topic1", "topic2"]
}
```
````

--------------------------------

### topics

Source: https://github.com/dpkp/kafka-python/blob/master/docs/apidoc/KafkaConsumer.md

Get all topics the user is authorized to view.

````APIDOC
## topics()

### Description
Get all topics the user is authorized to view. This will always issue a remote call to the cluster to fetch the latest information.

### Method
GET

### Endpoint
/topics

### Parameters
None

### Response
#### Success Response (200)
* **topics** (set) - A set of all available topics.

### Response Example
```json
{
  "topics": ["topicA", "topicB", "topicC"]
}
```
````

--------------------------------

### Kafka Admin CLI Help

Source: https://github.com/dpkp/kafka-python/blob/master/docs/usage.md

Displays help information for the kafka.admin command-line interface, which manages the Kafka cluster, configurations, log directories, topics, and consumer groups.
```bash
❯ python -m kafka.admin --help
usage: python -m kafka.admin [-h] -b BOOTSTRAP_SERVERS [-c EXTRA_CONFIG] [-l LOG_LEVEL] [-f FORMAT] {cluster,configs,log-dirs,topics,consumer-groups} ...

Kafka admin client

positional arguments:
  {cluster,configs,log-dirs,topics,consumer-groups}
                        subcommands
    cluster             Manage Kafka Cluster
    configs             Manage Kafka Configuration
    log-dirs            Manage Kafka Topic/Partition Log Directories
    topics              List/Describe/Create/Delete Kafka Topics
    consumer-groups     Manage Kafka Consumer Groups

options:
  -h, --help            show this help message and exit
  -b BOOTSTRAP_SERVERS, --bootstrap-servers BOOTSTRAP_SERVERS
                        host:port for cluster bootstrap servers
  -c EXTRA_CONFIG, --extra-config EXTRA_CONFIG
                        additional configuration properties for admin client
  -l LOG_LEVEL, --log-level LOG_LEVEL
                        logging level, passed to logging.basicConfig
  -f FORMAT, --format FORMAT
                        output format: raw|json
```

--------------------------------

### Create Kafka Topics (Manual Assignment)

Source: https://context7.com/dpkp/kafka-python/llms.txt

Create Kafka topics with manual replica assignments for each partition using KafkaAdminClient. This is useful for controlling data locality and fault tolerance at a granular level.

```python
from kafka import KafkaAdminClient

admin = KafkaAdminClient(
    bootstrap_servers=['localhost:9092'],
    client_id='admin-client'
)

# Create with manual replica assignment
admin.create_topics({
    'manual-assignment-topic': {
        'assignments': {
            0: [0, 1, 2],  # Partition 0 on brokers 0, 1, 2
            1: [1, 2, 0],  # Partition 1 on brokers 1, 2, 0
            2: [2, 0, 1]   # Partition 2 on brokers 2, 0, 1
        }
    }
})
```

--------------------------------

### GET /least_loaded_node

Source: https://github.com/dpkp/kafka-python/blob/master/docs/apidoc/KafkaClient.md

Selects the node with the fewest outstanding requests.

```APIDOC
## GET /least_loaded_node

### Description
Choose the node with the fewest outstanding requests, with fallbacks.
### Parameters
#### Query Parameters
- **bootstrap_fallback** (bool) - Optional - Whether to fall back to bootstrap nodes. Default: False.

### Response
- **returns** (int) - node_id or None if no suitable node was found.
```

--------------------------------

### Clone and Navigate to Snappy Repository

Source: https://github.com/dpkp/kafka-python/wiki/Home

Clone the python-snappy library from its GitHub repository and navigate into the directory.

```bash
git clone https://github.com/andrix/python-snappy
cd python-snappy
```

--------------------------------

### GET /is_disconnected

Source: https://github.com/dpkp/kafka-python/blob/master/docs/apidoc/KafkaClient.md

Checks whether a specific node connection has been disconnected or has failed.

```APIDOC
## GET /is_disconnected

### Description
Check whether the node connection has been disconnected or failed.

### Parameters
#### Query Parameters
- **node_id** (int) - Required - The id of the node to check.

### Response
- **returns** (bool) - True if the node exists and is disconnected.
```

--------------------------------

### Get Broker Metadata

Source: https://github.com/dpkp/kafka-python/blob/master/docs/usage.md

Retrieve metadata for all brokers in the cluster or for a specific broker by its ID.

```python
# get all brokers metadata
print(clusterMetadata.brokers())

# get specific broker metadata
print(clusterMetadata.broker_metadata('bootstrap-0'))
```

--------------------------------

### Basic KafkaProducer Usage

Source: https://github.com/dpkp/kafka-python/blob/master/docs/index.md

Instantiate a KafkaProducer to send messages asynchronously to a topic.
```python
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:1234')
for _ in range(100):
    producer.send('foobar', b'some_message_bytes')
```

--------------------------------

### GET /in_flight_request_count

Source: https://github.com/dpkp/kafka-python/blob/master/docs/apidoc/KafkaClient.md

Retrieves the number of in-flight requests for a specific node or the total across all nodes.

```APIDOC
## GET /in_flight_request_count

### Description
Get the number of in-flight requests for a node or all nodes.

### Parameters
#### Query Parameters
- **node_id** (int) - Optional - A specific node to check. If unspecified, return the total for all nodes.

### Response
- **returns** (int) - Pending in-flight requests for the node, or all nodes if None.
```

--------------------------------

### Build Snappy Python Extension

Source: https://github.com/dpkp/kafka-python/wiki/Home

Build the C extension for the python-snappy library after downloading or cloning the source.

```bash
python setup.py build
```

--------------------------------

### KafkaProducer Transactions

Source: https://github.com/dpkp/kafka-python/blob/master/docs/index.md

Use transactions for atomic message production. Includes examples for committing and aborting transactions.
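
The begin/send/commit-or-abort sequence in these examples can be wrapped in a small helper. The following is a minimal sketch, not part of kafka-python: `run_in_transaction` is a hypothetical function that accepts any object exposing `begin_transaction()`, `send()`, `commit_transaction()`, and `abort_transaction()` (the methods used in the examples), and it assumes `send()` returns a future with `get(timeout=...)`. The `FakeTransactionalProducer` stand-in exists only so the sketch runs without a broker.

```python
from typing import Any, Iterable


def run_in_transaction(producer: Any, topic: str, values: Iterable[bytes], timeout: float = 10.0) -> bool:
    """Send all values in one transaction; commit on success, abort on any error.

    Hypothetical helper: `producer` must already have called init_transactions().
    """
    producer.begin_transaction()
    try:
        for value in values:
            # Block on each future so a failed produce surfaces before the commit.
            producer.send(topic, value=value).get(timeout=timeout)
    except Exception:
        producer.abort_transaction()
        return False
    producer.commit_transaction()
    return True


# Stand-in objects so the sketch runs without Kafka (not kafka-python classes).
class _DoneFuture:
    def __init__(self, error=None):
        self._error = error

    def get(self, timeout=None):
        if self._error is not None:
            raise self._error
        return "ok"


class FakeTransactionalProducer:
    def __init__(self, fail_on=None):
        self.fail_on = fail_on  # value whose send() should fail
        self.events = []

    def begin_transaction(self):
        self.events.append("begin")

    def send(self, topic, value=None):
        self.events.append(("send", topic, value))
        error = RuntimeError("produce failed") if value == self.fail_on else None
        return _DoneFuture(error)

    def commit_transaction(self):
        self.events.append("commit")

    def abort_transaction(self):
        self.events.append("abort")


ok_producer = FakeTransactionalProducer()
committed = run_in_transaction(ok_producer, "txn_topic", [b"yes"])

bad_producer = FakeTransactionalProducer(fail_on=b"no")
aborted = not run_in_transaction(bad_producer, "txn_topic", [b"ok", b"no"])
```

With a real `KafkaProducer(transactional_id=...)`, call `init_transactions()` once first, as the examples do. Catching every exception and aborting is a simplification: after some fatal producer errors the transaction may not be abortable, and the producer may need to be closed instead.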
```python
# Use transactions
producer = KafkaProducer(transactional_id='fizzbuzz')
producer.init_transactions()

producer.begin_transaction()
future = producer.send('txn_topic', value=b'yes')
future.get()  # wait for successful produce
producer.commit_transaction()  # commit the transaction

producer.begin_transaction()
future = producer.send('txn_topic', value=b'no')
future.get()  # wait for successful produce
producer.abort_transaction()  # abort the transaction
```

--------------------------------

### Manage Kafka Partitions with KafkaAdminClient

Source: https://context7.com/dpkp/kafka-python/llms.txt

Use KafkaAdminClient to create partitions, describe topic metadata, manage reassignments, elect leaders, and delete records.

```python
from kafka import KafkaAdminClient, TopicPartition

admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

# Add partitions to existing topic
admin.create_partitions({
    'my-topic': {'new_total_count': 10}  # Increase to 10 partitions total
})

# Add partitions with specific replica assignment
admin.create_partitions({
    'my-topic': {
        'new_total_count': 12,
        'new_assignments': [
            [0, 1],  # New partition 10 on brokers 0, 1
            [1, 2]   # New partition 11 on brokers 1, 2
        ]
    }
})

# Describe partitions for a topic
topics = admin.describe_topics(['my-topic'])
for topic in topics:
    print(f"Topic: {topic['name']}")
    for partition in topic['partitions']:
        print(f"  Partition {partition['partition_index']}:")
        print(f"    Leader: {partition['leader_id']}")
        print(f"    Replicas: {partition['replica_nodes']}")
        print(f"    ISR: {partition['isr_nodes']}")

# List partition reassignments in progress
reassignments = admin.list_reassignments()
print(f"Ongoing reassignments: {reassignments}")

# Alter partition reassignments
admin.alter_partition_reassignments({
    TopicPartition('my-topic', 0): [1, 2, 3],  # Move partition 0 to brokers 1, 2, 3
    TopicPartition('my-topic', 1): [2, 3, 1]   # Move partition 1 to brokers 2, 3, 1
})

# Elect preferred leaders for partitions
admin.elect_leaders([
    TopicPartition('my-topic', 0),
    TopicPartition('my-topic', 1)
])

# Delete records (truncate topic up to specified offset)
admin.delete_records({
    TopicPartition('my-topic', 0): 1000,  # Delete records before offset 1000
    TopicPartition('my-topic', 1): 500
})

admin.close()
```

--------------------------------

### Download and Extract Snappy Source

Source: https://github.com/dpkp/kafka-python/wiki/Home

Download the python-snappy library source archive and extract it.

```bash
wget https://files.pythonhosted.org/packages/45/35/65d9f8cc537129894b4b32647d80212d1fa342877581c5b8a69872cea8be/python-snappy-0.5.4.tar.gz
tar xzvf python-snappy-0.5.4.tar.gz
cd python-snappy-0.5.4
```

--------------------------------

### KafkaProducer Send with Blocking

Source: https://github.com/dpkp/kafka-python/blob/master/docs/index.md

Block until a single message is sent or a timeout occurs by calling get() on the future returned by send().

```python
# Block until a single message is sent (or timeout)
future = producer.send('foobar', b'another_message')
result = future.get(timeout=60)
```

--------------------------------

### KafkaProducer Initialization

Source: https://github.com/dpkp/kafka-python/blob/master/docs/apidoc/KafkaProducer.md

Initializes a new KafkaProducer instance to publish records to a Kafka cluster.

```APIDOC
## CLASS KafkaProducer

### Description
A Kafka client that publishes records to the Kafka cluster. The producer is thread-safe and uses a background I/O thread for transmitting records.

### Parameters
#### Keyword Arguments
- **bootstrap_servers** (string/list) - Required - 'host[:port]' string or list of strings that the producer should contact to bootstrap initial connections.
- **acks** (string) - Optional - Controls the criteria under which requests are considered complete (e.g., 'all').
- **retries** (int) - Optional - Number of times to automatically retry failed requests.
- **batch_size** (int) - Optional - Size of the buffer for unsent records per partition.
- **linger_ms** (int) - Optional - Time in milliseconds to wait before sending a request to allow for more batching.
- **key_serializer** (callable) - Optional - Function to convert key objects into bytes.
- **value_serializer** (callable) - Optional - Function to convert value objects into bytes.
- **enable_idempotence** (bool) - Optional - Enables idempotent producer mode, which ensures that exactly one copy of each message is written to the stream even when the producer retries.
- **transactional_id** (string) - Optional - Unique identifier to enable transactional producer features.
```

--------------------------------

### Initialize ClusterMetadata

Source: https://github.com/dpkp/kafka-python/blob/master/docs/usage.md

Create a ClusterMetadata instance to fetch information about the Kafka cluster. Provide bootstrap server addresses.

```python
from kafka.cluster import ClusterMetadata

clusterMetadata = ClusterMetadata(bootstrap_servers=['broker1:1234'])
```

--------------------------------

### Initialize kafka.BrokerConnection

Source: https://github.com/dpkp/kafka-python/blob/master/docs/apidoc/BrokerConnection.md

Initializes a new connection to a Kafka broker with the specified host, port, and configuration options.

```APIDOC
## class kafka.BrokerConnection

### Description
Initialize a Kafka broker connection with specific network and security configurations.

### Parameters
#### Path Parameters
- **host** (str) - Required - The hostname or IP address of the Kafka broker.
- **port** (int) - Required - The port number of the Kafka broker.
- **afi** (int) - Required - Address family identifier for the socket.

#### Keyword Arguments
- **client_id** (str) - Optional - A name for this client used for logging and identification. Default: 'kafka-python-{version}'
- **client_software_name** (str) - Optional - Sent to kafka broker for KIP-511. Default: 'kafka-python'
- **reconnect_backoff_ms** (int) - Optional - Time in milliseconds to wait before reconnecting. Default: 50
- **request_timeout_ms** (int) - Optional - Client request timeout in milliseconds.
  Default: 30000
- **security_protocol** (str) - Optional - Protocol used to communicate (PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL). Default: PLAINTEXT
- **ssl_context** (ssl.SSLContext) - Optional - Pre-configured SSLContext for wrapping socket connections.
- **api_version** (tuple) - Optional - Specify which Kafka API version to use.
```

--------------------------------

### Kafka Client Configuration Parameters

Source: https://github.com/dpkp/kafka-python/blob/master/docs/apidoc/KafkaProducer.md

A comprehensive list of configuration parameters for initializing the Kafka client, covering network, security, and performance settings.

```APIDOC
## Kafka Client Configuration

### Description
Configuration parameters for the Kafka client instance, defining how the client connects to brokers, handles security, and manages internal metrics.

### Parameters
- **socket_options** (list) - Optional - List of tuple-arguments to socket.setsockopt to apply to broker connection sockets.
- **reconnect_backoff_ms** (int) - Optional - Time in milliseconds to wait before attempting to reconnect to a host.
- **reconnect_backoff_max_ms** (int) - Optional - Maximum time in milliseconds to backoff when reconnecting to a broker.
- **max_in_flight_requests_per_connection** (int) - Optional - Maximum number of requests to pipeline per broker connection.
- **security_protocol** (str) - Optional - Protocol used to communicate with brokers (PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL).
- **ssl_context** (ssl.SSLContext) - Optional - Pre-configured SSLContext for wrapping socket connections.
- **ssl_check_hostname** (bool) - Optional - Flag to configure whether ssl handshake should verify the certificate matches the broker hostname.
- **ssl_cafile** (str) - Optional - Filename of CA file for certificate verification.
- **ssl_certfile** (str) - Optional - Filename of client certificate in PEM format.
- **ssl_keyfile** (str) - Optional - Filename containing the client private key.
- **ssl_password** (str) - Optional - Password for loading the certificate chain.
- **ssl_crlfile** (str) - Optional - Filename containing the CRL (certificate revocation list) used to check whether certificates have been revoked.
- **ssl_ciphers** (str) - Optional - OpenSSL cipher list format string for SSL connections.
- **api_version** (tuple) - Optional - Specify which Kafka API version to use (e.g., (0, 10, 0)).
- **api_version_auto_timeout_ms** (int) - Optional - Timeout in milliseconds for checking broker API version.
- **metric_reporters** (list) - Optional - List of classes to use as metrics reporters.
- **metrics_enabled** (bool) - Optional - Whether to track metrics on this instance.
- **metrics_num_samples** (int) - Optional - Number of samples maintained to compute metrics.
- **metrics_sample_window_ms** (int) - Optional - Maximum age in milliseconds of samples used to compute metrics.
- **selector** (selectors.BaseSelector) - Optional - Specific selector implementation for I/O multiplexing.
- **sasl_mechanism** (str) - Optional - Authentication mechanism (PLAIN, GSSAPI, OAUTHBEARER, SCRAM-SHA-256, SCRAM-SHA-512).
```

--------------------------------

### Import KafkaProducer in Python

Source: https://github.com/dpkp/kafka-python/wiki/kafka-import-error

This import statement gives access to the KafkaProducer class from the kafka-python library. Ensure the `kafka` module is installed and importable in your Python environment.

```python
from kafka import KafkaProducer
```

--------------------------------

### KafkaClient Configuration Options

Source: https://github.com/dpkp/kafka-python/blob/master/docs/apidoc/KafkaClient.md

Configuration parameters for initializing the KafkaClient.

```APIDOC
## KafkaClient Initialization Parameters

### Description
Parameters used when initializing the KafkaClient for establishing connections to Kafka brokers.

### Parameters
#### Request Body
- **api_version** (tuple) - Specify which Kafka API version to use.
  If set to None, the client will attempt to determine the broker version. Default: None.
- **api_version_auto_timeout_ms** (int) - Milliseconds to wait while checking the broker API version before the constructor raises a timeout exception. Only applies if api_version is set to None. Default: 2000.
- **selector** (selectors.BaseSelector) - Provide a specific selector implementation to use for I/O multiplexing. Default: selectors.DefaultSelector.
- **metrics** (kafka.metrics.Metrics) - Optionally provide a metrics instance for capturing network IO stats. Default: None.
- **metric_group_prefix** (str) - Prefix for metric names. Default: ''
- **sasl_mechanism** (str) - Authentication mechanism when security_protocol is configured for SASL_PLAINTEXT or SASL_SSL. Valid values are: PLAIN, GSSAPI, OAUTHBEARER, SCRAM-SHA-256, SCRAM-SHA-512.
- **sasl_plain_username** (str) - Username for sasl PLAIN and SCRAM authentication. Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.
- **sasl_plain_password** (str) - Password for sasl PLAIN and SCRAM authentication. Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.
- **sasl_kerberos_name** (str or gssapi.Name) - Constructed gssapi.Name for use with sasl mechanism handshake. Default: None.
- **sasl_kerberos_service_name** (str) - Service name to include in GSSAPI sasl mechanism handshake. Default: 'kafka'
- **sasl_kerberos_domain_name** (str) - Kerberos domain name to use in GSSAPI sasl mechanism handshake. Default: one of the bootstrap servers
- **sasl_oauth_token_provider** (kafka.sasl.oauth.AbstractTokenProvider) - OAuthBearer token provider instance. Default: None
- **socks5_proxy** (str) - Socks5 proxy URL. Default: None

### Note
An `ssl.SSLError` will be raised if SSL is misconfigured. See `ssl.SSLContext.set_ciphers` for more details.
```

--------------------------------

### Describe Consumer Groups

Source: https://context7.com/dpkp/kafka-python/llms.txt

Get detailed information about specific consumer groups, including their state, protocol, coordinator, and members, using KafkaAdminClient. This is useful for debugging consumer issues.

```python
from kafka import KafkaAdminClient

admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

# Describe specific consumer groups
group_descriptions = admin.describe_groups(['my-consumer-group'])
for group_id, description in group_descriptions.items():
    print(f"\nGroup: {group_id}")
    print(f"  State: {description['state']}")
    print(f"  Protocol: {description['protocol_type']}")
    print(f"  Coordinator: {description.get('coordinator')}")
    print("  Members:")
    for member in description['members']:
        print(f"    - {member['member_id']}")
        print(f"      Client: {member['client_id']} @ {member['client_host']}")
        if member.get('member_assignment'):
            assignment = member['member_assignment']
            print(f"      Assigned partitions: {assignment.get('assignment', [])}")
```

--------------------------------

### Create Kafka Topics (Wait for Metadata)

Source: https://context7.com/dpkp/kafka-python/llms.txt

Create a Kafka topic and wait for its metadata to be available in the cluster using KafkaAdminClient. This ensures the topic is ready for use immediately after creation.

```python
from kafka import KafkaAdminClient

admin = KafkaAdminClient(
    bootstrap_servers=['localhost:9092'],
    client_id='admin-client'
)

# Wait for topic to be ready in metadata
admin.create_topics(
    {'ready-topic': {'num_partitions': 3, 'replication_factor': 1}},
    wait_for_metadata=True
)
```

--------------------------------

### KafkaAdminClient Initialization Parameters

Source: https://github.com/dpkp/kafka-python/blob/master/docs/apidoc/KafkaAdminClient.md

Configuration options for initializing the KafkaAdminClient, covering API versions, timeouts, and SASL authentication settings.
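
The SASL rules in these parameter descriptions can be checked before any client is constructed. SASL settings apply when `security_protocol` is SASL_PLAINTEXT or SASL_SSL, and `sasl_plain_username`/`sasl_plain_password` are required for PLAIN and the SCRAM mechanisms. The following is a minimal sketch built on those rules. `check_sasl_config` is a hypothetical helper: it only inspects a plain dict of keyword arguments, never calls kafka-python, and assumes the PLAINTEXT default for `security_protocol`.

```python
from typing import Any, Dict, List

# Mechanism names listed in the parameter documentation.
SASL_MECHANISMS = {"PLAIN", "GSSAPI", "OAUTHBEARER", "SCRAM-SHA-256", "SCRAM-SHA-512"}
# Mechanisms for which sasl_plain_username/sasl_plain_password are required.
NEEDS_USER_PASSWORD = {"PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512"}
# SASL settings only take effect with these security protocols.
SASL_PROTOCOLS = {"SASL_PLAINTEXT", "SASL_SSL"}


def check_sasl_config(configs: Dict[str, Any]) -> List[str]:
    """Return human-readable problems in SASL-related settings; empty if none found."""
    problems: List[str] = []
    mechanism = configs.get("sasl_mechanism")
    if mechanism is None:
        return problems  # SASL not configured, nothing to check
    if mechanism not in SASL_MECHANISMS:
        problems.append(f"unknown sasl_mechanism {mechanism!r}")
    if configs.get("security_protocol", "PLAINTEXT") not in SASL_PROTOCOLS:
        problems.append("security_protocol must be SASL_PLAINTEXT or SASL_SSL when sasl_mechanism is set")
    if mechanism in NEEDS_USER_PASSWORD:
        for key in ("sasl_plain_username", "sasl_plain_password"):
            if not configs.get(key):
                problems.append(f"{key} is required for {mechanism}")
    return problems


configs = {
    "bootstrap_servers": ["localhost:9092"],
    "security_protocol": "SASL_SSL",
    "sasl_mechanism": "SCRAM-SHA-512",
    "sasl_plain_username": "admin",  # password deliberately missing
}
problems = check_sasl_config(configs)
print(problems)
```

If the returned list is empty, the same dict could then be passed as `KafkaAdminClient(**configs)`. Checking up front gives one readable report instead of an error partway through client startup.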
```APIDOC
## KafkaAdminClient Configuration

### Parameters
- **api_version** (tuple) - Optional - Specify which Kafka API version to use. Default: None.
- **bootstrap_timeout_ms** (int) - Optional - Milliseconds to throw a timeout exception during bootstrapping. Default: 2000.
- **selector** (selectors.BaseSelector) - Optional - Specific selector implementation for I/O multiplexing. Default: selectors.DefaultSelector.
- **metrics** (kafka.metrics.Metrics) - Optional - Metrics instance for capturing network IO stats. Default: None.
- **metric_group_prefix** (str) - Optional - Prefix for metric names. Default: ''.
- **sasl_mechanism** (str) - Optional - Authentication mechanism (PLAIN, GSSAPI, OAUTHBEARER, SCRAM-SHA-256, SCRAM-SHA-512).
- **sasl_plain_username** (str) - Required if SASL mechanism is PLAIN or SCRAM - Username for authentication.
- **sasl_plain_password** (str) - Required if SASL mechanism is PLAIN or SCRAM - Password for authentication.
- **sasl_kerberos_name** (str or gssapi.Name) - Optional - GSSAPI name for handshake. Default: None.
- **sasl_kerberos_service_name** (str) - Optional - Service name for GSSAPI handshake. Default: 'kafka'.
- **sasl_kerberos_domain_name** (str) - Optional - Kerberos domain name for GSSAPI handshake. Default: one of the bootstrap servers.
- **sasl_oauth_token_provider** (kafka.sasl.oauth.AbstractTokenProvider) - Optional - OAuthBearer token provider instance. Default: None.
- **socks5_proxy** (str) - Optional - Socks5 proxy URL. Default: None.
- **kafka_client** (callable) - Optional - Custom class/callable for creating KafkaClient instances.
```

--------------------------------

### Join a consumer group

Source: https://github.com/dpkp/kafka-python/blob/master/README.rst

Configure a consumer group for dynamic partition assignment and offset commits.
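
Some applications commit offsets only after processing instead of relying on auto-commit. The usual approach is to create the consumer with `enable_auto_commit=False` and call `commit()` periodically. The following is a minimal sketch of that pattern. `consume_with_manual_commits` is a hypothetical helper that needs only an iterable consumer with a `commit()` method, and `FakeConsumer` is a stand-in so the example runs without a broker.

```python
from typing import Any, Callable, Optional


def consume_with_manual_commits(
    consumer: Any,
    handle: Callable[[Any], None],
    commit_every: int = 100,
    max_messages: Optional[int] = None,
) -> int:
    """Process messages, committing after every `commit_every` messages.

    Hypothetical helper: `consumer` must be iterable and expose commit().
    Returns the number of messages processed.
    """
    processed = 0
    for message in consumer:
        handle(message)  # process before committing, so only handled messages are committed
        processed += 1
        if processed % commit_every == 0:
            consumer.commit()
        if max_messages is not None and processed >= max_messages:
            break
    if processed % commit_every:
        consumer.commit()  # commit the remainder of a partial batch
    return processed


class FakeConsumer:
    """Stand-in for KafkaConsumer: yields canned messages and counts commits."""

    def __init__(self, messages):
        self._messages = list(messages)
        self.commits = 0

    def __iter__(self):
        return iter(self._messages)

    def commit(self):
        self.commits += 1


seen = []
fake = FakeConsumer(range(250))
count = consume_with_manual_commits(fake, seen.append, commit_every=100)
```

With a real consumer such as `KafkaConsumer('my_favorite_topic', group_id='my_favorite_group', enable_auto_commit=False)`, the iterator normally never ends unless `consumer_timeout_ms` is set. In that case, the `max_messages` argument or an external stop condition bounds the loop. A crash between processing and committing leads to redelivery, which gives at-least-once semantics.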
```python
# join a consumer group for dynamic partition assignment and offset commits
from kafka import KafkaConsumer

consumer = KafkaConsumer('my_favorite_topic', group_id='my_favorite_group')
for msg in consumer:
    print(msg)
```

--------------------------------

### KafkaProducer Send with Key for Partitioning

Source: https://github.com/dpkp/kafka-python/blob/master/docs/index.md

Use a key with the send method for hashed partitioning of messages, so that records with the same key are routed to the same partition.

```python
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092')

# Use a key for hashed-partitioning
producer.send('foobar', key=b'foo', value=b'bar')
```

--------------------------------

### List Kafka Topics

Source: https://context7.com/dpkp/kafka-python/llms.txt

Use KafkaAdminClient to list all available topics in the Kafka cluster. This requires an initialized KafkaAdminClient instance.

```python
from kafka import KafkaAdminClient

admin = KafkaAdminClient(
    bootstrap_servers=['localhost:9092'],
    client_id='admin-client'
)

# List all topics
topics = admin.list_topics()
print(f"Topics: {topics}")
```

--------------------------------

### KafkaAdminClient Initialization

Source: https://github.com/dpkp/kafka-python/blob/master/docs/apidoc/KafkaAdminClient.md

Initializes a new instance of the KafkaAdminClient to interact with the Kafka cluster.

```APIDOC
## KafkaAdminClient

### Description
A class for administering the Kafka cluster.

### Constructor
`kafka.KafkaAdminClient(**configs)`

### Parameters
#### Configuration Parameters
- **configs** (keyword arguments) - Optional - Configuration parameters for the admin client (see KafkaAdminClient Configuration).
```

--------------------------------

### class kafka.KafkaClient

Source: https://github.com/dpkp/kafka-python/blob/master/docs/apidoc/KafkaClient.md

Initializes the KafkaClient for handling asynchronous network I/O.

```APIDOC
## class kafka.KafkaClient

### Description
A network client for asynchronous request/response network I/O. This is an internal class used to implement the user-facing producer and consumer clients.

Note: This class is not thread-safe.
### Parameters
#### Constructor Parameters
- **configs** (keyword arguments) - Optional - Configuration parameters for the Kafka client.
```

--------------------------------

### Configure SSLContext for KafkaConsumer

Source: https://context7.com/dpkp/kafka-python/llms.txt

Use a pre-configured ssl.SSLContext to manage client certificates and CA files for secure connections.

```python
import ssl

from kafka import KafkaConsumer

# Trust the cluster's CA and present a client certificate (mutual TLS)
ssl_context = ssl.create_default_context(cafile='/path/to/ca.pem')
ssl_context.load_cert_chain(certfile='/path/to/client-cert.pem',
                            keyfile='/path/to/client-key.pem')

consumer = KafkaConsumer(
    'secure-topic',
    bootstrap_servers=['kafka.example.com:9093'],
    security_protocol='SSL',
    ssl_context=ssl_context
)
```

--------------------------------

### Basic Kafka Consumer with Auto-Commit

Source: https://context7.com/dpkp/kafka-python/llms.txt

Demonstrates basic message consumption using KafkaConsumer with automatic offset commits and consumer group balancing. Ensure 'my-topic' exists and Kafka is running on 'localhost:9092'.

```python
from kafka import KafkaConsumer

# Basic consumer with consumer group for auto-commit and load balancing
consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['localhost:9092'],
    group_id='my-consumer-group',
    auto_offset_reset='earliest',  # Start from earliest message if no committed offset
    enable_auto_commit=True,
    auto_commit_interval_ms=5000
)

# Iterate over messages (blocks waiting for messages; the loop ends only
# if consumer_timeout_ms is set and elapses)
for message in consumer:
    print(f"Topic: {message.topic}")
    print(f"Partition: {message.partition}")
    print(f"Offset: {message.offset}")
    print(f"Key: {message.key}")
    print(f"Value: {message.value.decode('utf-8')}")
    print(f"Timestamp: {message.timestamp}")
    print(f"Headers: {message.headers}")
```

--------------------------------

### Manage KafkaConsumer Offsets

Source: https://context7.com/dpkp/kafka-python/llms.txt

Demonstrates manual offset committing, querying partition positions, and seeking offsets by timestamp.
```python
import time

from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata

consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['localhost:9092'],
    group_id='manual-commit-group',
    enable_auto_commit=False,  # Disable auto-commit for manual control
    auto_offset_reset='earliest'
)


def commit_callback(offsets, response):
    # response is an Exception on failure, otherwise the broker's commit response
    if isinstance(response, Exception):
        print(f"Commit failed: {response}")
    else:
        print(f"Committed offsets: {offsets}")


try:
    for message in consumer:
        # Process message
        print(f"Processing: {message.value}")

        # Commit the offset of the next message to read (synchronous)
        consumer.commit({
            TopicPartition(message.topic, message.partition):
                OffsetAndMetadata(message.offset + 1, None)
        })

        # Alternative: commit current positions for all assigned partitions
        # consumer.commit()

        # Alternative: asynchronous commit with a completion callback
        # consumer.commit_async(callback=commit_callback)
except Exception as e:
    print(f"Error: {e}")
finally:
    consumer.close(autocommit=False)

# Query committed offsets and positions
consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'], group_id='my-group')
consumer.subscribe(['my-topic'])
consumer.poll(timeout_ms=1000)  # Trigger group join; assignment may need more than one poll

for tp in consumer.assignment():
    committed = consumer.committed(tp)
    position = consumer.position(tp)
    highwater = consumer.highwater(tp)
    print(f"{tp}: committed={committed}, position={position}, highwater={highwater}")

# Get beginning and end offsets
partitions = [TopicPartition('my-topic', 0)]
beginning = consumer.beginning_offsets(partitions)
end = consumer.end_offsets(partitions)
print(f"Beginning: {beginning}, End: {end}")

# Look up offsets by timestamp (earliest offset at or after the given time);
# pass a result to consumer.seek() to jump there
timestamps = {TopicPartition('my-topic', 0): int(time.time() * 1000) - 3600000}  # 1 hour ago, in ms
offsets = consumer.offsets_for_times(timestamps)
print(f"Offsets for timestamp: {offsets}")
```

--------------------------------

### Kafka Producer Configuration Options

Source:
https://github.com/dpkp/kafka-python/blob/master/docs/apidoc/KafkaProducer.md

Detailed explanations of the available configuration parameters for the Kafka producer.

```APIDOC
## Kafka Producer Configuration

This section outlines the configuration options available for the Kafka producer.

### `retries`
* **Type**: `int` (or `float('inf')` for unlimited)
* **Default**: `float('inf')`
* **Description**: The number of times the producer retries sending a record after receiving an error. Retries without `max_in_flight_requests_per_connection=1` can change record ordering. Produce requests can also fail before retries are exhausted if `delivery_timeout_ms` expires, so using `delivery_timeout_ms` to bound retry behavior is generally recommended.

### `batch_size`
* **Type**: `int`
* **Default**: `16384`
* **Description**: The batch size in bytes. A request to a broker can carry several batches, one per partition with data available to send. A smaller batch size makes batching less common and may lower throughput; a batch size of 0 disables batching entirely.

### `linger_ms`
* **Type**: `int`
* **Default**: `0`
* **Description**: The maximum time (in milliseconds) the producer waits for additional records to fill a batch before sending it. Batching records that arrive within this delay reduces the number of requests. A non-zero value adds latency but can improve throughput under moderate load.

### `partitioner`
* **Type**: `callable`
* **Default**: None
* **Description**: A callable used to determine the partition for each message, called as `partitioner(key_bytes, all_partitions, available_partitions)`. The default partitioner hashes keyed messages with murmur2 so that messages with the same key go to the same partition; unkeyed messages are sent to a random available partition.

### `connections_max_idle_ms`
* **Type**: `int`
* **Default**: `540000`
* **Description**: The duration in milliseconds after which idle connections are closed.
  This prevents unexpected socket disconnection errors by closing connections before the broker does.

### `max_block_ms`
* **Type**: `int`
* **Default**: `60000`
* **Description**: The maximum time (in milliseconds) that the `send()` and `partitions_for()` methods will block. Blocking can occur if the buffer is full or metadata is unavailable. This timeout does not include time spent in user-supplied serializers or partitioners.

### `max_request_size`
* **Type**: `int`
* **Default**: `1048576`
* **Description**: The maximum size of a request in bytes, which also caps the maximum record size. It limits the number of record batches sent in a single request to avoid excessively large requests. The server may enforce its own limit on record size.

### `allow_auto_create_topics`
* **Type**: `bool`
* **Default**: `True`
* **Description**: Enables or disables automatic topic creation on metadata requests. Only available with `api_version >= (0, 11)`.

### `metadata_max_age_ms`
* **Type**: `int`
* **Default**: `300000`
* **Description**: The period in milliseconds after which metadata is forcibly refreshed, even if no partition leadership changes have occurred, so that new brokers or partitions are discovered proactively.

### `retry_backoff_ms`
* **Type**: `int`
* **Default**: `100`
* **Description**: The backoff time in milliseconds when retrying on errors.

### `request_timeout_ms`
* **Type**: `int`
* **Default**: `30000`
* **Description**: The client request timeout in milliseconds.

### `receive_buffer_bytes`
* **Type**: `int`
* **Default**: `None` (relies on system defaults)
* **Description**: The size of the TCP receive buffer (SO_RCVBUF) used when reading data. The Java client defaults to 32768.

### `send_buffer_bytes`
* **Type**: `int`
* **Default**: `None` (relies on system defaults)
* **Description**: The size of the TCP send buffer (SO_SNDBUF) used when sending data.
```

--------------------------------

### KafkaConsumer Configuration Parameters

Source: https://github.com/dpkp/kafka-python/blob/master/docs/apidoc/KafkaConsumer.md

A comprehensive list of configuration parameters used to initialize and tune the KafkaConsumer client.

```APIDOC
## KafkaConsumer Configuration

### Description
Configuration parameters for the KafkaConsumer client to manage connections, offsets, and group behavior.

### Parameters
- **max_in_flight_requests_per_connection** (int) - Optional - Maximum number of requests pipelined to each Kafka broker connection. Default: 5.
- **auto_offset_reset** (str) - Optional - Policy for resetting offsets on OffsetOutOfRange errors: 'earliest' moves to the oldest available message, 'latest' to the most recent. Default: 'latest'.
- **enable_auto_commit** (bool) - Optional - If True, the consumer's offset is committed periodically in the background. Default: True.
- **auto_commit_interval_ms** (int) - Optional - Milliseconds between automatic offset commits. Default: 5000.
- **default_offset_commit_callback** (callable) - Optional - Callback invoked when a commit request completes.
- **check_crcs** (bool) - Optional - Automatically check the CRC32 of consumed records. Default: True.
- **isolation_level** (str) - Optional - How transactional messages are read: 'read_committed' or 'read_uncommitted'. Default: 'read_uncommitted'.
- **allow_auto_create_topics** (bool) - Optional - Enable or disable automatic topic creation on metadata requests. Default: True.
- **metadata_max_age_ms** (int) - Optional - Period in milliseconds after which a metadata refresh is forced. Default: 300000.
- **partition_assignment_strategy** (list) - Optional - List of partition assignor objects used to distribute partition ownership among consumer group members.
- **max_poll_records** (int) - Optional - Maximum number of records returned in a single poll() call. Default: 500.
- **max_poll_interval_ms** (int) - Optional - Maximum delay between poll() invocations when using consumer group management. Default: 300000.
- **session_timeout_ms** (int) - Optional - Timeout used to detect consumer failures. Default: 10000.
- **heartbeat_interval_ms** (int) - Optional - Expected time between heartbeats to the consumer coordinator. Default: 3000.
- **receive_buffer_bytes** (int) - Optional - Size of the TCP receive buffer (SO_RCVBUF).
- **send_buffer_bytes** (int) - Optional - Size of the TCP send buffer (SO_SNDBUF).
- **socket_options** (list) - Optional - List of socket.setsockopt arguments.
- **consumer_timeout_ms** (int) - Optional - Milliseconds to block during message iteration before raising StopIteration.
```

--------------------------------

### Configure Serialization and Callbacks

Source: https://context7.com/dpkp/kafka-python/llms.txt

Configures custom serializers for JSON and Msgpack, and implements asynchronous callbacks for success and error handling.

```python
import json

import msgpack  # third-party: pip install msgpack
from kafka import KafkaProducer

# JSON serialization
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    key_serializer=lambda k: k.encode('utf-8') if k else None
)

# Send Python dict as JSON
producer.send('json-topic', key='user-123', value={'event': 'login', 'timestamp': 1234567890})
producer.close()  # Flush pending records before replacing this producer

# Msgpack serialization
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=msgpack.dumps
)
producer.send('msgpack-topic', value={'key': 'value'})
producer.close()


# Callbacks for async result handling
def on_send_success(record_metadata):
    print(f"Message delivered to {record_metadata.topic}[{record_metadata.partition}] @ {record_metadata.offset}")


def on_send_error(excp):
    print(f"Message delivery failed: {excp}")


producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

# Chain callbacks to Future
for i in range(100):
    producer.send('my-topic', value=f'message-{i}'.encode()) \
        .add_callback(on_send_success) \
        .add_errback(on_send_error)

# Wait for all messages to be sent
producer.flush()
producer.close()
```

--------------------------------

### Create Kafka Topics (Validation Only)

Source: https://context7.com/dpkp/kafka-python/llms.txt

Perform a dry run to validate topic
creation parameters without actually creating the topic using KafkaAdminClient. This is useful for checking configurations before deployment.

```python
from kafka import KafkaAdminClient

admin = KafkaAdminClient(
    bootstrap_servers=['localhost:9092'],
    client_id='admin-client'
)

# Create with validation only (dry run)
result = admin.create_topics(['test-topic'], validate_only=True)
print(f"Validation result: {result}")
```

--------------------------------

### Manual Partition Assignment

Source: https://github.com/dpkp/kafka-python/blob/master/docs/apidoc/KafkaConsumer.md

Methods for manually assigning partitions to a consumer.

```APIDOC
## Manual Partition Assignment

### Description
Methods for manually assigning partitions to a consumer.

### assign(partitions)
Manually assign a list of TopicPartitions to this consumer.

### Parameters:
- **partitions** (list of TopicPartition) - Assignment for this instance.

### Raises:
- **IllegalStateError** - If the consumer has already called subscribe().

### WARNING
It is not possible to use both manual partition assignment with `assign()` and group assignment with `subscribe()`.

### NOTE
This interface does not support incremental assignment and will replace the previous assignment (if there was one).

### NOTE
Manual topic assignment through this method does not use the consumer's group management functionality. As such, no rebalance operation is triggered when group membership or cluster and topic metadata change.

### assignment()
Get the TopicPartitions currently assigned to this consumer.

If partitions were directly assigned using `assign()`, this simply returns the same partitions that were previously assigned. If topics were subscribed using `subscribe()`, this returns the set of topic partitions currently assigned to the consumer (which may be empty if the assignment hasn't happened yet, or if the partitions are in the process of being reassigned).
### Returns:
- **TopicPartition, …** (set) - The set of TopicPartitions currently assigned to this consumer.
```
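Because `assign()` bypasses group management, deciding which consumer reads which partition is left to the application. One common approach, sketched here as an assumption rather than anything kafka-python provides, is a static split where worker `i` of `n` takes every `n`-th partition. The hypothetical `static_assignment` helper returns `(topic, partition)` pairs, each of which maps directly onto `kafka.TopicPartition(topic, partition)`.

```python
# Hypothetical helper (not part of kafka-python): a static, round-robin
# split of a topic's partitions across a fixed number of workers, for use
# with manual assignment where no group coordinator balances partitions.

def static_assignment(topic, num_partitions, worker_index, num_workers):
    """Return the (topic, partition) pairs owned by one worker."""
    if num_workers <= 0 or not 0 <= worker_index < num_workers:
        raise ValueError("worker_index must be in [0, num_workers)")
    # Worker i owns partitions i, i + n, i + 2n, ...
    return [(topic, p) for p in range(num_partitions)
            if p % num_workers == worker_index]
```

A worker would then call `consumer.assign([TopicPartition(t, p) for t, p in static_assignment('my-topic', 6, worker_index, 3)])`, with the partition and worker counts as placeholders. Since `assign()` replaces rather than extends the previous assignment, the full list has to be passed on every call, and the split must be recomputed by the application if the partition count changes.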