Skip to content

Commit

Permalink
Use generic sublist for consumer sublist in streams
Browse files Browse the repository at this point in the history
Signed-off-by: Neil Twigg <[email protected]>
  • Loading branch information
neilalexander committed Feb 12, 2025
1 parent b9608fe commit 054e2b1
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 47 deletions.
18 changes: 7 additions & 11 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ type consumer struct {
ackMsgs *ipQueue[*jsAckMsg]

// for stream signaling when multiple filters are set.
sigSubs []*subscription
sigSubs []string

// Priority groups
// Details described in ADR-42.
Expand Down Expand Up @@ -5882,23 +5882,23 @@ func (o *consumer) account() *Account {

// Creates a sublist for consumer.
// All subjects share the same callback.
func (o *consumer) signalSubs() []*subscription {
func (o *consumer) signalSubs() []string {
o.mu.Lock()
defer o.mu.Unlock()

if o.sigSubs != nil {
return o.sigSubs
}

subs := []*subscription{}
if o.subjf == nil {
subs = append(subs, &subscription{subject: []byte(fwcs), icb: o.processStreamSignal})
if len(o.subjf) == 0 {
subs := []string{fwcs}
o.sigSubs = subs
return subs
}

subs := make([]string, 0, len(o.subjf))
for _, filter := range o.subjf {
subs = append(subs, &subscription{subject: []byte(filter.subject), icb: o.processStreamSignal})
subs = append(subs, filter.subject)
}
o.sigSubs = subs
return subs
Expand All @@ -5908,7 +5908,7 @@ func (o *consumer) signalSubs() []*subscription {
// We know that this subject matches us by how the parent handles registering us with the signaling sublist,
// but we must check if we are leader.
// We do need the sequence of the message however and we use the msg as the encoded seq.
func (o *consumer) processStreamSignal(_ *subscription, _ *client, _ *Account, subject, _ string, seqb []byte) {
func (o *consumer) processStreamSignal(seq uint64) {
// We can get called here now when not leader, so bail fast
// and without acquiring any locks.
if !o.leader.Load() {
Expand All @@ -5919,10 +5919,6 @@ func (o *consumer) processStreamSignal(_ *subscription, _ *client, _ *Account, s
if o.mset == nil {
return
}

var le = binary.LittleEndian
seq := le.Uint64(seqb)

if seq > o.npf {
o.npc++
}
Expand Down
2 changes: 1 addition & 1 deletion server/gsl/gsl.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ type GenericSublist[T comparable] struct {
// A node contains subscriptions and a pointer to the next level.
type node[T comparable] struct {
next *level[T]
subs map[T]string
subs map[T]string // value -> subject
}

// A level represents a group of nodes and special pointers to
Expand Down
63 changes: 28 additions & 35 deletions server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package server
import (
"archive/tar"
"bytes"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
Expand All @@ -33,6 +32,7 @@ import (
"time"

"github.com/klauspost/compress/s2"
"github.com/nats-io/nats-server/v2/server/gsl"
"github.com/nats-io/nuid"
)

Expand Down Expand Up @@ -329,10 +329,10 @@ type stream struct {

// For processing consumers without main stream lock.
clsMu sync.RWMutex
cList []*consumer // Consumer list.
sch chan struct{} // Channel to signal consumers.
sigq *ipQueue[*cMsg] // Intra-process queue for the messages to signal to the consumers.
csl *Sublist // Consumer subscription list.
cList []*consumer // Consumer list.
sch chan struct{} // Channel to signal consumers.
sigq *ipQueue[*cMsg] // Intra-process queue for the messages to signal to the consumers.
csl *gsl.GenericSublist[*consumer] // Consumer subscription list.

// Leader will store seq/msgTrace in clustering mode. Used in applyStreamEntries
// to know if trace event should be sent after processing.
Expand Down Expand Up @@ -4118,9 +4118,14 @@ func (mset *stream) storeUpdates(md, bd int64, seq uint64, subj string) {
if md == -1 && seq > 0 && subj != _EMPTY_ {
// We use our consumer list mutex here instead of the main stream lock since it may be held already.
mset.clsMu.RLock()
// TODO(dlc) - Do sublist like signaling so we do not have to match?
for _, o := range mset.cList {
o.decStreamPending(seq, subj)
if mset.csl != nil {
mset.csl.Match(subj, func(o *consumer) {
o.decStreamPending(seq, subj)
})
} else {
for _, o := range mset.cList {
o.decStreamPending(seq, subj)
}
}
mset.clsMu.RUnlock()
} else if md < 0 {
Expand Down Expand Up @@ -5320,24 +5325,14 @@ func (mset *stream) signalConsumersLoop() {
// This will update and signal all consumers that match.
func (mset *stream) signalConsumers(subj string, seq uint64) {
mset.clsMu.RLock()
if mset.csl == nil {
mset.clsMu.RUnlock()
return
}
r := mset.csl.Match(subj)
mset.clsMu.RUnlock()

if len(r.psubs) == 0 {
defer mset.clsMu.RUnlock()
csl := mset.csl
if csl == nil {
return
}
// Encode the sequence here.
var eseq [8]byte
var le = binary.LittleEndian
le.PutUint64(eseq[:], seq)
msg := eseq[:]
for _, sub := range r.psubs {
sub.icb(sub, nil, nil, subj, _EMPTY_, msg)
}
csl.Match(subj, func(o *consumer) {
o.processStreamSignal(seq)
})
}

// Internal message for use by jetstream subsystem.
Expand Down Expand Up @@ -5892,10 +5887,10 @@ func (mset *stream) setConsumer(o *consumer) {
mset.clsMu.Lock()
mset.cList = append(mset.cList, o)
if mset.csl == nil {
mset.csl = NewSublistWithCache()
mset.csl = gsl.NewSublist[*consumer]()
}
for _, sub := range o.signalSubs() {
mset.csl.Insert(sub)
mset.csl.Insert(sub, o)
}
mset.clsMu.Unlock()
}
Expand All @@ -5921,7 +5916,7 @@ func (mset *stream) removeConsumer(o *consumer) {
// Always remove from the leader sublist.
if mset.csl != nil {
for _, sub := range o.signalSubs() {
mset.csl.Remove(sub)
mset.csl.Remove(sub, o)
}
}
mset.clsMu.Unlock()
Expand All @@ -5943,27 +5938,25 @@ func (mset *stream) swapSigSubs(o *consumer, newFilters []string) {
if o.sigSubs != nil {
if mset.csl != nil {
for _, sub := range o.sigSubs {
mset.csl.Remove(sub)
mset.csl.Remove(sub, o)
}
}
o.sigSubs = nil
}

if o.isLeader() {
if mset.csl == nil {
mset.csl = NewSublistWithCache()
mset.csl = gsl.NewSublist[*consumer]()
}
// If no filters are preset, add fwcs to sublist for that consumer.
if newFilters == nil {
sub := &subscription{subject: []byte(fwcs), icb: o.processStreamSignal}
mset.csl.Insert(sub)
o.sigSubs = append(o.sigSubs, sub)
mset.csl.Insert(fwcs, o)
o.sigSubs = append(o.sigSubs, fwcs)
// If there are filters, add their subjects to sublist.
} else {
for _, filter := range newFilters {
sub := &subscription{subject: []byte(filter), icb: o.processStreamSignal}
mset.csl.Insert(sub)
o.sigSubs = append(o.sigSubs, sub)
mset.csl.Insert(filter, o)
o.sigSubs = append(o.sigSubs, filter)
}
}
}
Expand Down

0 comments on commit 054e2b1

Please sign in to comment.