Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes in internal logic of sending packets #124

Merged
merged 40 commits into from
Dec 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
5ec1c2f
Fix issue of data recv being interrupted
aggarw13 Dec 3, 2020
4205109
Rename elapsedTimeMs variable to its express its new meaning
aggarw13 Dec 3, 2020
0704c72
Use configuration const for recvExact timeout
aggarw13 Dec 3, 2020
adce766
Remove timeout check from discardPacket and address CI check failures
aggarw13 Dec 3, 2020
f5aa1b3
Fix more CI check failures
aggarw13 Dec 3, 2020
76166d0
Remove another unused local variable
aggarw13 Dec 3, 2020
1d73bb7
Re-instate timeout in discard to reduce scope of changes
aggarw13 Dec 3, 2020
0c5a9e3
Remove unused variable again
aggarw13 Dec 3, 2020
34adc86
Fix failing unit test
aggarw13 Dec 3, 2020
2a3cc9a
Rename new config macro, and attempt to fix CBMC failures
aggarw13 Dec 3, 2020
70b181a
Doc: Improvement suggestions from code review
aggarw13 Dec 3, 2020
68fcb16
Fix quality check failures
aggarw13 Dec 3, 2020
4ba8c53
Add test case to check partial network reads with zero timeout durati…
aggarw13 Dec 3, 2020
b710790
style: Improving naming
aggarw13 Dec 3, 2020
f8586dc
Address complexity failure
aggarw13 Dec 3, 2020
c59d39b
Address comments
aggarw13 Dec 4, 2020
af1ca02
Doc: Add blocking time equation of Receive/ProcessLoop functions in t…
aggarw13 Dec 4, 2020
86798bf
Improvement in API doc
aggarw13 Dec 4, 2020
e1de841
Set MQTT_RECV_POLLING_TIMEOUT_MS so that recvExact runs in one iterat…
sarenameas Dec 4, 2020
9a792bc
Merge branch 'main' into fix/recv-timeout-issue
aggarw13 Dec 4, 2020
ebcfb92
doc: Add information about zero return value for Transport_Recv_t
aggarw13 Dec 4, 2020
fe0a071
Merge branch 'fix/recv-timeout-issue' of https://github.com/aggarw13/…
aggarw13 Dec 4, 2020
32f0291
fix: prevent possibility of infinite loop in timeout logic of Process…
aggarw13 Dec 4, 2020
dd91637
Merge branch 'main' into fix/recv-timeout-issue
aggarw13 Dec 4, 2020
a4bf796
Merge branch 'main' into fix/recv-timeout-issue
aggarw13 Dec 4, 2020
bf6f94d
style: Minor changes
aggarw13 Dec 4, 2020
092e7ff
Merge branch 'fix/recv-timeout-issue' of https://github.com/aggarw13/…
aggarw13 Dec 4, 2020
f0a1808
hygiene: minor name fix
aggarw13 Dec 4, 2020
06669de
fix: Possibility of infinite loop in sendPacket
aggarw13 Dec 4, 2020
9092419
Add the new configuration to doxygen
aggarw13 Dec 4, 2020
858da68
test: Add mock transport send function that always returns zero
aggarw13 Dec 4, 2020
71e2a40
fix: Issues in sendPacket and sendPublish
aggarw13 Dec 5, 2020
1f6a138
test: add test for sendPacket timeout
aggarw13 Dec 5, 2020
e03f4e5
Update Timer Overflow test
aggarw13 Dec 5, 2020
605a83c
test: temporarily comment out unused variable
aggarw13 Dec 5, 2020
685ed2b
test: fix the timer overflow test
aggarw13 Dec 5, 2020
3504b2d
Merge remote-tracking branch 'origin/main' into fix/send-packet-infin…
aggarw13 Dec 5, 2020
a82053c
Address review comments
aggarw13 Dec 7, 2020
245ef09
Merge branch 'main' into fix/send-packet-infinite-loop
aggarw13 Dec 7, 2020
ddbb4b7
style: make log messages concise
aggarw13 Dec 7, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docs/doxygen/pages.dox
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,9 @@ Some configuration settings are C pre-processor constants, and some are function
@section MQTT_RECV_POLLING_TIMEOUT_MS
@copydoc MQTT_RECV_POLLING_TIMEOUT_MS

@section MQTT_SEND_RETRY_TIMEOUT_MS
@copydoc MQTT_SEND_RETRY_TIMEOUT_MS

@section MQTT_MAX_CONNACK_RECEIVE_RETRY_COUNT
@copydoc MQTT_MAX_CONNACK_RECEIVE_RETRY_COUNT

Expand Down
3 changes: 2 additions & 1 deletion lexicon.txt
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,7 @@ transportsectionimplementation
transportsectionoverview
transportsend
transportstruct
tx
uint
un
unsuback
Expand All @@ -371,4 +372,4 @@ xb
xc
xd
xe
xf
xf
50 changes: 36 additions & 14 deletions source/core_mqtt.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,16 @@
* @brief param[in] pBufferToSend Buffer to be sent to network.
* @brief param[in] bytesToSend Number of bytes to be sent.
*
* @return Total number of bytes sent, or negative number on network error.
* @note This operation may call the transport send function
* repeatedly to send bytes over the network until either:
* 1. The requested number of bytes @a bytesToSend have been sent.
* OR
* 2. No byte cannot be sent over the network for the MQTT_SEND_RETRY_TIMEOUT_MS
* duration.
* OR
* 3. There is an error in sending data over the network.
*
* @return Total number of bytes sent, or negative value on network error.
*/
static int32_t sendPacket( MQTTContext_t * pContext,
const uint8_t * pBufferToSend,
Expand Down Expand Up @@ -591,7 +600,7 @@ static int32_t sendPacket( MQTTContext_t * pContext,
const uint8_t * pIndex = pBufferToSend;
size_t bytesRemaining = bytesToSend;
int32_t totalBytesSent = 0, bytesSent;
uint32_t sendTime = 0U;
uint32_t lastSendTimeMs = 0U, timeSinceLastSendMs = 0U;
bool sendError = false;

assert( pContext != NULL );
Expand All @@ -601,8 +610,8 @@ static int32_t sendPacket( MQTTContext_t * pContext,

bytesRemaining = bytesToSend;

/* Record the time of transmission. */
sendTime = pContext->getTime();
/* Record the most recent time of successful transmission. */
lastSendTimeMs = pContext->getTime();

/* Loop until the entire packet is sent. */
while( ( bytesRemaining > 0UL ) && ( sendError == false ) )
Expand All @@ -617,8 +626,11 @@ static int32_t sendPacket( MQTTContext_t * pContext,
totalBytesSent = bytesSent;
sendError = true;
}
else
else if( bytesSent > 0 )
{
/* Record the most recent time of successful transmission. */
lastSendTimeMs = pContext->getTime();

/* It is a bug in the application's transport send implementation if
* more bytes than expected are sent. To avoid a possible overflow
* in converting bytesRemaining from unsigned to signed, this assert
Expand All @@ -628,20 +640,30 @@ static int32_t sendPacket( MQTTContext_t * pContext,
bytesRemaining -= ( size_t ) bytesSent;
totalBytesSent += bytesSent;
pIndex += bytesSent;
LogDebug( ( "BytesSent=%ld, BytesRemaining=%lu,"
" TotalBytesSent=%ld.",
LogDebug( ( "BytesSent=%ld, BytesRemaining=%lu",
( long int ) bytesSent,
( unsigned long ) bytesRemaining,
( long int ) totalBytesSent ) );
( unsigned long ) bytesRemaining ) );
}
else
{
/* No bytes were sent over the network. */
timeSinceLastSendMs = calculateElapsedTime( pContext->getTime(), lastSendTimeMs );

/* Check for timeout if we have been waiting to send any data over the network. */
if( timeSinceLastSendMs >= MQTT_SEND_RETRY_TIMEOUT_MS )
{
LogError( ( "Unable to send packet: Timed out in transport send." ) );
sendError = true;
}
}
}

/* Update time of last transmission if the entire packet is successfully sent. */
if( totalBytesSent > 0 )
{
pContext->lastPacketTime = sendTime;
pContext->lastPacketTime = lastSendTimeMs;
LogDebug( ( "Successfully sent packet at time %lu.",
( unsigned long ) sendTime ) );
( unsigned long ) lastSendTimeMs ) );
}

return totalBytesSent;
Expand Down Expand Up @@ -755,7 +777,7 @@ static int32_t recvExact( const MQTTContext_t * pContext,
/* Check for timeout if we have been waiting to receive any byte on the network. */
if( timeSinceLastRecvMs >= MQTT_RECV_POLLING_TIMEOUT_MS )
{
LogError( ( "Time expired while receiving packet." ) );
LogError( ( "Unable to receive packet: Timed out in transport recv." ) );
receiveError = true;
}
}
Expand Down Expand Up @@ -1387,7 +1409,7 @@ static MQTTStatus_t sendPublish( MQTTContext_t * pContext,
pContext->networkBuffer.pBuffer,
headerSize );

if( bytesSent < 0 )
if( bytesSent < ( int32_t ) headerSize )
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you going to make the same change in the other places where sendPacket is called, or will that be a separate PR?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Separate PR to avoid scope screep in this PR.

{
LogError( ( "Transport send failed for PUBLISH header." ) );
status = MQTTSendFailed;
Expand All @@ -1405,7 +1427,7 @@ static MQTTStatus_t sendPublish( MQTTContext_t * pContext,
pPublishInfo->pPayload,
pPublishInfo->payloadLength );

if( bytesSent < 0 )
if( bytesSent < ( int32_t ) pPublishInfo->payloadLength )
{
LogError( ( "Transport send failed for PUBLISH payload." ) );
status = MQTTSendFailed;
Expand Down
26 changes: 24 additions & 2 deletions source/include/core_mqtt_config_defaults.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,15 +120,37 @@
* If the timeout expires, the #MQTT_ProcessLoop and #MQTT_ReceiveLoop functions
* return #MQTTRecvFailed.
*
* <b>Possible values:</b> Any positive integer up to SIZE_MAX. Recommended to
* use a small timeout value. <br>
* <b>Possible values:</b> Any positive 32 bit integer. Recommended to use a
* small timeout value. <br>
* <b>Default value:</b> `10`
*
*/
#ifndef MQTT_RECV_POLLING_TIMEOUT_MS
#define MQTT_RECV_POLLING_TIMEOUT_MS ( 10U )
#endif

/**
* @brief The maximum duration between non-empty network transmissions while
* sending an MQTT packet via the #MQTT_ProcessLoop or #MQTT_ReceiveLoop
* API functions.
*
* When sending an MQTT packet, the transport send function may be called multiple
* times until all of the required number of bytes are sent.
* This timeout represents the maximum duration that is allowed for no data
* transmission over the network through the transport send function.
*
* If the timeout expires, the #MQTT_ProcessLoop and #MQTT_ReceiveLoop functions
* return #MQTTSendFailed.
*
* <b>Possible values:</b> Any positive 32 bit integer. Recommended to use a small
* timeout value. <br>
* <b>Default value:</b> `10`
*
*/
#ifndef MQTT_SEND_RETRY_TIMEOUT_MS
#define MQTT_SEND_RETRY_TIMEOUT_MS ( 10U )
#endif

/**
* @brief Macro that is called in the MQTT library for logging "Error" level
* messages.
Expand Down
7 changes: 6 additions & 1 deletion source/interface/transport_interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ typedef struct NetworkContext NetworkContext_t;
*
* @return The number of bytes received or a negative value to indicate
* error.
*
* @note If no data is available on the network to read and no error
* has occurred, zero MUST be the return value. Zero MUST NOT be used
* if a network disconnection has occurred.
Expand All @@ -183,7 +184,11 @@ typedef int32_t ( * TransportRecv_t )( NetworkContext_t * pNetworkContext,
* @param[in] pBuffer Buffer containing the bytes to send over the network stack.
* @param[in] bytesToSend Number of bytes to send over the network.
*
* @return The number of bytes sent or a negative error code.
* @return The number of bytes sent or a negative value to indicate error.
*
* @note If no data is transmitted over the network due to a full TX buffer and
* no network error has occurred, this MUST return zero as the return value.
* Zero MUST NOT be returned if a network disconnection has occurred.
*/
/* @[define_transportsend] */
typedef int32_t ( * TransportSend_t )( NetworkContext_t * pNetworkContext,
Expand Down
82 changes: 77 additions & 5 deletions test/unit-test/core_mqtt_utest.c
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,18 @@
#define MQTT_OVERFLOW_OFFSET ( 3 )

/**
* @brief Subtract this value from max value of global entry time
* for the timer overflow test.
* @brief The number of times the "getTime()" function is called
* within a single iteration of the #MQTT_ProcessLoop.
*
* This constant is used for the timer overflow test which checks
* that the API can support normal behavior even if the timer
* overflows.
*
* @note Currently, there are 5 calls within a single iteration.
* This can change when the implementation changes which would be
* caught through unit test failure.
*/
#define MQTT_TIMER_CALLS_PER_ITERATION ( 4 )
#define MQTT_TIMER_CALLS_PER_ITERATION ( 5 )

/**
* @brief Timeout for the timer overflow test.
Expand Down Expand Up @@ -300,6 +308,19 @@ static int32_t transportSendFailure( NetworkContext_t * pNetworkContext,
return -1;
}

/**
* @brief Mocked transport send that always returns 0 bytes sent.
*/
static int32_t transportSendNoBytes( NetworkContext_t * pNetworkContext,
const void * pBuffer,
size_t bytesToWrite )
{
( void ) pNetworkContext;
( void ) pBuffer;
( void ) bytesToWrite;
return 0;
}

/**
* @brief Mocked transport send that succeeds then fails.
*/
Expand Down Expand Up @@ -1327,6 +1348,47 @@ void test_MQTT_Publish( void )
TEST_ASSERT_EQUAL_INT( MQTTSuccess, status );
}

/**
* @brief Test that verifies that the MQTT_Publish API detects a timeout
* and returns failure when the transport send function is unable to send any data
* over the network.
*/
void test_MQTT_Publish_Send_Timeout( void )
{
MQTTContext_t mqttContext;
MQTTPublishInfo_t publishInfo;
TransportInterface_t transport;
MQTTFixedBuffer_t networkBuffer;
MQTTStatus_t status;
size_t headerSize;

setupNetworkBuffer( &networkBuffer );
setupTransportInterface( &transport );

/* Set the transport send function to the mock that always returns zero
* bytes for the test. */
transport.send = transportSendNoBytes;

/* Initialize the MQTT context. */
MQTT_Init( &mqttContext, &transport, getTime, eventCallback, &networkBuffer );

/* Setup for making sure that the test results in calling sendPacket function
* where calls to transport send function are made (repeatedly to send packet
* over the network).*/
memset( &publishInfo, 0, sizeof( MQTTPublishInfo_t ) );
headerSize = 1;
publishInfo.pPayload = "Test";
publishInfo.payloadLength = 4;
MQTT_GetPublishPacketSize_IgnoreAndReturn( MQTTSuccess );
MQTT_SerializePublishHeader_ExpectAnyArgsAndReturn( MQTTSuccess );
MQTT_SerializePublishHeader_ReturnThruPtr_pHeaderSize( &headerSize );

/* Call the API function under test and expect that it detects a timeout in sending
* MQTT packet over the network. */
status = MQTT_Publish( &mqttContext, &publishInfo, 0 );
TEST_ASSERT_EQUAL_INT( MQTTSendFailed, status );
}

/* ========================================================================== */

/**
Expand Down Expand Up @@ -1949,17 +2011,27 @@ void test_MQTT_ProcessLoop_Timer_Overflow( void )
MQTTPublishState_t publishState = MQTTPubAckSend;
MQTTPublishState_t ackState = MQTTPublishDone;
uint8_t i = 0;
uint8_t numIterations = ( MQTT_TIMER_OVERFLOW_TIMEOUT_MS / MQTT_TIMER_CALLS_PER_ITERATION ) + 1;

/* Calculate the number of iterations that the loop within the MQTT_ProcessLoop call
* will be executed for the time duration value in the test.
* The number of iterations is ceiling( Time Duration / Number of timer calls per iteration ) . */
uint8_t numIterations = ( MQTT_TIMER_OVERFLOW_TIMEOUT_MS + MQTT_TIMER_CALLS_PER_ITERATION - 1 ) /
MQTT_TIMER_CALLS_PER_ITERATION;

uint32_t expectedFinalTime;

setupTransportInterface( &transport );
setupNetworkBuffer( &networkBuffer );

networkBuffer.size = 1000;
incomingPacket.type = MQTT_PACKET_TYPE_PUBLISH;
incomingPacket.remainingLength = MQTT_SAMPLE_REMAINING_LENGTH;

globalEntryTime = UINT32_MAX - MQTT_OVERFLOW_OFFSET;
expectedFinalTime = MQTT_TIMER_CALLS_PER_ITERATION * numIterations - MQTT_OVERFLOW_OFFSET;

/* Calculate the expected time counter value after the MQTT_ProcessLoop API call.
* Note: The "+ 1" is for the call to getTime() function before the loop iterations. */
expectedFinalTime = globalEntryTime + ( numIterations * MQTT_TIMER_CALLS_PER_ITERATION ) + 1;
Copy link
Contributor

@muneebahmed10 muneebahmed10 Dec 5, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Why this change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is correcting the calculation of the expected time counter after the call to MQTT_ProcessLoop. The +1 is for the call made to getTime before all iterations of the loop in the API.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the correction? This value is the same as before isn't it?
(UINT32_MAX - MQTT_OVERFLOW_OFFSET) + (numIterations * MQTT_TIMER_CALLS_PER_ITERATION ) + 1 is the same as
((0) - MQTT_OVERFLOW_OFFSET) + (numIterations * MQTT_TIMER_CALLS_PER_ITERATION) or
MQTT_TIMER_CALLS_PER_ITERATION * numIterations - MQTT_OVERFLOW_OFFSET

Also, basing expectedFinalTime off of globalEntryTime results in a calculation that always yields the final value, regardless if the counter rolls over back to 0 or not, which is what this test seems to be testing (e.g. If someone accidentally changed the value of globalEntryTime so there's no longer a rollover, this test would still pass).

Ultimately it doesn't matter, since the value is the same as it was before, but that's why this change looked weird to me

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, basing expectedFinalTime off of globalEntryTime results in a calculation that always yields the final value, regardless if the counter rolls over back to 0 or not,

This is the main motivation from a resiliency and readability perspective.

Copy link
Contributor

@muneebahmed10 muneebahmed10 Dec 7, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was saying it's less resilient since before, accidentally setting globalEntryTime incorrectly could be caught due to the test failing. Now, due to how expectedFinalTime is calculated from globalEntryTime, the test will pass regardless of the initial value of the entry time


mqttStatus = MQTT_Init( &context, &transport, getTime, eventCallback, &networkBuffer );
TEST_ASSERT_EQUAL( MQTTSuccess, mqttStatus );
Expand Down