Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion retrier/src/main/java/io/synadia/retrier/Retrier.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public static <T> T execute(RetryConfig config, RetryAction<T> action) throws Ex
* or the observer declines to retry.
*/
public static <T> T execute(RetryConfig config, RetryAction<T> action, RetryObserver observer) throws Exception {
long[] backoffPolicy = config.getBackoffPolicy();;
long[] backoffPolicy = config.getBackoffPolicy();
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is unrelated housekeeping.

int plen = backoffPolicy.length;
int retries = 0;
long deadlineExpiresAt = System.currentTimeMillis() + config.getDeadline();
Expand Down
2 changes: 1 addition & 1 deletion schedule-message/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ repositories {
}

dependencies {
implementation 'io.nats:jnats:2.25.3-SNAPSHOT'
implementation 'io.nats:jnats:2.25.3'
implementation 'org.jspecify:jspecify:1.0.0'
implementation 'io.synadia:counters:0.2.2'

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,41 @@
import io.nats.client.api.StorageType;
import io.nats.client.api.StreamInfo;
import io.nats.client.support.DateTimeUtils;
import io.synadia.sm.ScheduleManagement;
import io.synadia.sm.ScheduledMessageBuilder;
import io.synadia.sm.ScheduledStreamUtil;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static io.synadia.examples.ScheduleUtils.report;

/**
* Example: build and publish a few scheduled messages using
* {@link io.synadia.sm.ScheduledMessageBuilder#scheduleMessage(io.nats.client.JetStream)}.
*/
public class ScheduleBasics {

/** Stream name used by this example. */
public static final String STREAM = "schedules-enabled";

/** Prefix for all schedule subjects in this example. */
public static final String SCHEDULE_PREFIX = "schedule.";

/** Prefix for all target subjects in this example. */
public static final String TARGET_PREFIX = "target.";

private static final String SCHEDULES = SCHEDULE_PREFIX + ">";
private static final String TARGETS = TARGET_PREFIX + "*";

/** Subject patterns the example stream accepts. */
public static final String[] STREAM_SUBJECTS = new String[]{SCHEDULES, TARGETS};

private ScheduleBasics() {}

/**
* Example entry point.
* @param args ignored
*/
public static void main(String[] args) {
try {
Options options = new Options.Builder()
Expand All @@ -34,14 +50,14 @@ public static void main(String[] args) {
.build();

try (Connection connection = Nats.connect(options)) {
JetStreamManagement jsm = connection.jetStreamManagement();;
JetStreamManagement jsm = connection.jetStreamManagement();
JetStream js = connection.jetStream();

// delete the stream in case it existed, just for a fresh example
try { jsm.deleteStream(STREAM); } catch (Exception ignore) {}

// Use the utility to properly create a schedulable stream
StreamInfo si = ScheduledStreamUtil.createSchedulableStream(jsm, STREAM, StorageType.Memory, STREAM_SUBJECTS);
StreamInfo si = ScheduleManagement.createSchedulableStream(jsm, STREAM, StorageType.Memory, STREAM_SUBJECTS);
report("Created stream", si.getConfiguration());

CountDownLatch latch = new CountDownLatch(4);
Expand All @@ -60,32 +76,29 @@ public static void main(String[] args) {
latch.countDown();
}, false);

Message m = new ScheduledMessageBuilder()
report("SCHEDULE-NOW (publishing)");
new ScheduledMessageBuilder()
.scheduleSubject(SCHEDULE_PREFIX + "now")
.targetSubject(TARGET_PREFIX + "now")
.scheduleImmediate()
.data("Schedule-Now")
.build();
report("SCHEDULE-NOW (sending)", m);
js.publish(m);
.scheduleMessage(js);

m = new ScheduledMessageBuilder()
report("SCHEDULE-AT (publishing)");
new ScheduledMessageBuilder()
.scheduleSubject(SCHEDULE_PREFIX + "at")
.targetSubject(TARGET_PREFIX + "at")
.scheduleAt(DateTimeUtils.gmtNow().plusSeconds(5))
.data("Scheduled-At")
.build();
report("SCHEDULE-AT (sending)", m);
js.publish(m);
.scheduleMessage(js);

m = new ScheduledMessageBuilder()
report("SCHEDULE-EVERY (publishing)");
new ScheduledMessageBuilder()
.scheduleSubject(SCHEDULE_PREFIX + "at")
.targetSubject(TARGET_PREFIX + "at")
.scheduleEvery(1, TimeUnit.SECONDS)
.data("Every Second")
.build();
report("SCHEDULE-EVERY (sending)", m);
js.publish(m);
.scheduleMessage(js);

latch.await();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// Copyright (c) 2025 Synadia Communications Inc. All Rights Reserved.
// See LICENSE and NOTICE file for details.

package io.synadia.examples;

import io.nats.client.*;
import io.nats.client.api.StorageType;
import io.nats.client.api.StreamInfo;
import io.nats.client.support.DateTimeUtils;
import io.synadia.sm.ScheduleManagement;
import io.synadia.sm.ScheduledMessageBuilder;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static io.synadia.examples.ScheduleUtils.report;

/**
* Example: same scenario as {@link ScheduleBasics}, but built using
* {@link io.synadia.sm.ScheduledMessageBuilder#build()} and then published
* via {@link io.nats.client.JetStream#publish(io.nats.client.Message)}.
*/
public class ScheduleBasicsAlternate {

/** Stream name used by this example. */
public static final String STREAM = "schedules-enabled";

/** Prefix for all schedule subjects in this example. */
public static final String SCHEDULE_PREFIX = "schedule.";

/** Prefix for all target subjects in this example. */
public static final String TARGET_PREFIX = "target.";

private static final String SCHEDULES = SCHEDULE_PREFIX + ">";
private static final String TARGETS = TARGET_PREFIX + "*";

/** Subject patterns the example stream accepts. */
public static final String[] STREAM_SUBJECTS = new String[]{SCHEDULES, TARGETS};

private ScheduleBasicsAlternate() {}

/**
* Example entry point.
* @param args ignored
*/
public static void main(String[] args) {
try {
Options options = new Options.Builder()
.server("nats://localhost:4222")
.errorListener(new ErrorListener() {})
.build();

try (Connection connection = Nats.connect(options)) {
JetStreamManagement jsm = connection.jetStreamManagement();
JetStream js = connection.jetStream();

// delete the stream in case it existed, just for a fresh example
try { jsm.deleteStream(STREAM); } catch (Exception ignore) {}

// Use the utility to properly create a schedulable stream
StreamInfo si = ScheduleManagement.createSchedulableStream(jsm, STREAM, StorageType.Memory, STREAM_SUBJECTS);
report("Created stream", si.getConfiguration());

CountDownLatch latch = new CountDownLatch(4);
Dispatcher d = connection.createDispatcher();

// subscribe to the subject that receives the schedule message
js.subscribe(SCHEDULES, d, m -> {
report("SCHEDULED (received)", m);
m.ack();
}, false);

// subscribe to the target subject
js.subscribe(TARGETS, d, m -> {
report("TARGETED (received)", m);
m.ack();
latch.countDown();
}, false);

Message m = new ScheduledMessageBuilder()
.scheduleSubject(SCHEDULE_PREFIX + "now")
.targetSubject(TARGET_PREFIX + "now")
.scheduleImmediate()
.data("Schedule-Now")
.build();
report("SCHEDULE-NOW (publishing)", m);
js.publish(m);

m = new ScheduledMessageBuilder()
.scheduleSubject(SCHEDULE_PREFIX + "at")
.targetSubject(TARGET_PREFIX + "at")
.scheduleAt(DateTimeUtils.gmtNow().plusSeconds(5))
.data("Scheduled-At")
.build();
report("SCHEDULE-AT (publishing)", m);
js.publish(m);

m = new ScheduledMessageBuilder()
.scheduleSubject(SCHEDULE_PREFIX + "at")
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't this overwrite the above message? Should these be + "every"?

.targetSubject(TARGET_PREFIX + "at")
.scheduleEvery(1, TimeUnit.SECONDS)
.data("Every Second")
.build();
report("SCHEDULE-EVERY (publishing)", m);
js.publish(m);

latch.await();
}
}
catch (Exception e) {
e.printStackTrace();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,37 @@
import io.nats.client.api.StreamInfo;
import io.nats.client.impl.Headers;
import io.nats.client.impl.NatsMessage;
import io.synadia.sm.ScheduleManagement;
import io.synadia.sm.ScheduledMessageBuilder;
import io.synadia.sm.ScheduledStreamUtil;

import java.time.Duration;
import java.util.concurrent.CountDownLatch;

import static io.synadia.examples.ScheduleUtils.report;

/**
* Example: schedule a message whose body and headers are taken from the last
* message published on a separate source subject (the
* {@code Nats-Schedule-Source} feature in ADR-51).
*/
public class ScheduleFromSource {

/** Stream name used by this example. */
public static final String STREAM = "schedules-enabled";

private static final String SCHEDULES = "schedules";
private static final String TARGET = "target";
private static final String SOURCE = "source";

/** Subject patterns the example stream accepts. */
public static final String[] STREAM_SUBJECTS = new String[]{SCHEDULES, TARGET, SOURCE};

private ScheduleFromSource() {}

/**
* Example entry point.
* @param args ignored
*/
public static void main(String[] args) {
try {
Options options = new Options.Builder()
Expand All @@ -40,7 +54,7 @@ public static void main(String[] args) {
try { jsm.deleteStream(STREAM); } catch (Exception ignore) {}

// Use the utility to properly create a schedulable stream
StreamInfo si = ScheduledStreamUtil.createSchedulableStream(jsm, STREAM, StorageType.Memory, STREAM_SUBJECTS);
StreamInfo si = ScheduleManagement.createSchedulableStream(jsm, STREAM, StorageType.Memory, STREAM_SUBJECTS);
report("Created stream", si.getConfiguration());

CountDownLatch latch1 = new CountDownLatch(1);
Expand Down Expand Up @@ -72,7 +86,7 @@ public static void main(String[] args) {
Headers sourceHeaders = new Headers();
sourceHeaders.put("foo1", "bar1");
Message sourceMessage = new NatsMessage(SOURCE, null, sourceHeaders, sourceData.getBytes());
report("SOURCE 1 (sending)", sourceMessage);
report("SOURCE 1 (publishing)", sourceMessage);
js.publish(sourceMessage);
connection.flush(Duration.ofSeconds(1));

Expand All @@ -82,7 +96,7 @@ public static void main(String[] args) {
.scheduleImmediate()
.sources(SOURCE)
.build();
report("SCHEDULE 1 (sending)", scheduleMessage);
report("SCHEDULE 1 (publishing)", scheduleMessage);
js.publish(scheduleMessage);

latch1.await();
Expand All @@ -91,11 +105,11 @@ public static void main(String[] args) {
sourceHeaders = new Headers();
sourceHeaders.put("foo2", "bar2");
sourceMessage = new NatsMessage(SOURCE, null, sourceHeaders, sourceData.getBytes());
report("SOURCE 2 (sending)", sourceMessage);
report("SOURCE 2 (publishing)", sourceMessage);
js.publish(sourceMessage);
connection.flush(Duration.ofSeconds(1));

report("SCHEDULE 2 (sending)", scheduleMessage);
report("SCHEDULE 2 (publishing)", scheduleMessage);
js.publish(scheduleMessage);

latch2.await();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,19 @@
import io.nats.client.Message;
import io.nats.client.impl.Headers;

/**
* Small console-logging helpers shared by the example apps.
*/
public class ScheduleUtils {

private ScheduleUtils() {}

/**
* Print a pipe-separated, timestamped line to {@code System.out}. Each object is
* rendered with {@link Object#toString()}, except {@link Message}, which is rendered
* via {@link #toString(Message)}.
* @param objects the values to render
*/
public static void report(Object... objects) {
StringBuilder sb = new StringBuilder();
boolean first = true;
Expand All @@ -28,6 +39,12 @@ public static void report(Object... objects) {
System.out.println("[" + System.currentTimeMillis() + "] " + sb);
}

/**
* Format a {@link Message} as a short multi-line string showing the subject, data
* (when present), and headers (when any).
* @param msg the message to format
* @return a human-readable representation
*/
public static String toString(Message msg) {
StringBuilder sb = new StringBuilder(System.lineSeparator())
.append(" Subject: ").append(msg.getSubject());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@

package io.synadia.sm;

/**
* Predefined cron-like schedule shortcuts supported by NATS message schedules
* (per ADR-51). Pass one to
* {@link ScheduledMessageBuilder#schedule(PredefinedSchedules)}.
*/
public enum PredefinedSchedules {
/**
* Run once a year, midnight, Jan. 1st. Same as Yearly. Equivalent to cron string 0 0 0 1 1 *
Expand Down
Loading
Loading