What needs to happen?
Context
- Java 25
- Dataflow Runner V2
- Protos
- streaming
I have a relatively "simple" pipeline: it receives proto messages from Pub/Sub, stores them in a user BagState, and emits an aggregation when certain conditions are met.
I profiled this (using the GCP profiler, as that seems to be the only avenue available) and saw that around 30% of CPU time is spent in org.github.jamm.MemoryMeter.measureDeep:
profiler_project-tempest_CPU_2026-03-20T16_51_30Z_2026-03-20T17_51_30Z_2026-03-20_10_29_37-14862633681033515490.pb.tar.gz
So I looked into it and saw that the reflection cost can be avoided if the stored object implements "Weighted":
```java
/** Returns the amount of memory in bytes the provided object consumes. */
public static long weigh(Object o) {
  if (o == null) {
    return REFERENCE_SIZE;
  }
  if (o instanceof Weighted) {
    return ((Weighted) o).getWeight() + REFERENCE_SIZE + 8;
  }
  try {
    return MEMORY_METER.measureDeep(o);
  } catch (RuntimeException e) {
    // Checking for RuntimeException since java.lang.reflect.InaccessibleObjectException is only
    // available starting Java 9
    LOG.warn("JVM prevents jamm from accessing subgraph - cache sizes may be underestimated", e);
    return MEMORY_METER.measure(o);
  }
}
```
I did this by creating a wrapper object for my proto that implements Weighted, then profiled the same workload again:
profiler_project-tempest_CPU_2026-03-20T18_15_48Z_2026-03-20T18_45_48Z_2026-03-20_11_25_08-9097137418379310132.pb.tar.gz
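For reference, the wrapper looks roughly like this. This is a self-contained sketch: the class names, the stub event type, and the weight heuristic are mine, and the locally defined Weighted interface mirrors (by assumption) Beam's org.apache.beam.sdk.util.Weighted, which is what the real wrapper implements.

```java
/** Mirrors the single-method Weighted interface used by the fn harness cache. */
interface Weighted {
  long getWeight();
}

/** Stand-in for the proto message; getSerializedSize() mimics the protobuf API. */
class MyEvent {
  private final int serializedSize;

  MyEvent(int serializedSize) {
    this.serializedSize = serializedSize;
  }

  int getSerializedSize() {
    return serializedSize;
  }
}

/** Wrapper that precomputes a weight once, so Caches.weigh never hits reflection. */
class WeightedEvent implements Weighted {
  // Assumed JVM object header / field padding overhead; a rough guess, not a measured value.
  private static final long ASSUMED_OBJECT_OVERHEAD = 64;

  private final MyEvent event;
  private final long weight;

  WeightedEvent(MyEvent event) {
    this.event = event;
    // The serialized size is a cheap lower bound on heap footprint; scale it up
    // so the cache is less likely to underestimate memory usage.
    this.weight = (long) event.getSerializedSize() * 2 + ASSUMED_OBJECT_OVERHEAD;
  }

  MyEvent get() {
    return event;
  }

  @Override
  public long getWeight() {
    return weight;
  }
}
```

The weight is computed once at construction, so lookups in the cache only pay for a virtual call rather than a reflective object-graph walk.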
This improved performance, but it didn't eliminate the measureDeep calls.
That is because when multiple events are processed within the same bundle, the harness tries to "weigh" a list of elements. The list doesn't implement "Weighted", so the reflection path is taken again.
See this code section:
```java
/**
 * Appends the newValues to the cached iterable with newWeight weight. If newWeight is negative,
 * the weight will be calculated using Caches.weigh.
 */
private void appendHelper(List<T> newValues, long newWeight) {
  if (newValues.isEmpty()) {
    return;
  }
  Blocks<T> existing = cache.peek(IterableCacheKey.INSTANCE);
  if (existing == null) {
    return;
  }
  // Check to see if we have cached the whole iterable, if not then we must remove it to prevent
  // returning invalid results as part of a future request.
  if (existing.getBlocks().get(existing.getBlocks().size() - 1).getNextToken() != null) {
    cache.remove(IterableCacheKey.INSTANCE);
  }
  // Combine all the individual blocks into one block containing all the values since
  // they were mutated, and we must evict all or none of the blocks. When consuming the blocks,
  // we must have a reference to all or none of the blocks (which forces a load).
  List<Block<T>> blocks = existing.getBlocks();
  int totalSize = newValues.size();
  for (Block<T> block : blocks) {
    totalSize += block.getValues().size();
  }
  WeightedList<T> allValues = WeightedList.of(new ArrayList<>(totalSize), 0L);
  for (Block<T> block : blocks) {
    allValues.addAll(block.getValues(), block.getWeight());
  }
  if (newWeight < 0) {
    if (newValues.size() == 1) {
      // Optimize weighing of the common value state as single single-element bag state.
      newWeight = Caches.weigh(newValues.get(0));
    } else {
      newWeight = Caches.weigh(newValues);
    }
  }
  allValues.addAll(newValues, newWeight);
  cache.put(IterableCacheKey.INSTANCE, new MutatedBlocks<>(Block.mutatedBlock(allValues)));
}
```
I tried a few other things, but none of them eliminated this cost.
So my question is: can I avoid this cost?
Could some fixes be applied? For example:
- weigh each element of the list individually, so Weighted elements stay on the fast path, or
- allow a "helper" to be supplied to the StateSpec that would be used to calculate the weight instead, or
- disable this "weighing" altogether?
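To illustrate the first suggestion, per-element weighing could look roughly like this. This is a self-contained sketch, not Beam's actual code: the Weighted interface is redeclared locally, the size constants are assumed values, and the reflective fallback is replaced by a fixed placeholder.

```java
import java.util.List;

class PerElementWeigher {
  /** Mirrors the single-method Weighted interface used on the fast path. */
  interface Weighted {
    long getWeight();
  }

  static final long REFERENCE_SIZE = 8;  // assumed reference size on a 64-bit JVM
  static final long LIST_OVERHEAD = 64;  // assumed overhead of the ArrayList itself

  /** Fast path for Weighted objects; a real implementation would fall back to jamm here. */
  static long weigh(Object o) {
    if (o == null) {
      return REFERENCE_SIZE;
    }
    if (o instanceof Weighted) {
      return ((Weighted) o).getWeight() + REFERENCE_SIZE + 8;
    }
    return 128; // placeholder standing in for MEMORY_METER.measureDeep(o)
  }

  /** Weighs a list element by element, so only non-Weighted elements pay for reflection. */
  static long weighList(List<?> values) {
    long total = LIST_OVERHEAD;
    for (Object v : values) {
      total += weigh(v);
    }
    return total;
  }
}
```

With this shape, the multi-element branch in appendHelper would sum cheap getWeight() calls for Weighted elements instead of deep-measuring the whole list reflectively.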
Issue Priority
Priority: 2 (default / most normal work should be filed as P2)
Issue Components