Skip to content

Commit

Permalink
Fix observable Gauge metrics generation (open-telemetry#1651)
Browse files Browse the repository at this point in the history
  • Loading branch information
lalitb authored Oct 6, 2022
1 parent a64ac09 commit 79a9471
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 44 deletions.
10 changes: 6 additions & 4 deletions sdk/src/metrics/aggregation/lastvalue_aggregation.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ void LongLastValueAggregation::Aggregate(long value,
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
point_data_.is_lastvalue_valid_ = true;
point_data_.value_ = value;
point_data_.sample_ts_ = std::chrono::system_clock::now();
}

std::unique_ptr<Aggregation> LongLastValueAggregation::Merge(
Expand Down Expand Up @@ -93,6 +94,7 @@ void DoubleLastValueAggregation::Aggregate(double value,
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
point_data_.is_lastvalue_valid_ = true;
point_data_.value_ = value;
point_data_.sample_ts_ = std::chrono::system_clock::now();
}

std::unique_ptr<Aggregation> DoubleLastValueAggregation::Merge(
Expand All @@ -102,12 +104,12 @@ std::unique_ptr<Aggregation> DoubleLastValueAggregation::Merge(
nostd::get<LastValuePointData>(delta.ToPoint()).sample_ts_.time_since_epoch())
{
LastValuePointData merge_data = std::move(nostd::get<LastValuePointData>(ToPoint()));
return std::unique_ptr<Aggregation>(new LongLastValueAggregation(std::move(merge_data)));
return std::unique_ptr<Aggregation>(new DoubleLastValueAggregation(std::move(merge_data)));
}
else
{
LastValuePointData merge_data = std::move(nostd::get<LastValuePointData>(delta.ToPoint()));
return std::unique_ptr<Aggregation>(new LongLastValueAggregation(std::move(merge_data)));
return std::unique_ptr<Aggregation>(new DoubleLastValueAggregation(std::move(merge_data)));
}
}

Expand All @@ -118,12 +120,12 @@ std::unique_ptr<Aggregation> DoubleLastValueAggregation::Diff(
nostd::get<LastValuePointData>(next.ToPoint()).sample_ts_.time_since_epoch())
{
LastValuePointData diff_data = std::move(nostd::get<LastValuePointData>(ToPoint()));
return std::unique_ptr<Aggregation>(new LongLastValueAggregation(std::move(diff_data)));
return std::unique_ptr<Aggregation>(new DoubleLastValueAggregation(std::move(diff_data)));
}
else
{
LastValuePointData diff_data = std::move(nostd::get<LastValuePointData>(next.ToPoint()));
return std::unique_ptr<Aggregation>(new LongLastValueAggregation(std::move(diff_data)));
return std::unique_ptr<Aggregation>(new DoubleLastValueAggregation(std::move(diff_data)));
}
}

Expand Down
122 changes: 82 additions & 40 deletions sdk/test/metrics/async_metric_storage_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,49 +46,12 @@ class MockCollectorHandle : public CollectorHandle
class WritableMetricStorageTestFixture : public ::testing::TestWithParam<AggregationTemporality>
{};

class MeasurementFetcher
{
public:
static void Fetcher(opentelemetry::metrics::ObserverResult observer_result, void * /*state*/)
{
fetch_count++;
if (fetch_count == 1)
{
opentelemetry::nostd::get<0>(observer_result)->Observe(20l, {{"RequestType", "GET"}});
opentelemetry::nostd::get<0>(observer_result)->Observe(10l, {{"RequestType", "PUT"}});
number_of_get += 20l;
number_of_put += 10l;
}
else if (fetch_count == 2)
{
opentelemetry::nostd::get<0>(observer_result)->Observe(40l, {{"RequestType", "GET"}});
opentelemetry::nostd::get<0>(observer_result)->Observe(20l, {{"RequestType", "PUT"}});
number_of_get += 40l;
number_of_put += 20l;
}
}

static void init_values()
{
fetch_count = 0;
number_of_get = 0;
number_of_put = 0;
}

static size_t fetch_count;
static long number_of_get;
static long number_of_put;
static const size_t number_of_attributes = 2;
};

size_t MeasurementFetcher::fetch_count;
long MeasurementFetcher::number_of_get;
long MeasurementFetcher::number_of_put;
const size_t MeasurementFetcher::number_of_attributes;
class WritableMetricStorageTestObservableGaugeFixture
: public ::testing::TestWithParam<AggregationTemporality>
{};

TEST_P(WritableMetricStorageTestFixture, TestAggregation)
{
MeasurementFetcher::init_values();
AggregationTemporality temporality = GetParam();

InstrumentDescriptor instr_desc = {"name", "desc", "1unit", InstrumentType::kObservableCounter,
Expand Down Expand Up @@ -180,4 +143,83 @@ INSTANTIATE_TEST_SUITE_P(WritableMetricStorageTestLong,
::testing::Values(AggregationTemporality::kCumulative,
AggregationTemporality::kDelta));

TEST_P(WritableMetricStorageTestObservableGaugeFixture, TestAggregation)
{
AggregationTemporality temporality = GetParam();

InstrumentDescriptor instr_desc = {"name", "desc", "1unit", InstrumentType::kObservableGauge,
InstrumentValueType::kLong};

auto sdk_start_ts = std::chrono::system_clock::now();
// Some computation here
auto collection_ts = std::chrono::system_clock::now() + std::chrono::seconds(5);

std::shared_ptr<CollectorHandle> collector(new MockCollectorHandle(temporality));
std::vector<std::shared_ptr<CollectorHandle>> collectors;
collectors.push_back(collector);

std::unique_ptr<AttributesProcessor> default_attributes_processor{
new DefaultAttributesProcessor{}};
opentelemetry::sdk::metrics::AsyncMetricStorage storage(
instr_desc, AggregationType::kLastValue, default_attributes_processor.get(),
std::shared_ptr<opentelemetry::sdk::metrics::AggregationConfig>{});
long freq_cpu0 = 3l;
long freq_cpu1 = 5l;
size_t attribute_count = 2;
std::unordered_map<MetricAttributes, long, AttributeHashGenerator> measurements1 = {
{{{"CPU", "0"}}, freq_cpu0}, {{{"CPU", "1"}}, freq_cpu1}};
storage.RecordLong(measurements1,
opentelemetry::common::SystemTimestamp(std::chrono::system_clock::now()));

storage.Collect(
collector.get(), collectors, sdk_start_ts, collection_ts, [&](const MetricData data) {
for (auto data_attr : data.point_data_attr_)
{
auto data = opentelemetry::nostd::get<LastValuePointData>(data_attr.point_data);
if (opentelemetry::nostd::get<std::string>(data_attr.attributes.find("CPU")->second) ==
"0")
{
EXPECT_EQ(opentelemetry::nostd::get<long>(data.value_), freq_cpu0);
}
else if (opentelemetry::nostd::get<std::string>(
data_attr.attributes.find("CPU")->second) == "1")
{
EXPECT_EQ(opentelemetry::nostd::get<long>(data.value_), freq_cpu1);
}
}
return true;
});

freq_cpu0 = 6l;
freq_cpu1 = 8l;

std::unordered_map<MetricAttributes, long, AttributeHashGenerator> measurements2 = {
{{{"CPU", "0"}}, freq_cpu0}, {{{"CPU", "1"}}, freq_cpu1}};
storage.RecordLong(measurements2,
opentelemetry::common::SystemTimestamp(std::chrono::system_clock::now()));
storage.Collect(
collector.get(), collectors, sdk_start_ts, collection_ts, [&](const MetricData data) {
for (auto data_attr : data.point_data_attr_)
{
auto data = opentelemetry::nostd::get<LastValuePointData>(data_attr.point_data);
if (opentelemetry::nostd::get<std::string>(data_attr.attributes.find("CPU")->second) ==
"0")
{
EXPECT_EQ(opentelemetry::nostd::get<long>(data.value_), freq_cpu0);
}
else if (opentelemetry::nostd::get<std::string>(
data_attr.attributes.find("CPU")->second) == "1")
{
EXPECT_EQ(opentelemetry::nostd::get<long>(data.value_), freq_cpu1);
}
}
return true;
});
}

INSTANTIATE_TEST_SUITE_P(WritableMetricStorageTestObservableGaugeFixtureLong,
WritableMetricStorageTestObservableGaugeFixture,
::testing::Values(AggregationTemporality::kCumulative,
AggregationTemporality::kDelta));

#endif

0 comments on commit 79a9471

Please sign in to comment.