Skip to content

Commit

Permalink
GH-241: Added indirectsent counter to keep track if client of sent re…
Browse files Browse the repository at this point in the history
…ferences. Incomplete.
  • Loading branch information
jirenius committed Jun 6, 2024
1 parent 3084160 commit 0cac8a8
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 54 deletions.
65 changes: 46 additions & 19 deletions server/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type ConnSubscriber interface {
CID() string
Token() json.RawMessage
Subscribe(rid string, direct bool, throttle *rescache.Throttle) (*Subscription, error)
Unsubscribe(sub *Subscription, direct bool, count int, tryDelete bool)
Unsubscribe(sub *Subscription, direct bool, sent bool, count int, tryDelete bool)
Access(sub *Subscription, callback func(*rescache.Access))
Send(data []byte)
Enqueue(f func()) bool
Expand Down Expand Up @@ -56,8 +56,9 @@ type Subscription struct {
throttle *rescache.Throttle

// Protected by conn
direct int // Number of direct subscriptions
indirect int // Number of indirect subscriptions
direct int // Number of direct subscriptions
indirect int // Number of indirect subscriptions (sent or loading)
indirectsent int // Number of indirect subscriptions (sent)
}

type reference struct {
Expand Down Expand Up @@ -267,12 +268,12 @@ func (s *Subscription) onLoaded(rcb *readyCallback) {

// GetRPCResources returns a rpc.Resources object.
// It will lock the subscription and queue any events until ReleaseRPCResources is called.
func (s *Subscription) GetRPCResources() *rpc.Resources {
func (s *Subscription) GetRPCResources(indirect bool) *rpc.Resources {
r := &rpc.Resources{}
if s.c.ProtocolVersion() < versionSoftResourceReferenceAndDataValue {
s.populateResourcesLegacy(r)
} else {
s.populateResources(r)
s.populateResources(r, indirect)
}
return r
}
Expand Down Expand Up @@ -326,7 +327,11 @@ func (s *Subscription) unqueueEvents(reason uint8) {
// populateResources iterates recursively down the subscription tree
// and populates the rpc.Resources object with all non-sent resources
// referenced by the subscription, as well as the subscription's own data.
func (s *Subscription) populateResources(r *rpc.Resources) {
func (s *Subscription) populateResources(r *rpc.Resources, indirect bool) {
if indirect {
s.indirectsent++
}

// Quick exit if resource is already sent
if s.state == stateSent || s.state == stateToSend {
return
Expand Down Expand Up @@ -362,7 +367,7 @@ func (s *Subscription) populateResources(r *rpc.Resources) {
s.state = stateToSend

for _, sc := range s.refs {
sc.sub.populateResources(r)
sc.sub.populateResources(r, true)
}
}

Expand Down Expand Up @@ -449,7 +454,7 @@ func (s *Subscription) subscribeRef(v codec.Value) bool {
// we unsubscribe to all and exit with error
s.c.Debugf("Failed to subscribe to %s. Aborting subscribeRef", v.RID)
for _, ref := range s.refs {
s.c.Unsubscribe(ref.sub, false, 1, true)
s.c.Unsubscribe(ref.sub, false, false, 1, true)
}
s.refs = nil
s.err = err
Expand Down Expand Up @@ -494,7 +499,7 @@ func containsString(path []string, rid string) bool {

func (s *Subscription) unsubscribeRefs() {
for _, ref := range s.refs {
s.c.Unsubscribe(ref.sub, false, 1, false)
s.c.Unsubscribe(ref.sub, false, s.IsSent(), 1, false)
}
s.refs = nil
}
Expand Down Expand Up @@ -532,7 +537,7 @@ func (s *Subscription) removeReference(rid string) {
ref := s.refs[rid]
ref.count--
if ref.count == 0 {
s.c.Unsubscribe(ref.sub, false, 1, true)
s.c.Unsubscribe(ref.sub, false, s.IsSent(), 1, true)
delete(s.refs, rid)
}
}
Expand Down Expand Up @@ -612,7 +617,7 @@ func (s *Subscription) processCollectionEvent(event *rescache.ResourceEvent) {
return
}

r := sub.GetRPCResources()
r := sub.GetRPCResources(true)
s.c.Send(rpc.NewEvent(s.rid, event.Event, rpc.AddEvent{Idx: idx, Value: v.RawMessage, Resources: r}))
sub.ReleaseRPCResources()

Expand Down Expand Up @@ -654,6 +659,7 @@ func (s *Subscription) processModelEvent(event *rescache.ResourceEvent) {
ch := event.Changed
old := event.OldValues
var subs []*Subscription
hasUnsent := false

for _, v := range ch {
if v.Type == codec.ValueTypeReference {
Expand All @@ -663,12 +669,11 @@ func (s *Subscription) processModelEvent(event *rescache.ResourceEvent) {
// TODO handle error properly
return
}
if !sub.IsSent() {
if subs == nil {
subs = make([]*Subscription, 0, len(ch))
}
subs = append(subs, sub)
hasUnsent = hasUnsent || !sub.IsSent()
if subs == nil {
subs = make([]*Subscription, 0, len(ch))
}
subs = append(subs, sub)
}
}

Expand All @@ -681,7 +686,13 @@ func (s *Subscription) processModelEvent(event *rescache.ResourceEvent) {
}

// Quick exit if there are no new unsent subscriptions
if subs == nil {
if !hasUnsent {
// We increase the indirectsent references, otherwise increased when
// calling sub.populateResources, in a simple loop, since we have no
// new resources to populate.
for _, sub := range subs {
sub.indirectsent++
}
// Legacy behavior
if s.c.ProtocolVersion() < versionSoftResourceReferenceAndDataValue {
s.c.Send(rpc.NewEvent(s.rid, event.Event, rpc.ChangeEvent{Values: rescache.Legacy120ValueMap(event.Changed)}))
Expand Down Expand Up @@ -716,7 +727,7 @@ func (s *Subscription) processModelEvent(event *rescache.ResourceEvent) {
s.c.Send(rpc.NewEvent(s.rid, event.Event, rpc.ChangeEvent{Values: rescache.Legacy120ValueMap(event.Changed), Resources: r}))
} else {
for _, sub := range subs {
sub.populateResources(r)
sub.populateResources(r, true)
}
s.c.Send(rpc.NewEvent(s.rid, event.Event, rpc.ChangeEvent{Values: event.Changed, Resources: r}))
}
Expand Down Expand Up @@ -769,7 +780,7 @@ func (s *Subscription) validateAccess(a *rescache.Access) {
// an unsubscribe event if any direct subscriptions existed.
func (s *Subscription) unsubscribeDirect(reason *reserr.Error) {
if s.direct > 0 {
s.c.Unsubscribe(s, true, s.direct, true)
s.c.Unsubscribe(s, true, false, s.direct, true)
s.c.Send(rpc.NewEvent(s.rid, "unsubscribe", rpc.UnsubscribeEvent{Reason: reason}))
}
}
Expand All @@ -796,6 +807,22 @@ func (s *Subscription) Dispose() {
}
}

// Unsend sets its indirectsent to zero for itself and counts down 1 for all its
// references. It also sets status to stateReady. Is called from tryDelete when
// a subscription has indirect references, but has reached 0 indirectsent
// references.
func (s *Subscription) Unsend() {
s.state = stateReady
s.indirectsent = 0

for _, ref := range s.refs {
sub := ref.sub
if sub.state == stateSent && sub.indirectsent > 0 {
sub.indirectsent--
}
}
}

// doneLoading will decrease all loading counters for
// each readyCallback, and test if they reach 0.
func (s *Subscription) doneLoading() {
Expand Down
40 changes: 23 additions & 17 deletions server/wsConn.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func (c *wsConn) GetResource(rid string, cb func(data *rpc.Resources, err error)
sub.CanGet(func(err error) {
if err != nil {
cb(nil, err)
c.Unsubscribe(sub, true, 1, true)
c.Unsubscribe(sub, true, false, 1, true)
return
}

Expand All @@ -245,9 +245,9 @@ func (c *wsConn) GetResource(rid string, cb func(data *rpc.Resources, err error)
return
}

cb(sub.GetRPCResources(), nil)
cb(sub.GetRPCResources(false), nil)
sub.ReleaseRPCResources()
c.Unsubscribe(sub, true, 1, true)
c.Unsubscribe(sub, true, false, 1, true)
})
})
}
Expand Down Expand Up @@ -292,7 +292,7 @@ func (c *wsConn) GetSubscription(rid string, cb func(sub *Subscription, err erro
sub.CanGet(func(err error) {
if err != nil {
cb(nil, err)
c.Unsubscribe(sub, true, 1, true)
c.Unsubscribe(sub, true, false, 1, true)
return
}

Expand All @@ -304,7 +304,7 @@ func (c *wsConn) GetSubscription(rid string, cb func(sub *Subscription, err erro
}
cb(sub, nil)
sub.ReleaseRPCResources()
c.Unsubscribe(sub, true, 1, true)
c.Unsubscribe(sub, true, false, 1, true)
})
})
}
Expand All @@ -319,19 +319,19 @@ func (c *wsConn) SubscribeResource(rid string, cb func(data *rpc.Resources, err
sub.CanGet(func(err error) {
if err != nil {
cb(nil, err)
c.Unsubscribe(sub, true, 1, true)
c.Unsubscribe(sub, true, false, 1, true)
return
}

sub.OnReady(func() {
err := sub.Error()
if err != nil {
cb(nil, err)
c.Unsubscribe(sub, true, 1, true)
c.Unsubscribe(sub, true, false, 1, true)
return
}

cb(sub.GetRPCResources(), nil)
cb(sub.GetRPCResources(false), nil)
sub.ReleaseRPCResources()
})
})
Expand Down Expand Up @@ -446,7 +446,7 @@ func (c *wsConn) handleResourceResult(refRID string, cb func(result interface{},
},
},
}, nil)
c.Unsubscribe(sub, true, 1, true)
c.Unsubscribe(sub, true, false, 1, true)
return
}

Expand All @@ -455,7 +455,7 @@ func (c *wsConn) handleResourceResult(refRID string, cb func(result interface{},
// as the call in itself succeeded.
cb(&rpc.CallResourceResult{
RID: sub.RID(),
Resources: sub.GetRPCResources(),
Resources: sub.GetRPCResources(false),
}, nil)
sub.ReleaseRPCResources()
})
Expand Down Expand Up @@ -502,12 +502,12 @@ func (c *wsConn) Subscribe(rid string, direct bool, t *rescache.Throttle) (*Subs

// unsubscribe counts down the subscription counter
// and deletes the subscription if the count reached 0.
func (c *wsConn) Unsubscribe(sub *Subscription, direct bool, count int, tryDelete bool) {
func (c *wsConn) Unsubscribe(sub *Subscription, direct bool, sent bool, count int, tryDelete bool) {
if c.disposing {
return
}

c.removeCount(sub, direct, count, tryDelete)
c.removeCount(sub, direct, sent, count, tryDelete)
}

func (c *wsConn) UnsubscribeByRID(rid string, count int) bool {
Expand All @@ -520,7 +520,7 @@ func (c *wsConn) UnsubscribeByRID(rid string, count int) bool {
return false
}

c.removeCount(sub, true, count, true)
c.removeCount(sub, true, false, count, true)
return true
}

Expand All @@ -539,17 +539,23 @@ func (c *wsConn) addCount(s *Subscription, direct bool) error {
return nil
}

// removeCount decreases the subscription count and disposes the subscription
// if indirect and direct subscription count reaches 0
func (c *wsConn) removeCount(s *Subscription, direct bool, count int, tryDelete bool) {
if s.direct+s.indirect == 0 {
// removeCount decreases the subscription count and disposes the subscription if
// indirect, indirectsent, and direct subscription count reaches 0. If direct is
// false and the parent resource indirectly referencing the subscription has
// been sent to the client, sent should bet true. If direct is true, sent is
// ignored.
func (c *wsConn) removeCount(s *Subscription, direct bool, sent bool, count int, tryDelete bool) {
if s.direct+s.indirect+s.indirectsent == 0 {
return
}

if direct {
s.direct -= count
} else {
s.indirect -= count
if sent {
s.indirectsent -= count
}
}

if tryDelete {
Expand Down
Loading

0 comments on commit 0cac8a8

Please sign in to comment.