From 142241a5c00675a603db92dc77f5ac335b2ac33c Mon Sep 17 00:00:00 2001 From: pajama-coder Date: Sun, 15 Sep 2024 20:42:17 +0800 Subject: [PATCH] [fix] Fixed leaks of open files in read() filter --- src/api/logging.cpp | 3 ++- src/file.cpp | 7 ++----- src/filters/exec.cpp | 8 ++++---- src/fstream.cpp | 34 +++++++++++++++++++--------------- src/fstream.hpp | 12 ++++++------ src/os-platform.cpp | 4 ++++ src/os-platform.hpp | 1 + 7 files changed, 38 insertions(+), 31 deletions(-) diff --git a/src/api/logging.cpp b/src/api/logging.cpp index a9cff6dc..c6554ee6 100644 --- a/src/api/logging.cpp +++ b/src/api/logging.cpp @@ -279,9 +279,10 @@ void Logger::StdoutTarget::write(const Data &msg) { if (!m_file_stream) { m_file_stream = FileStream::make( 0, - dup(m_is_stderr ? STDERR_FILENO : STDOUT_FILENO), + m_is_stderr ? os::FileHandle::std_error() : os::FileHandle::std_output(), &s_dp_stdout ); + m_file_stream->set_no_close(); } Data *buf = Data::make(); s_dp.push(buf, &msg); diff --git a/src/file.cpp b/src/file.cpp index 0f052920..80be2aa5 100644 --- a/src/file.cpp +++ b/src/file.cpp @@ -64,11 +64,8 @@ void File::open_read(int seek, int size, const std::function net->post( [=]() { m_f = f; - m_stream = FileStream::make(size, f.get(), &s_dp); + m_stream = FileStream::make(size, f, &s_dp); if (is_std) m_stream->set_no_close(); - if (m_closed) { - close(); - } cb(m_stream); m_open_signal->fire(); release(); @@ -124,7 +121,7 @@ void File::open_write(bool append) { InputContext ic; m_f = f; m_writing = true; - m_stream = FileStream::make(0, f.get(), &s_dp); + m_stream = FileStream::make(0, f, &s_dp); if (is_std) m_stream->set_no_close(); if (!m_buffer.empty()) { m_stream->input()->input(Data::make(m_buffer)); diff --git a/src/filters/exec.cpp b/src/filters/exec.cpp index 3aab928b..d39e0ceb 100644 --- a/src/filters/exec.cpp +++ b/src/filters/exec.cpp @@ -253,7 +253,7 @@ bool Exec::exec_argv(const std::list &args) { fcntl(master_fd, F_SETFL, fcntl(master_fd, F_GETFL, 0) | O_NONBLOCK); - m_stdout = m_stdin = FileStream::make(-1, master_fd, &s_dp); + m_stdout = m_stdin = FileStream::make(-1, os::FileHandle(master_fd, "r+"), &s_dp); m_stdout->chain(m_stdout_reader.input()); } else { @@ -262,9 +262,9 @@ bool Exec::exec_argv(const std::list &args) { pipe(out); pipe(err); - m_stdin = FileStream::make(0, in[1], &s_dp); - m_stdout = FileStream::make(-1, out[0], &s_dp); - m_stderr = FileStream::make(-1, err[0], &s_dp); + m_stdin = FileStream::make(0, os::FileHandle(in[1], "w"), &s_dp); + m_stdout = FileStream::make(-1, os::FileHandle(out[0], "r"), &s_dp); + m_stderr = FileStream::make(-1, os::FileHandle(err[0], "r"), &s_dp); m_stdout->chain(m_stdout_reader.input()); m_stderr->chain(m_stderr_reader.input()); diff --git a/src/fstream.cpp b/src/fstream.cpp index e3534470..e1865d16 100644 --- a/src/fstream.cpp +++ b/src/fstream.cpp @@ -32,9 +32,10 @@ namespace pipy { -FileStream::FileStream(int read_size, handle_t fd, Data::Producer *dp) +FileStream::FileStream(int read_size, os::FileHandle fd, Data::Producer *dp) : FlushTarget(true) - , m_stream(Net::context(), fd) + , m_stream(Net::context(), fd.get()) + , m_fd(fd) , m_dp(dp) , m_read_size(read_size) { @@ -42,12 +43,11 @@ FileStream::FileStream(int read_size, handle_t fd, Data::Producer *dp) } void FileStream::close() { + if (m_closed) return; + std::error_code ec; - if (m_no_close) { - m_stream.release(); - } else { - m_stream.close(ec); - } + m_stream.release(); + if (!m_no_close) m_fd.close(); if (m_receiving_state == PAUSED) { m_receiving_state = RECEIVING; @@ -55,10 +55,12 @@ void FileStream::close() { } if (ec) { - Log::error("FileStream: %p, error closing stream [fd = %d], %s", this, m_fd, ec.message().c_str()); + Log::error("FileStream: %p, error closing stream [fd = %d], %s", this, m_fd.get(), ec.message().c_str()); } else if (Log::is_enabled(Log::FILES)) { - Log::debug(Log::FILES, "FileStream: %p, stream closed [fd = %d]", this, m_fd); + Log::debug(Log::FILES, "FileStream: %p, stream closed [fd = %d]", this, m_fd.get()); } + + m_closed = true; } void FileStream::on_event(Event *evt) { @@ -120,14 +122,16 @@ void FileStream::read() { } if (read_end || ec == asio::error::eof || ec == asio::error::broken_pipe) { - Log::debug(Log::FILES, "FileStream: %p, end of stream [fd = %d]", this, m_fd); + Log::debug(Log::FILES, "FileStream: %p, end of stream [fd = %d]", this, m_fd.get()); output(StreamEnd::make(StreamEnd::NO_ERROR)); + close(); } else if (ec && ec != asio::error::operation_aborted) { auto msg = ec.message(); Log::warn( "FileStream: %p, error reading from stream [fd = %d]: %s", - this, m_fd, msg.c_str()); + this, m_fd.get(), msg.c_str()); output(StreamEnd::make(StreamEnd::READ_ERROR)); + close(); } else if (m_receiving_state == PAUSING) { m_receiving_state = PAUSED; @@ -157,13 +161,13 @@ void FileStream::read() { } void FileStream::write(Data *data) { - if (!m_ended) { + if (!m_closed && !m_ended) { if (!data->empty()) { if (!m_overflowed) { if (m_buffer_limit > 0 && m_buffer.size() >= m_buffer_limit) { Log::error( "FileStream: %p, buffer overflow, size = %d, fd = %d", - this, m_fd, m_buffer.size()); + this, m_fd.get(), m_buffer.size()); m_overflowed = true; } } @@ -177,7 +181,7 @@ void FileStream::write(Data *data) { } void FileStream::end() { - if (!m_ended) { + if (!m_closed && !m_ended) { m_ended = true; if (m_buffer.empty()) { close(); @@ -200,7 +204,7 @@ void FileStream::pump() { auto msg = ec.message(); Log::warn( "FileStream: %p, error writing to stream [fd = %d], %s", - this, m_fd, msg.c_str()); + this, m_fd.get(), msg.c_str()); m_buffer.clear(); } else { diff --git a/src/fstream.hpp b/src/fstream.hpp index 01b7daa7..9190bf60 100644 --- a/src/fstream.hpp +++ b/src/fstream.hpp @@ -29,6 +29,7 @@ #include "net.hpp" #include "event.hpp" #include "input.hpp" +#include "os-platform.hpp" #include @@ -49,24 +50,22 @@ class FileStream : { public: #ifdef _WIN32 - typedef HANDLE handle_t; typedef asio::windows::random_access_handle stream_t; #else - typedef int handle_t; typedef asio::posix::stream_descriptor stream_t; #endif - static auto make(int read_size, handle_t fd, Data::Producer *dp) -> FileStream* { + static auto make(int read_size, os::FileHandle fd, Data::Producer *dp) -> FileStream* { return new FileStream(read_size, fd, dp); } - auto fd() const -> handle_t { return m_fd; } + auto fd() const -> os::FileHandle { return m_fd; } void set_no_close() { m_no_close = true; } void set_buffer_limit(size_t size) { m_buffer_limit = size; } void close(); private: - FileStream(int read_size, handle_t fd, Data::Producer *dp); + FileStream(int read_size, os::FileHandle fd, Data::Producer *dp); virtual void on_event(Event *evt) override; virtual void on_flush() override; @@ -80,7 +79,7 @@ class FileStream : }; stream_t m_stream; - handle_t m_fd; + os::FileHandle m_fd; Data::Producer* m_dp; Data m_buffer; size_t m_buffer_limit = 0; @@ -91,6 +90,7 @@ class FileStream : bool m_overflowed = false; bool m_pumping = false; bool m_ended = false; + bool m_closed = false; void read(); void write(Data *data); diff --git a/src/os-platform.cpp b/src/os-platform.cpp index 4c090743..adb7aee0 100644 --- a/src/os-platform.cpp +++ b/src/os-platform.cpp @@ -460,6 +460,10 @@ void kill(int pid, int sig) { ::kill(pid, sig); } +FileHandle::FileHandle(int fd, const char *mode) { + m_file = fdopen(fd, mode); +} + auto FileHandle::std_input() -> FileHandle { return FileHandle(stdin); } diff --git a/src/os-platform.hpp b/src/os-platform.hpp index 5584acd4..f16cb2df 100644 --- a/src/os-platform.hpp +++ b/src/os-platform.hpp @@ -122,6 +122,7 @@ namespace os { class FileHandle { public: FileHandle() : m_file(nullptr) {} + FileHandle(int fd, const char *mode); static auto std_input() -> FileHandle; static auto std_output() -> FileHandle;