Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix/gh 241 referenced resources not included when unsubscribed #242

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 58 additions & 23 deletions server/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type ConnSubscriber interface {
CID() string
Token() json.RawMessage
Subscribe(rid string, direct bool, throttle *rescache.Throttle) (*Subscription, error)
Unsubscribe(sub *Subscription, direct bool, count int, tryDelete bool)
Unsubscribe(sub *Subscription, direct bool, sent bool, count int, tryDelete bool)
Access(sub *Subscription, callback func(*rescache.Access))
Send(data []byte)
Enqueue(f func()) bool
Expand Down Expand Up @@ -56,8 +56,9 @@ type Subscription struct {
throttle *rescache.Throttle

// Protected by conn
direct int // Number of direct subscriptions
indirect int // Number of indirect subscriptions
direct int // Number of direct subscriptions
indirect int // Number of indirect subscriptions (sent or loading)
indirectsent int // Number of indirect subscriptions (sent)
}

type reference struct {
Expand Down Expand Up @@ -267,12 +268,12 @@ func (s *Subscription) onLoaded(rcb *readyCallback) {

// GetRPCResources returns a rpc.Resources object.
// It will lock the subscription and queue any events until ReleaseRPCResources is called.
func (s *Subscription) GetRPCResources() *rpc.Resources {
func (s *Subscription) GetRPCResources(indirect bool) *rpc.Resources {
r := &rpc.Resources{}
if s.c.ProtocolVersion() < versionSoftResourceReferenceAndDataValue {
s.populateResourcesLegacy(r)
s.populateResourcesLegacy(r, indirect)
} else {
s.populateResources(r)
s.populateResources(r, indirect)
}
return r
}
Expand Down Expand Up @@ -326,7 +327,11 @@ func (s *Subscription) unqueueEvents(reason uint8) {
// populateResources iterates recursively down the subscription tree
// and populates the rpc.Resources object with all non-sent resources
// referenced by the subscription, as well as the subscription's own data.
func (s *Subscription) populateResources(r *rpc.Resources) {
func (s *Subscription) populateResources(r *rpc.Resources, indirect bool) {
if indirect {
s.indirectsent++
}

// Quick exit if resource is already sent
if s.state == stateSent || s.state == stateToSend {
return
Expand Down Expand Up @@ -362,13 +367,16 @@ func (s *Subscription) populateResources(r *rpc.Resources) {
s.state = stateToSend

for _, sc := range s.refs {
sc.sub.populateResources(r)
sc.sub.populateResources(r, true)
}
}

// populateResourcesLegacy is the same as populateResources, but uses legacy
// encodings of resources.
func (s *Subscription) populateResourcesLegacy(r *rpc.Resources) {
func (s *Subscription) populateResourcesLegacy(r *rpc.Resources, indirect bool) {
if indirect {
s.indirectsent++
}
// Quick exit if resource is already sent
if s.state == stateSent || s.state == stateToSend {
return
Expand Down Expand Up @@ -404,7 +412,7 @@ func (s *Subscription) populateResourcesLegacy(r *rpc.Resources) {
s.state = stateToSend

for _, sc := range s.refs {
sc.sub.populateResourcesLegacy(r)
sc.sub.populateResourcesLegacy(r, true)
}
}

Expand Down Expand Up @@ -449,7 +457,7 @@ func (s *Subscription) subscribeRef(v codec.Value) bool {
// we unsubscribe to all and exit with error
s.c.Debugf("Failed to subscribe to %s. Aborting subscribeRef", v.RID)
for _, ref := range s.refs {
s.c.Unsubscribe(ref.sub, false, 1, true)
s.c.Unsubscribe(ref.sub, false, false, 1, true)
}
s.refs = nil
s.err = err
Expand Down Expand Up @@ -493,8 +501,9 @@ func containsString(path []string, rid string) bool {
}

func (s *Subscription) unsubscribeRefs() {
sent := s.IsSent()
for _, ref := range s.refs {
s.c.Unsubscribe(ref.sub, false, 1, false)
s.c.Unsubscribe(ref.sub, false, sent, 1, false)
}
s.refs = nil
}
Expand Down Expand Up @@ -532,7 +541,7 @@ func (s *Subscription) removeReference(rid string) {
ref := s.refs[rid]
ref.count--
if ref.count == 0 {
s.c.Unsubscribe(ref.sub, false, 1, true)
s.c.Unsubscribe(ref.sub, false, s.IsSent(), 1, true)
delete(s.refs, rid)
}
}
Expand Down Expand Up @@ -598,6 +607,10 @@ func (s *Subscription) processCollectionEvent(event *rescache.ResourceEvent) {

// Quick exit if added resource is already sent to client
if sub.IsSent() {
// We increase the indirectsent references, otherwise increased
// when calling sub.GetRPCResources, since we have no new
// resources to populate.
sub.indirectsent++
s.c.Send(rpc.NewEvent(s.rid, event.Event, rpc.AddEvent{Idx: idx, Value: v.RawMessage}))
return
}
Expand All @@ -612,7 +625,7 @@ func (s *Subscription) processCollectionEvent(event *rescache.ResourceEvent) {
return
}

r := sub.GetRPCResources()
r := sub.GetRPCResources(true)
s.c.Send(rpc.NewEvent(s.rid, event.Event, rpc.AddEvent{Idx: idx, Value: v.RawMessage, Resources: r}))
sub.ReleaseRPCResources()

Expand Down Expand Up @@ -654,6 +667,7 @@ func (s *Subscription) processModelEvent(event *rescache.ResourceEvent) {
ch := event.Changed
old := event.OldValues
var subs []*Subscription
hasUnsent := false

for _, v := range ch {
if v.Type == codec.ValueTypeReference {
Expand All @@ -663,12 +677,11 @@ func (s *Subscription) processModelEvent(event *rescache.ResourceEvent) {
// TODO handle error properly
return
}
if !sub.IsSent() {
if subs == nil {
subs = make([]*Subscription, 0, len(ch))
}
subs = append(subs, sub)
hasUnsent = hasUnsent || !sub.IsSent()
if subs == nil {
subs = make([]*Subscription, 0, len(ch))
}
subs = append(subs, sub)
}
}

Expand All @@ -681,7 +694,13 @@ func (s *Subscription) processModelEvent(event *rescache.ResourceEvent) {
}

// Quick exit if there are no new unsent subscriptions
if subs == nil {
if !hasUnsent {
// We increase the indirectsent references, otherwise increased when
// calling sub.populateResources, in a simple loop, since we have no
// new resources to populate.
for _, sub := range subs {
sub.indirectsent++
}
// Legacy behavior
if s.c.ProtocolVersion() < versionSoftResourceReferenceAndDataValue {
s.c.Send(rpc.NewEvent(s.rid, event.Event, rpc.ChangeEvent{Values: rescache.Legacy120ValueMap(event.Changed)}))
Expand Down Expand Up @@ -711,12 +730,12 @@ func (s *Subscription) processModelEvent(event *rescache.ResourceEvent) {
// Legacy behavior
if s.c.ProtocolVersion() < versionSoftResourceReferenceAndDataValue {
for _, sub := range subs {
sub.populateResourcesLegacy(r)
sub.populateResourcesLegacy(r, true)
}
s.c.Send(rpc.NewEvent(s.rid, event.Event, rpc.ChangeEvent{Values: rescache.Legacy120ValueMap(event.Changed), Resources: r}))
} else {
for _, sub := range subs {
sub.populateResources(r)
sub.populateResources(r, true)
}
s.c.Send(rpc.NewEvent(s.rid, event.Event, rpc.ChangeEvent{Values: event.Changed, Resources: r}))
}
Expand Down Expand Up @@ -769,7 +788,7 @@ func (s *Subscription) validateAccess(a *rescache.Access) {
// an unsubscribe event if any direct subscriptions existed.
func (s *Subscription) unsubscribeDirect(reason *reserr.Error) {
if s.direct > 0 {
s.c.Unsubscribe(s, true, s.direct, true)
s.c.Unsubscribe(s, true, false, s.direct, true)
s.c.Send(rpc.NewEvent(s.rid, "unsubscribe", rpc.UnsubscribeEvent{Reason: reason}))
}
}
Expand All @@ -796,6 +815,22 @@ func (s *Subscription) Dispose() {
}
}

// Unsend sets its indirectsent to zero for itself and counts down 1 for all its
// references. It also sets status to stateReady. Is called from tryDelete when
// a subscription has indirect references, but has reached 0 indirectsent
// references.
func (s *Subscription) Unsend() {
s.state = stateReady
s.indirectsent = 0

for _, ref := range s.refs {
sub := ref.sub
if sub.state == stateSent && sub.indirectsent > 0 {
sub.indirectsent--
}
}
}

// doneLoading will decrease all loading counters for
// each readyCallback, and test if they reach 0.
func (s *Subscription) doneLoading() {
Expand Down
40 changes: 23 additions & 17 deletions server/wsConn.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func (c *wsConn) GetResource(rid string, cb func(data *rpc.Resources, err error)
sub.CanGet(func(err error) {
if err != nil {
cb(nil, err)
c.Unsubscribe(sub, true, 1, true)
c.Unsubscribe(sub, true, false, 1, true)
return
}

Expand All @@ -245,9 +245,9 @@ func (c *wsConn) GetResource(rid string, cb func(data *rpc.Resources, err error)
return
}

cb(sub.GetRPCResources(), nil)
cb(sub.GetRPCResources(false), nil)
sub.ReleaseRPCResources()
c.Unsubscribe(sub, true, 1, true)
c.Unsubscribe(sub, true, false, 1, true)
})
})
}
Expand Down Expand Up @@ -292,7 +292,7 @@ func (c *wsConn) GetSubscription(rid string, cb func(sub *Subscription, err erro
sub.CanGet(func(err error) {
if err != nil {
cb(nil, err)
c.Unsubscribe(sub, true, 1, true)
c.Unsubscribe(sub, true, false, 1, true)
return
}

Expand All @@ -304,7 +304,7 @@ func (c *wsConn) GetSubscription(rid string, cb func(sub *Subscription, err erro
}
cb(sub, nil)
sub.ReleaseRPCResources()
c.Unsubscribe(sub, true, 1, true)
c.Unsubscribe(sub, true, false, 1, true)
})
})
}
Expand All @@ -319,19 +319,19 @@ func (c *wsConn) SubscribeResource(rid string, cb func(data *rpc.Resources, err
sub.CanGet(func(err error) {
if err != nil {
cb(nil, err)
c.Unsubscribe(sub, true, 1, true)
c.Unsubscribe(sub, true, false, 1, true)
return
}

sub.OnReady(func() {
err := sub.Error()
if err != nil {
cb(nil, err)
c.Unsubscribe(sub, true, 1, true)
c.Unsubscribe(sub, true, false, 1, true)
return
}

cb(sub.GetRPCResources(), nil)
cb(sub.GetRPCResources(false), nil)
sub.ReleaseRPCResources()
})
})
Expand Down Expand Up @@ -446,7 +446,7 @@ func (c *wsConn) handleResourceResult(refRID string, cb func(result interface{},
},
},
}, nil)
c.Unsubscribe(sub, true, 1, true)
c.Unsubscribe(sub, true, false, 1, true)
return
}

Expand All @@ -455,7 +455,7 @@ func (c *wsConn) handleResourceResult(refRID string, cb func(result interface{},
// as the call in itself succeeded.
cb(&rpc.CallResourceResult{
RID: sub.RID(),
Resources: sub.GetRPCResources(),
Resources: sub.GetRPCResources(false),
}, nil)
sub.ReleaseRPCResources()
})
Expand Down Expand Up @@ -502,12 +502,12 @@ func (c *wsConn) Subscribe(rid string, direct bool, t *rescache.Throttle) (*Subs

// unsubscribe counts down the subscription counter
// and deletes the subscription if the count reached 0.
func (c *wsConn) Unsubscribe(sub *Subscription, direct bool, count int, tryDelete bool) {
func (c *wsConn) Unsubscribe(sub *Subscription, direct bool, sent bool, count int, tryDelete bool) {
if c.disposing {
return
}

c.removeCount(sub, direct, count, tryDelete)
c.removeCount(sub, direct, sent, count, tryDelete)
}

func (c *wsConn) UnsubscribeByRID(rid string, count int) bool {
Expand All @@ -520,7 +520,7 @@ func (c *wsConn) UnsubscribeByRID(rid string, count int) bool {
return false
}

c.removeCount(sub, true, count, true)
c.removeCount(sub, true, false, count, true)
return true
}

Expand All @@ -539,17 +539,23 @@ func (c *wsConn) addCount(s *Subscription, direct bool) error {
return nil
}

// removeCount decreases the subscription count and disposes the subscription
// if indirect and direct subscription count reaches 0
func (c *wsConn) removeCount(s *Subscription, direct bool, count int, tryDelete bool) {
if s.direct+s.indirect == 0 {
// removeCount decreases the subscription count and disposes the subscription if
// indirect, indirectsent, and direct subscription count reaches 0. If direct is
// false and the parent resource indirectly referencing the subscription has
// been sent to the client, sent should bet true. If direct is true, sent is
// ignored.
func (c *wsConn) removeCount(s *Subscription, direct bool, sent bool, count int, tryDelete bool) {
if s.direct+s.indirect+s.indirectsent == 0 {
return
}

if direct {
s.direct -= count
} else {
s.indirect -= count
if sent {
s.indirectsent -= count
}
}

if tryDelete {
Expand Down
Loading