Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions pcgroups/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ NATS Partitioned consumer groups come in two flavors: *elastic* and *static*.

***Static*** partitioned consumer groups assume that the stream already has a partition number present as the first token of the message's subjects (something that can be done automatically when messages are stored into to the stream by setting a subject transform for the stream). You can only create and delete static consumer groups. Any change to the consumer group's config in the KV bucket will cause all the member instances for all members of the group to stop consuming.

***Elastic*** partitioned consumer groups on the other hand are implemented differently: the stream doesn't need to already contain a partition number subject token and you can administratively add and drop members from the consumer group's config whenever you want without having to delete and re-create the consumer (like you have to with static consumer groups).
***Elastic*** partitioned consumer groups on the other hand are implemented differently: the stream doesn't need to already contain a partition number subject token and you can administratively add and drop members from the consumer group's config whenever you want without having to delete and re-create the consumer (like you have to with static consumer groups). You have the option of specifying a subject filter for the consumer group and calculating the partition number from the subject name using a consistent hashing algorithm. Either through the use of `*` wildcards in the partitioning filter(s) and then specifying in the partitioning wildcards array the indexes of the `*` wildcards in the filter that you want to use for computing the partition number (you can specify between one index and all of the indexes), or by leaving that array of wildcard indexes empty (or not specifying a partitioning filter at all) in which case the partition number is calculated using the entirety of the message's subject.

***In both cases***
In both cases you must specify when creating the consumer group the maximum number of members for the group (which is actually the number of partitions used when partitioning the messages), plus a list of "members" (named instances of the consuming application). The library takes care of distributing the members over the list of partitions using either a 'balanced' distribution (the partitions are evenly distributed between the members) or 'mappings' (where you assign administratively the mappings of partitions to the members). The membership list or mappings must be specified once at consumer group creation time for static consumer groups, but can be changed at any time for elastic consumer groups.
Expand Down Expand Up @@ -65,8 +65,8 @@ This `cg` CLI tool can be used by passing it commands and arguments directly, or
For more details on the CLI visit the [Partitioned Consumer Groups CLI Project](https://github.com/synadia-io/orbit.java/tree/main/pcgroups-cli)

### Binaries
You can download the latest `cg.jar` archived in a tar file
[cg.tar](https://github.com/synadia-io/orbit.java/releases/download/pcgcli%2F0.1.0/cg.tar)
You can download the latest `cg.jar` archived in a tar file
[cg.tar](https://github.com/synadia-io/orbit.java/releases/download/pcgcli%2F0.1.0/cg.tar)
or zip file
[cg.zip](https://github.com/synadia-io/orbit.java/releases/download/pcgcli%2F0.1.0/cg.zip)

Expand Down Expand Up @@ -117,4 +117,4 @@ Partitioned consumer groups require NATS server version 2.11 or above.

---
Copyright (c) 2025 Synadia Communications Inc. All Rights Reserved.
See [LICENSE](LICENSE) and [NOTICE](NOTICE) file for details.
See [LICENSE](LICENSE) and [NOTICE](NOTICE) file for details.
15 changes: 11 additions & 4 deletions pcgroups/src/main/java/io/synadia/pcg/ElasticConsumerGroup.java
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ public static ElasticConsumerGroupConfig create(Connection nc, String streamName

// Create the work queue stream with subject transform
String workQueueStreamName = composeCGSName(streamName, consumerGroupName);
String effectiveFilter = (filter != null && !filter.isEmpty()) ? filter : ">";
String filterDest = getPartitioningTransformDest(config);

StreamConfiguration.Builder scBuilder = StreamConfiguration.builder()
Expand All @@ -129,12 +130,11 @@ public static ElasticConsumerGroupConfig create(Connection nc, String streamName
}

// Add source with subject transform
External external = null; // Local stream
scBuilder.addSource(Source.builder()
.sourceName(streamName)
.startSeq(0)
.subjectTransforms(SubjectTransform.builder()
.source(filter)
.source(effectiveFilter)
.destination(filterDest)
.build())
.build());
Expand Down Expand Up @@ -534,14 +534,16 @@ private static ElasticConsumerGroupConfig getConfigFromKV(KeyValue kv, String st
}

private static String getPartitioningTransformDest(ElasticConsumerGroupConfig config) {
String effectiveFilter = (config.getFilter() != null && !config.getFilter().isEmpty()) ? config.getFilter() : ">";
int[] wildcards = config.getPartitioningWildcards();

StringBuilder wildcardList = new StringBuilder();
for (int i = 0; i < wildcards.length; i++) {
if (i > 0) wildcardList.append(",");
wildcardList.append(wildcards[i]);
}

String[] filterTokens = config.getFilter().split("\\.");
String[] filterTokens = effectiveFilter.split("\\.");
int cwIndex = 1;
for (int i = 0; i < filterTokens.length; i++) {
if (filterTokens[i].equals("*")) {
Expand All @@ -551,6 +553,11 @@ private static String getPartitioningTransformDest(ElasticConsumerGroupConfig co
}

String destFromFilter = String.join(".", filterTokens);

if (wildcards.length == 0) {
return "{{Partition(" + config.getMaxMembers() + ")}}." + destFromFilter;
}

return "{{Partition(" + config.getMaxMembers() + "," + wildcardList + ")}}." + destFromFilter;
}

Expand Down Expand Up @@ -643,7 +650,7 @@ public void endOfData() {
}
};

watchSubscription = kv.watch(key, watcher, KeyValueWatchOption.UPDATES_ONLY);
watchSubscription = kv.watch(key, watcher);

} catch (Exception e) {
if (!stopped.get()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,35 +189,35 @@ public void validate() throws ConsumerGroupException {
}

// Validate filter and partitioning wildcards
if (filter == null || filter.isEmpty()) {
throw new ConsumerGroupException("filter must not be empty");
}

String[] filterTokens = filter.split("\\.");
int numWildcards = 0;
for (String token : filterTokens) {
if ("*".equals(token)) {
numWildcards++;
if (filter != null && !filter.isEmpty()) {
String[] filterTokens = filter.split("\\.");
int numWildcards = 0;
for (String token : filterTokens) {
if ("*".equals(token)) {
numWildcards++;
}
}
}

if (numWildcards < 1) {
throw new ConsumerGroupException("filter must contain at least one * wildcard");
}

if (partitioningWildcards == null || partitioningWildcards.length < 1 || partitioningWildcards.length > numWildcards) {
throw new ConsumerGroupException("the number of partitioning wildcards must be between 1 and the total number of * wildcards in the filter");
}
if (numWildcards == 0 && !">".equals(filterTokens[filterTokens.length - 1])) {
throw new ConsumerGroupException("partitioning filters must have at least one * wildcard or end with > wildcard");
}

Set<Integer> seenWildcards = new HashSet<>();
for (int pwc : partitioningWildcards) {
if (seenWildcards.contains(pwc)) {
throw new ConsumerGroupException("partitioning wildcard indexes must be unique");
if (partitioningWildcards != null && partitioningWildcards.length > numWildcards) {
throw new ConsumerGroupException("the number of partitioning wildcards must not be larger than the total number of * wildcards in the filter");
}
seenWildcards.add(pwc);

Comment thread
jnmoyne marked this conversation as resolved.
if (pwc < 1 || pwc > numWildcards) {
throw new ConsumerGroupException("partitioning wildcard indexes must be greater than 1 and less than or equal to the number of * wildcards in the filter");
Set<Integer> seenWildcards = new HashSet<>();
if (partitioningWildcards != null) {
for (int pwc : partitioningWildcards) {
if (seenWildcards.contains(pwc)) {
throw new ConsumerGroupException("partitioning wildcard indexes must be unique");
}
seenWildcards.add(pwc);

if (pwc > numWildcards || pwc < 1) {
throw new ConsumerGroupException("partitioning wildcard indexes must be between 1 and the number of * wildcards in the filter");
}
}
Comment thread
jnmoyne marked this conversation as resolved.
}
}

Expand Down Expand Up @@ -266,15 +266,18 @@ public void validate() throws ConsumerGroupException {
* Generates the subject transform destination for partitioning.
*/
public String getPartitioningTransformDest() {
String effectiveFilter = (filter != null && !filter.isEmpty()) ? filter : ">";
int[] effectiveWildcards = (partitioningWildcards != null) ? partitioningWildcards : new int[0];

StringBuilder wildcardList = new StringBuilder();
for (int i = 0; i < partitioningWildcards.length; i++) {
for (int i = 0; i < effectiveWildcards.length; i++) {
if (i > 0) {
wildcardList.append(",");
}
wildcardList.append(partitioningWildcards[i]);
wildcardList.append(effectiveWildcards[i]);
}

String[] filterTokens = filter.split("\\.");
String[] filterTokens = effectiveFilter.split("\\.");
int cwIndex = 1;
for (int i = 0; i < filterTokens.length; i++) {
if ("*".equals(filterTokens[i])) {
Expand All @@ -284,6 +287,11 @@ public String getPartitioningTransformDest() {
}

String destFromFilter = String.join(".", filterTokens);

if (effectiveWildcards.length == 0) {
return "{{Partition(" + maxMembers + ")}}." + destFromFilter;
}

return "{{Partition(" + maxMembers + "," + wildcardList + ")}}." + destFromFilter;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ public void endOfData() {
}
};

watchSubscription = kv.watch(key, watcher, KeyValueWatchOption.UPDATES_ONLY);
watchSubscription = kv.watch(key, watcher);

} catch (Exception e) {
if (!stopped.get()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,25 +95,65 @@ void testValidationMaxMembersZero() {
}

@Test
void testValidationFilterNoWildcard() {
void testValidationFilterNoWildcardNoGt() {
ElasticConsumerGroupConfig config = new ElasticConsumerGroupConfig(
4, "foo.bar", new int[]{}, 0, 0,
Arrays.asList("m1", "m2"), new ArrayList<>()
);

ConsumerGroupException exception = assertThrows(ConsumerGroupException.class, config::validate);
assertTrue(exception.getMessage().contains("partitioning filters must have at least one * wildcard or end with > wildcard"));
}

@Test
void testValidationFilterEndingWithGtIsValid() {
ElasticConsumerGroupConfig config = new ElasticConsumerGroupConfig(
4, "foo.>", new int[]{}, 0, 0,
Arrays.asList("m1", "m2"), new ArrayList<>()
);

assertDoesNotThrow(config::validate);
}

@Test
void testValidationFilterNoWildcardWithWildcardsSpecified() {
ElasticConsumerGroupConfig config = new ElasticConsumerGroupConfig(
4, "foo.bar", new int[]{1}, 0, 0,
Arrays.asList("m1", "m2"), new ArrayList<>()
);

ConsumerGroupException exception = assertThrows(ConsumerGroupException.class, config::validate);
assertTrue(exception.getMessage().contains("filter must contain at least one * wildcard"));
assertTrue(exception.getMessage().contains("partitioning filters must have at least one * wildcard or end with > wildcard"));
}

@Test
void testValidationPartitioningWildcardsEmpty() {
void testValidationPartitioningWildcardsEmptyIsValid() {
ElasticConsumerGroupConfig config = new ElasticConsumerGroupConfig(
4, "foo.*", new int[]{}, 0, 0,
Arrays.asList("m1", "m2"), new ArrayList<>()
);

ConsumerGroupException exception = assertThrows(ConsumerGroupException.class, config::validate);
assertTrue(exception.getMessage().contains("number of partitioning wildcards must be between"));
assertDoesNotThrow(config::validate);
}

@Test
void testValidationNoFilterIsValid() {
ElasticConsumerGroupConfig config = new ElasticConsumerGroupConfig(
4, null, new int[]{}, 0, 0,
Arrays.asList("m1", "m2"), new ArrayList<>()
);

assertDoesNotThrow(config::validate);
}

@Test
void testValidationEmptyFilterIsValid() {
ElasticConsumerGroupConfig config = new ElasticConsumerGroupConfig(
4, "", new int[]{}, 0, 0,
Arrays.asList("m1", "m2"), new ArrayList<>()
);

assertDoesNotThrow(config::validate);
}

@Test
Comment thread
jnmoyne marked this conversation as resolved.
Expand All @@ -124,7 +164,7 @@ void testValidationPartitioningWildcardsTooMany() {
);

ConsumerGroupException exception = assertThrows(ConsumerGroupException.class, config::validate);
assertTrue(exception.getMessage().contains("number of partitioning wildcards must be between"));
assertTrue(exception.getMessage().contains("number of partitioning wildcards must not be larger than"));
}

@Test
Expand All @@ -135,7 +175,18 @@ void testValidationPartitioningWildcardsOutOfRange() {
);

ConsumerGroupException exception = assertThrows(ConsumerGroupException.class, config::validate);
assertTrue(exception.getMessage().contains("partitioning wildcard indexes must be greater than 1"));
assertTrue(exception.getMessage().contains("partitioning wildcard indexes must be between 1 and"));
}

@Test
void testValidationPartitioningWildcardsZeroIndex() {
ElasticConsumerGroupConfig config = new ElasticConsumerGroupConfig(
4, "foo.*", new int[]{0}, 0, 0,
Arrays.asList("m1", "m2"), new ArrayList<>()
);

ConsumerGroupException exception = assertThrows(ConsumerGroupException.class, config::validate);
assertTrue(exception.getMessage().contains("partitioning wildcard indexes must be between 1 and"));
}

@Test
Expand Down Expand Up @@ -232,6 +283,39 @@ void testGetPartitioningTransformDestPartialWildcards() {
assertEquals("{{Partition(8,2)}}.a.{{Wildcard(1)}}.b.{{Wildcard(2)}}.c.{{Wildcard(3)}}", dest);
}

@Test
void testGetPartitioningTransformDestNoWildcards() {
ElasticConsumerGroupConfig config = new ElasticConsumerGroupConfig(
4, "foo.*", new int[]{}, 0, 0,
new ArrayList<>(), new ArrayList<>()
);

String dest = config.getPartitioningTransformDest();
assertEquals("{{Partition(4)}}.foo.{{Wildcard(1)}}", dest);
}

@Test
void testGetPartitioningTransformDestNoFilter() {
ElasticConsumerGroupConfig config = new ElasticConsumerGroupConfig(
4, null, new int[]{}, 0, 0,
new ArrayList<>(), new ArrayList<>()
);

String dest = config.getPartitioningTransformDest();
assertEquals("{{Partition(4)}}.>", dest);
}

@Test
void testGetPartitioningTransformDestEmptyFilter() {
ElasticConsumerGroupConfig config = new ElasticConsumerGroupConfig(
4, "", new int[]{}, 0, 0,
new ArrayList<>(), new ArrayList<>()
);

String dest = config.getPartitioningTransformDest();
assertEquals("{{Partition(4)}}.>", dest);
}

@Test
void testJsonSerializationWithMembers() throws JsonParseException {
ElasticConsumerGroupConfig config = new ElasticConsumerGroupConfig(
Expand Down
50 changes: 50 additions & 0 deletions pcgroups/src/test/java/io/synadia/pcg/IntegrationTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,56 @@ void testElastic() throws Exception {

// Delete elastic consumer group
ElasticConsumerGroup.delete(nc, streamName, cgName);

// --- Test elastic with no partitioning filters (partition on whole subject) ---
String cgName2 = "group2";
AtomicInteger c3 = new AtomicInteger(0);
AtomicInteger c4 = new AtomicInteger(0);

// Create elastic consumer group with no filter (null) and empty wildcards
ElasticConsumerGroup.create(nc, streamName, cgName2, 2, null,
new int[]{}, -1, -1);

// Start consuming on both members
ConsumerGroupConsumeContext cc3 = ElasticConsumerGroup.consume(nc, streamName, cgName2, "m1", msg -> {
c3.incrementAndGet();
try {
msg.ack();
} catch (Exception e) {
throw new RuntimeException(e);
}
}, config);

ConsumerGroupConsumeContext cc4 = ElasticConsumerGroup.consume(nc, streamName, cgName2, "m2", msg -> {
c4.incrementAndGet();
try {
msg.ack();
} catch (Exception e) {
throw new RuntimeException(e);
}
}, config);

Comment thread
jnmoyne marked this conversation as resolved.
// Add members
ElasticConsumerGroup.addMembers(nc, streamName, cgName2, Arrays.asList("m1", "m2"));

// Wait for all 30 messages (from previous publishes) to be consumed, split between the 2 members
// The stream has 30 messages total (10 + 10 + 10 from the three publish phases above)
deadline = System.currentTimeMillis() + 10000;
while (c3.get() + c4.get() < 30) {
Thread.sleep(100);
if (System.currentTimeMillis() > deadline) {
fail("timeout no-filter elastic: c3=" + c3.get() + " c4=" + c4.get() + " expected total=30");
}
}

assertEquals(30, c3.get() + c4.get());

cc3.stop();
cc4.stop();

// Delete elastic consumer group
ElasticConsumerGroup.delete(nc, streamName, cgName2);

nc.close();
}
}
Expand Down
Loading