Skip to content

Commit

Permalink
[filter] Allow handleXXX() and replaceXXX() filter to block multiple …
Browse files Browse the repository at this point in the history
…times for a series of events
  • Loading branch information
pajama-coder committed Sep 10, 2024
1 parent 94295fb commit dc48280
Show file tree
Hide file tree
Showing 6 changed files with 21 additions and 5 deletions.
15 changes: 15 additions & 0 deletions src/buffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,21 @@ class EventBuffer {
}
}

void flush_until(const std::function<bool(Event*)> &out) {
while (auto e = m_events.head()) {
m_events.remove(e);
e->m_in_buffer = false;
if (m_stats) {
if (auto data = e->as<Data>()) {
m_stats->size -= data->size();
}
}
auto ret = out(e);
e->release();
if (ret) break;
}
}

void clear() {
List<Event> events(std::move(m_events));
while (auto e = events.head()) {
Expand Down
3 changes: 2 additions & 1 deletion src/filters/handle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,9 @@ bool Handle::on_callback_return(const pjs::Value &result) {
Filter::output(m_deferred_event);
m_deferred_event = nullptr;
}
m_event_buffer.flush([this](Event *evt) {
m_event_buffer.flush_until([this](Event *evt) {
handle(evt);
return m_waiting;
});
return true;
}
Expand Down
2 changes: 1 addition & 1 deletion src/filters/on-body.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,10 @@ void OnBody::handle(Event *evt) {
} else if (evt->is<MessageEnd>() || evt->is<StreamEnd>()) {
if (m_started) {
auto body = m_body_buffer.flush();
m_started = false;
if (Handle::callback(body)) {
Handle::defer(evt);
}
m_started = false;
return;
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/filters/on-message.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,10 @@ void OnMessage::handle(Event *evt) {
}
auto body = m_body_buffer.flush();
pjs::Ref<Message> msg(Message::make(m_start->head(), body, tail, payload)), result;
m_start = nullptr;
if (Handle::callback(msg)) {
Handle::defer(evt);
}
m_start = nullptr;
return;
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/filters/replace-body.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ void ReplaceBody::handle(Event *evt) {

} else if (evt->is<MessageEnd>() || evt->is<StreamEnd>()) {
pjs::Ref<Data> body = m_body_buffer.flush();
if (!Replace::callback(body)) return;
m_started = false;
if (!Replace::callback(body)) return;
Replace::pass(evt);
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/filters/replace-message.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ void ReplaceMessage::handle(Event *evt) {
payload = end->payload();
}
pjs::Ref<Message> msg(Message::make(m_start->head(), m_body_buffer.flush(), tail, payload));
if (!Replace::callback(msg)) return;
m_start = nullptr;
if (!Replace::callback(msg)) return;
}
}
}
Expand Down

0 comments on commit dc48280

Please sign in to comment.