diff --git a/retrier/src/main/java/io/synadia/retrier/Retrier.java b/retrier/src/main/java/io/synadia/retrier/Retrier.java index 2fdc49d..dc3249a 100644 --- a/retrier/src/main/java/io/synadia/retrier/Retrier.java +++ b/retrier/src/main/java/io/synadia/retrier/Retrier.java @@ -33,7 +33,7 @@ public static T execute(RetryConfig config, RetryAction action) throws Ex * or the observer declines to retry. */ public static T execute(RetryConfig config, RetryAction action, RetryObserver observer) throws Exception { - long[] backoffPolicy = config.getBackoffPolicy();; + long[] backoffPolicy = config.getBackoffPolicy(); int plen = backoffPolicy.length; int retries = 0; long deadlineExpiresAt = System.currentTimeMillis() + config.getDeadline(); diff --git a/schedule-message/build.gradle b/schedule-message/build.gradle index 2a61b39..406e1f3 100644 --- a/schedule-message/build.gradle +++ b/schedule-message/build.gradle @@ -40,7 +40,7 @@ repositories { } dependencies { - implementation 'io.nats:jnats:2.25.3-SNAPSHOT' + implementation 'io.nats:jnats:2.25.3' implementation 'org.jspecify:jspecify:1.0.0' implementation 'io.synadia:counters:0.2.2' diff --git a/schedule-message/src/examples/java/io/synadia/examples/ScheduleBasics.java b/schedule-message/src/examples/java/io/synadia/examples/ScheduleBasics.java index 631a00b..0f3b6df 100644 --- a/schedule-message/src/examples/java/io/synadia/examples/ScheduleBasics.java +++ b/schedule-message/src/examples/java/io/synadia/examples/ScheduleBasics.java @@ -7,25 +7,41 @@ import io.nats.client.api.StorageType; import io.nats.client.api.StreamInfo; import io.nats.client.support.DateTimeUtils; +import io.synadia.sm.ScheduleManagement; import io.synadia.sm.ScheduledMessageBuilder; -import io.synadia.sm.ScheduledStreamUtil; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import static io.synadia.examples.ScheduleUtils.report; +/** + * Example: build and publish a few scheduled messages using + * {@link io.synadia.sm.ScheduledMessageBuilder#scheduleMessage(io.nats.client.JetStream)}. + */ public class ScheduleBasics { + + /** Stream name used by this example. */ public static final String STREAM = "schedules-enabled"; + /** Prefix for all schedule subjects in this example. */ public static final String SCHEDULE_PREFIX = "schedule."; + + /** Prefix for all target subjects in this example. */ public static final String TARGET_PREFIX = "target."; private static final String SCHEDULES = SCHEDULE_PREFIX + ">"; private static final String TARGETS = TARGET_PREFIX + "*"; + /** Subject patterns the example stream accepts. */ public static final String[] STREAM_SUBJECTS = new String[]{SCHEDULES, TARGETS}; + private ScheduleBasics() {} + + /** + * Example entry point. + * @param args ignored + */ public static void main(String[] args) { try { Options options = new Options.Builder() @@ -34,14 +50,14 @@ public static void main(String[] args) { .build(); try (Connection connection = Nats.connect(options)) { - JetStreamManagement jsm = connection.jetStreamManagement();; + JetStreamManagement jsm = connection.jetStreamManagement(); JetStream js = connection.jetStream(); // delete the stream in case it existed, just for a fresh example try { jsm.deleteStream(STREAM); } catch (Exception ignore) {} // Use the utility to properly create a schedulable stream - StreamInfo si = ScheduledStreamUtil.createSchedulableStream(jsm, STREAM, StorageType.Memory, STREAM_SUBJECTS); + StreamInfo si = ScheduleManagement.createSchedulableStream(jsm, STREAM, StorageType.Memory, STREAM_SUBJECTS); report("Created stream", si.getConfiguration()); CountDownLatch latch = new CountDownLatch(4); @@ -60,32 +76,29 @@ public static void main(String[] args) { latch.countDown(); }, false); - Message m = new ScheduledMessageBuilder() + report("SCHEDULE-NOW (publishing)"); + new ScheduledMessageBuilder() .scheduleSubject(SCHEDULE_PREFIX + "now") .targetSubject(TARGET_PREFIX + "now") .scheduleImmediate() .data("Schedule-Now") - .build(); - report("SCHEDULE-NOW (sending)", m); - js.publish(m); + .scheduleMessage(js); - m = new ScheduledMessageBuilder() + report("SCHEDULE-AT (publishing)"); + new ScheduledMessageBuilder() .scheduleSubject(SCHEDULE_PREFIX + "at") .targetSubject(TARGET_PREFIX + "at") .scheduleAt(DateTimeUtils.gmtNow().plusSeconds(5)) .data("Scheduled-At") - .build(); - report("SCHEDULE-AT (sending)", m); - js.publish(m); + .scheduleMessage(js); - m = new ScheduledMessageBuilder() + report("SCHEDULE-EVERY (publishing)"); + new ScheduledMessageBuilder() .scheduleSubject(SCHEDULE_PREFIX + "at") .targetSubject(TARGET_PREFIX + "at") .scheduleEvery(1, TimeUnit.SECONDS) .data("Every Second") - .build(); - report("SCHEDULE-EVERY (sending)", m); - js.publish(m); + .scheduleMessage(js); latch.await(); } diff --git a/schedule-message/src/examples/java/io/synadia/examples/ScheduleBasicsAlternate.java b/schedule-message/src/examples/java/io/synadia/examples/ScheduleBasicsAlternate.java new file mode 100644 index 0000000..15ebd84 --- /dev/null +++ b/schedule-message/src/examples/java/io/synadia/examples/ScheduleBasicsAlternate.java @@ -0,0 +1,114 @@ +// Copyright (c) 2025 Synadia Communications Inc. All Rights Reserved. +// See LICENSE and NOTICE file for details. + +package io.synadia.examples; + +import io.nats.client.*; +import io.nats.client.api.StorageType; +import io.nats.client.api.StreamInfo; +import io.nats.client.support.DateTimeUtils; +import io.synadia.sm.ScheduleManagement; +import io.synadia.sm.ScheduledMessageBuilder; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static io.synadia.examples.ScheduleUtils.report; + +/** + * Example: same scenario as {@link ScheduleBasics}, but built using + * {@link io.synadia.sm.ScheduledMessageBuilder#build()} and then published + * via {@link io.nats.client.JetStream#publish(io.nats.client.Message)}. + */ +public class ScheduleBasicsAlternate { + + /** Stream name used by this example. */ + public static final String STREAM = "schedules-enabled"; + + /** Prefix for all schedule subjects in this example. */ + public static final String SCHEDULE_PREFIX = "schedule."; + + /** Prefix for all target subjects in this example. */ + public static final String TARGET_PREFIX = "target."; + + private static final String SCHEDULES = SCHEDULE_PREFIX + ">"; + private static final String TARGETS = TARGET_PREFIX + "*"; + + /** Subject patterns the example stream accepts. */ + public static final String[] STREAM_SUBJECTS = new String[]{SCHEDULES, TARGETS}; + + private ScheduleBasicsAlternate() {} + + /** + * Example entry point. + * @param args ignored + */ + public static void main(String[] args) { + try { + Options options = new Options.Builder() + .server("nats://localhost:4222") + .errorListener(new ErrorListener() {}) + .build(); + + try (Connection connection = Nats.connect(options)) { + JetStreamManagement jsm = connection.jetStreamManagement(); + JetStream js = connection.jetStream(); + + // delete the stream in case it existed, just for a fresh example + try { jsm.deleteStream(STREAM); } catch (Exception ignore) {} + + // Use the utility to properly create a schedulable stream + StreamInfo si = ScheduleManagement.createSchedulableStream(jsm, STREAM, StorageType.Memory, STREAM_SUBJECTS); + report("Created stream", si.getConfiguration()); + + CountDownLatch latch = new CountDownLatch(4); + Dispatcher d = connection.createDispatcher(); + + // subscribe to the subject that receives the schedule message + js.subscribe(SCHEDULES, d, m -> { + report("SCHEDULED (received)", m); + m.ack(); + }, false); + + // subscribe to the target subject + js.subscribe(TARGETS, d, m -> { + report("TARGETED (received)", m); + m.ack(); + latch.countDown(); + }, false); + + Message m = new ScheduledMessageBuilder() + .scheduleSubject(SCHEDULE_PREFIX + "now") + .targetSubject(TARGET_PREFIX + "now") + .scheduleImmediate() + .data("Schedule-Now") + .build(); + report("SCHEDULE-NOW (publishing)", m); + js.publish(m); + + m = new ScheduledMessageBuilder() + .scheduleSubject(SCHEDULE_PREFIX + "at") + .targetSubject(TARGET_PREFIX + "at") + .scheduleAt(DateTimeUtils.gmtNow().plusSeconds(5)) + .data("Scheduled-At") + .build(); + report("SCHEDULE-AT (publishing)", m); + js.publish(m); + + m = new ScheduledMessageBuilder() + .scheduleSubject(SCHEDULE_PREFIX + "at") + .targetSubject(TARGET_PREFIX + "at") + .scheduleEvery(1, TimeUnit.SECONDS) + .data("Every Second") + .build(); + report("SCHEDULE-EVERY (publishing)", m); + js.publish(m); + + latch.await(); + } + } + catch (Exception e) { + e.printStackTrace(); + } + } +} diff --git a/schedule-message/src/examples/java/io/synadia/examples/ScheduleFromSource.java b/schedule-message/src/examples/java/io/synadia/examples/ScheduleFromSource.java index 0627381..748a25e 100644 --- a/schedule-message/src/examples/java/io/synadia/examples/ScheduleFromSource.java +++ b/schedule-message/src/examples/java/io/synadia/examples/ScheduleFromSource.java @@ -8,23 +8,37 @@ import io.nats.client.api.StreamInfo; import io.nats.client.impl.Headers; import io.nats.client.impl.NatsMessage; +import io.synadia.sm.ScheduleManagement; import io.synadia.sm.ScheduledMessageBuilder; -import io.synadia.sm.ScheduledStreamUtil; import java.time.Duration; import java.util.concurrent.CountDownLatch; import static io.synadia.examples.ScheduleUtils.report; +/** + * Example: schedule a message whose body and headers are taken from the last + * message published on a separate source subject (the + * {@code Nats-Schedule-Source} feature in ADR-51). + */ public class ScheduleFromSource { + + /** Stream name used by this example. */ public static final String STREAM = "schedules-enabled"; private static final String SCHEDULES = "schedules"; private static final String TARGET = "target"; private static final String SOURCE = "source"; + /** Subject patterns the example stream accepts. */ public static final String[] STREAM_SUBJECTS = new String[]{SCHEDULES, TARGET, SOURCE}; + private ScheduleFromSource() {} + + /** + * Example entry point. + * @param args ignored + */ public static void main(String[] args) { try { Options options = new Options.Builder() @@ -40,7 +54,7 @@ public static void main(String[] args) { try { jsm.deleteStream(STREAM); } catch (Exception ignore) {} // Use the utility to properly create a schedulable stream - StreamInfo si = ScheduledStreamUtil.createSchedulableStream(jsm, STREAM, StorageType.Memory, STREAM_SUBJECTS); + StreamInfo si = ScheduleManagement.createSchedulableStream(jsm, STREAM, StorageType.Memory, STREAM_SUBJECTS); report("Created stream", si.getConfiguration()); CountDownLatch latch1 = new CountDownLatch(1); @@ -72,7 +86,7 @@ public static void main(String[] args) { Headers sourceHeaders = new Headers(); sourceHeaders.put("foo1", "bar1"); Message sourceMessage = new NatsMessage(SOURCE, null, sourceHeaders, sourceData.getBytes()); - report("SOURCE 1 (sending)", sourceMessage); + report("SOURCE 1 (publishing)", sourceMessage); js.publish(sourceMessage); connection.flush(Duration.ofSeconds(1)); @@ -82,7 +96,7 @@ public static void main(String[] args) { .scheduleImmediate() .sources(SOURCE) .build(); - report("SCHEDULE 1 (sending)", scheduleMessage); + report("SCHEDULE 1 (publishing)", scheduleMessage); js.publish(scheduleMessage); latch1.await(); @@ -91,11 +105,11 @@ public static void main(String[] args) { sourceHeaders = new Headers(); sourceHeaders.put("foo2", "bar2"); sourceMessage = new NatsMessage(SOURCE, null, sourceHeaders, sourceData.getBytes()); - report("SOURCE 2 (sending)", sourceMessage); + report("SOURCE 2 (publishing)", sourceMessage); js.publish(sourceMessage); connection.flush(Duration.ofSeconds(1)); - report("SCHEDULE 2 (sending)", scheduleMessage); + report("SCHEDULE 2 (publishing)", scheduleMessage); js.publish(scheduleMessage); latch2.await(); diff --git a/schedule-message/src/examples/java/io/synadia/examples/ScheduleUtils.java b/schedule-message/src/examples/java/io/synadia/examples/ScheduleUtils.java index fac2489..1e7bc4b 100644 --- a/schedule-message/src/examples/java/io/synadia/examples/ScheduleUtils.java +++ b/schedule-message/src/examples/java/io/synadia/examples/ScheduleUtils.java @@ -6,8 +6,19 @@ import io.nats.client.Message; import io.nats.client.impl.Headers; +/** + * Small console-logging helpers shared by the example apps. + */ public class ScheduleUtils { + private ScheduleUtils() {} + + /** + * Print a pipe-separated, timestamped line to {@code System.out}. Each object is + * rendered with {@link Object#toString()}, except {@link Message}, which is rendered + * via {@link #toString(Message)}. + * @param objects the values to render + */ public static void report(Object... objects) { StringBuilder sb = new StringBuilder(); boolean first = true; @@ -28,6 +39,12 @@ public static void report(Object... objects) { System.out.println("[" + System.currentTimeMillis() + "] " + sb); } + /** + * Format a {@link Message} as a short multi-line string showing the subject, data + * (when present), and headers (when any). + * @param msg the message to format + * @return a human-readable representation + */ public static String toString(Message msg) { StringBuilder sb = new StringBuilder(System.lineSeparator()) .append(" Subject: ").append(msg.getSubject()); diff --git a/schedule-message/src/main/java/io/synadia/sm/PredefinedSchedules.java b/schedule-message/src/main/java/io/synadia/sm/PredefinedSchedules.java index 83f77fc..3bae59d 100644 --- a/schedule-message/src/main/java/io/synadia/sm/PredefinedSchedules.java +++ b/schedule-message/src/main/java/io/synadia/sm/PredefinedSchedules.java @@ -3,6 +3,11 @@ package io.synadia.sm; +/** + * Predefined cron-like schedule shortcuts supported by NATS message schedules + * (per ADR-51). Pass one to + * {@link ScheduledMessageBuilder#schedule(PredefinedSchedules)}. + */ public enum PredefinedSchedules { /** * Run once a year, midnight, Jan. 1st. Same as Yearly. Equivalent to cron string 0 0 0 1 1 * diff --git a/schedule-message/src/main/java/io/synadia/sm/ScheduleManagement.java b/schedule-message/src/main/java/io/synadia/sm/ScheduleManagement.java new file mode 100644 index 0000000..4f01ab9 --- /dev/null +++ b/schedule-message/src/main/java/io/synadia/sm/ScheduleManagement.java @@ -0,0 +1,326 @@ +package io.synadia.sm; + +import io.nats.client.*; +import io.nats.client.api.*; +import io.nats.client.impl.Headers; +import io.nats.client.impl.NatsMessage; +import org.jspecify.annotations.NullMarked; +import org.jspecify.annotations.Nullable; + +import java.io.IOException; +import java.util.List; + +import static io.nats.client.support.NatsJetStreamConstants.*; +import static io.nats.client.support.Validator.notPrintableOrHasWildGt; + +/** + * Helper utilities for stopping NATS message schedules early, per + * ADR-51 + * (section Ending/stopping schedules early), plus a couple of convenience helpers for + * creating schedule-capable streams. + *

+ * The class exposes two families of operations: + *

    + *
  • Basic stop — remove the schedule message from its stream so it can no + * longer fire. {@link #cancelSchedule(JetStreamManagement, String, long)} deletes by + * stream sequence; the subject-based overloads look the sequence up first.
  • + *
  • Atomic publish-and-stop — publish a message to a different subject and stop + * the schedule as a single atomic step, optionally guarded by an existence check on + * the schedule message. See + * {@link #publishAndCancelSchedule(JetStreamManagement, String, String, byte[], Headers, boolean)} + * and {@link #publishAndCancelSchedule(JetStreamManagement, String, long, String, byte[], Headers)}.
  • + *
+ * Per the ADR the publish subject of the atomic variants must not equal the schedule + * subject; the server rejects such publishes with error code {@code 10212}. + *

+ * All methods are static; the class is {@code abstract} purely to prevent instantiation. + */ +@NullMarked +public abstract class ScheduleManagement { + + /** Utility class — not intended to be instantiated. */ + private ScheduleManagement() {} + + /** + * Outcome of a {@code cancelSchedule(...)} call. + */ + public enum Result { + /** The schedule message was found and successfully deleted. */ + SUCCESS, + /** The server-side delete returned {@code false}. */ + FAILURE, + /** No schedule message was found for the given subject / sequence. */ + NOT_FOUND + } + + /** + * Add a new stream with message scheduling enabled. + * Both {@code AllowMsgSchedules} and {@code AllowMsgTTL} are set on the stream — the + * latter is required for the {@code Nats-Schedule-TTL} header to take effect on + * messages produced by schedules. + * + * @param jsm the JetStream management context + * @param streamName the stream name + * @param storageType the storage type ({@code File} or {@code Memory}) + * @param subjects the subjects the stream will accept; must cover both the + * schedule subjects and any target subjects schedules publish to + * @return the created {@link StreamInfo} + * @throws JetStreamApiException if the server returned an error + * @throws IOException if the request could not be sent + */ + public static StreamInfo createSchedulableStream(JetStreamManagement jsm, String streamName, StorageType storageType, String... subjects) throws JetStreamApiException, IOException { + StreamConfiguration sc = StreamConfiguration.builder() + .name(streamName) + .storageType(storageType) + .subjects(subjects) + .allowMessageSchedules() + .allowMessageTtl() + .build(); + return jsm.addStream(sc); + } + + /** + * Add a new stream with message scheduling enabled, derived from an existing + * {@link StreamConfiguration}. The supplied configuration is copied and + * {@code AllowMsgSchedules} / {@code AllowMsgTTL} are turned on; all other settings + * are preserved. + * + * @param jsm the JetStream management context + * @param startingStreamConfig the base configuration to copy from + * @return the created {@link StreamInfo} + * @throws JetStreamApiException if the server returned an error + * @throws IOException if the request could not be sent + */ + public static StreamInfo createSchedulableStream(JetStreamManagement jsm, StreamConfiguration startingStreamConfig) throws JetStreamApiException, IOException { + StreamConfiguration sc = StreamConfiguration.builder(startingStreamConfig) + .allowMessageSchedules() + .allowMessageTtl() + .build(); + return jsm.addStream(sc); + } + + /** + * Stop a schedule by deleting its message at a specific stream sequence (ADR-51 + * mechanism: delete by stream sequence). The schedule stops firing as soon as + * its message is removed. + * + * @param jsm the JetStream management context + * @param stream the stream that holds the schedule message + * @param scheduleStreamSequence the stream sequence of the schedule message + * @return {@link Result#SUCCESS} on a successful delete, {@link Result#FAILURE} if + * the server reported the delete as unsuccessful, or {@link Result#NOT_FOUND} if + * no message exists at that sequence (server error {@code 10043}). Any other + * server error is rethrown. + * @throws JetStreamApiException if the server returned an error other than + * "message not found" + * @throws IOException if the request could not be sent + */ + public static Result cancelSchedule(JetStreamManagement jsm, String stream, long scheduleStreamSequence) throws JetStreamApiException, IOException { + try { + return jsm.deleteMessage(stream, scheduleStreamSequence) ? Result.SUCCESS : Result.FAILURE; + } + catch (JetStreamApiException e) { + if (e.getApiErrorCode() == 10043) { + return Result.NOT_FOUND; + } + throw e; + } + } + + /** + * Convenience overload that locates the stream that owns the schedule subject before + * delegating to {@link #cancelSchedule(JetStreamManagement, String, String)}. + *

+ * The lookup is strict: it fails if zero streams match the subject, and refuses to + * pick one when more than one stream matches. Pass the stream name explicitly to the + * three-argument overload if you need to disambiguate. + * + * @param jsm the JetStream management context + * @param scheduleSubject the schedule subject + * @return see {@link #cancelSchedule(JetStreamManagement, String, long)} + * @throws IllegalStateException if no stream — or more than one — covers the subject + * @throws JetStreamApiException if the server returned an error + * @throws IOException if the request could not be sent + */ + public static Result cancelSchedule(JetStreamManagement jsm, String scheduleSubject) throws JetStreamApiException, IOException { + return cancelSchedule(jsm, scheduleSubject, findStream(jsm, scheduleSubject)); + } + + /** + * Stop a schedule identified by its subject in the given stream. Looks up the last + * message on the subject, verifies it is a schedule message (has the + * {@code Nats-Schedule} header), and deletes it by its stream sequence. + * + * @param jsm the JetStream management context + * @param scheduleSubject the exact schedule subject (wildcards are not supported) + * @param scheduleStream the name of the stream that holds the schedule message + * @return {@link Result#NOT_FOUND} if no schedule message exists on the subject; + * otherwise the result of the underlying sequence-based delete + * @throws JetStreamApiException if the server returned an error + * @throws IOException if the request could not be sent + */ + public static Result cancelSchedule(JetStreamManagement jsm, String scheduleSubject, String scheduleStream) throws JetStreamApiException, IOException { + if (notPrintableOrHasWildGt(scheduleSubject)) { + // this is a wildcard subject so we must use purge + PurgeResponse response = jsm.purgeStream(scheduleStream, PurgeOptions.builder().subject(scheduleSubject).build()); + if (response.isSuccess()) { + return response.getPurged() > 0 ? Result.SUCCESS : Result.NOT_FOUND; + } + return Result.FAILURE; + } + + long seq = getScheduleSequence(jsm, scheduleStream, scheduleSubject); + if (seq == -1) { + return Result.NOT_FOUND; + } + return cancelSchedule(jsm, scheduleStream, seq); + } + + /** + * Atomically publish a message and stop the named schedule, per ADR-51's + * atomic stop mechanism. The published message carries: + *

    + *
  • {@code Nats-Scheduler}: {@code scheduleSubject}
  • + *
  • {@code Nats-Schedule-Next}: {@code purge}
  • + *
+ * The {@code targetSubject} must not equal {@code scheduleSubject}. This constraint + * is enforced by the server, not by this method, so passing equal subjects surfaces + * as a {@link JetStreamApiException} with error code {@code 10212} from the publish + * call. + * + * @param jsm the JetStream management context (its + * {@code jetStream()} context is used to publish) + * @param scheduleSubject the schedule subject to stop + * @param targetSubject the subject to publish to; this may be the + * original schedule's target subject (to publish + * early) or any other subject. Must not equal + * {@code scheduleSubject} — see above + * @param data the message body; may be {@code null} + * @param userHeaders extra headers to include on the published + * message; may be {@code null}. The + * {@code Nats-Scheduler} and + * {@code Nats-Schedule-Next} headers are always + * set by this method and override any conflicting + * keys from {@code userHeaders} + * @param publishOnlyIfScheduleExists when {@code true}, the publish is sent with an + * expected-last-subject-sequence guard so it + * only succeeds if the schedule message is still + * present; when {@code false}, the publish is + * sent unconditionally + * @return the {@link PublishAck} from the server, or {@code null} when + * {@code publishOnlyIfScheduleExists} is {@code true} and the schedule for the + * subject could not be located. The lookup requires exactly one matching + * stream; if zero or more than one stream covers {@code scheduleSubject}, the + * method returns {@code null} without publishing + * @throws JetStreamApiException if the server returned an error + * @throws IOException if the request could not be sent + */ + public static @Nullable PublishAck publishAndCancelSchedule(JetStreamManagement jsm, String scheduleSubject, String targetSubject, + byte @Nullable[] data, @Nullable Headers userHeaders, boolean publishOnlyIfScheduleExists) throws JetStreamApiException, IOException { + if (publishOnlyIfScheduleExists) { + String streamName = findStreamLenient(jsm, scheduleSubject); + if (streamName != null) { + long seq = getScheduleSequence(jsm, streamName, scheduleSubject); + if (seq != -1) { + return publishAndCancelSchedule(jsm, scheduleSubject, seq, targetSubject, data, userHeaders); + } + } + return null; + } + + Headers h = makeHeaders(scheduleSubject, userHeaders); + return jsm.jetStream().publish(targetSubject, h, data); + } + + /** + * Atomic publish-and-stop guarded by an explicit existence check. Same headers as + * the simpler overload, but additionally sets: + *
    + *
  • {@code Nats-Expected-Last-Subject-Sequence}: {@code scheduleStreamSequence}
  • + *
  • {@code Nats-Expected-Last-Subject-Sequence-Subject}: {@code scheduleSubject}
  • + *
+ * The publish — and therefore the stop — only succeeds if the schedule message is + * still present at the given sequence on its subject. Useful for stopping a schedule + * and publishing in one atomic step without risk of duplicating the message if the + * schedule fires concurrently. + * + * @param jsm the JetStream management context + * @param scheduleSubject the schedule subject to stop + * @param scheduleStreamSequence the expected stream sequence of the schedule message + * on {@code scheduleSubject} + * @param targetSubject the subject to publish to. Must not equal + * {@code scheduleSubject}; the server enforces this + * constraint and rejects with error code {@code 10212} + * if the two are equal + * @param data the message body; may be {@code null} + * @param userHeaders extra headers to include on the published message; + * may be {@code null}. The {@code Nats-Scheduler} and + * {@code Nats-Schedule-Next} headers are always set by + * this method and override any conflicting keys from + * {@code userHeaders} + * @return the {@link PublishAck} from the server + * @throws JetStreamApiException if the precondition fails, the server returned error + * {@code 10212} (target subject equals schedule subject), or any other server + * error occurred + * @throws IOException if the request could not be sent + */ + public static PublishAck publishAndCancelSchedule(JetStreamManagement jsm, String scheduleSubject, long scheduleStreamSequence, String targetSubject, + byte @Nullable[] data, @Nullable Headers userHeaders) throws JetStreamApiException, IOException { + Headers h = makeHeaders(scheduleSubject, userHeaders); + PublishOptions opts = PublishOptions.builder() + .expectedLastSubjectSequenceSubject(scheduleSubject) + .expectedLastSubjectSequence(scheduleStreamSequence) + .build(); + Message m = new NatsMessage(targetSubject, null, h, data); + return jsm.jetStream().publish(m , opts); + } + + private static Headers makeHeaders(String scheduleSubject, @Nullable Headers userHeaders) { + Headers h = new Headers(); + if (userHeaders != null) { + for (String key : userHeaders.keySet()) { + h.put(key, userHeaders.get(key)); + } + } + h.put(NATS_SCHEDULE_NEXT_HDR, "purge"); + h.put(NATS_SCHEDULER_HDR, scheduleSubject); + return h; + } + + private static @Nullable String findStreamLenient(JetStreamManagement jsm, String scheduleSubject) throws JetStreamApiException, IOException { + List streams = jsm.getStreamNames(scheduleSubject); + if (streams == null || streams.size() != 1) { + return null; + } + return streams.get(0); + } + + private static String findStream(JetStreamManagement jsm, String scheduleSubject) throws JetStreamApiException, IOException { + List streams = jsm.getStreamNames(scheduleSubject); + if (streams == null || streams.isEmpty()) { + throw new IllegalStateException("No stream found for subject [" + scheduleSubject + "]"); + } + if (streams.size() != 1) { + throw new IllegalStateException("Subject matches more than 1 stream [" + scheduleSubject + "]"); + } + return streams.get(0); + } + + private static long getScheduleSequence(JetStreamManagement jsm, String streamName, String scheduleSubject) throws IOException, JetStreamApiException { + try { + MessageInfo mi = jsm.getLastMessage(streamName, scheduleSubject); + if (mi != null) { + Headers headers = mi.getHeaders(); + if (headers != null && headers.containsKey(NATS_SCHEDULE_HDR)) { + return mi.getSeq(); + } + } + } + catch (JetStreamApiException e) { + if (e.getApiErrorCode() != 10037) { + throw e; + } + } + return -1; + } +} diff --git a/schedule-message/src/main/java/io/synadia/sm/ScheduledMessageBuilder.java b/schedule-message/src/main/java/io/synadia/sm/ScheduledMessageBuilder.java index 4725560..feda85f 100644 --- a/schedule-message/src/main/java/io/synadia/sm/ScheduledMessageBuilder.java +++ b/schedule-message/src/main/java/io/synadia/sm/ScheduledMessageBuilder.java @@ -1,10 +1,13 @@ -// Copyright (c) 2025 Synadia Communications Inc. All Rights Reserved. +// Copyright (c) 2025-2026 Synadia Communications Inc. All Rights Reserved. // See LICENSE and NOTICE file for details. package io.synadia.sm; +import io.nats.client.JetStream; +import io.nats.client.JetStreamApiException; import io.nats.client.Message; import io.nats.client.MessageTtl; +import io.nats.client.api.PublishAck; import io.nats.client.impl.Headers; import io.nats.client.impl.NatsMessage; import io.nats.client.support.DateTimeUtils; @@ -12,6 +15,7 @@ import io.nats.client.support.Validator; import org.jspecify.annotations.NonNull; +import java.io.IOException; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.time.Duration; @@ -22,11 +26,29 @@ import java.util.concurrent.TimeUnit; /** - * Class to make a message that can be published to a stream that allows message scheduling + * Builder for constructing a NATS JetStream {@link Message} that carries a schedule per ADR-51. + *

+ * The resulting message is published to a schedule subject on a stream that supports message + * scheduling. The message itself does not get delivered to subscribers directly; instead the + * server interprets the {@code Nats-Schedule-*} headers and produces published messages on the + * configured target subject according to the schedule. + *

+ * Three scheduling modes are supported: + *

    + *
  • {@code @at} - a one-shot schedule at a specific point in time + * (see {@link #scheduleAt}, {@link #scheduleIn}, {@link #scheduleImmediate}).
  • + *
  • {@code @every} - a fixed interval, with a minimum supported interval of 1 second + * (see {@link #scheduleEvery}).
  • + *
  • Cron / predefined - a standard cron expression or one of the predefined entries + * such as {@code @hourly}, {@code @daily}, etc. + * (see {@link #scheduleCron}, {@link #schedule(PredefinedSchedules)}).
  • + *
*/ public class ScheduledMessageBuilder { + /** Number of nanoseconds in one second. */ public static final long NANOS_PER_SECOND = 1_000_000_000L; + private String scheduleString; private String timezone; private String scheduleSubject; @@ -34,8 +56,14 @@ public class ScheduledMessageBuilder { private Headers headers; private byte[] data; private MessageTtl messageTtl; + private boolean rollup; private final List sources = new ArrayList<>(); + /** + * Construct an empty builder. Configure the schedule and target subjects, the + * schedule (one of the {@code schedule*} methods), and any data / headers before + * calling {@link #build()} or {@link #scheduleMessage(JetStream)}. + */ public ScheduledMessageBuilder() {} /** @@ -105,6 +133,7 @@ public ScheduledMessageBuilder headers(Headers headers) { /** * Copy the subject, data and headers from an existing message * @param message the message + * @return the builder */ public ScheduledMessageBuilder copy(Message message) { scheduleSubject(message.getSubject()); @@ -125,7 +154,8 @@ public ScheduledMessageBuilder scheduleIn(Duration fromNow) { /** * Schedule for an amount of time from now. * This is not absolute since it takes time to build and send the message. - * @param fromNow how long from now to schedule + * @param fromNow how long from now to schedule, expressed in {@code timeUnit}s + * @param timeUnit the unit for {@code fromNow} * @return a ScheduledMessageBuilder object */ public ScheduledMessageBuilder scheduleIn(long fromNow, TimeUnit timeUnit) { @@ -228,11 +258,27 @@ public ScheduledMessageBuilder scheduleCron(String cron, String timezone) { return this; } + /** + * Set the {@code Nats-Schedule-TTL} header. This TTL is applied to each message + * the schedule publishes to the target subject (when the stream supports per-message TTLs); + * it is not a TTL on the schedule itself. + * @param messageTtl the per-published-message TTL + * @return the builder + */ public ScheduledMessageBuilder messageTtl(MessageTtl messageTtl) { this.messageTtl = messageTtl; return this; } + /** + * Set the {@code Nats-Schedule-Source} header. When set, the schedule reads the last + * message on the source subject and publishes it to the target subject; if no message + * exists on the source subject, the schedule's own body and headers are used as a + * fallback. Wildcards are not supported. Per ADR-51 conventions the header supports + * a list of source subjects. + * @param sources the source subjects + * @return the builder + */ public ScheduledMessageBuilder sources(List sources) { this.sources.clear(); if (sources != null) { @@ -241,6 +287,15 @@ public ScheduledMessageBuilder sources(List sources) { return this; } + /** + * Set the {@code Nats-Schedule-Source} header. When set, the schedule reads the last + * message on the source subject and publishes it to the target subject; if no message + * exists on the source subject, the schedule's own body and headers are used as a + * fallback. Wildcards are not supported. Per ADR-51 conventions the header supports + * a list of source subjects. + * @param sources the source subjects + * @return the builder + */ public ScheduledMessageBuilder sources(String... sources) { this.sources.clear(); if (sources != null) { @@ -249,6 +304,40 @@ public ScheduledMessageBuilder sources(String... sources) { return this; } + /** + * Set the {@code Nats-Schedule-Rollup} header to {@code sub}, which is the only + * valid value per ADR-51. This causes published messages to roll up the target subject. + * @return the builder + */ + public ScheduledMessageBuilder rollup() { + rollup = true; + return this; + } + + /** + * Build the scheduled message and publish it to JetStream. + * @param js the JetStream context used to publish + * @return the sequence number of the stored schedule message from the {@link PublishAck} + * @throws JetStreamApiException if the server returns an error + * @throws IOException if there is a communication problem with the server + */ + public long scheduleMessage(JetStream js) throws JetStreamApiException, IOException { + return js.publish(build()).getSeqno(); + } + + /** + * Build the fully constructed message ready to be published to the schedule subject. + *

+ * Validates that {@code scheduleSubject} and {@code targetSubject} are both supplied + * and printable without {@code *} or {@code >} wildcards, and that a schedule string + * has been set via one of the {@code scheduleXxx}/{@code schedule} methods. + *

+ * Sets the following headers as applicable: + * {@code Nats-Schedule}, {@code Nats-Schedule-Target}, + * {@code Nats-Schedule-TTL}, {@code Nats-Schedule-Time-Zone}, + * {@code Nats-Schedule-Source}, {@code Nats-Schedule-Rollup}. + * @return the constructed {@link Message} + */ public Message build() { Validator.required(scheduleSubject, "Publish Subject is required."); Validator.required(targetSubject, "Target Subject is required."); @@ -274,6 +363,9 @@ public Message build() { if (sources.size() > 0) { headers.put(NatsJetStreamConstants.NATS_SCHEDULE_SOURCE_HDR, sources); } + if (rollup) { + headers.put(NatsJetStreamConstants.NATS_SCHEDULE_ROLLUP_HDR, "sub"); + } return NatsMessage.builder() .subject(scheduleSubject) @@ -282,6 +374,12 @@ public Message build() { .build(); } + /** + * Format a {@link Duration} as a Go {@code time.ParseDuration()} string + * (for example {@code "1h30m5s"}). Used to render {@code @every} interval values. + * @param duration the duration to format + * @return the Go-formatted duration string + */ public static String toGoDuration(Duration duration) { long left = duration.toNanos(); long nanos = left % 1_000_000L; @@ -303,6 +401,13 @@ public static String toGoDuration(Duration duration) { return sb.toString(); } + /** + * Validate that a Go {@code time.ParseDuration()} formatted string represents a + * duration of at least one second, matching ADR-51's minimum supported interval + * rule for {@code @every} schedules. + * @param s the Go-formatted duration string + * @return {@code true} if the string parses successfully and is at least 1 second + */ public static boolean isAtLeastOneSecond(@NonNull String s) { long totalNanos = 0; int i = 0; diff --git a/schedule-message/src/main/java/io/synadia/sm/ScheduledStreamUtil.java b/schedule-message/src/main/java/io/synadia/sm/ScheduledStreamUtil.java deleted file mode 100644 index 26147c9..0000000 --- a/schedule-message/src/main/java/io/synadia/sm/ScheduledStreamUtil.java +++ /dev/null @@ -1,34 +0,0 @@ -package io.synadia.sm; - -import io.nats.client.JetStreamApiException; -import io.nats.client.JetStreamManagement; -import io.nats.client.api.StorageType; -import io.nats.client.api.StreamConfiguration; -import io.nats.client.api.StreamInfo; - -import java.io.IOException; - -/** - * Class to make setting a per message ttl easier. - */ -public abstract class ScheduledStreamUtil { - - public static StreamInfo createSchedulableStream(JetStreamManagement jsm, String streamName, StorageType storageType, String... subjects) throws JetStreamApiException, IOException { - StreamConfiguration sc = StreamConfiguration.builder() - .name(streamName) - .storageType(storageType) - .subjects(subjects) - .allowMessageSchedules() - .allowMessageTtl() - .build(); - return jsm.addStream(sc); - } - - public static StreamInfo createSchedulableStream(JetStreamManagement jsm, StreamConfiguration startingStreamConfig) throws JetStreamApiException, IOException { - StreamConfiguration sc = StreamConfiguration.builder(startingStreamConfig) - .allowMessageSchedules() - .allowMessageTtl() - .build(); - return jsm.addStream(sc); - } -} diff --git a/schedule-message/src/test/java/io/synadia/sm/ScheduleManagementTests.java b/schedule-message/src/test/java/io/synadia/sm/ScheduleManagementTests.java new file mode 100644 index 0000000..561f1c1 --- /dev/null +++ b/schedule-message/src/test/java/io/synadia/sm/ScheduleManagementTests.java @@ -0,0 +1,311 @@ +// Copyright (c) 2025-2026 Synadia Communications Inc. All Rights Reserved. +// See LICENSE and NOTICE file for details. + +package io.synadia.sm; + +import io.nats.NatsRunnerUtils; +import io.nats.NatsServerRunner; +import io.nats.client.*; +import io.nats.client.api.MessageInfo; +import io.nats.client.api.PublishAck; +import io.nats.client.api.StorageType; +import io.nats.client.impl.Headers; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.logging.Level; + +import static io.nats.client.support.NatsJetStreamConstants.*; +import static org.junit.jupiter.api.Assertions.*; + +public class ScheduleManagementTests { + + static NatsServerRunner runner; + static Connection nc; + static JetStreamManagement jsm; + static JetStream js; + + @BeforeAll + public static void beforeAll() throws Exception { + NatsRunnerUtils.setDefaultOutputLevel(Level.WARNING); + runner = new NatsServerRunner(false, true); + Options options = Options.builder() + .server(runner.getNatsLocalhostUri()) + .errorListener(new ErrorListener() {}) + .build(); + nc = Nats.connect(options); + jsm = nc.jetStreamManagement(); + js = nc.jetStream(); + } + + @AfterAll + public static void afterAll() throws Exception { + if (nc != null) nc.close(); + if (runner != null) runner.close(); + } + + // -- helpers --------------------------------------------------------------- + + /** Holder for a freshly created stream and the schedule / target subject prefixes scoped to it. */ + private static class Fixture { + final String stream; + final String schedPrefix; // e.g. "sched." + final String tgtPrefix; // e.g. "tgt." + Fixture(String stream, String schedPrefix, String tgtPrefix) { + this.stream = stream; + this.schedPrefix = schedPrefix; + this.tgtPrefix = tgtPrefix; + } + String sched(String leaf) { return schedPrefix + "." + leaf; } + String tgt(String leaf) { return tgtPrefix + "." + leaf; } + } + + /** Create a schedulable stream with unique schedule and target subject patterns. */ + private static Fixture newFixture() throws Exception { + String id = NUID.nextGlobalSequence(); + String stream = "stream_" + id; + String schedPrefix = "sched_" + id; + String tgtPrefix = "tgt_" + id; + ScheduleManagement.createSchedulableStream( + jsm, stream, StorageType.Memory, + schedPrefix + ".>", + tgtPrefix + ".>"); + return new Fixture(stream, schedPrefix, tgtPrefix); + } + + /** + * Schedule a single delayed message far enough in the future that it cannot fire + * during the test. Returns the stream sequence the schedule message landed at. + */ + private static long scheduleInTheFuture(String schedSubject, String targetSubject, String data) throws Exception { + return new ScheduledMessageBuilder() + .scheduleSubject(schedSubject) + .targetSubject(targetSubject) + .scheduleIn(Duration.ofHours(1)) + .data(data) + .scheduleMessage(js); + } + + /** True iff a schedule message still exists on the subject (carries the Nats-Schedule header). */ + private static boolean scheduleExists(String streamName, String schedSubject) throws Exception { + try { + MessageInfo mi = jsm.getLastMessage(streamName, schedSubject); + return mi != null + && mi.getHeaders() != null + && mi.getHeaders().containsKey(NATS_SCHEDULE_HDR); + } + catch (JetStreamApiException e) { + if (e.getApiErrorCode() == 10037) { + return false; + } + throw e; + } + } + + // -- cancelSchedule(jsm, stream, scheduleStreamSequence) ------------------- + + @Test + public void testCancelBySequence() throws Exception { + Fixture f = newFixture(); + String schedSubject = f.sched("a"); + long seq = scheduleInTheFuture(schedSubject, f.tgt("a"), "body"); + + assertTrue(scheduleExists(f.stream, schedSubject)); + assertEquals(ScheduleManagement.Result.SUCCESS, ScheduleManagement.cancelSchedule(jsm, f.stream, seq)); + assertFalse(scheduleExists(f.stream, schedSubject)); + + assertEquals(ScheduleManagement.Result.NOT_FOUND, + ScheduleManagement.cancelSchedule(jsm, f.stream, 99_999L)); + } + + // -- cancelSchedule(jsm, scheduleSubject) ---------------------------------- + + @Test + public void testCancelBySubject_success() throws Exception { + Fixture f = newFixture(); + String schedSubject = f.sched("b"); + scheduleInTheFuture(schedSubject, f.tgt("b"), "body"); + + assertTrue(scheduleExists(f.stream, schedSubject)); + assertEquals(ScheduleManagement.Result.SUCCESS, + ScheduleManagement.cancelSchedule(jsm, schedSubject)); + assertFalse(scheduleExists(f.stream, schedSubject)); + } + + @Test + public void testCancelBySubject_noStreamThrows() { + String orphan = "no_such_subject_" + NUID.nextGlobalSequence(); + assertThrows(IllegalStateException.class, + () -> ScheduleManagement.cancelSchedule(jsm, orphan)); + } + + // -- cancelSchedule(jsm, scheduleSubject, scheduleStream) ------------------ + + @Test + public void testCancelBySubjectInStream_exact_subject() throws Exception { + Fixture f = newFixture(); + String schedSubject = f.sched("c"); + scheduleInTheFuture(schedSubject, f.tgt("c"), "body"); + + assertTrue(scheduleExists(f.stream, schedSubject)); + assertEquals(ScheduleManagement.Result.SUCCESS, + ScheduleManagement.cancelSchedule(jsm, schedSubject, f.stream)); + assertFalse(scheduleExists(f.stream, schedSubject)); + + assertEquals(ScheduleManagement.Result.NOT_FOUND, + ScheduleManagement.cancelSchedule(jsm, f.sched("nope"), f.stream)); + } + + @Test + public void testCancelBySubjectInStream_wildcard_purgesAll() throws Exception { + Fixture f = newFixture(); + String s1 = f.sched("w1"); + String s2 = f.sched("w2"); + String s3 = f.sched("w3"); + scheduleInTheFuture(s1, f.tgt("w1"), "1"); + scheduleInTheFuture(s2, f.tgt("w2"), "2"); + scheduleInTheFuture(s3, f.tgt("w3"), "3"); + + assertEquals(ScheduleManagement.Result.SUCCESS, + ScheduleManagement.cancelSchedule(jsm, f.schedPrefix + ".*", f.stream)); + assertFalse(scheduleExists(f.stream, s1)); + assertFalse(scheduleExists(f.stream, s2)); + assertFalse(scheduleExists(f.stream, s3)); + } + + @Test + public void testCancelBySubjectInStream_wildcard_noMatches() throws Exception { + Fixture f = newFixture(); + assertEquals(ScheduleManagement.Result.NOT_FOUND, + ScheduleManagement.cancelSchedule(jsm, f.schedPrefix + ".*", f.stream)); + } + + // -- publishAndCancelSchedule(jsm, sched, tgt, data, publishOnlyIfExists) -- + + @Test + public void testPublishAndCancel_unconditional_success() throws Exception { + Fixture f = newFixture(); + String schedSubject = f.sched("p1"); + String tgtSubject = f.tgt("p1"); + scheduleInTheFuture(schedSubject, tgtSubject, "body"); + + PublishAck ack = ScheduleManagement.publishAndCancelSchedule( + jsm, schedSubject, tgtSubject, "cancel-now".getBytes(), null, false); + + assertNotNull(ack); + assertFalse(scheduleExists(f.stream, schedSubject)); + } + + @Test + public void testPublishAndCancel_ifExists_whenPresent() throws Exception { + Fixture f = newFixture(); + String schedSubject = f.sched("p2"); + String tgtSubject = f.tgt("p2"); + scheduleInTheFuture(schedSubject, tgtSubject, "body"); + + PublishAck ack = ScheduleManagement.publishAndCancelSchedule( + jsm, schedSubject, tgtSubject, "cancel-now".getBytes(), null, true); + + assertNotNull(ack); + assertFalse(scheduleExists(f.stream, schedSubject)); + } + + @Test + public void testPublishAndCancel_ifExists_whenMissing() throws Exception { + Fixture f = newFixture(); + String schedSubject = f.sched("p3"); + String tgtSubject = f.tgt("p3"); + + PublishAck ack = ScheduleManagement.publishAndCancelSchedule( + jsm, schedSubject, tgtSubject, "cancel-now".getBytes(), null, true); + + assertNull(ack); + } + + // -- publishAndCancelSchedule(jsm, sched, seq, tgt, data) ------------------ + + @Test + public void testPublishAndCancel_withSequence_success() throws Exception { + Fixture f = newFixture(); + String schedSubject = f.sched("p4"); + String tgtSubject = f.tgt("p4"); + long seq = scheduleInTheFuture(schedSubject, tgtSubject, "body"); + + PublishAck ack = ScheduleManagement.publishAndCancelSchedule( + jsm, schedSubject, seq, tgtSubject, "cancel-now".getBytes(), null); + + assertNotNull(ack); + assertFalse(scheduleExists(f.stream, schedSubject)); + } + + @Test + public void testPublishAndCancel_withSequence_wrongSequenceFails() throws Exception { + Fixture f = newFixture(); + String schedSubject = f.sched("p5"); + String tgtSubject = f.tgt("p5"); + long seq = scheduleInTheFuture(schedSubject, tgtSubject, "body"); + + assertThrows(JetStreamApiException.class, () -> + ScheduleManagement.publishAndCancelSchedule( + jsm, schedSubject, seq + 999, tgtSubject, "cancel-now".getBytes(), null)); + } + + // -- ADR-51 constraint: targetSubject must not equal scheduleSubject ------- + + /** + * The server rejects an atomic stop publish whose target subject equals the schedule + * subject (ADR-51 error 10212). The client does not pre-check this — it lets the + * server surface the rejection. + */ + @Test + public void testPublishAndCancel_targetEqualsScheduleSubject_serverRejects() throws Exception { + Fixture f = newFixture(); + String sameSubject = f.sched("same"); + scheduleInTheFuture(sameSubject, f.tgt("same"), "body"); + + JetStreamApiException ex = assertThrows(JetStreamApiException.class, () -> + ScheduleManagement.publishAndCancelSchedule( + jsm, sameSubject, sameSubject, "x".getBytes(), null, false)); + assertEquals(10212, ex.getApiErrorCode()); + } + + // -- userHeaders cannot override the required Nats-Scheduler / Nats-Schedule-Next -- + + /** + * If the caller passes the protected headers ({@code Nats-Scheduler} or + * {@code Nats-Schedule-Next}) in {@code userHeaders}, the values set by the method + * must win — otherwise the atomic stop signal would be corrupted and the schedule + * would not be purged. Unrelated user headers must still be carried through to the + * published message. + */ + @Test + public void testPublishAndCancel_userHeadersCannotOverrideRequired() throws Exception { + Fixture f = newFixture(); + String schedSubject = f.sched("hdr"); + String tgtSubject = f.tgt("hdr"); + scheduleInTheFuture(schedSubject, tgtSubject, "body"); + + Headers userHeaders = new Headers(); + userHeaders.put(NATS_SCHEDULE_NEXT_HDR, "not-purge"); + userHeaders.put(NATS_SCHEDULER_HDR, "wrong-subject"); + userHeaders.put("X-Custom", "carry-me"); + + PublishAck ack = ScheduleManagement.publishAndCancelSchedule( + jsm, schedSubject, tgtSubject, "cancel-now".getBytes(), userHeaders, false); + assertNotNull(ack); + + // The schedule was actually cancelled — proves the required headers won. + assertFalse(scheduleExists(f.stream, schedSubject)); + + // The published target message carries the correct required values plus the user header. + MessageInfo tgtMsg = jsm.getLastMessage(f.stream, tgtSubject); + assertNotNull(tgtMsg); + Headers tgtHeaders = tgtMsg.getHeaders(); + assertNotNull(tgtHeaders); + assertEquals("purge", tgtHeaders.getFirst(NATS_SCHEDULE_NEXT_HDR)); + assertEquals(schedSubject, tgtHeaders.getFirst(NATS_SCHEDULER_HDR)); + assertEquals("carry-me", tgtHeaders.getFirst("X-Custom")); + } +}