Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
dd9b9bf
add support for pii in alerts
John-Weak Feb 26, 2026
a6a7734
enable prefill of drawer
John-Weak Feb 26, 2026
3d11059
fix test
John-Weak Feb 26, 2026
d962961
add alert test
John-Weak Feb 26, 2026
e3bcb04
improve naming
John-Weak Feb 26, 2026
6810170
ui padding fix
John-Weak Feb 26, 2026
a3e975a
fix test
John-Weak Feb 27, 2026
38cfe44
processCoreMetrics is called in returnRequestMetrics() during /sdk/pr…
John-Weak Feb 27, 2026
82030e5
Merge branch 'newarchitecture' into feature-pii
can-angun Mar 4, 2026
949ec59
add support for pii in alerts
John-Weak Feb 26, 2026
8a7489a
enable prefill of drawer
John-Weak Feb 26, 2026
5de8fe2
fix test
John-Weak Feb 26, 2026
cbf4aa7
add alert test
John-Weak Feb 26, 2026
408639c
improve naming
John-Weak Feb 26, 2026
49052b5
ui padding fix
John-Weak Feb 26, 2026
e374d39
fix test
John-Weak Feb 27, 2026
e0c3dc4
processCoreMetrics is called in returnRequestMetrics() during /sdk/pr…
John-Weak Feb 27, 2026
3941c39
Merge branch 'feature-pii' of github.com:Countly/countly-server into …
John-Weak Mar 9, 2026
0b98f58
add clickhouse for pii
John-Weak Mar 12, 2026
6f22cd7
update alerts test to support pii clickhouse
John-Weak Mar 12, 2026
61e160b
exec time
John-Weak Mar 24, 2026
21da67c
alerts fix
John-Weak Apr 1, 2026
e044b9c
Merge branch 'newarchitecture' of github.com:Countly/countly-server i…
John-Weak Apr 1, 2026
8b59ccf
user profile table cursor fix
John-Weak May 5, 2026
6f8f503
ts frontend compile
John-Weak May 5, 2026
41bd1b4
raw event transformer
John-Weak May 5, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
],
"internalConsoleOptions": "openOnSessionStart",
"env": {
"COUNTLY_CONFIG_HOSTNAME": "localhost:3001"
"COUNTLY_CONFIG_HOSTNAME": "localhost"
}
},
{
Expand Down
8 changes: 7 additions & 1 deletion Gruntfile.js
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,13 @@ module.exports = function(grunt) {

grunt.registerTask('dist', ['sass', 'concat', 'uglify', 'replace-paths', 'cssmin']);

grunt.registerTask('plugins', 'Minify plugin JS / CSS files and copy images', function() {
grunt.registerTask('compile-plugin-frontends', 'Compile plugin frontend TypeScript to JavaScript', function() {
var execSync = require('child_process').execSync;
execSync('node scripts/compile-plugin-frontends.js', { cwd: __dirname, stdio: 'inherit' });
});

grunt.registerTask('plugins', ['compile-plugin-frontends', 'plugins:run']);
grunt.registerTask('plugins:run', 'Minify plugin JS / CSS files and copy images', function() {
var js = [], css = [], img = [], fs = require('fs'), path = require('path');

var pluginFolderPath = path.join(__dirname, 'plugins');
Expand Down
7 changes: 7 additions & 0 deletions api/ingestor/requestProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,8 @@ interface RequestEvent {
_system_event?: boolean;
/** ID value */
_id?: string;
/** PII raw delta — original values before obfuscation, keyed by top-level field */
raw?: Record<string, unknown>;
}

/**
Expand Down Expand Up @@ -876,6 +878,11 @@ const processToDrill = async function(params: RequestParams, drill_updates: Dril
dbEventObject.s = currEvent.sum || 0;
dbEventObject.dur = currEvent.dur || 0;
dbEventObject.c = currEvent.count || 1;

if (currEvent.raw && typeof currEvent.raw === 'object') {
dbEventObject.raw = currEvent.raw;
}

eventsToInsert.push({ 'insertOne': { 'document': dbEventObject } });

if (eventKey === '[CLY]_view') {
Expand Down
1 change: 0 additions & 1 deletion api/ingestor/usage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -901,7 +901,6 @@ const usage: IngestorUsageModule = {
if (Object.keys(update).length > 0) {
ob.updates.push(update);
}
usage.processCoreMetrics(params); // Collects core metrics
},

/**
Expand Down
7 changes: 7 additions & 0 deletions api/utils/eventTransformer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ interface DrillEventDocument {
sg?: Record<string, unknown>;
up_extra?: Record<string, unknown>;
lu?: number | Date | string;
/** PII raw delta: original values for fields that were obfuscated or blocked. Keys match drill event field names (e.g. sg, up). Only present when PII obfuscation occurred. */
raw?: Record<string, unknown>;
[key: string]: unknown;
}

Expand All @@ -44,6 +46,8 @@ interface KafkaEventFormat {
sg?: Record<string, unknown>;
up_extra?: Record<string, unknown>;
lu?: number;
/** PII raw delta: original values for fields that were obfuscated or blocked. Only present when PII obfuscation occurred. Ignored by the drill_events ClickHouse table (no matching column); consumed by drill_events_raw via a dedicated connector + SMT. */
raw?: Record<string, unknown>;
}

/**
Expand Down Expand Up @@ -103,6 +107,9 @@ function transformToKafkaEventFormat(doc: DrillEventDocument | null | undefined)
if (doc.up_extra && typeof doc.up_extra === 'object') {
result.up_extra = doc.up_extra;
}
if (doc.raw && typeof doc.raw === 'object') {
result.raw = doc.raw;
}

// Optional date field
if (doc.lu !== undefined && doc.lu !== null) {
Expand Down
40 changes: 16 additions & 24 deletions bin/datasource/Dockerfile-kafka-connect-clickhouse-dev
Original file line number Diff line number Diff line change
@@ -1,31 +1,23 @@
FROM eclipse-temurin:21-jre
# Stage 1: compile the RawEventExtractor SMT against the Kafka Connect API JARs
# that are bundled inside the existing runtime image.
FROM eclipse-temurin:21-jdk AS smt-builder

ARG SCALA_VERSION=2.13
ARG KAFKA_VERSION=4.0.0
ARG CH_SINK_VERSION=1.3.5
# Copy the Kafka libs from the runtime image so javac has the Connect API on its classpath
COPY --from=countly/kafka-connect-clickhouse:latest /opt/kafka/libs /opt/kafka/libs
COPY --from=countly/kafka-connect-clickhouse:latest /opt/kafka/plugins /opt/kafka/plugins

# Install Kafka (includes connect-distributed.sh)
RUN mkdir -p /opt && \
curl -fSL https://dlcdn.apache.org/kafka/${KAFKA_VERSION}/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz \
-o /tmp/kafka.tgz && \
tar -xzf /tmp/kafka.tgz -C /opt && \
ln -s /opt/kafka_${SCALA_VERSION}-${KAFKA_VERSION} /opt/kafka && \
rm /tmp/kafka.tgz
COPY transforms/RawEventExtractor.java /tmp/RawEventExtractor.java
RUN mkdir -p /tmp/smt-build && \
javac -cp "/opt/kafka/libs/*:/opt/kafka/plugins/clickhouse/*" \
-d /tmp/smt-build \
/tmp/RawEventExtractor.java && \
jar cf /tmp/countly-smt.jar -C /tmp/smt-build .

# Plugins
RUN mkdir -p /opt/kafka/plugins/clickhouse
# Pull the ClickHouse Sink (Apache-2.0) release ZIP and extract jars
RUN apt-get update && apt-get install -y unzip ca-certificates curl && \
curl -fSL \
https://github.com/ClickHouse/clickhouse-kafka-connect/releases/download/v${CH_SINK_VERSION}/clickhouse-kafka-connect-v${CH_SINK_VERSION}.zip \
-o /tmp/ch-sink.zip && \
unzip -q /tmp/ch-sink.zip -d /opt/kafka/plugins/clickhouse && \
rm -rf /var/lib/apt/lists/* /tmp/ch-sink.zip
# Stage 2: extend the existing runtime image with the compiled JAR
FROM countly/kafka-connect-clickhouse:latest

COPY --from=smt-builder /tmp/countly-smt.jar /opt/kafka/plugins/countly-smt/countly-smt.jar

# Copy minimal config (environment variables will override)
COPY connect-distributed.properties /opt/kafka/config/connect-distributed.properties
COPY clickhouse-connector.json /opt/kafka/config/clickhouse-connector.json

EXPOSE 8083
WORKDIR /opt/kafka
CMD ["bin/connect-distributed.sh", "config/connect-distributed.properties"]
26 changes: 26 additions & 0 deletions bin/datasource/clickhouse-connector-raw.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
{
"name": "clickhouse-sink-raw",
"config": {
"connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
"tasks.max": "1",
"topics": "drill-events",
"hostname": "clickhouse",
"port": "8123",
"database": "countly_drill",
"username": "default",
"password": "",
"ssl": "false",
"exactlyOnce": "false",
"topic2TableMap": "drill-events=drill_events_raw",
"clickhouseSettings": "input_format_binary_read_json_as_string=1,allow_experimental_json_type=1,enable_json_type=1,async_insert=1,wait_for_async_insert=1",
"bypassRowBinary": "false",
"errors.tolerance": "none",
"errors.retry.timeout": "60",
"tableRefreshInterval": "300",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"transforms": "extractRaw",
"transforms.extractRaw.type": "dev.you.smt.RawEventExtractor"
}
}
55 changes: 46 additions & 9 deletions bin/datasource/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ services:

# Kafka Connect with ClickHouse connector
kafka-connect:
image: countly/kafka-connect-clickhouse:latest
build:
context: .
dockerfile: Dockerfile-kafka-connect-clickhouse-dev
container_name: kafka-connect
depends_on:
kafka:
Expand Down Expand Up @@ -80,6 +82,9 @@ services:
CONNECT_CONSUMER_MAX_PARTITION_FETCH_BYTES: "52428800" # 50MB per partition
CONNECT_PRODUCER_MAX_REQUEST_SIZE: "52428800" # 50MB producer request
CONNECT_PRODUCER_BUFFER_MEMORY: "67108864" # 64MB producer buffer
# Small heap to force frequent GC — for benchmarking SMT GC overhead only
#KAFKA_HEAP_OPTS: "-Xms128M -Xmx512M"
#KAFKA_JVM_PERFORMANCE_OPTS: "-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -Xlog:gc*:file=/tmp/gc.log:time,level,tags:filecount=1,filesize=50m"
networks:
- kafka-network

Expand Down Expand Up @@ -130,7 +135,7 @@ services:
sleep 5
done
echo 'Connect is ready, creating connector...'

curl -X PUT -H 'Content-Type: application/json' \
-d '{
"connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
Expand All @@ -154,13 +159,45 @@ services:
"errors.deadletterqueue.topic.replication.factor": "1"
}' \
http://kafka-connect:8083/connectors/clickhouse-sink/config

if [ $? -eq 0 ]; then
echo 'Connector created successfully with DLQ!'
else
echo 'Failed to create connector'

if [ $? -ne 0 ]; then
echo 'Failed to create clickhouse-sink connector'
exit 1
fi
echo 'clickhouse-sink connector created successfully!'

echo 'Creating clickhouse-sink-raw connector...'
curl -X PUT -H 'Content-Type: application/json' \
-d '{
"connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
"tasks.max": "1",
"topics": "drill-events",
"hostname": "clickhouse",
"port": "8123",
"ssl": "false",
"database": "countly_drill",
"username": "default",
"password": "",
"topic2TableMap": "drill-events=drill_events_raw",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"errors.tolerance": "all",
"errors.retry.timeout": "300000",
"errors.retry.delay.max.ms": "10000",
"errors.deadletterqueue.topic.name": "drill-events-dlq",
"errors.deadletterqueue.context.headers.enable": "true",
"errors.deadletterqueue.topic.replication.factor": "1",
"transforms": "extractRaw",
"transforms.extractRaw.type": "dev.you.smt.RawEventExtractor"
}' \
http://kafka-connect:8083/connectors/clickhouse-sink-raw/config

if [ $? -ne 0 ]; then
echo 'Failed to create clickhouse-sink-raw connector'
exit 1
fi
echo 'clickhouse-sink-raw connector created successfully!'
restart: "no"

# Kafka UI for monitoring
Expand Down Expand Up @@ -216,7 +253,7 @@ services:
sleep 2
done
echo 'MongoDB is ready, initializing replica set...'

mongosh --host mongodb:27017 --eval '
try {
rs.status();
Expand All @@ -232,7 +269,7 @@ services:
console.log("Replica set initialized successfully");
}
'

echo 'Waiting for replica set to be ready...'
mongosh --host mongodb:27017 --eval '
while (true) {
Expand Down
Loading
Loading