### Install Confluent Kafka Go Client

Source: https://github.com/confluentinc/confluent-kafka-go/blob/master/README.md

Demonstrates the command to manually install or update the confluent-kafka-go client using `go get`.

```bash
go get -u github.com/confluentinc/confluent-kafka-go/v2/kafka
```

--------------------------------

### Run Transactional Kafka Example

Source: https://github.com/confluentinc/confluent-kafka-go/blob/master/examples/transactions_example/README.md

Instructions to build and run the transactional Kafka example application: build the Go binary, execute it with a broker address, and follow the printed output to observe the transactional flow.

```shell
go build
./transactions_example $MY_BROKERS
```

--------------------------------

### Run Consumer Example

Source: https://github.com/confluentinc/confluent-kafka-go/blob/master/examples/README.md

Demonstrates how to build and run a basic Kafka consumer application, including the command-line arguments for specifying broker, group, and topic.

```shell
cd consumer_example
go build                 # or 'go install'
./consumer_example       # see usage
./consumer_example mybroker mygroup mytopic
```

--------------------------------

### Run Full Go Test Suite

Source: https://github.com/confluentinc/confluent-kafka-go/blob/master/kafka/README.md

Command to execute the entire test suite (unit, benchmark, and integration tests) across all packages. This requires a configured Kafka cluster and `testconf.json`.

```Shell
$ go test ./...
```

--------------------------------

### Create and Push Git Tag for Release

Source: https://github.com/confluentinc/confluent-kafka-go/blob/master/kafka/README.md

Git commands to create a version tag (e.g., `v1.3.0`) and then push it to the remote repository. A dry-run is recommended first to verify the command before actual execution.
```Shell
$ git tag v1.3.0
$ git push --dry-run origin v1.3.0
```

--------------------------------

### Install librdkafka Dependencies on kafkatest VMs

Source: https://github.com/confluentinc/confluent-kafka-go/blob/master/kafkatest/README.md

Installs necessary dependencies for librdkafka on running Vagrant VMs. These packages include SSL libraries, SASL GSSAPI modules, the lz4 compression library, and zlib, which are crucial for librdkafka's functionality.

```shell
cd ~/src/kafka   # your Kafka git checkout
for n in $(vagrant status | grep running | awk '{print $1}') ; do \
  vagrant ssh $n -c 'sudo apt-get install -y libssl1.0.0 libsasl2-modules-gssapi-mit liblz4-1 zlib1g' ; done
```

--------------------------------

### Install librdkafka on MacOS

Source: https://github.com/confluentinc/confluent-kafka-go/blob/master/README.md

On MacOS, install librdkafka using Homebrew. Installing pkg-config is also recommended if it is not already present, as it is commonly required when building against the library.

```bash
brew install librdkafka pkg-config
```

--------------------------------

### Install librdkafka on Alpine Linux

Source: https://github.com/confluentinc/confluent-kafka-go/blob/master/README.md

For Alpine Linux, install the necessary librdkafka development files and pkgconf using the apk package manager.

```bash
apk add librdkafka-dev pkgconf
```

--------------------------------

### Install librdkafka on Debian/Ubuntu

Source: https://github.com/confluentinc/confluent-kafka-go/blob/master/README.md

For Debian- and Ubuntu-based distributions, install the librdkafka development package from the standard repositories or Confluent's APT repository.

```bash
sudo apt install librdkafka-dev
```

--------------------------------

### Install librdkafka on Red Hat/CentOS

Source: https://github.com/confluentinc/confluent-kafka-go/blob/master/README.md

For Red Hat-based distributions, install the librdkafka development package using Confluent's YUM repository.
```bash
sudo yum install librdkafka-devel
```

--------------------------------

### Build librdkafka from Source

Source: https://github.com/confluentinc/confluent-kafka-go/blob/master/README.md

If the bundled librdkafka is not supported or GSSAPI/Kerberos support is needed, you can build librdkafka from its source code. This involves cloning the repository, configuring the build, compiling, and installing.

```bash
git clone https://github.com/confluentinc/librdkafka.git
cd librdkafka
./configure
make
sudo make install
```

--------------------------------

### Golang Kafka Producer Example

Source: https://github.com/confluentinc/confluent-kafka-go/blob/master/README.md

Illustrates how to create a Kafka producer, set up a delivery report handler for asynchronous message production, and send messages to a specified topic. It includes flushing messages before closing.

```golang
package main

import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func main() {
	p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost"})
	if err != nil {
		panic(err)
	}
	defer p.Close()

	// Delivery report handler for produced messages
	go func() {
		for e := range p.Events() {
			switch ev := e.(type) {
			case *kafka.Message:
				if ev.TopicPartition.Error != nil {
					fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
				} else {
					fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
				}
			}
		}
	}()

	// Produce messages to topic (asynchronously)
	topic := "myTopic"
	for _, word := range []string{"Welcome", "to", "the", "Confluent", "Kafka", "Golang", "client"} {
		p.Produce(&kafka.Message{
			TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
			Value:          []byte(word),
		}, nil)
	}

	// Wait for message deliveries before shutting down
	p.Flush(15 * 1000)
}
```

--------------------------------

### GSSAPI/Kerberos Support Build Tag

Source: https://github.com/confluentinc/confluent-kafka-go/blob/master/README.md

If your application requires GSSAPI/Kerberos authentication, you
must install librdkafka separately and then build your Go application with the '-tags dynamic' flag. This enables dynamic linking with the system's librdkafka installation.

```bash
go build -tags dynamic
```

--------------------------------

### Initialize Confluent Kafka Go Soak Test Project

Source: https://github.com/confluentinc/confluent-kafka-go/blob/master/soaktest/README.md

Commands to initialize the Go module and dependencies for the Kafka soak test project. This sets up the Go environment for the test applications.

```shell
cd soaktest
go mod init soaktest
go mod tidy
go install soaktest
```

--------------------------------

### Run kafkatests using Go Client

Source: https://github.com/confluentinc/confluent-kafka-go/blob/master/kafkatest/README.md

Executes the kafkatests suite using the Go clients. This involves sourcing a virtual environment, synchronizing Go clients to worker instances via rsync, and launching the ducktape test runner with the specified global configuration.

```shell
cd ~/src/kafka                     # your Kafka git checkout
source ~/src/venv2.7/bin/activate  # your virtualenv containing ducktape
vagrant rsync                      # to copy go_verifiable_* clients to worker instances
ducktape --debug tests/kafkatest/tests/client --globals $GOPATH/src/github.com/confluentinc/confluent-kafka-go/kafkatest/globals.json
```

--------------------------------

### Build Go Clients with Statically Linked librdkafka

Source: https://github.com/confluentinc/confluent-kafka-go/blob/master/kafkatest/README.md

Builds the Go verifiable producer and consumer clients using the 'static' build tag. This ensures librdkafka is statically linked into the Go binaries, preparing them for integration with the kafkatest suite.
```shell
mkdir ~/src/kafka/tests/go
cd go_verifiable_producer
go build -tags static
cp go_verifiable_producer ~/src/kafka/tests/go
cd ../go_verifiable_consumer
go build -tags static
cp go_verifiable_consumer ~/src/kafka/tests/go
```

--------------------------------

### Consumer.Position: Get Current Consume Position

Source: https://github.com/confluentinc/confluent-kafka-go/blob/master/kafka/api.html

Retrieves the current consumption position (the offset of the next message to be read) for a given list of partitions. Typically used after calling Assignment() to get the list of assigned partitions.

```APIDOC
Consumer.Position:
  Position(partitions []TopicPartition) ([]TopicPartition, error)
  - Returns the current consume position for the given partitions.
  - Parameters:
    - partitions: A slice of TopicPartition structs for which to get the position.
  - Returns:
    - offsets: A slice of TopicPartition structs, where the Offset field indicates
      the next message offset to read for each partition.
    - err: An error if the operation fails, or nil on success.
  - Usage: Call Assignment() to get partitions, then pass them to Position().
    The offset is typically the last message seen + 1.
```

--------------------------------

### Import Static librdkafka Bundle

Source: https://github.com/confluentinc/confluent-kafka-go/blob/master/kafka/librdkafka_vendor/README.md

Imports a prebuilt static librdkafka bundle into the confluent-kafka-go project using the import.sh script. This process involves creating a branch, importing the bundle, committing changes, and pushing for review. It copies the static library and header file, and generates build files.

```shell
./import.sh ~/path/to/librdkafka-static-bundle-v1.4.0.tgz
```

--------------------------------

### Rebuild and Run Full Test Suite

Source: https://github.com/confluentinc/confluent-kafka-go/blob/master/kafka/README.md

Commands to clean the build, rebuild the project, and run the complete test suite.
Ensure `kafka/testconf.json` is correctly configured.

```shell
go clean -i ./...
go build ./...
go test ./...
```

--------------------------------

### confluent-kafka-go AdminClient Methods

Source: https://github.com/confluentinc/confluent-kafka-go/blob/master/kafka/api.html

Provides a consolidated view of administrative operations available through the AdminClient in the confluent-kafka-go library. This includes methods for retrieving cluster and controller IDs, creating topics with specified configurations, creating partitions for existing topics, and managing Access Control Lists (ACLs). Each method details its parameters, return values, and any specific requirements or notes.

```APIDOC
AdminClient Methods:

ClusterID:
  func (a *AdminClient) ClusterID(ctx context.Context) (clusterID string, err error)
  - Returns the cluster ID as reported in broker metadata.
  - Note: The underlying C function respects the timeout but cannot be manually
    cancelled; context cancellation will block until the C function returns.
  - Requires broker version >= 0.10.0.

ControllerID:
  func (a *AdminClient) ControllerID(ctx context.Context) (controllerID int32, err error)
  - Returns the broker ID of the current controller as reported in broker metadata.
  - Note: The underlying C function respects the timeout but cannot be manually
    cancelled; context cancellation will block until the C function returns.
  - Requires broker version >= 0.10.0.

CreateACLs:
  func (a *AdminClient) CreateACLs(ctx context.Context, aclBindings ACLBindings, options ...CreateACLsAdminOption) (result []CreateACLResult, err error)
  - Creates one or more ACL bindings.
  - Parameters:
    - ctx: context with the maximum amount of time to block, or nil for indefinite.
    - aclBindings: A slice of ACL binding specifications to create.
    - options: Create ACLs options.
  - Returns: A slice of CreateACLResult with an ErrNoError ErrorCode when the
    operation was successful, plus an error that is not nil for client-level errors.
CreatePartitions:
  func (a *AdminClient) CreatePartitions(ctx context.Context, partitions []PartitionsSpecification, options ...CreatePartitionsAdminOption) (result []TopicResult, err error)
  - Creates additional partitions for topics.

CreateTopics:
  func (a *AdminClient) CreateTopics(ctx context.Context, topics []TopicSpecification, options ...CreateTopicsAdminOption) (result []TopicResult, err error)
  - Creates topics in the cluster.
  - The list of TopicSpecification objects define the per-topic partition count, replicas, etc.
  - Topic creation is non-atomic and may succeed for some topics but fail for others;
    check the result for topic-specific errors.
  - Note: TopicSpecification is analogous to NewTopic in the Java Topic Admin API.
```

--------------------------------

### Message String Representation

Source: https://github.com/confluentinc/confluent-kafka-go/blob/master/kafka/api.html

Provides a method to get a human-readable string representation of a Kafka Message. Key and payload are excluded from the output.

```go
func (m *Message) String() string {
	// Returns a human readable representation of a Message.
	// Key and payload are not represented.
	return "..."
}
```

--------------------------------

### Confluent Kafka Topic and Partition Management

Source: https://github.com/confluentinc/confluent-kafka-go/blob/master/kafka/api.html

Covers types and options for administrative operations related to topics and partitions, such as creating topics and creating partitions.

```APIDOC
CreatePartitionsAdminOption (type)
- Description: Options for creating partitions for a topic.

CreateTopicsAdminOption (type)
- Description: Options for creating topics.

DeleteTopicsAdminOption (type)
- Description: Options for deleting topics.

DescribeTopicsAdminOption (type)
- Description: Options for describing topics.

DescribeTopicsResult (type)
- Description: Represents the result of describing topics.
```

--------------------------------

### UUID Struct and Get Methods (Go)

Source: https://github.com/confluentinc/confluent-kafka-go/blob/master/kafka/api.html

Defines the UUID struct for Kafka UUID representation. It includes methods to retrieve the most and least significant 64 bits of the 128-bit UUID.

```go
type UUID struct {
	// contains filtered or unexported fields
}

func (uuid UUID) GetLeastSignificantBits() int64
func (uuid UUID) GetMostSignificantBits() int64
```

--------------------------------

### NewClient Constructor

Source: https://github.com/confluentinc/confluent-kafka-go/blob/master/schemaregistry/api.html

Creates a new Schema Registry client instance. Requires a configuration object to establish the connection and set initial parameters.

```go
func NewClient(conf *Config) (Client, error)
```

--------------------------------

### Producer.QueryWatermarkOffsets: Get partition watermark offsets

Source: https://github.com/confluentinc/confluent-kafka-go/blob/master/kafka/api.html

Retrieves the low and high watermark offsets for a given topic and partition from the broker. These represent the earliest and latest available offsets, respectively.

```APIDOC
Producer.QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error)

QueryWatermarkOffsets returns the broker's low and high offsets for the given
topic and partition.
```

--------------------------------

### Confluent Kafka Consumer API

Source: https://github.com/confluentinc/confluent-kafka-go/blob/master/kafka/api.html

Provides documentation for core Consumer methods in the confluent-kafka-go library. This includes creating a new consumer, managing partition assignments, and committing offsets.

```APIDOC
NewConsumer:
  Signature: func NewConsumer(conf *ConfigMap) (*Consumer, error)
  Description: Creates a new high-level Consumer instance. Requires a *ConfigMap
  with standard librdkafka configuration properties.
  Supported Special Configuration Properties:
    go.application.rebalance.enable (bool, false): Forward rebalancing
      responsibility to application via the Events() channel. If true, the app
      must handle AssignedPartitions and RevokedPartitions events and call
      Assign() and Unassign() respectively.
    go.events.channel.enable (bool, false) [deprecated]: Enable the Events()
      channel. Messages and events will be pushed on the Events() channel and
      the Poll() interface will be disabled.
    go.events.channel.size (int, 1000): Events() channel size.
    go.logs.channel.enable (bool, false): Forward log to Logs() channel.
    go.logs.channel (chan kafka.LogEvent, nil): Forward logs to
      application-provided channel instead of Logs(). Requires
      go.logs.channel.enable=true.
  Warning: Using the events channel risks receiving outdated events and messages
  due to buffering. Minimizing go.events.channel.size reduces this risk.

Consumer.Assign:
  Signature: func (c *Consumer) Assign(partitions []TopicPartition) (err error)
  Description: Assigns an atomic set of partitions to consume. This replaces the
  current assignment.
  Parameters:
    partitions ([]TopicPartition): A slice of TopicPartition structs. The .Offset
      field must be set to an absolute starting offset (>= 0), or a logical offset
      (e.g., kafka.OffsetEnd). Typically set to kafka.OffsetStored to use the
      committed offset, falling back to auto.offset.reset if no committed offset exists.
  Returns:
    error: An error if the assignment fails.

Consumer.Assignment:
  Signature: func (c *Consumer) Assignment() (partitions []TopicPartition, err error)
  Description: Returns the current partition assignments for the consumer.
  Returns:
    []TopicPartition: A slice of currently assigned TopicPartition structs.
    error: An error if retrieving the assignment fails.

Consumer.AssignmentLost:
  Signature: func (c *Consumer) AssignmentLost() bool
  Description: Returns true if the current partition assignment has been lost.
  This method is applicable for subscribing consumers handling rebalance events.
  Lost partitions may already be owned by other group members, potentially
  causing offset commits to fail.
  Returns:
    bool: True if the assignment is lost, false otherwise.

Consumer.Close:
  Signature: func (c *Consumer) Close() (err error)
  Description: Closes the Consumer instance. The consumer object is no longer
  usable after this call.
  Returns:
    error: An error if closing the consumer fails.

Consumer.Commit:
  Signature: func (c *Consumer) Commit() ([]TopicPartition, error)
  Description: Commits offsets for currently assigned partitions. This is a
  blocking call.
  Returns:
    []TopicPartition: A slice of TopicPartition structs representing the
      committed offsets on success.
    error: An error if the commit operation fails.

Consumer.CommitMessage:
  Signature: func (c *Consumer) CommitMessage(m *Message) ([]TopicPartition, error)
  Description: Commits the offset based on the provided message. This is a
  blocking call.
  Parameters:
    m (*Message): The message whose offset should be committed.
  Returns:
    []TopicPartition: A slice of TopicPartition structs representing the
      committed offsets on success.
    error: An error if the commit operation fails.

Consumer.CommitOffsets:
  Signature: func (c *Consumer) CommitOffsets(offsets []TopicPartition) ([]TopicPartition, error)
  Description: Commits the provided list of offsets. This is a blocking call.
  Parameters:
    offsets ([]TopicPartition): A slice of TopicPartition structs with the
      offsets to commit.
  Returns:
    []TopicPartition: A slice of TopicPartition structs representing the
      committed offsets on success.
    error: An error if the commit operation fails.
```

--------------------------------

### Get Library Version

Source: https://github.com/confluentinc/confluent-kafka-go/blob/master/kafka/api.html

Retrieves the version of the underlying librdkafka library used by the Go client. It returns the version number as an integer and the full version string.
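The integer follows librdkafka's packed hex convention, 0xMMmmrrpp (major, minor, revision, pre-release), per the librdkafka documentation. A small pure-Go sketch of unpacking that layout; the helper and the sample packed value are illustrative, not part of the client API:

```go
package main

import "fmt"

// unpackVersion splits librdkafka's packed hex version 0xMMmmrrpp
// into its major, minor, and revision components.
func unpackVersion(v int) (major, minor, revision int) {
	major = (v >> 24) & 0xff
	minor = (v >> 16) & 0xff
	revision = (v >> 8) & 0xff
	return
}

func main() {
	// Hypothetical packed value corresponding to version 2.3.0.
	major, minor, rev := unpackVersion(0x020300ff)
	fmt.Printf("librdkafka %d.%d.%d\n", major, minor, rev) // librdkafka 2.3.0
}
```

In real code you would pass the first return value of `LibraryVersion()` to such a helper instead of a literal.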
```go
func LibraryVersion() (int, string)
```

--------------------------------

### Generating HTML Documentation

Source: https://github.com/confluentinc/confluent-kafka-go/blob/master/kafka/README.md

Steps to generate one-page HTML documentation using the `mk/doc-gen.py` script. Requires Python and the `beautifulsoup4` package.

```shell
source .../your/virtualenv/bin/activate
pip install beautifulsoup4
make -f mk/Makefile docs
```

--------------------------------

### AdminClient Creation

Source: https://github.com/confluentinc/confluent-kafka-go/blob/master/kafka/api.html

Provides functions to create an AdminClient instance. This includes creating a new client with a ConfigMap or deriving one from an existing Consumer or Producer instance.

```go
func NewAdminClient(conf *ConfigMap) (*AdminClient, error) {
	// ... implementation details ...
}

func NewAdminClientFromConsumer(c *Consumer) (*AdminClient, error) {
	// ... implementation details ...
}

func NewAdminClientFromProducer(p *Producer) (*AdminClient, error) {
	// ... implementation details ...
}
```

--------------------------------

### Kafka Configuration Properties

Source: https://github.com/confluentinc/confluent-kafka-go/blob/master/kafka/api.html

Lists common configuration properties used for initializing Kafka producers and consumers, such as bootstrap servers, group IDs, and Go-specific channel enablement.

```APIDOC
Package kafka - Configuration Properties

Common Configuration Properties:
- `bootstrap.servers`: (Required for Producer/Consumer) List of broker addresses
  (e.g., `host1:port1,host2:port2`).
- `group.id`: (Required for Consumer) The unique identifier for the consumer group.
- `go.events.channel.enable`: (Boolean) Enables the `.Events()` channel for
  receiving messages and events. Defaults to `false`.
- `go.application.rebalance.enable`: (Boolean) Enables application-level handling
  of partition rebalances, allowing custom offset management. Defaults to `false`.
- `go.delivery.reports`: (Boolean) Enables delivery reports for produced messages.
  Defaults to `true`.
```

--------------------------------

### MockCluster API Documentation

Source: https://github.com/confluentinc/confluent-kafka-go/blob/master/kafka/api.html

Provides a comprehensive overview of the MockCluster API for testing Kafka environments with the confluent-kafka-go library. Includes methods for cluster creation, management, and broker manipulation.

```APIDOC
MockCluster: Represents a Kafka mock cluster instance for testing.

NewMockCluster(brokerCount int) (*MockCluster, error)
  Creates a mock Kafka cluster with a configurable number of brokers.
  Supports producer, consumer, and topic operations.
  Parameters:
    brokerCount: The number of brokers to create in the mock cluster.
  Returns:
    *MockCluster: A pointer to the newly created MockCluster instance.
    error: An error if the cluster creation fails.
  Warning: THIS IS AN EXPERIMENTAL API, SUBJECT TO CHANGE OR REMOVAL.

BootstrapServers() string
  Returns the bootstrap.servers property for this MockCluster. This string can
  be used to configure Kafka clients to connect to the mock cluster.
  Returns:
    string: The bootstrap servers string.

Close()
  Closes and destroys the MockCluster instance, releasing associated resources.

CreateTopic(topic string, partitions, replicationFactor int) error
  Creates a topic in the mock cluster without requiring a producer.
  Parameters:
    topic: The name of the topic to create.
    partitions: The number of partitions for the topic.
    replicationFactor: The replication factor for the topic.
  Returns:
    error: An error if topic creation fails.

SetBrokerDown(brokerID int) error
  Disconnects a specific broker or all brokers from the mock cluster.
  This simulates a broker failure but does not trigger leader changes.
  Parameters:
    brokerID: The ID of the broker to set as down. Use -1 to set all brokers down.
  Returns:
    error: An error if setting the broker state fails.

SetBrokerUp(brokerID int) error
  Makes a specific broker or all brokers accept connections again.
  This simulates a broker recovery but does not trigger leader changes.
  Parameters:
    brokerID: The ID of the broker to set as up. Use -1 to set all brokers up.
  Returns:
    error: An error if setting the broker state fails.

Related Types:
  Metadata: Contains broker and topic metadata.
  BrokerMetadata: Information about a specific Kafka broker.
  TopicMetadata: Information about a specific Kafka topic.
```

--------------------------------

### Producer.ProduceChannel: Get the produce message channel (Deprecated)

Source: https://github.com/confluentinc/confluent-kafka-go/blob/master/kafka/api.html

Returns the produce message channel for writing messages. This method is deprecated in favor of Produce(). Reliability of Flush() and Len() is not guaranteed when using ProduceChannel.

```APIDOC
Producer.ProduceChannel() chan *Message

ProduceChannel returns the produce *Message channel (write).

Deprecated: ProduceChannel (channel based producer) is deprecated in favour of
Produce(). Flush() and Len() are not guaranteed to be reliable with ProduceChannel.
```

--------------------------------

### High-level Consumer Usage

Source: https://github.com/confluentinc/confluent-kafka-go/blob/master/kafka/api.html

Guides on creating and managing a high-level Kafka consumer. It covers subscribing to topics, reading messages via Poll() or an Events channel, handling rebalances, and closing the consumer.

```APIDOC
Package kafka - High-level Consumer

Overview: Provides high-level Apache Kafka producer and consumers using bindings
on top of the librdkafka C library.

Key Operations:
1. **Initialization**: Create a Consumer with `kafka.NewConsumer()` providing
   configuration properties like `bootstrap.servers` and `group.id`.
2.
   **Subscription**: Use `.Subscribe()` or `.SubscribeTopics()` to join a
   consumer group with a specified topic subscription. Subsequent calls to
   `.Subscribe*()` will leave the current group and rejoin with the new topics.
3. **Message Consumption**: Read messages and events by either calling `.Poll()`
   or using the `.Events()` channel. To enable the Events channel, set
   `"go.events.channel.enable": true` in the configuration.
4. **Rebalance Handling**: To receive notifications about partition assignments
   and revocations, set `"go.application.rebalance.enable": true` in the
   `NewConsumer()` call. This allows for custom initial offset management via
   `.Assign(partitions)` when `kafka.AssignedPartitions` events are received.
   For channel-based consumers, `.Assign()` is mandatory upon receiving
   `AssignedPartitions` and `RevokedPartitions` events.
5. **Message Retrieval**: Messages are available as `*kafka.Message` events on
   the `.Events` channel or via `.Poll()`.
6. **Lifecycle Management**: Call `.Close()` to commit final offsets, leave the
   consumer group, and clean up resources.
```

--------------------------------

### SchemaMetadata Type Definition

Source: https://github.com/confluentinc/confluent-kafka-go/blob/master/schemaregistry/api.html

SchemaMetadata represents schema metadata, embedding SchemaInfo and adding specific fields like ID, GUID, Subject, and Version. It provides comprehensive information about a schema's identity and context.

```go
type SchemaMetadata struct {
	SchemaInfo

	ID      int    `json:"id,omitempty"`
	GUID    string `json:"guid,omitempty"`
	Subject string `json:"subject,omitempty"`
	Version int    `json:"version,omitempty"`
}
```

--------------------------------

### Consumer.QueryWatermarkOffsets: Get Topic Partition Watermarks

Source: https://github.com/confluentinc/confluent-kafka-go/blob/master/kafka/api.html

Queries the Kafka broker for the low and high watermarks (earliest and latest offsets) for a specific topic partition.
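A common use of these watermarks is estimating consumer lag (high watermark minus the consumer's current position). A minimal pure-Go sketch of that arithmetic, with hypothetical offsets standing in for a live query:

```go
package main

import "fmt"

// lag estimates how far behind a consumer is on one partition:
// the high watermark (next offset to be written) minus the
// consumer's current position (next offset to be read).
func lag(position, high int64) int64 {
	l := high - position
	if l < 0 {
		return 0 // a stale position can briefly exceed a cached watermark
	}
	return l
}

func main() {
	// Hypothetical values: the broker reports high=1500 and the
	// consumer's next offset to read is 1350.
	fmt.Println(lag(1350, 1500)) // 150
}
```

In real code the `high` value would come from `QueryWatermarkOffsets()` and the position from `Position()`.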
This is useful for understanding the offset range available in a partition.

```APIDOC
Consumer.QueryWatermarkOffsets:
  QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error)
  - Queries the broker for the low and high offsets for the given topic and partition.
  - Parameters:
    - topic: The name of the topic.
    - partition: The partition ID.
    - timeoutMs: The maximum time in milliseconds to block for the query.
  - Returns:
    - low: The low watermark (earliest offset).
    - high: The high watermark (latest offset).
    - err: An error if the query fails, or nil on success.
```

--------------------------------

### Running Go Tests

Source: https://github.com/confluentinc/confluent-kafka-go/blob/master/kafka/README.md

Commands to execute different types of tests for the confluent-kafka-go project, including unit, benchmark, and coverage tests.

```shell
# Run unit-tests:
go test

# Run benchmark tests:
go test -bench .

# Run code coverage:
go test -coverprofile=coverage.out -bench=.
go tool cover -func=coverage.out
```

--------------------------------

### Go: TxnRequiresAbort Error Method

Source: https://github.com/confluentinc/confluent-kafka-go/blob/master/kafka/api.html

The TxnRequiresAbort method on the Error type indicates if an error is related to an abortable transaction. It returns true if the application must call AbortTransaction() and start a new transaction to proceed. This flag is specific to the Transactional producer API.

```go
func (e Error) TxnRequiresAbort() bool {
	// ... implementation details ...
}
```

--------------------------------

### Confluent Kafka Go Producer Transactional Methods

Source: https://github.com/confluentinc/confluent-kafka-go/blob/master/kafka/api.html

Manages Kafka transactions using the Confluent Kafka Go Producer. This includes starting new transactions, committing ongoing transactions, and aborting transactions.
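The error-handling decision tree these methods imply (retry a retriable error, abort on an abortable one, give up on a fatal one) can be sketched in pure Go. The `txnError` type below is a hypothetical stand-in for `kafka.Error`, exposing only the three classification methods the transactional API documents; no broker or library is required:

```go
package main

import "fmt"

// txnError is a hypothetical stand-in for kafka.Error, exposing the
// three classification methods documented by the transactional API.
type txnError struct {
	retriable, requiresAbort, fatal bool
}

func (e txnError) IsRetriable() bool      { return e.retriable }
func (e txnError) TxnRequiresAbort() bool { return e.requiresAbort }
func (e txnError) IsFatal() bool          { return e.fatal }

// classify mirrors the recommended handling of a CommitTransaction error:
// retry the commit, abort and start a new transaction, or give up.
func classify(err txnError) string {
	switch {
	case err.IsRetriable():
		return "retry CommitTransaction()"
	case err.TxnRequiresAbort():
		return "call AbortTransaction(), then begin a new transaction"
	case err.IsFatal():
		return "fatal: recreate the producer"
	default:
		return "treat as fatal application error"
	}
}

func main() {
	fmt.Println(classify(txnError{requiresAbort: true}))
	// call AbortTransaction(), then begin a new transaction
}
```

In real code the same switch would run on `err.(kafka.Error)` after `CommitTransaction()` returns a non-nil error.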
Proper handling of transaction states and delivery reports is crucial for reliable transactional messaging.

```APIDOC
Producer Transactional Methods:

BeginTransaction:
  Starts a new transaction. Requires InitTransactions() to have been called
  successfully. After a successful call, at least one operation (Produce,
  SendOffsetsToTransaction, CommitTransaction, AbortTransaction) must be
  performed within transaction.timeout.ms to prevent transaction timeout on the
  broker. Messages produced or offsets sent after this call are part of the
  transaction. Finish the transaction with CommitTransaction() or
  AbortTransaction().
  Note: Produce() and similar calls are only allowed during an ongoing transaction.
  Parameters: None
  Returns: nil on success, or an error object on failure. Check for fatal errors
  using err.(kafka.Error).IsFatal().

CommitTransaction:
  Commits the current transaction. Flushes all outstanding messages before
  committing. If any outstanding messages fail permanently, the transaction
  enters an abortable error state, and this function returns an abortable error.
  In this case, AbortTransaction() must be called before starting a new transaction.
  Parameters:
    ctx: The context for the operation, or nil for indefinite blocking.
  Note: Blocks until all outstanding messages are delivered and the commit
  request is handled by the coordinator, or until ctx expires. Automatically
  calls Flush() to ensure all queued messages are delivered. The application
  MUST serve the producer.Events() channel for delivery reports in a separate
  goroutine.
  Returns: nil on success, or an error object on failure. Check for retriable
  errors using err.(kafka.Error).IsRetriable(), abortable errors using
  err.(kafka.Error).TxnRequiresAbort(), or fatal errors using
  err.(kafka.Error).IsFatal().

AbortTransaction:
  Aborts the ongoing transaction. Should also be used to recover from non-fatal
  abortable transaction errors.
  Any outstanding messages will be purged and fail with ErrPurgeInflight or
  ErrPurgeQueue.
  Parameters:
    ctx: The context for the operation, or nil for indefinite blocking.
  Note: Blocks until all outstanding messages are purged and the abort request
  is handled by the coordinator, or until ctx expires. Automatically calls
  Purge() and Flush() to ensure all queued and in-flight messages are purged.
  The application MUST serve the producer.Events() channel for delivery reports
  in a separate goroutine.
  Returns: nil on success, or an error object on failure. Check for retriable
  errors using err.(kafka.Error).IsRetriable(), or fatal errors using
  err.(kafka.Error).IsFatal().
```

--------------------------------

### Confluent Kafka Consumer Methods

Source: https://github.com/confluentinc/confluent-kafka-go/blob/master/kafka/api.html

Provides documentation for core consumer operations like unassigning partitions and unsubscribing from topics. These methods manage the consumer's subscription state.

```APIDOC
Consumer.Unassign()
- Description: Unassigns all partitions from the consumer.
- Returns: An error if the operation fails.

Consumer.Unsubscribe()
- Description: Unsubscribes the consumer from all topics and partitions.
- Returns: An error if the operation fails.
```

--------------------------------

### Build Docker Image

Source: https://github.com/confluentinc/confluent-kafka-go/blob/master/examples/docker_aws_lambda_example/README.md

Builds a Docker image from a specified Dockerfile, typically used for deploying applications to environments like AWS Lambda. The image is tagged as 'goclients'.

```shell
$ docker build -f examples/docker_aws_lambda_example/Dockerfile -t goclients .
```

--------------------------------

### Git Tagging and Pushing for Release

Source: https://github.com/confluentinc/confluent-kafka-go/blob/master/kafka/README.md

Commands for creating and pushing a Git tag for a new release. Includes a dry-run option for verification.
```shell
git tag v1.3.0
git push --dry-run origin v1.3.0
# Remove --dry-run and re-execute if it looks ok.
```

--------------------------------

### AdminClient String Method
Source: https://github.com/confluentinc/confluent-kafka-go/blob/master/kafka/api.html

Provides a human-readable name for an AdminClient instance.

```go
func (a *AdminClient) String() string
```

--------------------------------

### Confluent Kafka Mock Cluster
Source: https://github.com/confluentinc/confluent-kafka-go/blob/master/kafka/api.html

Provides types and functions for creating and managing a mock Kafka cluster for testing purposes, including cluster bootstrapping, topic creation, and broker status management.

```APIDOC
MockCluster (type)
- Description: Represents a mock Kafka cluster for testing.

- NewMockCluster(brokerCount int) (*MockCluster, error)
  - Description: Creates a new mock Kafka cluster with a specified number of brokers.
  - Parameters:
    - brokerCount: The number of brokers to include in the mock cluster.
  - Returns:
    - *MockCluster: A pointer to the created mock cluster.
    - error: An error if cluster creation fails.

- (mc *MockCluster) BootstrapServers() string
  - Description: Returns the bootstrap servers string for the mock cluster.

- (mc *MockCluster) Close()
  - Description: Closes the mock cluster and releases resources.

- (mc *MockCluster) CreateTopic(topic string, partitions, replicationFactor int) error
  - Description: Creates a topic within the mock cluster.
  - Parameters:
    - topic: The name of the topic to create.
    - partitions: The number of partitions for the topic.
    - replicationFactor: The replication factor for the topic.
  - Returns:
    - error: An error if topic creation fails.

- (mc *MockCluster) SetBrokerDown(brokerID int) error
  - Description: Simulates a broker being down.
  - Parameters:
    - brokerID: The ID of the broker to set as down.
  - Returns:
    - error: An error if setting broker status fails.
```

--------------------------------

### Build and Run Soak Client Application
Source: https://github.com/confluentinc/confluent-kafka-go/blob/master/soaktest/README.md

Instructions to build the Go executable for the soak client and run it with specified Kafka broker and topic configurations. This application sends messages using a producer and receives them with a consumer.

```shell
cd soaktest/soakclient
go build
./soakclient -broker <..> -inputTopic <..> -outTopic <..> -groupID <..> \
    -inputTopicPartitionsNum <..> -outTopicPartitionsNum <..> \
    -replicationFactor <..> -ccloudAPIKey <..> -ccloudAPISecret <..>
```

--------------------------------

### Static Compilation for Linux with musl
Source: https://github.com/confluentinc/confluent-kafka-go/blob/master/README.md

This command demonstrates how to statically compile a Go application using musl-gcc, ensuring a truly static binary for Linux. It uses specific linker flags to achieve static linking and includes the '-tags musl' flag.

```bash
CC=/path/to/musl-gcc go build --ldflags '-linkmode external -extldflags "-static"' -tags musl
```

--------------------------------

### TopicSpecification Struct (Go)
Source: https://github.com/confluentinc/confluent-kafka-go/blob/master/kafka/api.html

Represents parameters for creating a new Kafka topic. It includes topic name, number of partitions, replication factor, optional replica assignments, and topic configurations.

```go
type TopicSpecification struct {
	// Topic name to create.
	Topic string

	// Number of partitions in topic.
	NumPartitions int

	// Default replication factor for the topic's partitions, or zero
	// if an explicit ReplicaAssignment is set.
	ReplicationFactor int

	// (Optional) Explicit replica assignment. The outer array is
	// indexed by the partition number, while the inner per-partition array
	// contains the replica broker ids. The first broker in each
	// broker id list will be the preferred replica.
	ReplicaAssignment [][]int32

	// Topic configuration.
	Config map[string]string
}
```

--------------------------------

### Build Project with Musl Tag
Source: https://github.com/confluentinc/confluent-kafka-go/blob/master/README.md

Provides the command to build a Go project that depends on confluent-kafka-go, specifically for Alpine Linux (musl), by including the '-tags musl' flag.

```bash
go build -tags musl ./...
```

--------------------------------

### Topic and Partition Management Options
Source: https://github.com/confluentinc/confluent-kafka-go/blob/master/kafka/api.html

Defines interfaces for configuring administrative operations related to Kafka topics and partitions. These options allow setting timeouts and validation parameters for requests like creating topics or partitions.

```go
type CreatePartitionsAdminOption interface {
	// contains filtered or unexported methods
}

type CreateTopicsAdminOption interface {
	// contains filtered or unexported methods
}
```

--------------------------------

### Confluent Kafka Go Admin Client Methods
Source: https://github.com/confluentinc/confluent-kafka-go/blob/master/kafka/api.html

Provides an overview of the core administrative operations available through the AdminClient in the confluent-kafka-go library. This includes methods for describing cluster state, managing topics and consumer groups, altering configurations, and retrieving metadata.

```APIDOC
AdminClient Methods:

DescribeCluster(ctx context.Context, options ...DescribeClusterAdminOption) (result DescribeClusterResult, err error)
- Describes the Kafka cluster, including brokers and controller.
- Parameters:
  - ctx: Context for the request.
  - options: Optional parameters for the describe cluster operation.
- Returns:
  - DescribeClusterResult: Contains cluster information.
  - error: An error if the operation fails.
DescribeConfigs(ctx context.Context, resources []ConfigResource, options ...DescribeConfigsAdminOption) (result []ConfigResourceResult, err error)
- Describes the configurations for specified resources (topics, brokers, etc.).
- Parameters:
  - ctx: Context for the request.
  - resources: A slice of ConfigResource specifying which resources to describe.
  - options: Optional parameters for the describe configs operation.
- Returns:
  - []ConfigResourceResult: A slice of ConfigResourceResult containing configuration details.
  - error: An error if the operation fails.

DescribeConsumerGroups(ctx context.Context, groups []string, options ...DescribeConsumerGroupsAdminOption) (result DescribeConsumerGroupsResult, err error)
- Describes the state of specified consumer groups.
- Parameters:
  - ctx: Context for the request.
  - groups: A slice of strings representing the consumer group IDs to describe.
  - options: Optional parameters for the describe consumer groups operation.
- Returns:
  - DescribeConsumerGroupsResult: Contains information about the consumer groups.
  - error: An error if the operation fails.

DescribeTopics(ctx context.Context, topics TopicCollection, options ...DescribeTopicsAdminOption) (result DescribeTopicsResult, err error)
- Describes the metadata for specified topics.
- Parameters:
  - ctx: Context for the request.
  - topics: A TopicCollection specifying the topics to describe.
  - options: Optional parameters for the describe topics operation.
- Returns:
  - DescribeTopicsResult: Contains information about the topics.
  - error: An error if the operation fails.

DescribeUserScramCredentials(ctx context.Context, users []string, options ...DescribeUserScramCredentialsAdminOption) (result DescribeUserScramCredentialsResult, err error)
- Describes SCRAM credentials for specified users.
- Parameters:
  - ctx: Context for the request.
  - users: A slice of strings representing the user principals.
  - options: Optional parameters for the describe credentials operation.
- Returns:
  - DescribeUserScramCredentialsResult: Contains SCRAM credential information.
  - error: An error if the operation fails.

ElectLeaders(ctx context.Context, electLeaderRequest ElectLeadersRequest, options ...ElectLeadersAdminOption) (result ElectLeadersResult, err error)
- Elects new leaders for partitions of specified topics.
- Parameters:
  - ctx: Context for the request.
  - electLeaderRequest: Specifies the topics and partitions for leader election.
  - options: Optional parameters for the elect leaders operation.
- Returns:
  - ElectLeadersResult: Contains the result of the leader election.
  - error: An error if the operation fails.

GetMetadata(topic *string, allTopics bool, timeoutMs int) (*Metadata, error)
- Retrieves cluster metadata, including brokers, topics, and partitions.
- Parameters:
  - topic: Optional topic name to filter metadata.
  - allTopics: If true, retrieves metadata for all topics.
  - timeoutMs: Timeout in milliseconds for the metadata request.
- Returns:
  - *Metadata: Cluster metadata.
  - error: An error if the operation fails.

IncrementalAlterConfigs(ctx context.Context, resources []ConfigResource, options ...AlterConfigsAdminOption) (result []ConfigResourceResult, err error)
- Incrementally alters configuration entries for specified resources, applying per-entry operations without replacing the resource's full configuration.
- Parameters:
  - ctx: Context for the request.
  - resources: A slice of ConfigResource specifying resources and their config changes.
  - options: Optional parameters for the alter configs operation.
- Returns:
  - []ConfigResourceResult: Results of the configuration alteration.
  - error: An error if the operation fails.

IsClosed() bool
- Checks if the AdminClient has been closed.
- Returns:
  - bool: True if closed, false otherwise.

ListConsumerGroupOffsets(ctx context.Context, groupsPartitions []ConsumerGroupTopicPartitions, options ...ListConsumerGroupOffsetsAdminOption) (lcgor ListConsumerGroupOffsetsResult, err error)
- Lists the current offsets for specified consumer groups and partitions.
- Parameters:
  - ctx: Context for the request.
  - groupsPartitions: Specifies consumer groups and their partitions.
  - options: Optional parameters for listing offsets.
- Returns:
  - ListConsumerGroupOffsetsResult: Contains the listed offsets.
  - error: An error if the operation fails.

ListConsumerGroups(ctx context.Context, options ...ListConsumerGroupsAdminOption) (result ListConsumerGroupsResult, err error)
- Lists all consumer groups in the cluster.
- Parameters:
  - ctx: Context for the request.
  - options: Optional parameters for listing consumer groups.
- Returns:
  - ListConsumerGroupsResult: Contains the list of consumer groups.
  - error: An error if the operation fails.

ListOffsets(ctx context.Context, topicPartitionOffsets map[TopicPartition]OffsetSpec, options ...ListOffsetsAdminOption) (result ListOffsetsResult, err error)
- Lists offsets for specified topic partitions.
- Parameters:
  - ctx: Context for the request.
  - topicPartitionOffsets: A map of TopicPartition to OffsetSpec.
  - options: Optional parameters for listing offsets.
- Returns:
  - ListOffsetsResult: Contains the listed offsets.
  - error: An error if the operation fails.

SetOAuthBearerToken(oauthBearerToken OAuthBearerToken) error
- Sets the OAuth bearer token for SASL/OAUTHBEARER authentication.
- Parameters:
  - oauthBearerToken: The OAuthBearerToken object.
- Returns:
  - error: An error if setting the token fails.

SetOAuthBearerTokenFailure(errstr string) error
- Sets a failure string for OAuth bearer token authentication.
- Parameters:
  - errstr: The error string to set.
- Returns:
  - error: An error if setting the failure string fails.

SetSaslCredentials(username, password string) error
- Sets SASL credentials (username and password) for authentication.
- Parameters:
  - username: The SASL username.
  - password: The SASL password.
- Returns:
  - error: An error if setting credentials fails.

String() string
- Returns a string representation of the AdminClient.
- Returns:
  - string: The string representation.
```

--------------------------------

### Schema Registry Client Configuration
Source: https://github.com/confluentinc/confluent-kafka-go/blob/master/schemaregistry/api.html

Functions for creating and configuring the Schema Registry client, including options for basic authentication.

```APIDOC
type Config struct
- Used to pass multiple configuration options to the Schema Registry client.
- Contains an internal ClientConfig.

func NewConfig(url string) *Config
- Returns a new configuration instance with sane defaults.
- Parameters:
  - url: The URL of the Schema Registry.
- Returns: A pointer to a new Config instance.
```

```APIDOC
func NewConfigWithAuthentication(url string, username string, password string) *Config
- Returns a new configuration instance using basic authentication.
- For Confluent Cloud, use the API key for the username and the API secret for the password.
- This method is deprecated.
- Parameters:
  - url: The URL of the Schema Registry.
  - username: The username for authentication (e.g., Confluent Cloud API key).
  - password: The password for authentication (e.g., Confluent Cloud API secret).
- Returns: A pointer to a new Config instance.
```

```APIDOC
func NewConfigWithBasicAuthentication(url string, username string, password string) *Config
- Returns a new configuration instance using basic authentication.
- For Confluent Cloud, use the API key for the username and the API secret for the password.
- Parameters:
  - url: The URL of the Schema Registry.
  - username: The username for authentication (e.g., Confluent Cloud API key).
  - password: The password for authentication (e.g., Confluent Cloud API secret).
- Returns: A pointer to a new Config instance.
```

--------------------------------

### Go PartitionsSpecification Struct
Source: https://github.com/confluentinc/confluent-kafka-go/blob/master/kafka/api.html

Specifies parameters for creating additional partitions for a Kafka topic.
It includes the topic name, the desired new partition count, and an optional explicit replica assignment.

```go
type PartitionsSpecification struct {
	// Topic to create more partitions for.
	Topic string

	// New partition count for topic, must be higher than current partition count.
	IncreaseTo int

	// (Optional) Explicit replica assignment. The outer array is
	// indexed by the new partition index (i.e., 0 for the first added
	// partition), while the inner per-partition array
	// contains the replica broker ids. The first broker in each
	// broker id list will be the preferred replica.
	ReplicaAssignment [][]int32
}
```

--------------------------------

### Schema Registry Client Interface
Source: https://github.com/confluentinc/confluent-kafka-go/blob/master/schemaregistry/api.html

Defines the interface for interacting with the Schema Registry and provides a constructor for creating a client instance.

```APIDOC
Package schemaregistry

Client Interface:

type Client interface
- Represents a client for the Schema Registry.
- Methods (not detailed here, but typically include operations like GetSchema, RegisterSchema, DeleteSchema, etc.)

NewClient(conf *Config) (Client, error)
- Creates a new Schema Registry client instance.
- Parameters:
  - conf: A pointer to the configuration object for the client.
- Returns:
  - Client: An instance of the Schema Registry client.
  - error: An error if the client could not be created.
```