From 4d1df0b75c71e8b3fb9849f16fcdfb5dfb071eeb Mon Sep 17 00:00:00 2001 From: AndKram Date: Mon, 9 Jul 2018 11:30:14 +0100 Subject: [PATCH 01/15] macos async cunk_test fails --- Malmo/src/AgentHost.cpp | 5 +++-- .../java/com/microsoft/Malmo/Client/ClientStateMachine.java | 2 ++ 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/Malmo/src/AgentHost.cpp b/Malmo/src/AgentHost.cpp index 62b06cc89..f32977c29 100755 --- a/Malmo/src/AgentHost.cpp +++ b/Malmo/src/AgentHost.cpp @@ -65,7 +65,7 @@ namespace malmo // start the io_service on background threads this->work = boost::in_place(boost::ref(this->io_service)); - const int NUM_BACKGROUND_THREADS = 1; // can be increased if I/O becomes a bottleneck + const int NUM_BACKGROUND_THREADS = 2; // can be increased if I/O becomes a bottleneck for( int i = 0; i < NUM_BACKGROUND_THREADS; i++ ) this->background_threads.push_back( boost::make_shared( boost::bind( &boost::asio::io_service::run, &this->io_service ) ) ); } @@ -314,9 +314,10 @@ namespace malmo { reply = rpc.sendStringAndGetShortReply(this->io_service, item->ip_address, item->control_port, request, false); } - catch (std::exception&) + catch (std::exception& e) { // This is expected quite often - client is likely not running. + LOGINFO(LT("Client could not be contacted: ", item->ip_address, LT(":"), item->control_port), LT(" "), e.what()); continue; } LOGINFO(LT("Reserving client, received reply from "), item->ip_address, LT(": "), reply); diff --git a/Minecraft/src/main/java/com/microsoft/Malmo/Client/ClientStateMachine.java b/Minecraft/src/main/java/com/microsoft/Malmo/Client/ClientStateMachine.java index 6be09083f..23c0e9a10 100755 --- a/Minecraft/src/main/java/com/microsoft/Malmo/Client/ClientStateMachine.java +++ b/Minecraft/src/main/java/com/microsoft/Malmo/Client/ClientStateMachine.java @@ -433,6 +433,7 @@ public void onError(String error, DataOutputStream dos) { dos.writeInt(error.length()); dos.writeBytes(error); + dos.flush(); } catch (IOException e) { @@ -446,6 +447,7 @@ private void reply(String reply, DataOutputStream dos) { dos.writeInt(reply.length()); dos.writeBytes(reply); + dos.flush(); } catch (IOException e) { From 3fa41806d852c694dfb71edf32069bcea5c470f7 Mon Sep 17 00:00:00 2001 From: AndKram Date: Mon, 9 Jul 2018 11:47:43 +0100 Subject: [PATCH 02/15] fix log bracketing --- Malmo/src/AgentHost.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Malmo/src/AgentHost.cpp b/Malmo/src/AgentHost.cpp index f32977c29..a56953f55 100755 --- a/Malmo/src/AgentHost.cpp +++ b/Malmo/src/AgentHost.cpp @@ -317,7 +317,7 @@ namespace malmo catch (std::exception& e) { // This is expected quite often - client is likely not running. - LOGINFO(LT("Client could not be contacted: ", item->ip_address, LT(":"), item->control_port), LT(" "), e.what()); + LOGINFO(LT("Client could not be contacted: "), item->ip_address, LT(":"), item->control_port, LT(" "), e.what()); continue; } LOGINFO(LT("Reserving client, received reply from "), item->ip_address, LT(": "), reply); From 318764a3f5ebad1c9911618e48a37efc5e5d85fc Mon Sep 17 00:00:00 2001 From: AndKram Date: Tue, 10 Jul 2018 08:45:58 +0100 Subject: [PATCH 03/15] merge master and python37 on MacOS --- CMakeLists.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 378e36310..d8fb4e997 100755 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -54,7 +54,7 @@ if( INCLUDE_PYTHON ) set( MACOS_USE_PYTHON_MODULE_DESC "Specifies which Python module to build Malmo on Apple MacOS" ) set( USE_PYTHON_VERSIONS 3.6 CACHE STRING ${USE_PYTHON_VERSIONS_DESC} ) # Boost has switched to using a 2 digit naming convention for python on MacOS. - set( MACOS_USE_PYTHON_MODULE "python36" CACHE STRING ${MACOS_USE_PYTHON_MODULE_DESC} ) + set( MACOS_USE_PYTHON_MODULE "python37" CACHE STRING ${MACOS_USE_PYTHON_MODULE_DESC} ) endif() set( WARNINGS_AS_ERRORS OFF ) From 66783ec0de0436380c3eae2d1afd8833dc9378fa Mon Sep 17 00:00:00 2001 From: AndKram Date: Wed, 11 Jul 2018 13:22:55 +0100 Subject: [PATCH 04/15] 3 background threads --- Malmo/src/AgentHost.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Malmo/src/AgentHost.cpp b/Malmo/src/AgentHost.cpp index a56953f55..466d49e9e 100755 --- a/Malmo/src/AgentHost.cpp +++ b/Malmo/src/AgentHost.cpp @@ -65,7 +65,7 @@ namespace malmo // start the io_service on background threads this->work = boost::in_place(boost::ref(this->io_service)); - const int NUM_BACKGROUND_THREADS = 2; // can be increased if I/O becomes a bottleneck + const int NUM_BACKGROUND_THREADS = 3; // can be increased if I/O becomes a bottleneck for( int i = 0; i < NUM_BACKGROUND_THREADS; i++ ) this->background_threads.push_back( boost::make_shared( boost::bind( &boost::asio::io_service::run, &this->io_service ) ) ); } From e98b221218e84c22fcbdce0abfe6e1f58ef32f8d Mon Sep 17 00:00:00 2001 From: AndKram Date: Mon, 16 Jul 2018 10:43:17 +0100 Subject: [PATCH 05/15] getline returns iostream --- Malmo/src/AgentHost.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Malmo/src/AgentHost.cpp b/Malmo/src/AgentHost.cpp index 466d49e9e..0a83d21ed 100755 --- a/Malmo/src/AgentHost.cpp +++ b/Malmo/src/AgentHost.cpp @@ -113,7 +113,7 @@ namespace malmo std::string line = ""; std::string version = ""; // Keep concatenating lines until we have a match, or we run out of schema. - while (version.empty() && getline(stream, line)) + while (version.empty() && !stream.eof() && getline(stream, line)) { boost::trim(line); xml += line; From 4d4199b2e0399426e69cf62ac970d95f6c1001cf Mon Sep 17 00:00:00 2001 From: AndKram Date: Tue, 17 Jul 2018 14:16:11 +0100 Subject: [PATCH 06/15] close with server releasing --- Malmo/src/AgentHost.cpp | 36 +++++++++++++++--- Malmo/src/StringServer.cpp | 16 +++++++- Malmo/src/StringServer.h | 12 +++++- Malmo/src/TCPServer.cpp | 44 +++++++++++++++++++--- Malmo/src/TCPServer.h | 12 +++++- Malmo/src/VideoServer.cpp | 14 ++++++- Malmo/src/VideoServer.h | 11 +++++- Malmo/test/CppTests/test_client_server.cpp | 2 +- Malmo/test/CppTests/test_persistence.cpp | 14 +++---- Malmo/test/CppTests/test_string_server.cpp | 10 ++--- Malmo/test/CppTests/test_video_server.cpp | 8 ++-- 11 files changed, 141 insertions(+), 38 deletions(-) diff --git a/Malmo/src/AgentHost.cpp b/Malmo/src/AgentHost.cpp index 0a83d21ed..c22a6fd46 100755 --- a/Malmo/src/AgentHost.cpp +++ b/Malmo/src/AgentHost.cpp @@ -453,7 +453,8 @@ namespace malmo { boost::lock_guard scope_guard(this->world_state_mutex); - return this->world_state; + WorldState current_world_state(this->world_state); + return current_world_state; } WorldState AgentHost::getWorldState() @@ -499,8 +500,13 @@ namespace malmo { return; // can re-use existing server } + + if (this->mission_control_server != 0) { + this->mission_control_server->close(); + } + this->mission_control_server = boost::make_shared(this->io_service, port, boost::bind(&AgentHost::onMissionControlMessage, this, _1), "mcp"); - this->mission_control_server->start(); + this->mission_control_server->start(mission_control_server); } boost::shared_ptr AgentHost::listenForVideo(boost::shared_ptr video_server, int port, short width, short height, short channels, TimestampedVideoFrame::FrameType frametype) @@ -531,6 +537,10 @@ namespace malmo video_server->getChannels() != channels || video_server->getFrameType() != frametype) { + if (video_server != 0) { + video_server->close(); + } + // Can't use the server passed in - create a new one. ret_server = boost::make_shared( this->io_service, port, width, height, channels, frametype, boost::bind(&AgentHost::onVideo, this, _1)); @@ -541,8 +551,8 @@ namespace malmo ret_server->recordBmps(this->current_mission_record->getTemporaryDirectory()); } - ret_server->start(); - } + ret_server->start(ret_server); + } else { // re-use the existing video_server // but now we need to re-create the file writers with the new file names @@ -563,8 +573,12 @@ namespace malmo { if( !this->rewards_server || ( port != 0 && this->rewards_server->getPort() != port ) ) { + if (rewards_server != nullptr) { + rewards_server->close(); + } + this->rewards_server = boost::make_shared(this->io_service, port, boost::bind(&AgentHost::onReward, this, _1), "rew"); - this->rewards_server->start(); + this->rewards_server->start(rewards_server); } if (this->current_mission_record->isRecordingRewards()){ @@ -576,8 +590,12 @@ namespace malmo { if( !this->observations_server || ( port != 0 && this->observations_server->getPort() != port ) ) { + if (observations_server != nullptr) { + observations_server->close(); + } + this->observations_server = boost::make_shared(this->io_service, port, boost::bind(&AgentHost::onObservation, this, _1), "obs"); - this->observations_server->start(); + this->observations_server->start(observations_server); } if (this->current_mission_record->isRecordingObservations()){ @@ -716,26 +734,32 @@ namespace malmo { if (this->video_server) { this->video_server->stopRecording(); + this->video_server->close(); } if (this->depth_server) { this->depth_server->stopRecording(); + this->video_server->close(); } if (this->luminance_server) { this->luminance_server->stopRecording(); + this->video_server->close(); } if (this->colourmap_server) { this->colourmap_server->stopRecording(); + this->video_server->close(); } if (this->observations_server){ this->observations_server->stopRecording(); + this->video_server->close(); } if (this->rewards_server){ this->rewards_server->stopRecording(); + this->video_server->close(); } if (this->commands_stream.is_open()){ diff --git a/Malmo/src/StringServer.cpp b/Malmo/src/StringServer.cpp index 809342931..8b442d0f8 100755 --- a/Malmo/src/StringServer.cpp +++ b/Malmo/src/StringServer.cpp @@ -24,6 +24,8 @@ #include #include +#include + namespace malmo { StringServer::StringServer(boost::asio::io_service& io_service, int port, const boost::function handle_string, const std::string& log_name) : handle_string(handle_string) @@ -31,9 +33,19 @@ namespace malmo { { } - void StringServer::start() + void StringServer::start(boost::shared_ptr& scope) { - this->server.start(); + this->scope = scope; + this->server.start(scope.get()); + } + + void StringServer::close() { + this->server.close(); + } + + void StringServer::release() { + std::cout << "release string server" << std::endl; + this->scope = 0; } StringServer& StringServer::record(std::string path) diff --git a/Malmo/src/StringServer.h b/Malmo/src/StringServer.h index 9b3eb29cf..d36f5e51a 100755 --- a/Malmo/src/StringServer.h +++ b/Malmo/src/StringServer.h @@ -27,6 +27,7 @@ // Boost: #include #include +#include // STL: #include @@ -36,7 +37,7 @@ namespace malmo { //! A TCP server that receives strings and can optionally persist to file. - class StringServer + class StringServer : ServerScope { public: @@ -57,7 +58,12 @@ namespace malmo void recordMessage(const TimestampedString message); //! Starts the string server. - void start(); + + void start(boost::shared_ptr& scope); + + virtual void release(); + + void close(); private: @@ -67,6 +73,8 @@ namespace malmo TCPServer server; std::ofstream writer; boost::mutex write_mutex; + + boost::shared_ptr scope = nullptr; }; } diff --git a/Malmo/src/TCPServer.cpp b/Malmo/src/TCPServer.cpp index fb07b7689..dbd457755 100755 --- a/Malmo/src/TCPServer.cpp +++ b/Malmo/src/TCPServer.cpp @@ -54,11 +54,17 @@ namespace malmo } } - void TCPServer::start() + void TCPServer::start(ServerScope* scope) { + this->scope = scope; this->startAccept(); } - + + void TCPServer::close() { + this->closing = true; + this->acceptor->close(); + } + void TCPServer::confirmWithFixedReply(std::string reply) { this->confirm_with_fixed_reply = true; @@ -72,9 +78,14 @@ namespace malmo void TCPServer::startAccept() { + boost::function deliverMsgIfNotClosed = [this](const TimestampedUnsignedCharVector msg) { + if (!this->closing) + this->onMessageReceived(msg); + }; + boost::shared_ptr new_connection = TCPConnection::create( this->acceptor->get_io_service(), - this->onMessageReceived, + deliverMsgIfNotClosed, this->expect_size_header, this->log_name ); @@ -89,17 +100,38 @@ namespace malmo boost::asio::placeholders::error)); } - void TCPServer::handleAccept( + void TCPServer::handleAccept( boost::shared_ptr new_connection, const boost::system::error_code& error) { if (!error) { - new_connection->read(); - this->startAccept(); + if (this->closing) + { + new_connection.get()->getSocket().close(); + if (scope != nullptr) + scope->release(); + } + else { + new_connection->read(); + if (!this->closing) + { + this->startAccept(); + } + else + { + if (scope != nullptr) + scope->release(); + } + } } else + { LOGERROR(LT("TCPServer::handleAccept("), this->log_name, LT(") - "), error.message()); + if (closing && scope != nullptr) { + scope->release(); + } + } } int TCPServer::getPort() const diff --git a/Malmo/src/TCPServer.h b/Malmo/src/TCPServer.h index 420c4ea36..8407eba8c 100755 --- a/Malmo/src/TCPServer.h +++ b/Malmo/src/TCPServer.h @@ -28,6 +28,11 @@ namespace malmo { + class ServerScope { + public: + virtual void release() = 0; + }; + //! A TCP server that calls a function you provide when a message is received. class TCPServer { @@ -42,12 +47,14 @@ namespace malmo void expectSizeHeader(bool expect_size_header); //! Starts the TCP server. - void start(); + void start(ServerScope* scope); //! Gets the port this server is listening on. //! \returns The port this server is listening on. int getPort() const; + void close(); + private: virtual void startAccept(); @@ -68,6 +75,9 @@ namespace malmo std::string fixed_reply; bool expect_size_header; std::string log_name; + + bool closing = false; + ServerScope* scope = nullptr; }; } diff --git a/Malmo/src/VideoServer.cpp b/Malmo/src/VideoServer.cpp index e72468121..e705b82eb 100755 --- a/Malmo/src/VideoServer.cpp +++ b/Malmo/src/VideoServer.cpp @@ -41,12 +41,22 @@ namespace malmo { } - void VideoServer::start() + void VideoServer::start(boost::shared_ptr& scope) { + this->scope = scope; this->written_frames = this->queued_frames = this->received_frames = 0; - this->server.start(); + this->server.start(scope.get()); } + void VideoServer::close() { + this->server.close(); + } + + void VideoServer::release() { + std::cout << "release video server" << std::endl; + this->scope = nullptr; + } + void VideoServer::startRecording() { this->written_frames = this->queued_frames = this->received_frames = 0; diff --git a/Malmo/src/VideoServer.h b/Malmo/src/VideoServer.h index f4b01cf98..7c1f734dc 100755 --- a/Malmo/src/VideoServer.h +++ b/Malmo/src/VideoServer.h @@ -27,6 +27,7 @@ // Boost: #include +#include // STL: #include @@ -35,7 +36,7 @@ namespace malmo { //! A TCP server that receives video frames of a size specified beforehand and can optionally persist to file. - class VideoServer + class VideoServer : ServerScope { public: @@ -72,14 +73,20 @@ namespace malmo void startRecording(); //! Starts the video server. - void start(); + void start(boost::shared_ptr& scope); std::size_t receivedFrames() const { return this->received_frames; } std::size_t writtenFrames() const { return this->written_frames; } std::size_t queuedFrames() const { return this->queued_frames; } + void close(); + + virtual void release(); + private: + boost::shared_ptr scope; + void handleMessage( const TimestampedUnsignedCharVector message ); boost::function handle_frame; diff --git a/Malmo/test/CppTests/test_client_server.cpp b/Malmo/test/CppTests/test_client_server.cpp index 1d8012bfc..4799e8f0f 100755 --- a/Malmo/test/CppTests/test_client_server.cpp +++ b/Malmo/test/CppTests/test_client_server.cpp @@ -47,7 +47,7 @@ int main() std::cout << "Starting server.." << std::endl; boost::asio::io_service io_service; TCPServer server( io_service, 0, onMessageReceived, "test_client_server" ); - server.start(); + server.start(nullptr); boost::thread bt(boost::bind(&boost::asio::io_service::run, &io_service)); const int num_messages_sent = 100; diff --git a/Malmo/test/CppTests/test_persistence.cpp b/Malmo/test/CppTests/test_persistence.cpp index 3f37b1e0b..741252490 100755 --- a/Malmo/test/CppTests/test_persistence.cpp +++ b/Malmo/test/CppTests/test_persistence.cpp @@ -149,14 +149,14 @@ int runAgentHost(std::string filename) boost::asio::io_service io_service; - StringServer clientMissionControlServer(io_service, client_info.control_port, handleControlMessages, "test_mission_control"); - clientMissionControlServer.confirmWithFixedReply( "MALMOOK" ); - clientMissionControlServer.expectSizeHeader(false); - clientMissionControlServer.start(); + boost::shared_ptr clientMissionControlServer = boost::make_shared(io_service, client_info.control_port, handleControlMessages, "test_mission_control"); + clientMissionControlServer->confirmWithFixedReply( "MALMOOK" ); + clientMissionControlServer->expectSizeHeader(false); + clientMissionControlServer->start(clientMissionControlServer); - StringServer clientCommandsServer( io_service, commands_port, handleCommandMessages, "test_commands"); - clientCommandsServer.expectSizeHeader(false); - clientCommandsServer.start(); + boost::shared_ptr clientCommandsServer = boost::make_shared( io_service, commands_port, handleCommandMessages, "test_commands"); + clientCommandsServer->expectSizeHeader(false); + clientCommandsServer->start(clientCommandsServer); boost::thread bt(boost::bind(&boost::asio::io_service::run, &io_service)); diff --git a/Malmo/test/CppTests/test_string_server.cpp b/Malmo/test/CppTests/test_string_server.cpp index 92263f7c5..62de9297d 100755 --- a/Malmo/test/CppTests/test_string_server.cpp +++ b/Malmo/test/CppTests/test_string_server.cpp @@ -55,10 +55,10 @@ bool testStringServer( bool withReply ) std::cout << "Starting server.." << std::endl; boost::asio::io_service io_service; - StringServer server( io_service, 0, onMessageReceived, "test" ); + boost::shared_ptr server = boost::make_shared( io_service, 0, onMessageReceived, "test" ); if( withReply ) - server.confirmWithFixedReply( expected_reply ); - server.start(); + server->confirmWithFixedReply( expected_reply ); + server->start(server); boost::thread bt(boost::bind(&boost::asio::io_service::run, &io_service)); const int num_messages_sent = 100; @@ -69,14 +69,14 @@ bool testStringServer( bool withReply ) std::cout << "Sending messages.." << std::endl; for( int i = 0; i < num_messages_sent; i++ ) { if( withReply ) { - std::string reply = rpc.sendStringAndGetShortReply(io_service, "127.0.0.1", server.getPort(), expected_message, true); + std::string reply = rpc.sendStringAndGetShortReply(io_service, "127.0.0.1", server->getPort(), expected_message, true); if( reply != expected_reply ) { std::cout << "Unexpected reply." << std::endl; return false; } } else - SendStringOverTCP(io_service, "127.0.0.1", server.getPort(), expected_message, true); + SendStringOverTCP(io_service, "127.0.0.1", server->getPort(), expected_message, true); } boost::this_thread::sleep( sleep_time ); // allow time for the messages to get through diff --git a/Malmo/test/CppTests/test_video_server.cpp b/Malmo/test/CppTests/test_video_server.cpp index 23e667fc9..fd4a730d2 100755 --- a/Malmo/test/CppTests/test_video_server.cpp +++ b/Malmo/test/CppTests/test_video_server.cpp @@ -86,10 +86,10 @@ int main() try{ boost::asio::io_service io_service; - VideoServer server(io_service, port, width, width, channels, TimestampedVideoFrame::VIDEO, boost::function(handleFrame)); - server.recordMP4(filename, 10, 400000, true); - server.startRecording(); - server.start(); + boost::shared_ptr server = boost::make_shared(io_service, port, width, width, channels, TimestampedVideoFrame::VIDEO, boost::function(handleFrame)); + server->recordMP4(filename, 10, 400000, true); + server->startRecording(); + server->start(server); // start the io_service on a background thread boost::thread bt(boost::bind(&boost::asio::io_service::run, &io_service)); From 3a5efeb8a8120aa960300ac80602bc3099524344 Mon Sep 17 00:00:00 2001 From: AndKram Date: Tue, 17 Jul 2018 14:26:32 +0100 Subject: [PATCH 07/15] fix c&p --- Malmo/src/AgentHost.cpp | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/Malmo/src/AgentHost.cpp b/Malmo/src/AgentHost.cpp index c22a6fd46..69df1876b 100755 --- a/Malmo/src/AgentHost.cpp +++ b/Malmo/src/AgentHost.cpp @@ -739,27 +739,27 @@ namespace malmo if (this->depth_server) { this->depth_server->stopRecording(); - this->video_server->close(); + this->depth_server->close(); } if (this->luminance_server) { this->luminance_server->stopRecording(); - this->video_server->close(); + this->luminance_server->close(); } if (this->colourmap_server) { this->colourmap_server->stopRecording(); - this->video_server->close(); + this->colourmap_server->close(); } if (this->observations_server){ this->observations_server->stopRecording(); - this->video_server->close(); + this->observations_server->close(); } if (this->rewards_server){ this->rewards_server->stopRecording(); - this->video_server->close(); + this->rewards_server->close(); } if (this->commands_stream.is_open()){ From c81cfbaef69ec533df1128f097c466461fa89bb4 Mon Sep 17 00:00:00 2001 From: AndKram Date: Tue, 17 Jul 2018 14:41:25 +0100 Subject: [PATCH 08/15] check outbox before pop --- Malmo/src/ClientConnection.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Malmo/src/ClientConnection.cpp b/Malmo/src/ClientConnection.cpp index dbb856edc..009360743 100755 --- a/Malmo/src/ClientConnection.cpp +++ b/Malmo/src/ClientConnection.cpp @@ -148,7 +148,8 @@ namespace malmo if (ec) LOGERROR(LT("Error resolving remote endpoint: "), ec.message()); boost::lock_guard scope_guard(this->outbox_mutex); - this->outbox.pop_front(); + if (!this->outbox.empty()) + this->outbox.pop_front(); } if (!this->outbox.empty()) this->write(); From 05051009c97d36545d944e485e285940bff1f0f2 Mon Sep 17 00:00:00 2001 From: AndKram Date: Tue, 17 Jul 2018 15:20:28 +0100 Subject: [PATCH 09/15] fix premature close --- Malmo/src/AgentHost.cpp | 6 ------ 1 file changed, 6 deletions(-) diff --git a/Malmo/src/AgentHost.cpp b/Malmo/src/AgentHost.cpp index 69df1876b..1548c409e 100755 --- a/Malmo/src/AgentHost.cpp +++ b/Malmo/src/AgentHost.cpp @@ -734,32 +734,26 @@ namespace malmo { if (this->video_server) { this->video_server->stopRecording(); - this->video_server->close(); } if (this->depth_server) { this->depth_server->stopRecording(); - this->depth_server->close(); } if (this->luminance_server) { this->luminance_server->stopRecording(); - this->luminance_server->close(); } if (this->colourmap_server) { this->colourmap_server->stopRecording(); - this->colourmap_server->close(); } if (this->observations_server){ this->observations_server->stopRecording(); - this->observations_server->close(); } if (this->rewards_server){ this->rewards_server->stopRecording(); - this->rewards_server->close(); } if (this->commands_stream.is_open()){ From 20449d42ede7476806198441df0640e6e8849900 Mon Sep 17 00:00:00 2001 From: AndKram Date: Tue, 17 Jul 2018 16:10:32 +0100 Subject: [PATCH 10/15] remove cout print --- Malmo/src/StringServer.cpp | 1 - Malmo/src/VideoServer.cpp | 1 - 2 files changed, 2 deletions(-) diff --git a/Malmo/src/StringServer.cpp b/Malmo/src/StringServer.cpp index 8b442d0f8..0ad200c27 100755 --- a/Malmo/src/StringServer.cpp +++ b/Malmo/src/StringServer.cpp @@ -44,7 +44,6 @@ namespace malmo { } void StringServer::release() { - std::cout << "release string server" << std::endl; this->scope = 0; } diff --git a/Malmo/src/VideoServer.cpp b/Malmo/src/VideoServer.cpp index e705b82eb..f6b53ea92 100755 --- a/Malmo/src/VideoServer.cpp +++ b/Malmo/src/VideoServer.cpp @@ -53,7 +53,6 @@ namespace malmo } void VideoServer::release() { - std::cout << "release video server" << std::endl; this->scope = nullptr; } From f8d497f01e78b07512233c8be39d401def5ab82d Mon Sep 17 00:00:00 2001 From: AndKram Date: Thu, 2 Aug 2018 09:41:44 +0100 Subject: [PATCH 11/15] convert reply ti async --- Malmo/src/TCPConnection.cpp | 61 ++++++++++++++++++++++++++----------- Malmo/src/TCPConnection.h | 8 ++++- 2 files changed, 51 insertions(+), 18 deletions(-) diff --git a/Malmo/src/TCPConnection.cpp b/Malmo/src/TCPConnection.cpp index 46c20292b..0713db94b 100755 --- a/Malmo/src/TCPConnection.cpp +++ b/Malmo/src/TCPConnection.cpp @@ -141,32 +141,59 @@ namespace malmo else LOGERROR(LT("TCPConnection("), this->log_name, LT(")::handle_read_line("), safe_local_endpoint(), LT("/"), safe_remote_endpoint(), LT(") - bytes_transferred: "), bytes_transferred, LT(" - ERROR: "), error.message()); } - + void TCPConnection::processMessage() { LOGFINE(LT("TCPConnection("), this->log_name, LT(")::processMessage("), safe_local_endpoint(), LT("/"), safe_remote_endpoint(), LT(") - bytes received: "), this->body_buffer.size()); - if( this->confirm_with_fixed_reply ) - sendReply(); - this->onMessageReceived( TimestampedUnsignedCharVector( boost::posix_time::microsec_clock::universal_time(), - this->body_buffer ) ); - this->read(); + if (this->confirm_with_fixed_reply) + { + reply(); + } + else + { + deliverMessage(); + } } - void TCPConnection::sendReply() + void TCPConnection::reply() { const int REPLY_SIZE_HEADER_LENGTH = 4; - boost::system::error_code ec; - u_long reply_size_header = htonl((u_long)this->fixed_reply.size()); - size_t bytes_written = boost::asio::write(this->socket, boost::asio::buffer(&reply_size_header, REPLY_SIZE_HEADER_LENGTH), ec); - if (bytes_written != REPLY_SIZE_HEADER_LENGTH || ec) - LOGERROR(LT("TCPConnection("), this->log_name, LT(")::sendReply - ONLY SENT "), bytes_written, LT(" BYTES: "), ec.message()); - - bytes_written = boost::asio::write( this->socket, boost::asio::buffer(this->fixed_reply), boost::asio::transfer_all(), ec ); - if (ec) - LOGERROR(LT("TCPConnection("), this->log_name, LT(")::sendReply - failed to send body of message: "), ec.message()); + this->reply_size_header = htonl((u_long)this->fixed_reply.size()); + + // Send header and continue after with response body. + boost::asio::async_write(this->socket, boost::asio::buffer(&this->reply_size_header, REPLY_SIZE_HEADER_LENGTH), boost::bind(&TCPConnection::transferredHeader, shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); + } + + void TCPConnection::transferredHeader(const boost::system::error_code& error, std::size_t bytes_transferred) { + if (!error) + { + // Send body and continue after with message delivery. + boost::asio::async_write(this->socket, boost::asio::buffer(this->fixed_reply), boost::bind(&TCPConnection::transferredBody, shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); + } else - LOGFINE(LT("TCPConnection("), this->log_name, LT(")::sendReply sent "), bytes_written, LT(" bytes")); + { + LOGERROR(LT("TCPConnection("), this->log_name, LT(")::transferredHeader - failed to send header of message: "), error.message()); + } + } + + void TCPConnection::transferredBody(const boost::system::error_code& error, std::size_t bytes_transferred) { + if (!error) + { + LOGFINE(LT("TCPConnection("), this->log_name, LT(")::transferredBody sent "), bytes_transferred, LT(" bytes")); + + this->deliverMessage(); + } + else + { + LOGERROR(LT("TCPConnection("), this->log_name, LT(")::transferredBody - failed to send body of message: "), error.message()); + } + } + + void TCPConnection::deliverMessage() + { + this->onMessageReceived(TimestampedUnsignedCharVector(boost::posix_time::microsec_clock::universal_time(), this->body_buffer)); + this->read(); // Continue on with reading of next request message. } TCPConnection::TCPConnection(boost::asio::io_service& io_service, boost::function callback, bool expect_size_header, const std::string& log_name) diff --git a/Malmo/src/TCPConnection.h b/Malmo/src/TCPConnection.h index ebce5563f..b10976a15 100755 --- a/Malmo/src/TCPConnection.h +++ b/Malmo/src/TCPConnection.h @@ -64,8 +64,13 @@ namespace malmo void handle_read_line( const boost::system::error_code& error, size_t bytes_transferred ); size_t getSizeFromHeader(); + void processMessage(); - void sendReply(); + void reply(); + void deliverMessage(); + + void transferredHeader(const boost::system::error_code& ec, std::size_t transferred); + void transferredBody(const boost::system::error_code& ec, std::size_t transferred); private: @@ -84,6 +89,7 @@ namespace malmo std::string fixed_reply; bool expect_size_header; std::string log_name; + u_long reply_size_header; }; } From 11b9d233888ac9f8f01dd09771fc63bbb15ecf10 Mon Sep 17 00:00:00 2001 From: AndKram Date: Thu, 2 Aug 2018 12:56:22 +0100 Subject: [PATCH 12/15] version to 0.36 --- VERSION | 2 +- changelog.txt | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/VERSION b/VERSION index dd93f5d30..93d4c1ef0 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.35.6 +0.36.0 diff --git a/changelog.txt b/changelog.txt index 34c2e442e..3fbcd9246 100755 --- a/changelog.txt +++ b/changelog.txt @@ -1,3 +1,7 @@ +0.36.0 +------------------- +Fixes for MacOS stability problems. + 0.35.0 ------------------- New: Now possible for agent to select the Minecraft client's command port using From 5688277940f3f5aff8a99efc4aeff8301d143981 Mon Sep 17 00:00:00 2001 From: AndKram Date: Fri, 3 Aug 2018 11:19:10 +0100 Subject: [PATCH 13/15] version schemas --- Schemas/Mission.xsd | 2 +- Schemas/MissionEnded.xsd | 2 +- Schemas/MissionHandlers.xsd | 2 +- Schemas/MissionInit.xsd | 2 +- Schemas/Types.xsd | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/Schemas/Mission.xsd b/Schemas/Mission.xsd index bca1e65cf..edae040ea 100755 --- a/Schemas/Mission.xsd +++ b/Schemas/Mission.xsd @@ -6,7 +6,7 @@ xmlns:jaxb="http://java.sun.com/xml/ns/jaxb" elementFormDefault="qualified" jaxb:version="2.1" - version="0.35"> + version="0.36"> diff --git a/Schemas/MissionEnded.xsd b/Schemas/MissionEnded.xsd index 822f176f8..daf850f79 100755 --- a/Schemas/MissionEnded.xsd +++ b/Schemas/MissionEnded.xsd @@ -4,7 +4,7 @@ targetNamespace="http://ProjectMalmo.microsoft.com" xmlns="http://ProjectMalmo.microsoft.com" elementFormDefault="qualified" - version="0.35"> + version="0.36"> diff --git a/Schemas/MissionHandlers.xsd b/Schemas/MissionHandlers.xsd index ddf2053e7..9df2c4b6c 100755 --- a/Schemas/MissionHandlers.xsd +++ b/Schemas/MissionHandlers.xsd @@ -6,7 +6,7 @@ xmlns:jaxb="http://java.sun.com/xml/ns/jaxb" elementFormDefault="qualified" jaxb:version="2.1" - version="0.35"> + version="0.36"> diff --git a/Schemas/MissionInit.xsd b/Schemas/MissionInit.xsd index 839a159a3..efae0feb4 100755 --- a/Schemas/MissionInit.xsd +++ b/Schemas/MissionInit.xsd @@ -4,7 +4,7 @@ targetNamespace="http://ProjectMalmo.microsoft.com" xmlns="http://ProjectMalmo.microsoft.com" elementFormDefault="qualified" - version="0.35"> + version="0.36"> diff --git a/Schemas/Types.xsd b/Schemas/Types.xsd index 6afe9e642..4018c949d 100755 --- a/Schemas/Types.xsd +++ b/Schemas/Types.xsd @@ -6,7 +6,7 @@ xmlns:jaxb="http://java.sun.com/xml/ns/jaxb" elementFormDefault="qualified" jaxb:version="2.1" - version="0.35"> + version="0.36"> From c4193f48a80b91a24f0f5e6a92e617c2cbef482d Mon Sep 17 00:00:00 2001 From: AndKram Date: Tue, 7 Aug 2018 13:48:33 +0100 Subject: [PATCH 14/15] review comments --- Malmo/src/AgentHost.cpp | 2 +- Malmo/src/TCPServer.cpp | 8 ++++---- Malmo/src/VideoServer.cpp | 1 + 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/Malmo/src/AgentHost.cpp b/Malmo/src/AgentHost.cpp index 1548c409e..e636f407e 100755 --- a/Malmo/src/AgentHost.cpp +++ b/Malmo/src/AgentHost.cpp @@ -453,7 +453,7 @@ namespace malmo { boost::lock_guard scope_guard(this->world_state_mutex); - WorldState current_world_state(this->world_state); + WorldState current_world_state(this->world_state); // Copy while holding lock. return current_world_state; } diff --git a/Malmo/src/TCPServer.cpp b/Malmo/src/TCPServer.cpp index dbd457755..79843a7d3 100755 --- a/Malmo/src/TCPServer.cpp +++ b/Malmo/src/TCPServer.cpp @@ -109,8 +109,8 @@ namespace malmo if (this->closing) { new_connection.get()->getSocket().close(); - if (scope != nullptr) - scope->release(); + if (this->scope != nullptr) + this->scope->release(); // Release scope which can be self. } else { new_connection->read(); @@ -120,8 +120,8 @@ namespace malmo } else { - if (scope != nullptr) - scope->release(); + if (this->scope != nullptr) + this->scope->release(); // Release scope which can be self. } } } diff --git a/Malmo/src/VideoServer.cpp b/Malmo/src/VideoServer.cpp index f6b53ea92..8b49caea5 100755 --- a/Malmo/src/VideoServer.cpp +++ b/Malmo/src/VideoServer.cpp @@ -41,6 +41,7 @@ namespace malmo { } + // Start the server within a scope that is used to control sharing from async io calls. void VideoServer::start(boost::shared_ptr& scope) { this->scope = scope; From 2a977a611ab4123ea52d7bdf7ac7ba3e9da18147 Mon Sep 17 00:00:00 2001 From: AndKram Date: Tue, 7 Aug 2018 15:30:04 +0100 Subject: [PATCH 15/15] release on all errors --- Malmo/src/TCPServer.cpp | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/Malmo/src/TCPServer.cpp b/Malmo/src/TCPServer.cpp index 79843a7d3..232bf847c 100755 --- a/Malmo/src/TCPServer.cpp +++ b/Malmo/src/TCPServer.cpp @@ -104,13 +104,15 @@ namespace malmo boost::shared_ptr new_connection, const boost::system::error_code& error) { + // On closing or on error release scope of async io processing which can be us. + if (!error) { if (this->closing) { new_connection.get()->getSocket().close(); if (this->scope != nullptr) - this->scope->release(); // Release scope which can be self. + this->scope->release(); } else { new_connection->read(); @@ -121,15 +123,15 @@ namespace malmo else { if (this->scope != nullptr) - this->scope->release(); // Release scope which can be self. + this->scope->release(); } } } else { LOGERROR(LT("TCPServer::handleAccept("), this->log_name, LT(") - "), error.message()); - if (closing && scope != nullptr) { - scope->release(); + if (this->scope != nullptr) { + this->scope->release(); } } }