Commit e3a268a

bump all versions, migrate to ClickHouse client v2 (#58)
1 parent 648ca66 · commit e3a268a

21 files changed: 263 additions & 274 deletions

build.sbt

Lines changed: 35 additions & 32 deletions
@@ -1,6 +1,6 @@
 name := "stream-loader"
 
-ThisBuild / scalaVersion := "2.13.15"
+ThisBuild / scalaVersion := "2.13.17"
 ThisBuild / scalacOptions := Seq(
   "-unchecked",
   "-deprecation",
@@ -25,12 +25,12 @@ ThisBuild / git.remoteRepo := {
 }
 
 val scalaTestVersion = "3.2.19"
-val scalaCheckVersion = "1.18.1"
+val scalaCheckVersion = "1.19.0"
 val scalaCheckTestVersion = "3.2.19.0"
 
-val hadoopVersion = "3.4.1"
-val parquetVersion = "1.15.2"
-val icebergVersion = "1.7.0"
+val hadoopVersion = "3.4.2"
+val parquetVersion = "1.16.0"
+val icebergVersion = "1.10.0"
 
 lazy val `stream-loader-core` = project
   .in(file("stream-loader-core"))
@@ -41,19 +41,19 @@ lazy val `stream-loader-core` = project
     buildInfoKeys := Seq[BuildInfoKey](name, version, scalaVersion, git.gitHeadCommit),
     libraryDependencies ++= Seq(
       "org.scala-lang" % "scala-reflect" % scalaVersion.value,
-      "org.apache.kafka" % "kafka-clients" % "3.9.0",
+      "org.apache.kafka" % "kafka-clients" % "4.1.0",
       "org.log4s" %% "log4s" % "1.10.0",
-      "org.apache.commons" % "commons-compress" % "1.27.1",
-      "org.xerial.snappy" % "snappy-java" % "1.1.10.7",
+      "org.apache.commons" % "commons-compress" % "1.28.0",
+      "org.xerial.snappy" % "snappy-java" % "1.1.10.8",
       "org.lz4" % "lz4-java" % "1.8.0",
-      "com.github.luben" % "zstd-jni" % "1.5.6-8",
+      "com.github.luben" % "zstd-jni" % "1.5.7-5",
       "com.univocity" % "univocity-parsers" % "2.9.1",
       "org.json4s" %% "json4s-native" % "4.0.7",
-      "io.micrometer" % "micrometer-core" % "1.14.1",
+      "io.micrometer" % "micrometer-core" % "1.15.4",
       "org.scalatest" %% "scalatest" % scalaTestVersion % "test",
       "org.scalatestplus" %% "scalacheck-1-18" % scalaCheckTestVersion % "test",
       "org.scalacheck" %% "scalacheck" % scalaCheckVersion % "test",
-      "ch.qos.logback" % "logback-classic" % "1.5.12" % "test"
+      "ch.qos.logback" % "logback-classic" % "1.5.19" % "test"
     )
   )
 
@@ -64,8 +64,8 @@ lazy val `stream-loader-clickhouse` = project
   .settings(
     resolvers += "jitpack" at "https://jitpack.io",
     libraryDependencies ++= Seq(
-      "org.apache.httpcomponents.client5" % "httpclient5" % "5.4.1",
-      "com.clickhouse" % "clickhouse-jdbc" % "0.7.1",
+      "org.apache.httpcomponents.client5" % "httpclient5" % "5.5.1",
+      "com.clickhouse" % "client-v2" % "0.9.2",
       "org.scalatest" %% "scalatest" % scalaTestVersion % "test",
       "org.scalatestplus" %% "scalacheck-1-18" % scalaCheckTestVersion % "test",
       "org.scalacheck" %% "scalacheck" % scalaCheckVersion % "test"
@@ -106,14 +106,14 @@ lazy val `stream-loader-s3` = project
   .settings(commonSettings)
   .settings(
     libraryDependencies ++= Seq(
-      "software.amazon.awssdk" % "s3" % "2.29.20",
+      "software.amazon.awssdk" % "s3" % "2.35.5",
       "org.scalatest" %% "scalatest" % scalaTestVersion % "test",
-      "com.amazonaws" % "aws-java-sdk-s3" % "1.12.778" % "test",
-      "org.gaul" % "s3proxy" % "2.4.1" % "test"
+      "com.amazonaws" % "aws-java-sdk-s3" % "1.12.792" % "test",
+      "org.gaul" % "s3proxy" % "2.8.0" % "test"
     )
   )
 
-val verticaVersion = "24.4.0-0"
+val verticaVersion = "25.3.0-0"
 
 lazy val `stream-loader-vertica` = project
   .in(file("stream-loader-vertica"))
@@ -128,7 +128,8 @@ lazy val `stream-loader-vertica` = project
     )
   )
 
-val duckdbVersion = "1.1.3"
+val duckdbVersion = "1.4.1.0"
+val jerseyVersion = "3.1.11"
 
 lazy val packAndSplitJars =
   taskKey[(File, File)]("Runs pack and splits out the application jars from the external dependency jars")
@@ -150,19 +151,21 @@ lazy val `stream-loader-tests` = project
   .settings(commonSettings)
   .settings(
     libraryDependencies ++= Seq(
-      "com.typesafe" % "config" % "1.4.3",
-      "ch.qos.logback" % "logback-classic" % "1.5.12",
-      "com.zaxxer" % "HikariCP" % "6.2.1",
-      "org.apache.iceberg" % "iceberg-parquet" % icebergVersion,
-      "com.vertica.jdbc" % "vertica-jdbc" % verticaVersion,
-      "org.scalacheck" %% "scalacheck" % scalaCheckVersion,
-      "org.scalatest" %% "scalatest" % scalaTestVersion % "test",
-      "org.scalatestplus" %% "scalacheck-1-18" % scalaCheckTestVersion % "test",
-      "org.slf4j" % "log4j-over-slf4j" % "2.0.16" % "test",
-      "org.mandas" % "docker-client" % "8.0.3" % "test",
-      "org.jboss.resteasy" % "resteasy-client" % "6.2.11.Final" % "test",
-      "com.fasterxml.jackson.jakarta.rs" % "jackson-jakarta-rs-json-provider" % "2.18.1" % "test",
-      "org.duckdb" % "duckdb_jdbc" % duckdbVersion % "test"
+      "com.typesafe" % "config" % "1.4.5",
+      "ch.qos.logback" % "logback-classic" % "1.5.19",
+      "com.zaxxer" % "HikariCP" % "7.0.2",
+      "org.apache.iceberg" % "iceberg-parquet" % icebergVersion,
+      "com.vertica.jdbc" % "vertica-jdbc" % verticaVersion,
+      "org.scalacheck" %% "scalacheck" % scalaCheckVersion,
+      "org.scalatest" %% "scalatest" % scalaTestVersion % "test",
+      "org.scalatestplus" %% "scalacheck-1-18" % scalaCheckTestVersion % "test",
+      "org.slf4j" % "log4j-over-slf4j" % "2.0.17" % "test",
+      "org.mandas" % "docker-client" % "9.0.4" % "test",
+      "org.glassfish.jersey.core" % "jersey-client" % jerseyVersion % "test",
+      "org.glassfish.jersey.inject" % "jersey-hk2" % jerseyVersion % "test",
+      "org.glassfish.jersey.connectors" % "jersey-apache-connector" % jerseyVersion % "test",
+      "org.glassfish.jersey.media" % "jersey-media-json-jackson" % jerseyVersion % "test",
+      "org.duckdb" % "duckdb_jdbc" % duckdbVersion % "test"
     ),
     inConfig(IntegrationTest)(Defaults.testTasks),
    publish := {},
@@ -195,7 +198,7 @@ lazy val `stream-loader-tests` = project
     val bin = s"/opt/${name.value}/bin/"
 
     new Dockerfile {
-      from("eclipse-temurin:21.0.2_13-jre")
+      from("eclipse-temurin:21.0.8_9-jre")
 
       env("APP_CLASS_PATH" -> s"$lib/*")

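The headline change in this file is the swap from the clickhouse-jdbc driver to ClickHouse's newer client-v2 artifact, which exposes a different API. A minimal sketch of constructing the new client in Scala (the endpoint and credentials are illustrative placeholders, not values from this commit):

import com.clickhouse.client.api.Client

// Build a client-v2 instance against a hypothetical local server;
// substitute your own endpoint and credentials.
val client: Client = new Client.Builder()
  .addEndpoint("http://localhost:8123")
  .setUsername("default")
  .setPassword("")
  .build()
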
project/build.properties

Lines changed: 1 addition & 1 deletion
@@ -1 +1 @@
-sbt.version=1.11.3
+sbt.version=1.11.7

project/plugins.sbt

Lines changed: 6 additions & 6 deletions
@@ -4,20 +4,20 @@ addSbtPlugin("se.marcuslonnberg" % "sbt-docker" % "1.11.0")
 
 addSbtPlugin("com.github.sbt" % "sbt-git" % "2.1.0")
 
-addSbtPlugin("org.xerial.sbt" % "sbt-pack" % "0.20")
+addSbtPlugin("org.xerial.sbt" % "sbt-pack" % "0.22")
 
 addSbtPlugin("com.eed3si9n" % "sbt-buildinfo" % "0.13.1")
 
-addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.5.2")
+addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.5.5")
 
 addSbtPlugin("com.thoughtworks.sbt-api-mappings" % "sbt-api-mappings" % "3.0.2")
 
-addSbtPlugin("com.github.sbt" % "sbt-unidoc" % "0.5.0")
+addSbtPlugin("com.github.sbt" % "sbt-unidoc" % "0.6.0")
 
 addSbtPlugin("de.heikoseeberger" % "sbt-header" % "5.10.0")
 
-libraryDependencies += "net.sourceforge.plantuml" % "plantuml" % "1.2024.8"
+libraryDependencies += "net.sourceforge.plantuml" % "plantuml" % "1.2025.8"
 
-addSbtPlugin("com.github.sbt" % "sbt-ghpages" % "0.8.0")
+addSbtPlugin("com.github.sbt" % "sbt-ghpages" % "0.9.0")
 
-addSbtPlugin("com.github.sbt" % "sbt-pgp" % "2.3.0")
+addSbtPlugin("com.github.sbt" % "sbt-pgp" % "2.3.1")

stream-loader-clickhouse/src/main/scala/com/adform/streamloader/clickhouse/ClickHouseFileBuilder.scala

Lines changed: 3 additions & 3 deletions
@@ -8,8 +8,8 @@
 
 package com.adform.streamloader.clickhouse
 
-import com.adform.streamloader.sink.file.{FileBuilder, FileBuilderFactory}
-import com.clickhouse.data.{ClickHouseCompression, ClickHouseFormat}
+import com.adform.streamloader.sink.file.{Compression, FileBuilder, FileBuilderFactory}
+import com.clickhouse.data.ClickHouseFormat
 
 /**
  * A FileBuilder able to build files that can be loaded to ClickHouse.
@@ -26,7 +26,7 @@ trait ClickHouseFileBuilder[-R] extends FileBuilder[R] {
   /**
    * Compression to use for the files being constructed.
    */
-  def compression: ClickHouseCompression
+  def fileCompression: Compression
 }
 
 trait ClickHouseFileBuilderFactory[R] extends FileBuilderFactory[R, ClickHouseFileBuilder[R]] {

stream-loader-clickhouse/src/main/scala/com/adform/streamloader/clickhouse/ClickHouseFileRecordBatch.scala

Lines changed: 5 additions & 4 deletions
@@ -8,18 +8,19 @@
 
 package com.adform.streamloader.clickhouse
 
-import java.io.File
 import com.adform.streamloader.model.StreamRange
-import com.adform.streamloader.sink.file.FileRecordBatch
-import com.clickhouse.data.{ClickHouseCompression, ClickHouseFormat}
+import com.adform.streamloader.sink.file.{Compression, FileRecordBatch}
+import com.clickhouse.data.ClickHouseFormat
+
+import java.io.File
 
 /**
  * A file containing a batch of records in some ClickHouse supported format that can be loaded to ClickHouse.
  */
 case class ClickHouseFileRecordBatch(
     file: File,
     format: ClickHouseFormat,
-    compression: ClickHouseCompression,
+    fileCompression: Compression,
     recordRanges: Seq[StreamRange],
     rowCount: Long
 ) extends FileRecordBatch
stream-loader-clickhouse/src/main/scala/com/adform/streamloader/clickhouse/ClickHouseFileRecordBatcher.scala
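With fileCompression now typed as stream-loader's own Compression enum instead of the driver's ClickHouseCompression, batches no longer depend on the JDBC driver's types. A hedged construction sketch (the file path, format, and row count are made up for illustration):

import java.io.File
import com.adform.streamloader.sink.file.Compression
import com.clickhouse.data.ClickHouseFormat

// Hypothetical zstd-compressed RowBinary batch; in practice the record
// ranges and row count come from the record batcher.
val batch = ClickHouseFileRecordBatch(
  file = new File("/tmp/batch.bin.zst"),
  format = ClickHouseFormat.RowBinary,
  fileCompression = Compression.ZSTD,
  recordRanges = Seq.empty,
  rowCount = 10000L
)
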

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ class ClickHouseFileRecordBatcher[R](
3232
ClickHouseFileRecordBatch(
3333
f,
3434
fileBuilder.format,
35-
fileBuilder.compression,
35+
fileBuilder.fileCompression,
3636
recordRanges,
3737
fileBuilder.getRecordCount
3838
)

stream-loader-clickhouse/src/main/scala/com/adform/streamloader/clickhouse/ClickHouseFileStorage.scala

Lines changed: 47 additions & 49 deletions
@@ -10,23 +10,22 @@ package com.adform.streamloader.clickhouse
 
 import com.adform.streamloader.model._
 import com.adform.streamloader.sink.batch.storage.InDataOffsetBatchStorage
+import com.adform.streamloader.sink.file.Compression
 import com.adform.streamloader.util.Logging
-import com.clickhouse.data.ClickHouseFile
-import com.clickhouse.jdbc.ClickHouseConnection
+import com.clickhouse.client.api.Client
+import com.clickhouse.client.api.insert.InsertSettings
 import org.apache.kafka.common.TopicPartition
 
-import java.sql.Connection
-import javax.sql.DataSource
+import java.nio.file.Files
+import java.time.ZoneOffset
 import scala.collection.mutable
-import scala.jdk.CollectionConverters._
-import scala.util.Using
 
 /**
  * A ClickHouse storage implementation, stores offsets in rows of data.
  * Queries ClickHouse upon initialization in order to retrieve committed stream positions.
  */
 class ClickHouseFileStorage(
-    dbDataSource: DataSource,
+    client: Client,
     table: String,
     topicColumnName: String,
     partitionColumnName: String,
@@ -35,7 +34,7 @@ class ClickHouseFileStorage(
 ) extends InDataOffsetBatchStorage[ClickHouseFileRecordBatch]
     with Logging {
 
-  def committedPositions(connection: Connection): Map[TopicPartition, StreamPosition] = {
+  override def committedPositions(topicPartitions: Set[TopicPartition]): Map[TopicPartition, Option[StreamPosition]] = {
     val positionQuery =
       s"""SELECT
          | $topicColumnName,
@@ -47,53 +46,52 @@
          |GROUP BY $topicColumnName, $partitionColumnName
          |""".stripMargin
 
-    Using.resource(connection.prepareStatement(positionQuery)) { statement =>
-      {
-        log.info(s"Running stream position query: $positionQuery")
-        Using.resource(statement.executeQuery()) { result =>
-          val positions: mutable.HashMap[TopicPartition, StreamPosition] = mutable.HashMap.empty
-          while (result.next()) {
-            val topic = result.getString(1)
-            val partition = result.getInt(2)
-            val offset = result.getLong(3)
-            val watermark = Timestamp(result.getTimestamp(4).getTime)
-            if (!result.wasNull()) {
-              val topicPartition = new TopicPartition(topic, partition)
-              val position = StreamPosition(offset, watermark)
-              positions.put(topicPartition, position)
-            }
-          }
-          positions.toMap
-        }
-      }
-    }
+    log.info(s"Running stream position query: $positionQuery")
+    val positions: mutable.HashMap[TopicPartition, StreamPosition] = mutable.HashMap.empty
+    client
+      .queryAll(positionQuery)
+      .forEach(row => {
+        val topic = row.getString(1)
+        val partition = row.getInteger(2)
+        val offset = row.getLong(3)
+        val watermark = Timestamp(row.getLocalDateTime(4).toInstant(ZoneOffset.UTC).toEpochMilli)
+
+        val topicPartition = new TopicPartition(topic, partition)
+        val position = StreamPosition(offset, watermark)
+        positions.put(topicPartition, position)
+      })
+
+    topicPartitions.map(tp => (tp, positions.get(tp))).toMap
   }
 
-  override def committedPositions(topicPartitions: Set[TopicPartition]): Map[TopicPartition, Option[StreamPosition]] = {
-    Using.resource(dbDataSource.getConnection()) { connection =>
-      val positions = committedPositions(connection)
-      topicPartitions.map(tp => (tp, positions.get(tp))).toMap
-    }
+  override def commitBatchWithOffsets(batch: ClickHouseFileRecordBatch): Unit = {
+    val settings = new InsertSettings()
+      .setOption("max_insert_block_size", batch.rowCount) // ensure single block to prevent partial writes
+      .setDeduplicationToken(deduplicationToken(batch.recordRanges)) // deduplicate based on ranges
+
+    contentEncoding(batch.fileCompression).foreach(encoding => settings.appCompressedData(true, encoding))
+
+    client.insert(table, Files.newInputStream(batch.file.toPath), batch.format, settings).get()
   }
 
-  override def commitBatchWithOffsets(batch: ClickHouseFileRecordBatch): Unit = {
-    Using.resource(dbDataSource.getConnection) { connection =>
-      Using.resource(connection.unwrap(classOf[ClickHouseConnection]).createStatement) { statement =>
-        statement
-          .write()
-          .data(ClickHouseFile.of(batch.file, batch.compression, 1, batch.format))
-          .table(table)
-          .params(Map("max_insert_block_size" -> batch.rowCount.toString).asJava) // atomic insert
-          .executeAndWait()
-      }
-    }
+  private def contentEncoding(fileCompression: Compression): Option[String] = fileCompression match {
+    case Compression.NONE => None
+    case Compression.ZSTD => Some("zstd")
+    case Compression.GZIP => Some("gzip")
+    case Compression.BZIP => Some("bz2")
+    case Compression.LZ4 => Some("lz4")
+    case _ => throw new UnsupportedOperationException(s"Compression $fileCompression is not supported by ClickHouse")
+  }
+
+  private def deduplicationToken(ranges: Seq[StreamRange]): String = {
+    ranges.map(range => s"${range.topic}:${range.partition}:${range.start.offset}:${range.end.offset}").mkString(";")
   }
 }
 
 object ClickHouseFileStorage {
 
   case class Builder(
-      private val _dbDataSource: DataSource,
+      private val _client: Client,
       private val _table: String,
       private val _topicColumnName: String,
      private val _partitionColumnName: String,
@@ -102,9 +100,9 @@ object ClickHouseFileStorage {
   ) {
 
     /**
-     * Sets a data source for ClickHouse JDBC connections.
+     * Sets the ClickHouse client.
      */
-    def dbDataSource(source: DataSource): Builder = copy(_dbDataSource = source)
+    def client(client: Client): Builder = copy(_client = client)
 
     /**
      * Sets the table to load data to.
@@ -129,11 +127,11 @@ object ClickHouseFileStorage {
     )
 
     def build(): ClickHouseFileStorage = {
-      if (_dbDataSource == null) throw new IllegalStateException("Must provide a ClickHouse data source")
+      if (_client == null) throw new IllegalStateException("Must provide a ClickHouse client")
       if (_table == null) throw new IllegalStateException("Must provide a valid table name")
 
       new ClickHouseFileStorage(
-        _dbDataSource,
+        _client,
         _table,
         _topicColumnName,
         _partitionColumnName,

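With this migration a caller wires the storage up from a client-v2 Client instead of a JDBC DataSource. A minimal sketch, reusing the client built in the earlier sketch and assuming the companion object exposes a builder() factory for the Builder shown above (the table name is a placeholder):

// Offsets live in the data rows themselves, so commitBatchWithOffsets
// relies on a single insert block plus the range-derived deduplication
// token to keep commits atomic and idempotent.
val storage = ClickHouseFileStorage
  .builder()
  .client(client)
  .table("events")
  .build()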