Skip to content

Commit

Permalink
[fix] Fixed leaks of open files in read() filter
Browse files Browse the repository at this point in the history
  • Loading branch information
pajama-coder committed Sep 15, 2024
1 parent dc48280 commit 142241a
Show file tree
Hide file tree
Showing 7 changed files with 38 additions and 31 deletions.
3 changes: 2 additions & 1 deletion src/api/logging.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -279,9 +279,10 @@ void Logger::StdoutTarget::write(const Data &msg) {
if (!m_file_stream) {
m_file_stream = FileStream::make(
0,
dup(m_is_stderr ? STDERR_FILENO : STDOUT_FILENO),
m_is_stderr ? os::FileHandle::std_error() : os::FileHandle::std_output(),
&s_dp_stdout
);
m_file_stream->set_no_close();
}
Data *buf = Data::make();
s_dp.push(buf, &msg);
Expand Down
7 changes: 2 additions & 5 deletions src/file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,8 @@ void File::open_read(int seek, int size, const std::function<void(FileStream*)>
net->post(
[=]() {
m_f = f;
m_stream = FileStream::make(size, f.get(), &s_dp);
m_stream = FileStream::make(size, f, &s_dp);
if (is_std) m_stream->set_no_close();
if (m_closed) {
close();
}
cb(m_stream);
m_open_signal->fire();
release();
Expand Down Expand Up @@ -124,7 +121,7 @@ void File::open_write(bool append) {
InputContext ic;
m_f = f;
m_writing = true;
m_stream = FileStream::make(0, f.get(), &s_dp);
m_stream = FileStream::make(0, f, &s_dp);
if (is_std) m_stream->set_no_close();
if (!m_buffer.empty()) {
m_stream->input()->input(Data::make(m_buffer));
Expand Down
8 changes: 4 additions & 4 deletions src/filters/exec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ bool Exec::exec_argv(const std::list<std::string> &args) {

fcntl(master_fd, F_SETFL, fcntl(master_fd, F_GETFL, 0) | O_NONBLOCK);

m_stdout = m_stdin = FileStream::make(-1, master_fd, &s_dp);
m_stdout = m_stdin = FileStream::make(-1, os::FileHandle(master_fd, "r+"), &s_dp);
m_stdout->chain(m_stdout_reader.input());

} else {
Expand All @@ -262,9 +262,9 @@ bool Exec::exec_argv(const std::list<std::string> &args) {
pipe(out);
pipe(err);

m_stdin = FileStream::make(0, in[1], &s_dp);
m_stdout = FileStream::make(-1, out[0], &s_dp);
m_stderr = FileStream::make(-1, err[0], &s_dp);
m_stdin = FileStream::make(0, os::FileHandle(in[1], "w"), &s_dp);
m_stdout = FileStream::make(-1, os::FileHandle(out[0], "r"), &s_dp);
m_stderr = FileStream::make(-1, os::FileHandle(err[0], "r"), &s_dp);
m_stdout->chain(m_stdout_reader.input());
m_stderr->chain(m_stderr_reader.input());

Expand Down
34 changes: 19 additions & 15 deletions src/fstream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,33 +32,35 @@

namespace pipy {

FileStream::FileStream(int read_size, handle_t fd, Data::Producer *dp)
FileStream::FileStream(int read_size, os::FileHandle fd, Data::Producer *dp)
: FlushTarget(true)
, m_stream(Net::context(), fd)
, m_stream(Net::context(), fd.get())
, m_fd(fd)
, m_dp(dp)
, m_read_size(read_size)
{
read();
}

void FileStream::close() {
if (m_closed) return;

std::error_code ec;
if (m_no_close) {
m_stream.release();
} else {
m_stream.close(ec);
}
m_stream.release();
if (!m_no_close) m_fd.close();

if (m_receiving_state == PAUSED) {
m_receiving_state = RECEIVING;
release();
}

if (ec) {
Log::error("FileStream: %p, error closing stream [fd = %d], %s", this, m_fd, ec.message().c_str());
Log::error("FileStream: %p, error closing stream [fd = %d], %s", this, m_fd.get(), ec.message().c_str());
} else if (Log::is_enabled(Log::FILES)) {
Log::debug(Log::FILES, "FileStream: %p, stream closed [fd = %d]", this, m_fd);
Log::debug(Log::FILES, "FileStream: %p, stream closed [fd = %d]", this, m_fd.get());
}

m_closed = true;
}

void FileStream::on_event(Event *evt) {
Expand Down Expand Up @@ -120,14 +122,16 @@ void FileStream::read() {
}

if (read_end || ec == asio::error::eof || ec == asio::error::broken_pipe) {
Log::debug(Log::FILES, "FileStream: %p, end of stream [fd = %d]", this, m_fd);
Log::debug(Log::FILES, "FileStream: %p, end of stream [fd = %d]", this, m_fd.get());
output(StreamEnd::make(StreamEnd::NO_ERROR));
close();
} else if (ec && ec != asio::error::operation_aborted) {
auto msg = ec.message();
Log::warn(
"FileStream: %p, error reading from stream [fd = %d]: %s",
this, m_fd, msg.c_str());
this, m_fd.get(), msg.c_str());
output(StreamEnd::make(StreamEnd::READ_ERROR));
close();

} else if (m_receiving_state == PAUSING) {
m_receiving_state = PAUSED;
Expand Down Expand Up @@ -157,13 +161,13 @@ void FileStream::read() {
}

void FileStream::write(Data *data) {
if (!m_ended) {
if (!m_closed && !m_ended) {
if (!data->empty()) {
if (!m_overflowed) {
if (m_buffer_limit > 0 && m_buffer.size() >= m_buffer_limit) {
Log::error(
"FileStream: %p, buffer overflow, size = %d, fd = %d",
this, m_fd, m_buffer.size());
this, m_fd.get(), m_buffer.size());
m_overflowed = true;
}
}
Expand All @@ -177,7 +181,7 @@ void FileStream::write(Data *data) {
}

void FileStream::end() {
if (!m_ended) {
if (!m_closed && !m_ended) {
m_ended = true;
if (m_buffer.empty()) {
close();
Expand All @@ -200,7 +204,7 @@ void FileStream::pump() {
auto msg = ec.message();
Log::warn(
"FileStream: %p, error writing to stream [fd = %d], %s",
this, m_fd, msg.c_str());
this, m_fd.get(), msg.c_str());
m_buffer.clear();

} else {
Expand Down
12 changes: 6 additions & 6 deletions src/fstream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "net.hpp"
#include "event.hpp"
#include "input.hpp"
#include "os-platform.hpp"

#include <stdio.h>

Expand All @@ -49,24 +50,22 @@ class FileStream :
{
public:
#ifdef _WIN32
typedef HANDLE handle_t;
typedef asio::windows::random_access_handle stream_t;
#else
typedef int handle_t;
typedef asio::posix::stream_descriptor stream_t;
#endif

static auto make(int read_size, handle_t fd, Data::Producer *dp) -> FileStream* {
static auto make(int read_size, os::FileHandle fd, Data::Producer *dp) -> FileStream* {
return new FileStream(read_size, fd, dp);
}

auto fd() const -> handle_t { return m_fd; }
auto fd() const -> os::FileHandle { return m_fd; }
void set_no_close() { m_no_close = true; }
void set_buffer_limit(size_t size) { m_buffer_limit = size; }
void close();

private:
FileStream(int read_size, handle_t fd, Data::Producer *dp);
FileStream(int read_size, os::FileHandle fd, Data::Producer *dp);

virtual void on_event(Event *evt) override;
virtual void on_flush() override;
Expand All @@ -80,7 +79,7 @@ class FileStream :
};

stream_t m_stream;
handle_t m_fd;
os::FileHandle m_fd;
Data::Producer* m_dp;
Data m_buffer;
size_t m_buffer_limit = 0;
Expand All @@ -91,6 +90,7 @@ class FileStream :
bool m_overflowed = false;
bool m_pumping = false;
bool m_ended = false;
bool m_closed = false;

void read();
void write(Data *data);
Expand Down
4 changes: 4 additions & 0 deletions src/os-platform.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,10 @@ void kill(int pid, int sig) {
::kill(pid, sig);
}

FileHandle::FileHandle(int fd, const char *mode) {
m_file = fdopen(fd, mode);
}

auto FileHandle::std_input() -> FileHandle {
return FileHandle(stdin);
}
Expand Down
1 change: 1 addition & 0 deletions src/os-platform.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ namespace os {
class FileHandle {
public:
FileHandle() : m_file(nullptr) {}
FileHandle(int fd, const char *mode);

static auto std_input() -> FileHandle;
static auto std_output() -> FileHandle;
Expand Down

0 comments on commit 142241a

Please sign in to comment.