diff --git a/schedule-message/src/examples/java/io/synadia/examples/ScheduleBasics.java b/schedule-message/src/examples/java/io/synadia/examples/ScheduleBasics.java index 0f3b6df..f6d4ba6 100644 --- a/schedule-message/src/examples/java/io/synadia/examples/ScheduleBasics.java +++ b/schedule-message/src/examples/java/io/synadia/examples/ScheduleBasics.java @@ -5,7 +5,6 @@ import io.nats.client.*; import io.nats.client.api.StorageType; -import io.nats.client.api.StreamInfo; import io.nats.client.support.DateTimeUtils; import io.synadia.sm.ScheduleManagement; import io.synadia.sm.ScheduledMessageBuilder; @@ -13,7 +12,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import static io.synadia.examples.ScheduleUtils.report; +import static io.synadia.examples.ScheduleExampleUtils.report; /** * Example: build and publish a few scheduled messages using @@ -57,26 +56,25 @@ public static void main(String[] args) { try { jsm.deleteStream(STREAM); } catch (Exception ignore) {} // Use the utility to properly create a schedulable stream - StreamInfo si = ScheduleManagement.createSchedulableStream(jsm, STREAM, StorageType.Memory, STREAM_SUBJECTS); - report("Created stream", si.getConfiguration()); + ScheduleManagement.createSchedulableStream(jsm, STREAM, StorageType.Memory, STREAM_SUBJECTS); CountDownLatch latch = new CountDownLatch(4); Dispatcher d = connection.createDispatcher(); // subscribe to the subject that receives the schedule message js.subscribe(SCHEDULES, d, m -> { - report("SCHEDULED (received)", m); + report("MONITORING via '" + SCHEDULES + "'", m); m.ack(); }, false); // subscribe to the target subject js.subscribe(TARGETS, d, m -> { - report("TARGETED (received)", m); + report("TARGETED via '" + TARGETS + "'", m); m.ack(); latch.countDown(); }, false); - report("SCHEDULE-NOW (publishing)"); + report("SCHEDULING " + SCHEDULE_PREFIX + "now"); new ScheduledMessageBuilder() .scheduleSubject(SCHEDULE_PREFIX + "now") .targetSubject(TARGET_PREFIX + "now") @@ -84,7 +82,7 @@ public static void main(String[] args) { .data("Schedule-Now") .scheduleMessage(js); - report("SCHEDULE-AT (publishing)"); + report("SCHEDULING " + SCHEDULE_PREFIX + "at"); new ScheduledMessageBuilder() .scheduleSubject(SCHEDULE_PREFIX + "at") .targetSubject(TARGET_PREFIX + "at") @@ -92,15 +90,19 @@ public static void main(String[] args) { .data("Scheduled-At") .scheduleMessage(js); - report("SCHEDULE-EVERY (publishing)"); + report("SCHEDULING " + SCHEDULE_PREFIX + "every"); new ScheduledMessageBuilder() - .scheduleSubject(SCHEDULE_PREFIX + "at") - .targetSubject(TARGET_PREFIX + "at") + .scheduleSubject(SCHEDULE_PREFIX + "every") + .targetSubject(TARGET_PREFIX + "every") .scheduleEvery(1, TimeUnit.SECONDS) .data("Every Second") .scheduleMessage(js); latch.await(); + + // The "every" schedule keeps firing until it is removed. + report("CANCEL " + SCHEDULE_PREFIX + "every", + ScheduleManagement.cancelSchedule(jsm, SCHEDULE_PREFIX + "every", STREAM)); } } catch (Exception e) { diff --git a/schedule-message/src/examples/java/io/synadia/examples/ScheduleBasicsAlternate.java b/schedule-message/src/examples/java/io/synadia/examples/ScheduleBasicsAlternate.java index 15ebd84..b7fc51a 100644 --- a/schedule-message/src/examples/java/io/synadia/examples/ScheduleBasicsAlternate.java +++ b/schedule-message/src/examples/java/io/synadia/examples/ScheduleBasicsAlternate.java @@ -13,12 +13,14 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import static io.synadia.examples.ScheduleUtils.report; +import static io.synadia.examples.ScheduleExampleUtils.report; /** * Example: same scenario as {@link ScheduleBasics}, but built using * {@link io.synadia.sm.ScheduledMessageBuilder#build()} and then published * via {@link io.nats.client.JetStream#publish(io.nats.client.Message)}. + * There is really no reason to do this unless you specifically want to log the + * actual message. */ public class ScheduleBasicsAlternate { @@ -66,13 +68,13 @@ public static void main(String[] args) { // subscribe to the subject that receives the schedule message js.subscribe(SCHEDULES, d, m -> { - report("SCHEDULED (received)", m); + report("MONITORING via '" + SCHEDULES + "'", m); m.ack(); }, false); // subscribe to the target subject js.subscribe(TARGETS, d, m -> { - report("TARGETED (received)", m); + report("TARGETED via '" + TARGETS + "'", m); m.ack(); latch.countDown(); }, false); @@ -83,7 +85,7 @@ public static void main(String[] args) { .scheduleImmediate() .data("Schedule-Now") .build(); - report("SCHEDULE-NOW (publishing)", m); + report("SCHEDULING " + SCHEDULE_PREFIX + "now", m); js.publish(m); m = new ScheduledMessageBuilder() @@ -92,19 +94,23 @@ public static void main(String[] args) { .scheduleAt(DateTimeUtils.gmtNow().plusSeconds(5)) .data("Scheduled-At") .build(); - report("SCHEDULE-AT (publishing)", m); + report("SCHEDULING " + SCHEDULE_PREFIX + "at", m); js.publish(m); m = new ScheduledMessageBuilder() - .scheduleSubject(SCHEDULE_PREFIX + "at") - .targetSubject(TARGET_PREFIX + "at") + .scheduleSubject(SCHEDULE_PREFIX + "every") + .targetSubject(TARGET_PREFIX + "every") .scheduleEvery(1, TimeUnit.SECONDS) .data("Every Second") .build(); - report("SCHEDULE-EVERY (publishing)", m); + report("SCHEDULING " + SCHEDULE_PREFIX + "every", m); js.publish(m); latch.await(); + + // The "every" schedule keeps firing until it is removed. + report("CANCEL " + SCHEDULE_PREFIX + "every", + ScheduleManagement.cancelSchedule(jsm, SCHEDULE_PREFIX + "every", STREAM)); } } catch (Exception e) { diff --git a/schedule-message/src/examples/java/io/synadia/examples/ScheduleCancel.java b/schedule-message/src/examples/java/io/synadia/examples/ScheduleCancel.java new file mode 100644 index 0000000..866fef7 --- /dev/null +++ b/schedule-message/src/examples/java/io/synadia/examples/ScheduleCancel.java @@ -0,0 +1,117 @@ +// Copyright (c) 2025-2026 Synadia Communications Inc. All Rights Reserved. +// See LICENSE and NOTICE file for details. + +package io.synadia.examples; + +import io.nats.client.*; +import io.nats.client.api.StorageType; +import io.synadia.sm.ScheduleManagement; +import io.synadia.sm.ScheduleManagement.Result; +import io.synadia.sm.ScheduledMessageBuilder; + +import java.time.Duration; + +import static io.synadia.examples.ScheduleExampleUtils.report; + +/** + * Example: stop a running schedule with each + * {@link ScheduleManagement#cancelSchedule cancelSchedule} overload. + *

+ *

+ * The example also shows a {@link Result#NOT_FOUND} outcome by cancelling a + * subject that no longer has a schedule. + */ +public class ScheduleCancel { + + /** Stream name used by this example. */ + public static final String STREAM = "schedules-enabled"; + + /** Prefix for all schedule subjects in this example. */ + public static final String SCHEDULE_PREFIX = "schedule."; + + /** Prefix for all target subjects in this example. */ + public static final String TARGET_PREFIX = "target."; + + private static final String SCHEDULES = SCHEDULE_PREFIX + ">"; + private static final String TARGETS = TARGET_PREFIX + "*"; + + /** Subject patterns the example stream accepts. */ + public static final String[] STREAM_SUBJECTS = new String[]{SCHEDULES, TARGETS}; + + private ScheduleCancel() {} + + /** + * Example entry point. + * @param args ignored + */ + public static void main(String[] args) { + try { + Options options = new Options.Builder() + .server("nats://localhost:4222") + .errorListener(new ErrorListener() {}) + .build(); + + try (Connection connection = Nats.connect(options)) { + JetStreamManagement jsm = connection.jetStreamManagement(); + JetStream js = connection.jetStream(); + + // delete the stream in case it existed, just for a fresh example + try { jsm.deleteStream(STREAM); } catch (Exception ignore) {} + + ScheduleManagement.createSchedulableStream(jsm, STREAM, StorageType.Memory, STREAM_SUBJECTS); + + String bySeqSubject = SCHEDULE_PREFIX + "by-seq"; + String bySubjectSubject = SCHEDULE_PREFIX + "by-subject"; + String byLookupSubject = SCHEDULE_PREFIX + "by-lookup"; + + // Schedule each one an hour out so it can't fire while the example runs. + long bySeq = new ScheduledMessageBuilder() + .scheduleSubject(bySeqSubject) + .targetSubject(TARGET_PREFIX + "by-seq") + .scheduleIn(Duration.ofHours(1)) + .data("By-Seq") + .scheduleMessage(js); + report("SCHEDULED " + bySeqSubject + " at sequence " + bySeq); + + new ScheduledMessageBuilder() + .scheduleSubject(bySubjectSubject) + .targetSubject(TARGET_PREFIX + "by-subject") + .scheduleIn(Duration.ofHours(1)) + .data("By-Subject") + .scheduleMessage(js); + report("SCHEDULED " + bySubjectSubject); + + new ScheduledMessageBuilder() + .scheduleSubject(byLookupSubject) + .targetSubject(TARGET_PREFIX + "by-lookup") + .scheduleIn(Duration.ofHours(1)) + .data("By-Lookup") + .scheduleMessage(js); + report("SCHEDULED " + byLookupSubject); + + // 1) Sequence-based cancel. + Result r1 = ScheduleManagement.cancelSchedule(jsm, STREAM, bySeq); + report("CANCEL by sequence", r1); + + // 2) Subject + stream cancel. + Result r2 = ScheduleManagement.cancelSchedule(jsm, bySubjectSubject, STREAM); + report("CANCEL by subject + stream", r2); + + // 3) Subject-only cancel; the helper locates the stream. + Result r3 = ScheduleManagement.cancelSchedule(jsm, byLookupSubject); + report("CANCEL by subject (auto-find)", r3); + + // 4) Cancelling something that isn't there returns NOT_FOUND. + Result r4 = ScheduleManagement.cancelSchedule(jsm, bySeqSubject, STREAM); + report("CANCEL already-cancelled", r4); + } + } + catch (Exception e) { + e.printStackTrace(); + } + } +} diff --git a/schedule-message/src/examples/java/io/synadia/examples/ScheduleCron.java b/schedule-message/src/examples/java/io/synadia/examples/ScheduleCron.java new file mode 100644 index 0000000..55a1b39 --- /dev/null +++ b/schedule-message/src/examples/java/io/synadia/examples/ScheduleCron.java @@ -0,0 +1,107 @@ +// Copyright (c) 2025-2026 Synadia Communications Inc. All Rights Reserved. +// See LICENSE and NOTICE file for details. + +package io.synadia.examples; + +import io.nats.client.*; +import io.nats.client.api.StorageType; +import io.synadia.sm.ScheduleManagement; +import io.synadia.sm.ScheduledMessageBuilder; + +import java.util.concurrent.CountDownLatch; + +import static io.synadia.examples.ScheduleExampleUtils.report; + +/** + * Example: schedule using cron expressions, including the + * {@link io.synadia.sm.ScheduledMessageBuilder#scheduleCron(String, String)} + * variant that takes an IANA time zone. + *

+ * NATS schedules use a six-field cron form (second minute hour day month + * day-of-week) per ADR-51. The expressions below fire on short intervals + * so the example completes quickly; the time-zone-bound expression behaves + * identically here because it does not pin a time of day, but the call shape + * is the same one you would use for {@code "0 30 9 * * *"} ("9:30 every day + * in New York"). + */ +public class ScheduleCron { + + /** Stream name used by this example. */ + public static final String STREAM = "schedules-enabled"; + + /** Prefix for all schedule subjects in this example. */ + public static final String SCHEDULE_PREFIX = "schedule."; + + /** Prefix for all target subjects in this example. */ + public static final String TARGET_PREFIX = "target."; + + private static final String SCHEDULES = SCHEDULE_PREFIX + ">"; + private static final String TARGETS = TARGET_PREFIX + "*"; + + /** Subject patterns the example stream accepts. */ + public static final String[] STREAM_SUBJECTS = new String[]{SCHEDULES, TARGETS}; + + private ScheduleCron() {} + + /** + * Example entry point. + * @param args ignored + */ + public static void main(String[] args) { + try { + Options options = new Options.Builder() + .server("nats://localhost:4222") + .errorListener(new ErrorListener() {}) + .build(); + + try (Connection connection = Nats.connect(options)) { + JetStreamManagement jsm = connection.jetStreamManagement(); + JetStream js = connection.jetStream(); + + // delete the stream in case it existed, just for a fresh example + try { jsm.deleteStream(STREAM); } catch (Exception ignore) {} + + ScheduleManagement.createSchedulableStream(jsm, STREAM, StorageType.Memory, STREAM_SUBJECTS); + + CountDownLatch latch = new CountDownLatch(4); + Dispatcher d = connection.createDispatcher(); + + js.subscribe(TARGETS, d, m -> { + report("TARGETED via '" + TARGETS + "'", m); + m.ack(); + latch.countDown(); + }, false); + + String cronSubject = SCHEDULE_PREFIX + "cron"; + String cronTzSubject = SCHEDULE_PREFIX + "cron-tz"; + + // Six-field cron: every two seconds. + report("SCHEDULING " + cronSubject + " with cron '*/2 * * * * *'"); + new ScheduledMessageBuilder() + .scheduleSubject(cronSubject) + .targetSubject(TARGET_PREFIX + "cron") + .scheduleCron("*/2 * * * * *") + .data("Cron-Every-2s") + .scheduleMessage(js); + + // Same expression, evaluated in a specific IANA time zone. + report("SCHEDULING " + cronTzSubject + " with cron '*/3 * * * * *' (America/New_York)"); + new ScheduledMessageBuilder() + .scheduleSubject(cronTzSubject) + .targetSubject(TARGET_PREFIX + "cron-tz") + .scheduleCron("*/3 * * * * *", "America/New_York") + .data("Cron-Every-3s-NY") + .scheduleMessage(js); + + latch.await(); + + // Cron schedules keep firing until they are removed. + report("CANCEL " + cronSubject, ScheduleManagement.cancelSchedule(jsm, cronSubject, STREAM)); + report("CANCEL " + cronTzSubject, ScheduleManagement.cancelSchedule(jsm, cronTzSubject, STREAM)); + } + } + catch (Exception e) { + e.printStackTrace(); + } + } +} diff --git a/schedule-message/src/examples/java/io/synadia/examples/ScheduleEveryAndIn.java b/schedule-message/src/examples/java/io/synadia/examples/ScheduleEveryAndIn.java new file mode 100644 index 0000000..195abf4 --- /dev/null +++ b/schedule-message/src/examples/java/io/synadia/examples/ScheduleEveryAndIn.java @@ -0,0 +1,128 @@ +// Copyright (c) 2025-2026 Synadia Communications Inc. All Rights Reserved. +// See LICENSE and NOTICE file for details. + +package io.synadia.examples; + +import io.nats.client.*; +import io.nats.client.api.StorageType; +import io.synadia.sm.ScheduleManagement; +import io.synadia.sm.ScheduledMessageBuilder; + +import java.time.Duration; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static io.synadia.examples.ScheduleExampleUtils.report; + +/** + * Example: the remaining {@code scheduleEvery(...)} and {@code scheduleIn(...)} + * overloads on {@link ScheduledMessageBuilder} that the {@link ScheduleBasics} + * example does not exercise: + *

+ */ +public class ScheduleEveryAndIn { + + /** Stream name used by this example. */ + public static final String STREAM = "schedules-enabled"; + + /** Prefix for all schedule subjects in this example. */ + public static final String SCHEDULE_PREFIX = "schedule."; + + /** Prefix for all target subjects in this example. */ + public static final String TARGET_PREFIX = "target."; + + private static final String SCHEDULES = SCHEDULE_PREFIX + ">"; + private static final String TARGETS = TARGET_PREFIX + "*"; + + /** Subject patterns the example stream accepts. */ + public static final String[] STREAM_SUBJECTS = new String[]{SCHEDULES, TARGETS}; + + private ScheduleEveryAndIn() {} + + /** + * Example entry point. + * @param args ignored + */ + public static void main(String[] args) { + try { + Options options = new Options.Builder() + .server("nats://localhost:4222") + .errorListener(new ErrorListener() {}) + .build(); + + try (Connection connection = Nats.connect(options)) { + JetStreamManagement jsm = connection.jetStreamManagement(); + JetStream js = connection.jetStream(); + + // delete the stream in case it existed, just for a fresh example + try { jsm.deleteStream(STREAM); } catch (Exception ignore) {} + + ScheduleManagement.createSchedulableStream(jsm, STREAM, StorageType.Memory, STREAM_SUBJECTS); + + CountDownLatch latch = new CountDownLatch(6); + Dispatcher d = connection.createDispatcher(); + + js.subscribe(TARGETS, d, m -> { + report("TARGETED via '" + TARGETS + "'", m); + m.ack(); + latch.countDown(); + }, false); + + // @every using Go's time.ParseDuration syntax. + String everyStringSubject = SCHEDULE_PREFIX + "every-string"; + report("SCHEDULING " + everyStringSubject + " with scheduleEvery(\"2s\")"); + new ScheduledMessageBuilder() + .scheduleSubject(everyStringSubject) + .targetSubject(TARGET_PREFIX + "every-string") + .scheduleEvery("2s") + .data("Every-2s-String") + .scheduleMessage(js); + + // @every using a java.time.Duration. + String everyDurationSubject = SCHEDULE_PREFIX + "every-duration"; + report("SCHEDULING " + everyDurationSubject + " with scheduleEvery(Duration.ofSeconds(3))"); + new ScheduledMessageBuilder() + .scheduleSubject(everyDurationSubject) + .targetSubject(TARGET_PREFIX + "every-duration") + .scheduleEvery(Duration.ofSeconds(3)) + .data("Every-3s-Duration") + .scheduleMessage(js); + + // One-shot via Duration. + String inDurationSubject = SCHEDULE_PREFIX + "in-duration"; + report("SCHEDULING " + inDurationSubject + " with scheduleIn(Duration.ofSeconds(2))"); + new ScheduledMessageBuilder() + .scheduleSubject(inDurationSubject) + .targetSubject(TARGET_PREFIX + "in-duration") + .scheduleIn(Duration.ofSeconds(2)) + .data("In-2s-Duration") + .scheduleMessage(js); + + // One-shot via (long, TimeUnit). + String inUnitSubject = SCHEDULE_PREFIX + "in-unit"; + report("SCHEDULING " + inUnitSubject + " with scheduleIn(4, TimeUnit.SECONDS)"); + new ScheduledMessageBuilder() + .scheduleSubject(inUnitSubject) + .targetSubject(TARGET_PREFIX + "in-unit") + .scheduleIn(4, TimeUnit.SECONDS) + .data("In-4s-Unit") + .scheduleMessage(js); + + latch.await(); + + // The two recurring schedules will keep firing if not cancelled. + report("CANCEL " + everyStringSubject, ScheduleManagement.cancelSchedule(jsm, everyStringSubject, STREAM)); + report("CANCEL " + everyDurationSubject, ScheduleManagement.cancelSchedule(jsm, everyDurationSubject, STREAM)); + } + } + catch (Exception e) { + e.printStackTrace(); + } + } +} diff --git a/schedule-message/src/examples/java/io/synadia/examples/ScheduleUtils.java b/schedule-message/src/examples/java/io/synadia/examples/ScheduleExampleUtils.java similarity index 96% rename from schedule-message/src/examples/java/io/synadia/examples/ScheduleUtils.java rename to schedule-message/src/examples/java/io/synadia/examples/ScheduleExampleUtils.java index 1e7bc4b..9eef648 100644 --- a/schedule-message/src/examples/java/io/synadia/examples/ScheduleUtils.java +++ b/schedule-message/src/examples/java/io/synadia/examples/ScheduleExampleUtils.java @@ -9,9 +9,9 @@ /** * Small console-logging helpers shared by the example apps. */ -public class ScheduleUtils { +public class ScheduleExampleUtils { - private ScheduleUtils() {} + private ScheduleExampleUtils() {} /** * Print a pipe-separated, timestamped line to {@code System.out}. Each object is diff --git a/schedule-message/src/examples/java/io/synadia/examples/ScheduleFromSource.java b/schedule-message/src/examples/java/io/synadia/examples/ScheduleFromSource.java index 748a25e..fab67f4 100644 --- a/schedule-message/src/examples/java/io/synadia/examples/ScheduleFromSource.java +++ b/schedule-message/src/examples/java/io/synadia/examples/ScheduleFromSource.java @@ -5,7 +5,6 @@ import io.nats.client.*; import io.nats.client.api.StorageType; -import io.nats.client.api.StreamInfo; import io.nats.client.impl.Headers; import io.nats.client.impl.NatsMessage; import io.synadia.sm.ScheduleManagement; @@ -14,7 +13,7 @@ import java.time.Duration; import java.util.concurrent.CountDownLatch; -import static io.synadia.examples.ScheduleUtils.report; +import static io.synadia.examples.ScheduleExampleUtils.report; /** * Example: schedule a message whose body and headers are taken from the last @@ -54,8 +53,7 @@ public static void main(String[] args) { try { jsm.deleteStream(STREAM); } catch (Exception ignore) {} // Use the utility to properly create a schedulable stream - StreamInfo si = ScheduleManagement.createSchedulableStream(jsm, STREAM, StorageType.Memory, STREAM_SUBJECTS); - report("Created stream", si.getConfiguration()); + ScheduleManagement.createSchedulableStream(jsm, STREAM, StorageType.Memory, STREAM_SUBJECTS); CountDownLatch latch1 = new CountDownLatch(1); CountDownLatch latch2 = new CountDownLatch(2); diff --git a/schedule-message/src/examples/java/io/synadia/examples/ScheduleHeadersAndCopy.java b/schedule-message/src/examples/java/io/synadia/examples/ScheduleHeadersAndCopy.java new file mode 100644 index 0000000..49fad3c --- /dev/null +++ b/schedule-message/src/examples/java/io/synadia/examples/ScheduleHeadersAndCopy.java @@ -0,0 +1,112 @@ +// Copyright (c) 2025-2026 Synadia Communications Inc. All Rights Reserved. +// See LICENSE and NOTICE file for details. + +package io.synadia.examples; + +import io.nats.client.*; +import io.nats.client.api.StorageType; +import io.nats.client.impl.Headers; +import io.nats.client.impl.NatsMessage; +import io.synadia.sm.ScheduleManagement; +import io.synadia.sm.ScheduledMessageBuilder; + +import java.util.concurrent.CountDownLatch; + +import static io.synadia.examples.ScheduleExampleUtils.report; + +/** + * Example: attaching user headers to a schedule and seeding a schedule from + * an existing {@link Message} via {@link ScheduledMessageBuilder#copy(Message)}. + *

+ * The builder writes its own {@code Nats-Schedule-*} headers after copying + * user headers, so user headers are preserved on the message that the schedule + * publishes to the target subject. + */ +public class ScheduleHeadersAndCopy { + + /** Stream name used by this example. */ + public static final String STREAM = "schedules-enabled"; + + /** Prefix for all schedule subjects in this example. */ + public static final String SCHEDULE_PREFIX = "schedule."; + + /** Prefix for all target subjects in this example. */ + public static final String TARGET_PREFIX = "target."; + + private static final String SCHEDULES = SCHEDULE_PREFIX + ">"; + private static final String TARGETS = TARGET_PREFIX + "*"; + + /** Subject patterns the example stream accepts. */ + public static final String[] STREAM_SUBJECTS = new String[]{SCHEDULES, TARGETS}; + + private ScheduleHeadersAndCopy() {} + + /** + * Example entry point. + * @param args ignored + */ + public static void main(String[] args) { + try { + Options options = new Options.Builder() + .server("nats://localhost:4222") + .errorListener(new ErrorListener() {}) + .build(); + + try (Connection connection = Nats.connect(options)) { + JetStreamManagement jsm = connection.jetStreamManagement(); + JetStream js = connection.jetStream(); + + // delete the stream in case it existed, just for a fresh example + try { jsm.deleteStream(STREAM); } catch (Exception ignore) {} + + ScheduleManagement.createSchedulableStream(jsm, STREAM, StorageType.Memory, STREAM_SUBJECTS); + + CountDownLatch latch = new CountDownLatch(2); + Dispatcher d = connection.createDispatcher(); + + js.subscribe(TARGETS, d, m -> { + report("TARGETED via '" + TARGETS + "'", m); + m.ack(); + latch.countDown(); + }, false); + + // 1) Attach custom headers to a scheduled message. + Headers userHeaders = new Headers(); + userHeaders.put("X-Trace-Id", "abc-123"); + userHeaders.put("X-Origin", "ScheduleHeadersAndCopy"); + + String withHeadersSubject = SCHEDULE_PREFIX + "with-headers"; + report("SCHEDULING " + withHeadersSubject + " with custom headers"); + new ScheduledMessageBuilder() + .scheduleSubject(withHeadersSubject) + .targetSubject(TARGET_PREFIX + "with-headers") + .scheduleImmediate() + .headers(userHeaders) + .data("Has-User-Headers") + .scheduleMessage(js); + + // 2) Seed a schedule from an existing message via copy(). + // copy() pulls subject, data, and headers; the schedule subject + // here is taken from the source message, while the target subject + // is set explicitly. + Headers seedHeaders = new Headers(); + seedHeaders.put("X-Seeded-From", "template"); + Message seed = new NatsMessage(SCHEDULE_PREFIX + "copied", null, seedHeaders, "Seed-Data".getBytes()); + report("SEED message (template for copy())", seed); + + String copiedSubject = SCHEDULE_PREFIX + "copied"; + report("SCHEDULING " + copiedSubject + " built via copy(seed)"); + new ScheduledMessageBuilder() + .copy(seed) + .targetSubject(TARGET_PREFIX + "copied") + .scheduleImmediate() + .scheduleMessage(js); + + latch.await(); + } + } + catch (Exception e) { + e.printStackTrace(); + } + } +} diff --git a/schedule-message/src/examples/java/io/synadia/examples/SchedulePredefined.java b/schedule-message/src/examples/java/io/synadia/examples/SchedulePredefined.java new file mode 100644 index 0000000..46d2b00 --- /dev/null +++ b/schedule-message/src/examples/java/io/synadia/examples/SchedulePredefined.java @@ -0,0 +1,86 @@ +// Copyright (c) 2025-2026 Synadia Communications Inc. All Rights Reserved. +// See LICENSE and NOTICE file for details. + +package io.synadia.examples; + +import io.nats.client.*; +import io.nats.client.api.StorageType; +import io.synadia.sm.PredefinedSchedules; +import io.synadia.sm.ScheduleManagement; +import io.synadia.sm.ScheduledMessageBuilder; + +import static io.synadia.examples.ScheduleExampleUtils.report; + +/** + * Example: build a schedule for every entry of {@link PredefinedSchedules} + * ({@code @hourly}, {@code @daily}, {@code @weekly}, ...). The shortest + * predefined interval is {@code @hourly}, so this example would not fire + * within a reasonable test run; instead it publishes each schedule, reports + * what got stored on the stream, then cancels it. + */ +public class SchedulePredefined { + + /** Stream name used by this example. */ + public static final String STREAM = "schedules-enabled"; + + /** Prefix for all schedule subjects in this example. */ + public static final String SCHEDULE_PREFIX = "schedule."; + + /** Prefix for all target subjects in this example. */ + public static final String TARGET_PREFIX = "target."; + + private static final String SCHEDULES = SCHEDULE_PREFIX + ">"; + private static final String TARGETS = TARGET_PREFIX + "*"; + + /** Subject patterns the example stream accepts. */ + public static final String[] STREAM_SUBJECTS = new String[]{SCHEDULES, TARGETS}; + + private SchedulePredefined() {} + + /** + * Example entry point. + * @param args ignored + */ + public static void main(String[] args) { + try { + Options options = new Options.Builder() + .server("nats://localhost:4222") + .errorListener(new ErrorListener() {}) + .build(); + + try (Connection connection = Nats.connect(options)) { + JetStreamManagement jsm = connection.jetStreamManagement(); + JetStream js = connection.jetStream(); + + // delete the stream in case it existed, just for a fresh example + try { jsm.deleteStream(STREAM); } catch (Exception ignore) {} + + ScheduleManagement.createSchedulableStream(jsm, STREAM, StorageType.Memory, STREAM_SUBJECTS); + + for (PredefinedSchedules p : PredefinedSchedules.values()) { + String scheduleSubject = SCHEDULE_PREFIX + p.name().toLowerCase(); + String targetSubject = TARGET_PREFIX + p.name().toLowerCase(); + + // Show what is being sent to the schedule subject. + Message m = new ScheduledMessageBuilder() + .scheduleSubject(scheduleSubject) + .targetSubject(targetSubject) + .schedule(p) + .data("Predefined-" + p.name()) + .build(); + report("SCHEDULING " + p.name(), m); + + js.publish(m); + + // Predefined schedules fire no more often than hourly, + // so cancel each one immediately to keep the stream tidy. + report("CANCEL " + scheduleSubject, + ScheduleManagement.cancelSchedule(jsm, scheduleSubject, STREAM)); + } + } + } + catch (Exception e) { + e.printStackTrace(); + } + } +} diff --git a/schedule-message/src/examples/java/io/synadia/examples/SchedulePublishAndCancel.java b/schedule-message/src/examples/java/io/synadia/examples/SchedulePublishAndCancel.java new file mode 100644 index 0000000..8fb6099 --- /dev/null +++ b/schedule-message/src/examples/java/io/synadia/examples/SchedulePublishAndCancel.java @@ -0,0 +1,169 @@ +// Copyright (c) 2025-2026 Synadia Communications Inc. All Rights Reserved. +// See LICENSE and NOTICE file for details. + +package io.synadia.examples; + +import io.nats.client.*; +import io.nats.client.api.PublishAck; +import io.nats.client.api.StorageType; +import io.nats.client.impl.Headers; +import io.synadia.sm.ScheduleManagement; +import io.synadia.sm.ScheduledMessageBuilder; + +import java.time.Duration; +import java.util.concurrent.CountDownLatch; + +import static io.synadia.examples.ScheduleExampleUtils.report; + +/** + * Example: the three atomic publish-and-stop calls on {@link ScheduleManagement}. + * Each one publishes a message to the {@code targetSubject} and stops the named + * schedule as a single atomic step. + *

    + *
  1. {@link ScheduleManagement#publishAndCancelSchedule(JetStreamManagement, String, String, byte[], Headers)} + * — unguarded; always publishes and returns a non-null ack
  2. + *
  3. {@link ScheduleManagement#publishAndCancelScheduleIfExists(JetStreamManagement, String, String, byte[], Headers)} + * — guarded; returns {@code null} when the schedule is no longer present
  4. + *
  5. {@link ScheduleManagement#publishAndCancelSchedule(JetStreamManagement, String, long, String, byte[], Headers)} + * — explicit-sequence overload, uses + * {@code Nats-Expected-Last-Subject-Sequence} so the publish only succeeds + * while the schedule message is still at the named sequence
  6. + *
+ * The example also calls the guarded form against an already-cancelled subject + * to show the {@code null} return. + */ +public class SchedulePublishAndCancel { + + /** Stream name used by this example. */ + public static final String STREAM = "schedules-enabled"; + + /** Prefix for all schedule subjects in this example. */ + public static final String SCHEDULE_PREFIX = "schedule."; + + /** Prefix for all target subjects in this example. */ + public static final String TARGET_PREFIX = "target."; + + private static final String SCHEDULES = SCHEDULE_PREFIX + ">"; + private static final String TARGETS = TARGET_PREFIX + "*"; + + /** Subject patterns the example stream accepts. */ + public static final String[] STREAM_SUBJECTS = new String[]{SCHEDULES, TARGETS}; + + private SchedulePublishAndCancel() {} + + /** + * Example entry point. + * @param args ignored + */ + public static void main(String[] args) { + try { + Options options = new Options.Builder() + .server("nats://localhost:4222") + .errorListener(new ErrorListener() {}) + .build(); + + try (Connection connection = Nats.connect(options)) { + JetStreamManagement jsm = connection.jetStreamManagement(); + JetStream js = connection.jetStream(); + + // delete the stream in case it existed, just for a fresh example + try { jsm.deleteStream(STREAM); } catch (Exception ignore) {} + + ScheduleManagement.createSchedulableStream(jsm, STREAM, StorageType.Memory, STREAM_SUBJECTS); + + CountDownLatch latch = new CountDownLatch(3); + Dispatcher d = connection.createDispatcher(); + + js.subscribe(TARGETS, d, m -> { + report("TARGETED via '" + TARGETS + "'", m); + m.ack(); + latch.countDown(); + }, false); + + String unguardedSubject = SCHEDULE_PREFIX + "unguarded"; + String guardedSubject = SCHEDULE_PREFIX + "guarded"; + String bySeqSubject = SCHEDULE_PREFIX + "by-seq"; + + // Plant three schedules an hour out so none fire during the example. + new ScheduledMessageBuilder() + .scheduleSubject(unguardedSubject) + .targetSubject(TARGET_PREFIX + "unguarded") + .scheduleIn(Duration.ofHours(1)) + .data("placeholder") + .scheduleMessage(js); + + new ScheduledMessageBuilder() + .scheduleSubject(guardedSubject) + .targetSubject(TARGET_PREFIX + "guarded") + .scheduleIn(Duration.ofHours(1)) + .data("placeholder") + .scheduleMessage(js); + + long bySeq = new ScheduledMessageBuilder() + .scheduleSubject(bySeqSubject) + .targetSubject(TARGET_PREFIX + "by-seq") + .scheduleIn(Duration.ofHours(1)) + .data("placeholder") + .scheduleMessage(js); + + // 1) Unguarded form: publish + stop, no existence check. + // Always publishes and returns a non-null PublishAck (the call + // throws on publish failure), so no null check is needed. + PublishAck ack1 = ScheduleManagement.publishAndCancelSchedule( + jsm, + unguardedSubject, + TARGET_PREFIX + "unguarded", + "unguarded-payload".getBytes(), + null); + report("publishAndCancelSchedule (unguarded) seq=" + ack1.getSeqno()); + + // 2) Guarded form: looks up the schedule first. + // Returns null if the schedule is no longer present (or if the + // stream lookup is ambiguous), in which case nothing was published. + PublishAck ack2 = ScheduleManagement.publishAndCancelScheduleIfExists( + jsm, + guardedSubject, + TARGET_PREFIX + "guarded", + "guarded-payload".getBytes(), + null); + if (ack2 == null) { + report("publishAndCancelScheduleIfExists (present) - schedule not found, nothing published"); + } + else { + report("publishAndCancelScheduleIfExists (present) seq=" + ack2.getSeqno()); + } + + // 3) Explicit-sequence overload: uses Nats-Expected-Last-Subject-Sequence. + // Return type is non-nullable PublishAck; the call throws on a + // precondition or publish failure, so no null check is needed. + PublishAck ack3 = ScheduleManagement.publishAndCancelSchedule( + jsm, + bySeqSubject, + bySeq, + TARGET_PREFIX + "by-seq", + "by-seq-payload".getBytes(), + null); + report("publishAndCancelSchedule (by sequence) seq=" + ack3.getSeqno()); + + // 4) Guarded form against an already-cancelled subject -> null, no publish. + PublishAck ack4 = ScheduleManagement.publishAndCancelScheduleIfExists( + jsm, + unguardedSubject, + TARGET_PREFIX + "unguarded", + "should-not-be-sent".getBytes(), + null); + if (ack4 == null) { + report("publishAndCancelScheduleIfExists (gone) - null as expected, nothing published"); + } + else { + report("publishAndCancelScheduleIfExists (gone) unexpectedly published seq=" + ack4.getSeqno()); + } + + latch.await(); + } + } + catch (Exception e) { + e.printStackTrace(); + } + } +} diff --git a/schedule-message/src/examples/java/io/synadia/examples/ScheduleTtlAndRollup.java b/schedule-message/src/examples/java/io/synadia/examples/ScheduleTtlAndRollup.java new file mode 100644 index 0000000..b72b2f7 --- /dev/null +++ b/schedule-message/src/examples/java/io/synadia/examples/ScheduleTtlAndRollup.java @@ -0,0 +1,133 @@ +// Copyright (c) 2025-2026 Synadia Communications Inc. All Rights Reserved. +// See LICENSE and NOTICE file for details. + +package io.synadia.examples; + +import io.nats.client.*; +import io.nats.client.api.StorageType; +import io.synadia.sm.ScheduleManagement; +import io.synadia.sm.ScheduledMessageBuilder; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static io.synadia.examples.ScheduleExampleUtils.report; + +/** + * Example: the {@code Nats-Schedule-TTL} and {@code Nats-Schedule-Rollup} + * headers, set via {@link ScheduledMessageBuilder#messageTtl(MessageTtl)} and + * {@link ScheduledMessageBuilder#rollup()}. + *

+ *

+ * After a few fires this example reports the per-subject stream counts so the + * difference between TTL'd and rolled-up targets is visible. + */ +public class ScheduleTtlAndRollup { + + /** Stream name used by this example. */ + public static final String STREAM = "schedules-enabled"; + + /** Prefix for all schedule subjects in this example. */ + public static final String SCHEDULE_PREFIX = "schedule."; + + /** Prefix for all target subjects in this example. */ + public static final String TARGET_PREFIX = "target."; + + private static final String SCHEDULES = SCHEDULE_PREFIX + ">"; + private static final String TARGETS = TARGET_PREFIX + "*"; + + /** Subject patterns the example stream accepts. */ + public static final String[] STREAM_SUBJECTS = new String[]{SCHEDULES, TARGETS}; + + private ScheduleTtlAndRollup() {} + + /** + * Example entry point. + * @param args ignored + */ + public static void main(String[] args) { + try { + Options options = new Options.Builder() + .server("nats://localhost:4222") + .errorListener(new ErrorListener() {}) + .build(); + + try (Connection connection = Nats.connect(options)) { + JetStreamManagement jsm = connection.jetStreamManagement(); + JetStream js = connection.jetStream(); + + // delete the stream in case it existed, just for a fresh example + try { jsm.deleteStream(STREAM); } catch (Exception ignore) {} + + ScheduleManagement.createSchedulableStream(jsm, STREAM, StorageType.Memory, STREAM_SUBJECTS); + + String ttlSubject = SCHEDULE_PREFIX + "ttl"; + String rollupSubject = SCHEDULE_PREFIX + "rollup"; + String ttlTarget = TARGET_PREFIX + "ttl"; + String rollupTarget = TARGET_PREFIX + "rollup"; + + AtomicInteger ttlHits = new AtomicInteger(); + AtomicInteger rollupHits = new AtomicInteger(); + CountDownLatch latch = new CountDownLatch(6); + Dispatcher d = connection.createDispatcher(); + + js.subscribe(TARGETS, d, m -> { + if (m.getSubject().equals(ttlTarget)) ttlHits.incrementAndGet(); + if (m.getSubject().equals(rollupTarget)) rollupHits.incrementAndGet(); + report("TARGETED via '" + TARGETS + "'", m); + m.ack(); + latch.countDown(); + }, false); + + // Each published message gets a 3-second TTL on the stream. + report("SCHEDULING " + ttlSubject + " with messageTtl(3s)"); + new ScheduledMessageBuilder() + .scheduleSubject(ttlSubject) + .targetSubject(ttlTarget) + .scheduleEvery(2, TimeUnit.SECONDS) + .messageTtl(MessageTtl.seconds(3)) + .data("TTL-3s") + .scheduleMessage(js); + + // Each fire replaces any prior message on the target subject. + report("SCHEDULING " + rollupSubject + " with rollup()"); + new ScheduledMessageBuilder() + .scheduleSubject(rollupSubject) + .targetSubject(rollupTarget) + .scheduleEvery(2, TimeUnit.SECONDS) + .rollup() + .data("Rollup") + .scheduleMessage(js); + + latch.await(); + + report("CANCEL " + ttlSubject, ScheduleManagement.cancelSchedule(jsm, ttlSubject, STREAM)); + report("CANCEL " + rollupSubject, ScheduleManagement.cancelSchedule(jsm, rollupSubject, STREAM)); + + report("DELIVERED to " + ttlTarget, ttlHits.get()); + report("DELIVERED to " + rollupTarget, rollupHits.get()); + + // Both targets received the same number of fires, but the rollup + // target retains only the latest message; the TTL target retains + // each published message until it expires after 3s. + report("STREAM count for " + ttlTarget, + jsm.getStreamInfo(STREAM, io.nats.client.api.StreamInfoOptions.filterSubjects(ttlTarget)) + .getStreamState().getSubjectCount()); + report("STREAM count for " + rollupTarget, + jsm.getStreamInfo(STREAM, io.nats.client.api.StreamInfoOptions.filterSubjects(rollupTarget)) + .getStreamState().getSubjectCount()); + } + } + catch (Exception e) { + e.printStackTrace(); + } + } +} diff --git a/schedule-message/src/main/java/io/synadia/sm/ScheduleManagement.java b/schedule-message/src/main/java/io/synadia/sm/ScheduleManagement.java index 4f01ab9..5613e09 100644 --- a/schedule-message/src/main/java/io/synadia/sm/ScheduleManagement.java +++ b/schedule-message/src/main/java/io/synadia/sm/ScheduleManagement.java @@ -25,10 +25,15 @@ * longer fire. {@link #cancelSchedule(JetStreamManagement, String, long)} deletes by * stream sequence; the subject-based overloads look the sequence up first. *
  • Atomic publish-and-stop — publish a message to a different subject and stop - * the schedule as a single atomic step, optionally guarded by an existence check on - * the schedule message. See - * {@link #publishAndCancelSchedule(JetStreamManagement, String, String, byte[], Headers, boolean)} - * and {@link #publishAndCancelSchedule(JetStreamManagement, String, long, String, byte[], Headers)}.
  • + * the schedule as a single atomic step. The unconditional form + * {@link #publishAndCancelSchedule(JetStreamManagement, String, String, byte[], Headers)} + * publishes without checking that the schedule still exists; + * {@link #publishAndCancelScheduleIfExists(JetStreamManagement, String, String, byte[], Headers)} + * guards the publish with an existence check, returning {@code null} if the schedule + * is no longer present. The sequence-bound variant + * {@link #publishAndCancelSchedule(JetStreamManagement, String, long, String, byte[], Headers)} + * uses {@code Nats-Expected-Last-Subject-Sequence} so the publish only succeeds while + * the schedule message is still at the named sequence. * * Per the ADR the publish subject of the atomic variants must not equal the schedule * subject; the server rejects such publishes with error code {@code 10212}. @@ -183,55 +188,78 @@ public static Result cancelSchedule(JetStreamManagement jsm, String scheduleSubj *
  • {@code Nats-Scheduler}: {@code scheduleSubject}
  • *
  • {@code Nats-Schedule-Next}: {@code purge}
  • * + * The publish is sent unconditionally; the schedule is stopped as a side effect of + * the server processing the headers. Use + * {@link #publishAndCancelScheduleIfExists(JetStreamManagement, String, String, byte[], Headers)} + * if you need to skip the publish when the schedule is no longer present. + *

    * The {@code targetSubject} must not equal {@code scheduleSubject}. This constraint * is enforced by the server, not by this method, so passing equal subjects surfaces * as a {@link JetStreamApiException} with error code {@code 10212} from the publish * call. * - * @param jsm the JetStream management context (its - * {@code jetStream()} context is used to publish) - * @param scheduleSubject the schedule subject to stop - * @param targetSubject the subject to publish to; this may be the - * original schedule's target subject (to publish - * early) or any other subject. Must not equal - * {@code scheduleSubject} — see above - * @param data the message body; may be {@code null} - * @param userHeaders extra headers to include on the published - * message; may be {@code null}. The - * {@code Nats-Scheduler} and - * {@code Nats-Schedule-Next} headers are always - * set by this method and override any conflicting - * keys from {@code userHeaders} - * @param publishOnlyIfScheduleExists when {@code true}, the publish is sent with an - * expected-last-subject-sequence guard so it - * only succeeds if the schedule message is still - * present; when {@code false}, the publish is - * sent unconditionally - * @return the {@link PublishAck} from the server, or {@code null} when - * {@code publishOnlyIfScheduleExists} is {@code true} and the schedule for the - * subject could not be located. The lookup requires exactly one matching - * stream; if zero or more than one stream covers {@code scheduleSubject}, the - * method returns {@code null} without publishing + * @param jsm the JetStream management context (its {@code jetStream()} + * context is used to publish) + * @param scheduleSubject the schedule subject to stop + * @param targetSubject the subject to publish to; this may be the original + * schedule's target subject (to publish early) or any other + * subject. Must not equal {@code scheduleSubject} — see above + * @param data the message body; may be {@code null} + * @param userHeaders extra headers to include on the published message; may be + * {@code null}. The {@code Nats-Scheduler} and + * {@code Nats-Schedule-Next} headers are always set by this + * method and override any conflicting keys from + * {@code userHeaders} + * @return the {@link PublishAck} from the server * @throws JetStreamApiException if the server returned an error * @throws IOException if the request could not be sent */ - public static @Nullable PublishAck publishAndCancelSchedule(JetStreamManagement jsm, String scheduleSubject, String targetSubject, - byte @Nullable[] data, @Nullable Headers userHeaders, boolean publishOnlyIfScheduleExists) throws JetStreamApiException, IOException { - if (publishOnlyIfScheduleExists) { - String streamName = findStreamLenient(jsm, scheduleSubject); - if (streamName != null) { - long seq = getScheduleSequence(jsm, streamName, scheduleSubject); - if (seq != -1) { - return publishAndCancelSchedule(jsm, scheduleSubject, seq, targetSubject, data, userHeaders); - } - } - return null; - } - + public static PublishAck publishAndCancelSchedule(JetStreamManagement jsm, String scheduleSubject, String targetSubject, + byte @Nullable[] data, @Nullable Headers userHeaders) throws JetStreamApiException, IOException { Headers h = makeHeaders(scheduleSubject, userHeaders); return jsm.jetStream().publish(targetSubject, h, data); } + /** + * Guarded variant of + * {@link #publishAndCancelSchedule(JetStreamManagement, String, String, byte[], Headers)}: + * looks up the schedule message first and only publishes if it is still present, using + * the {@code Nats-Expected-Last-Subject-Sequence} precondition so the operation is + * atomic with any concurrent fire. + *

    + * The lookup requires exactly one stream to cover {@code scheduleSubject}; if + * zero or more than one stream matches, the method returns {@code null} without + * publishing. + * + * @param jsm the JetStream management context + * @param scheduleSubject the schedule subject to stop + * @param targetSubject the subject to publish to; must not equal + * {@code scheduleSubject} (server rejects with code + * {@code 10212}) + * @param data the message body; may be {@code null} + * @param userHeaders extra headers to include on the published message; may be + * {@code null}. The {@code Nats-Scheduler} and + * {@code Nats-Schedule-Next} headers are always set by this + * method and override any conflicting keys from + * {@code userHeaders} + * @return the {@link PublishAck} from the server, or {@code null} if the schedule + * for {@code scheduleSubject} could not be located (no schedule message present, + * or the stream lookup was ambiguous) + * @throws JetStreamApiException if the server returned an error + * @throws IOException if the request could not be sent + */ + public static @Nullable PublishAck publishAndCancelScheduleIfExists(JetStreamManagement jsm, String scheduleSubject, String targetSubject, + byte @Nullable[] data, @Nullable Headers userHeaders) throws JetStreamApiException, IOException { + String streamName = findStreamLenient(jsm, scheduleSubject); + if (streamName != null) { + long seq = getScheduleSequence(jsm, streamName, scheduleSubject); + if (seq != -1) { + return publishAndCancelSchedule(jsm, scheduleSubject, seq, targetSubject, data, userHeaders); + } + } + return null; + } + /** * Atomic publish-and-stop guarded by an explicit existence check. Same headers as * the simpler overload, but additionally sets: diff --git a/schedule-message/src/test/java/io/synadia/sm/ScheduleManagementTests.java b/schedule-message/src/test/java/io/synadia/sm/ScheduleManagementTests.java index 561f1c1..fc630bd 100644 --- a/schedule-message/src/test/java/io/synadia/sm/ScheduleManagementTests.java +++ b/schedule-message/src/test/java/io/synadia/sm/ScheduleManagementTests.java @@ -182,7 +182,8 @@ public void testCancelBySubjectInStream_wildcard_noMatches() throws Exception { ScheduleManagement.cancelSchedule(jsm, f.schedPrefix + ".*", f.stream)); } - // -- publishAndCancelSchedule(jsm, sched, tgt, data, publishOnlyIfExists) -- + // -- publishAndCancelSchedule(jsm, sched, tgt, data, userHeaders) ---------- + // -- publishAndCancelScheduleIfExists(jsm, sched, tgt, data, userHeaders) -- @Test public void testPublishAndCancel_unconditional_success() throws Exception { @@ -192,7 +193,7 @@ public void testPublishAndCancel_unconditional_success() throws Exception { scheduleInTheFuture(schedSubject, tgtSubject, "body"); PublishAck ack = ScheduleManagement.publishAndCancelSchedule( - jsm, schedSubject, tgtSubject, "cancel-now".getBytes(), null, false); + jsm, schedSubject, tgtSubject, "cancel-now".getBytes(), null); assertNotNull(ack); assertFalse(scheduleExists(f.stream, schedSubject)); @@ -205,8 +206,8 @@ public void testPublishAndCancel_ifExists_whenPresent() throws Exception { String tgtSubject = f.tgt("p2"); scheduleInTheFuture(schedSubject, tgtSubject, "body"); - PublishAck ack = ScheduleManagement.publishAndCancelSchedule( - jsm, schedSubject, tgtSubject, "cancel-now".getBytes(), null, true); + PublishAck ack = ScheduleManagement.publishAndCancelScheduleIfExists( + jsm, schedSubject, tgtSubject, "cancel-now".getBytes(), null); assertNotNull(ack); assertFalse(scheduleExists(f.stream, schedSubject)); @@ -218,8 +219,8 @@ public void testPublishAndCancel_ifExists_whenMissing() throws Exception { String schedSubject = f.sched("p3"); String tgtSubject = f.tgt("p3"); - PublishAck ack = ScheduleManagement.publishAndCancelSchedule( - jsm, schedSubject, tgtSubject, "cancel-now".getBytes(), null, true); + PublishAck ack = ScheduleManagement.publishAndCancelScheduleIfExists( + jsm, schedSubject, tgtSubject, "cancel-now".getBytes(), null); assertNull(ack); } @@ -267,7 +268,7 @@ public void testPublishAndCancel_targetEqualsScheduleSubject_serverRejects() thr JetStreamApiException ex = assertThrows(JetStreamApiException.class, () -> ScheduleManagement.publishAndCancelSchedule( - jsm, sameSubject, sameSubject, "x".getBytes(), null, false)); + jsm, sameSubject, sameSubject, "x".getBytes(), null)); assertEquals(10212, ex.getApiErrorCode()); } @@ -293,7 +294,7 @@ public void testPublishAndCancel_userHeadersCannotOverrideRequired() throws Exce userHeaders.put("X-Custom", "carry-me"); PublishAck ack = ScheduleManagement.publishAndCancelSchedule( - jsm, schedSubject, tgtSubject, "cancel-now".getBytes(), userHeaders, false); + jsm, schedSubject, tgtSubject, "cancel-now".getBytes(), userHeaders); assertNotNull(ack); // The schedule was actually cancelled — proves the required headers won.