diff --git a/integration/connector_test.go b/integration/connector_test.go index 580200a..6a9e61e 100644 --- a/integration/connector_test.go +++ b/integration/connector_test.go @@ -10,7 +10,7 @@ // // SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -//go:build integration +//+go:build integration package integration @@ -267,9 +267,22 @@ func (suite *ConnectorSuite) TestCommand() { require.NoError(suite.T(), err, "unable to send command to the backend via websocket") timeout := MillisToDuration(suite.cfg.EventTimeoutMs) - responseSentResult := WaitSubscribeResult(timeout, commandResponseCh, func() {}) - require.Equal(suite.T(), ProcessWSMessageResult{true, nil}, responseSentResult, "command should be received and response should be sent") + // Check the sending of the response from the feature to the backend + var responseSentResult ProcessWSMessageResult + select { + case result := <-commandResponseCh: + responseSentResult = result + case <-time.After(timeout): + responseSentResult.Err = errors.New("timeout") + } + require.Equal( + suite.T(), + ProcessWSMessageResult{true, nil}, + responseSentResult, + "command should be received and response should be sent") + + // Check the response from the feature to the backend result := ProcessWSMessages(timeout, ws, func(respMsg *protocol.Envelope) ProcessWSMessageResult { expectedPath := strings.ReplaceAll(cmdMsgEnvelope.Path, "inbox", "outbox") if err := checkEqual(expectedPath, respMsg.Path, "path"); err != nil { @@ -411,27 +424,6 @@ func ProcessWSMessages( return result } -func Subscribe( - timeout time.Duration, - ws *websocket.Conn, - process func(*protocol.Envelope) ProcessWSMessageResult) chan ProcessWSMessageResult { - responseCh := make(chan ProcessWSMessageResult) - go func() { - responseCh <- ProcessWSMessages(timeout, ws, process) - }() - return responseCh -} - -func WaitSubscribeResult(timeout time.Duration, resultCh chan ProcessWSMessageResult, closer func()) ProcessWSMessageResult { - select { - case result := <-resultCh: - return result - case <-time.After(timeout): - closer() - return ProcessWSMessageResult{false, errors.New("timeout")} - } -} - func (suite *ConnectorSuite) newWSConnection() (*websocket.Conn, error) { wsAddress, err := asWSAddress(suite.cfg.DigitalTwinAPIAddress) if err != nil {