Skip to content

Commit

Permalink
GH-241: Fixed bug for legacy behavior.
Browse files Browse the repository at this point in the history
Added more tests for indirectsent.
Added test for collection add event with resource reference.
  • Loading branch information
jirenius committed Jun 7, 2024
1 parent 0cac8a8 commit 0694f5a
Show file tree
Hide file tree
Showing 4 changed files with 264 additions and 122 deletions.
18 changes: 13 additions & 5 deletions server/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func (s *Subscription) onLoaded(rcb *readyCallback) {
func (s *Subscription) GetRPCResources(indirect bool) *rpc.Resources {
r := &rpc.Resources{}
if s.c.ProtocolVersion() < versionSoftResourceReferenceAndDataValue {
s.populateResourcesLegacy(r)
s.populateResourcesLegacy(r, indirect)
} else {
s.populateResources(r, indirect)
}
Expand Down Expand Up @@ -373,7 +373,10 @@ func (s *Subscription) populateResources(r *rpc.Resources, indirect bool) {

// populateResourcesLegacy is the same as populateResources, but uses legacy
// encodings of resources.
func (s *Subscription) populateResourcesLegacy(r *rpc.Resources) {
func (s *Subscription) populateResourcesLegacy(r *rpc.Resources, indirect bool) {
if indirect {
s.indirectsent++
}
// Quick exit if resource is already sent
if s.state == stateSent || s.state == stateToSend {
return
Expand Down Expand Up @@ -409,7 +412,7 @@ func (s *Subscription) populateResourcesLegacy(r *rpc.Resources) {
s.state = stateToSend

for _, sc := range s.refs {
sc.sub.populateResourcesLegacy(r)
sc.sub.populateResourcesLegacy(r, true)
}
}

Expand Down Expand Up @@ -498,8 +501,9 @@ func containsString(path []string, rid string) bool {
}

func (s *Subscription) unsubscribeRefs() {
sent := s.IsSent()
for _, ref := range s.refs {
s.c.Unsubscribe(ref.sub, false, s.IsSent(), 1, false)
s.c.Unsubscribe(ref.sub, false, sent, 1, false)
}
s.refs = nil
}
Expand Down Expand Up @@ -603,6 +607,10 @@ func (s *Subscription) processCollectionEvent(event *rescache.ResourceEvent) {

// Quick exit if added resource is already sent to client
if sub.IsSent() {
// We increase the indirectsent references, otherwise increased
// when calling sub.GetRPCResources, since we have no new
// resources to populate.
sub.indirectsent++
s.c.Send(rpc.NewEvent(s.rid, event.Event, rpc.AddEvent{Idx: idx, Value: v.RawMessage}))
return
}
Expand Down Expand Up @@ -722,7 +730,7 @@ func (s *Subscription) processModelEvent(event *rescache.ResourceEvent) {
// Legacy behavior
if s.c.ProtocolVersion() < versionSoftResourceReferenceAndDataValue {
for _, sub := range subs {
sub.populateResourcesLegacy(r)
sub.populateResourcesLegacy(r, true)
}
s.c.Send(rpc.NewEvent(s.rid, event.Event, rpc.ChangeEvent{Values: rescache.Legacy120ValueMap(event.Changed), Resources: r}))
} else {
Expand Down
19 changes: 19 additions & 0 deletions test/08collection_event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,22 @@ func TestAddRemoveEventsOnCachedCollection(t *testing.T) {
}
}
}

// Test add event with resource reference
func TestCollectionEvent_AddEventWithResourceReference_IncludesResource(t *testing.T) {
runTest(t, func(s *Session) {
model := resourceData("test.model")

c := s.Connect()
subscribeToTestCollection(t, s, c)

// Send add event on collection
s.ResourceEvent("test.collection", "add", json.RawMessage(`{"idx":3,"value":{"rid":"test.model"}}`))

// Handle model get request
s.GetRequest(t).AssertSubject(t, "get.test.model").RespondSuccess(json.RawMessage(`{"model":` + model + `}`))

// Validate client event
c.GetEvent(t).Equals(t, "test.collection.add", json.RawMessage(`{"idx":3,"value":{"rid":"test.model"},"models":{"test.model":`+model+`}}`))
})
}
232 changes: 232 additions & 0 deletions test/30indirectsent_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
// Tests for the cases when a directly or indirectly subscribed resource has
// been sent to the client, and later is getting unsubscribe at the same time
// that another indirect reference is added but send to the client after the
// unsubscription.
//
// See: https://github.com/resgateio/resgate/issues/241
package test

import (
"encoding/json"
"testing"
)

// testIndirectSentSubscribeTypes contains different ways to subscribe and
// unsubscribe to the resource test.model, directly or indirectly.
var testIndirectSentSubscribeTypes = []struct {
Name string
Subscribe func(t *testing.T, s *Session, c *Conn)
Unsubscribe func(t *testing.T, s *Session, c *Conn)
}{
{
"with directly unsubscribed resource",
func(t *testing.T, s *Session, c *Conn) {
subscribeToTestModel(t, s, c)
},
func(t *testing.T, s *Session, c *Conn) {
c.Request("unsubscribe.test.model", nil).GetResponse(t)
},
},
{
"with indirectly unsubscribed resource",
func(t *testing.T, s *Session, c *Conn) {
subscribeToTestModelParent(t, s, c, false)
},
func(t *testing.T, s *Session, c *Conn) {
c.Request("unsubscribe.test.model.parent", nil).GetResponse(t)
},
},
{
"with change event indirectly unsubscribed resource",
func(t *testing.T, s *Session, c *Conn) {
subscribeToTestModelParent(t, s, c, false)
},
func(t *testing.T, s *Session, c *Conn) {
// Send change event removing the indirectly subscribed resource.
s.ResourceEvent("test.model.parent", "change", json.RawMessage(`{"values":{"child":null}}`))
// Consume client change event.
c.GetEvent(t).Equals(t, "test.model.parent.change", json.RawMessage(`{"values":{"child":null}}`))
},
},
{
"with directly subscribe, indirectly subscribe with change event, directly unsubscribe, indirectly unsubscribe with change event",
func(t *testing.T, s *Session, c *Conn) {
subscribeToTestModel(t, s, c)
subscribeToCustomResource(t, s, c, "test.model.parent", resource{typeModel, `{"name":"parent","child":null}`, nil})
// Send change event adding the indirectly subscribed resource.
s.ResourceEvent("test.model.parent", "change", json.RawMessage(`{"values":{"child":{"rid":"test.model"}}}`))
// Consome client change event
c.GetEvent(t).Equals(t, "test.model.parent.change", json.RawMessage(`{"values":{"child":{"rid":"test.model"}}}`))
},
func(t *testing.T, s *Session, c *Conn) {
c.Request("unsubscribe.test.model", nil).GetResponse(t)
// Send change event removing the indirectly subscribed resource.
s.ResourceEvent("test.model.parent", "change", json.RawMessage(`{"values":{"child":null}}`))
// Consume client change event.
c.GetEvent(t).Equals(t, "test.model.parent.change", json.RawMessage(`{"values":{"child":null}}`))
},
},
{
"with change event indirectly subscribing and unsubscribing resource",
func(t *testing.T, s *Session, c *Conn) {
model := resourceData("test.model")
subscribeToCustomResource(t, s, c, "test.model.parent", resource{typeModel, `{"name":"parent","child":null}`, nil})
// Send change event adding the indirectly subscribed resource.
s.ResourceEvent("test.model.parent", "change", json.RawMessage(`{"values":{"child":{"rid":"test.model"}}}`))
// Handle get request
s.GetRequest(t).AssertSubject(t, "get.test.model").RespondSuccess(json.RawMessage(`{"model":` + model + `}`))
// Consome client change event
c.GetEvent(t).Equals(t, "test.model.parent.change", json.RawMessage(`{"values":{"child":{"rid":"test.model"}},"models":{"test.model":`+model+`}}`))
},
func(t *testing.T, s *Session, c *Conn) {
// Send change event removing the indirectly subscribed resource.
s.ResourceEvent("test.model.parent", "change", json.RawMessage(`{"values":{"child":null}}`))
// Consume client change event.
c.GetEvent(t).Equals(t, "test.model.parent.change", json.RawMessage(`{"values":{"child":null}}`))
},
},
{
"with directly unsubscribed resource after removing an indirect reference using change event",
func(t *testing.T, s *Session, c *Conn) {
subscribeToTestModel(t, s, c)
subscribeToTestModelParent(t, s, c, true)
},
func(t *testing.T, s *Session, c *Conn) {
// Send change event removing the indirectly subscribed resource.
s.ResourceEvent("test.model.parent", "change", json.RawMessage(`{"values":{"child":null}}`))
// Consume client change event.
c.GetEvent(t).Equals(t, "test.model.parent.change", json.RawMessage(`{"values":{"child":null}}`))
c.Request("unsubscribe.test.model", nil).GetResponse(t)
},
},
}

func TestIndirectSent_SubscriptionReference_ContainsResource(t *testing.T) {
for _, sentSubscribeType := range testIndirectSentSubscribeTypes {
runNamedTest(t, sentSubscribeType.Name, func(s *Session) {

model := resourceData("test.model")
modelDelayed := `{"name":"delayed"}`
modelDelayedParent := `{"name":"delayedparent","child":{"rid":"test.model"},"delayed":{"rid":"test.model.delayed"}}`

c := s.Connect()
// Subscribe to test.model
sentSubscribeType.Subscribe(t, s, c)

// Get parent model
creq := c.Request("subscribe.test.model.delayedparent", nil)

// Handle parent get and access request.
mreqs := s.GetParallelRequests(t, 2)
mreqs.GetRequest(t, "access.test.model.delayedparent").RespondSuccess(json.RawMessage(`{"get":true}`))
mreqs.GetRequest(t, "get.test.model.delayedparent").RespondSuccess(json.RawMessage(`{"model":` + modelDelayedParent + `}`))
// Delay response to get request of referenced test.model.delayed
mreqsecond := s.GetRequest(t)

// Call unsubscribe on test.model
sentSubscribeType.Unsubscribe(t, s, c)

// Repond to parent get request
mreqsecond.RespondSuccess(json.RawMessage(`{"model":` + modelDelayed + `}`))

// Get client response, which should include test.model.
creq.GetResponse(t).AssertResult(t, json.RawMessage(`{"models":{"test.model":`+model+`,"test.model.delayedparent":`+modelDelayedParent+`,"test.model.delayed":`+modelDelayed+`}}`))
})
}
}

func TestIndirectSent_ChangeEventReference_ContainsResource(t *testing.T) {
for _, sentSubscribeType := range testIndirectSentSubscribeTypes {
runNamedTest(t, sentSubscribeType.Name, func(s *Session) {
model := resourceData("test.model")
modelDelayed := `{"name":"delayed"}`
modelDelayedParent := `{"name":"delayedparent","child":{"rid":"test.model"},"delayed":{"rid":"test.model.delayed"}}`
modelDelayedGrandParent := `{"name":"delayedgrandparent","parent":null}`

c := s.Connect()
// Subscribe to test.model
sentSubscribeType.Subscribe(t, s, c)

// Get grandparent model
creq := c.Request("subscribe.test.model.delayedgrandparent", nil)

// Handle parent get and access request.
mreqs := s.GetParallelRequests(t, 2)
mreqs.GetRequest(t, "access.test.model.delayedgrandparent").RespondSuccess(json.RawMessage(`{"get":true}`))
mreqs.GetRequest(t, "get.test.model.delayedgrandparent").RespondSuccess(json.RawMessage(`{"model":` + modelDelayedGrandParent + `}`))

// Get client response.
creq.GetResponse(t).AssertResult(t, json.RawMessage(`{"models":{"test.model.delayedgrandparent":`+modelDelayedGrandParent+`}}`))

// Send change event, adding parent model reference to delayedgrandparent.
s.ResourceEvent("test.model.delayedgrandparent", "change", json.RawMessage(`{"values":{"parent":{"rid":"test.model.delayedparent"}}}`))

// Handle parent get request
s.GetRequest(t).AssertSubject(t, "get.test.model.delayedparent").RespondSuccess(json.RawMessage(`{"model":` + modelDelayedParent + `}`))

// Delay response to get request of referenced test.model.delayed
mreqsecond := s.GetRequest(t)

// Call unsubscribe on test.model
sentSubscribeType.Unsubscribe(t, s, c)

// Repond to parent get request
mreqsecond.RespondSuccess(json.RawMessage(`{"model":` + modelDelayed + `}`))

// Get change event, which should include test.model
c.GetEvent(t).Equals(t, "test.model.delayedgrandparent.change", json.RawMessage(`{"values":{"parent":{"rid":"test.model.delayedparent"}},"models":{"test.model":`+model+`,"test.model.delayedparent":`+modelDelayedParent+`,"test.model.delayed":`+modelDelayed+`}}`))

// Send event on parent and validate client event
s.ResourceEvent("test.model.delayedparent", "custom", common.CustomEvent())
c.GetEvent(t).Equals(t, "test.model.delayedparent.custom", common.CustomEvent())
})
}
}

func TestIndirectSent_AddEventReference_ContainsResource(t *testing.T) {
for _, sentSubscribeType := range testIndirectSentSubscribeTypes {
runNamedTest(t, sentSubscribeType.Name, func(s *Session) {
model := resourceData("test.model")
modelDelayed := `{"name":"delayed"}`
modelDelayedParent := `{"name":"delayedparent","child":{"rid":"test.model"},"delayed":{"rid":"test.model.delayed"}}`
collectionDelayedGrandParent := `["delayedgrandparent"]`

c := s.Connect()
// Subscribe to test.model
sentSubscribeType.Subscribe(t, s, c)

// Get grandparent model
creq := c.Request("subscribe.test.collection.delayedgrandparent", nil)

// Handle parent get and access request.
mreqs := s.GetParallelRequests(t, 2)
mreqs.GetRequest(t, "access.test.collection.delayedgrandparent").RespondSuccess(json.RawMessage(`{"get":true}`))
mreqs.GetRequest(t, "get.test.collection.delayedgrandparent").RespondSuccess(json.RawMessage(`{"collection":` + collectionDelayedGrandParent + `}`))

// Get client response.
creq.GetResponse(t).AssertResult(t, json.RawMessage(`{"collections":{"test.collection.delayedgrandparent":`+collectionDelayedGrandParent+`}}`))

// Send add event, adding parent model reference to delayedgrandparent.
s.ResourceEvent("test.collection.delayedgrandparent", "add", json.RawMessage(`{"idx":1,"value":{"rid":"test.model.delayedparent"}}`))

// Handle parent get request
s.GetRequest(t).AssertSubject(t, "get.test.model.delayedparent").RespondSuccess(json.RawMessage(`{"model":` + modelDelayedParent + `}`))

// Delay response to get request of referenced test.model.delayed
mreqsecond := s.GetRequest(t)

// Call unsubscribe on test.model
sentSubscribeType.Unsubscribe(t, s, c)

// Repond to parent get request
mreqsecond.RespondSuccess(json.RawMessage(`{"model":` + modelDelayed + `}`))

// Get add event, which should include test.model
c.GetEvent(t).Equals(t, "test.collection.delayedgrandparent.add", json.RawMessage(`{"idx":1,"value":{"rid":"test.model.delayedparent"},"models":{"test.model":`+model+`,"test.model.delayedparent":`+modelDelayedParent+`,"test.model.delayed":`+modelDelayed+`}}`))

// Send event on parent and validate client event
s.ResourceEvent("test.model.delayedparent", "custom", common.CustomEvent())
c.GetEvent(t).Equals(t, "test.model.delayedparent.custom", common.CustomEvent())
})
}
}
Loading

0 comments on commit 0694f5a

Please sign in to comment.