@@ -94,6 +94,8 @@ public final class ArcpClient implements AutoCloseable, Flow.Subscriber<Envelope
9494 private volatile @ Nullable SessionId sessionId ;
9595 private volatile @ Nullable Session session ;
9696 private volatile boolean closed ;
97+ private final @ Nullable String resumeToken ;
98+ private final @ Nullable Long lastEventSeq ;
9799
98100 private ArcpClient (Builder b ) {
99101 this .transport = Objects .requireNonNull (b .transport , "transport" );
@@ -106,6 +108,8 @@ private ArcpClient(Builder b) {
106108 this .scheduler = b .scheduler != null ? b .scheduler
107109 : Executors .newScheduledThreadPool (1 , r -> Thread .ofPlatform ()
108110 .name ("arcp-client-scheduler" , 0 ).daemon (true ).unstarted (r ));
111+ this .resumeToken = b .resumeToken ;
112+ this .lastEventSeq = b .lastEventSeq ;
109113 }
110114
111115 public static Builder builder (Transport transport ) {
@@ -116,7 +120,7 @@ public static Builder builder(Transport transport) {
116120 public CompletableFuture <Session > connect () {
117121 transport .incoming ().subscribe (this );
118122 SessionHello hello = new SessionHello (
119- info , auth , Capabilities .of (requestedFeatures ), null , null );
123+ info , auth , Capabilities .of (requestedFeatures ), resumeToken , lastEventSeq );
120124 send (Message .Type .SESSION_HELLO , hello , null , null , null , null );
121125 return sessionFuture ;
122126 }
@@ -204,6 +208,20 @@ public void close() {
204208 scheduler .shutdownNow ();
205209 }
206210
211+ /** Returns the highest event sequence number seen from the server, or -1 if none. */
212+ public long lastSeenSeq () {
213+ return lastSeenSeq .get ();
214+ }
215+
216+ /** Returns the active session after {@link #connect()} completes. */
217+ public Session session () {
218+ Session current = session ;
219+ if (current == null ) {
220+ throw new IllegalStateException ("client is not connected" );
221+ }
222+ return current ;
223+ }
224+
207225 @ Override
208226 public void onSubscribe (Flow .Subscription s ) {
209227 this .subscription = s ;
@@ -481,6 +499,8 @@ public static final class Builder {
481499 private boolean autoAck = true ;
482500 private Duration ackInterval = Duration .ofMillis (200 );
483501 private @ Nullable ScheduledExecutorService scheduler ;
502+ private @ Nullable String resumeToken ;
503+ private @ Nullable Long lastEventSeq ;
484504
485505 Builder (Transport transport ) {
486506 this .transport = transport ;
@@ -526,6 +546,21 @@ public Builder scheduler(ScheduledExecutorService s) {
526546 return this ;
527547 }
528548
549+ /** Resume a prior session by supplying the token received in {@link Session#resumeToken()}. */
550+ public Builder resumeToken (String token ) {
551+ this .resumeToken = token ;
552+ return this ;
553+ }
554+
555+ /**
556+ * Resume from a known event sequence number (§6.3). Used together with
557+ * {@link #resumeToken(String)} to re-subscribe to events the client may have missed.
558+ */
559+ public Builder lastEventSeq (long seq ) {
560+ this .lastEventSeq = seq ;
561+ return this ;
562+ }
563+
529564 public ArcpClient build () {
530565 return new ArcpClient (this );
531566 }
0 commit comments