From 9ba10f5581caa6c12b5b6d87506527bb4122c79d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-No=C3=ABl=20Moyne?= Date: Thu, 12 Mar 2026 01:09:44 +0100 Subject: [PATCH 1/2] Elastic consumer groups: - enables not specifying any wildcard indexes, in which case the whole subject is used to compute the partition number. - Enables not specifying any partitioning filters, in which case ">" is used as the sole partition filter with the whole subject name being used to compute the partition number MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Port of https://github.com/synadia-io/orbit.go/pull/46 Signed-off-by: Jean-Noël Moyne --- pcgroups/README.md | 8 +-- .../io/synadia/pcg/ElasticConsumerGroup.java | 13 +++- .../pcg/ElasticConsumerGroupConfig.java | 60 +++++++++-------- .../synadia/pcg/ElasticConsumerGroupTest.java | 66 +++++++++++++++++-- .../java/io/synadia/pcg/IntegrationTest.java | 50 ++++++++++++++ 5 files changed, 155 insertions(+), 42 deletions(-) diff --git a/pcgroups/README.md b/pcgroups/README.md index b3642f9..f175860 100644 --- a/pcgroups/README.md +++ b/pcgroups/README.md @@ -27,7 +27,7 @@ NATS Partitioned consumer groups come in two flavors: *elastic* and *static*. ***Static*** partitioned consumer groups assume that the stream already has a partition number present as the first token of the message's subjects (something that can be done automatically when messages are stored into to the stream by setting a subject transform for the stream). You can only create and delete static consumer groups. Any change to the consumer group's config in the KV bucket will cause all the member instances for all members of the group to stop consuming. -***Elastic*** partitioned consumer groups on the other hand are implemented differently: the stream doesn't need to already contain a partition number subject token and you can administratively add and drop members from the consumer group's config whenever you want without having to delete and re-create the consumer (like you have to with static consumer groups). +***Elastic*** partitioned consumer groups on the other hand are implemented differently: the stream doesn't need to already contain a partition number subject token and you can administratively add and drop members from the consumer group's config whenever you want without having to delete and re-create the consumer (like you have to with static consumer groups). You have the option of specifying a subject filter for the consumer group and calculating the partition number from the subject name using a consistent hashing algorithm. Either through the use of `*` wildcards in the partitioning filter(s) and then specifying in the partitioning wildcards array the indexes of the `*` wildcards in the filter that you want to use for computing the partition number (you can specify between one index and all of the indexes), or by leaving that array of wildcard indexes empty (or not specifying a partitioning filter at all) in which case the partition number is calculated using the entirety of the message's subject. ***In both cases*** In both cases you must specify when creating the consumer group the maximum number of members for the group (which is actually the number of partitions used when partitioning the messages), plus a list of "members" (named instances of the consuming application). The library takes care of distributing the members over the list of partitions using either a 'balanced' distribution (the partitions are evenly distributed between the members) or 'mappings' (where you assign administratively the mappings of partitions to the members). The membership list or mappings must be specified once at consumer group creation time for static consumer groups, but can be changed at any time for elastic consumer groups. @@ -65,8 +65,8 @@ This `cg` CLI tool can be used by passing it commands and arguments directly, or For more details on the CLI visit the [Partitioned Consumer Groups CLI Project](https://github.com/synadia-io/orbit.java/tree/main/pcgroups-cli) ### Binaries -You can download the latest `cg.jar` archived in a tar file -[cg.tar](https://github.com/synadia-io/orbit.java/releases/download/pcgcli%2F0.1.0/cg.tar) +You can download the latest `cg.jar` archived in a tar file +[cg.tar](https://github.com/synadia-io/orbit.java/releases/download/pcgcli%2F0.1.0/cg.tar) or zip file [cg.zip](https://github.com/synadia-io/orbit.java/releases/download/pcgcli%2F0.1.0/cg.zip) @@ -117,4 +117,4 @@ Partitioned consumer groups require NATS server version 2.11 or above. --- Copyright (c) 2025 Synadia Communications Inc. All Rights Reserved. -See [LICENSE](LICENSE) and [NOTICE](NOTICE) file for details. +See [LICENSE](LICENSE) and [NOTICE](NOTICE) file for details. \ No newline at end of file diff --git a/pcgroups/src/main/java/io/synadia/pcg/ElasticConsumerGroup.java b/pcgroups/src/main/java/io/synadia/pcg/ElasticConsumerGroup.java index a798323..29366be 100644 --- a/pcgroups/src/main/java/io/synadia/pcg/ElasticConsumerGroup.java +++ b/pcgroups/src/main/java/io/synadia/pcg/ElasticConsumerGroup.java @@ -111,6 +111,7 @@ public static ElasticConsumerGroupConfig create(Connection nc, String streamName // Create the work queue stream with subject transform String workQueueStreamName = composeCGSName(streamName, consumerGroupName); + String effectiveFilter = (filter != null && !filter.isEmpty()) ? filter : ">"; String filterDest = getPartitioningTransformDest(config); StreamConfiguration.Builder scBuilder = StreamConfiguration.builder() @@ -129,12 +130,11 @@ public static ElasticConsumerGroupConfig create(Connection nc, String streamName } // Add source with subject transform - External external = null; // Local stream scBuilder.addSource(Source.builder() .sourceName(streamName) .startSeq(0) .subjectTransforms(SubjectTransform.builder() - .source(filter) + .source(effectiveFilter) .destination(filterDest) .build()) .build()); @@ -534,14 +534,16 @@ private static ElasticConsumerGroupConfig getConfigFromKV(KeyValue kv, String st } private static String getPartitioningTransformDest(ElasticConsumerGroupConfig config) { + String effectiveFilter = (config.getFilter() != null && !config.getFilter().isEmpty()) ? config.getFilter() : ">"; int[] wildcards = config.getPartitioningWildcards(); + StringBuilder wildcardList = new StringBuilder(); for (int i = 0; i < wildcards.length; i++) { if (i > 0) wildcardList.append(","); wildcardList.append(wildcards[i]); } - String[] filterTokens = config.getFilter().split("\\."); + String[] filterTokens = effectiveFilter.split("\\."); int cwIndex = 1; for (int i = 0; i < filterTokens.length; i++) { if (filterTokens[i].equals("*")) { @@ -551,6 +553,11 @@ private static String getPartitioningTransformDest(ElasticConsumerGroupConfig co } String destFromFilter = String.join(".", filterTokens); + + if (wildcards.length == 0) { + return "{{Partition(" + config.getMaxMembers() + ")}}." + destFromFilter; + } + return "{{Partition(" + config.getMaxMembers() + "," + wildcardList + ")}}." + destFromFilter; } diff --git a/pcgroups/src/main/java/io/synadia/pcg/ElasticConsumerGroupConfig.java b/pcgroups/src/main/java/io/synadia/pcg/ElasticConsumerGroupConfig.java index 8c66dc2..ccc098d 100644 --- a/pcgroups/src/main/java/io/synadia/pcg/ElasticConsumerGroupConfig.java +++ b/pcgroups/src/main/java/io/synadia/pcg/ElasticConsumerGroupConfig.java @@ -189,35 +189,31 @@ public void validate() throws ConsumerGroupException { } // Validate filter and partitioning wildcards - if (filter == null || filter.isEmpty()) { - throw new ConsumerGroupException("filter must not be empty"); - } - - String[] filterTokens = filter.split("\\."); - int numWildcards = 0; - for (String token : filterTokens) { - if ("*".equals(token)) { - numWildcards++; + if (filter != null && !filter.isEmpty()) { + String[] filterTokens = filter.split("\\."); + int numWildcards = 0; + for (String token : filterTokens) { + if ("*".equals(token)) { + numWildcards++; + } } - } - - if (numWildcards < 1) { - throw new ConsumerGroupException("filter must contain at least one * wildcard"); - } - - if (partitioningWildcards == null || partitioningWildcards.length < 1 || partitioningWildcards.length > numWildcards) { - throw new ConsumerGroupException("the number of partitioning wildcards must be between 1 and the total number of * wildcards in the filter"); - } - Set seenWildcards = new HashSet<>(); - for (int pwc : partitioningWildcards) { - if (seenWildcards.contains(pwc)) { - throw new ConsumerGroupException("partitioning wildcard indexes must be unique"); + if (partitioningWildcards != null && partitioningWildcards.length > numWildcards) { + throw new ConsumerGroupException("the number of partitioning wildcards must not be larger than the total number of * wildcards in the filter"); } - seenWildcards.add(pwc); - if (pwc < 1 || pwc > numWildcards) { - throw new ConsumerGroupException("partitioning wildcard indexes must be greater than 1 and less than or equal to the number of * wildcards in the filter"); + Set seenWildcards = new HashSet<>(); + if (partitioningWildcards != null) { + for (int pwc : partitioningWildcards) { + if (seenWildcards.contains(pwc)) { + throw new ConsumerGroupException("partitioning wildcard indexes must be unique"); + } + seenWildcards.add(pwc); + + if (pwc > numWildcards) { + throw new ConsumerGroupException("partitioning wildcard indexes must be less than or equal to the number of * wildcards in the filter"); + } + } } } @@ -266,15 +262,18 @@ public void validate() throws ConsumerGroupException { * Generates the subject transform destination for partitioning. */ public String getPartitioningTransformDest() { + String effectiveFilter = (filter != null && !filter.isEmpty()) ? filter : ">"; + int[] effectiveWildcards = (partitioningWildcards != null) ? partitioningWildcards : new int[0]; + StringBuilder wildcardList = new StringBuilder(); - for (int i = 0; i < partitioningWildcards.length; i++) { + for (int i = 0; i < effectiveWildcards.length; i++) { if (i > 0) { wildcardList.append(","); } - wildcardList.append(partitioningWildcards[i]); + wildcardList.append(effectiveWildcards[i]); } - String[] filterTokens = filter.split("\\."); + String[] filterTokens = effectiveFilter.split("\\."); int cwIndex = 1; for (int i = 0; i < filterTokens.length; i++) { if ("*".equals(filterTokens[i])) { @@ -284,6 +283,11 @@ public String getPartitioningTransformDest() { } String destFromFilter = String.join(".", filterTokens); + + if (effectiveWildcards.length == 0) { + return "{{Partition(" + maxMembers + ")}}." + destFromFilter; + } + return "{{Partition(" + maxMembers + "," + wildcardList + ")}}." + destFromFilter; } diff --git a/pcgroups/src/test/java/io/synadia/pcg/ElasticConsumerGroupTest.java b/pcgroups/src/test/java/io/synadia/pcg/ElasticConsumerGroupTest.java index b6c2a59..c81dece 100644 --- a/pcgroups/src/test/java/io/synadia/pcg/ElasticConsumerGroupTest.java +++ b/pcgroups/src/test/java/io/synadia/pcg/ElasticConsumerGroupTest.java @@ -95,25 +95,44 @@ void testValidationMaxMembersZero() { } @Test - void testValidationFilterNoWildcard() { + void testValidationFilterNoWildcardWithWildcardsSpecified() { ElasticConsumerGroupConfig config = new ElasticConsumerGroupConfig( 4, "foo.bar", new int[]{1}, 0, 0, Arrays.asList("m1", "m2"), new ArrayList<>() ); ConsumerGroupException exception = assertThrows(ConsumerGroupException.class, config::validate); - assertTrue(exception.getMessage().contains("filter must contain at least one * wildcard")); + assertTrue(exception.getMessage().contains("number of partitioning wildcards must not be larger than")); } @Test - void testValidationPartitioningWildcardsEmpty() { + void testValidationPartitioningWildcardsEmptyIsValid() { ElasticConsumerGroupConfig config = new ElasticConsumerGroupConfig( 4, "foo.*", new int[]{}, 0, 0, Arrays.asList("m1", "m2"), new ArrayList<>() ); - ConsumerGroupException exception = assertThrows(ConsumerGroupException.class, config::validate); - assertTrue(exception.getMessage().contains("number of partitioning wildcards must be between")); + assertDoesNotThrow(config::validate); + } + + @Test + void testValidationNoFilterIsValid() { + ElasticConsumerGroupConfig config = new ElasticConsumerGroupConfig( + 4, null, new int[]{}, 0, 0, + Arrays.asList("m1", "m2"), new ArrayList<>() + ); + + assertDoesNotThrow(config::validate); + } + + @Test + void testValidationEmptyFilterIsValid() { + ElasticConsumerGroupConfig config = new ElasticConsumerGroupConfig( + 4, "", new int[]{}, 0, 0, + Arrays.asList("m1", "m2"), new ArrayList<>() + ); + + assertDoesNotThrow(config::validate); } @Test @@ -124,7 +143,7 @@ void testValidationPartitioningWildcardsTooMany() { ); ConsumerGroupException exception = assertThrows(ConsumerGroupException.class, config::validate); - assertTrue(exception.getMessage().contains("number of partitioning wildcards must be between")); + assertTrue(exception.getMessage().contains("number of partitioning wildcards must not be larger than")); } @Test @@ -135,7 +154,7 @@ void testValidationPartitioningWildcardsOutOfRange() { ); ConsumerGroupException exception = assertThrows(ConsumerGroupException.class, config::validate); - assertTrue(exception.getMessage().contains("partitioning wildcard indexes must be greater than 1")); + assertTrue(exception.getMessage().contains("partitioning wildcard indexes must be less than or equal to")); } @Test @@ -232,6 +251,39 @@ void testGetPartitioningTransformDestPartialWildcards() { assertEquals("{{Partition(8,2)}}.a.{{Wildcard(1)}}.b.{{Wildcard(2)}}.c.{{Wildcard(3)}}", dest); } + @Test + void testGetPartitioningTransformDestNoWildcards() { + ElasticConsumerGroupConfig config = new ElasticConsumerGroupConfig( + 4, "foo.*", new int[]{}, 0, 0, + new ArrayList<>(), new ArrayList<>() + ); + + String dest = config.getPartitioningTransformDest(); + assertEquals("{{Partition(4)}}.foo.{{Wildcard(1)}}", dest); + } + + @Test + void testGetPartitioningTransformDestNoFilter() { + ElasticConsumerGroupConfig config = new ElasticConsumerGroupConfig( + 4, null, new int[]{}, 0, 0, + new ArrayList<>(), new ArrayList<>() + ); + + String dest = config.getPartitioningTransformDest(); + assertEquals("{{Partition(4)}}.>", dest); + } + + @Test + void testGetPartitioningTransformDestEmptyFilter() { + ElasticConsumerGroupConfig config = new ElasticConsumerGroupConfig( + 4, "", new int[]{}, 0, 0, + new ArrayList<>(), new ArrayList<>() + ); + + String dest = config.getPartitioningTransformDest(); + assertEquals("{{Partition(4)}}.>", dest); + } + @Test void testJsonSerializationWithMembers() throws JsonParseException { ElasticConsumerGroupConfig config = new ElasticConsumerGroupConfig( diff --git a/pcgroups/src/test/java/io/synadia/pcg/IntegrationTest.java b/pcgroups/src/test/java/io/synadia/pcg/IntegrationTest.java index 1faa142..13b2586 100644 --- a/pcgroups/src/test/java/io/synadia/pcg/IntegrationTest.java +++ b/pcgroups/src/test/java/io/synadia/pcg/IntegrationTest.java @@ -243,6 +243,56 @@ void testElastic() throws Exception { // Delete elastic consumer group ElasticConsumerGroup.delete(nc, streamName, cgName); + + // --- Test elastic with no partitioning filters (partition on whole subject) --- + String cgName2 = "group2"; + AtomicInteger c3 = new AtomicInteger(0); + AtomicInteger c4 = new AtomicInteger(0); + + // Create elastic consumer group with no filter (null) and empty wildcards + ElasticConsumerGroup.create(nc, streamName, cgName2, 2, null, + new int[]{}, -1, -1); + + // Start consuming on both members + ConsumerGroupConsumeContext cc3 = ElasticConsumerGroup.consume(nc, streamName, cgName2, "m1", msg -> { + c3.incrementAndGet(); + try { + msg.ack(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }, config); + + ConsumerGroupConsumeContext cc4 = ElasticConsumerGroup.consume(nc, streamName, cgName2, "m2", msg -> { + c4.incrementAndGet(); + try { + msg.ack(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }, config); + + // Add members + ElasticConsumerGroup.addMembers(nc, streamName, cgName2, Arrays.asList("m1", "m2")); + + // Wait for all 30 messages (from previous publishes) to be consumed, split between the 2 members + // The stream has 30 messages total (10 + 10 + 10 from the three publish phases above) + deadline = System.currentTimeMillis() + 10000; + while (c3.get() + c4.get() < 30) { + Thread.sleep(100); + if (System.currentTimeMillis() > deadline) { + fail("timeout no-filter elastic: c3=" + c3.get() + " c4=" + c4.get() + " expected total=30"); + } + } + + assertEquals(30, c3.get() + c4.get()); + + cc3.stop(); + cc4.stop(); + + // Delete elastic consumer group + ElasticConsumerGroup.delete(nc, streamName, cgName2); + nc.close(); } } From ef68eabb0e8ca775262687793716d0b8e0233c7c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-No=C3=ABl=20Moyne?= Date: Thu, 12 Mar 2026 13:55:34 +0100 Subject: [PATCH 2/2] Improve elastic consumer group config validation. Fix KV watchers being created with update only. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jean-Noël Moyne --- .../io/synadia/pcg/ElasticConsumerGroup.java | 2 +- .../pcg/ElasticConsumerGroupConfig.java | 8 +++-- .../io/synadia/pcg/StaticConsumerGroup.java | 2 +- .../synadia/pcg/ElasticConsumerGroupTest.java | 36 +++++++++++++++++-- 4 files changed, 42 insertions(+), 6 deletions(-) diff --git a/pcgroups/src/main/java/io/synadia/pcg/ElasticConsumerGroup.java b/pcgroups/src/main/java/io/synadia/pcg/ElasticConsumerGroup.java index 29366be..fd80ecc 100644 --- a/pcgroups/src/main/java/io/synadia/pcg/ElasticConsumerGroup.java +++ b/pcgroups/src/main/java/io/synadia/pcg/ElasticConsumerGroup.java @@ -650,7 +650,7 @@ public void endOfData() { } }; - watchSubscription = kv.watch(key, watcher, KeyValueWatchOption.UPDATES_ONLY); + watchSubscription = kv.watch(key, watcher); } catch (Exception e) { if (!stopped.get()) { diff --git a/pcgroups/src/main/java/io/synadia/pcg/ElasticConsumerGroupConfig.java b/pcgroups/src/main/java/io/synadia/pcg/ElasticConsumerGroupConfig.java index ccc098d..dc057af 100644 --- a/pcgroups/src/main/java/io/synadia/pcg/ElasticConsumerGroupConfig.java +++ b/pcgroups/src/main/java/io/synadia/pcg/ElasticConsumerGroupConfig.java @@ -198,6 +198,10 @@ public void validate() throws ConsumerGroupException { } } + if (numWildcards == 0 && !">".equals(filterTokens[filterTokens.length - 1])) { + throw new ConsumerGroupException("partitioning filters must have at least one * wildcard or end with > wildcard"); + } + if (partitioningWildcards != null && partitioningWildcards.length > numWildcards) { throw new ConsumerGroupException("the number of partitioning wildcards must not be larger than the total number of * wildcards in the filter"); } @@ -210,8 +214,8 @@ public void validate() throws ConsumerGroupException { } seenWildcards.add(pwc); - if (pwc > numWildcards) { - throw new ConsumerGroupException("partitioning wildcard indexes must be less than or equal to the number of * wildcards in the filter"); + if (pwc > numWildcards || pwc < 1) { + throw new ConsumerGroupException("partitioning wildcard indexes must be between 1 and the number of * wildcards in the filter"); } } } diff --git a/pcgroups/src/main/java/io/synadia/pcg/StaticConsumerGroup.java b/pcgroups/src/main/java/io/synadia/pcg/StaticConsumerGroup.java index 099e7b5..2948370 100644 --- a/pcgroups/src/main/java/io/synadia/pcg/StaticConsumerGroup.java +++ b/pcgroups/src/main/java/io/synadia/pcg/StaticConsumerGroup.java @@ -440,7 +440,7 @@ public void endOfData() { } }; - watchSubscription = kv.watch(key, watcher, KeyValueWatchOption.UPDATES_ONLY); + watchSubscription = kv.watch(key, watcher); } catch (Exception e) { if (!stopped.get()) { diff --git a/pcgroups/src/test/java/io/synadia/pcg/ElasticConsumerGroupTest.java b/pcgroups/src/test/java/io/synadia/pcg/ElasticConsumerGroupTest.java index c81dece..ec06a23 100644 --- a/pcgroups/src/test/java/io/synadia/pcg/ElasticConsumerGroupTest.java +++ b/pcgroups/src/test/java/io/synadia/pcg/ElasticConsumerGroupTest.java @@ -94,6 +94,27 @@ void testValidationMaxMembersZero() { assertTrue(exception.getMessage().contains("max number of members must be >= 1")); } + @Test + void testValidationFilterNoWildcardNoGt() { + ElasticConsumerGroupConfig config = new ElasticConsumerGroupConfig( + 4, "foo.bar", new int[]{}, 0, 0, + Arrays.asList("m1", "m2"), new ArrayList<>() + ); + + ConsumerGroupException exception = assertThrows(ConsumerGroupException.class, config::validate); + assertTrue(exception.getMessage().contains("partitioning filters must have at least one * wildcard or end with > wildcard")); + } + + @Test + void testValidationFilterEndingWithGtIsValid() { + ElasticConsumerGroupConfig config = new ElasticConsumerGroupConfig( + 4, "foo.>", new int[]{}, 0, 0, + Arrays.asList("m1", "m2"), new ArrayList<>() + ); + + assertDoesNotThrow(config::validate); + } + @Test void testValidationFilterNoWildcardWithWildcardsSpecified() { ElasticConsumerGroupConfig config = new ElasticConsumerGroupConfig( @@ -102,7 +123,7 @@ void testValidationFilterNoWildcardWithWildcardsSpecified() { ); ConsumerGroupException exception = assertThrows(ConsumerGroupException.class, config::validate); - assertTrue(exception.getMessage().contains("number of partitioning wildcards must not be larger than")); + assertTrue(exception.getMessage().contains("partitioning filters must have at least one * wildcard or end with > wildcard")); } @Test @@ -154,7 +175,18 @@ void testValidationPartitioningWildcardsOutOfRange() { ); ConsumerGroupException exception = assertThrows(ConsumerGroupException.class, config::validate); - assertTrue(exception.getMessage().contains("partitioning wildcard indexes must be less than or equal to")); + assertTrue(exception.getMessage().contains("partitioning wildcard indexes must be between 1 and")); + } + + @Test + void testValidationPartitioningWildcardsZeroIndex() { + ElasticConsumerGroupConfig config = new ElasticConsumerGroupConfig( + 4, "foo.*", new int[]{0}, 0, 0, + Arrays.asList("m1", "m2"), new ArrayList<>() + ); + + ConsumerGroupException exception = assertThrows(ConsumerGroupException.class, config::validate); + assertTrue(exception.getMessage().contains("partitioning wildcard indexes must be between 1 and")); } @Test