clj-kafka-x

A Clojure library for Apache Kafka, the distributed stream-processing platform.

Uses the native Kafka protocol and does not rely on ZooKeeper.

It aims to be as lightweight as possible and therefore depends only on

  • org.apache.kafka/kafka-clients "4.2.0"

Note
The Zookeeper dependency has been removed as it’s not required by modern Kafka clients for standard operations.
Note
Some builds (for instance, of the v0.4.x branch) may be partially or even fully incompatible with certain versions of other libraries that also use NIO. If you experience build problems, or your application crashes unexpectedly on start, inspect your project dependencies more closely: you may need to adjust existing dependency versions or add a current version of [io.netty/netty-all].


Installation

Add the following to your Leiningen’s project.clj:

[net.tbt-post/clj-kafka-x "0.9.0"]
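If you use tools.deps instead of Leiningen, the same Maven coordinate can go in deps.edn:

```clojure
{:deps {net.tbt-post/clj-kafka-x {:mvn/version "0.9.0"}}}
```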

Usage

Producer

(require '[clj-kafka-x.producer :as kp])

(with-open [p (kp/producer {"bootstrap.servers" "localhost:9092"}
                           (kp/string-serializer)
                           (kp/string-serializer))]
  @(kp/send p (kp/record "topic-a" "Hi there!")))
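Records can also carry a key, which Kafka uses to route all messages with the same key to the same partition. A hedged sketch, assuming `kp/record` also supports the usual topic/key/value arity found in Kafka client wrappers:

```clojure
(require '[clj-kafka-x.producer :as kp])

(with-open [p (kp/producer {"bootstrap.servers" "localhost:9092"}
                           (kp/string-serializer)
                           (kp/string-serializer))]
  ;; key "user-42" keeps all of this user's messages on one partition
  @(kp/send p (kp/record "topic-a" "user-42" "Hi there!")))
```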

Consumer

(require '[clj-kafka-x.consumers.simple :as kc])

(with-open [c (kc/consumer {"bootstrap.servers" "localhost:9092"
                            "group.id" "consumer-id"}
                            (kc/string-deserializer)
                            (kc/string-deserializer))]
  (kc/subscribe c "topic-a")
  (kc/messages c))

Share Consumer (Kafka Queues)

Share consumers (KIP-932) allow multiple consumers to read from the same partition concurrently, solving the Head of Line Blocking problem.

(require '[clj-kafka-x.consumers.shared :as ks])

(with-open [c (ks/share-consumer {"bootstrap.servers" "localhost:9092"
                                   "group.id" "my-share-group"}
                                   (ks/string-deserializer)
                                   (ks/string-deserializer))]
  (ks/subscribe c "topic-a")
  (let [msgs (ks/messages c)]
    (doseq [msg msgs]
      (ks/acknowledge c msg))
    (ks/commit-sync c)
    msgs))

Records can be acknowledged with :accept (default), :release, :reject, or :renew:

(ks/acknowledge c msg :reject)  ;; reject permanently
(ks/acknowledge c msg :release) ;; release for redelivery
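A common pattern is to choose the acknowledgement type per message, for example releasing messages whose processing failed so they are redelivered. A minimal sketch using only the calls shown above (`process-message` is a hypothetical application handler):

```clojure
(doseq [msg (ks/messages c)]
  (try
    (process-message msg)               ;; hypothetical handler
    (ks/acknowledge c msg)              ;; :accept (default) on success
    (catch Exception _
      (ks/acknowledge c msg :release)))) ;; release for redelivery on failure
(ks/commit-sync c)
```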
Note
When a topic has multiple partitions, they must be specified explicitly when subscribing, e.g. (kc/subscribe c [{:topic "topic-a" :partitions #{0 1}} {:topic "topic-b" :partitions #{0 1 2}}])
Real-life (almost) example
(ns buzz.consumer.kafka
  (:require [clj-kafka-x.consumers.simple :as kc]
            [clojure.tools.logging :as log]))

(defn processor [msg schema] msg)
(def schema nil)
(def config {"bootstrap.servers" "localhost:9092"
             "group.id" "consumer-id"})

(defn process-message [msg]
  (let [{:keys [value topic partition offset]} msg
        processor processor ;; choose one by topic name
        schema schema]      ;; choose one by topic name
    (if (fn? processor) (processor value schema) value)))

(defn consume []
  (with-open [c (kc/consumer config
                             (kc/byte-array-deserializer)
                             (kc/byte-array-deserializer))]
    (kc/subscribe c ["topic-a"]) ;; the topic list typically comes from configuration
    (let [pool (kc/messages c)]
      (doseq [message pool]
        (log/warn (process-message message))))))

You may also pass an explicit poll timeout:

(defn- consume [process-message]
  (with-open [co (kc/consumer config
                              (kc/byte-array-deserializer)
                              (kc/byte-array-deserializer))]
    (when-let [messages (kc/messages
                          co
                          :timeout (:request-timeout-ms config))]
      (doall (map process-message messages)))))

The maximum number of records returned per poll can be set via the max.poll.records configuration property.
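For example, limiting each poll to 100 records (max.poll.records is a standard Kafka consumer property passed straight through the config map):

```clojure
(def config {"bootstrap.servers" "localhost:9092"
             "group.id"          "consumer-id"
             "max.poll.records"  "100"})
```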

Manual Build

$ lein install

Development and Testing

The project includes a comprehensive test suite with ~82% code coverage.

$ lein with-profile dev test

For test coverage reports:

$ lein with-profile dev cloverage

Current coverage:

| Namespace                    | Forms  | Lines  |
|------------------------------|--------|--------|
| clj-kafka-x.consumers.shared | 81.60% | 89.09% |
| clj-kafka-x.consumers.simple | 80.65% | 83.13% |
| clj-kafka-x.data             | 81.95% | 83.95% |
| clj-kafka-x.impl.helpers     | 87.90% | 93.55% |
| clj-kafka-x.producer         | 76.09% | 81.82% |
| Total                        | 81.70% | 85.51% |

Integration Tests

The project includes integration tests that require a running Kafka instance. By default, these tests are skipped. To run them:

(binding [clj-kafka-x.integration-test/*run-integration-tests* true
          clj-kafka-x.integration-test/*kafka-bootstrap-servers* "localhost:9092"]
  (clj-kafka-x.integration-test/run-integration-tests))

Changelog

See CHANGELOG.adoc for version history.

License

Copyright © 2016-2026

Distributed under the Apache License v2.0
