Skip to content

Commit

Permalink
fix rate limiter to avoid starvation
Browse files Browse the repository at this point in the history
Summary:
The current implementation of rate limiter has the possibility to introduce resource starvation when change its limit.
This diff aims to fix this problem by consuming request bytes partially.

Test Plan:
```
./rate_limiter_test
[==========] Running 4 tests from 1 test case.
[----------] Global test environment set-up.
[----------] 4 tests from RateLimiterTest
[ RUN      ] RateLimiterTest.OverflowRate
[       OK ] RateLimiterTest.OverflowRate (0 ms)
[ RUN      ] RateLimiterTest.StartStop
[       OK ] RateLimiterTest.StartStop (0 ms)
[ RUN      ] RateLimiterTest.Rate
request size [1 - 1023], limit 10 KB/sec, actual rate: 10.355712 KB/sec, elapsed 2.00 seconds
request size [1 - 1023], limit 20 KB/sec, actual rate: 19.136564 KB/sec, elapsed 2.00 seconds
request size [1 - 2047], limit 20 KB/sec, actual rate: 20.783976 KB/sec, elapsed 2.10 seconds
request size [1 - 2047], limit 40 KB/sec, actual rate: 39.308144 KB/sec, elapsed 2.10 seconds
request size [1 - 4095], limit 40 KB/sec, actual rate: 40.318349 KB/sec, elapsed 2.20 seconds
request size [1 - 4095], limit 80 KB/sec, actual rate: 79.667396 KB/sec, elapsed 2.20 seconds
request size [1 - 8191], limit 80 KB/sec, actual rate: 81.807158 KB/sec, elapsed 2.30 seconds
request size [1 - 8191], limit 160 KB/sec, actual rate: 160.659761 KB/sec, elapsed 2.20 seconds
request size [1 - 16383], limit 160 KB/sec, actual rate: 160.700990 KB/sec, elapsed 3.00 seconds
request size [1 - 16383], limit 320 KB/sec, actual rate: 317.639481 KB/sec, elapsed 2.50 seconds
[       OK ] RateLimiterTest.Rate (22618 ms)
[ RUN      ] RateLimiterTest.LimitChangeTest
[COMPLETE] request size 10 KB, new limit 20KB/sec, refill period 1000 ms
[COMPLETE] request size 10 KB, new limit 5KB/sec, refill period 1000 ms
[COMPLETE] request size 20 KB, new limit 40KB/sec, refill period 1000 ms
[COMPLETE] request size 20 KB, new limit 10KB/sec, refill period 1000 ms
[COMPLETE] request size 40 KB, new limit 80KB/sec, refill period 1000 ms
[COMPLETE] request size 40 KB, new limit 20KB/sec, refill period 1000 ms
[COMPLETE] request size 80 KB, new limit 160KB/sec, refill period 1000 ms
[COMPLETE] request size 80 KB, new limit 40KB/sec, refill period 1000 ms
[COMPLETE] request size 160 KB, new limit 320KB/sec, refill period 1000 ms
[COMPLETE] request size 160 KB, new limit 80KB/sec, refill period 1000 ms
[       OK ] RateLimiterTest.LimitChangeTest (5002 ms)
[----------] 4 tests from RateLimiterTest (27620 ms total)

[----------] Global test environment tear-down
[==========] 4 tests from 1 test case ran. (27621 ms total)
[  PASSED  ] 4 tests.
```

Reviewers: sdong, IslamAbdelRahman, yiwu, andrewkr

Reviewed By: andrewkr

Subscribers: andrewkr, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D60207
  • Loading branch information
lightmark committed Jul 1, 2016
1 parent 6b71676 commit cb2476a
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 7 deletions.
15 changes: 11 additions & 4 deletions util/rate_limiter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,16 @@
#include "util/rate_limiter.h"
#include "port/port.h"
#include "rocksdb/env.h"
#include "util/sync_point.h"

namespace rocksdb {


// Pending request
struct GenericRateLimiter::Req {
explicit Req(int64_t _bytes, port::Mutex* _mu)
: bytes(_bytes), cv(_mu), granted(false) {}
: request_bytes(_bytes), bytes(_bytes), cv(_mu), granted(false) {}
int64_t request_bytes;
int64_t bytes;
port::CondVar cv;
bool granted;
Expand Down Expand Up @@ -70,7 +72,7 @@ void GenericRateLimiter::SetBytesPerSecond(int64_t bytes_per_second) {

void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri) {
assert(bytes <= refill_bytes_per_period_.load(std::memory_order_relaxed));

TEST_SYNC_POINT("GenericRateLimiter::Request");
MutexLock g(&request_mutex_);
if (stop_) {
return;
Expand Down Expand Up @@ -175,6 +177,7 @@ void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri) {
}

void GenericRateLimiter::Refill() {
TEST_SYNC_POINT("GenericRateLimiter::Refill");
next_refill_us_ = env_->NowMicros() + refill_period_us_;
// Carry over the left over quota from the last period
auto refill_bytes_per_period =
Expand All @@ -189,10 +192,14 @@ void GenericRateLimiter::Refill() {
auto* queue = &queue_[use_pri];
while (!queue->empty()) {
auto* next_req = queue->front();
if (available_bytes_ < next_req->bytes) {
if (available_bytes_ < next_req->request_bytes) {
// avoid starvation
next_req->request_bytes -= available_bytes_;
available_bytes_ = 0;
break;
}
available_bytes_ -= next_req->bytes;
available_bytes_ -= next_req->request_bytes;
next_req->request_bytes = 0;
total_bytes_through_[use_pri] += next_req->bytes;
queue->pop_front();

Expand Down
56 changes: 53 additions & 3 deletions util/rate_limiter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,13 @@
#define __STDC_FORMAT_MACROS
#endif

#include "util/rate_limiter.h"
#include <inttypes.h>
#include <limits>
#include "util/testharness.h"
#include "util/rate_limiter.h"
#include "util/random.h"
#include "rocksdb/env.h"
#include "util/random.h"
#include "util/sync_point.h"
#include "util/testharness.h"

namespace rocksdb {

Expand Down Expand Up @@ -92,6 +93,55 @@ TEST_F(RateLimiterTest, Rate) {
}
}

TEST_F(RateLimiterTest, LimitChangeTest) {
// starvation test when limit changes to a smaller value
int64_t refill_period = 1000 * 1000;
auto* env = Env::Default();
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
struct Arg {
Arg(int32_t _request_size, Env::IOPriority _pri,
std::shared_ptr<RateLimiter> _limiter)
: request_size(_request_size), pri(_pri), limiter(_limiter) {}
int32_t request_size;
Env::IOPriority pri;
std::shared_ptr<RateLimiter> limiter;
};

auto writer = [](void* p) {
auto* arg = static_cast<Arg*>(p);
arg->limiter->Request(arg->request_size, arg->pri);
};

for (uint32_t i = 1; i <= 16; i <<= 1) {
int32_t target = i * 1024 * 10;
// refill per second
for (int iter = 0; iter < 2; iter++) {
std::shared_ptr<RateLimiter> limiter =
std::make_shared<GenericRateLimiter>(target, refill_period, 10);
rocksdb::SyncPoint::GetInstance()->LoadDependency(
{{"GenericRateLimiter::Request",
"RateLimiterTest::LimitChangeTest:changeLimitStart"},
{"RateLimiterTest::LimitChangeTest:changeLimitEnd",
"GenericRateLimiter::Refill"}});
Arg arg(target, Env::IO_HIGH, limiter);
// The idea behind is to start a request first, then before it refills,
// update limit to a different value (2X/0.5X). No starvation should
// be guaranteed under any situation
// TODO(lightmark): more test cases are welcome.
env->StartThread(writer, &arg);
int32_t new_limit = (target << 1) >> (iter << 1);
TEST_SYNC_POINT("RateLimiterTest::LimitChangeTest:changeLimitStart");
arg.limiter->SetBytesPerSecond(new_limit);
TEST_SYNC_POINT("RateLimiterTest::LimitChangeTest:changeLimitEnd");
env->WaitForJoin();
fprintf(stderr,
"[COMPLETE] request size %" PRIi32 " KB, new limit %" PRIi32
"KB/sec, refill period %" PRIi64 " ms\n",
target / 1024, new_limit / 1024, refill_period / 1000);
}
}
}

} // namespace rocksdb

int main(int argc, char** argv) {
Expand Down

0 comments on commit cb2476a

Please sign in to comment.