diff --git a/libraries/net/include/graphene/net/message.hpp b/libraries/net/include/graphene/net/message.hpp index cfff380f9b..6d254b6120 100644 --- a/libraries/net/include/graphene/net/message.hpp +++ b/libraries/net/include/graphene/net/message.hpp @@ -44,6 +44,11 @@ namespace graphene { namespace net { { boost::endian::little_uint32_buf_t size; // number of bytes in message, capped at MAX_MESSAGE_SIZE boost::endian::little_uint32_buf_t msg_type; // every channel gets a 16 bit message type specifier + message_header() + { + size = 0; + msg_type = 0; + } }; typedef fc::uint160_t message_hash_type; diff --git a/libraries/net/include/graphene/net/peer_connection.hpp b/libraries/net/include/graphene/net/peer_connection.hpp index dd9d6eb774..a00e43dcbf 100644 --- a/libraries/net/include/graphene/net/peer_connection.hpp +++ b/libraries/net/include/graphene/net/peer_connection.hpp @@ -164,25 +164,25 @@ namespace graphene { namespace net }; - size_t _total_queued_messages_size; + size_t _total_queued_messages_size = 0; std::queue, std::list > > _queued_messages; fc::future _send_queued_messages_done; public: fc::time_point connection_initiation_time; fc::time_point connection_closed_time; fc::time_point connection_terminated_time; - peer_connection_direction direction; + peer_connection_direction direction = peer_connection_direction::unknown; //connection_state state; - firewalled_state is_firewalled; + firewalled_state is_firewalled = firewalled_state::unknown; fc::microseconds clock_offset; fc::microseconds round_trip_delay; - our_connection_state our_state; - bool they_have_requested_close; - their_connection_state their_state; - bool we_have_requested_close; + our_connection_state our_state = our_connection_state::disconnected; + bool they_have_requested_close = false; + their_connection_state their_state = their_connection_state::disconnected; + bool we_have_requested_close = false; - connection_negotiation_status negotiation_status; + connection_negotiation_status negotiation_status = connection_negotiation_status::disconnected; fc::oexception connection_closed_error; fc::time_point get_connection_time()const { return _message_connection.get_connection_time(); } @@ -197,7 +197,7 @@ namespace graphene { namespace net * from the user_data field of the hello, or if none is present it will be filled with a * copy of node_public_key */ node_id_t node_id; - uint32_t core_protocol_version; + uint32_t core_protocol_version = 0; std::string user_agent; fc::optional graphene_git_revision_sha; fc::optional graphene_git_revision_unix_timestamp; @@ -210,8 +210,8 @@ namespace graphene { namespace net // its hello message. For outbound, they record what we sent the peer // in our hello message fc::ip::address inbound_address; - uint16_t inbound_port; - uint16_t outbound_port; + uint16_t inbound_port = 0; + uint16_t outbound_port = 0; /// @} typedef std::unordered_map item_to_time_map_type; @@ -220,15 +220,15 @@ namespace graphene { namespace net /// @{ boost::container::deque ids_of_items_to_get; /// id of items in the blockchain that this peer has told us about std::set ids_of_items_being_processed; /// list of all items this peer has offered use that we've already handed to the client but the client hasn't finished processing - uint32_t number_of_unfetched_item_ids; /// number of items in the blockchain that follow ids_of_items_to_get but the peer hasn't yet told us their ids - bool peer_needs_sync_items_from_us; - bool we_need_sync_items_from_peer; + uint32_t number_of_unfetched_item_ids = 0; /// number of items in the blockchain that follow ids_of_items_to_get but the peer hasn't yet told us their ids + bool peer_needs_sync_items_from_us = false; + bool we_need_sync_items_from_peer = false; fc::optional, fc::time_point> > item_ids_requested_from_peer; /// we check this to detect a timed-out request and in busy() fc::time_point last_sync_item_received_time; /// the time we received the last sync item or the time we sent the last batch of sync item requests to this peer std::set sync_items_requested_from_peer; /// ids of blocks we've requested from this peer during sync. fetch from another peer if this peer disconnects item_hash_t last_block_delegate_has_seen; /// the hash of the last block this peer has told us about that the peer knows fc::time_point_sec last_block_time_delegate_has_seen; - bool inhibit_fetching_sync_blocks; + bool inhibit_fetching_sync_blocks = false; /// @} /// non-synchronization state data @@ -258,18 +258,17 @@ namespace graphene { namespace net // blockchain catch up fc::time_point transaction_fetching_inhibited_until; - uint32_t last_known_fork_block_number; + uint32_t last_known_fork_block_number = 0; fc::future accept_or_connect_task_done; - firewall_check_state_data *firewall_check_state; -#ifndef NDEBUG + firewall_check_state_data *firewall_check_state = nullptr; private: - fc::thread* _thread; - unsigned _send_message_queue_tasks_running; // temporary debugging +#ifndef NDEBUG + fc::thread* _thread = nullptr; + unsigned _send_message_queue_tasks_running = 0; // temporary debugging #endif - bool _currently_handling_message; // true while we're in the middle of handling a message from the remote system - private: + bool _currently_handling_message = false; // true while we're in the middle of handling a message from the remote system peer_connection(peer_connection_delegate* delegate); void destroy(); public: diff --git a/libraries/net/message_oriented_connection.cpp b/libraries/net/message_oriented_connection.cpp index d5c0958f1b..b62651fa76 100644 --- a/libraries/net/message_oriented_connection.cpp +++ b/libraries/net/message_oriented_connection.cpp @@ -32,6 +32,8 @@ #include #include +#include + #ifdef DEFAULT_LOGGER # undef DEFAULT_LOGGER #endif @@ -61,8 +63,8 @@ namespace graphene { namespace net { fc::time_point _last_message_received_time; fc::time_point _last_message_sent_time; - bool _send_message_in_progress; - + std::atomic_bool _send_message_in_progress; + std::atomic_bool _read_loop_in_progress; #ifndef NDEBUG fc::thread* _thread; #endif @@ -99,7 +101,8 @@ namespace graphene { namespace net { _ready_for_sending(fc::promise::create()), _bytes_received(0), _bytes_sent(0), - _send_message_in_progress(false) + _send_message_in_progress(false), + _read_loop_in_progress(false) #ifndef NDEBUG ,_thread(&fc::thread::current()) #endif @@ -141,6 +144,20 @@ namespace graphene { namespace net { _sock.bind(local_endpoint); } + class no_parallel_execution_guard final + { + std::atomic_bool* _flag; + public: + explicit no_parallel_execution_guard(std::atomic_bool* flag) : _flag(flag) + { + bool expected = false; + FC_ASSERT( flag->compare_exchange_strong( expected, true ), "Only one thread at time can visit it"); + } + ~no_parallel_execution_guard() + { + *_flag = false; + } + }; void message_oriented_connection_impl::read_loop() { @@ -149,6 +166,8 @@ namespace graphene { namespace net { const int LEFTOVER = BUFFER_SIZE - sizeof(message_header); static_assert(BUFFER_SIZE >= sizeof(message_header), "insufficient buffer"); + no_parallel_execution_guard guard( &_read_loop_in_progress ); + _connected_time = fc::time_point::now(); fc::oexception exception_to_rethrow; @@ -244,17 +263,7 @@ namespace graphene { namespace net { } send_message_scope_logger(remote_endpoint); #endif #endif - struct verify_no_send_in_progress { - bool& var; - verify_no_send_in_progress(bool& var) : var(var) - { - if (var) - elog("Error: two tasks are calling message_oriented_connection::send_message() at the same time"); - assert(!var); - var = true; - } - ~verify_no_send_in_progress() { var = false; } - } _verify_no_send_in_progress(_send_message_in_progress); + no_parallel_execution_guard guard( &_send_message_in_progress ); _ready_for_sending->wait(); try @@ -265,8 +274,11 @@ namespace graphene { namespace net { //pad the message we send to a multiple of 16 bytes size_t size_with_padding = 16 * ((size_of_message_and_header + 15) / 16); std::unique_ptr padded_message(new char[size_with_padding]); + memcpy(padded_message.get(), (char*)&message_to_send, sizeof(message_header)); memcpy(padded_message.get() + sizeof(message_header), message_to_send.data.data(), message_to_send.size.value() ); + char* padding_space = padded_message.get() + sizeof(message_header) + message_to_send.size.value(); + memset(padding_space, 0, size_with_padding - size_of_message_and_header); _sock.write(padded_message.get(), size_with_padding); _sock.flush(); _bytes_sent += size_with_padding;