Add integral function for time_weight #526

Merged 3 commits on Sep 20, 2022
10 changes: 10 additions & 0 deletions crates/time-weighted-average/src/lib.rs
@@ -189,6 +189,16 @@ impl TimeWeightSummary {
let duration = (self.last.ts - self.first.ts) as f64;
Ok(self.w_sum / duration)
}

/// Evaluate the integral in microseconds.
pub fn time_weighted_integral(&self) -> f64 {
if self.last.ts == self.first.ts {
// the integral of a duration of zero width is zero
0.0
} else {
self.w_sum
}
}
}

impl TimeWeightMethod {
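Aside, not part of the diff: `time_weighted_integral` simply returns the weighted sum that the aggregate already accumulates, so for a non-degenerate summary it equals the time-weighted average multiplied by the covered duration in microseconds. The standalone sketch below illustrates the same idea for the LOCF method, using plain hypothetical tuples rather than the crate's types:

/// Standalone sketch: time-weighted (LOCF) integral over (timestamp, value) points,
/// with timestamps in microseconds; points are assumed sorted by timestamp.
fn locf_integral_microseconds(points: &[(i64, f64)]) -> f64 {
    points
        .windows(2)
        .map(|w| {
            let (t0, v0) = w[0];
            let (t1, _) = w[1];
            // last observation carried forward: v0 is held until the next point
            v0 * (t1 - t0) as f64
        })
        .sum()
}

fn main() {
    // 10.0 held for 60 seconds, then a final point that contributes no width
    let points = [(0_i64, 10.0), (60_000_000, 20.0)];
    let integral_us = locf_integral_microseconds(&points);
    assert_eq!(integral_us, 600_000_000.0); // 10 value-units over 60 s, in value·microseconds
}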
28 changes: 28 additions & 0 deletions extension/src/accessors.rs
@@ -518,3 +518,31 @@ pub fn accessor_unnest() -> AccessorUnnest<'static> {
}
}
}

#[pg_schema]
pub mod toolkit_experimental {
use super::*;

pg_type! {
#[derive(Debug)]
struct AccessorIntegral<'input> {
len: u32,
bytes: [u8; self.len],
}
}

// FIXME string IO
ron_inout_funcs!(AccessorIntegral);

#[pg_extern(immutable, parallel_safe, name = "integral")]
pub fn accessor_integral(unit: default!(&str, "'second'")) -> AccessorIntegral<'static> {
unsafe {
flatten! {
AccessorIntegral {
len: unit.len().try_into().unwrap(),
bytes: unit.as_bytes().into(),
}
}
}
}
}
75 changes: 75 additions & 0 deletions extension/src/duration.rs
@@ -0,0 +1,75 @@
//! Utilities for working with durations. Parsing of duration units is intended to match how
//! PostgreSQL parses duration units. Units longer than an hour are currently unsupported, since
//! the length of a day varies in time zones with daylight saving time.

// Canonical PostgreSQL units: https://github.com/postgres/postgres/blob/b76fb6c2a99eb7d49f96e56599fef1ffc1c134c9/src/include/utils/datetime.h#L48-L60
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum DurationUnit {
// units should be ordered smallest -> largest
Microsec,
Millisec,
Second,
Minute,
Hour,
}

impl DurationUnit {
fn microseconds(self) -> u32 {
match self {
Self::Microsec => 1,
Self::Millisec => 1000,
Self::Second => 1_000_000,
Self::Minute => 60_000_000,
Self::Hour => 3_600_000_000,
}
}

/// Convert `amount` of a unit to another unit.
pub fn convert_unit(self, amount: f64, to: Self) -> f64 {
let microseconds = amount * (self.microseconds() as f64);
microseconds / (to.microseconds() as f64)
}

/// Tries to get a duration unit from a string, returning `None` if no known unit matched.
pub fn from_str(s: &str) -> Option<Self> {
// Aliases for canonical units: https://github.com/postgres/postgres/blob/b76fb6c2a99eb7d49f96e56599fef1ffc1c134c9/src/backend/utils/adt/datetime.c#L187-L247
match s.to_lowercase().as_str() {
"usecond" | "microsecond" | "microseconds" | "microsecon" | "us" | "usec"
| "useconds" | "usecs" => Some(Self::Microsec),
"msecond" | "millisecond" | "milliseconds" | "millisecon" | "ms" | "msec"
| "mseconds" | "msecs" => Some(Self::Millisec),
"second" | "s" | "sec" | "seconds" | "secs" => Some(Self::Second),
"minute" | "m" | "min" | "mins" | "minutes" => Some(Self::Minute),
"hour" | "hours" | "h" | "hr" | "hrs" => Some(Self::Hour),
_ => None,
}
}
}

#[cfg(test)]
mod test {
use super::*;

#[test]
fn convert_unit() {
let load_time_secs = 75.0;
let load_time_mins =
DurationUnit::convert_unit(DurationUnit::Second, load_time_secs, DurationUnit::Minute);
assert_eq!(load_time_mins, 1.25);
}

#[test]
fn parse_unit() {
assert_eq!(
DurationUnit::from_str("usecs"),
Some(DurationUnit::Microsec)
);
assert_eq!(DurationUnit::from_str("MINUTE"), Some(DurationUnit::Minute));
assert_eq!(
DurationUnit::from_str("MiLlIsEcOn"),
Some(DurationUnit::Millisec)
);
assert_eq!(DurationUnit::from_str("pahar"), None);
assert_eq!(DurationUnit::from_str(""), None);
}
}
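Aside, not part of the diff: the SQL-level `integral(..., 'hour')` defined later in this PR stores its result as value·microseconds and then pushes it through `convert_unit`, so every supported unit is normalized via microseconds. A small usage sketch, assuming the `DurationUnit` defined above is in scope:

#[test]
fn microseconds_to_hours() {
    // One hour's worth of value·time, expressed in value·microseconds.
    let raw_microseconds = 3_600_000_000.0_f64;
    // integral(..., 'hour') effectively applies this conversion to the stored sum.
    let hours = DurationUnit::Microsec.convert_unit(raw_microseconds, DurationUnit::Hour);
    assert!((hours - 1.0).abs() < f64::EPSILON);
}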
1 change: 1 addition & 0 deletions extension/src/lib.rs
@@ -26,6 +26,7 @@ pub mod utilities;

mod aggregate_utils;
mod datum_utils;
mod duration;
mod palloc;
mod pg_any_element;
mod raw;
149 changes: 138 additions & 11 deletions extension/src/time_weighted_average.rs
@@ -5,9 +5,11 @@ use serde::{Deserialize, Serialize};

use crate::{
accessors::{
AccessorAverage, AccessorFirstTime, AccessorFirstVal, AccessorLastTime, AccessorLastVal,
toolkit_experimental, AccessorAverage, AccessorFirstTime, AccessorFirstVal,
AccessorLastTime, AccessorLastVal,
},
aggregate_utils::in_aggregate_context,
duration::DurationUnit,
flatten,
palloc::{Inner, Internal, InternalAsValue, ToInternal},
pg_type, ron_inout_funcs,
@@ -177,7 +179,7 @@ pub fn time_weight_trans_inner(
point_buffer: vec![],
// TODO technically not portable to ASCII-compatible charsets
method: match method.trim().to_lowercase().as_str() {
"linear" => TimeWeightMethod::Linear,
"linear" | "trapezoidal" => TimeWeightMethod::Linear,
"locf" => TimeWeightMethod::LOCF,
_ => panic!("unknown method"),
},
@@ -401,6 +403,18 @@ pub fn arrow_time_weighted_average_average(
time_weighted_average_average(sketch)
}

#[pg_operator(immutable, parallel_safe)]
#[opname(->)]
pub fn arrow_time_weighted_average_integral(
tws: Option<TimeWeightSummary>,
accessor: toolkit_experimental::AccessorIntegral,
) -> Option<f64> {
time_weighted_average_integral(
tws,
String::from_utf8_lossy(accessor.bytes.as_slice()).to_string(),
)
}

#[pg_extern(immutable, parallel_safe, name = "average")]
pub fn time_weighted_average_average(tws: Option<TimeWeightSummary>) -> Option<f64> {
match tws {
@@ -422,27 +436,75 @@ pub fn time_weighted_average_average(tws: Option<TimeWeightSummary>) -> Option<f64> {
#[pg_extern(
immutable,
parallel_safe,
name = "interpolated_average",
name = "integral",
schema = "toolkit_experimental"
)]
pub fn time_weighted_average_interpolated_average(
pub fn time_weighted_average_integral(
tws: Option<TimeWeightSummary>,
unit: default!(String, "'second'"),
) -> Option<f64> {
let unit = match DurationUnit::from_str(&unit) {
Some(unit) => unit,
None => pgx::error!(
"Unrecognized duration unit: {}. Valid units are: usecond, msecond, second, minute, hour",
unit,
),
};
let integral_microsecs = tws?.internal().time_weighted_integral();
Some(DurationUnit::Microsec.convert_unit(integral_microsecs, unit))
}

fn interpolate<'a>(
tws: Option<TimeWeightSummary>,
start: crate::raw::TimestampTz,
interval: crate::raw::Interval,
prev: Option<TimeWeightSummary>,
next: Option<TimeWeightSummary>,
) -> Option<f64> {
let target = match tws {
) -> Option<TimeWeightSummary<'a>> {
match tws {
None => None,
Some(tws) => {
let interval = crate::datum_utils::interval_to_ms(&start, &interval);
Some(tws.interpolate(start.into(), interval, prev, next))
}
};
}
}

#[pg_extern(
immutable,
parallel_safe,
name = "interpolated_average",
schema = "toolkit_experimental"
)]
pub fn time_weighted_average_interpolated_average(
tws: Option<TimeWeightSummary>,
start: crate::raw::TimestampTz,
interval: crate::raw::Interval,
prev: Option<TimeWeightSummary>,
next: Option<TimeWeightSummary>,
) -> Option<f64> {
let target = interpolate(tws, start, interval, prev, next);
time_weighted_average_average(target)
}

#[pg_extern(
immutable,
parallel_safe,
name = "interpolated_integral",
schema = "toolkit_experimental"
)]
pub fn time_weighted_average_interpolated_integral(
tws: Option<TimeWeightSummary>,
start: crate::raw::TimestampTz,
interval: crate::raw::Interval,
prev: Option<TimeWeightSummary>,
next: Option<TimeWeightSummary>,
unit: String,
) -> Option<f64> {
let target = interpolate(tws, start, interval, prev, next);
time_weighted_average_integral(target, unit)
}

#[cfg(any(test, feature = "pg_test"))]
#[pg_schema]
mod tests {
@@ -465,8 +527,17 @@ mod tests {
"CREATE TABLE test(ts timestamptz, val DOUBLE PRECISION); SET TIME ZONE 'UTC'";
client.select(stmt, None, None);

// add a couple points
let stmt = "INSERT INTO test VALUES('2020-01-01 00:00:00+00', 10.0), ('2020-01-01 00:01:00+00', 20.0)";
// add a point
let stmt = "INSERT INTO test VALUES('2020-01-01 00:00:00+00', 10.0)";
client.select(stmt, None, None);

let stmt = "SELECT toolkit_experimental.integral(time_weight('Trapezoidal', ts, val), 'hrs') FROM test";
assert_eq!(select_one!(client, stmt, f64), 0.0);
let stmt = "SELECT toolkit_experimental.integral(time_weight('LOCF', ts, val), 'msecond') FROM test";
assert_eq!(select_one!(client, stmt, f64), 0.0);

// add another point
let stmt = "INSERT INTO test VALUES('2020-01-01 00:01:00+00', 20.0)";
client.select(stmt, None, None);

// test basic with 2 points
@@ -506,6 +577,11 @@ mod tests {
let stmt = "SELECT average(time_weight('LOCF', ts, val)) FROM test";
assert!((select_one!(client, stmt, f64) - 15.0).abs() < f64::EPSILON);

let stmt = "SELECT toolkit_experimental.integral(time_weight('Linear', ts, val), 'mins') FROM test";
assert!((select_one!(client, stmt, f64) - 60.0).abs() < f64::EPSILON);
let stmt = "SELECT toolkit_experimental.integral(time_weight('LOCF', ts, val), 'hour') FROM test";
assert!((select_one!(client, stmt, f64) - 1.0).abs() < f64::EPSILON);

//non-evenly spaced values
let stmt = "INSERT INTO test VALUES('2020-01-01 00:08:00+00', 30.0), ('2020-01-01 00:10:00+00', 10.0), ('2020-01-01 00:10:30+00', 20.0), ('2020-01-01 00:20:00+00', 30.0)";
client.select(stmt, None, None);
@@ -519,16 +595,36 @@ mod tests {
// arrow syntax should be the same
assert!((select_one!(client, stmt, f64) - 21.25).abs() < f64::EPSILON);

let stmt = "SELECT toolkit_experimental.integral(time_weight('Linear', ts, val), 'microseconds') FROM test";
assert!((select_one!(client, stmt, f64) - 25500000000.00).abs() < f64::EPSILON);
let stmt = "SELECT time_weight('Linear', ts, val) \
->toolkit_experimental.integral('microseconds') \
FROM test";
// arrow syntax should be the same
assert!((select_one!(client, stmt, f64) - 25500000000.00).abs() < f64::EPSILON);
let stmt = "SELECT time_weight('Linear', ts, val) \
->toolkit_experimental.integral() \
FROM test";
assert!((select_one!(client, stmt, f64) - 25500.00).abs() < f64::EPSILON);

let stmt = "SELECT average(time_weight('LOCF', ts, val)) FROM test";
// expected = (10 + 20 + 10 + 20 + 10*4 + 30*2 +10*.5 + 20*9.5) / 20 = 17.75 using last value and carrying for each point
assert!((select_one!(client, stmt, f64) - 17.75).abs() < f64::EPSILON);

let stmt = "SELECT toolkit_experimental.integral(time_weight('LOCF', ts, val), 'milliseconds') FROM test";
assert!((select_one!(client, stmt, f64) - 21300000.0).abs() < f64::EPSILON);

//make sure this works with whatever ordering we throw at it
let stmt = "SELECT average(time_weight('Linear', ts, val ORDER BY random())) FROM test";
assert!((select_one!(client, stmt, f64) - 21.25).abs() < f64::EPSILON);
let stmt = "SELECT average(time_weight('LOCF', ts, val ORDER BY random())) FROM test";
assert!((select_one!(client, stmt, f64) - 17.75).abs() < f64::EPSILON);

let stmt = "SELECT toolkit_experimental.integral(time_weight('Linear', ts, val ORDER BY random()), 'seconds') FROM test";
assert!((select_one!(client, stmt, f64) - 25500.0).abs() < f64::EPSILON);
let stmt = "SELECT toolkit_experimental.integral(time_weight('LOCF', ts, val ORDER BY random())) FROM test";
assert!((select_one!(client, stmt, f64) - 21300.0).abs() < f64::EPSILON);

// make sure we get the same result if we do multi-level aggregation
let stmt = "WITH t AS (SELECT date_trunc('minute', ts), time_weight('Linear', ts, val) AS tws FROM test GROUP BY 1) SELECT average(rollup(tws)) FROM t";
assert!((select_one!(client, stmt, f64) - 21.25).abs() < f64::EPSILON);
@@ -720,8 +816,8 @@ mod tests {
toolkit_experimental.interpolated_average(
agg,
bucket,
'1 day'::interval,
LAG(agg) OVER (ORDER BY bucket),
[Review comment from the PR author] I removed the trailing whitespace here for consistency with the similar SQL for getting the integrals.
'1 day'::interval,
LAG(agg) OVER (ORDER BY bucket),
LEAD(agg) OVER (ORDER BY bucket)
) FROM (
SELECT bucket, time_weight('LOCF', time, value) as agg
@@ -732,23 +828,54 @@ mod tests {
None,
None,
);
let mut integrals = client.select(
r#"SELECT
toolkit_experimental.interpolated_integral(
agg,
bucket,
'1 day'::interval,
LAG(agg) OVER (ORDER BY bucket),
LEAD(agg) OVER (ORDER BY bucket),
'hours'
) FROM (
SELECT bucket, time_weight('LOCF', time, value) as agg
FROM test
GROUP BY bucket
) s
ORDER BY bucket"#,
None,
None,
);

// Day 1, 4 hours @ 10, 4 @ 40, 8 @ 20
assert_eq!(
averages.next().unwrap()[1].value(),
Some((4. * 10. + 4. * 40. + 8. * 20.) / 16.)
);
assert_eq!(
integrals.next().unwrap()[1].value(),
Some(4. * 10. + 4. * 40. + 8. * 20.)
);
// Day 2, 2 hours @ 20, 10 @ 15, 8 @ 50, 4 @ 25
assert_eq!(
averages.next().unwrap()[1].value(),
Some((2. * 20. + 10. * 15. + 8. * 50. + 4. * 25.) / 24.)
);
assert_eq!(
integrals.next().unwrap()[1].value(),
Some(2. * 20. + 10. * 15. + 8. * 50. + 4. * 25.)
);
// Day 3, 10 hours @ 25, 2 @ 30, 4 @ 0
assert_eq!(
averages.next().unwrap()[1].value(),
Some((10. * 25. + 2. * 30.) / 16.)
);
assert_eq!(
integrals.next().unwrap()[1].value(),
Some(10. * 25. + 2. * 30.)
);
assert!(averages.next().is_none());
assert!(integrals.next().is_none());
});
}
}