Skip to content

Commit 5d59f80

Browse files
(2.14) Reset consumer to new starting sequence
Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
1 parent 7cdd5cf commit 5d59f80

File tree

9 files changed

+581
-1
lines changed

9 files changed

+581
-1
lines changed

server/consumer.go

Lines changed: 126 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -437,11 +437,13 @@ type consumer struct {
437437
lss *lastSeqSkipList
438438
rlimit *rate.Limiter
439439
reqSub *subscription
440+
resetSub *subscription
440441
ackSub *subscription
441442
ackReplyT string
442443
ackSubj string
443444
nextMsgSubj string
444445
nextMsgReqs *ipQueue[*nextMsgReq]
446+
resetSubj string
445447
maxp int
446448
pblimit int
447449
maxpb int
@@ -1263,6 +1265,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
12631265
o.ackReplyT = fmt.Sprintf("%s.%%d.%%d.%%d.%%d.%%d", pre)
12641266
o.ackSubj = fmt.Sprintf("%s.*.*.*.*.*", pre)
12651267
o.nextMsgSubj = fmt.Sprintf(JSApiRequestNextT, mn, o.name)
1268+
o.resetSubj = fmt.Sprintf(JSApiConsumerResetT, mn, o.name)
12661269

12671270
// Check/update the inactive threshold
12681271
o.updateInactiveThreshold(&o.cfg)
@@ -1547,6 +1550,11 @@ func (o *consumer) setLeader(isLeader bool) {
15471550
o.deleteWithoutAdvisory()
15481551
return
15491552
}
1553+
if o.resetSub, err = o.subscribeInternal(o.resetSubj, o.processResetReq); err != nil {
1554+
o.mu.Unlock()
1555+
o.deleteWithoutAdvisory()
1556+
return
1557+
}
15501558

15511559
// Check on flow control settings.
15521560
if o.cfg.FlowControl {
@@ -1667,8 +1675,9 @@ func (o *consumer) setLeader(isLeader bool) {
16671675
// ok if they are nil, we protect inside unsubscribe()
16681676
o.unsubscribe(o.ackSub)
16691677
o.unsubscribe(o.reqSub)
1678+
o.unsubscribe(o.resetSub)
16701679
o.unsubscribe(o.fcSub)
1671-
o.ackSub, o.reqSub, o.fcSub = nil, nil, nil
1680+
o.ackSub, o.reqSub, o.resetSub, o.fcSub = nil, nil, nil, nil
16721681
if o.infoSub != nil {
16731682
o.srv.sysUnsubscribe(o.infoSub)
16741683
o.infoSub = nil
@@ -2596,6 +2605,78 @@ func (o *consumer) updateSkipped(seq uint64) {
25962605
o.propose(b[:])
25972606
}
25982607

2608+
func (o *consumer) resetStartingSeq(seq uint64, reply string) (uint64, bool, error) {
2609+
o.mu.Lock()
2610+
defer o.mu.Unlock()
2611+
2612+
// Reset to a specific sequence, or back to the ack floor.
2613+
if seq == 0 {
2614+
seq = o.asflr + 1
2615+
} else if o.cfg.DeliverPolicy == DeliverAll {
2616+
// Always allowed.
2617+
goto VALID
2618+
} else if o.cfg.DeliverPolicy == DeliverByStartSequence {
2619+
// Only allowed if not going below what's configured.
2620+
if seq < o.cfg.OptStartSeq {
2621+
return 0, false, errors.New("below start seq")
2622+
}
2623+
goto VALID
2624+
} else if o.cfg.DeliverPolicy == DeliverByStartTime && o.mset != nil {
2625+
// Only allowed if not going below what's configured.
2626+
nseq := o.mset.store.GetSeqFromTime(*o.cfg.OptStartTime)
2627+
if seq < nseq {
2628+
return 0, false, errors.New("below start time")
2629+
}
2630+
goto VALID
2631+
} else {
2632+
return 0, false, errors.New("not allowed")
2633+
}
2634+
2635+
VALID:
2636+
// Must be a minimum of 1.
2637+
if seq <= 0 {
2638+
seq = 1
2639+
}
2640+
o.resetLocalStartingSeq(seq)
2641+
// Clustered mode and R>1.
2642+
if o.node != nil {
2643+
b := make([]byte, 1+8+len(reply))
2644+
b[0] = byte(resetSeqOp)
2645+
var le = binary.LittleEndian
2646+
le.PutUint64(b[1:], seq)
2647+
copy(b[1+8:], reply)
2648+
o.propose(b[:])
2649+
return seq, false, nil
2650+
} else if o.store != nil {
2651+
o.store.Reset(seq - 1)
2652+
// Cleanup messages that lost interest.
2653+
if o.retention == InterestPolicy {
2654+
if mset := o.mset; mset != nil {
2655+
o.mu.Unlock()
2656+
ss := mset.state()
2657+
o.checkStateForInterestStream(&ss)
2658+
o.mu.Lock()
2659+
}
2660+
}
2661+
2662+
// Recalculate pending, and re-trigger message delivery.
2663+
o.streamNumPending()
2664+
o.signalNewMessages()
2665+
return seq, true, nil
2666+
}
2667+
return seq, false, nil
2668+
}
2669+
2670+
// Lock should be held.
2671+
func (o *consumer) resetLocalStartingSeq(seq uint64) {
2672+
o.pending, o.rdc = nil, nil
2673+
o.rdq = nil
2674+
o.rdqi.Empty()
2675+
o.sseq, o.dseq = seq, 1
2676+
o.adflr, o.asflr = o.dseq-1, o.sseq-1
2677+
o.ldt, o.lat = time.Time{}, time.Time{}
2678+
}
2679+
25992680
func (o *consumer) loopAndForwardProposals(qch chan struct{}) {
26002681
// On exit make sure we nil out pch.
26012682
defer func() {
@@ -4119,6 +4200,48 @@ func (o *consumer) processNextMsgReq(_ *subscription, c *client, _ *Account, _,
41194200
o.nextMsgReqs.push(newNextMsgReq(reply, copyBytes(msg)))
41204201
}
41214202

4203+
// processResetReq will reset a consumer to a new starting sequence.
4204+
func (o *consumer) processResetReq(_ *subscription, c *client, a *Account, _, reply string, rmsg []byte) {
4205+
if reply == _EMPTY_ {
4206+
return
4207+
}
4208+
4209+
s := o.srv
4210+
var resp = JSApiConsumerResetResponse{ApiResponse: ApiResponse{Type: JSApiConsumerResetResponseType}}
4211+
4212+
hdr, msg := c.msgParts(rmsg)
4213+
if errorOnRequiredApiLevel(hdr) {
4214+
resp.Error = NewJSRequiredApiLevelError()
4215+
s.sendInternalAccountMsg(a, reply, s.jsonResponse(&resp))
4216+
return
4217+
}
4218+
4219+
// An empty message resets back to the ack floor, otherwise a custom sequence is used.
4220+
var req JSApiConsumerResetRequest
4221+
if len(msg) > 0 {
4222+
if err := json.Unmarshal(msg, &req); err != nil {
4223+
resp.Error = NewJSInvalidJSONError(err)
4224+
s.sendInternalAccountMsg(a, reply, s.jsonResponse(&resp))
4225+
return
4226+
}
4227+
// Resetting to 0 is invalid.
4228+
if req.Seq == 0 {
4229+
resp.Error = NewJSInvalidJSONError(errors.New("reset to zero seq"))
4230+
s.sendInternalAccountMsg(a, reply, s.jsonResponse(&resp))
4231+
return
4232+
}
4233+
}
4234+
resetSeq, canRespond, err := o.resetStartingSeq(req.Seq, reply)
4235+
if err != nil {
4236+
resp.Error = NewJSConsumerInvalidResetError(err)
4237+
s.sendInternalAccountMsg(a, reply, s.jsonResponse(&resp))
4238+
} else if canRespond {
4239+
resp.ConsumerInfo = setDynamicConsumerInfoMetadata(o.info())
4240+
resp.ResetSeq = resetSeq
4241+
s.sendInternalAccountMsg(a, reply, s.jsonResponse(&resp))
4242+
}
4243+
}
4244+
41224245
func (o *consumer) processNextMsgRequest(reply string, msg []byte) {
41234246
o.mu.Lock()
41244247
defer o.mu.Unlock()
@@ -6060,9 +6183,11 @@ func (o *consumer) stopWithFlags(dflag, sdflag, doSignal, advisory bool) error {
60606183
o.active = false
60616184
o.unsubscribe(o.ackSub)
60626185
o.unsubscribe(o.reqSub)
6186+
o.unsubscribe(o.resetSub)
60636187
o.unsubscribe(o.fcSub)
60646188
o.ackSub = nil
60656189
o.reqSub = nil
6190+
o.resetSub = nil
60666191
o.fcSub = nil
60676192
if o.infoSub != nil {
60686193
o.srv.sysUnsubscribe(o.infoSub)

server/errors.json

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1998,5 +1998,15 @@
19981998
"help": "",
19991999
"url": "",
20002000
"deprecates": ""
2001+
},
2002+
{
2003+
"constant": "JSConsumerInvalidResetErr",
2004+
"code": 400,
2005+
"error_code": 10202,
2006+
"description": "invalid reset: {err}",
2007+
"comment": "",
2008+
"help": "",
2009+
"url": "",
2010+
"deprecates": ""
20012011
}
20022012
]

server/filestore.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11184,6 +11184,7 @@ func (o *consumerFileStore) flushLoop(fch, qch chan struct{}) {
1118411184
func (o *consumerFileStore) SetStarting(sseq uint64) error {
1118511185
o.mu.Lock()
1118611186
o.state.Delivered.Stream = sseq
11187+
o.state.AckFloor.Stream = sseq
1118711188
buf, err := o.encodeState()
1118811189
o.mu.Unlock()
1118911190
if err != nil {
@@ -11208,6 +11209,14 @@ func (o *consumerFileStore) UpdateStarting(sseq uint64) {
1120811209
o.kickFlusher()
1120911210
}
1121011211

11212+
// Reset all values in the store, and reset the starting sequence.
11213+
func (o *consumerFileStore) Reset(sseq uint64) error {
11214+
o.mu.Lock()
11215+
o.state = ConsumerState{}
11216+
o.mu.Unlock()
11217+
return o.SetStarting(sseq)
11218+
}
11219+
1121111220
// HasState returns if this store has a recorded state.
1121211221
func (o *consumerFileStore) HasState() bool {
1121311222
o.mu.Lock()

server/jetstream_api.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,9 @@ const (
155155
// JSApiRequestNextT is the prefix for the request next message(s) for a consumer in worker/pull mode.
156156
JSApiRequestNextT = "$JS.API.CONSUMER.MSG.NEXT.%s.%s"
157157

158+
// JSApiConsumerResetT is the prefix for resetting a given consumer to a new starting sequence.
159+
JSApiConsumerResetT = "$JS.API.CONSUMER.RESET.%s.%s"
160+
158161
// JSApiConsumerUnpinT is the prefix for unpinning subscription for a given consumer.
159162
JSApiConsumerUnpin = "$JS.API.CONSUMER.UNPIN.*.*"
160163
JSApiConsumerUnpinT = "$JS.API.CONSUMER.UNPIN.%s.%s"
@@ -757,6 +760,19 @@ type JSApiConsumerGetNextRequest struct {
757760
PriorityGroup
758761
}
759762

763+
// JSApiConsumerResetRequest is for resetting a consumer to a specific sequence.
764+
type JSApiConsumerResetRequest struct {
765+
Seq uint64 `json:"seq"`
766+
}
767+
768+
type JSApiConsumerResetResponse struct {
769+
ApiResponse
770+
*ConsumerInfo
771+
ResetSeq uint64 `json:"reset_seq"`
772+
}
773+
774+
const JSApiConsumerResetResponseType = "io.nats.jetstream.api.v1.consumer_reset_response"
775+
760776
// Structure that holds state for a JetStream API request that is processed
761777
// in a separate long-lived go routine. This is to avoid blocking connections.
762778
type jsAPIRoutedReq struct {

server/jetstream_cluster.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,8 @@ const (
124124
// Batch stream ops.
125125
batchMsgOp
126126
batchCommitMsgOp
127+
// Consumer rest to specific starting sequence.
128+
resetSeqOp
127129
)
128130

129131
// raftGroups are controlled by the metagroup controller.
@@ -5832,6 +5834,39 @@ func (js *jetStream) applyConsumerEntries(o *consumer, ce *CommittedEntry, isLea
58325834
o.store.UpdateStarting(sseq - 1)
58335835
}
58345836
o.mu.Unlock()
5837+
case resetSeqOp:
5838+
o.mu.Lock()
5839+
var le = binary.LittleEndian
5840+
sseq := le.Uint64(buf[1:9])
5841+
reply := string(buf[9:])
5842+
o.resetLocalStartingSeq(sseq)
5843+
if o.store != nil {
5844+
o.store.Reset(sseq - 1)
5845+
}
5846+
// Cleanup messages that lost interest.
5847+
if o.retention == InterestPolicy {
5848+
if mset := o.mset; mset != nil {
5849+
o.mu.Unlock()
5850+
ss := mset.state()
5851+
o.checkStateForInterestStream(&ss)
5852+
o.mu.Lock()
5853+
}
5854+
}
5855+
// Recalculate pending, and re-trigger message delivery.
5856+
if !o.isLeader() {
5857+
o.mu.Unlock()
5858+
} else {
5859+
o.streamNumPending()
5860+
o.signalNewMessages()
5861+
s, a := o.srv, o.acc
5862+
o.mu.Unlock()
5863+
if reply != _EMPTY_ {
5864+
var resp = JSApiConsumerResetResponse{ApiResponse: ApiResponse{Type: JSApiConsumerResetResponseType}}
5865+
resp.ConsumerInfo = setDynamicConsumerInfoMetadata(o.info())
5866+
resp.ResetSeq = sseq
5867+
s.sendInternalAccountMsg(a, reply, s.jsonResponse(&resp))
5868+
}
5869+
}
58355870
case addPendingRequest:
58365871
o.mu.Lock()
58375872
if !o.isLeader() {

0 commit comments

Comments
 (0)