diff --git a/server/subscription.go b/server/subscription.go index f383eff..9cabb72 100644 --- a/server/subscription.go +++ b/server/subscription.go @@ -271,7 +271,7 @@ func (s *Subscription) onLoaded(rcb *readyCallback) { func (s *Subscription) GetRPCResources(indirect bool) *rpc.Resources { r := &rpc.Resources{} if s.c.ProtocolVersion() < versionSoftResourceReferenceAndDataValue { - s.populateResourcesLegacy(r) + s.populateResourcesLegacy(r, indirect) } else { s.populateResources(r, indirect) } @@ -373,7 +373,10 @@ func (s *Subscription) populateResources(r *rpc.Resources, indirect bool) { // populateResourcesLegacy is the same as populateResources, but uses legacy // encodings of resources. -func (s *Subscription) populateResourcesLegacy(r *rpc.Resources) { +func (s *Subscription) populateResourcesLegacy(r *rpc.Resources, indirect bool) { + if indirect { + s.indirectsent++ + } // Quick exit if resource is already sent if s.state == stateSent || s.state == stateToSend { return @@ -409,7 +412,7 @@ func (s *Subscription) populateResourcesLegacy(r *rpc.Resources) { s.state = stateToSend for _, sc := range s.refs { - sc.sub.populateResourcesLegacy(r) + sc.sub.populateResourcesLegacy(r, true) } } @@ -498,8 +501,9 @@ func containsString(path []string, rid string) bool { } func (s *Subscription) unsubscribeRefs() { + sent := s.IsSent() for _, ref := range s.refs { - s.c.Unsubscribe(ref.sub, false, s.IsSent(), 1, false) + s.c.Unsubscribe(ref.sub, false, sent, 1, false) } s.refs = nil } @@ -603,6 +607,10 @@ func (s *Subscription) processCollectionEvent(event *rescache.ResourceEvent) { // Quick exit if added resource is already sent to client if sub.IsSent() { + // We increase the indirectsent references, otherwise increased + // when calling sub.GetRPCResources, since we have no new + // resources to populate. + sub.indirectsent++ s.c.Send(rpc.NewEvent(s.rid, event.Event, rpc.AddEvent{Idx: idx, Value: v.RawMessage})) return } @@ -722,7 +730,7 @@ func (s *Subscription) processModelEvent(event *rescache.ResourceEvent) { // Legacy behavior if s.c.ProtocolVersion() < versionSoftResourceReferenceAndDataValue { for _, sub := range subs { - sub.populateResourcesLegacy(r) + sub.populateResourcesLegacy(r, true) } s.c.Send(rpc.NewEvent(s.rid, event.Event, rpc.ChangeEvent{Values: rescache.Legacy120ValueMap(event.Changed), Resources: r})) } else { diff --git a/test/08collection_event_test.go b/test/08collection_event_test.go index cec835f..8e9fe41 100644 --- a/test/08collection_event_test.go +++ b/test/08collection_event_test.go @@ -81,3 +81,22 @@ func TestAddRemoveEventsOnCachedCollection(t *testing.T) { } } } + +// Test add event with resource reference +func TestCollectionEvent_AddEventWithResourceReference_IncludesResource(t *testing.T) { + runTest(t, func(s *Session) { + model := resourceData("test.model") + + c := s.Connect() + subscribeToTestCollection(t, s, c) + + // Send add event on collection + s.ResourceEvent("test.collection", "add", json.RawMessage(`{"idx":3,"value":{"rid":"test.model"}}`)) + + // Handle model get request + s.GetRequest(t).AssertSubject(t, "get.test.model").RespondSuccess(json.RawMessage(`{"model":` + model + `}`)) + + // Validate client event + c.GetEvent(t).Equals(t, "test.collection.add", json.RawMessage(`{"idx":3,"value":{"rid":"test.model"},"models":{"test.model":`+model+`}}`)) + }) +} diff --git a/test/30indirectsent_test.go b/test/30indirectsent_test.go new file mode 100644 index 0000000..2993cb1 --- /dev/null +++ b/test/30indirectsent_test.go @@ -0,0 +1,232 @@ +// Tests for the cases when a directly or indirectly subscribed resource has +// been sent to the client, and later is getting unsubscribe at the same time +// that another indirect reference is added but send to the client after the +// unsubscription. +// +// See: https://github.com/resgateio/resgate/issues/241 +package test + +import ( + "encoding/json" + "testing" +) + +// testIndirectSentSubscribeTypes contains different ways to subscribe and +// unsubscribe to the resource test.model, directly or indirectly. +var testIndirectSentSubscribeTypes = []struct { + Name string + Subscribe func(t *testing.T, s *Session, c *Conn) + Unsubscribe func(t *testing.T, s *Session, c *Conn) +}{ + { + "with directly unsubscribed resource", + func(t *testing.T, s *Session, c *Conn) { + subscribeToTestModel(t, s, c) + }, + func(t *testing.T, s *Session, c *Conn) { + c.Request("unsubscribe.test.model", nil).GetResponse(t) + }, + }, + { + "with indirectly unsubscribed resource", + func(t *testing.T, s *Session, c *Conn) { + subscribeToTestModelParent(t, s, c, false) + }, + func(t *testing.T, s *Session, c *Conn) { + c.Request("unsubscribe.test.model.parent", nil).GetResponse(t) + }, + }, + { + "with change event indirectly unsubscribed resource", + func(t *testing.T, s *Session, c *Conn) { + subscribeToTestModelParent(t, s, c, false) + }, + func(t *testing.T, s *Session, c *Conn) { + // Send change event removing the indirectly subscribed resource. + s.ResourceEvent("test.model.parent", "change", json.RawMessage(`{"values":{"child":null}}`)) + // Consume client change event. + c.GetEvent(t).Equals(t, "test.model.parent.change", json.RawMessage(`{"values":{"child":null}}`)) + }, + }, + { + "with directly subscribe, indirectly subscribe with change event, directly unsubscribe, indirectly unsubscribe with change event", + func(t *testing.T, s *Session, c *Conn) { + subscribeToTestModel(t, s, c) + subscribeToCustomResource(t, s, c, "test.model.parent", resource{typeModel, `{"name":"parent","child":null}`, nil}) + // Send change event adding the indirectly subscribed resource. + s.ResourceEvent("test.model.parent", "change", json.RawMessage(`{"values":{"child":{"rid":"test.model"}}}`)) + // Consome client change event + c.GetEvent(t).Equals(t, "test.model.parent.change", json.RawMessage(`{"values":{"child":{"rid":"test.model"}}}`)) + }, + func(t *testing.T, s *Session, c *Conn) { + c.Request("unsubscribe.test.model", nil).GetResponse(t) + // Send change event removing the indirectly subscribed resource. + s.ResourceEvent("test.model.parent", "change", json.RawMessage(`{"values":{"child":null}}`)) + // Consume client change event. + c.GetEvent(t).Equals(t, "test.model.parent.change", json.RawMessage(`{"values":{"child":null}}`)) + }, + }, + { + "with change event indirectly subscribing and unsubscribing resource", + func(t *testing.T, s *Session, c *Conn) { + model := resourceData("test.model") + subscribeToCustomResource(t, s, c, "test.model.parent", resource{typeModel, `{"name":"parent","child":null}`, nil}) + // Send change event adding the indirectly subscribed resource. + s.ResourceEvent("test.model.parent", "change", json.RawMessage(`{"values":{"child":{"rid":"test.model"}}}`)) + // Handle get request + s.GetRequest(t).AssertSubject(t, "get.test.model").RespondSuccess(json.RawMessage(`{"model":` + model + `}`)) + // Consome client change event + c.GetEvent(t).Equals(t, "test.model.parent.change", json.RawMessage(`{"values":{"child":{"rid":"test.model"}},"models":{"test.model":`+model+`}}`)) + }, + func(t *testing.T, s *Session, c *Conn) { + // Send change event removing the indirectly subscribed resource. + s.ResourceEvent("test.model.parent", "change", json.RawMessage(`{"values":{"child":null}}`)) + // Consume client change event. + c.GetEvent(t).Equals(t, "test.model.parent.change", json.RawMessage(`{"values":{"child":null}}`)) + }, + }, + { + "with directly unsubscribed resource after removing an indirect reference using change event", + func(t *testing.T, s *Session, c *Conn) { + subscribeToTestModel(t, s, c) + subscribeToTestModelParent(t, s, c, true) + }, + func(t *testing.T, s *Session, c *Conn) { + // Send change event removing the indirectly subscribed resource. + s.ResourceEvent("test.model.parent", "change", json.RawMessage(`{"values":{"child":null}}`)) + // Consume client change event. + c.GetEvent(t).Equals(t, "test.model.parent.change", json.RawMessage(`{"values":{"child":null}}`)) + c.Request("unsubscribe.test.model", nil).GetResponse(t) + }, + }, +} + +func TestIndirectSent_SubscriptionReference_ContainsResource(t *testing.T) { + for _, sentSubscribeType := range testIndirectSentSubscribeTypes { + runNamedTest(t, sentSubscribeType.Name, func(s *Session) { + + model := resourceData("test.model") + modelDelayed := `{"name":"delayed"}` + modelDelayedParent := `{"name":"delayedparent","child":{"rid":"test.model"},"delayed":{"rid":"test.model.delayed"}}` + + c := s.Connect() + // Subscribe to test.model + sentSubscribeType.Subscribe(t, s, c) + + // Get parent model + creq := c.Request("subscribe.test.model.delayedparent", nil) + + // Handle parent get and access request. + mreqs := s.GetParallelRequests(t, 2) + mreqs.GetRequest(t, "access.test.model.delayedparent").RespondSuccess(json.RawMessage(`{"get":true}`)) + mreqs.GetRequest(t, "get.test.model.delayedparent").RespondSuccess(json.RawMessage(`{"model":` + modelDelayedParent + `}`)) + // Delay response to get request of referenced test.model.delayed + mreqsecond := s.GetRequest(t) + + // Call unsubscribe on test.model + sentSubscribeType.Unsubscribe(t, s, c) + + // Repond to parent get request + mreqsecond.RespondSuccess(json.RawMessage(`{"model":` + modelDelayed + `}`)) + + // Get client response, which should include test.model. + creq.GetResponse(t).AssertResult(t, json.RawMessage(`{"models":{"test.model":`+model+`,"test.model.delayedparent":`+modelDelayedParent+`,"test.model.delayed":`+modelDelayed+`}}`)) + }) + } +} + +func TestIndirectSent_ChangeEventReference_ContainsResource(t *testing.T) { + for _, sentSubscribeType := range testIndirectSentSubscribeTypes { + runNamedTest(t, sentSubscribeType.Name, func(s *Session) { + model := resourceData("test.model") + modelDelayed := `{"name":"delayed"}` + modelDelayedParent := `{"name":"delayedparent","child":{"rid":"test.model"},"delayed":{"rid":"test.model.delayed"}}` + modelDelayedGrandParent := `{"name":"delayedgrandparent","parent":null}` + + c := s.Connect() + // Subscribe to test.model + sentSubscribeType.Subscribe(t, s, c) + + // Get grandparent model + creq := c.Request("subscribe.test.model.delayedgrandparent", nil) + + // Handle parent get and access request. + mreqs := s.GetParallelRequests(t, 2) + mreqs.GetRequest(t, "access.test.model.delayedgrandparent").RespondSuccess(json.RawMessage(`{"get":true}`)) + mreqs.GetRequest(t, "get.test.model.delayedgrandparent").RespondSuccess(json.RawMessage(`{"model":` + modelDelayedGrandParent + `}`)) + + // Get client response. + creq.GetResponse(t).AssertResult(t, json.RawMessage(`{"models":{"test.model.delayedgrandparent":`+modelDelayedGrandParent+`}}`)) + + // Send change event, adding parent model reference to delayedgrandparent. + s.ResourceEvent("test.model.delayedgrandparent", "change", json.RawMessage(`{"values":{"parent":{"rid":"test.model.delayedparent"}}}`)) + + // Handle parent get request + s.GetRequest(t).AssertSubject(t, "get.test.model.delayedparent").RespondSuccess(json.RawMessage(`{"model":` + modelDelayedParent + `}`)) + + // Delay response to get request of referenced test.model.delayed + mreqsecond := s.GetRequest(t) + + // Call unsubscribe on test.model + sentSubscribeType.Unsubscribe(t, s, c) + + // Repond to parent get request + mreqsecond.RespondSuccess(json.RawMessage(`{"model":` + modelDelayed + `}`)) + + // Get change event, which should include test.model + c.GetEvent(t).Equals(t, "test.model.delayedgrandparent.change", json.RawMessage(`{"values":{"parent":{"rid":"test.model.delayedparent"}},"models":{"test.model":`+model+`,"test.model.delayedparent":`+modelDelayedParent+`,"test.model.delayed":`+modelDelayed+`}}`)) + + // Send event on parent and validate client event + s.ResourceEvent("test.model.delayedparent", "custom", common.CustomEvent()) + c.GetEvent(t).Equals(t, "test.model.delayedparent.custom", common.CustomEvent()) + }) + } +} + +func TestIndirectSent_AddEventReference_ContainsResource(t *testing.T) { + for _, sentSubscribeType := range testIndirectSentSubscribeTypes { + runNamedTest(t, sentSubscribeType.Name, func(s *Session) { + model := resourceData("test.model") + modelDelayed := `{"name":"delayed"}` + modelDelayedParent := `{"name":"delayedparent","child":{"rid":"test.model"},"delayed":{"rid":"test.model.delayed"}}` + collectionDelayedGrandParent := `["delayedgrandparent"]` + + c := s.Connect() + // Subscribe to test.model + sentSubscribeType.Subscribe(t, s, c) + + // Get grandparent model + creq := c.Request("subscribe.test.collection.delayedgrandparent", nil) + + // Handle parent get and access request. + mreqs := s.GetParallelRequests(t, 2) + mreqs.GetRequest(t, "access.test.collection.delayedgrandparent").RespondSuccess(json.RawMessage(`{"get":true}`)) + mreqs.GetRequest(t, "get.test.collection.delayedgrandparent").RespondSuccess(json.RawMessage(`{"collection":` + collectionDelayedGrandParent + `}`)) + + // Get client response. + creq.GetResponse(t).AssertResult(t, json.RawMessage(`{"collections":{"test.collection.delayedgrandparent":`+collectionDelayedGrandParent+`}}`)) + + // Send add event, adding parent model reference to delayedgrandparent. + s.ResourceEvent("test.collection.delayedgrandparent", "add", json.RawMessage(`{"idx":1,"value":{"rid":"test.model.delayedparent"}}`)) + + // Handle parent get request + s.GetRequest(t).AssertSubject(t, "get.test.model.delayedparent").RespondSuccess(json.RawMessage(`{"model":` + modelDelayedParent + `}`)) + + // Delay response to get request of referenced test.model.delayed + mreqsecond := s.GetRequest(t) + + // Call unsubscribe on test.model + sentSubscribeType.Unsubscribe(t, s, c) + + // Repond to parent get request + mreqsecond.RespondSuccess(json.RawMessage(`{"model":` + modelDelayed + `}`)) + + // Get add event, which should include test.model + c.GetEvent(t).Equals(t, "test.collection.delayedgrandparent.add", json.RawMessage(`{"idx":1,"value":{"rid":"test.model.delayedparent"},"models":{"test.model":`+model+`,"test.model.delayedparent":`+modelDelayedParent+`,"test.model.delayed":`+modelDelayed+`}}`)) + + // Send event on parent and validate client event + s.ResourceEvent("test.model.delayedparent", "custom", common.CustomEvent()) + c.GetEvent(t).Equals(t, "test.model.delayedparent.custom", common.CustomEvent()) + }) + } +} diff --git a/test/30refsent_test.go b/test/30refsent_test.go deleted file mode 100644 index 109f1c6..0000000 --- a/test/30refsent_test.go +++ /dev/null @@ -1,117 +0,0 @@ -// Tests for the cases when a directly or indirectly subscribed resource has -// been sent to the client, and later is getting unsubscribe at the same time -// that another indirect reference is added but send to the client after the -// unsubscription. -// -// See: https://github.com/resgateio/resgate/issues/241 -package test - -import ( - "encoding/json" - "testing" -) - -func TestRefSent_SubscriptionReferenceDirectlyUnsubscribedBeforeReady_ContainsResource(t *testing.T) { - runTest(t, func(s *Session) { - model := resourceData("test.model") - modelDelayed := `{"foo":"modelDelayed"}` - modelParent := `{"name":"parent","child":{"rid":"test.model"},"delayed":{"rid":"test.model.delayed"}}` - - c := s.Connect() - subscribeToTestModel(t, s, c) - - // Get parent model - creq := c.Request("subscribe.test.model.parent", nil) - - // Handle parent get and access request. - mreqs := s.GetParallelRequests(t, 2) - mreqs.GetRequest(t, "access.test.model.parent").RespondSuccess(json.RawMessage(`{"get":true}`)) - mreqs.GetRequest(t, "get.test.model.parent").RespondSuccess(json.RawMessage(`{"model":` + modelParent + `}`)) - // Delay response to get request of referenced test.model.delayed - mreqsecond := s.GetRequest(t) - - // Call unsubscribe on (first) parent - c.Request("unsubscribe.test.model", nil).GetResponse(t) - - // Repond to parent get request - mreqsecond.RespondSuccess(json.RawMessage(`{"model":` + modelDelayed + `}`)) - - // Get client response, which should include test.model. - creq.GetResponse(t).AssertResult(t, json.RawMessage(`{"models":{"test.model":`+model+`,"test.model.parent":`+modelParent+`,"test.model.delayed":`+modelDelayed+`}}`)) - }) -} - -func TestRefSent_SubscriptionReferenceIndirectlyUnsubscribedBeforeReady_ContainsResource(t *testing.T) { - runTest(t, func(s *Session) { - model := resourceData("test.model") - modelDelayed := `{"foo":"modelDelayed"}` - modelSecondParent := `{"name":"secondparent","child":{"rid":"test.model"},"delayed":{"rid":"test.model.delayed"}}` - - c := s.Connect() - subscribeToTestModelParent(t, s, c, false) - - // Get secondparent model - creq := c.Request("subscribe.test.model.secondparent", nil) - - // Handle secondparent get and access request. - mreqs := s.GetParallelRequests(t, 2) - mreqs.GetRequest(t, "access.test.model.secondparent").RespondSuccess(json.RawMessage(`{"get":true}`)) - mreqs.GetRequest(t, "get.test.model.secondparent").RespondSuccess(json.RawMessage(`{"model":` + modelSecondParent + `}`)) - // Delay response to get request of referenced test.model.delayed - mreqsecond := s.GetRequest(t) - - // Call unsubscribe on (first) parent - c.Request("unsubscribe.test.model.parent", nil).GetResponse(t) - - // Repond to secondparent get request - mreqsecond.RespondSuccess(json.RawMessage(`{"model":` + modelDelayed + `}`)) - - // Get client response, which should include test.model. - creq.GetResponse(t).AssertResult(t, json.RawMessage(`{"models":{"test.model":`+model+`,"test.model.secondparent":`+modelSecondParent+`,"test.model.delayed":`+modelDelayed+`}}`)) - }) -} - -func TestRefSent_ChangeEventReferenceDirectlyUnsubscribedBeforeReady_ContainsResource(t *testing.T) { - runTest(t, func(s *Session) { - model := resourceData("test.model") - modelDelayed := `{"foo":"modelDelayed"}` - modelParent := `{"name":"parent","child":{"rid":"test.model"},"delayed":{"rid":"test.model.delayed"}}` - modelGrandParent := `{"name":"grandparent","parent":null}` - - c := s.Connect() - subscribeToTestModel(t, s, c) - - // Get grandparent model - creq := c.Request("subscribe.test.model.grandparent", nil) - - // Handle parent get and access request. - mreqs := s.GetParallelRequests(t, 2) - mreqs.GetRequest(t, "access.test.model.grandparent").RespondSuccess(json.RawMessage(`{"get":true}`)) - mreqs.GetRequest(t, "get.test.model.grandparent").RespondSuccess(json.RawMessage(`{"model":` + modelGrandParent + `}`)) - - // Get client response. - creq.GetResponse(t).AssertResult(t, json.RawMessage(`{"models":{"test.model.grandparent":`+modelGrandParent+`}}`)) - - // Send change event, adding parent model reference to grandparent. - s.ResourceEvent("test.model.grandparent", "change", json.RawMessage(`{"values":{"parent":{"rid":"test.model.parent"}}}`)) - - // Handle parent get request - s.GetRequest(t).AssertSubject(t, "get.test.model.parent").RespondSuccess(json.RawMessage(`{"model":` + modelParent + `}`)) - - // Delay response to get request of referenced test.model.delayed - mreqsecond := s.GetRequest(t) - - // Call unsubscribe on (first) parent - c.Request("unsubscribe.test.model", nil).GetResponse(t) - - // Repond to parent get request - mreqsecond.RespondSuccess(json.RawMessage(`{"model":` + modelDelayed + `}`)) - - // Get change event, which should include test.model - c.GetEvent(t).Equals(t, "test.model.grandparent.change", json.RawMessage(`{"values":{"parent":{"rid":"test.model.parent"}},"models":{"test.model":`+model+`,"test.model.parent":`+modelParent+`,"test.model.delayed":`+modelDelayed+`}}`)) - - // Send event on parent and validate client event - s.ResourceEvent("test.model.parent", "custom", common.CustomEvent()) - c.GetEvent(t).Equals(t, "test.model.parent.custom", common.CustomEvent()) - }) -}