diff --git a/pcgroups/README.md b/pcgroups/README.md index b3642f9..f175860 100644 --- a/pcgroups/README.md +++ b/pcgroups/README.md @@ -27,7 +27,7 @@ NATS Partitioned consumer groups come in two flavors: *elastic* and *static*. ***Static*** partitioned consumer groups assume that the stream already has a partition number present as the first token of the message's subjects (something that can be done automatically when messages are stored into to the stream by setting a subject transform for the stream). You can only create and delete static consumer groups. Any change to the consumer group's config in the KV bucket will cause all the member instances for all members of the group to stop consuming. -***Elastic*** partitioned consumer groups on the other hand are implemented differently: the stream doesn't need to already contain a partition number subject token and you can administratively add and drop members from the consumer group's config whenever you want without having to delete and re-create the consumer (like you have to with static consumer groups). +***Elastic*** partitioned consumer groups on the other hand are implemented differently: the stream doesn't need to already contain a partition number subject token and you can administratively add and drop members from the consumer group's config whenever you want without having to delete and re-create the consumer (like you have to with static consumer groups). You have the option of specifying a subject filter for the consumer group and calculating the partition number from the subject name using a consistent hashing algorithm. Either through the use of `*` wildcards in the partitioning filter(s) and then specifying in the partitioning wildcards array the indexes of the `*` wildcards in the filter that you want to use for computing the partition number (you can specify between one index and all of the indexes), or by leaving that array of wildcard indexes empty (or not specifying a partitioning filter at all) in which case the partition number is calculated using the entirety of the message's subject. ***In both cases*** In both cases you must specify when creating the consumer group the maximum number of members for the group (which is actually the number of partitions used when partitioning the messages), plus a list of "members" (named instances of the consuming application). The library takes care of distributing the members over the list of partitions using either a 'balanced' distribution (the partitions are evenly distributed between the members) or 'mappings' (where you assign administratively the mappings of partitions to the members). The membership list or mappings must be specified once at consumer group creation time for static consumer groups, but can be changed at any time for elastic consumer groups. @@ -65,8 +65,8 @@ This `cg` CLI tool can be used by passing it commands and arguments directly, or For more details on the CLI visit the [Partitioned Consumer Groups CLI Project](https://github.com/synadia-io/orbit.java/tree/main/pcgroups-cli) ### Binaries -You can download the latest `cg.jar` archived in a tar file -[cg.tar](https://github.com/synadia-io/orbit.java/releases/download/pcgcli%2F0.1.0/cg.tar) +You can download the latest `cg.jar` archived in a tar file +[cg.tar](https://github.com/synadia-io/orbit.java/releases/download/pcgcli%2F0.1.0/cg.tar) or zip file [cg.zip](https://github.com/synadia-io/orbit.java/releases/download/pcgcli%2F0.1.0/cg.zip) @@ -117,4 +117,4 @@ Partitioned consumer groups require NATS server version 2.11 or above. --- Copyright (c) 2025 Synadia Communications Inc. All Rights Reserved. -See [LICENSE](LICENSE) and [NOTICE](NOTICE) file for details. +See [LICENSE](LICENSE) and [NOTICE](NOTICE) file for details. \ No newline at end of file diff --git a/pcgroups/src/main/java/io/synadia/pcg/ElasticConsumerGroup.java b/pcgroups/src/main/java/io/synadia/pcg/ElasticConsumerGroup.java index a798323..fd80ecc 100644 --- a/pcgroups/src/main/java/io/synadia/pcg/ElasticConsumerGroup.java +++ b/pcgroups/src/main/java/io/synadia/pcg/ElasticConsumerGroup.java @@ -111,6 +111,7 @@ public static ElasticConsumerGroupConfig create(Connection nc, String streamName // Create the work queue stream with subject transform String workQueueStreamName = composeCGSName(streamName, consumerGroupName); + String effectiveFilter = (filter != null && !filter.isEmpty()) ? filter : ">"; String filterDest = getPartitioningTransformDest(config); StreamConfiguration.Builder scBuilder = StreamConfiguration.builder() @@ -129,12 +130,11 @@ public static ElasticConsumerGroupConfig create(Connection nc, String streamName } // Add source with subject transform - External external = null; // Local stream scBuilder.addSource(Source.builder() .sourceName(streamName) .startSeq(0) .subjectTransforms(SubjectTransform.builder() - .source(filter) + .source(effectiveFilter) .destination(filterDest) .build()) .build()); @@ -534,14 +534,16 @@ private static ElasticConsumerGroupConfig getConfigFromKV(KeyValue kv, String st } private static String getPartitioningTransformDest(ElasticConsumerGroupConfig config) { + String effectiveFilter = (config.getFilter() != null && !config.getFilter().isEmpty()) ? config.getFilter() : ">"; int[] wildcards = config.getPartitioningWildcards(); + StringBuilder wildcardList = new StringBuilder(); for (int i = 0; i < wildcards.length; i++) { if (i > 0) wildcardList.append(","); wildcardList.append(wildcards[i]); } - String[] filterTokens = config.getFilter().split("\\."); + String[] filterTokens = effectiveFilter.split("\\."); int cwIndex = 1; for (int i = 0; i < filterTokens.length; i++) { if (filterTokens[i].equals("*")) { @@ -551,6 +553,11 @@ private static String getPartitioningTransformDest(ElasticConsumerGroupConfig co } String destFromFilter = String.join(".", filterTokens); + + if (wildcards.length == 0) { + return "{{Partition(" + config.getMaxMembers() + ")}}." + destFromFilter; + } + return "{{Partition(" + config.getMaxMembers() + "," + wildcardList + ")}}." + destFromFilter; } @@ -643,7 +650,7 @@ public void endOfData() { } }; - watchSubscription = kv.watch(key, watcher, KeyValueWatchOption.UPDATES_ONLY); + watchSubscription = kv.watch(key, watcher); } catch (Exception e) { if (!stopped.get()) { diff --git a/pcgroups/src/main/java/io/synadia/pcg/ElasticConsumerGroupConfig.java b/pcgroups/src/main/java/io/synadia/pcg/ElasticConsumerGroupConfig.java index 8c66dc2..dc057af 100644 --- a/pcgroups/src/main/java/io/synadia/pcg/ElasticConsumerGroupConfig.java +++ b/pcgroups/src/main/java/io/synadia/pcg/ElasticConsumerGroupConfig.java @@ -189,35 +189,35 @@ public void validate() throws ConsumerGroupException { } // Validate filter and partitioning wildcards - if (filter == null || filter.isEmpty()) { - throw new ConsumerGroupException("filter must not be empty"); - } - - String[] filterTokens = filter.split("\\."); - int numWildcards = 0; - for (String token : filterTokens) { - if ("*".equals(token)) { - numWildcards++; + if (filter != null && !filter.isEmpty()) { + String[] filterTokens = filter.split("\\."); + int numWildcards = 0; + for (String token : filterTokens) { + if ("*".equals(token)) { + numWildcards++; + } } - } - if (numWildcards < 1) { - throw new ConsumerGroupException("filter must contain at least one * wildcard"); - } - - if (partitioningWildcards == null || partitioningWildcards.length < 1 || partitioningWildcards.length > numWildcards) { - throw new ConsumerGroupException("the number of partitioning wildcards must be between 1 and the total number of * wildcards in the filter"); - } + if (numWildcards == 0 && !">".equals(filterTokens[filterTokens.length - 1])) { + throw new ConsumerGroupException("partitioning filters must have at least one * wildcard or end with > wildcard"); + } - Set seenWildcards = new HashSet<>(); - for (int pwc : partitioningWildcards) { - if (seenWildcards.contains(pwc)) { - throw new ConsumerGroupException("partitioning wildcard indexes must be unique"); + if (partitioningWildcards != null && partitioningWildcards.length > numWildcards) { + throw new ConsumerGroupException("the number of partitioning wildcards must not be larger than the total number of * wildcards in the filter"); } - seenWildcards.add(pwc); - if (pwc < 1 || pwc > numWildcards) { - throw new ConsumerGroupException("partitioning wildcard indexes must be greater than 1 and less than or equal to the number of * wildcards in the filter"); + Set seenWildcards = new HashSet<>(); + if (partitioningWildcards != null) { + for (int pwc : partitioningWildcards) { + if (seenWildcards.contains(pwc)) { + throw new ConsumerGroupException("partitioning wildcard indexes must be unique"); + } + seenWildcards.add(pwc); + + if (pwc > numWildcards || pwc < 1) { + throw new ConsumerGroupException("partitioning wildcard indexes must be between 1 and the number of * wildcards in the filter"); + } + } } } @@ -266,15 +266,18 @@ public void validate() throws ConsumerGroupException { * Generates the subject transform destination for partitioning. */ public String getPartitioningTransformDest() { + String effectiveFilter = (filter != null && !filter.isEmpty()) ? filter : ">"; + int[] effectiveWildcards = (partitioningWildcards != null) ? partitioningWildcards : new int[0]; + StringBuilder wildcardList = new StringBuilder(); - for (int i = 0; i < partitioningWildcards.length; i++) { + for (int i = 0; i < effectiveWildcards.length; i++) { if (i > 0) { wildcardList.append(","); } - wildcardList.append(partitioningWildcards[i]); + wildcardList.append(effectiveWildcards[i]); } - String[] filterTokens = filter.split("\\."); + String[] filterTokens = effectiveFilter.split("\\."); int cwIndex = 1; for (int i = 0; i < filterTokens.length; i++) { if ("*".equals(filterTokens[i])) { @@ -284,6 +287,11 @@ public String getPartitioningTransformDest() { } String destFromFilter = String.join(".", filterTokens); + + if (effectiveWildcards.length == 0) { + return "{{Partition(" + maxMembers + ")}}." + destFromFilter; + } + return "{{Partition(" + maxMembers + "," + wildcardList + ")}}." + destFromFilter; } diff --git a/pcgroups/src/main/java/io/synadia/pcg/StaticConsumerGroup.java b/pcgroups/src/main/java/io/synadia/pcg/StaticConsumerGroup.java index 099e7b5..2948370 100644 --- a/pcgroups/src/main/java/io/synadia/pcg/StaticConsumerGroup.java +++ b/pcgroups/src/main/java/io/synadia/pcg/StaticConsumerGroup.java @@ -440,7 +440,7 @@ public void endOfData() { } }; - watchSubscription = kv.watch(key, watcher, KeyValueWatchOption.UPDATES_ONLY); + watchSubscription = kv.watch(key, watcher); } catch (Exception e) { if (!stopped.get()) { diff --git a/pcgroups/src/test/java/io/synadia/pcg/ElasticConsumerGroupTest.java b/pcgroups/src/test/java/io/synadia/pcg/ElasticConsumerGroupTest.java index b6c2a59..ec06a23 100644 --- a/pcgroups/src/test/java/io/synadia/pcg/ElasticConsumerGroupTest.java +++ b/pcgroups/src/test/java/io/synadia/pcg/ElasticConsumerGroupTest.java @@ -95,25 +95,65 @@ void testValidationMaxMembersZero() { } @Test - void testValidationFilterNoWildcard() { + void testValidationFilterNoWildcardNoGt() { + ElasticConsumerGroupConfig config = new ElasticConsumerGroupConfig( + 4, "foo.bar", new int[]{}, 0, 0, + Arrays.asList("m1", "m2"), new ArrayList<>() + ); + + ConsumerGroupException exception = assertThrows(ConsumerGroupException.class, config::validate); + assertTrue(exception.getMessage().contains("partitioning filters must have at least one * wildcard or end with > wildcard")); + } + + @Test + void testValidationFilterEndingWithGtIsValid() { + ElasticConsumerGroupConfig config = new ElasticConsumerGroupConfig( + 4, "foo.>", new int[]{}, 0, 0, + Arrays.asList("m1", "m2"), new ArrayList<>() + ); + + assertDoesNotThrow(config::validate); + } + + @Test + void testValidationFilterNoWildcardWithWildcardsSpecified() { ElasticConsumerGroupConfig config = new ElasticConsumerGroupConfig( 4, "foo.bar", new int[]{1}, 0, 0, Arrays.asList("m1", "m2"), new ArrayList<>() ); ConsumerGroupException exception = assertThrows(ConsumerGroupException.class, config::validate); - assertTrue(exception.getMessage().contains("filter must contain at least one * wildcard")); + assertTrue(exception.getMessage().contains("partitioning filters must have at least one * wildcard or end with > wildcard")); } @Test - void testValidationPartitioningWildcardsEmpty() { + void testValidationPartitioningWildcardsEmptyIsValid() { ElasticConsumerGroupConfig config = new ElasticConsumerGroupConfig( 4, "foo.*", new int[]{}, 0, 0, Arrays.asList("m1", "m2"), new ArrayList<>() ); - ConsumerGroupException exception = assertThrows(ConsumerGroupException.class, config::validate); - assertTrue(exception.getMessage().contains("number of partitioning wildcards must be between")); + assertDoesNotThrow(config::validate); + } + + @Test + void testValidationNoFilterIsValid() { + ElasticConsumerGroupConfig config = new ElasticConsumerGroupConfig( + 4, null, new int[]{}, 0, 0, + Arrays.asList("m1", "m2"), new ArrayList<>() + ); + + assertDoesNotThrow(config::validate); + } + + @Test + void testValidationEmptyFilterIsValid() { + ElasticConsumerGroupConfig config = new ElasticConsumerGroupConfig( + 4, "", new int[]{}, 0, 0, + Arrays.asList("m1", "m2"), new ArrayList<>() + ); + + assertDoesNotThrow(config::validate); } @Test @@ -124,7 +164,7 @@ void testValidationPartitioningWildcardsTooMany() { ); ConsumerGroupException exception = assertThrows(ConsumerGroupException.class, config::validate); - assertTrue(exception.getMessage().contains("number of partitioning wildcards must be between")); + assertTrue(exception.getMessage().contains("number of partitioning wildcards must not be larger than")); } @Test @@ -135,7 +175,18 @@ void testValidationPartitioningWildcardsOutOfRange() { ); ConsumerGroupException exception = assertThrows(ConsumerGroupException.class, config::validate); - assertTrue(exception.getMessage().contains("partitioning wildcard indexes must be greater than 1")); + assertTrue(exception.getMessage().contains("partitioning wildcard indexes must be between 1 and")); + } + + @Test + void testValidationPartitioningWildcardsZeroIndex() { + ElasticConsumerGroupConfig config = new ElasticConsumerGroupConfig( + 4, "foo.*", new int[]{0}, 0, 0, + Arrays.asList("m1", "m2"), new ArrayList<>() + ); + + ConsumerGroupException exception = assertThrows(ConsumerGroupException.class, config::validate); + assertTrue(exception.getMessage().contains("partitioning wildcard indexes must be between 1 and")); } @Test @@ -232,6 +283,39 @@ void testGetPartitioningTransformDestPartialWildcards() { assertEquals("{{Partition(8,2)}}.a.{{Wildcard(1)}}.b.{{Wildcard(2)}}.c.{{Wildcard(3)}}", dest); } + @Test + void testGetPartitioningTransformDestNoWildcards() { + ElasticConsumerGroupConfig config = new ElasticConsumerGroupConfig( + 4, "foo.*", new int[]{}, 0, 0, + new ArrayList<>(), new ArrayList<>() + ); + + String dest = config.getPartitioningTransformDest(); + assertEquals("{{Partition(4)}}.foo.{{Wildcard(1)}}", dest); + } + + @Test + void testGetPartitioningTransformDestNoFilter() { + ElasticConsumerGroupConfig config = new ElasticConsumerGroupConfig( + 4, null, new int[]{}, 0, 0, + new ArrayList<>(), new ArrayList<>() + ); + + String dest = config.getPartitioningTransformDest(); + assertEquals("{{Partition(4)}}.>", dest); + } + + @Test + void testGetPartitioningTransformDestEmptyFilter() { + ElasticConsumerGroupConfig config = new ElasticConsumerGroupConfig( + 4, "", new int[]{}, 0, 0, + new ArrayList<>(), new ArrayList<>() + ); + + String dest = config.getPartitioningTransformDest(); + assertEquals("{{Partition(4)}}.>", dest); + } + @Test void testJsonSerializationWithMembers() throws JsonParseException { ElasticConsumerGroupConfig config = new ElasticConsumerGroupConfig( diff --git a/pcgroups/src/test/java/io/synadia/pcg/IntegrationTest.java b/pcgroups/src/test/java/io/synadia/pcg/IntegrationTest.java index 1faa142..13b2586 100644 --- a/pcgroups/src/test/java/io/synadia/pcg/IntegrationTest.java +++ b/pcgroups/src/test/java/io/synadia/pcg/IntegrationTest.java @@ -243,6 +243,56 @@ void testElastic() throws Exception { // Delete elastic consumer group ElasticConsumerGroup.delete(nc, streamName, cgName); + + // --- Test elastic with no partitioning filters (partition on whole subject) --- + String cgName2 = "group2"; + AtomicInteger c3 = new AtomicInteger(0); + AtomicInteger c4 = new AtomicInteger(0); + + // Create elastic consumer group with no filter (null) and empty wildcards + ElasticConsumerGroup.create(nc, streamName, cgName2, 2, null, + new int[]{}, -1, -1); + + // Start consuming on both members + ConsumerGroupConsumeContext cc3 = ElasticConsumerGroup.consume(nc, streamName, cgName2, "m1", msg -> { + c3.incrementAndGet(); + try { + msg.ack(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }, config); + + ConsumerGroupConsumeContext cc4 = ElasticConsumerGroup.consume(nc, streamName, cgName2, "m2", msg -> { + c4.incrementAndGet(); + try { + msg.ack(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }, config); + + // Add members + ElasticConsumerGroup.addMembers(nc, streamName, cgName2, Arrays.asList("m1", "m2")); + + // Wait for all 30 messages (from previous publishes) to be consumed, split between the 2 members + // The stream has 30 messages total (10 + 10 + 10 from the three publish phases above) + deadline = System.currentTimeMillis() + 10000; + while (c3.get() + c4.get() < 30) { + Thread.sleep(100); + if (System.currentTimeMillis() > deadline) { + fail("timeout no-filter elastic: c3=" + c3.get() + " c4=" + c4.get() + " expected total=30"); + } + } + + assertEquals(30, c3.get() + c4.get()); + + cc3.stop(); + cc4.stop(); + + // Delete elastic consumer group + ElasticConsumerGroup.delete(nc, streamName, cgName2); + nc.close(); } }