diff --git a/include/ylt/metric/detail/ckms_quantiles.hpp b/include/ylt/metric/detail/ckms_quantiles.hpp deleted file mode 100644 index 051ef8e35..000000000 --- a/include/ylt/metric/detail/ckms_quantiles.hpp +++ /dev/null @@ -1,175 +0,0 @@ -#pragma once -#include -#include -#include - -// https://github.com/jupp0r/prometheus-cpp/blob/master/core/include/prometheus/detail/ckms_quantiles.h - -namespace ylt::metric { -class CKMSQuantiles { - public: - struct Quantile { - Quantile(double quantile, double error) - : quantile(quantile), - error(error), - u(2.0 * error / (1.0 - quantile)), - v(2.0 * error / quantile) {} - - double quantile; - double error; - double u; - double v; - }; - - private: - struct Item { - double value; - int g; - int delta; - - Item(double value, int lower_delta, int delta) - : value(value), g(lower_delta), delta(delta) {} - }; - - public: - explicit CKMSQuantiles(const std::vector& quantiles) - : quantiles_(quantiles), count_(0), buffer_{}, buffer_count_(0) {} - - void insert(double value) { - buffer_[buffer_count_] = value; - ++buffer_count_; - - if (buffer_count_ == buffer_.size()) { - insertBatch(); - compress(); - } - } - - double get(double q) { - insertBatch(); - compress(); - - if (sample_.empty()) { - return 0; - } - - int rankMin = 0; - const auto desired = static_cast(q * count_); - const auto bound = desired + (allowableError(desired) / 2); - - auto it = sample_.begin(); - decltype(it) prev; - auto cur = it++; - - while (it != sample_.end()) { - prev = cur; - cur = it++; - - rankMin += prev->g; - - if (rankMin + cur->g + cur->delta > bound) { - return prev->value; - } - } - - return sample_.back().value; - } - void reset() { - count_ = 0; - sample_.clear(); - buffer_count_ = 0; - } - - private: - double allowableError(int rank) { - auto size = sample_.size(); - double minError = size + 1; - - for (const auto& q : quantiles_.get()) { - double error; - if (rank <= q.quantile * size) { - error = q.u * (size - rank); - } - else { - error = q.v * rank; - } - if (error < minError) { - minError = error; - } - } - return minError; - } - - bool insertBatch() { - if (buffer_count_ == 0) { - return false; - } - - std::sort(buffer_.begin(), buffer_.begin() + buffer_count_); - - std::size_t start = 0; - if (sample_.empty()) { - sample_.emplace_back(buffer_[0], 1, 0); - ++start; - ++count_; - } - - std::size_t idx = 0; - std::size_t item = idx++; - - for (uint16_t i = start; i < buffer_count_; ++i) { - double v = buffer_[i]; - while (idx < sample_.size() && sample_[item].value < v) { - item = idx++; - } - - if (sample_[item].value > v) { - --idx; - } - - int delta; - if (idx - 1 == 0 || idx + 1 == sample_.size()) { - delta = 0; - } - else { - delta = static_cast(std::floor(allowableError(idx + 1))) + 1; - } - - sample_.emplace(sample_.begin() + idx, v, 1, delta); - count_++; - item = idx++; - } - - buffer_count_ = 0; - return true; - } - void compress() { - if (sample_.size() < 2) { - return; - } - - std::size_t idx = 0; - std::size_t prev; - std::size_t next = idx++; - - while (idx < sample_.size()) { - prev = next; - next = idx++; - - if (sample_[prev].g + sample_[next].g + sample_[next].delta <= - allowableError(idx - 1)) { - sample_[next].g += sample_[prev].g; - sample_.erase(sample_.begin() + prev); - } - } - } - - private: - const std::reference_wrapper> quantiles_; - - uint16_t count_; - std::vector sample_; - std::array buffer_; - uint16_t buffer_count_; -}; -} // namespace ylt::metric \ No newline at end of file diff --git a/include/ylt/metric/detail/time_window_quantiles.hpp b/include/ylt/metric/detail/time_window_quantiles.hpp deleted file mode 100644 index 07debac19..000000000 --- a/include/ylt/metric/detail/time_window_quantiles.hpp +++ /dev/null @@ -1,52 +0,0 @@ -#pragma once -#include "ckms_quantiles.hpp" -// https://github.com/jupp0r/prometheus-cpp/blob/master/core/include/prometheus/detail/time_window_quantiles.h - -namespace ylt::metric { -class TimeWindowQuantiles { - using Clock = std::chrono::steady_clock; - - public: - TimeWindowQuantiles(const std::vector& quantiles, - Clock::duration max_age_seconds, uint16_t age_buckets) - : quantiles_(quantiles), - ckms_quantiles_(age_buckets, CKMSQuantiles(quantiles_)), - current_bucket_(0), - last_rotation_(Clock::now()), - rotation_interval_(max_age_seconds / age_buckets) {} - - double get(double q) const { - CKMSQuantiles& current_bucket = rotate(); - return current_bucket.get(q); - } - void insert(double value) { - rotate(); - for (auto& bucket : ckms_quantiles_) { - bucket.insert(value); - } - } - - private: - CKMSQuantiles& rotate() const { - auto delta = Clock::now() - last_rotation_; - while (delta > rotation_interval_) { - ckms_quantiles_[current_bucket_].reset(); - - if (++current_bucket_ >= ckms_quantiles_.size()) { - current_bucket_ = 0; - } - - delta -= rotation_interval_; - last_rotation_ += rotation_interval_; - } - return ckms_quantiles_[current_bucket_]; - } - - const std::vector& quantiles_; - mutable std::vector ckms_quantiles_; - mutable uint16_t current_bucket_; - - mutable Clock::time_point last_rotation_; - const Clock::duration rotation_interval_; -}; -} // namespace ylt::metric \ No newline at end of file diff --git a/include/ylt/metric/metric.hpp b/include/ylt/metric/metric.hpp index f37de8cab..794c292c8 100644 --- a/include/ylt/metric/metric.hpp +++ b/include/ylt/metric/metric.hpp @@ -166,19 +166,6 @@ class metric_t { virtual void serialize_to_json(std::string& str) {} #endif - // only for summary - virtual async_simple::coro::Lazy serialize_async(std::string& out) { - co_return; - } - -#ifdef CINATRA_ENABLE_METRIC_JSON - // only for summary - virtual async_simple::coro::Lazy serialize_to_json_async( - std::string& out) { - co_return; - } -#endif - template T* as() { return dynamic_cast(this); diff --git a/include/ylt/metric/metric_manager.hpp b/include/ylt/metric/metric_manager.hpp index 8a48d5f12..b3b90c0f0 100644 --- a/include/ylt/metric/metric_manager.hpp +++ b/include/ylt/metric/metric_manager.hpp @@ -28,14 +28,8 @@ class manager_helper { const std::vector>& metrics) { std::string str; for (auto& m : metrics) { - if (m->metric_type() == MetricType::Summary) { - async_simple::coro::syncAwait(m->serialize_async(str)); - } - else { - m->serialize(str); - } + m->serialize(str); } - return str; } @@ -49,13 +43,7 @@ class manager_helper { str.append("["); for (auto& m : metrics) { size_t start = str.size(); - if (m->metric_type() == MetricType::Summary) { - async_simple::coro::syncAwait(m->serialize_to_json_async(str)); - } - else { - m->serialize_to_json(str); - } - + m->serialize_to_json(str); if (str.size() > start) str.append(","); } diff --git a/include/ylt/metric/summary.hpp b/include/ylt/metric/summary.hpp index 661b52eb7..497917797 100644 --- a/include/ylt/metric/summary.hpp +++ b/include/ylt/metric/summary.hpp @@ -1,9 +1,13 @@ #pragma once #include #include +#include +#include +#include -#include "detail/time_window_quantiles.hpp" +#include "counter.hpp" #include "metric.hpp" +#include "summary_impl.hpp" #if __has_include("ylt/util/concurrentqueue.h") #include "ylt/util/concurrentqueue.h" #else @@ -28,114 +32,63 @@ struct json_summary_t { YLT_REFL(json_summary_t, name, help, type, metrics); #endif -struct block_t { - std::atomic is_coro_started_ = false; - std::atomic stop_ = false; - ylt::detail::moodycamel::ConcurrentQueue sample_queue_; - std::shared_ptr quantile_values_; - std::uint64_t count_; - double sum_; -}; - class summary_t : public static_metric { public: - using Quantiles = std::vector; - summary_t(std::string name, std::string help, Quantiles quantiles, - std::chrono::milliseconds max_age = std::chrono::seconds{60}, - uint16_t age_buckets = 5) - : quantiles_{std::move(quantiles)}, - static_metric(MetricType::Summary, std::move(name), std::move(help)) { - init_no_label(max_age, age_buckets); + summary_t(std::string name, std::string help, std::vector quantiles, + std::chrono::seconds max_age = std::chrono::seconds{60}) + : static_metric(MetricType::Summary, std::move(name), std::move(help)), + quantiles_(std::move(quantiles)), + impl_(quantiles_, + std::chrono::duration_cast(max_age)) { + if (!std::is_sorted(quantiles_.begin(), quantiles_.end())) + std::sort(quantiles_.begin(), quantiles_.end()); + g_user_metric_count++; } - summary_t(std::string name, std::string help, Quantiles quantiles, + summary_t(std::string name, std::string help, std::vector quantiles, std::map static_labels, - std::chrono::milliseconds max_age = std::chrono::seconds{60}, - uint16_t age_buckets = 5) - : quantiles_{std::move(quantiles)}, - static_metric(MetricType::Summary, std::move(name), std::move(help), - std::move(static_labels)) { - init_no_label(max_age, age_buckets); - } - - ~summary_t() { - if (block_) { - block_->stop_ = true; - } + std::chrono::seconds max_age = std::chrono::seconds{60}) + : static_metric(MetricType::Summary, std::move(name), std::move(help), + std::move(static_labels)), + quantiles_(std::move(quantiles)), + impl_(quantiles_, + std::chrono::duration_cast(max_age)) { + if (!std::is_sorted(quantiles_.begin(), quantiles_.end())) + std::sort(quantiles_.begin(), quantiles_.end()); + g_user_metric_count++; } - void observe(double value) { - if (!has_observe_) [[unlikely]] { - has_observe_ = true; - } - int64_t max_limit = (std::min)(ylt_label_capacity, (int64_t)1000000); - if (block_->sample_queue_.size_approx() >= max_limit) { - g_summary_failed_count++; - return; - } - block_->sample_queue_.enqueue(value); + void observe(float value) { impl_.insert(value); } - bool expected = false; - if (block_->is_coro_started_.compare_exchange_strong(expected, true)) { - start(block_).via(excutor_->get_executor()).start([](auto &&) { - }); - } + std::vector get_rates() { + uint64_t count; + double sum; + return get_rates(sum, count); } - - async_simple::coro::Lazy> get_rates(double &sum, - uint64_t &count) { - std::vector vec; - if (quantiles_.empty()) { - co_return std::vector{}; - } - - co_await coro_io::post( - [this, &vec, &sum, &count] { - sum = block_->sum_; - count = block_->count_; - for (const auto &quantile : quantiles_) { - vec.push_back(block_->quantile_values_->get(quantile.quantile)); - } - }, - excutor_->get_executor()); - - co_return vec; + std::vector get_rates(uint64_t& count) { + double sum; + return get_rates(sum, count); } - - async_simple::coro::Lazy get_sum() { - auto ret = co_await coro_io::post( - [this] { - return block_->sum_; - }, - excutor_->get_executor()); - co_return ret.value(); + std::vector get_rates(double& sum) { + uint64_t count; + return get_rates(sum, count); } - async_simple::coro::Lazy get_count() { - auto ret = co_await coro_io::post( - [this] { - return block_->count_; - }, - excutor_->get_executor()); - co_return ret.value(); + std::vector get_rates(double& sum, uint64_t& count) { + return impl_.stat(sum, count); } - size_t size_approx() { return block_->sample_queue_.size_approx(); } - - async_simple::coro::Lazy serialize_async(std::string &str) override { + virtual void serialize(std::string& str) override { if (quantiles_.empty()) { - co_return; - } - - if (!has_observe_) { - co_return; + return; } - - serialize_head(str); - double sum = 0; uint64_t count = 0; - auto rates = co_await get_rates(sum, count); + auto rates = get_rates(sum, count); + if (count == 0) { + return; + } + serialize_head(str); for (size_t i = 0; i < quantiles_.size(); i++) { str.append(name_); @@ -146,7 +99,7 @@ class summary_t : public static_metric { } str.append("quantile=\""); - str.append(std::to_string(quantiles_[i].quantile)).append("\"} "); + str.append(std::to_string(quantiles_[i])).append("\"} "); str.append(std::to_string(rates[i])).append("\n"); } @@ -158,20 +111,19 @@ class summary_t : public static_metric { } #ifdef CINATRA_ENABLE_METRIC_JSON - async_simple::coro::Lazy serialize_to_json_async( - std::string &str) override { + virtual void serialize_to_json(std::string& str) override { if (quantiles_.empty()) { - co_return; + return; } - if (!has_observe_) { - co_return; + double sum = 0; + uint64_t count = 0; + auto rates = get_rates(sum, count); + if (count == 0) { + return; } json_summary_t summary{name_, help_, std::string(metric_name())}; - double sum = 0; - uint64_t count = 0; - auto rates = co_await get_rates(sum, count); json_summary_metric_t metric; @@ -179,7 +131,7 @@ class summary_t : public static_metric { for (size_t i = 0; i < labels_name_.size(); i++) { metric.labels[labels_name_[i]] = labels_value_[i]; } - metric.quantiles.emplace(quantiles_[i].quantile, rates[i]); + metric.quantiles.emplace(quantiles_[i], rates[i]); } metric.sum = sum; @@ -190,260 +142,84 @@ class summary_t : public static_metric { iguana::to_json(summary, str); } #endif - private: - template - void init_block(std::shared_ptr &block) { - block = std::make_shared(); - start(block).via(excutor_->get_executor()).start([](auto &&) { - }); - } - - void init_no_label(std::chrono::milliseconds max_age, uint16_t age_buckets) { - init_block(block_); - block_->quantile_values_ = - std::make_shared(quantiles_, max_age, age_buckets); - g_user_metric_count++; - } - - async_simple::coro::Lazy start(std::shared_ptr block) { - double sample; - size_t count = 100000; - while (!block->stop_) { - size_t index = 0; - while (block->sample_queue_.try_dequeue(sample)) { - block->quantile_values_->insert(sample); - block->count_ += 1; - block->sum_ += sample; - index++; - if (index == count) { - break; - } - } - - if (block->sample_queue_.size_approx() == 0) { - block_->is_coro_started_ = false; - if (block->sample_queue_.size_approx() == 0) { - break; - } - bool expected = false; - if (!block_->is_coro_started_.compare_exchange_strong(expected, true)) { - break; - } + private: + std::vector quantiles_; + ylt::metric::detail::summary_impl<> impl_; +}; - continue; +template +class basic_dynamic_summary : public dynamic_metric { + private: + auto visit(const std::array& labels_value) { + decltype(label_quantile_values_.begin()) iter; + bool has_inserted; + { + std::lock_guard guard(mutex_); + std::tie(iter, has_inserted) = + label_quantile_values_.try_emplace(labels_value, nullptr); + if (has_inserted) { + iter->second = std::make_unique>(quantiles_); } - - co_await async_simple::coro::Yield{}; } - - co_return; + return iter; } - Quantiles quantiles_; // readonly - std::shared_ptr block_; - static inline std::shared_ptr excutor_ = - coro_io::create_io_context_pool(1); - bool has_observe_ = false; -}; - -template -struct summary_label_sample { - std::array labels_value; - double value; -}; - -struct sum_and_count_t { - double sum; - uint64_t count; -}; - -template -struct labels_block_t { - summary_t::Quantiles quantiles_; // readonly - std::chrono::milliseconds max_age_; - uint16_t age_buckets_; - std::atomic is_coro_started_ = false; - std::atomic stop_ = false; - ylt::detail::moodycamel::ConcurrentQueue> - sample_queue_; - dynamic_metric_hash_map, - std::shared_ptr> - label_quantile_values_; - dynamic_metric_hash_map, sum_and_count_t> - sum_and_count_; -}; - -template -class basic_dynamic_summary : public dynamic_metric { public: - using Quantiles = std::vector; - basic_dynamic_summary( - std::string name, std::string help, Quantiles quantiles, + std::string name, std::string help, std::vector quantiles, std::array labels_name, - std::chrono::milliseconds max_age = std::chrono::seconds{60}, - uint16_t age_buckets = 5) + std::chrono::milliseconds max_age = std::chrono::seconds{60}) : dynamic_metric(MetricType::Summary, std::move(name), std::move(help), - std::move(labels_name)) { - labels_block_ = std::make_shared>(); - labels_block_->quantiles_ = std::move(quantiles); - labels_block_->max_age_ = max_age; - labels_block_->age_buckets_ = age_buckets; - - start(labels_block_).via(excutor_->get_executor()).start([](auto &&) { - }); - + std::move(labels_name)), + quantiles_(std::move(quantiles)), + max_age_(max_age) { + if (!std::is_sorted(quantiles_.begin(), quantiles_.end())) + std::sort(quantiles_.begin(), quantiles_.end()); g_user_metric_count++; } - ~basic_dynamic_summary() { - if (labels_block_) { - labels_block_->stop_ = true; - } - } - - void observe(std::array labels_value, double value) { - if (!has_observe_) [[unlikely]] { - has_observe_ = true; - } - int64_t max_limit = (std::min)(ylt_label_capacity, (int64_t)1000000); - if (labels_block_->sample_queue_.size_approx() >= max_limit) { - g_summary_failed_count++; - return; - } - labels_block_->sample_queue_.enqueue({std::move(labels_value), value}); - - bool expected = false; - if (labels_block_->is_coro_started_.compare_exchange_strong(expected, - true)) { - start(labels_block_).via(excutor_->get_executor()).start([](auto &&) { - }); - } + void observe(const std::array& labels_value, float value) { + visit(labels_value)->second->insert(value); } - size_t size_approx() { return labels_block_->sample_queue_.size_approx(); } - - size_t label_value_count() override { - auto block = labels_block_; - return async_simple::coro::syncAwait(coro_io::post([block] { - return block->sum_and_count_.size(); - })) - .value(); + std::vector get_rates(const std::array& labels_value) { + double sum; + uint64_t count; + return visit(labels_value)->second->get_rates(sum, count); } - async_simple::coro::Lazy> get_rates( - const std::array &labels_value, double &sum, - uint64_t &count) { - std::vector vec; - if (labels_block_->quantiles_.empty()) { - co_return std::vector{}; - } - - co_await coro_io::post( - [this, &vec, &sum, &count, &labels_value] { - auto it = labels_block_->label_quantile_values_.find(labels_value); - if (it == labels_block_->label_quantile_values_.end()) { - return; - } - sum = labels_block_->sum_and_count_[labels_value].sum; - count = labels_block_->sum_and_count_[labels_value].count; - for (const auto &quantile : labels_block_->quantiles_) { - vec.push_back(it->second->get(quantile.quantile)); - } - }, - excutor_->get_executor()); - - co_return vec; + std::vector get_rates(const std::array& labels_value, + uint64_t& count) { + double sum; + return visit(labels_value)->second->get_rates(sum, count); } - async_simple::coro::Lazy serialize_async(std::string &str) override { - co_await serialize_async_with_label(str); + std::vector get_rates(const std::array& labels_value, + double& sum) { + uint64_t count; + return visit(labels_value)->second->get_rates(sum, count); } -#ifdef CINATRA_ENABLE_METRIC_JSON - async_simple::coro::Lazy serialize_to_json_async( - std::string &str) override { - co_await serialize_to_json_with_label_async(str); + std::vector get_rates(const std::array& labels_value, + double& sum, uint64_t& count) { + return visit(labels_value)->second->stat(sum, count); } -#endif - private: - async_simple::coro::Lazy start( - std::shared_ptr> label_block) { - summary_label_sample sample; - size_t count = 100000; - while (!label_block->stop_) { - size_t index = 0; - while (label_block->sample_queue_.try_dequeue(sample)) { - auto &ptr = label_block->label_quantile_values_[sample.labels_value]; - - if (ptr == nullptr) { - ptr = std::make_shared( - label_block->quantiles_, label_block->max_age_, - label_block->age_buckets_); - } - - ptr->insert(sample.value); - - label_block->sum_and_count_[sample.labels_value].count += 1; - label_block->sum_and_count_[sample.labels_value].sum += sample.value; - index++; - if (index == count) { - break; - } - } - - co_await async_simple::coro::Yield{}; - if (label_block->sample_queue_.size_approx() == 0) { - label_block->is_coro_started_ = false; - if (label_block->sample_queue_.size_approx() == 0) { - break; - } - - bool expected = false; - if (!label_block->is_coro_started_.compare_exchange_strong(expected, - true)) { - break; - } - - continue; - } - co_await async_simple::coro::Yield{}; - } - - co_return; - } - - async_simple::coro::Lazy serialize_async_with_label(std::string &str) { - if (labels_block_->quantiles_.empty()) { - co_return; - } - - if (!has_observe_) { - co_return; - } - - serialize_head(str); - - auto sum_map = co_await coro_io::post( - [this] { - return labels_block_->sum_and_count_; - }, - excutor_->get_executor()); - - for (auto &[labels_value, _] : sum_map.value()) { - double sum = 0; - uint64_t count = 0; - auto rates = co_await get_rates(labels_value, sum, count); - for (size_t i = 0; i < labels_block_->quantiles_.size(); i++) { + virtual void serialize(std::string& str) override { + double sum = 0; + uint64_t count = 0; + std::lock_guard guard(mutex_); + // TODO: copy pointer to avoid big lock + for (auto& [labels_value, summary_value] : label_quantile_values_) { + auto rates = summary_value->stat(sum, count); + for (size_t i = 0; i < quantiles_.size(); i++) { str.append(name_); str.append("{"); build_label_string(str, labels_name_, labels_value); str.append(","); str.append("quantile=\""); - str.append(std::to_string(labels_block_->quantiles_[i].quantile)) - .append("\"} "); + str.append(std::to_string(quantiles_[i])).append("\"} "); str.append(std::to_string(rates[i])).append("\n"); } @@ -462,37 +238,21 @@ class basic_dynamic_summary : public dynamic_metric { } #ifdef CINATRA_ENABLE_METRIC_JSON - async_simple::coro::Lazy serialize_to_json_with_label_async( - std::string &str) { - if (labels_block_->quantiles_.empty()) { - co_return; - } - - if (!has_observe_) { - co_return; - } - - auto sum_map = co_await coro_io::post( - [this] { - return labels_block_->sum_and_count_; - }, - excutor_->get_executor()); - + virtual void serialize_to_json(std::string& str) override { json_summary_t summary{name_, help_, std::string(metric_name())}; - for (auto &[labels_value, _] : sum_map.value()) { + for (auto& [labels_value, summary_value] : label_quantile_values_) { json_summary_metric_t metric; double sum = 0; uint64_t count = 0; - auto rates = co_await get_rates(labels_value, sum, count); + auto rates = summary_value->stat(sum, count); metric.count = count; metric.sum = sum; - for (size_t i = 0; i < labels_block_->quantiles_.size(); i++) { + for (size_t i = 0; i < quantiles_.size(); i++) { for (size_t i = 0; i < labels_value.size(); i++) { metric.labels[labels_name_[i]] = labels_value[i]; } - metric.quantiles.emplace(labels_block_->quantiles_[i].quantile, - rates[i]); + metric.quantiles.emplace(quantiles_[i], rates[i]); } summary.metrics.push_back(std::move(metric)); @@ -501,10 +261,14 @@ class basic_dynamic_summary : public dynamic_metric { } #endif - std::shared_ptr> labels_block_; - static inline std::shared_ptr excutor_ = - coro_io::create_io_context_pool(1); - bool has_observe_ = false; + private: + using hashtable_t = dynamic_metric_hash_map< + std::array, + std::unique_ptr>>; + std::mutex mutex_; + std::vector quantiles_; + std::chrono::milliseconds max_age_; + hashtable_t label_quantile_values_; }; using dynamic_summary_1 = basic_dynamic_summary<1>; diff --git a/include/ylt/metric/summary_impl.hpp b/include/ylt/metric/summary_impl.hpp new file mode 100644 index 000000000..036258d34 --- /dev/null +++ b/include/ylt/metric/summary_impl.hpp @@ -0,0 +1,366 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "ylt/easylog.hpp" +#include "ylt/metric/counter.hpp" + +namespace ylt::metric::detail { + +template +class summary_impl { + constexpr static uint32_t decode_impl(uint16_t float16_value) { + float16_value <<= (8 - frac_bit); + uint32_t sign = float16_value >> 15; + uint32_t exponent = (float16_value >> 8) & 0x7F; + uint32_t fraction = (float16_value & 0xFF); + uint32_t float32_value; + if (exponent == 0) { + /*discard Denormals, in encode they may not correct so we just decode it + * as zero */ + float32_value = (sign << 31); + } + else if (exponent == 0x7F) { + /* Inf or NaN */ + /* we just use return it as value 2^64 */ + float32_value = (sign << 31) | ((127 + (127 - 63)) << 23); + } + else { + /* ordinary number */ + float32_value = + (sign << 31) | ((exponent + (127 - 63)) << 23) | (fraction << 15); + } + return float32_value; + } + + constexpr static auto generate_decode_table() { + constexpr size_t bucket_size = + 1 << (frac_bit + 1 /*sign bit*/ + 7 /*exp bit*/); + std::array table{}; + for (uint16_t i = 0; i < bucket_size; ++i) { + table[i] = decode_impl(i); + } + return table; + }; + + static auto& get_decode_table() { + static constexpr auto table = generate_decode_table(); + return table; + }; + + /*my float16: | 1bit positive/negative flag | 6bit exp | 9bit frac |*/ + static_assert(frac_bit < 8); + static constexpr float float16_max = (1ull << 63) * 2.0f; // 2^64 + + static uint16_t encode(float flt) { + unsigned int& fltInt32 = *(unsigned int*)&flt; + if (std::abs(flt) >= float16_max || std::isnan(flt)) { + flt = (fltInt32 & 0x8000'0000) ? (-float16_max) : (float16_max); + } + unsigned short fltInt16; + fltInt16 = (fltInt32 >> 31) << 7; /*float32 flag: 1bit*/ + unsigned short tmp = (fltInt32 >> 23) & 0xff; /*float32 exp: 8bit*/ + + tmp = (tmp - 0x40) & ((unsigned int)((int)(0x40 - tmp) >> 6) >> 25); + fltInt16 = (fltInt16 | tmp) << 8; + + // this step cause error denormals for flt<2^-63, but we decode it as zero + // later + fltInt16 |= (fltInt32 >> 15) & 0xff; + + auto i = fltInt16 >> (8 - frac_bit); + auto j = decode_impl(i); + return i; + } + + static float decode(uint16_t float16_value) { + static_assert(frac_bit < 8); + return *(float*)&(get_decode_table()[float16_value]); + } + + static constexpr inline size_t bucket_size = + 1 << (frac_bit + 1 /*sign bit*/ + 7 /*exp bit*/); + + static constexpr size_t piece_cnt = 1 << 7; + + struct data_t { + static constexpr size_t piece_size = bucket_size / piece_cnt; + using piece_t = std::array, piece_size>; + + std::atomic& operator[](std::size_t index) { + piece_t* piece = arr[index / piece_size]; + if (piece == nullptr) { + auto ptr = new piece_t{}; + arr[index / piece_size].compare_exchange_strong(piece, ptr); + return (*arr[index / piece_size].load())[index % piece_size]; + } + else { + return (*piece)[index % piece_size]; + } + } + void refresh() { + for (auto& piece_ptr : arr) { + delete piece_ptr.exchange(nullptr); + } + } + static uint16_t get_ordered_index(int16_t raw_index) { + return (raw_index >= bucket_size / 2) ? (bucket_size / 2 - 1 - raw_index) + : (raw_index); + } + static uint16_t get_raw_index(int16_t ordered_index) { + return (ordered_index < 0) ? (bucket_size / 2 - 1 - ordered_index) + : (ordered_index); + } + template + void stat_impl(uint64_t& count, + std::vector>& result, int i) { + auto piece = arr[i].load(std::memory_order_relaxed); + if (piece) { + if constexpr (inc_order) { + for (int j = 0; j < piece->size(); ++j) { + auto value = (*piece)[j].load(std::memory_order_relaxed); + if (value) { + result.emplace_back(get_ordered_index(i * piece_size + j), value); + count += value; + } + } + } + else { + for (int j = piece->size() - 1; j >= 0; --j) { + auto value = (*piece)[j].load(std::memory_order_relaxed); + if (value) { + result.emplace_back(get_ordered_index(i * piece_size + j), value); + count += value; + } + } + } + } + } + void stat(uint64_t& count, + std::vector>& result) { + for (int i = piece_cnt - 1; i >= piece_cnt / 2; --i) { + stat_impl(count, result, i); + } + for (int i = 0; i < piece_cnt / 2; ++i) { + stat_impl(count, result, i); + } + } + + ~data_t() { + for (auto& e : arr) { + delete e; + } + } + + std::array, piece_cnt> arr; + // fixed_thread_local_value cnt; + }; + + data_t& get_data() { + data_t* data = data_[frontend_data_index_]; + if (data == nullptr) [[unlikely]] { + auto pointer = new data_t{}; + if (!data_[frontend_data_index_].compare_exchange_strong(data, pointer)) { + delete pointer; + } + return *data_[frontend_data_index_]; + } + else { + return *data; + } + } + + static inline const unsigned long ms_count = + std::chrono::steady_clock::duration{std::chrono::milliseconds{1}}.count(); + + template + void refresh() { + if (refresh_time_.count() <= 0) { + return; + } + uint64_t old_tp = tp_; + auto new_tp = std::chrono::steady_clock::now().time_since_epoch().count(); + auto ms = (new_tp - old_tp) / ms_count; + if (; ms > refresh_time_.count()) [[unlikely]] { + if (tp_.compare_exchange_strong(old_tp, new_tp)) { + auto pos = frontend_data_index_ ^ 1; + if (auto data = data_[pos].load(); data != nullptr) { + data_delete_checker = true; + while (*data_delete_locker > 0) { + std::this_thread::yield(); + } + /*it seems dangerours, but we block the read op, and there is no write + * op in backend after refresh time*/ + if constexpr (is_read) { + delete data_[pos].exchange(nullptr); + } + else { + (*data_[pos]).refresh(); + } + data_delete_checker = false; + } + frontend_data_index_ = pos; + } + } + } + + constexpr static unsigned int near_uint32_max = 4290000000U; + + void increase(data_t& arr, uint16_t pos) { + if (++arr[pos] > near_uint32_max) /*no overflow*/ [[likely]] { + --arr[pos]; + int upper = (pos < bucket_size / 2) ? (bucket_size / 2) : (bucket_size); + int lower = (pos < bucket_size / 2) ? (0) : (bucket_size / 2); + for (int delta = 1, lim = (std::max)(upper - pos, pos - lower + 1); + delta < lim; ++delta) { + if (pos + delta < upper) { + if (++arr[pos + delta] <= near_uint32_max) { + break; + } + --arr[pos + delta]; + } + if (pos - delta >= lower) { + if (++arr[pos - delta] <= near_uint32_max) { + break; + } + --arr[pos - delta]; + } + } + } + } + + struct data_copy_t { + std::vector> arr[2]; + int index[2] = {}, smaller_one; + void init() { + if (arr[0][0] <= arr[1][0]) { + smaller_one = 0; + } + else { + smaller_one = 1; + } + } + void inc() { + index[smaller_one]++; + if (arr[0][index[0]] <= arr[1][index[1]]) { + smaller_one = 0; + } + else { + smaller_one = 1; + } + } + int16_t value() { return arr[smaller_one][index[smaller_one]].first; } + uint32_t count() { return arr[smaller_one][index[smaller_one]].second; } + }; + + public: + void insert(float value) { + refresh(); + auto& data = get_data(); + increase(data, encode(value)); + return; + } + struct data_delete_guard { + summary_impl* self_; + data_delete_guard(summary_impl* self) : self_(self) { + if (self_->refresh_time_.count() >= 0) { + ++*(self_->data_delete_locker); + } + } + ~data_delete_guard() { + if (self_->refresh_time_.count() >= 0) { + --*(self_->data_delete_locker); + } + } + }; + + std::vector stat(double& sum, uint64_t& count) { + refresh(); + while (data_delete_checker) [[unlikely]] { + std::this_thread::yield(); + } + count = 0; + sum = 0; + data_copy_t data_copy; + { + data_delete_guard guard(this); + data_t* ar[2] = {data_[0], data_[1]}; + if (ar[0] == nullptr && ar[1] == nullptr) [[unlikely]] { + return std::vector(rate_.size(), 0.0f); + } + if (ar[0]) { + ar[0]->stat(count, data_copy.arr[0]); + } + if (ar[1]) { + ar[1]->stat(count, data_copy.arr[1]); + } + } + if (count == 0) { + return std::vector(rate_.size(), 0); + } + uint64_t count_now = 0; + data_copy.arr[0].emplace_back(bucket_size / 2, 0); + data_copy.arr[1].emplace_back(bucket_size / 2, 0); + data_copy.init(); + std::vector result; + result.reserve(rate_.size()); + float v = -float16_max; + for (auto e : rate_) { + if (e < 0) [[unlikely]] { + result.push_back(v); + continue; + } + else if (e > 1) { + e = 1; + } + auto target_count = std::min(e * count, count); + while (true) { + if (target_count <= count_now) [[unlikely]] { + result.push_back(v); + break; + } + auto tmp = data_copy.count(); + count_now += tmp; + v = decode(data_t::get_raw_index(data_copy.value())); + sum += v * tmp; + data_copy.inc(); + } + } + while (data_copy.value() < bucket_size / 2) { + sum += + decode(data_t::get_raw_index(data_copy.value())) * data_copy.count(); + data_copy.inc(); + } + return result; + } + + summary_impl(std::vector& rate, + std::chrono::seconds refresh_time = std::chrono::seconds{0}) + : rate_(rate), + refresh_time_(refresh_time.count() * 1000 / 2), + tp_(std::chrono::steady_clock::now().time_since_epoch().count()){}; + + ~summary_impl() { + for (auto& data : data_) { + delete data; + } + } + + private: + const std::chrono::milliseconds refresh_time_; + std::atomic tp_; + std::vector& rate_; + std::array, 2> data_; + std::atomic frontend_data_index_; + std::atomic data_delete_checker = false; + std::unique_ptr> data_delete_locker = + std::make_unique>(); +}; +} // namespace ylt::metric::detail diff --git a/src/metric/benchmark/main.cpp b/src/metric/benchmark/main.cpp index cb34cd0e9..287efed0e 100644 --- a/src/metric/benchmark/main.cpp +++ b/src/metric/benchmark/main.cpp @@ -145,11 +145,8 @@ void bench_many_labels_serialize(size_t COUNT, bool to_json = false) { void bench_many_labels_qps_summary(size_t thd_num, std::chrono::seconds duration) { - dynamic_summary_2 summary( - "qps2", "", - summary_t::Quantiles{ - {0.5, 0.05}, {0.9, 0.01}, {0.95, 0.005}, {0.99, 0.001}}, - std::array{"method", "url"}); + dynamic_summary_2 summary("qps2", "", {0.5, 0.9, 0.95, 0.99}, + std::array{"method", "url"}); std::atomic stop = false; std::vector vec; std::array arr{"/test", "200"}; @@ -182,32 +179,11 @@ void bench_many_labels_qps_summary(size_t thd_num, for (auto& thd : vec) { thd.join(); } - - start = std::chrono::system_clock::now(); - size_t last = summary.size_approx(); - std::cout << "total size: " << last << "\n"; - while (true) { - std::this_thread::sleep_for(1s); - size_t current = summary.size_approx(); - if (current == 0) { - break; - } - - std::cout << last - current << "\n"; - last = current; - } - end = std::chrono::system_clock::now(); - elaps = std::chrono::duration_cast(end - start) - .count(); - std::cout << "consume " << elaps << "ms\n"; } void bench_many_labels_serialize_summary(size_t COUNT, bool to_json = false) { - dynamic_summary_2 summary( - "qps2", "", - summary_t::Quantiles{ - {0.5, 0.05}, {0.9, 0.01}, {0.95, 0.005}, {0.99, 0.001}}, - std::array{"method", "url"}); + dynamic_summary_2 summary("qps2", "", {0.5, 0.9, 0.95, 0.005}, + std::array{"method", "url"}); std::string val(36, ' '); for (size_t i = 0; i < COUNT; i++) { strcpy(val.data(), std::to_string(i).data()); @@ -218,10 +194,10 @@ void bench_many_labels_serialize_summary(size_t COUNT, bool to_json = false) { std::string str; auto start = std::chrono::system_clock::now(); if (to_json) { - async_simple::coro::syncAwait(summary.serialize_to_json_async(str)); + summary.serialize_to_json(str); } else { - async_simple::coro::syncAwait(summary.serialize_async(str)); + summary.serialize(str); } auto end = std::chrono::system_clock::now(); diff --git a/src/metric/tests/test_metric.cpp b/src/metric/tests/test_metric.cpp index ac6b012b5..6b81c7a12 100644 --- a/src/metric/tests/test_metric.cpp +++ b/src/metric/tests/test_metric.cpp @@ -1,3 +1,4 @@ +#include #define DOCTEST_CONFIG_IMPLEMENT #include #include @@ -81,19 +82,16 @@ TEST_CASE("serialize zero") { std::map customMap = {}; auto summary = std::make_shared( - "test", "help", - summary_t::Quantiles{ - {0.5, 0.05}, {0.9, 0.01}, {0.95, 0.005}, {0.99, 0.001}}, - customMap); - async_simple::coro::syncAwait(summary->serialize_async(str)); + "test", "help", std::vector{0.5, 0.9, 0.95, 0.99}, customMap); + summary->serialize(str); CHECK(str.empty()); - async_simple::coro::syncAwait(summary->serialize_to_json_async(str)); + summary->serialize_to_json(str); CHECK(str.empty()); summary->observe(0); - async_simple::coro::syncAwait(summary->serialize_async(str)); + summary->serialize(str); CHECK(!str.empty()); str.clear(); - async_simple::coro::syncAwait(summary->serialize_to_json_async(str)); + summary->serialize_to_json(str); CHECK(!str.empty()); str.clear(); } @@ -455,10 +453,7 @@ TEST_CASE("test no lable") { { std::map customMap = {}; auto summary = std::make_shared( - "test", "help", - summary_t::Quantiles{ - {0.5, 0.05}, {0.9, 0.01}, {0.95, 0.005}, {0.99, 0.001}}, - customMap); + "test", "help", std::vector{0.5, 0.9, 0.95, 0.99}, customMap); summary->observe(100); } auto g_counter = g_pair.second; @@ -686,22 +681,50 @@ TEST_CASE("test histogram") { } TEST_CASE("test summary") { - summary_t summary{"test_summary", - "summary help", - {{0.5, 0.05}, {0.9, 0.01}, {0.95, 0.005}, {0.99, 0.001}}}; + summary_t summary{"test_summary", "summary help", {0.5, 0.9, 0.95, 0.99}}; std::random_device rd; std::mt19937 gen(rd()); std::uniform_int_distribution<> distr(1, 100); for (int i = 0; i < 50; i++) { summary.observe(distr(gen)); } + std::string str; + summary.serialize(str); + std::cout << str; + double sum; + uint64_t cnt; + summary.get_rates(sum, cnt); + CHECK(cnt == 50); + CHECK(sum > 0); + CHECK(str.find("test_summary") != std::string::npos); + CHECK(str.find("test_summary_count") != std::string::npos); + CHECK(str.find("test_summary_sum") != std::string::npos); + CHECK(str.find("test_summary{quantile=\"") != std::string::npos); - std::this_thread::sleep_for(std::chrono::milliseconds(100)); +#ifdef CINATRA_ENABLE_METRIC_JSON + std::string str_json; + summary.serialize_to_json(str_json); + std::cout << str_json << "\n"; + CHECK(str_json.find("\"0.9\":") != std::string::npos); +#endif +} + +TEST_CASE("test summary with INF") { + summary_t summary{"test_summary", "summary help", {0.5, 0.9, 0.95, 0.99}}; + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution<> distr(1, 100); + for (int i = 0; i < 50; i++) { + summary.observe(INFINITY); + } std::string str; - async_simple::coro::syncAwait(summary.serialize_async(str)); + summary.serialize(str); std::cout << str; - CHECK(async_simple::coro::syncAwait(summary.get_count()) == 50); - CHECK(async_simple::coro::syncAwait(summary.get_sum()) > 0); + double sum; + uint64_t cnt; + summary.get_rates(sum, cnt); + CHECK(cnt == 50); + CHECK(sum < 1e99); CHECK(str.find("test_summary") != std::string::npos); CHECK(str.find("test_summary_count") != std::string::npos); CHECK(str.find("test_summary_sum") != std::string::npos); @@ -709,7 +732,104 @@ TEST_CASE("test summary") { #ifdef CINATRA_ENABLE_METRIC_JSON std::string str_json; - async_simple::coro::syncAwait(summary.serialize_to_json_async(str_json)); + summary.serialize_to_json(str_json); + std::cout << str_json << "\n"; + CHECK(str_json.find("\"0.9\":") != std::string::npos); +#endif +} + +TEST_CASE("test summary with NAN") { + summary_t summary{"test_summary", "summary help", {0.5, 0.9, 0.95, 0.99}}; + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution<> distr(1, 100); + for (int i = 0; i < 50; i++) { + summary.observe(NAN); + } + std::string str; + summary.serialize(str); + std::cout << str; + double sum; + uint64_t cnt; + summary.get_rates(sum, cnt); + CHECK(cnt == 50); + CHECK(sum < 1e99); + CHECK(str.find("test_summary") != std::string::npos); + CHECK(str.find("test_summary_count") != std::string::npos); + CHECK(str.find("test_summary_sum") != std::string::npos); + CHECK(str.find("test_summary{quantile=\"") != std::string::npos); + +#ifdef CINATRA_ENABLE_METRIC_JSON + std::string str_json; + summary.serialize_to_json(str_json); + std::cout << str_json << "\n"; + CHECK(str_json.find("\"0.9\":") != std::string::npos); +#endif +} + +TEST_CASE("test summary with illegal quantities") { + summary_t summary{ + "test_summary", "summary help", {-1, 0.9, 0.5, 0.9, 0, 0.95, 1.1}}; + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution<> distr(1, 100); + for (int i = 0; i < 100; i++) { + summary.observe(i); + } + std::string str; + summary.serialize(str); + std::cout << str; + double sum; + uint64_t cnt; + auto result = summary.get_rates(sum, cnt); + CHECK(cnt == 100); + CHECK(sum > 0); + CHECK(str.find("test_summary") != std::string::npos); + CHECK(str.find("test_summary_count") != std::string::npos); + CHECK(str.find("test_summary_sum") != std::string::npos); + CHECK(str.find("test_summary{quantile=\"") != std::string::npos); + CHECK(result[0] < 0); + CHECK(result[1] < 0); + CHECK(result[result.size() - 1] > result[result.size() - 2]); + +#ifdef CINATRA_ENABLE_METRIC_JSON + std::string str_json; + summary.serialize_to_json(str_json); + std::cout << str_json << "\n"; + CHECK(str_json.find("\"0.9\":") != std::string::npos); +#endif +} + +TEST_CASE("test summary with many quantities") { + std::vector q; + for (int i = 0; i <= 1000; ++i) { + q.push_back(1.0 * i / 1000); + } + summary_t summary{"test_summary", "summary help", q}; + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution<> distr(1, 100); + for (int i = 0; i < 50; i++) { + summary.observe(i); + } + std::string str; + summary.serialize(str); + std::cout << str; + double sum; + uint64_t cnt; + auto result = summary.get_rates(sum, cnt); + CHECK(cnt == 50); + CHECK(sum > 0); + CHECK(str.find("test_summary") != std::string::npos); + CHECK(str.find("test_summary_count") != std::string::npos); + CHECK(str.find("test_summary_sum") != std::string::npos); + CHECK(str.find("test_summary{quantile=\"") != std::string::npos); + CHECK(result[1] == result[2]); + CHECK(result[result.size() - 2] == result[result.size() - 3]); + +#ifdef CINATRA_ENABLE_METRIC_JSON + std::string str_json; + summary.serialize_to_json(str_json); std::cout << str_json << "\n"; CHECK(str_json.find("\"0.9\":") != std::string::npos); #endif @@ -991,9 +1111,7 @@ TEST_CASE("test get metric by static labels and label") { auto map = std::map{{"method", "GET"}, {"url", "/"}}; auto [ec1, s1] = metric_mgr::instance().create_metric_static( - "http_req_static_summary", "help", - summary_t::Quantiles{ - {0.5, 0.05}, {0.9, 0.01}, {0.95, 0.005}, {0.99, 0.001}}, + "http_req_static_summary", "help", std::vector{0.5, 0.9, 0.95, 0.99}, std::map{{"method", "GET"}, {"url", "/"}}); s1->observe(23); @@ -1003,9 +1121,7 @@ TEST_CASE("test get metric by static labels and label") { { using metric_mgr2 = static_metric_manager>; auto [ec, s2] = metric_mgr2::instance().create_metric_static( - "http_req_static_summary2", "help", - summary_t::Quantiles{ - {0.5, 0.05}, {0.9, 0.01}, {0.95, 0.005}, {0.99, 0.001}}, + "http_req_static_summary2", "help", std::vector{0.5, 0.9, 0.95, 0.99}, map); s2->observe(23); @@ -1084,9 +1200,7 @@ TEST_CASE("test get metric by dynamic labels") { auto [ec7, s1] = metric_mgr::instance().create_metric_dynamic( - "http_req_static_summary", "help", - summary_t::Quantiles{ - {0.5, 0.05}, {0.9, 0.01}, {0.95, 0.005}, {0.99, 0.001}}, + "http_req_static_summary", "help", std::vector{0.5, 0.9, 0.95, 0.99}, std::array{"method", "url"}); s1->observe({"GET", "/"}, 23); @@ -1184,11 +1298,10 @@ TEST_CASE("test histogram serialize with static labels") { } TEST_CASE("test summary with dynamic labels") { - basic_dynamic_summary<2> summary{ - "test_summary", - "summary help", - {{0.5, 0.05}, {0.9, 0.01}, {0.95, 0.005}, {0.99, 0.001}}, - {"method", "url"}}; + basic_dynamic_summary<2> summary{"test_summary", + "summary help", + {0.5, 0.9, 0.95, 0.99}, + {"method", "url"}}; std::random_device rd; std::mt19937 gen(rd()); std::uniform_int_distribution<> distr(1, 100); @@ -1201,17 +1314,17 @@ TEST_CASE("test summary with dynamic labels") { double sum; uint64_t count; - auto rates = async_simple::coro::syncAwait( - summary.get_rates({"GET", "/"}, sum, count)); + + auto rates = summary.get_rates({"GET", "/"}, sum, count); std::cout << rates.size() << "\n"; std::string str; - async_simple::coro::syncAwait(summary.serialize_async(str)); + summary.serialize(str); std::cout << str; #ifdef CINATRA_ENABLE_METRIC_JSON std::string json_str; - async_simple::coro::syncAwait(summary.serialize_to_json_async(json_str)); + summary.serialize_to_json(json_str); std::cout << json_str << "\n"; #endif } @@ -1220,7 +1333,7 @@ TEST_CASE("test summary with static labels") { summary_t summary{ "test_summary", "summary help", - {{0.5, 0.05}, {0.9, 0.01}, {0.95, 0.005}, {0.99, 0.001}}, + {0.5, 0.9, 0.95, 0.99}, std::map{{"method", "GET"}, {"url", "/"}}}; std::random_device rd; std::mt19937 gen(rd()); @@ -1233,19 +1346,19 @@ TEST_CASE("test summary with static labels") { double sum; uint64_t count; - auto rates = async_simple::coro::syncAwait(summary.get_rates(sum, count)); + auto rates = summary.get_rates(sum, count); std::cout << rates.size() << "\n"; - auto rates1 = async_simple::coro::syncAwait(summary.get_rates(sum, count)); + auto rates1 = summary.get_rates(sum, count); CHECK(rates == rates1); std::string str; - async_simple::coro::syncAwait(summary.serialize_async(str)); + summary.serialize(str); std::cout << str; #ifdef CINATRA_ENABLE_METRIC_JSON std::string json_str; - async_simple::coro::syncAwait(summary.serialize_to_json_async(json_str)); + summary.serialize_to_json(json_str); std::cout << json_str << "\n"; #endif }