Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,14 @@

import io.nats.client.*;
import io.nats.client.api.StorageType;
import io.nats.client.api.StreamInfo;
import io.nats.client.support.DateTimeUtils;
import io.synadia.sm.ScheduleManagement;
import io.synadia.sm.ScheduledMessageBuilder;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static io.synadia.examples.ScheduleUtils.report;
import static io.synadia.examples.ScheduleExampleUtils.report;

/**
* Example: build and publish a few scheduled messages using
Expand Down Expand Up @@ -57,50 +56,53 @@ public static void main(String[] args) {
try { jsm.deleteStream(STREAM); } catch (Exception ignore) {}

// Use the utility to properly create a schedulable stream
StreamInfo si = ScheduleManagement.createSchedulableStream(jsm, STREAM, StorageType.Memory, STREAM_SUBJECTS);
report("Created stream", si.getConfiguration());
ScheduleManagement.createSchedulableStream(jsm, STREAM, StorageType.Memory, STREAM_SUBJECTS);

CountDownLatch latch = new CountDownLatch(4);
Dispatcher d = connection.createDispatcher();

// subscribe to the subject that receives the schedule message
js.subscribe(SCHEDULES, d, m -> {
report("SCHEDULED (received)", m);
report("MONITORING via '" + SCHEDULES + "'", m);
m.ack();
}, false);

// subscribe to the target subject
js.subscribe(TARGETS, d, m -> {
report("TARGETED (received)", m);
report("TARGETED via '" + TARGETS + "'", m);
m.ack();
latch.countDown();
}, false);

report("SCHEDULE-NOW (publishing)");
report("SCHEDULING " + SCHEDULE_PREFIX + "now");
new ScheduledMessageBuilder()
.scheduleSubject(SCHEDULE_PREFIX + "now")
.targetSubject(TARGET_PREFIX + "now")
.scheduleImmediate()
.data("Schedule-Now")
.scheduleMessage(js);

report("SCHEDULE-AT (publishing)");
report("SCHEDULING " + SCHEDULE_PREFIX + "at");
new ScheduledMessageBuilder()
.scheduleSubject(SCHEDULE_PREFIX + "at")
.targetSubject(TARGET_PREFIX + "at")
.scheduleAt(DateTimeUtils.gmtNow().plusSeconds(5))
.data("Scheduled-At")
.scheduleMessage(js);

report("SCHEDULE-EVERY (publishing)");
report("SCHEDULING " + SCHEDULE_PREFIX + "every");
new ScheduledMessageBuilder()
.scheduleSubject(SCHEDULE_PREFIX + "at")
.targetSubject(TARGET_PREFIX + "at")
.scheduleSubject(SCHEDULE_PREFIX + "every")
.targetSubject(TARGET_PREFIX + "every")
.scheduleEvery(1, TimeUnit.SECONDS)
.data("Every Second")
.scheduleMessage(js);

latch.await();

// The "every" schedule keeps firing until it is removed.
report("CANCEL " + SCHEDULE_PREFIX + "every",
ScheduleManagement.cancelSchedule(jsm, SCHEDULE_PREFIX + "every", STREAM));
}
}
catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static io.synadia.examples.ScheduleUtils.report;
import static io.synadia.examples.ScheduleExampleUtils.report;

/**
* Example: same scenario as {@link ScheduleBasics}, but built using
* {@link io.synadia.sm.ScheduledMessageBuilder#build()} and then published
* via {@link io.nats.client.JetStream#publish(io.nats.client.Message)}.
* There is really no reason to do this unless you specifically want to log the
* actual message.
*/
public class ScheduleBasicsAlternate {

Expand Down Expand Up @@ -66,13 +68,13 @@ public static void main(String[] args) {

// subscribe to the subject that receives the schedule message
js.subscribe(SCHEDULES, d, m -> {
report("SCHEDULED (received)", m);
report("MONITORING via '" + SCHEDULES + "'", m);
m.ack();
}, false);

// subscribe to the target subject
js.subscribe(TARGETS, d, m -> {
report("TARGETED (received)", m);
report("TARGETED via '" + TARGETS + "'", m);
m.ack();
latch.countDown();
}, false);
Expand All @@ -83,7 +85,7 @@ public static void main(String[] args) {
.scheduleImmediate()
.data("Schedule-Now")
.build();
report("SCHEDULE-NOW (publishing)", m);
report("SCHEDULING " + SCHEDULE_PREFIX + "now", m);
js.publish(m);

m = new ScheduledMessageBuilder()
Expand All @@ -92,19 +94,23 @@ public static void main(String[] args) {
.scheduleAt(DateTimeUtils.gmtNow().plusSeconds(5))
.data("Scheduled-At")
.build();
report("SCHEDULE-AT (publishing)", m);
report("SCHEDULING " + SCHEDULE_PREFIX + "at", m);
js.publish(m);

m = new ScheduledMessageBuilder()
.scheduleSubject(SCHEDULE_PREFIX + "at")
.targetSubject(TARGET_PREFIX + "at")
.scheduleSubject(SCHEDULE_PREFIX + "every")
.targetSubject(TARGET_PREFIX + "every")
.scheduleEvery(1, TimeUnit.SECONDS)
.data("Every Second")
.build();
report("SCHEDULE-EVERY (publishing)", m);
report("SCHEDULING " + SCHEDULE_PREFIX + "every", m);
js.publish(m);

latch.await();

// The "every" schedule keeps firing until it is removed.
report("CANCEL " + SCHEDULE_PREFIX + "every",
ScheduleManagement.cancelSchedule(jsm, SCHEDULE_PREFIX + "every", STREAM));
}
}
catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Copyright (c) 2025-2026 Synadia Communications Inc. All Rights Reserved.
// See LICENSE and NOTICE file for details.

package io.synadia.examples;

import io.nats.client.*;
import io.nats.client.api.StorageType;
import io.synadia.sm.ScheduleManagement;
import io.synadia.sm.ScheduleManagement.Result;
import io.synadia.sm.ScheduledMessageBuilder;

import java.time.Duration;

import static io.synadia.examples.ScheduleExampleUtils.report;

/**
* Example: stop a running schedule with each
* {@link ScheduleManagement#cancelSchedule cancelSchedule} overload.
* <p>
* <ul>
* <li>by stream + sequence — the lowest-level call</li>
* <li>by stream + subject — looks up the sequence on a known stream</li>
* <li>by subject only — the helper locates the stream too</li>
* </ul>
* The example also shows a {@link Result#NOT_FOUND} outcome by cancelling a
* subject that no longer has a schedule.
*/
public class ScheduleCancel {

/** Stream name used by this example. */
public static final String STREAM = "schedules-enabled";

/** Prefix for all schedule subjects in this example. */
public static final String SCHEDULE_PREFIX = "schedule.";

/** Prefix for all target subjects in this example. */
public static final String TARGET_PREFIX = "target.";

private static final String SCHEDULES = SCHEDULE_PREFIX + ">";
private static final String TARGETS = TARGET_PREFIX + "*";

/** Subject patterns the example stream accepts. */
public static final String[] STREAM_SUBJECTS = new String[]{SCHEDULES, TARGETS};

private ScheduleCancel() {}

/**
* Example entry point.
* @param args ignored
*/
public static void main(String[] args) {
try {
Options options = new Options.Builder()
.server("nats://localhost:4222")
.errorListener(new ErrorListener() {})
.build();

try (Connection connection = Nats.connect(options)) {
JetStreamManagement jsm = connection.jetStreamManagement();
JetStream js = connection.jetStream();

// delete the stream in case it existed, just for a fresh example
try { jsm.deleteStream(STREAM); } catch (Exception ignore) {}

ScheduleManagement.createSchedulableStream(jsm, STREAM, StorageType.Memory, STREAM_SUBJECTS);

String bySeqSubject = SCHEDULE_PREFIX + "by-seq";
String bySubjectSubject = SCHEDULE_PREFIX + "by-subject";
String byLookupSubject = SCHEDULE_PREFIX + "by-lookup";

// Schedule each one an hour out so it can't fire while the example runs.
long bySeq = new ScheduledMessageBuilder()
.scheduleSubject(bySeqSubject)
.targetSubject(TARGET_PREFIX + "by-seq")
.scheduleIn(Duration.ofHours(1))
.data("By-Seq")
.scheduleMessage(js);
report("SCHEDULED " + bySeqSubject + " at sequence " + bySeq);

new ScheduledMessageBuilder()
.scheduleSubject(bySubjectSubject)
.targetSubject(TARGET_PREFIX + "by-subject")
.scheduleIn(Duration.ofHours(1))
.data("By-Subject")
.scheduleMessage(js);
report("SCHEDULED " + bySubjectSubject);

new ScheduledMessageBuilder()
.scheduleSubject(byLookupSubject)
.targetSubject(TARGET_PREFIX + "by-lookup")
.scheduleIn(Duration.ofHours(1))
.data("By-Lookup")
.scheduleMessage(js);
report("SCHEDULED " + byLookupSubject);

// 1) Sequence-based cancel.
Result r1 = ScheduleManagement.cancelSchedule(jsm, STREAM, bySeq);
report("CANCEL by sequence", r1);

// 2) Subject + stream cancel.
Result r2 = ScheduleManagement.cancelSchedule(jsm, bySubjectSubject, STREAM);
report("CANCEL by subject + stream", r2);

// 3) Subject-only cancel; the helper locates the stream.
Result r3 = ScheduleManagement.cancelSchedule(jsm, byLookupSubject);
report("CANCEL by subject (auto-find)", r3);

// 4) Cancelling something that isn't there returns NOT_FOUND.
Result r4 = ScheduleManagement.cancelSchedule(jsm, bySeqSubject, STREAM);
report("CANCEL already-cancelled", r4);
}
}
catch (Exception e) {
e.printStackTrace();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Copyright (c) 2025-2026 Synadia Communications Inc. All Rights Reserved.
// See LICENSE and NOTICE file for details.

package io.synadia.examples;

import io.nats.client.*;
import io.nats.client.api.StorageType;
import io.synadia.sm.ScheduleManagement;
import io.synadia.sm.ScheduledMessageBuilder;

import java.util.concurrent.CountDownLatch;

import static io.synadia.examples.ScheduleExampleUtils.report;

/**
* Example: schedule using cron expressions, including the
* {@link io.synadia.sm.ScheduledMessageBuilder#scheduleCron(String, String)}
* variant that takes an IANA time zone.
* <p>
* NATS schedules use a six-field cron form (second minute hour day month
* day-of-week) per ADR-51. The expressions below fire on short intervals
* so the example completes quickly; the time-zone-bound expression behaves
* identically here because it does not pin a time of day, but the call shape
* is the same one you would use for {@code "0 30 9 * * *"} ("9:30 every day
* in New York").
*/
public class ScheduleCron {

/** Stream name used by this example. */
public static final String STREAM = "schedules-enabled";

/** Prefix for all schedule subjects in this example. */
public static final String SCHEDULE_PREFIX = "schedule.";

/** Prefix for all target subjects in this example. */
public static final String TARGET_PREFIX = "target.";

private static final String SCHEDULES = SCHEDULE_PREFIX + ">";
private static final String TARGETS = TARGET_PREFIX + "*";

/** Subject patterns the example stream accepts. */
public static final String[] STREAM_SUBJECTS = new String[]{SCHEDULES, TARGETS};

private ScheduleCron() {}

/**
* Example entry point.
* @param args ignored
*/
public static void main(String[] args) {
try {
Options options = new Options.Builder()
.server("nats://localhost:4222")
.errorListener(new ErrorListener() {})
.build();

try (Connection connection = Nats.connect(options)) {
JetStreamManagement jsm = connection.jetStreamManagement();
JetStream js = connection.jetStream();

// delete the stream in case it existed, just for a fresh example
try { jsm.deleteStream(STREAM); } catch (Exception ignore) {}

ScheduleManagement.createSchedulableStream(jsm, STREAM, StorageType.Memory, STREAM_SUBJECTS);

CountDownLatch latch = new CountDownLatch(4);
Dispatcher d = connection.createDispatcher();

js.subscribe(TARGETS, d, m -> {
report("TARGETED via '" + TARGETS + "'", m);
m.ack();
latch.countDown();
}, false);

String cronSubject = SCHEDULE_PREFIX + "cron";
String cronTzSubject = SCHEDULE_PREFIX + "cron-tz";

// Six-field cron: every two seconds.
report("SCHEDULING " + cronSubject + " with cron '*/2 * * * * *'");
new ScheduledMessageBuilder()
.scheduleSubject(cronSubject)
.targetSubject(TARGET_PREFIX + "cron")
.scheduleCron("*/2 * * * * *")
.data("Cron-Every-2s")
.scheduleMessage(js);

// Same expression, evaluated in a specific IANA time zone.
report("SCHEDULING " + cronTzSubject + " with cron '*/3 * * * * *' (America/New_York)");
new ScheduledMessageBuilder()
.scheduleSubject(cronTzSubject)
.targetSubject(TARGET_PREFIX + "cron-tz")
.scheduleCron("*/3 * * * * *", "America/New_York")
.data("Cron-Every-3s-NY")
.scheduleMessage(js);

latch.await();

// Cron schedules keep firing until they are removed.
report("CANCEL " + cronSubject, ScheduleManagement.cancelSchedule(jsm, cronSubject, STREAM));
report("CANCEL " + cronTzSubject, ScheduleManagement.cancelSchedule(jsm, cronTzSubject, STREAM));
}
}
catch (Exception e) {
e.printStackTrace();
}
}
}
Loading
Loading