Skip to content

Commit

Permalink
[eclipse-kanto#44] Fix issues in ProcessMessages and rename to Proces…
Browse files Browse the repository at this point in the history
…sWSMessages

Signed-off-by: Dimiter Georgiev <dimiter.georgiev@bosch.io>
  • Loading branch information
Dimiter Georgiev authored and e-grigorov committed Nov 25, 2022
1 parent 8124d0b commit 890ce25
Showing 1 changed file with 66 additions and 86 deletions.
152 changes: 66 additions & 86 deletions integration/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (suite *ConnectorSuite) TestConnectionStatus() {
}
firstTime = false
statusURL := fmt.Sprintf("%s/features/ConnectionStatus/properties/status", suite.thingURL)
body, err := suite.doRequest("GET", statusURL)
body, err := suite.doRequest(http.MethodGet, statusURL)
if err != nil {
if time.Now().Before(threshold) {
continue
Expand Down Expand Up @@ -216,13 +216,13 @@ func (suite *ConnectorSuite) TestCommand() {
require.NoError(suite.T(), err, "cannot create a websocket connection to the backend")
defer ws.Close()

commandResponseCh := make(chan SubscribeResult)
commandResponseCh := make(chan ProcessWSMessageResult)

dittoHandler := func(requestID string, msg *protocol.Envelope) {
if msg.Path == fmt.Sprintf("/features/%s/inbox/messages/%s", featureID, commandName) {
value, ok := msg.Value.(string)
if !ok {
commandResponseCh <- SubscribeResult{false, fmt.Errorf("unexpected message payload: %v, %T", msg.Value, msg.Value)}
commandResponseCh <- ProcessWSMessageResult{false, fmt.Errorf("unexpected message payload: %v, %T", msg.Value, msg.Value)}
return
}
responsePayload := fmt.Sprintf(responsePayloadTemplate, value)
Expand All @@ -238,10 +238,10 @@ func (suite *ConnectorSuite) TestCommand() {
WithValue(responsePayload).
WithStatus(http.StatusOK)
if err := suite.dittoClient.Reply(requestID, responseMsg); err != nil {
commandResponseCh <- SubscribeResult{false, fmt.Errorf("failed to send response: %v", err)}
commandResponseCh <- ProcessWSMessageResult{false, fmt.Errorf("failed to send response: %v", err)}
return
}
commandResponseCh <- SubscribeResult{true, nil}
commandResponseCh <- ProcessWSMessageResult{true, nil}
}
}
suite.dittoClient.Subscribe(dittoHandler)
Expand All @@ -263,32 +263,30 @@ func (suite *ConnectorSuite) TestCommand() {
return nil
}

timeout := time.Duration(suite.cfg.EventTimeoutMs * int(time.Millisecond))
respCh := Subscribe(timeout, ws, func(respMsg *protocol.Envelope) (bool, error) {
err = websocket.JSON.Send(ws, cmdMsgEnvelope)
require.NoError(suite.T(), err, "unable to send command to the backend via websocket")

timeout := MillisToDuration(suite.cfg.EventTimeoutMs)
responseSentResult := WaitSubscribeResult(timeout, commandResponseCh, func() {})
require.Equal(suite.T(), ProcessWSMessageResult{true, nil}, responseSentResult, "command should be received and response should be sent")

result := ProcessWSMessages(timeout, ws, func(respMsg *protocol.Envelope) ProcessWSMessageResult {
expectedPath := strings.ReplaceAll(cmdMsgEnvelope.Path, "inbox", "outbox")
if err := checkEqual(expectedPath, respMsg.Path, "path"); err != nil {
return false, err
return ProcessWSMessageResult{false, err}
}
if err := checkEqual(*cmdMsgEnvelope.Topic, *respMsg.Topic, "topic"); err != nil {
return false, err
return ProcessWSMessageResult{false, err}
}
if err := checkEqual(respMsg.Status, http.StatusOK, "http ok"); err != nil {
return false, err
return ProcessWSMessageResult{false, err}
}
if err := checkEqual(fmt.Sprintf(responsePayloadTemplate, cmdMsgEnvelope.Value.(string)), respMsg.Value, "response payload"); err != nil {
return false, err
return ProcessWSMessageResult{false, err}
}
return true, nil
return ProcessWSMessageResult{true, nil}
})

err = websocket.JSON.Send(ws, cmdMsgEnvelope)
require.NoError(suite.T(), err, "unable to send command to the backend via websocket")

responseSentResult := WaitSubscribeResult(timeout, commandResponseCh, func() {})
require.Equal(suite.T(), SubscribeResult{true, nil}, responseSentResult, "command should be received and response should be sent")

subscribeResult := WaitSubscribeResult(timeout, respCh, func() { ws.Close() })
require.Equal(suite.T(), SubscribeResult{true, nil}, subscribeResult, "command response should be received")
require.Equal(suite.T(), ProcessWSMessageResult{true, nil}, result, "command response should be received")
}

func (suite *ConnectorSuite) TestEvent() {
Expand Down Expand Up @@ -319,25 +317,23 @@ func (suite *ConnectorSuite) testModify(channel string, newValue string) {

msg := cmd.Envelope(protocol.WithResponseRequired(false))

eventCh := Subscribe(timeout, ws, func(msg *protocol.Envelope) (bool, error) {
err = suite.sendDittoEvent(channel, msg)

require.NoError(suite.T(), err, "unable to send event to the backend")

result := ProcessWSMessages(timeout, ws, func(msg *protocol.Envelope) ProcessWSMessageResult {
suite.T().Logf("event received: %v", msg)
if msg.Topic.String() == fmt.Sprintf("%s/%s/things/twin/events/modified", namespace.Namespace, namespace.Name) &&
msg.Path == fmt.Sprintf("/features/%s/properties/%s", featureID, propertyName) &&
msg.Value == newValue {
return true, nil
return ProcessWSMessageResult{true, nil}
}
return false, fmt.Errorf("unexpected value: %s", msg.Value)
return ProcessWSMessageResult{false, fmt.Errorf("unexpected value: %s", msg.Value)}
})

err = suite.sendDittoEvent(channel, msg)

require.NoError(suite.T(), err, "unable to send event to the backend")

subscribeResult := WaitSubscribeResult(timeout, eventCh, func() { ws.Close() })
require.Equal(suite.T(), SubscribeResult{true, nil}, subscribeResult, "property changed event should be received")
require.Equal(suite.T(), ProcessWSMessageResult{true, nil}, result, "property changed event should be received")

propertyURL := fmt.Sprintf("%s/properties/%s", suite.featureURL, propertyName)
body, err := suite.doRequest("GET", propertyURL)
body, err := suite.doRequest(http.MethodGet, propertyURL)
require.NoError(suite.T(), err, "unable to get feature property")

assert.Equal(suite.T(), fmt.Sprintf("\"%s\"", newValue), strings.TrimSpace(string(body)), "property value updated")
Expand All @@ -364,91 +360,75 @@ func WaitForAck(
var payload []byte
deadline := time.Now().Add(timeout)
ws.SetDeadline(deadline)
var err error

for time.Now().Before(deadline) {
err = websocket.Message.Receive(ws, &payload)
if err == nil {
ack := strings.TrimSpace(string(payload))
if ack == expectedAck {
return nil
}
err := websocket.Message.Receive(ws, &payload)
if err != nil {
return fmt.Errorf("error reading from websocket: %s", err)
}
ack := strings.TrimSpace(string(payload))
if ack == expectedAck {
return nil
}
}

if err != nil {
return err
}
return errors.New("timeout")
}

func ProcessMessages(
type ProcessWSMessageResult struct {
Finished bool
Err error
}

func ProcessWSMessages(
timeout time.Duration,
ws *websocket.Conn,
process func(*protocol.Envelope) (bool, error)) (bool, error) {

var payload []byte
process func(*protocol.Envelope) ProcessWSMessageResult) ProcessWSMessageResult {
deadline := time.Now().Add(timeout)
ws.SetDeadline(deadline)

var err error
var stop bool
for !stop && time.Now().Before(deadline) {
err = websocket.Message.Receive(ws, &payload)
var envelope *protocol.Envelope
if err == nil {
envelope = &protocol.Envelope{}
err = json.Unmarshal(payload, envelope)
if err == nil {
stop, err = process(envelope)
} else {
// Unmarshalling error, the payload is not a JSON of protocol.Envelope
// Ignore the error
fmt.Fprintf(os.Stderr, "error unmarshalling a protocol.Envelope: %v", err)
err = nil
}
result := ProcessWSMessageResult{}
for !result.Finished && time.Now().Before(deadline) {
var payload []byte
webSocketErr := websocket.Message.Receive(ws, &payload)
if webSocketErr != nil {
result.Err = fmt.Errorf("error reading from websocket: %s", webSocketErr)
return result
}
if err != nil {
return false, err
envelope := &protocol.Envelope{}
unmarshalErr := json.Unmarshal(payload, envelope)
if unmarshalErr == nil {
result = process(envelope)
} else {
// Unmarshalling error, the payload is not a JSON of protocol.Envelope
// Ignore the error
fmt.Fprintf(os.Stderr, "error unmarshalling a protocol.Envelope: %v", unmarshalErr)
}
}

if stop {
return true, nil
if !result.Finished {
result.Err = fmt.Errorf("not finished, expected WS response not received in %v, last error: %v", timeout, result.Err)
}
return false, fmt.Errorf("WS response not received in %v, last error: %v", timeout, err)
}

type SubscribeResult struct {
stopped bool
err error
return result
}

func Subscribe(
timeout time.Duration,
ws *websocket.Conn,
process func(*protocol.Envelope) (bool, error)) chan SubscribeResult {
responseCh := make(chan SubscribeResult)
process func(*protocol.Envelope) ProcessWSMessageResult) chan ProcessWSMessageResult {
responseCh := make(chan ProcessWSMessageResult)
go func() {
stopped, err := ProcessMessages(timeout, ws, process)
responseCh <- SubscribeResult{
stopped: stopped,
err: err,
}
responseCh <- ProcessWSMessages(timeout, ws, process)
}()

return responseCh
}

func WaitSubscribeResult(timeout time.Duration, resultCh chan SubscribeResult, closer func()) SubscribeResult {
func WaitSubscribeResult(timeout time.Duration, resultCh chan ProcessWSMessageResult, closer func()) ProcessWSMessageResult {
select {
case result := <-resultCh:
return result
case <-time.After(timeout):
return SubscribeResult{
stopped: false,
err: errors.New("timeout"),
}
closer()
return ProcessWSMessageResult{false, errors.New("timeout")}
}
}

Expand Down

0 comments on commit 890ce25

Please sign in to comment.