Skip to content

Commit

Permalink
adding the capability of creating an alarm with a given scope
Browse files Browse the repository at this point in the history
Signed-off-by: Alyssa Wilk <alyssar@chromium.org>
  • Loading branch information
alyssawilk committed Aug 20, 2019
1 parent 817b2e3 commit 19c6422
Show file tree
Hide file tree
Showing 55 changed files with 651 additions and 578 deletions.
13 changes: 11 additions & 2 deletions include/envoy/event/timer.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,13 @@
#include "envoy/common/time.h"

namespace Envoy {

class ScopeTrackedObject;

namespace Event {

class Dispatcher;

/**
* Callback invoked when a timer event fires.
*/
Expand All @@ -30,8 +35,12 @@ class Timer {

/**
* Enable a pending timeout. If a timeout is already pending, it will be reset to the new timeout.
*
* @param ms supplies the duration of the alarm in milliseconds.
* @param object supplies an optional scope for the duration of the alarm.
*/
virtual void enableTimer(const std::chrono::milliseconds& d) PURE;
virtual void enableTimer(const std::chrono::milliseconds& ms,
const ScopeTrackedObject* object = nullptr) PURE;

/**
* Return whether the timer is currently armed.
Expand All @@ -48,7 +57,7 @@ class Scheduler {
/**
* Creates a timer.
*/
virtual TimerPtr createTimer(const TimerCb& cb) PURE;
virtual TimerPtr createTimer(const TimerCb& cb, Dispatcher& dispatcher) PURE;
};

using SchedulerPtr = std::unique_ptr<Scheduler>;
Expand Down
1 change: 1 addition & 0 deletions source/common/event/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ envoy_cc_library(
":event_impl_base_lib",
":libevent_lib",
"//include/envoy/event:timer_interface",
"//source/common/common:scope_tracker",
],
)

Expand Down
2 changes: 1 addition & 1 deletion source/common/event/dispatcher_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ TimerPtr DispatcherImpl::createTimer(TimerCb cb) { return createTimerInternal(cb

TimerPtr DispatcherImpl::createTimerInternal(TimerCb cb) {
ASSERT(isThreadSafe());
return scheduler_->createTimer(cb);
return scheduler_->createTimer(cb, *this);
}

void DispatcherImpl::deferredDelete(DeferredDeletablePtr&& to_delete) {
Expand Down
4 changes: 2 additions & 2 deletions source/common/event/libevent_scheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ LibeventScheduler::LibeventScheduler() : libevent_(event_base_new()) {
RELEASE_ASSERT(Libevent::Global::initialized(), "");
}

TimerPtr LibeventScheduler::createTimer(const TimerCb& cb) {
return std::make_unique<TimerImpl>(libevent_, cb);
TimerPtr LibeventScheduler::createTimer(const TimerCb& cb, Dispatcher& dispatcher) {
return std::make_unique<TimerImpl>(libevent_, cb, dispatcher);
};

void LibeventScheduler::run(Dispatcher::RunType mode) {
Expand Down
2 changes: 1 addition & 1 deletion source/common/event/libevent_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class LibeventScheduler : public Scheduler {
LibeventScheduler();

// Scheduler
TimerPtr createTimer(const TimerCb& cb) override;
TimerPtr createTimer(const TimerCb& cb, Dispatcher& dispatcher) override;

/**
* Runs the event loop.
Expand Down
4 changes: 3 additions & 1 deletion source/common/event/real_time_system.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ namespace {
class RealScheduler : public Scheduler {
public:
RealScheduler(Scheduler& base_scheduler) : base_scheduler_(base_scheduler) {}
TimerPtr createTimer(const TimerCb& cb) override { return base_scheduler_.createTimer(cb); };
TimerPtr createTimer(const TimerCb& cb, Dispatcher& d) override {
return base_scheduler_.createTimer(cb, d);
};

private:
Scheduler& base_scheduler_;
Expand Down
18 changes: 15 additions & 3 deletions source/common/event/timer_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,28 @@ void TimerUtils::millisecondsToTimeval(const std::chrono::milliseconds& d, timev
tv.tv_usec = usecs.count();
}

TimerImpl::TimerImpl(Libevent::BasePtr& libevent, TimerCb cb) : cb_(cb) {
TimerImpl::TimerImpl(Libevent::BasePtr& libevent, TimerCb cb, Dispatcher& dispatcher)
: cb_(cb), dispatcher_(dispatcher) {
ASSERT(cb_);
evtimer_assign(
&raw_event_, libevent.get(),
[](evutil_socket_t, short, void* arg) -> void { static_cast<TimerImpl*>(arg)->cb_(); }, this);
[](evutil_socket_t, short, void* arg) -> void {
TimerImpl* timer = static_cast<TimerImpl*>(arg);
if (timer->object_ == nullptr) {
timer->cb_();
return;
}
ScopeTrackerScopeState scope(timer->object_, timer->dispatcher_);
timer->object_ = nullptr;
timer->cb_();
},
this);
}

void TimerImpl::disableTimer() { event_del(&raw_event_); }

void TimerImpl::enableTimer(const std::chrono::milliseconds& d) {
void TimerImpl::enableTimer(const std::chrono::milliseconds& d, const ScopeTrackedObject* object) {
object_ = object;
if (d.count() == 0) {
event_active(&raw_event_, EV_TIMEOUT, 0);
} else {
Expand Down
7 changes: 5 additions & 2 deletions source/common/event/timer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#include "envoy/event/timer.h"

#include "common/common/scope_tracker.h"
#include "common/event/event_impl_base.h"
#include "common/event/libevent.h"

Expand All @@ -23,15 +24,17 @@ class TimerUtils {
*/
class TimerImpl : public Timer, ImplBase {
public:
TimerImpl(Libevent::BasePtr& libevent, TimerCb cb);
TimerImpl(Libevent::BasePtr& libevent, TimerCb cb, Event::Dispatcher& dispatcher);

// Timer
void disableTimer() override;
void enableTimer(const std::chrono::milliseconds& d) override;
void enableTimer(const std::chrono::milliseconds& d, const ScopeTrackedObject* scope) override;
bool enabled() override;

private:
TimerCb cb_;
Dispatcher& dispatcher_;
const ScopeTrackedObject* object_{};
};

} // namespace Event
Expand Down
8 changes: 4 additions & 4 deletions test/common/access_log/access_log_manager_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ TEST_F(AccessLogManagerImplTest, flushToLogFilePeriodically) {
EXPECT_CALL(*file_, open_()).WillOnce(Return(ByMove(Filesystem::resultSuccess<bool>(true))));
AccessLogFileSharedPtr log_file = access_log_manager_.createAccessLog("foo");

EXPECT_CALL(*timer, enableTimer(timeout_40ms_));
EXPECT_CALL(*timer, enableTimer(timeout_40ms_, _));
EXPECT_CALL(*file_, write_(_))
.WillOnce(Invoke([](absl::string_view data) -> Api::IoCallSizeResult {
EXPECT_EQ(0, data.compare("test"));
Expand All @@ -83,7 +83,7 @@ TEST_F(AccessLogManagerImplTest, flushToLogFilePeriodically) {

// make sure timer is re-enabled on callback call
log_file->write("test2");
EXPECT_CALL(*timer, enableTimer(timeout_40ms_));
EXPECT_CALL(*timer, enableTimer(timeout_40ms_, _));
timer->invokeCallback();

{
Expand All @@ -101,7 +101,7 @@ TEST_F(AccessLogManagerImplTest, flushToLogFileOnDemand) {
EXPECT_CALL(*file_, open_()).WillOnce(Return(ByMove(Filesystem::resultSuccess<bool>(true))));
AccessLogFileSharedPtr log_file = access_log_manager_.createAccessLog("foo");

EXPECT_CALL(*timer, enableTimer(timeout_40ms_));
EXPECT_CALL(*timer, enableTimer(timeout_40ms_, _));

// The first write to a given file will start the flush thread, which can flush
// immediately (race on whether it will or not). So do a write and flush to
Expand Down Expand Up @@ -146,7 +146,7 @@ TEST_F(AccessLogManagerImplTest, flushToLogFileOnDemand) {

// make sure timer is re-enabled on callback call
log_file->write("test2");
EXPECT_CALL(*timer, enableTimer(timeout_40ms_));
EXPECT_CALL(*timer, enableTimer(timeout_40ms_, _));
timer->invokeCallback();
expected_writes++;

Expand Down
2 changes: 1 addition & 1 deletion test/common/config/delta_subscription_test_harness.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ class DeltaSubscriptionTestHarness : public SubscriptionTestHarness {

void expectEnableInitFetchTimeoutTimer(std::chrono::milliseconds timeout) override {
init_timeout_timer_ = new Event::MockTimer(&dispatcher_);
EXPECT_CALL(*init_timeout_timer_, enableTimer(std::chrono::milliseconds(timeout)));
EXPECT_CALL(*init_timeout_timer_, enableTimer(std::chrono::milliseconds(timeout), _));
}

void expectDisableInitFetchTimeoutTimer() override {
Expand Down
7 changes: 4 additions & 3 deletions test/common/config/grpc_mux_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ TEST_F(GrpcMuxImplTest, ResetStream) {
.Times(3);
EXPECT_CALL(random_, random());
ASSERT_TRUE(timer != nullptr); // initialized from dispatcher mock.
EXPECT_CALL(*timer, enableTimer(_));
EXPECT_CALL(*timer, enableTimer(_, _));
grpc_mux_->grpcStreamForTest().onRemoteClose(Grpc::Status::GrpcStatus::Canceled, "");
EXPECT_EQ(0, control_plane_connected_state_.value());
EXPECT_CALL(*async_client_, startRaw(_, _, _)).WillOnce(Return(&async_stream_));
Expand Down Expand Up @@ -481,7 +481,7 @@ TEST_F(GrpcMuxImplTestWithMockTimeSystem, TooManyRequestsWithEmptyRateLimitSetti
grpc_mux_->start();

// Validate that drain_request_timer is enabled when there are no tokens.
EXPECT_CALL(*drain_request_timer, enableTimer(std::chrono::milliseconds(100)));
EXPECT_CALL(*drain_request_timer, enableTimer(std::chrono::milliseconds(100), _));
onReceiveMessage(99);
EXPECT_EQ(1, stats_.counter("control_plane.rate_limit_enforced").value());
EXPECT_EQ(
Expand Down Expand Up @@ -541,7 +541,8 @@ TEST_F(GrpcMuxImplTest, TooManyRequestsWithCustomRateLimitSettings) {
EXPECT_EQ(0, stats_.counter("control_plane.rate_limit_enforced").value());

// Validate that drain_request_timer is enabled when there are no tokens.
EXPECT_CALL(*drain_request_timer, enableTimer(std::chrono::milliseconds(500))).Times(AtLeast(1));
EXPECT_CALL(*drain_request_timer, enableTimer(std::chrono::milliseconds(500), _))
.Times(AtLeast(1));
onReceiveMessage(160);
EXPECT_EQ(12, stats_.counter("control_plane.rate_limit_enforced").value());
Stats::Gauge& pending_requests =
Expand Down
4 changes: 2 additions & 2 deletions test/common/config/grpc_subscription_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ TEST_F(GrpcSubscriptionImplTest, StreamCreationFailure) {
EXPECT_CALL(callbacks_,
onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure, _));
EXPECT_CALL(random_, random());
EXPECT_CALL(*timer_, enableTimer(_));
EXPECT_CALL(*timer_, enableTimer(_, _));
subscription_->start({"cluster0", "cluster1"});
EXPECT_TRUE(statsAre(2, 0, 0, 1, 0, 0));
// Ensure this doesn't cause an issue by sending a request, since we don't
Expand All @@ -40,7 +40,7 @@ TEST_F(GrpcSubscriptionImplTest, RemoteStreamClose) {
EXPECT_TRUE(statsAre(1, 0, 0, 0, 0, 0));
EXPECT_CALL(callbacks_,
onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure, _));
EXPECT_CALL(*timer_, enableTimer(_));
EXPECT_CALL(*timer_, enableTimer(_, _));
EXPECT_CALL(random_, random());
subscription_->grpcMux().grpcStreamForTest().onRemoteClose(Grpc::Status::GrpcStatus::Canceled,
"");
Expand Down
2 changes: 1 addition & 1 deletion test/common/config/grpc_subscription_test_harness.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ class GrpcSubscriptionTestHarness : public SubscriptionTestHarness {

void expectEnableInitFetchTimeoutTimer(std::chrono::milliseconds timeout) override {
init_timeout_timer_ = new Event::MockTimer(&dispatcher_);
EXPECT_CALL(*init_timeout_timer_, enableTimer(std::chrono::milliseconds(timeout)));
EXPECT_CALL(*init_timeout_timer_, enableTimer(std::chrono::milliseconds(timeout), _));
}

void expectDisableInitFetchTimeoutTimer() override {
Expand Down
4 changes: 2 additions & 2 deletions test/common/config/http_subscription_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class HttpSubscriptionImplTest : public testing::Test, public HttpSubscriptionTe
TEST_F(HttpSubscriptionImplTest, OnRequestReset) {
startSubscription({"cluster0", "cluster1"});
EXPECT_CALL(random_gen_, random()).WillOnce(Return(0));
EXPECT_CALL(*timer_, enableTimer(_));
EXPECT_CALL(*timer_, enableTimer(_, _));
EXPECT_CALL(callbacks_,
onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure, _));
http_callbacks_->onFailure(Http::AsyncClient::FailureReason::Reset);
Expand All @@ -32,7 +32,7 @@ TEST_F(HttpSubscriptionImplTest, BadJsonRecovery) {
Http::MessagePtr message{new Http::ResponseMessageImpl(std::move(response_headers))};
message->body() = std::make_unique<Buffer::OwnedImpl>(";!@#badjso n");
EXPECT_CALL(random_gen_, random()).WillOnce(Return(0));
EXPECT_CALL(*timer_, enableTimer(_));
EXPECT_CALL(*timer_, enableTimer(_, _));
EXPECT_CALL(callbacks_,
onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure, _));
http_callbacks_->onSuccess(std::move(message));
Expand Down
4 changes: 2 additions & 2 deletions test/common/config/http_subscription_test_harness.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ class HttpSubscriptionTestHarness : public SubscriptionTestHarness {
Envoy::Config::ConfigUpdateFailureReason::UpdateRejected, _));
}
EXPECT_CALL(random_gen_, random()).WillOnce(Return(0));
EXPECT_CALL(*timer_, enableTimer(_));
EXPECT_CALL(*timer_, enableTimer(_, _));
http_callbacks_->onSuccess(std::move(message));
if (accept) {
version_ = version;
Expand All @@ -159,7 +159,7 @@ class HttpSubscriptionTestHarness : public SubscriptionTestHarness {

void expectEnableInitFetchTimeoutTimer(std::chrono::milliseconds timeout) override {
init_timeout_timer_ = new Event::MockTimer(&dispatcher_);
EXPECT_CALL(*init_timeout_timer_, enableTimer(std::chrono::milliseconds(timeout)));
EXPECT_CALL(*init_timeout_timer_, enableTimer(std::chrono::milliseconds(timeout), _));
}

void expectDisableInitFetchTimeoutTimer() override {
Expand Down
10 changes: 5 additions & 5 deletions test/common/http/async_client_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -719,7 +719,7 @@ TEST_F(AsyncClientImplTest, StreamTimeout) {

EXPECT_CALL(stream_encoder_, encodeHeaders(HeaderMapEqualRef(&message_->headers()), true));
timer_ = new NiceMock<Event::MockTimer>(&dispatcher_);
EXPECT_CALL(*timer_, enableTimer(std::chrono::milliseconds(40)));
EXPECT_CALL(*timer_, enableTimer(std::chrono::milliseconds(40), _));
EXPECT_CALL(stream_encoder_.stream_, resetStream(_));

TestHeaderMapImpl expected_timeout{
Expand Down Expand Up @@ -754,7 +754,7 @@ TEST_F(AsyncClientImplTest, StreamTimeoutHeadReply) {
HttpTestUtility::addDefaultHeaders(message->headers(), "HEAD");
EXPECT_CALL(stream_encoder_, encodeHeaders(HeaderMapEqualRef(&message->headers()), true));
timer_ = new NiceMock<Event::MockTimer>(&dispatcher_);
EXPECT_CALL(*timer_, enableTimer(std::chrono::milliseconds(40)));
EXPECT_CALL(*timer_, enableTimer(std::chrono::milliseconds(40), _));
EXPECT_CALL(stream_encoder_.stream_, resetStream(_));

TestHeaderMapImpl expected_timeout{
Expand All @@ -779,7 +779,7 @@ TEST_F(AsyncClientImplTest, RequestTimeout) {
EXPECT_CALL(stream_encoder_, encodeHeaders(HeaderMapEqualRef(&message_->headers()), true));
expectSuccess(504);
timer_ = new NiceMock<Event::MockTimer>(&dispatcher_);
EXPECT_CALL(*timer_, enableTimer(std::chrono::milliseconds(40)));
EXPECT_CALL(*timer_, enableTimer(std::chrono::milliseconds(40), _));
EXPECT_CALL(stream_encoder_.stream_, resetStream(_));
client_.send(std::move(message_), callbacks_,
AsyncClient::RequestOptions().setTimeout(std::chrono::milliseconds(40)));
Expand All @@ -804,7 +804,7 @@ TEST_F(AsyncClientImplTest, DisableTimer) {

EXPECT_CALL(stream_encoder_, encodeHeaders(HeaderMapEqualRef(&message_->headers()), true));
timer_ = new NiceMock<Event::MockTimer>(&dispatcher_);
EXPECT_CALL(*timer_, enableTimer(std::chrono::milliseconds(200)));
EXPECT_CALL(*timer_, enableTimer(std::chrono::milliseconds(200), _));
EXPECT_CALL(*timer_, disableTimer());
EXPECT_CALL(stream_encoder_.stream_, resetStream(_));
AsyncClient::Request* request =
Expand All @@ -823,7 +823,7 @@ TEST_F(AsyncClientImplTest, DisableTimerWithStream) {

EXPECT_CALL(stream_encoder_, encodeHeaders(HeaderMapEqualRef(&message_->headers()), true));
timer_ = new NiceMock<Event::MockTimer>(&dispatcher_);
EXPECT_CALL(*timer_, enableTimer(std::chrono::milliseconds(40)));
EXPECT_CALL(*timer_, enableTimer(std::chrono::milliseconds(40), _));
EXPECT_CALL(*timer_, disableTimer());
EXPECT_CALL(stream_encoder_.stream_, resetStream(_));
EXPECT_CALL(stream_callbacks_, onReset());
Expand Down
Loading

0 comments on commit 19c6422

Please sign in to comment.