Skip to content

Commit

Permalink
Merge #724
Browse files Browse the repository at this point in the history
724: Stabilizing integral and interpolated integral r=thatzopoulos a=thatzopoulos

Fixes #719 

Co-authored-by: Thomas Hatzopoulos <thomas@timescale.com>
  • Loading branch information
bors[bot] and thatzopoulos authored Mar 6, 2023
2 parents 27d77f6 + 79ccfbb commit f518404
Show file tree
Hide file tree
Showing 7 changed files with 219 additions and 49 deletions.
1 change: 1 addition & 0 deletions Changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ This changelog should be updated as part of a PR if the work is worth noting (mo

#### Stabilized features
- [#722](https://github.com/timescale/timescaledb-toolkit/pull/722): Stabilize heartbeat aggregate.
- [#724](https://github.com/timescale/timescaledb-toolkit/pull/724): Stabilize integral and interpolated_integral for time-weighted-average.

#### Other notable changes
- [#716](https://github.com/timescale/timescaledb-toolkit/issues/716): Add arrow operator support for counter aggregate and time-weighted aggregate interpolated accessors.
Expand Down
39 changes: 19 additions & 20 deletions extension/src/accessors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -551,32 +551,31 @@ pub fn accessor_unnest() -> AccessorUnnest<'static> {
}
}

#[pg_schema]
pub mod toolkit_experimental {
use super::*;

pg_type! {
#[derive(Debug)]
struct AccessorIntegral<'input> {
len: u32,
bytes: [u8; self.len],
}
pg_type! {
#[derive(Debug)]
struct AccessorIntegral<'input> {
len: u32,
bytes: [u8; self.len],
}
}

// FIXME string IO
ron_inout_funcs!(AccessorIntegral);
// FIXME string IO
ron_inout_funcs!(AccessorIntegral);

#[pg_extern(immutable, parallel_safe, name = "integral")]
pub fn accessor_integral(unit: default!(&str, "'second'")) -> AccessorIntegral<'static> {
unsafe {
flatten! {
AccessorIntegral {
len: unit.len().try_into().unwrap(),
bytes: unit.as_bytes().into(),
}
#[pg_extern(immutable, parallel_safe, name = "integral")]
pub fn accessor_integral(unit: default!(&str, "'second'")) -> AccessorIntegral<'static> {
unsafe {
flatten! {
AccessorIntegral {
len: unit.len().try_into().unwrap(),
bytes: unit.as_bytes().into(),
}
}
}
}
#[pg_schema]
pub mod toolkit_experimental {
use super::*;

pg_type! {
#[derive(Debug)]
Expand Down
16 changes: 15 additions & 1 deletion extension/src/duration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
//! PostgreSQL parses duration units. Currently units longer than an hour are unsupported since
//! the length of days varies when in a timezone with daylight savings time.

use core::fmt::{self, Formatter};

// Canonical PostgreSQL units: https://github.com/postgres/postgres/blob/b76fb6c2a99eb7d49f96e56599fef1ffc1c134c9/src/include/utils/datetime.h#L48-L60
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum DurationUnit {
Expand All @@ -14,7 +16,7 @@ pub enum DurationUnit {
}

impl DurationUnit {
fn microseconds(self) -> u32 {
pub fn microseconds(self) -> u32 {
match self {
Self::Microsec => 1,
Self::Millisec => 1000,
Expand Down Expand Up @@ -46,6 +48,18 @@ impl DurationUnit {
}
}

impl fmt::Display for DurationUnit {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
match self {
DurationUnit::Microsec => write!(f, "microsecond"),
DurationUnit::Millisec => write!(f, "millisecond"),
DurationUnit::Second => write!(f, "second"),
DurationUnit::Minute => write!(f, "minute"),
DurationUnit::Hour => write!(f, "hour"),
}
}
}

#[cfg(test)]
mod test {
use super::*;
Expand Down
14 changes: 14 additions & 0 deletions extension/src/stabilization_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,16 @@ crate::functions_stabilized_at! {
interpolated_rate(timestamp with time zone,interval,countersummary,countersummary),
timeweightinterpolatedaverageaccessor_in(cstring),
timeweightinterpolatedaverageaccessor_out(timeweightinterpolatedaverageaccessor),
accessorintegral_in(cstring),
accessorintegral_out(accessorintegral),
arrow_time_weighted_average_integral(timeweightsummary,accessorintegral),
arrow_time_weighted_average_interpolated_integral(timeweightsummary,timeweightinterpolatedintegralaccessor),
integral(text),
integral(timeweightsummary,text),
interpolated_integral(timestamp with time zone,interval,timeweightsummary,timeweightsummary,text),
interpolated_integral(timeweightsummary,timestamp with time zone, interval,timeweightsummary,timeweightsummary,text),
timeweightinterpolatedintegralaccessor_in(cstring),
timeweightinterpolatedintegralaccessor_out(timeweightinterpolatedintegralaccessor),
dead_ranges(heartbeatagg),
downtime(heartbeatagg),
heartbeat_agg(timestamp with time zone,timestamp with time zone,interval,interval),
Expand Down Expand Up @@ -557,6 +567,8 @@ crate::types_stabilized_at! {
counterinterpolateddeltaaccessor,
counterinterpolatedrateaccessor,
timeweightinterpolatedaverageaccessor,
timeweightinterpolatedintegralaccessor,
accessorintegral,
heartbeatagg,
accessordeadranges,
accessordowntime,
Expand Down Expand Up @@ -660,6 +672,8 @@ crate::operators_stabilized_at! {
"->"(countersummary,counterinterpolateddeltaaccessor),
"->"(countersummary,counterinterpolatedrateaccessor),
"->"(timeweightsummary,timeweightinterpolatedaverageaccessor),
"->"(timeweightsummary,timeweightinterpolatedintegralaccessor),
"->"(timeweightsummary,accessorintegral),
"->"(heartbeatagg,accessordeadranges),
"->"(heartbeatagg,accessordowntime),
"->"(heartbeatagg,accessorliveat),
Expand Down
84 changes: 56 additions & 28 deletions extension/src/time_weighted_average.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use serde::{Deserialize, Serialize};

use crate::{
accessors::{
toolkit_experimental, AccessorAverage, AccessorFirstTime, AccessorFirstVal,
AccessorLastTime, AccessorLastVal,
AccessorAverage, AccessorFirstTime, AccessorFirstVal, AccessorIntegral, AccessorLastTime,
AccessorLastVal,
},
aggregate_utils::in_aggregate_context,
duration::DurationUnit,
Expand All @@ -25,7 +25,7 @@ use crate::raw::bytea;

mod accessors;

use accessors::TimeWeightInterpolatedAverageAccessor;
use accessors::{TimeWeightInterpolatedAverageAccessor, TimeWeightInterpolatedIntegralAccessor};

pg_type! {
#[derive(Debug)]
Expand Down Expand Up @@ -429,7 +429,7 @@ pub fn arrow_time_weighted_average_average<'a>(
#[opname(->)]
pub fn arrow_time_weighted_average_integral<'a>(
tws: Option<TimeWeightSummary<'a>>,
accessor: toolkit_experimental::AccessorIntegral<'a>,
accessor: AccessorIntegral<'a>,
) -> Option<f64> {
time_weighted_average_integral(
tws,
Expand All @@ -455,12 +455,7 @@ pub fn time_weighted_average_average<'a>(tws: Option<TimeWeightSummary<'a>>) ->
}
}

#[pg_extern(
immutable,
parallel_safe,
name = "integral",
schema = "toolkit_experimental"
)]
#[pg_extern(immutable, parallel_safe, name = "integral")]
pub fn time_weighted_average_integral<'a>(
tws: Option<TimeWeightSummary<'a>>,
unit: default!(String, "'second'"),
Expand Down Expand Up @@ -530,12 +525,7 @@ pub fn arrow_time_weighted_average_interpolated_average<'a>(
)
}

#[pg_extern(
immutable,
parallel_safe,
name = "interpolated_integral",
schema = "toolkit_experimental"
)]
#[pg_extern(immutable, parallel_safe, name = "interpolated_integral")]
pub fn time_weighted_average_interpolated_integral<'a>(
tws: Option<TimeWeightSummary<'a>>,
start: crate::raw::TimestampTz,
Expand All @@ -548,6 +538,44 @@ pub fn time_weighted_average_interpolated_integral<'a>(
time_weighted_average_integral(target, unit)
}

#[pg_operator(immutable, parallel_safe)]
#[opname(->)]
pub fn arrow_time_weighted_average_interpolated_integral<'a>(
tws: Option<TimeWeightSummary<'a>>,
accessor: TimeWeightInterpolatedIntegralAccessor<'a>,
) -> Option<f64> {
let prev = if accessor.flags & 1 == 1 {
Some(accessor.prev.clone().into())
} else {
None
};
let next = if accessor.flags & 2 == 2 {
Some(accessor.next.clone().into())
} else {
None
};

// Convert from num of milliseconds to DurationUnit and then to string
let unit = match accessor.unit {
1 => DurationUnit::Microsec,
1000 => DurationUnit::Millisec,
1_000_000 => DurationUnit::Second,
60_000_000 => DurationUnit::Minute,
3_600_000_000 => DurationUnit::Hour,
_ => todo!(), // This should never be reached, the accessor gets these numbers from microseconds() in duration.rs, which only matches on valid enum values
}
.to_string();

time_weighted_average_interpolated_integral(
tws,
accessor.start.into(),
accessor.interval.into(),
prev,
next,
unit,
)
}

#[cfg(any(test, feature = "pg_test"))]
#[pg_schema]
mod tests {
Expand Down Expand Up @@ -576,9 +604,9 @@ mod tests {
let stmt = "INSERT INTO test VALUES('2020-01-01 00:00:00+00', 10.0)";
client.update(stmt, None, None).unwrap();

let stmt = "SELECT toolkit_experimental.integral(time_weight('Trapezoidal', ts, val), 'hrs') FROM test";
let stmt = "SELECT integral(time_weight('Trapezoidal', ts, val), 'hrs') FROM test";
assert_eq!(select_one!(client, stmt, f64), 0.0);
let stmt = "SELECT toolkit_experimental.integral(time_weight('LOCF', ts, val), 'msecond') FROM test";
let stmt = "SELECT integral(time_weight('LOCF', ts, val), 'msecond') FROM test";
assert_eq!(select_one!(client, stmt, f64), 0.0);

// add another point
Expand Down Expand Up @@ -622,9 +650,9 @@ mod tests {
let stmt = "SELECT average(time_weight('LOCF', ts, val)) FROM test";
assert!((select_one!(client, stmt, f64) - 15.0).abs() < f64::EPSILON);

let stmt = "SELECT toolkit_experimental.integral(time_weight('Linear', ts, val), 'mins') FROM test";
let stmt = "SELECT integral(time_weight('Linear', ts, val), 'mins') FROM test";
assert!((select_one!(client, stmt, f64) - 60.0).abs() < f64::EPSILON);
let stmt = "SELECT toolkit_experimental.integral(time_weight('LOCF', ts, val), 'hour') FROM test";
let stmt = "SELECT integral(time_weight('LOCF', ts, val), 'hour') FROM test";
assert!((select_one!(client, stmt, f64) - 1.0).abs() < f64::EPSILON);

//non-evenly spaced values
Expand All @@ -640,23 +668,23 @@ mod tests {
// arrow syntax should be the same
assert!((select_one!(client, stmt, f64) - 21.25).abs() < f64::EPSILON);

let stmt = "SELECT toolkit_experimental.integral(time_weight('Linear', ts, val), 'microseconds') FROM test";
let stmt = "SELECT integral(time_weight('Linear', ts, val), 'microseconds') FROM test";
assert!((select_one!(client, stmt, f64) - 25500000000.00).abs() < f64::EPSILON);
let stmt = "SELECT time_weight('Linear', ts, val) \
->toolkit_experimental.integral('microseconds') \
->integral('microseconds') \
FROM test";
// arrow syntax should be the same
assert!((select_one!(client, stmt, f64) - 25500000000.00).abs() < f64::EPSILON);
let stmt = "SELECT time_weight('Linear', ts, val) \
->toolkit_experimental.integral() \
->integral() \
FROM test";
assert!((select_one!(client, stmt, f64) - 25500.00).abs() < f64::EPSILON);

let stmt = "SELECT average(time_weight('LOCF', ts, val)) FROM test";
// expected = (10 + 20 + 10 + 20 + 10*4 + 30*2 +10*.5 + 20*9.5) / 20 = 17.75 using last value and carrying for each point
assert!((select_one!(client, stmt, f64) - 17.75).abs() < f64::EPSILON);

let stmt = "SELECT toolkit_experimental.integral(time_weight('LOCF', ts, val), 'milliseconds') FROM test";
let stmt = "SELECT integral(time_weight('LOCF', ts, val), 'milliseconds') FROM test";
assert!((select_one!(client, stmt, f64) - 21300000.0).abs() < f64::EPSILON);

//make sure this works with whatever ordering we throw at it
Expand All @@ -665,9 +693,9 @@ mod tests {
let stmt = "SELECT average(time_weight('LOCF', ts, val ORDER BY random())) FROM test";
assert!((select_one!(client, stmt, f64) - 17.75).abs() < f64::EPSILON);

let stmt = "SELECT toolkit_experimental.integral(time_weight('Linear', ts, val ORDER BY random()), 'seconds') FROM test";
let stmt = "SELECT integral(time_weight('Linear', ts, val ORDER BY random()), 'seconds') FROM test";
assert!((select_one!(client, stmt, f64) - 25500.0).abs() < f64::EPSILON);
let stmt = "SELECT toolkit_experimental.integral(time_weight('LOCF', ts, val ORDER BY random())) FROM test";
let stmt = "SELECT integral(time_weight('LOCF', ts, val ORDER BY random())) FROM test";
assert!((select_one!(client, stmt, f64) - 21300.0).abs() < f64::EPSILON);

// make sure we get the same result if we do multi-level aggregation
Expand Down Expand Up @@ -898,7 +926,7 @@ mod tests {
let mut integrals = client
.update(
r#"SELECT
toolkit_experimental.interpolated_integral(
interpolated_integral(
agg,
bucket,
'1 day'::interval,
Expand All @@ -919,7 +947,7 @@ mod tests {
client
.update(
r#"SELECT
toolkit_experimental.interpolated_integral(
interpolated_integral(
agg,
bucket,
'1 day'::interval,
Expand Down
59 changes: 59 additions & 0 deletions extension/src/time_weighted_average/accessors.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use pgx::*;

use crate::time_weighted_average::DurationUnit;
use crate::{
datum_utils::interval_to_ms,
flatten, pg_type, ron_inout_funcs,
Expand Down Expand Up @@ -54,3 +55,61 @@ fn time_weight_interpolated_average_accessor<'a>(
}
}
}

pg_type! {
#[derive(Debug)]
struct TimeWeightInterpolatedIntegralAccessor {
start : i64,
interval : i64,
prev : TimeWeightSummaryData,
pad : [u8;3],
unit : u32,
flags: u64,
next : TimeWeightSummaryData,
}
}

ron_inout_funcs!(TimeWeightInterpolatedIntegralAccessor);

#[pg_extern(immutable, parallel_safe, name = "interpolated_integral")]
fn time_weight_interpolated_integral_accessor<'a>(
start: crate::raw::TimestampTz,
interval: crate::raw::Interval,
prev: Option<TimeWeightSummary<'a>>,
next: Option<TimeWeightSummary<'a>>,
unit: default!(String, "'second'"),
) -> TimeWeightInterpolatedIntegralAccessor<'static> {
fn empty_summary<'b>() -> Option<TimeWeightSummary<'b>> {
Some(unsafe {
flatten!(TimeWeightSummary {
first: TSPoint { ts: 0, val: 0.0 },
last: TSPoint { ts: 0, val: 0.0 },
weighted_sum: 0.0,
method: TimeWeightMethod::LOCF,
})
})
}

let unit = match DurationUnit::from_str(&unit) {
Some(unit) => unit.microseconds(),
None => pgx::error!(
"Unrecognized duration unit: {}. Valid units are: usecond, msecond, second, minute, hour",
unit,
),
};
let flags = u64::from(prev.is_some()) + if next.is_some() { 2 } else { 0 };
let prev = prev.or_else(empty_summary).unwrap().0;
let next = next.or_else(empty_summary).unwrap().0;
let interval = interval_to_ms(&start, &interval);
crate::build! {
TimeWeightInterpolatedIntegralAccessor {
start: start.into(),
interval,
prev,
pad : [0,0,0],
unit,
flags,
next,
}
}
}
Loading

0 comments on commit f518404

Please sign in to comment.