Skip to content

Commit

Permalink
[eclipse-kanto#44] Remove the Subscribe and WaitSubscribeResult funct…
Browse files Browse the repository at this point in the history
…ions

Signed-off-by: Dimiter Georgiev <dimiter.georgiev@bosch.io>
  • Loading branch information
Dimiter Georgiev authored and e-grigorov committed Nov 25, 2022
1 parent 890ce25 commit 3695e7a
Showing 1 changed file with 16 additions and 24 deletions.
40 changes: 16 additions & 24 deletions integration/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0

//go:build integration
//+go:build integration

package integration

Expand Down Expand Up @@ -267,9 +267,22 @@ func (suite *ConnectorSuite) TestCommand() {
require.NoError(suite.T(), err, "unable to send command to the backend via websocket")

timeout := MillisToDuration(suite.cfg.EventTimeoutMs)
responseSentResult := WaitSubscribeResult(timeout, commandResponseCh, func() {})
require.Equal(suite.T(), ProcessWSMessageResult{true, nil}, responseSentResult, "command should be received and response should be sent")

// Check the sending of the response from the feature to the backend
var responseSentResult ProcessWSMessageResult
select {
case result := <-commandResponseCh:
responseSentResult = result
case <-time.After(timeout):
responseSentResult.Err = errors.New("timeout")
}
require.Equal(
suite.T(),
ProcessWSMessageResult{true, nil},
responseSentResult,
"command should be received and response should be sent")

// Check the response from the feature to the backend
result := ProcessWSMessages(timeout, ws, func(respMsg *protocol.Envelope) ProcessWSMessageResult {
expectedPath := strings.ReplaceAll(cmdMsgEnvelope.Path, "inbox", "outbox")
if err := checkEqual(expectedPath, respMsg.Path, "path"); err != nil {
Expand Down Expand Up @@ -411,27 +424,6 @@ func ProcessWSMessages(
return result
}

func Subscribe(
timeout time.Duration,
ws *websocket.Conn,
process func(*protocol.Envelope) ProcessWSMessageResult) chan ProcessWSMessageResult {
responseCh := make(chan ProcessWSMessageResult)
go func() {
responseCh <- ProcessWSMessages(timeout, ws, process)
}()
return responseCh
}

func WaitSubscribeResult(timeout time.Duration, resultCh chan ProcessWSMessageResult, closer func()) ProcessWSMessageResult {
select {
case result := <-resultCh:
return result
case <-time.After(timeout):
closer()
return ProcessWSMessageResult{false, errors.New("timeout")}
}
}

func (suite *ConnectorSuite) newWSConnection() (*websocket.Conn, error) {
wsAddress, err := asWSAddress(suite.cfg.DigitalTwinAPIAddress)
if err != nil {
Expand Down

0 comments on commit 3695e7a

Please sign in to comment.