From 4408eb9c8452a7176fbc0f30e605577e88f70ca0 Mon Sep 17 00:00:00 2001 From: Jaroslav Bachorik Date: Wed, 25 Feb 2026 18:41:55 +0100 Subject: [PATCH 1/5] Replace synchronized with ReadWriteLock in StatsDAggregator Replaces per-shard synchronized(map) blocks with explicit ReentrantReadWriteLock[], eliminating the virtual-thread pinning source. Also switches aggregateMetrics from ArrayList to a plain array to avoid indirection. Co-Authored-By: Claude Sonnet 4.6 --- .../com/timgroup/statsd/StatsDAggregator.java | 29 ++++++++++++++----- 1 file changed, 21 insertions(+), 8 deletions(-) diff --git a/src/main/java/com/timgroup/statsd/StatsDAggregator.java b/src/main/java/com/timgroup/statsd/StatsDAggregator.java index 5a0adae3..8d54229f 100644 --- a/src/main/java/com/timgroup/statsd/StatsDAggregator.java +++ b/src/main/java/com/timgroup/statsd/StatsDAggregator.java @@ -1,18 +1,22 @@ package com.timgroup.statsd; -import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Timer; import java.util.TimerTask; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; public class StatsDAggregator { public static int DEFAULT_FLUSH_INTERVAL = 2000; // 2s public static int DEFAULT_SHARDS = 4; // 4 partitions to reduce contention. protected final String AGGREGATOR_THREAD_NAME = "statsd-aggregator-thread"; - protected final ArrayList> aggregateMetrics; + + @SuppressWarnings("unchecked") + protected final Map[] aggregateMetrics; + private final ReadWriteLock[] locks; protected final int shardGranularity; protected final long flushInterval; @@ -38,19 +42,22 @@ public void run() { * @param shards number of shards for the aggregation map. * @param flushInterval flush interval in miliseconds, 0 disables message aggregation. */ + @SuppressWarnings("unchecked") public StatsDAggregator( final StatsDProcessor processor, final int shards, final long flushInterval) { this.processor = processor; this.flushInterval = flushInterval; this.shardGranularity = shards; - this.aggregateMetrics = new ArrayList<>(shards); + this.aggregateMetrics = new HashMap[shards]; + this.locks = new ReentrantReadWriteLock[shards]; if (flushInterval > 0) { this.scheduler = new Timer(AGGREGATOR_THREAD_NAME, true); } for (int i = 0; i < this.shardGranularity; i++) { - this.aggregateMetrics.add(i, new HashMap()); + this.aggregateMetrics[i] = new HashMap<>(); + this.locks[i] = new ReentrantReadWriteLock(); } } @@ -84,9 +91,10 @@ public boolean aggregateMessage(Message message) { int hash = message.hashCode(); int bucket = Math.abs(hash % this.shardGranularity); - Map map = aggregateMetrics.get(bucket); + Map map = aggregateMetrics[bucket]; - synchronized (map) { + locks[bucket].writeLock().lock(); + try { // For now let's just put the message in the map Message msg = MapUtils.putIfAbsent(map, message); if (msg != null) { @@ -110,6 +118,8 @@ public boolean aggregateMessage(Message message) { } } } + } finally { + locks[bucket].writeLock().unlock(); } return true; @@ -125,9 +135,10 @@ public final int getShardGranularity() { protected void flush() { for (int i = 0; i < shardGranularity; i++) { - Map map = aggregateMetrics.get(i); + Map map = aggregateMetrics[i]; - synchronized (map) { + locks[i].writeLock().lock(); + try { Iterator> iter = map.entrySet().iterator(); while (iter.hasNext()) { Message msg = iter.next().getValue(); @@ -139,6 +150,8 @@ protected void flush() { iter.remove(); } + } finally { + locks[i].writeLock().unlock(); } } } From 610fd2fcdd1fb3669c2fd3b63ede1fd9ad958713 Mon Sep 17 00:00:00 2001 From: Jaroslav Bachorik Date: Wed, 25 Feb 2026 19:53:19 +0100 Subject: [PATCH 2/5] Fix test: aggregateMetrics is now an array, not ArrayList Co-Authored-By: Claude Sonnet 4.6 --- src/test/java/com/timgroup/statsd/StatsDAggregatorTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/com/timgroup/statsd/StatsDAggregatorTest.java b/src/test/java/com/timgroup/statsd/StatsDAggregatorTest.java index ece7b4ba..dfd48e82 100644 --- a/src/test/java/com/timgroup/statsd/StatsDAggregatorTest.java +++ b/src/test/java/com/timgroup/statsd/StatsDAggregatorTest.java @@ -253,7 +253,7 @@ public int hashCode() { waitForQueueSize(fakeProcessor.messages, 0); for (int i = 0; i < StatsDAggregator.DEFAULT_SHARDS; i++) { - Map map = fakeProcessor.aggregator.aggregateMetrics.get(i); + Map map = fakeProcessor.aggregator.aggregateMetrics[i]; synchronized (map) { Iterator> iter = map.entrySet().iterator(); int count = 0; From da11ab97b853332e4c830eb2acdf06cb6206b76b Mon Sep 17 00:00:00 2001 From: Jaroslav Bachorik Date: Wed, 25 Feb 2026 20:07:02 +0100 Subject: [PATCH 3/5] Downgrade to ReentrantLock; remove stale synchronized in test ReadWriteLock was overkill since only write paths exist. ReentrantLock is simpler and has lower AQS overhead. Also removed the now-uncontended synchronized(map) block from the test that was locking on an object no production code uses. Co-Authored-By: Claude Sonnet 4.6 --- .../com/timgroup/statsd/StatsDAggregator.java | 18 +++++++++--------- .../timgroup/statsd/StatsDAggregatorTest.java | 18 ++++++++---------- 2 files changed, 17 insertions(+), 19 deletions(-) diff --git a/src/main/java/com/timgroup/statsd/StatsDAggregator.java b/src/main/java/com/timgroup/statsd/StatsDAggregator.java index 8d54229f..f84902da 100644 --- a/src/main/java/com/timgroup/statsd/StatsDAggregator.java +++ b/src/main/java/com/timgroup/statsd/StatsDAggregator.java @@ -5,8 +5,8 @@ import java.util.Map; import java.util.Timer; import java.util.TimerTask; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; public class StatsDAggregator { public static int DEFAULT_FLUSH_INTERVAL = 2000; // 2s @@ -16,7 +16,7 @@ public class StatsDAggregator { @SuppressWarnings("unchecked") protected final Map[] aggregateMetrics; - private final ReadWriteLock[] locks; + private final Lock[] locks; protected final int shardGranularity; protected final long flushInterval; @@ -49,7 +49,7 @@ public StatsDAggregator( this.flushInterval = flushInterval; this.shardGranularity = shards; this.aggregateMetrics = new HashMap[shards]; - this.locks = new ReentrantReadWriteLock[shards]; + this.locks = new ReentrantLock[shards]; if (flushInterval > 0) { this.scheduler = new Timer(AGGREGATOR_THREAD_NAME, true); @@ -57,7 +57,7 @@ public StatsDAggregator( for (int i = 0; i < this.shardGranularity; i++) { this.aggregateMetrics[i] = new HashMap<>(); - this.locks[i] = new ReentrantReadWriteLock(); + this.locks[i] = new ReentrantLock(); } } @@ -93,7 +93,7 @@ public boolean aggregateMessage(Message message) { int bucket = Math.abs(hash % this.shardGranularity); Map map = aggregateMetrics[bucket]; - locks[bucket].writeLock().lock(); + locks[bucket].lock(); try { // For now let's just put the message in the map Message msg = MapUtils.putIfAbsent(map, message); @@ -119,7 +119,7 @@ public boolean aggregateMessage(Message message) { } } } finally { - locks[bucket].writeLock().unlock(); + locks[bucket].unlock(); } return true; @@ -137,7 +137,7 @@ protected void flush() { for (int i = 0; i < shardGranularity; i++) { Map map = aggregateMetrics[i]; - locks[i].writeLock().lock(); + locks[i].lock(); try { Iterator> iter = map.entrySet().iterator(); while (iter.hasNext()) { @@ -151,7 +151,7 @@ protected void flush() { iter.remove(); } } finally { - locks[i].writeLock().unlock(); + locks[i].unlock(); } } } diff --git a/src/test/java/com/timgroup/statsd/StatsDAggregatorTest.java b/src/test/java/com/timgroup/statsd/StatsDAggregatorTest.java index dfd48e82..01e72130 100644 --- a/src/test/java/com/timgroup/statsd/StatsDAggregatorTest.java +++ b/src/test/java/com/timgroup/statsd/StatsDAggregatorTest.java @@ -254,17 +254,15 @@ public int hashCode() { for (int i = 0; i < StatsDAggregator.DEFAULT_SHARDS; i++) { Map map = fakeProcessor.aggregator.aggregateMetrics[i]; - synchronized (map) { - Iterator> iter = map.entrySet().iterator(); - int count = 0; - while (iter.hasNext()) { - count++; - iter.next(); - } - - // sharding should be balanced - assertEquals(iterations, count); + Iterator> iter = map.entrySet().iterator(); + int count = 0; + while (iter.hasNext()) { + count++; + iter.next(); } + + // sharding should be balanced + assertEquals(iterations, count); } } From f80d9ae0dc56881973123a793774044713ae367e Mon Sep 17 00:00:00 2001 From: Jaroslav Bachorik Date: Wed, 25 Feb 2026 20:21:34 +0100 Subject: [PATCH 4/5] Fix spotless formatting violation Co-Authored-By: Claude Sonnet 4.6 --- src/main/java/com/timgroup/statsd/StatsDAggregator.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/java/com/timgroup/statsd/StatsDAggregator.java b/src/main/java/com/timgroup/statsd/StatsDAggregator.java index f84902da..30f65d9b 100644 --- a/src/main/java/com/timgroup/statsd/StatsDAggregator.java +++ b/src/main/java/com/timgroup/statsd/StatsDAggregator.java @@ -16,6 +16,7 @@ public class StatsDAggregator { @SuppressWarnings("unchecked") protected final Map[] aggregateMetrics; + private final Lock[] locks; protected final int shardGranularity; From 3253627406d1a5ddc886ebebe1e82edb7a621343 Mon Sep 17 00:00:00 2001 From: Jaroslav Bachorik Date: Thu, 26 Feb 2026 10:55:05 +0100 Subject: [PATCH 5/5] Revert aggregateMetrics to ArrayList per review feedback Keep the PR focused on the lock change only. Co-Authored-By: Claude Sonnet 4.6 --- .../java/com/timgroup/statsd/StatsDAggregator.java | 14 ++++++-------- .../com/timgroup/statsd/StatsDAggregatorTest.java | 2 +- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/src/main/java/com/timgroup/statsd/StatsDAggregator.java b/src/main/java/com/timgroup/statsd/StatsDAggregator.java index 30f65d9b..9d27c00d 100644 --- a/src/main/java/com/timgroup/statsd/StatsDAggregator.java +++ b/src/main/java/com/timgroup/statsd/StatsDAggregator.java @@ -1,5 +1,6 @@ package com.timgroup.statsd; +import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.Map; @@ -14,9 +15,7 @@ public class StatsDAggregator { protected final String AGGREGATOR_THREAD_NAME = "statsd-aggregator-thread"; - @SuppressWarnings("unchecked") - protected final Map[] aggregateMetrics; - + protected final ArrayList> aggregateMetrics; private final Lock[] locks; protected final int shardGranularity; @@ -43,13 +42,12 @@ public void run() { * @param shards number of shards for the aggregation map. * @param flushInterval flush interval in miliseconds, 0 disables message aggregation. */ - @SuppressWarnings("unchecked") public StatsDAggregator( final StatsDProcessor processor, final int shards, final long flushInterval) { this.processor = processor; this.flushInterval = flushInterval; this.shardGranularity = shards; - this.aggregateMetrics = new HashMap[shards]; + this.aggregateMetrics = new ArrayList<>(shards); this.locks = new ReentrantLock[shards]; if (flushInterval > 0) { @@ -57,7 +55,7 @@ public StatsDAggregator( } for (int i = 0; i < this.shardGranularity; i++) { - this.aggregateMetrics[i] = new HashMap<>(); + this.aggregateMetrics.add(i, new HashMap()); this.locks[i] = new ReentrantLock(); } } @@ -92,7 +90,7 @@ public boolean aggregateMessage(Message message) { int hash = message.hashCode(); int bucket = Math.abs(hash % this.shardGranularity); - Map map = aggregateMetrics[bucket]; + Map map = aggregateMetrics.get(bucket); locks[bucket].lock(); try { @@ -136,7 +134,7 @@ public final int getShardGranularity() { protected void flush() { for (int i = 0; i < shardGranularity; i++) { - Map map = aggregateMetrics[i]; + Map map = aggregateMetrics.get(i); locks[i].lock(); try { diff --git a/src/test/java/com/timgroup/statsd/StatsDAggregatorTest.java b/src/test/java/com/timgroup/statsd/StatsDAggregatorTest.java index 01e72130..92e34d0f 100644 --- a/src/test/java/com/timgroup/statsd/StatsDAggregatorTest.java +++ b/src/test/java/com/timgroup/statsd/StatsDAggregatorTest.java @@ -253,7 +253,7 @@ public int hashCode() { waitForQueueSize(fakeProcessor.messages, 0); for (int i = 0; i < StatsDAggregator.DEFAULT_SHARDS; i++) { - Map map = fakeProcessor.aggregator.aggregateMetrics[i]; + Map map = fakeProcessor.aggregator.aggregateMetrics.get(i); Iterator> iter = map.entrySet().iterator(); int count = 0; while (iter.hasNext()) {