From 79870afd26e8fdf4afe13b728d01cb9197ddd28f Mon Sep 17 00:00:00 2001 From: Brian Rowe Date: Fri, 24 Feb 2023 15:32:21 -0800 Subject: [PATCH] This PR remove the toolkit_experimental namespace from heartbeat_agg and adds a great deal of testing required by our stability process. --- Changelog.md | 3 + docs/test_caggs.md | 22 +- extension/src/accessors.rs | 25 + extension/src/heartbeat_agg.rs | 964 ++++++++++++++++++----- extension/src/heartbeat_agg/accessors.rs | 123 +++ extension/src/stabilization_info.rs | 64 ++ tests/update/heartbeat.md | 53 ++ 7 files changed, 1037 insertions(+), 217 deletions(-) create mode 100644 extension/src/heartbeat_agg/accessors.rs create mode 100644 tests/update/heartbeat.md diff --git a/Changelog.md b/Changelog.md index e7fb93e1..0cab3708 100644 --- a/Changelog.md +++ b/Changelog.md @@ -11,6 +11,9 @@ This changelog should be updated as part of a PR if the work is worth noting (mo #### Bug fixes - [#715](https://github.com/timescale/timescaledb-toolkit/pull/715): Fix out-of-bounds indexing error in `state_agg` rollup +#### Stabilized features +- [#722](https://github.com/timescale/timescaledb-toolkit/pull/722): Stabilize heartbeat aggregate. + #### Other notable changes - [#716](https://github.com/timescale/timescaledb-toolkit/issues/716): Add arrow operator support for counter aggregate and time-weighted aggregate interpolated accessors. - [#716](https://github.com/timescale/timescaledb-toolkit/issues/716): Remove experimental versions of interpolated accessors for counter aggregate and time-weighted aggregates. The stable versions introduced in 1.14.0 should be used instead. diff --git a/docs/test_caggs.md b/docs/test_caggs.md index 81ff2ea5..f00358d0 100644 --- a/docs/test_caggs.md +++ b/docs/test_caggs.md @@ -23,7 +23,8 @@ AS SELECT hyperloglog(64, value1) as hll, counter_agg(time, value1) as counter, stats_agg(value1, value2) as stats, - timevector(time, value2) as tvec + timevector(time, value2) as tvec, + heartbeat_agg(time, time_bucket('7 day'::interval, time), '1w', '55m') as hb FROM test GROUP BY time_bucket('7 day'::interval, time); ``` @@ -114,3 +115,22 @@ ORDER BY week; 2020-07-20 00:00:00+00 | 168 2020-07-27 00:00:00+00 | 59 ``` + +```SQL +SELECT week, uptime(hb), interpolated_uptime(hb, LAG(hb) OVER (ORDER BY week)) +FROM weekly_aggs +WHERE week > '2020-06-01' +ORDER BY week; +``` +```output + week | uptime | interpolated_uptime +------------------------+-----------------+--------------------- + 2020-06-08 00:00:00+00 | 6 days 10:00:00 | 6 days 10:00:00 + 2020-06-15 00:00:00+00 | 6 days 10:00:00 | 6 days 10:00:00 + 2020-06-22 00:00:00+00 | 6 days 10:00:00 | 6 days 10:00:00 + 2020-06-29 00:00:00+00 | 6 days 10:00:00 | 6 days 10:00:00 + 2020-07-06 00:00:00+00 | 6 days 10:00:00 | 6 days 10:00:00 + 2020-07-13 00:00:00+00 | 6 days 10:00:00 | 6 days 10:00:00 + 2020-07-20 00:00:00+00 | 6 days 10:00:00 | 6 days 10:00:00 + 2020-07-27 00:00:00+00 | 2 days 06:05:00 | 2 days 06:05:00 +``` diff --git a/extension/src/accessors.rs b/extension/src/accessors.rs index 40af1908..68a1a0ea 100644 --- a/extension/src/accessors.rs +++ b/extension/src/accessors.rs @@ -97,8 +97,33 @@ accessor! { open_time() } accessor! { high_time() } accessor! { low_time() } accessor! { close_time() } +accessor! { live_ranges() } +accessor! { dead_ranges() } +accessor! { uptime() } +accessor! { downtime() } + // The rest are more complex, with String or other challenges. Leaving alone for now. +pg_type! { + #[derive(Debug)] + struct AccessorLiveAt { + time: u64, + } +} + +ron_inout_funcs!(AccessorLiveAt); + +#[pg_extern(immutable, parallel_safe, name = "live_at")] +pub fn accessor_live_at(ts: crate::raw::TimestampTz) -> AccessorLiveAt<'static> { + unsafe { + flatten! { + AccessorLiveAt { + time: ts.0.value() as u64, + } + } + } +} + pg_type! { #[derive(Debug)] struct AccessorStdDev<'input> { diff --git a/extension/src/heartbeat_agg.rs b/extension/src/heartbeat_agg.rs index 0e9a6b02..1ef63413 100644 --- a/extension/src/heartbeat_agg.rs +++ b/extension/src/heartbeat_agg.rs @@ -2,6 +2,9 @@ use pgx::iter::TableIterator; use pgx::*; use crate::{ + accessors::{ + AccessorDeadRanges, AccessorDowntime, AccessorLiveAt, AccessorLiveRanges, AccessorUptime, + }, aggregate_utils::in_aggregate_context, datum_utils::interval_to_ms, flatten, @@ -13,7 +16,12 @@ use crate::{ use std::cmp::{max, min}; -use toolkit_experimental::HeartbeatAggData; +mod accessors; + +use accessors::{ + HeartbeatInterpolateAccessor, HeartbeatInterpolatedDowntimeAccessor, + HeartbeatInterpolatedUptimeAccessor, +}; const BUFFER_SIZE: usize = 1000; // How many values to absorb before consolidating @@ -180,195 +188,263 @@ impl HeartbeatTransState { } } -#[pg_schema] -mod toolkit_experimental { - use super::*; - - pg_type! { - #[derive(Debug)] - struct HeartbeatAgg<'input> - { - start_time : i64, - end_time : i64, - last_seen : i64, - interval_len : i64, - num_intervals : u64, - interval_starts : [i64; self.num_intervals], - interval_ends : [i64; self.num_intervals], - } +pg_type! { + #[derive(Debug)] + struct HeartbeatAgg<'input> + { + start_time : i64, + end_time : i64, + last_seen : i64, + interval_len : i64, + num_intervals : u64, + interval_starts : [i64; self.num_intervals], + interval_ends : [i64; self.num_intervals], } +} - ron_inout_funcs!(HeartbeatAgg); +ron_inout_funcs!(HeartbeatAgg); - impl HeartbeatAgg<'_> { - fn sum_live_intervals(self) -> i64 { - let starts = self.interval_starts.as_slice(); - let ends = self.interval_ends.as_slice(); - let mut sum = 0; - for i in 0..self.num_intervals as usize { - sum += ends[i] - starts[i]; - } - sum +impl HeartbeatAgg<'_> { + fn sum_live_intervals(self) -> i64 { + let starts = self.interval_starts.as_slice(); + let ends = self.interval_ends.as_slice(); + let mut sum = 0; + for i in 0..self.num_intervals as usize { + sum += ends[i] - starts[i]; } + sum + } - fn interpolate_start(&mut self, pred: &Self) { - // only allow interpolation of non-overlapping ranges - assert!(pred.end_time <= self.start_time); - let pred_end = pred.last_seen + self.interval_len; - - if pred_end <= self.start_time { - return; - } + fn interpolate_start(&mut self, pred: &Self) { + // only allow interpolation of non-overlapping ranges + assert!(pred.end_time <= self.start_time); + let pred_end = pred.last_seen + self.interval_len; - // If first range already covers (start_time, pred_end) return - if self - .interval_starts - .as_slice() - .first() - .filter(|v| **v == self.start_time) - .is_some() - && self - .interval_ends - .as_slice() - .first() - .filter(|v| **v >= pred_end) - .is_some() - { - return; - } + if pred_end <= self.start_time { + return; + } - if self - .interval_starts + // If first range already covers (start_time, pred_end) return + if self + .interval_starts + .as_slice() + .first() + .filter(|v| **v == self.start_time) + .is_some() + && self + .interval_ends .as_slice() .first() - .filter(|v| **v <= pred_end) + .filter(|v| **v >= pred_end) .is_some() - { - self.interval_starts.as_owned()[0] = self.start_time; - } else { - let start = self.start_time; - self.interval_starts.as_owned().insert(0, start); - self.interval_ends.as_owned().insert(0, pred_end); - self.num_intervals += 1; - } + { + return; } - } - #[pg_extern] - pub fn live_ranges( - agg: HeartbeatAgg<'static>, - ) -> TableIterator<'static, (name!(start, TimestampTz), name!(end, TimestampTz))> { - let starts = agg.interval_starts.clone(); - let ends = agg.interval_ends.clone(); - TableIterator::new( - starts - .into_iter() - .map(|x| x.into()) - .zip(ends.into_iter().map(|x| x.into())), - ) + if self + .interval_starts + .as_slice() + .first() + .filter(|v| **v <= pred_end) + .is_some() + { + self.interval_starts.as_owned()[0] = self.start_time; + } else { + let start = self.start_time; + self.interval_starts.as_owned().insert(0, start); + self.interval_ends.as_owned().insert(0, pred_end); + self.num_intervals += 1; + } } +} - #[pg_extern] - pub fn dead_ranges( - agg: HeartbeatAgg<'static>, - ) -> TableIterator<'static, (name!(start, TimestampTz), name!(end, TimestampTz))> { - if agg.num_intervals == 0 { - return TableIterator::new(std::iter::once(( - agg.start_time.into(), - agg.end_time.into(), - ))); - } +#[pg_extern] +pub fn live_ranges( + agg: HeartbeatAgg<'static>, +) -> TableIterator<'static, (name!(start, TimestampTz), name!(end, TimestampTz))> { + let starts = agg.interval_starts.clone(); + let ends = agg.interval_ends.clone(); + TableIterator::new( + starts + .into_iter() + .map(|x| x.into()) + .zip(ends.into_iter().map(|x| x.into())), + ) +} - // Dead ranges are the opposite of the intervals stored in the aggregate - let mut starts = agg.interval_ends.clone().into_vec(); - let mut ends = agg.interval_starts.clone().into_vec(); +#[pg_operator(immutable, parallel_safe)] +#[opname(->)] +pub fn arrow_heartbeat_agg_live_ranges( + sketch: HeartbeatAgg<'static>, + _accessor: AccessorLiveRanges<'static>, +) -> TableIterator<'static, (name!(start, TimestampTz), name!(end, TimestampTz))> { + live_ranges(sketch) +} - // Fix the first point depending on whether the aggregate starts in a live or dead range - if ends[0] == agg.start_time { - ends.remove(0); - } else { - starts.insert(0, agg.start_time); - } +#[pg_extern] +pub fn dead_ranges( + agg: HeartbeatAgg<'static>, +) -> TableIterator<'static, (name!(start, TimestampTz), name!(end, TimestampTz))> { + if agg.num_intervals == 0 { + return TableIterator::new(std::iter::once(( + agg.start_time.into(), + agg.end_time.into(), + ))); + } - // Fix the last point depending on whether the aggregate starts in a live or dead range - if *starts.last().unwrap() == agg.end_time { - starts.pop(); - } else { - ends.push(agg.end_time); - } + // Dead ranges are the opposite of the intervals stored in the aggregate + let mut starts = agg.interval_ends.clone().into_vec(); + let mut ends = agg.interval_starts.clone().into_vec(); - TableIterator::new( - starts - .into_iter() - .map(|x| x.into()) - .zip(ends.into_iter().map(|x| x.into())), - ) + // Fix the first point depending on whether the aggregate starts in a live or dead range + if ends[0] == agg.start_time { + ends.remove(0); + } else { + starts.insert(0, agg.start_time); } - #[pg_extern] - pub fn uptime(agg: HeartbeatAgg<'static>) -> Interval { - agg.sum_live_intervals().into() + // Fix the last point depending on whether the aggregate starts in a live or dead range + if *starts.last().unwrap() == agg.end_time { + starts.pop(); + } else { + ends.push(agg.end_time); } - #[pg_extern] - pub fn interpolated_uptime( - agg: HeartbeatAgg<'static>, - pred: Option>, - ) -> Interval { - uptime(interpolate_heartbeat_agg(agg, pred)) - } + TableIterator::new( + starts + .into_iter() + .map(|x| x.into()) + .zip(ends.into_iter().map(|x| x.into())), + ) +} - #[pg_extern] - pub fn downtime(agg: HeartbeatAgg<'static>) -> Interval { - (agg.end_time - agg.start_time - agg.sum_live_intervals()).into() - } +#[pg_operator(immutable, parallel_safe)] +#[opname(->)] +pub fn arrow_heartbeat_agg_dead_ranges( + sketch: HeartbeatAgg<'static>, + _accessor: AccessorDeadRanges<'static>, +) -> TableIterator<'static, (name!(start, TimestampTz), name!(end, TimestampTz))> { + dead_ranges(sketch) +} + +#[pg_extern] +pub fn uptime(agg: HeartbeatAgg<'static>) -> Interval { + agg.sum_live_intervals().into() +} + +#[pg_operator(immutable, parallel_safe)] +#[opname(->)] +pub fn arrow_heartbeat_agg_uptime( + sketch: HeartbeatAgg<'static>, + _accessor: AccessorUptime<'static>, +) -> Interval { + uptime(sketch) +} + +#[pg_extern] +pub fn interpolated_uptime( + agg: HeartbeatAgg<'static>, + pred: Option>, +) -> Interval { + uptime(interpolate_heartbeat_agg(agg, pred)) +} + +#[pg_operator(immutable, parallel_safe)] +#[opname(->)] +pub fn arrow_heartbeat_agg_interpolated_uptime( + sketch: HeartbeatAgg<'static>, + accessor: HeartbeatInterpolatedUptimeAccessor<'static>, +) -> Interval { + interpolated_uptime(sketch, accessor.pred()) +} - #[pg_extern] - pub fn interpolated_downtime( - agg: HeartbeatAgg<'static>, - pred: Option>, - ) -> Interval { - downtime(interpolate_heartbeat_agg(agg, pred)) +#[pg_extern] +pub fn downtime(agg: HeartbeatAgg<'static>) -> Interval { + (agg.end_time - agg.start_time - agg.sum_live_intervals()).into() +} + +#[pg_operator(immutable, parallel_safe)] +#[opname(->)] +pub fn arrow_heartbeat_agg_downtime( + sketch: HeartbeatAgg<'static>, + _accessor: AccessorDowntime<'static>, +) -> Interval { + downtime(sketch) +} + +#[pg_extern] +pub fn interpolated_downtime( + agg: HeartbeatAgg<'static>, + pred: Option>, +) -> Interval { + downtime(interpolate_heartbeat_agg(agg, pred)) +} + +#[pg_operator(immutable, parallel_safe)] +#[opname(->)] +pub fn arrow_heartbeat_agg_interpolated_downtime( + sketch: HeartbeatAgg<'static>, + accessor: HeartbeatInterpolatedDowntimeAccessor<'static>, +) -> Interval { + interpolated_downtime(sketch, accessor.pred()) +} + +#[pg_extern] +pub fn live_at(agg: HeartbeatAgg<'static>, test: TimestampTz) -> bool { + if agg.num_intervals == 0 { + return false; } - #[pg_extern] - pub fn live_at(agg: HeartbeatAgg<'static>, test: TimestampTz) -> bool { - if agg.num_intervals == 0 { + let test = i64::from(test); + let mut start_iter = agg.interval_starts.iter().enumerate().peekable(); + while let Some((idx, val)) = start_iter.next() { + if test < val { + // Only possible if test shows up before first interval return false; } - - let test = i64::from(test); - let mut start_iter = agg.interval_starts.iter().enumerate().peekable(); - while let Some((idx, val)) = start_iter.next() { - if test < val { - // Only possible if test shows up before first interval - return false; - } - if let Some((_, next_val)) = start_iter.peek() { - if test < *next_val { - return test < agg.interval_ends.as_slice()[idx]; - } + if let Some((_, next_val)) = start_iter.peek() { + if test < *next_val { + return test < agg.interval_ends.as_slice()[idx]; } } - // Fall out the loop if test > start of last interval - return test < *agg.interval_ends.as_slice().last().unwrap(); } + // Fall out the loop if test > start of last interval + return test < *agg.interval_ends.as_slice().last().unwrap(); +} - #[pg_extern(name = "interpolate")] - fn interpolate_heartbeat_agg( - agg: HeartbeatAgg<'static>, - pred: Option>, - ) -> HeartbeatAgg<'static> { - let mut r = agg.clone(); - if let Some(pred) = pred { - r.interpolate_start(&pred); - } - r +#[pg_operator(immutable, parallel_safe)] +#[opname(->)] +pub fn arrow_heartbeat_agg_live_at( + sketch: HeartbeatAgg<'static>, + accessor: AccessorLiveAt<'static>, +) -> bool { + let ts = TimestampTz(accessor.time.into()); + live_at(sketch, ts) +} + +#[pg_extern(name = "interpolate")] +fn interpolate_heartbeat_agg( + agg: HeartbeatAgg<'static>, + pred: Option>, +) -> HeartbeatAgg<'static> { + let mut r = agg.clone(); + if let Some(pred) = pred { + r.interpolate_start(&pred); } + r +} + +#[pg_operator(immutable, parallel_safe)] +#[opname(->)] +pub fn arrow_heartbeat_agg_interpolate( + sketch: HeartbeatAgg<'static>, + accessor: HeartbeatInterpolateAccessor<'static>, +) -> HeartbeatAgg<'static> { + interpolate_heartbeat_agg(sketch, accessor.pred()) } -impl From> for HeartbeatTransState { - fn from(agg: toolkit_experimental::HeartbeatAgg<'static>) -> Self { +impl From> for HeartbeatTransState { + fn from(agg: HeartbeatAgg<'static>) -> Self { HeartbeatTransState { start: agg.start_time, end: agg.end_time, @@ -384,7 +460,7 @@ impl From> for HeartbeatTransState { } } -#[pg_extern(schema = "toolkit_experimental", immutable, parallel_safe)] +#[pg_extern(immutable, parallel_safe)] pub fn heartbeat_trans( state: Internal, heartbeat: TimestampTz, @@ -425,17 +501,17 @@ pub fn heartbeat_trans_inner( } } -#[pg_extern(schema = "toolkit_experimental", immutable, parallel_safe)] +#[pg_extern(immutable, parallel_safe)] pub fn heartbeat_final( state: Internal, fcinfo: pg_sys::FunctionCallInfo, -) -> Option> { +) -> Option> { heartbeat_final_inner(unsafe { state.to_inner() }, fcinfo) } pub fn heartbeat_final_inner( state: Option>, fcinfo: pg_sys::FunctionCallInfo, -) -> Option> { +) -> Option> { unsafe { in_aggregate_context(fcinfo, || { state.map(|mut s| { @@ -464,17 +540,17 @@ pub fn heartbeat_final_inner( } } -#[pg_extern(schema = "toolkit_experimental", immutable, parallel_safe)] +#[pg_extern(immutable, parallel_safe)] pub fn heartbeat_rollup_trans( state: Internal, - value: Option>, + value: Option>, fcinfo: pg_sys::FunctionCallInfo, ) -> Option { heartbeat_rollup_trans_inner(unsafe { state.to_inner() }, value, fcinfo).internal() } pub fn heartbeat_rollup_trans_inner( state: Option>, - value: Option>, + value: Option>, fcinfo: pg_sys::FunctionCallInfo, ) -> Option> { unsafe { @@ -491,12 +567,12 @@ pub fn heartbeat_rollup_trans_inner( extension_sql!( "\n\ - CREATE AGGREGATE toolkit_experimental.heartbeat_agg(\n\ + CREATE AGGREGATE heartbeat_agg(\n\ heartbeat TIMESTAMPTZ, agg_start TIMESTAMPTZ, agg_duration INTERVAL, heartbeat_liveness INTERVAL\n\ ) (\n\ - sfunc = toolkit_experimental.heartbeat_trans,\n\ + sfunc = heartbeat_trans,\n\ stype = internal,\n\ - finalfunc = toolkit_experimental.heartbeat_final\n\ + finalfunc = heartbeat_final\n\ );\n\ ", name = "heartbeat_agg", @@ -508,12 +584,12 @@ extension_sql!( extension_sql!( "\n\ - CREATE AGGREGATE toolkit_experimental.rollup(\n\ - toolkit_experimental.HeartbeatAgg\n\ + CREATE AGGREGATE rollup(\n\ + HeartbeatAgg\n\ ) (\n\ - sfunc = toolkit_experimental.heartbeat_rollup_trans,\n\ + sfunc = heartbeat_rollup_trans,\n\ stype = internal,\n\ - finalfunc = toolkit_experimental.heartbeat_final\n\ + finalfunc = heartbeat_final\n\ );\n\ ", name = "heartbeat_agg_rollup", @@ -619,97 +695,222 @@ mod tests { .unwrap(); let mut result = client.update( - "SELECT toolkit_experimental.live_ranges(toolkit_experimental.heartbeat_agg(heartbeat, '01-01-2020 UTC', '2h', '10m'))::TEXT + "SELECT live_ranges(heartbeat_agg(heartbeat, '01-01-2020 UTC', '2h', '10m'))::TEXT FROM liveness", None, None).unwrap(); + let mut arrow_result = client.update( + "SELECT (heartbeat_agg(heartbeat, '01-01-2020 UTC', '2h', '10m') -> live_ranges())::TEXT + FROM liveness", None, None).unwrap(); + + let test = arrow_result.next().unwrap()[1] + .value::() + .unwrap() + .unwrap(); assert_eq!( result.next().unwrap()[1] .value::() .unwrap() .unwrap(), + test + ); + assert_eq!( + test, "(\"2020-01-01 00:02:20+00\",\"2020-01-01 00:27:00+00\")" ); + let test = arrow_result.next().unwrap()[1] + .value::() + .unwrap() + .unwrap(); assert_eq!( result.next().unwrap()[1] .value::() .unwrap() .unwrap(), + test + ); + assert_eq!( + test, "(\"2020-01-01 00:30:00+00\",\"2020-01-01 00:50:00+00\")" ); + let test = arrow_result.next().unwrap()[1] + .value::() + .unwrap() + .unwrap(); assert_eq!( result.next().unwrap()[1] .value::() .unwrap() .unwrap(), + test + ); + assert_eq!( + test, "(\"2020-01-01 00:50:30+00\",\"2020-01-01 01:38:00+00\")" ); + let test = arrow_result.next().unwrap()[1] + .value::() + .unwrap() + .unwrap(); assert_eq!( result.next().unwrap()[1] .value::() .unwrap() .unwrap(), + test + ); + assert_eq!( + test, "(\"2020-01-01 01:38:01+00\",\"2020-01-01 02:00:00+00\")" ); assert!(result.next().is_none()); + assert!(arrow_result.next().is_none()); let mut result = client.update( - "SELECT toolkit_experimental.dead_ranges(toolkit_experimental.heartbeat_agg(heartbeat, '01-01-2020 UTC', '2h', '10m'))::TEXT + "SELECT dead_ranges(heartbeat_agg(heartbeat, '01-01-2020 UTC', '2h', '10m'))::TEXT + FROM liveness", None, None).unwrap(); + + let mut arrow_result = client.update( + "SELECT (heartbeat_agg(heartbeat, '01-01-2020 UTC', '2h', '10m') -> dead_ranges())::TEXT FROM liveness", None, None).unwrap(); + let test = arrow_result.next().unwrap()[1] + .value::() + .unwrap() + .unwrap(); assert_eq!( result.next().unwrap()[1] .value::() .unwrap() .unwrap(), + test + ); + assert_eq!( + test, "(\"2020-01-01 00:00:00+00\",\"2020-01-01 00:02:20+00\")" ); + let test = arrow_result.next().unwrap()[1] + .value::() + .unwrap() + .unwrap(); assert_eq!( result.next().unwrap()[1] .value::() .unwrap() .unwrap(), + test + ); + assert_eq!( + test, "(\"2020-01-01 00:27:00+00\",\"2020-01-01 00:30:00+00\")" ); + let test = arrow_result.next().unwrap()[1] + .value::() + .unwrap() + .unwrap(); assert_eq!( result.next().unwrap()[1] .value::() .unwrap() .unwrap(), + test + ); + assert_eq!( + test, "(\"2020-01-01 00:50:00+00\",\"2020-01-01 00:50:30+00\")" ); + let test = arrow_result.next().unwrap()[1] + .value::() + .unwrap() + .unwrap(); assert_eq!( result.next().unwrap()[1] .value::() .unwrap() .unwrap(), + test + ); + assert_eq!( + test, "(\"2020-01-01 01:38:00+00\",\"2020-01-01 01:38:01+00\")" ); assert!(result.next().is_none()); + assert!(arrow_result.next().is_none()); + + let result = client + .update( + "SELECT uptime(heartbeat_agg(heartbeat, '01-01-2020 UTC', '2h', '10m'))::TEXT + FROM liveness", + None, + None, + ) + .unwrap() + .first() + .get_one::() + .unwrap() + .unwrap(); + assert_eq!("01:54:09", result); let result = client.update( - "SELECT toolkit_experimental.uptime(toolkit_experimental.heartbeat_agg(heartbeat, '01-01-2020 UTC', '2h', '10m'))::TEXT + "SELECT (heartbeat_agg(heartbeat, '01-01-2020 UTC', '2h', '10m') -> uptime())::TEXT FROM liveness", None, None).unwrap().first().get_one::().unwrap().unwrap(); assert_eq!("01:54:09", result); + let result = client + .update( + "SELECT downtime(heartbeat_agg(heartbeat, '01-01-2020 UTC', '2h', '10m'))::TEXT + FROM liveness", + None, + None, + ) + .unwrap() + .first() + .get_one::() + .unwrap() + .unwrap(); + assert_eq!("00:05:51", result); + let result = client.update( - "SELECT toolkit_experimental.downtime(toolkit_experimental.heartbeat_agg(heartbeat, '01-01-2020 UTC', '2h', '10m'))::TEXT + "SELECT (heartbeat_agg(heartbeat, '01-01-2020 UTC', '2h', '10m') -> downtime())::TEXT FROM liveness", None, None).unwrap().first().get_one::().unwrap().unwrap(); assert_eq!("00:05:51", result); let (result1, result2, result3) = client.update( - "WITH agg AS (SELECT toolkit_experimental.heartbeat_agg(heartbeat, '01-01-2020 UTC', '2h', '10m') AS agg FROM liveness) - SELECT toolkit_experimental.live_at(agg, '01-01-2020 00:01:00 UTC')::TEXT, - toolkit_experimental.live_at(agg, '01-01-2020 00:05:00 UTC')::TEXT, - toolkit_experimental.live_at(agg, '01-01-2020 00:30:00 UTC')::TEXT FROM agg", None, None) + "WITH agg AS (SELECT heartbeat_agg(heartbeat, '01-01-2020 UTC', '2h', '10m') AS agg FROM liveness) + SELECT live_at(agg, '01-01-2020 00:01:00 UTC')::TEXT, + live_at(agg, '01-01-2020 00:05:00 UTC')::TEXT, + live_at(agg, '01-01-2020 00:30:00 UTC')::TEXT FROM agg", None, None) .unwrap().first() .get_three::().unwrap(); let (result4, result5) = client.update( - "WITH agg AS (SELECT toolkit_experimental.heartbeat_agg(heartbeat, '01-01-2020 UTC', '2h', '10m') AS agg FROM liveness) - SELECT toolkit_experimental.live_at(agg, '01-01-2020 01:38:00 UTC')::TEXT, - toolkit_experimental.live_at(agg, '01-01-2020 02:01:00 UTC')::TEXT FROM agg", None, None) + "WITH agg AS (SELECT heartbeat_agg(heartbeat, '01-01-2020 UTC', '2h', '10m') AS agg FROM liveness) + SELECT live_at(agg, '01-01-2020 01:38:00 UTC')::TEXT, + live_at(agg, '01-01-2020 02:01:00 UTC')::TEXT FROM agg", None, None) + .unwrap().first() + .get_two::().unwrap(); + + assert_eq!(result1.unwrap(), "false"); // outside ranges + assert_eq!(result2.unwrap(), "true"); // inside ranges + assert_eq!(result3.unwrap(), "true"); // first point of range + assert_eq!(result4.unwrap(), "false"); // last point of range + assert_eq!(result5.unwrap(), "false"); // outside aggregate + + let (result1, result2, result3) = + client.update( + "WITH agg AS (SELECT heartbeat_agg(heartbeat, '01-01-2020 UTC', '2h', '10m') AS agg FROM liveness) + SELECT (agg -> live_at('01-01-2020 00:01:00 UTC'))::TEXT, + (agg -> live_at('01-01-2020 00:05:00 UTC'))::TEXT, + (agg -> live_at('01-01-2020 00:30:00 UTC'))::TEXT FROM agg", None, None) + .unwrap().first() + .get_three::().unwrap(); + + let (result4, result5) = + client.update( + "WITH agg AS (SELECT heartbeat_agg(heartbeat, '01-01-2020 UTC', '2h', '10m') AS agg FROM liveness) + SELECT (agg -> live_at('01-01-2020 01:38:00 UTC'))::TEXT, + (agg -> live_at('01-01-2020 02:01:00 UTC'))::TEXT FROM agg", None, None) .unwrap().first() .get_two::().unwrap(); @@ -755,10 +956,10 @@ mod tests { let result = client .update( "WITH aggs AS ( - SELECT toolkit_experimental.heartbeat_agg(time, batch, '1h', '1m') + SELECT heartbeat_agg(time, batch, '1h', '1m') FROM heartbeats GROUP BY batch - ) SELECT toolkit_experimental.rollup(heartbeat_agg)::TEXT FROM aggs", + ) SELECT rollup(heartbeat_agg)::TEXT FROM aggs", None, None, ) @@ -777,15 +978,12 @@ mod tests { client.update("SET TIMEZONE to UTC", None, None).unwrap(); client - .update( - "CREATE TABLE aggs(agg toolkit_experimental.heartbeatagg)", - None, - None, - ) + .update("CREATE TABLE aggs(agg heartbeatagg)", None, None) .unwrap(); - client.update( - "INSERT INTO aggs SELECT toolkit_experimental.heartbeat_agg(hb, '01-01-2020 UTC', '1h', '10m') + client + .update( + "INSERT INTO aggs SELECT heartbeat_agg(hb, '01-01-2020 UTC', '1h', '10m') FROM (VALUES ('01-01-2020 0:2:20 UTC'::timestamptz), ('01-01-2020 0:10 UTC'::timestamptz), @@ -797,10 +995,12 @@ mod tests { ) AS _(hb)", None, None, - ).unwrap(); + ) + .unwrap(); - client.update( - "INSERT INTO aggs SELECT toolkit_experimental.heartbeat_agg(hb, '01-01-2020 0:30 UTC', '1h', '10m') + client + .update( + "INSERT INTO aggs SELECT heartbeat_agg(hb, '01-01-2020 0:30 UTC', '1h', '10m') FROM (VALUES ('01-01-2020 0:35 UTC'::timestamptz), ('01-01-2020 0:40 UTC'::timestamptz), @@ -810,10 +1010,12 @@ mod tests { ) AS _(hb)", None, None, - ).unwrap(); + ) + .unwrap(); - client.update( - "INSERT INTO aggs SELECT toolkit_experimental.heartbeat_agg(hb, '01-01-2020 1:00 UTC', '1h', '10m') + client + .update( + "INSERT INTO aggs SELECT heartbeat_agg(hb, '01-01-2020 1:00 UTC', '1h', '10m') FROM (VALUES ('01-01-2020 1:00 UTC'::timestamptz), ('01-01-2020 1:28 UTC'::timestamptz), @@ -826,14 +1028,17 @@ mod tests { ) AS _(hb)", None, None, - ).unwrap(); + ) + .unwrap(); - let mut result = client.update( - "SELECT toolkit_experimental.dead_ranges(toolkit_experimental.rollup(agg))::TEXT + let mut result = client + .update( + "SELECT dead_ranges(rollup(agg))::TEXT FROM aggs", - None, - None, - ).unwrap(); + None, + None, + ) + .unwrap(); assert_eq!( result.next().unwrap()[1] @@ -909,17 +1114,22 @@ mod tests { ) .unwrap(); - let mut result = client.update( - "WITH s AS ( + let mut result = client + .update( + "WITH s AS ( SELECT start, - toolkit_experimental.heartbeat_agg(heartbeat, start, '30m', '10m') AS agg + heartbeat_agg(heartbeat, start, '30m', '10m') AS agg FROM liveness GROUP BY start), t AS ( SELECT start, - toolkit_experimental.interpolate(agg, LAG (agg) OVER (ORDER BY start)) AS agg + interpolate(agg, LAG (agg) OVER (ORDER BY start)) AS agg FROM s) - SELECT toolkit_experimental.downtime(agg)::TEXT FROM t;", None, None).unwrap(); + SELECT downtime(agg)::TEXT FROM t;", + None, + None, + ) + .unwrap(); assert_eq!( result.next().unwrap()[1] @@ -951,17 +1161,22 @@ mod tests { ); assert!(result.next().is_none()); - let mut result = client.update( - "WITH s AS ( + let mut result = client + .update( + "WITH s AS ( SELECT start, - toolkit_experimental.heartbeat_agg(heartbeat, start, '30m', '10m') AS agg + heartbeat_agg(heartbeat, start, '30m', '10m') AS agg FROM liveness GROUP BY start), t AS ( SELECT start, - toolkit_experimental.interpolate(agg, LAG (agg) OVER (ORDER BY start)) AS agg + interpolate(agg, LAG (agg) OVER (ORDER BY start)) AS agg FROM s) - SELECT toolkit_experimental.live_ranges(agg)::TEXT FROM t;", None, None).unwrap(); + SELECT live_ranges(agg)::TEXT FROM t;", + None, + None, + ) + .unwrap(); assert_eq!( result.next().unwrap()[1] .value::() @@ -1006,14 +1221,123 @@ mod tests { ); assert!(result.next().is_none()); - let mut result = client.update( - "WITH s AS ( + let mut result = client + .update( + "WITH s AS ( + SELECT start, + heartbeat_agg(heartbeat, start, '30m', '10m') AS agg + FROM liveness + GROUP BY start), + t AS ( + SELECT start, + agg -> interpolate(LAG (agg) OVER (ORDER BY start)) AS agg + FROM s) + SELECT live_ranges(agg)::TEXT FROM t;", + None, + None, + ) + .unwrap(); + assert_eq!( + result.next().unwrap()[1] + .value::() + .unwrap() + .unwrap(), + "(\"2020-01-01 00:02:20+00\",\"2020-01-01 00:27:00+00\")" + ); + assert_eq!( + result.next().unwrap()[1] + .value::() + .unwrap() + .unwrap(), + "(\"2020-01-01 00:30:00+00\",\"2020-01-01 00:50:00+00\")" + ); + assert_eq!( + result.next().unwrap()[1] + .value::() + .unwrap() + .unwrap(), + "(\"2020-01-01 00:50:30+00\",\"2020-01-01 01:00:00+00\")" + ); + assert_eq!( + result.next().unwrap()[1] + .value::() + .unwrap() + .unwrap(), + "(\"2020-01-01 01:00:00+00\",\"2020-01-01 01:30:00+00\")" + ); + assert_eq!( + result.next().unwrap()[1] + .value::() + .unwrap() + .unwrap(), + "(\"2020-01-01 01:30:00+00\",\"2020-01-01 01:38:00+00\")" + ); + assert_eq!( + result.next().unwrap()[1] + .value::() + .unwrap() + .unwrap(), + "(\"2020-01-01 01:38:01+00\",\"2020-01-01 02:00:00+00\")" + ); + assert!(result.next().is_none()); + + let mut result = client + .update( + "WITH s AS ( + SELECT start, + heartbeat_agg(heartbeat, start, '30m', '10m') AS agg + FROM liveness + GROUP BY start) + SELECT interpolated_uptime(agg, LAG (agg) OVER (ORDER BY start))::TEXT + FROM s", + None, + None, + ) + .unwrap(); + + assert_eq!( + result.next().unwrap()[1] + .value::() + .unwrap() + .unwrap(), + "00:24:40" + ); + assert_eq!( + result.next().unwrap()[1] + .value::() + .unwrap() + .unwrap(), + "00:29:30" + ); + assert_eq!( + result.next().unwrap()[1] + .value::() + .unwrap() + .unwrap(), + "00:30:00" + ); + assert_eq!( + result.next().unwrap()[1] + .value::() + .unwrap() + .unwrap(), + "00:29:59" + ); + assert!(result.next().is_none()); + + let mut result = client + .update( + "WITH s AS ( SELECT start, - toolkit_experimental.heartbeat_agg(heartbeat, start, '30m', '10m') AS agg + heartbeat_agg(heartbeat, start, '30m', '10m') AS agg FROM liveness GROUP BY start) - SELECT toolkit_experimental.interpolated_uptime(agg, LAG (agg) OVER (ORDER BY start))::TEXT - FROM s", None, None).unwrap(); + SELECT (agg -> interpolated_uptime(LAG (agg) OVER (ORDER BY start)))::TEXT + FROM s", + None, + None, + ) + .unwrap(); assert_eq!( result.next().unwrap()[1] @@ -1044,6 +1368,214 @@ mod tests { "00:29:59" ); assert!(result.next().is_none()); + + let mut result = client + .update( + "WITH s AS ( + SELECT start, + heartbeat_agg(heartbeat, start, '30m', '10m') AS agg + FROM liveness + GROUP BY start) + SELECT interpolated_downtime(agg, LAG (agg) OVER (ORDER BY start))::TEXT + FROM s", + None, + None, + ) + .unwrap(); + + assert_eq!( + result.next().unwrap()[1] + .value::() + .unwrap() + .unwrap(), + "00:05:20" + ); + assert_eq!( + result.next().unwrap()[1] + .value::() + .unwrap() + .unwrap(), + "00:00:30" + ); + assert_eq!( + result.next().unwrap()[1] + .value::() + .unwrap() + .unwrap(), + "00:00:00" + ); + assert_eq!( + result.next().unwrap()[1] + .value::() + .unwrap() + .unwrap(), + "00:00:01" + ); + assert!(result.next().is_none()); + + let mut result = client + .update( + "WITH s AS ( + SELECT start, + heartbeat_agg(heartbeat, start, '30m', '10m') AS agg + FROM liveness + GROUP BY start) + SELECT (agg -> interpolated_downtime(LAG (agg) OVER (ORDER BY start)))::TEXT + FROM s", + None, + None, + ) + .unwrap(); + + assert_eq!( + result.next().unwrap()[1] + .value::() + .unwrap() + .unwrap(), + "00:05:20" + ); + assert_eq!( + result.next().unwrap()[1] + .value::() + .unwrap() + .unwrap(), + "00:00:30" + ); + assert_eq!( + result.next().unwrap()[1] + .value::() + .unwrap() + .unwrap(), + "00:00:00" + ); + assert_eq!( + result.next().unwrap()[1] + .value::() + .unwrap() + .unwrap(), + "00:00:01" + ); + assert!(result.next().is_none()); }) } + + #[pg_test] + fn test_heartbeat_agg_text_io() { + Spi::connect(|mut client| { + client.update("SET TIMEZONE to UTC", None, None).unwrap(); + + client + .update("CREATE TABLE liveness(heartbeat TIMESTAMPTZ)", None, None) + .unwrap(); + + client + .update( + "INSERT INTO liveness VALUES + ('01-01-2020 0:2:20 UTC'), + ('01-01-2020 0:10 UTC'), + ('01-01-2020 0:17 UTC') + ", + None, + None, + ) + .unwrap(); + + let output = client + .update( + "SELECT heartbeat_agg(heartbeat, '01-01-2020', '30m', '5m')::TEXT + FROM liveness;", + None, + None, + ) + .unwrap() + .first() + .get_one::() + .unwrap(); + + let expected = "(version:1,start_time:631152000000000,end_time:631153800000000,last_seen:631153020000000,interval_len:300000000,num_intervals:3,interval_starts:[631152140000000,631152600000000,631153020000000],interval_ends:[631152440000000,631152900000000,631153320000000])"; + + assert_eq!(output, Some(expected.into())); + + let estimate = client + .update( + &format!("SELECT uptime('{}'::heartbeatagg)::TEXT", expected), + None, + None, + ) + .unwrap() + .first() + .get_one::() + .unwrap(); + assert_eq!(estimate.unwrap().as_str(), "00:15:00"); + }); + } + + #[pg_test] + fn test_heartbeat_agg_byte_io() { + use std::ptr; + + // Create a heartbeat agg from 0 to 250 with intervals from 40-50, 60-85, and 100-110 + let state = heartbeat_trans_inner( + None, + 40.into(), + 0.into(), + 250.into(), + 10.into(), + ptr::null_mut(), + ); + let state = heartbeat_trans_inner( + state, + 60.into(), + 0.into(), + 250.into(), + 10.into(), + ptr::null_mut(), + ); + let state = heartbeat_trans_inner( + state, + 65.into(), + 0.into(), + 250.into(), + 10.into(), + ptr::null_mut(), + ); + let state = heartbeat_trans_inner( + state, + 75.into(), + 0.into(), + 250.into(), + 10.into(), + ptr::null_mut(), + ); + let state = heartbeat_trans_inner( + state, + 100.into(), + 0.into(), + 250.into(), + 10.into(), + ptr::null_mut(), + ); + + let agg = heartbeat_final_inner(state, ptr::null_mut()) + .expect("failed to build finalized heartbeat_agg"); + let serial = agg.to_pg_bytes(); + + let expected = [ + 128, 1, 0, 0, // header + 1, // version + 0, 0, 0, // padding + 0, 0, 0, 0, 0, 0, 0, 0, // start_time + 250, 0, 0, 0, 0, 0, 0, 0, // end_time + 100, 0, 0, 0, 0, 0, 0, 0, // last_seen + 10, 0, 0, 0, 0, 0, 0, 0, // interval_len + 3, 0, 0, 0, 0, 0, 0, 0, // num_intervals + 40, 0, 0, 0, 0, 0, 0, 0, // interval_starts[0] + 60, 0, 0, 0, 0, 0, 0, 0, // interval_starts[1] + 100, 0, 0, 0, 0, 0, 0, 0, // interval_starts[2] + 50, 0, 0, 0, 0, 0, 0, 0, // interval_ends[0] + 85, 0, 0, 0, 0, 0, 0, 0, // interval_ends[1] + 110, 0, 0, 0, 0, 0, 0, 0, // interval_ends[2] + ]; + assert_eq!(serial, expected); + } } diff --git a/extension/src/heartbeat_agg/accessors.rs b/extension/src/heartbeat_agg/accessors.rs new file mode 100644 index 00000000..e60b7c95 --- /dev/null +++ b/extension/src/heartbeat_agg/accessors.rs @@ -0,0 +1,123 @@ +use pgx::*; + +use crate::{ + flatten, + heartbeat_agg::{HeartbeatAgg, HeartbeatAggData}, + pg_type, ron_inout_funcs, +}; + +fn empty_agg<'a>() -> HeartbeatAgg<'a> { + unsafe { + flatten!(HeartbeatAgg { + start_time: 0, + end_time: 0, + last_seen: 0, + interval_len: 0, + num_intervals: 0, + interval_starts: vec!().into(), + interval_ends: vec!().into(), + }) + } +} + +pg_type! { + struct HeartbeatInterpolatedUptimeAccessor<'input> { + has_prev : u64, + prev : HeartbeatAggData<'input>, + } +} + +ron_inout_funcs!(HeartbeatInterpolatedUptimeAccessor); + +#[pg_extern(immutable, parallel_safe, name = "interpolated_uptime")] +fn heartbeat_agg_interpolated_uptime_accessor<'a>( + prev: Option>, +) -> HeartbeatInterpolatedUptimeAccessor<'a> { + let has_prev = u64::from(prev.is_some()); + let prev = prev.unwrap_or_else(empty_agg).0; + + crate::build! { + HeartbeatInterpolatedUptimeAccessor { + has_prev, + prev, + } + } +} + +impl<'a> HeartbeatInterpolatedUptimeAccessor<'a> { + pub fn pred(&self) -> Option> { + if self.has_prev == 0 { + None + } else { + Some(self.prev.clone().into()) + } + } +} + +pg_type! { + struct HeartbeatInterpolatedDowntimeAccessor<'input> { + has_prev : u64, + prev : HeartbeatAggData<'input>, + } +} + +ron_inout_funcs!(HeartbeatInterpolatedDowntimeAccessor); + +#[pg_extern(immutable, parallel_safe, name = "interpolated_downtime")] +fn heartbeat_agg_interpolated_downtime_accessor<'a>( + prev: Option>, +) -> HeartbeatInterpolatedDowntimeAccessor<'a> { + let has_prev = u64::from(prev.is_some()); + let prev = prev.unwrap_or_else(empty_agg).0; + + crate::build! { + HeartbeatInterpolatedDowntimeAccessor { + has_prev, + prev, + } + } +} + +impl<'a> HeartbeatInterpolatedDowntimeAccessor<'a> { + pub fn pred(&self) -> Option> { + if self.has_prev == 0 { + None + } else { + Some(self.prev.clone().into()) + } + } +} + +pg_type! { + struct HeartbeatInterpolateAccessor<'input> { + has_prev : u64, + prev : HeartbeatAggData<'input>, + } +} + +ron_inout_funcs!(HeartbeatInterpolateAccessor); + +#[pg_extern(immutable, parallel_safe, name = "interpolate")] +fn heartbeat_agg_interpolate_accessor<'a>( + prev: Option>, +) -> HeartbeatInterpolateAccessor<'a> { + let has_prev = u64::from(prev.is_some()); + let prev = prev.unwrap_or_else(empty_agg).0; + + crate::build! { + HeartbeatInterpolateAccessor { + has_prev, + prev, + } + } +} + +impl<'a> HeartbeatInterpolateAccessor<'a> { + pub fn pred(&self) -> Option> { + if self.has_prev == 0 { + None + } else { + Some(self.prev.clone().into()) + } + } +} diff --git a/extension/src/stabilization_info.rs b/extension/src/stabilization_info.rs index 1fc8dd3d..bd6f165b 100644 --- a/extension/src/stabilization_info.rs +++ b/extension/src/stabilization_info.rs @@ -24,6 +24,53 @@ crate::functions_stabilized_at! { interpolated_rate(timestamp with time zone,interval,countersummary,countersummary), timeweightinterpolatedaverageaccessor_in(cstring), timeweightinterpolatedaverageaccessor_out(timeweightinterpolatedaverageaccessor), + dead_ranges(heartbeatagg), + downtime(heartbeatagg), + heartbeat_agg(timestamp with time zone,timestamp with time zone,interval,interval), + heartbeat_final(internal), + heartbeat_rollup_trans(internal,heartbeatagg), + heartbeat_trans(internal,timestamp with time zone,timestamp with time zone,interval,interval), + heartbeatagg_in(cstring), + heartbeatagg_out(heartbeatagg), + interpolate(heartbeatagg,heartbeatagg), + interpolated_downtime(heartbeatagg,heartbeatagg), + interpolated_uptime(heartbeatagg,heartbeatagg), + live_at(heartbeatagg,timestamp with time zone), + live_ranges(heartbeatagg), + rollup(heartbeatagg), + uptime(heartbeatagg), + accessordeadranges_in(cstring), + accessordeadranges_out(accessordeadranges), + accessordowntime_in(cstring), + accessordowntime_out(accessordowntime), + accessorliveat_in(cstring), + accessorliveat_out(accessorliveat), + accessorliveranges_in(cstring), + accessorliveranges_out(accessorliveranges), + accessoruptime_in(cstring), + accessoruptime_out(accessoruptime), + arrow_heartbeat_agg_dead_ranges(heartbeatagg,accessordeadranges), + arrow_heartbeat_agg_downtime(heartbeatagg,accessordowntime), + arrow_heartbeat_agg_live_at(heartbeatagg,accessorliveat), + arrow_heartbeat_agg_live_ranges(heartbeatagg,accessorliveranges), + arrow_heartbeat_agg_uptime(heartbeatagg,accessoruptime), + dead_ranges(), + downtime(), + live_at(timestamp with time zone), + live_ranges(), + uptime(), + arrow_heartbeat_agg_interpolate(heartbeatagg,heartbeatinterpolateaccessor), + arrow_heartbeat_agg_interpolated_downtime(heartbeatagg,heartbeatinterpolateddowntimeaccessor), + arrow_heartbeat_agg_interpolated_uptime(heartbeatagg,heartbeatinterpolateduptimeaccessor), + heartbeatinterpolateaccessor_in(cstring), + heartbeatinterpolateaccessor_out(heartbeatinterpolateaccessor), + heartbeatinterpolateddowntimeaccessor_in(cstring), + heartbeatinterpolateddowntimeaccessor_out(heartbeatinterpolateddowntimeaccessor), + heartbeatinterpolateduptimeaccessor_in(cstring), + heartbeatinterpolateduptimeaccessor_out(heartbeatinterpolateduptimeaccessor), + interpolate(heartbeatagg), + interpolated_downtime(heartbeatagg), + interpolated_uptime(heartbeatagg), } "1.14.0" => { interpolated_average(timeweightsummary,timestamp with time zone,interval,timeweightsummary,timeweightsummary), @@ -510,6 +557,15 @@ crate::types_stabilized_at! { counterinterpolateddeltaaccessor, counterinterpolatedrateaccessor, timeweightinterpolatedaverageaccessor, + heartbeatagg, + accessordeadranges, + accessordowntime, + accessorliveat, + accessorliveranges, + accessoruptime, + heartbeatinterpolateaccessor, + heartbeatinterpolateddowntimeaccessor, + heartbeatinterpolateduptimeaccessor, } "1.14.0" => { candlestick, @@ -604,6 +660,14 @@ crate::operators_stabilized_at! { "->"(countersummary,counterinterpolateddeltaaccessor), "->"(countersummary,counterinterpolatedrateaccessor), "->"(timeweightsummary,timeweightinterpolatedaverageaccessor), + "->"(heartbeatagg,accessordeadranges), + "->"(heartbeatagg,accessordowntime), + "->"(heartbeatagg,accessorliveat), + "->"(heartbeatagg,accessorliveranges), + "->"(heartbeatagg,accessoruptime), + "->"(heartbeatagg,heartbeatinterpolateaccessor), + "->"(heartbeatagg,heartbeatinterpolateddowntimeaccessor), + "->"(heartbeatagg,heartbeatinterpolateduptimeaccessor), } "1.14.0" => { "->"(candlestick,accessorclose), diff --git a/tests/update/heartbeat.md b/tests/update/heartbeat.md new file mode 100644 index 00000000..aad0552e --- /dev/null +++ b/tests/update/heartbeat.md @@ -0,0 +1,53 @@ +# Candlestick Tests + +## Get candlestick values from tick data + + +```sql,creation,min-toolkit-version=1.15.0 +CREATE TABLE liveness(heartbeat TIMESTAMPTZ, start TIMESTAMPTZ); +INSERT INTO liveness VALUES + ('01-01-2020 0:2:20 UTC', '01-01-2020 0:0 UTC'), + ('01-01-2020 0:10 UTC', '01-01-2020 0:0 UTC'), + ('01-01-2020 0:17 UTC', '01-01-2020 0:0 UTC'), + ('01-01-2020 0:30 UTC', '01-01-2020 0:30 UTC'), + ('01-01-2020 0:35 UTC', '01-01-2020 0:30 UTC'), + ('01-01-2020 0:40 UTC', '01-01-2020 0:30 UTC'), + ('01-01-2020 0:35 UTC', '01-01-2020 0:30 UTC'), + ('01-01-2020 0:40 UTC', '01-01-2020 0:30 UTC'), + ('01-01-2020 0:40 UTC', '01-01-2020 0:30 UTC'), + ('01-01-2020 0:50:30 UTC', '01-01-2020 0:30 UTC'), + ('01-01-2020 1:00:30 UTC', '01-01-2020 1:00 UTC'), + ('01-01-2020 1:08 UTC', '01-01-2020 1:00 UTC'), + ('01-01-2020 1:18 UTC', '01-01-2020 1:00 UTC'), + ('01-01-2020 1:28 UTC', '01-01-2020 1:00 UTC'), + ('01-01-2020 1:38:01 UTC', '01-01-2020 1:30 UTC'), + ('01-01-2020 1:40 UTC', '01-01-2020 1:30 UTC'), + ('01-01-2020 1:40:01 UTC', '01-01-2020 1:30 UTC'), + ('01-01-2020 1:50:01 UTC', '01-01-2020 1:30 UTC'), + ('01-01-2020 1:57 UTC', '01-01-2020 1:30 UTC'), + ('01-01-2020 1:59:50 UTC', '01-01-2020 1:30 UTC'); + +CREATE MATERIALIZED VIEW hb AS + SELECT start, + heartbeat_agg(heartbeat, start, '30m', '10m') AS agg + FROM liveness + GROUP BY start; +``` + +```sql,validation,min-toolkit-version=1.15.0 +SELECT + start, + uptime(agg), + interpolated_uptime(agg, LAG(agg) OVER (ORDER by start)) +FROM hb +ORDER BY start; +``` + +```output + start | uptime | interpolated_uptime +------------------------+----------+--------------------- + 2020-01-01 00:00:00+00 | 00:24:40 | 00:24:40 + 2020-01-01 00:30:00+00 | 00:29:30 | 00:29:30 + 2020-01-01 01:00:00+00 | 00:29:30 | 00:30:00 + 2020-01-01 01:30:00+00 | 00:21:59 | 00:29:59 + ```