-
Notifications
You must be signed in to change notification settings - Fork 46
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Adding asof joins for timevector #635
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,8 +7,7 @@ This changelog should be updated as part of a PR if the work is worth noting (mo | |
## Next Release (Date TBD) | ||
|
||
#### New experimental features | ||
- [#615](https://github.com/timescale/timescaledb-toolkit/pull/615): Heartbeat aggregate | ||
|
||
- [#615](https://github.com/timescale/timescaledb-toolkit/pull/615): Heatbeat aggregate | ||
Users can use the new `heartbeat_agg(timestamp, start_time, agg_interval, heartbeat_interval)` to track the liveness of a system in the range (`start_time`, `start_time` + `agg_interval`). Each timestamp seen in that range is assumed to indicate system liveness for the following `heartbeat_interval`. | ||
|
||
Once constructed, users can query heartbeat aggregates for `uptime` and `downtime`, as well as query for `live_ranges` or `dead_ranges`. Users can also check for `live_at(timestamp)`. | ||
|
@@ -23,6 +22,10 @@ This changelog should be updated as part of a PR if the work is worth noting (mo | |
|
||
[Examples](docs/examples/) | ||
|
||
- [#635](https://github.com/timescale/timescaledb-toolkit/pull/635): AsOf joins for timevectors | ||
|
||
This allows users to join two timevectors with the following semantics `timevectorA -> asof(timevectorB)`. This will return records with the LOCF value from timevectorA at the timestamps from timevectorB. Specifically the returned records contain, for each value in timevectorB, {the LOCF value from timevectorA, the value from timevectorB, the timestamp from timevectorB}. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Without a blank line between lines 15 and 16 here, Github squishes it all into one paragraph. The blank does not disrupt the list. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed this and merged your fix for the above. Will merge once I verify everything looks good. |
||
|
||
#### Bug fixes | ||
|
||
#### Other notable changes | ||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -4,7 +4,7 @@ use pgx::{iter::TableIterator, *}; | |||||
|
||||||
use crate::{ | ||||||
aggregate_utils::in_aggregate_context, | ||||||
build, | ||||||
build, flatten, | ||||||
palloc::{Inner, Internal, InternalAsValue, ToInternal}, | ||||||
pg_type, ron_inout_funcs, | ||||||
}; | ||||||
|
@@ -368,6 +368,95 @@ CREATE AGGREGATE rollup(\n\ | |||||
], | ||||||
); | ||||||
|
||||||
#[pg_schema] | ||||||
pub mod toolkit_experimental { | ||||||
use super::*; | ||||||
|
||||||
// Only making this available through the arrow operator right now, as the semantics are cleaner that way | ||||||
pub fn asof_join<'a, 'b>( | ||||||
from: Timevector_TSTZ_F64<'a>, | ||||||
into: Timevector_TSTZ_F64<'b>, | ||||||
) -> TableIterator< | ||||||
'a, | ||||||
( | ||||||
name!(value1, Option<f64>), | ||||||
name!(value2, f64), | ||||||
name!(time, crate::raw::TimestampTz), | ||||||
), | ||||||
> { | ||||||
assert!( | ||||||
from.num_points > 0 && into.num_points > 0, | ||||||
"both timevectors must be populated for an asof join" | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we test these two errors? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added...it's actually surprisingly difficult to create an empty timevector. |
||||||
); | ||||||
let mut from = from | ||||||
.into_iter() | ||||||
.map(|points| (points.ts.into(), points.val)) | ||||||
.peekable(); | ||||||
let into = into.into_iter().map(|points| (points.ts, points.val)); | ||||||
let (mut from_time, mut from_val) = from.next().unwrap(); | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The default error message from
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good point, however it's also going to have potentially ugly behavior if the into vector is empty, so I added an assert that neither is empty to the start of this function. |
||||||
|
||||||
let mut results = vec![]; | ||||||
for (into_time, into_val) in into { | ||||||
// Handle case where into starts before from | ||||||
if into_time < from_time { | ||||||
results.push((None, into_val, crate::raw::TimestampTz::from(into_time))); | ||||||
continue; | ||||||
} | ||||||
|
||||||
while let Some((peek_time, _)) = from.peek() { | ||||||
if *peek_time > into_time { | ||||||
break; | ||||||
} | ||||||
(from_time, from_val) = from.next().unwrap(); | ||||||
} | ||||||
|
||||||
results.push(( | ||||||
Some(from_val), | ||||||
into_val, | ||||||
crate::raw::TimestampTz::from(into_time), | ||||||
)); | ||||||
} | ||||||
|
||||||
TableIterator::new(results.into_iter()) | ||||||
} | ||||||
|
||||||
pg_type! { | ||||||
#[derive(Debug)] | ||||||
struct AccessorAsof<'input> { | ||||||
into: Timevector_TSTZ_F64Data<'input>, | ||||||
} | ||||||
} | ||||||
|
||||||
ron_inout_funcs!(AccessorAsof); | ||||||
|
||||||
#[pg_extern(immutable, parallel_safe, name = "asof")] | ||||||
pub fn accessor_asof<'a>(tv: Timevector_TSTZ_F64<'a>) -> AccessorAsof<'static> { | ||||||
unsafe { | ||||||
flatten! { | ||||||
AccessorAsof { | ||||||
into: tv.0 | ||||||
} | ||||||
} | ||||||
} | ||||||
} | ||||||
} | ||||||
|
||||||
#[pg_operator(immutable, parallel_safe)] | ||||||
#[opname(->)] | ||||||
pub fn arrow_timevector_asof<'a>( | ||||||
series: Timevector_TSTZ_F64<'a>, | ||||||
accessor: toolkit_experimental::AccessorAsof<'a>, | ||||||
) -> TableIterator< | ||||||
'a, | ||||||
( | ||||||
name!(value1, Option<f64>), | ||||||
name!(value2, f64), | ||||||
name!(time, crate::raw::TimestampTz), | ||||||
), | ||||||
> { | ||||||
toolkit_experimental::asof_join(series, accessor.into.clone().into()) | ||||||
} | ||||||
|
||||||
#[cfg(any(test, feature = "pg_test"))] | ||||||
#[pg_schema] | ||||||
mod tests { | ||||||
|
@@ -644,4 +733,86 @@ mod tests { | |||||
assert_eq!(tvec, expected); | ||||||
}) | ||||||
} | ||||||
|
||||||
#[pg_test] | ||||||
fn test_asof_join() { | ||||||
Spi::execute(|client| { | ||||||
client.select("SET timezone TO 'UTC'", None, None); | ||||||
|
||||||
let mut result = client.select( | ||||||
"WITH s as ( | ||||||
SELECT timevector(time, value) AS v1 FROM | ||||||
(VALUES | ||||||
('2022-10-1 1:00 UTC'::TIMESTAMPTZ, 20.0), | ||||||
('2022-10-1 2:00 UTC'::TIMESTAMPTZ, 30.0), | ||||||
('2022-10-1 3:00 UTC'::TIMESTAMPTZ, 40.0) | ||||||
) as v(time, value)), | ||||||
t as ( | ||||||
SELECT timevector(time, value) AS v2 FROM | ||||||
(VALUES | ||||||
('2022-10-1 0:30 UTC'::TIMESTAMPTZ, 15.0), | ||||||
('2022-10-1 2:00 UTC'::TIMESTAMPTZ, 45.0), | ||||||
('2022-10-1 3:30 UTC'::TIMESTAMPTZ, 60.0) | ||||||
) as v(time, value)) | ||||||
SELECT (v1 -> toolkit_experimental.asof(v2))::TEXT | ||||||
FROM s, t;", | ||||||
None, | ||||||
None, | ||||||
); | ||||||
|
||||||
assert_eq!( | ||||||
result.next().unwrap()[1].value(), | ||||||
Some("(,15,\"2022-10-01 00:30:00+00\")") | ||||||
); | ||||||
assert_eq!( | ||||||
result.next().unwrap()[1].value(), | ||||||
Some("(30,45,\"2022-10-01 02:00:00+00\")") | ||||||
); | ||||||
assert_eq!( | ||||||
result.next().unwrap()[1].value(), | ||||||
Some("(40,60,\"2022-10-01 03:30:00+00\")") | ||||||
); | ||||||
assert!(result.next().is_none()); | ||||||
}) | ||||||
} | ||||||
|
||||||
#[pg_test(error = "both timevectors must be populated for an asof join")] | ||||||
fn test_asof_none() { | ||||||
Spi::execute(|client| { | ||||||
client.select("SET timezone TO 'UTC'", None, None); | ||||||
|
||||||
client.select( | ||||||
"WITH s as ( | ||||||
SELECT timevector(now(), 0) -> toolkit_experimental.filter($$ $value != 0 $$) AS empty), | ||||||
t as ( | ||||||
SELECT timevector(time, value) AS valid FROM | ||||||
(VALUES | ||||||
('2022-10-1 0:30 UTC'::TIMESTAMPTZ, 15.0), | ||||||
('2022-10-1 2:00 UTC'::TIMESTAMPTZ, 45.0), | ||||||
('2022-10-1 3:30 UTC'::TIMESTAMPTZ, 60.0) | ||||||
) as v(time, value)) | ||||||
SELECT (valid -> toolkit_experimental.asof(empty)) | ||||||
FROM s, t;", None, None); | ||||||
}) | ||||||
} | ||||||
|
||||||
#[pg_test(error = "both timevectors must be populated for an asof join")] | ||||||
fn test_none_asof() { | ||||||
Spi::execute(|client| { | ||||||
client.select("SET timezone TO 'UTC'", None, None); | ||||||
|
||||||
client.select( | ||||||
"WITH s as ( | ||||||
SELECT timevector(now(), 0) -> toolkit_experimental.filter($$ $value != 0 $$) AS empty), | ||||||
t as ( | ||||||
SELECT timevector(time, value) AS valid FROM | ||||||
(VALUES | ||||||
('2022-10-1 0:30 UTC'::TIMESTAMPTZ, 15.0), | ||||||
('2022-10-1 2:00 UTC'::TIMESTAMPTZ, 45.0), | ||||||
('2022-10-1 3:30 UTC'::TIMESTAMPTZ, 60.0) | ||||||
) as v(time, value)) | ||||||
SELECT (empty -> toolkit_experimental.asof(valid)) | ||||||
FROM s, t;", None, None); | ||||||
}) | ||||||
} | ||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hehe we caught the same thing! Also "heatbeat" though, and also there are some rendering problems without blank lines. I sent a pull request for these.