Skip to content

Commit 4b47f79

Browse files
(2.14) Reset consumer to new starting sequence
Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
1 parent b94011e commit 4b47f79

File tree

7 files changed

+329
-1
lines changed

7 files changed

+329
-1
lines changed

server/consumer.go

Lines changed: 88 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -437,11 +437,13 @@ type consumer struct {
437437
lss *lastSeqSkipList
438438
rlimit *rate.Limiter
439439
reqSub *subscription
440+
resetSub *subscription
440441
ackSub *subscription
441442
ackReplyT string
442443
ackSubj string
443444
nextMsgSubj string
444445
nextMsgReqs *ipQueue[*nextMsgReq]
446+
resetSubj string
445447
maxp int
446448
pblimit int
447449
maxpb int
@@ -1263,6 +1265,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
12631265
o.ackReplyT = fmt.Sprintf("%s.%%d.%%d.%%d.%%d.%%d", pre)
12641266
o.ackSubj = fmt.Sprintf("%s.*.*.*.*.*", pre)
12651267
o.nextMsgSubj = fmt.Sprintf(JSApiRequestNextT, mn, o.name)
1268+
o.resetSubj = fmt.Sprintf(JSApiConsumerResetT, mn, o.name)
12661269

12671270
// Check/update the inactive threshold
12681271
o.updateInactiveThreshold(&o.cfg)
@@ -1547,6 +1550,11 @@ func (o *consumer) setLeader(isLeader bool) {
15471550
o.deleteWithoutAdvisory()
15481551
return
15491552
}
1553+
if o.resetSub, err = o.subscribeInternal(o.resetSubj, o.processResetReq); err != nil {
1554+
o.mu.Unlock()
1555+
o.deleteWithoutAdvisory()
1556+
return
1557+
}
15501558

15511559
// Check on flow control settings.
15521560
if o.cfg.FlowControl {
@@ -1667,8 +1675,9 @@ func (o *consumer) setLeader(isLeader bool) {
16671675
// ok if they are nil, we protect inside unsubscribe()
16681676
o.unsubscribe(o.ackSub)
16691677
o.unsubscribe(o.reqSub)
1678+
o.unsubscribe(o.resetSub)
16701679
o.unsubscribe(o.fcSub)
1671-
o.ackSub, o.reqSub, o.fcSub = nil, nil, nil
1680+
o.ackSub, o.reqSub, o.resetSub, o.fcSub = nil, nil, nil, nil
16721681
if o.infoSub != nil {
16731682
o.srv.sysUnsubscribe(o.infoSub)
16741683
o.infoSub = nil
@@ -2596,6 +2605,53 @@ func (o *consumer) updateSkipped(seq uint64) {
25962605
o.propose(b[:])
25972606
}
25982607

2608+
func (o *consumer) resetStartingSeq(seq uint64, reply string) {
2609+
o.mu.Lock()
2610+
defer o.mu.Unlock()
2611+
2612+
// Must be a minimum of 1.
2613+
if seq <= 0 {
2614+
seq = 1
2615+
}
2616+
o.resetLocalStartingSeq(seq)
2617+
// Clustered mode and R>1.
2618+
if o.node != nil {
2619+
b := make([]byte, 1+8+len(reply))
2620+
b[0] = byte(resetSeqOp)
2621+
var le = binary.LittleEndian
2622+
le.PutUint64(b[1:], seq)
2623+
copy(b[1+8:], reply)
2624+
o.propose(b[:])
2625+
} else if o.store != nil {
2626+
o.store.Reset(seq - 1)
2627+
if reply != _EMPTY_ {
2628+
o.outq.sendMsg(reply, nil)
2629+
}
2630+
// Cleanup messages that lost interest.
2631+
if o.retention == InterestPolicy {
2632+
if mset := o.mset; mset != nil {
2633+
o.mu.Unlock()
2634+
ss := mset.state()
2635+
o.checkStateForInterestStream(&ss)
2636+
o.mu.Lock()
2637+
}
2638+
}
2639+
2640+
// Recalculate pending, and re-trigger message delivery.
2641+
o.streamNumPending()
2642+
o.signalNewMessages()
2643+
}
2644+
}
2645+
2646+
// Lock should be held.
2647+
func (o *consumer) resetLocalStartingSeq(seq uint64) {
2648+
o.pending, o.rdc = nil, nil
2649+
o.rdq = nil
2650+
o.rdqi.Empty()
2651+
o.sseq, o.dseq = seq, 1
2652+
o.adflr, o.asflr = o.dseq-1, o.sseq-1
2653+
}
2654+
25992655
func (o *consumer) loopAndForwardProposals(qch chan struct{}) {
26002656
// On exit make sure we nil out pch.
26012657
defer func() {
@@ -4119,6 +4175,35 @@ func (o *consumer) processNextMsgReq(_ *subscription, c *client, _ *Account, _,
41194175
o.nextMsgReqs.push(newNextMsgReq(reply, copyBytes(msg)))
41204176
}
41214177

4178+
// processResetReq will reset a consumer to a new starting sequence.
4179+
func (o *consumer) processResetReq(_ *subscription, c *client, _ *Account, _, reply string, rmsg []byte) {
4180+
if reply == _EMPTY_ {
4181+
return
4182+
}
4183+
4184+
sendErr := func(status int, description string) {
4185+
hdr := fmt.Appendf(nil, "NATS/1.0 %d %s\r\n\r\n", status, description)
4186+
o.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
4187+
}
4188+
4189+
hdr, msg := c.msgParts(rmsg)
4190+
if errorOnRequiredApiLevel(hdr) {
4191+
sendErr(412, "Required Api Level")
4192+
return
4193+
}
4194+
4195+
var req JSApiConsumerResetRequest
4196+
if err := json.Unmarshal(msg, &req); err != nil {
4197+
sendErr(400, "Bad Request")
4198+
return
4199+
}
4200+
if req.Seq == 0 {
4201+
sendErr(400, "Bad Request - Zero Seq")
4202+
return
4203+
}
4204+
o.resetStartingSeq(req.Seq, reply)
4205+
}
4206+
41224207
func (o *consumer) processNextMsgRequest(reply string, msg []byte) {
41234208
o.mu.Lock()
41244209
defer o.mu.Unlock()
@@ -6059,9 +6144,11 @@ func (o *consumer) stopWithFlags(dflag, sdflag, doSignal, advisory bool) error {
60596144
o.active = false
60606145
o.unsubscribe(o.ackSub)
60616146
o.unsubscribe(o.reqSub)
6147+
o.unsubscribe(o.resetSub)
60626148
o.unsubscribe(o.fcSub)
60636149
o.ackSub = nil
60646150
o.reqSub = nil
6151+
o.resetSub = nil
60656152
o.fcSub = nil
60666153
if o.infoSub != nil {
60676154
o.srv.sysUnsubscribe(o.infoSub)

server/filestore.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10900,6 +10900,7 @@ func (o *consumerFileStore) flushLoop(fch, qch chan struct{}) {
1090010900
func (o *consumerFileStore) SetStarting(sseq uint64) error {
1090110901
o.mu.Lock()
1090210902
o.state.Delivered.Stream = sseq
10903+
o.state.AckFloor.Stream = sseq
1090310904
buf, err := o.encodeState()
1090410905
o.mu.Unlock()
1090510906
if err != nil {
@@ -10924,6 +10925,14 @@ func (o *consumerFileStore) UpdateStarting(sseq uint64) {
1092410925
o.kickFlusher()
1092510926
}
1092610927

10928+
// Reset all values in the store, and reset the starting sequence.
10929+
func (o *consumerFileStore) Reset(sseq uint64) error {
10930+
o.mu.Lock()
10931+
o.state = ConsumerState{}
10932+
o.mu.Unlock()
10933+
return o.SetStarting(sseq)
10934+
}
10935+
1092710936
// HasState returns if this store has a recorded state.
1092810937
func (o *consumerFileStore) HasState() bool {
1092910938
o.mu.Lock()

server/jetstream_api.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,9 @@ const (
178178
// JSApiRequestNextT is the prefix for the request next message(s) for a consumer in worker/pull mode.
179179
JSApiRequestNextT = "$JS.API.CONSUMER.MSG.NEXT.%s.%s"
180180

181+
// JSApiConsumerResetT is the prefix for resetting a given consumer to a new starting sequence.
182+
JSApiConsumerResetT = "$JS.API.CONSUMER.RESET.%s.%s"
183+
181184
// JSApiConsumerUnpinT is the prefix for unpinning subscription for a given consumer.
182185
JSApiConsumerUnpin = "$JS.API.CONSUMER.UNPIN.*.*"
183186
JSApiConsumerUnpinT = "$JS.API.CONSUMER.UNPIN.%s.%s"
@@ -780,6 +783,11 @@ type JSApiConsumerGetNextRequest struct {
780783
PriorityGroup
781784
}
782785

786+
// JSApiConsumerResetRequest is for resetting a consumer to a specific sequence.
787+
type JSApiConsumerResetRequest struct {
788+
Seq uint64 `json:"seq"`
789+
}
790+
783791
// JSApiStreamTemplateCreateResponse for creating templates.
784792
// Deprecated: stream templates are deprecated and will be removed in a future version.
785793
type JSApiStreamTemplateCreateResponse struct {

server/jetstream_cluster.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,8 @@ const (
119119
// Batch stream ops.
120120
batchMsgOp
121121
batchCommitMsgOp
122+
// Consumer rest to specific starting sequence.
123+
resetSeqOp
122124
)
123125

124126
// raftGroups are controlled by the metagroup controller.
@@ -5777,6 +5779,33 @@ func (js *jetStream) applyConsumerEntries(o *consumer, ce *CommittedEntry, isLea
57775779
o.store.UpdateStarting(sseq - 1)
57785780
}
57795781
o.mu.Unlock()
5782+
case resetSeqOp:
5783+
o.mu.Lock()
5784+
var le = binary.LittleEndian
5785+
sseq := le.Uint64(buf[1:9])
5786+
reply := string(buf[9:])
5787+
o.resetLocalStartingSeq(sseq)
5788+
if o.store != nil {
5789+
o.store.Reset(sseq - 1)
5790+
}
5791+
if reply != _EMPTY_ {
5792+
o.outq.sendMsg(reply, nil)
5793+
}
5794+
// Cleanup messages that lost interest.
5795+
if o.retention == InterestPolicy {
5796+
if mset := o.mset; mset != nil {
5797+
o.mu.Unlock()
5798+
ss := mset.state()
5799+
o.checkStateForInterestStream(&ss)
5800+
o.mu.Lock()
5801+
}
5802+
}
5803+
// Recalculate pending, and re-trigger message delivery.
5804+
if o.isLeader() {
5805+
o.streamNumPending()
5806+
o.signalNewMessages()
5807+
}
5808+
o.mu.Unlock()
57805809
case addPendingRequest:
57815810
o.mu.Lock()
57825811
if !o.isLeader() {

0 commit comments

Comments
 (0)