Skip to content

Commit

Permalink
Adding asof joins for timevector
Browse files Browse the repository at this point in the history
  • Loading branch information
Brian Rowe committed Nov 24, 2022
1 parent 0eca871 commit b6c3c36
Showing 1 changed file with 163 additions and 1 deletion.
164 changes: 163 additions & 1 deletion extension/src/time_vector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use pgx::{iter::TableIterator, *};

use crate::{
aggregate_utils::in_aggregate_context,
build,
build, flatten,
palloc::{Inner, Internal, InternalAsValue, ToInternal},
pg_type, ron_inout_funcs,
};
Expand Down Expand Up @@ -368,6 +368,91 @@ CREATE AGGREGATE rollup(\n\
],
);

#[pg_schema]
pub mod toolkit_experimental {
use super::*;

#[pg_extern(immutable, parallel_safe)]
pub fn asof_join<'a, 'b>(
from: Timevector_TSTZ_F64<'a>,
into: Timevector_TSTZ_F64<'b>,
) -> TableIterator<
'a,
(
name!(time, crate::raw::TimestampTz),
name!(value1, Option<f64>),
name!(value2, f64),
),
> {
let mut from = from
.into_iter()
.map(|points| (points.ts.into(), points.val))
.peekable();
let into = into.into_iter().map(|points| (points.ts, points.val));
let (mut from_time, mut from_val) = from.next().unwrap();

let mut results = vec![];
for (into_time, into_val) in into {
// Handle case where into starts before from
if into_time < from_time {
results.push((crate::raw::TimestampTz::from(into_time), None, into_val));
continue;
}

while let Some((peek_time, _)) = from.peek() {
if *peek_time > into_time {
break;
}
(from_time, from_val) = from.next().unwrap();
}

results.push((
crate::raw::TimestampTz::from(into_time),
Some(from_val),
into_val,
));
}

TableIterator::new(results.into_iter())
}

pg_type! {
#[derive(Debug)]
struct AccessorAsof<'input> {
into: Timevector_TSTZ_F64Data<'input>,
}
}

ron_inout_funcs!(AccessorAsof);

#[pg_extern(immutable, parallel_safe, name = "asof")]
pub fn accessor_asof<'a>(tv: Timevector_TSTZ_F64<'a>) -> AccessorAsof<'static> {
unsafe {
flatten! {
AccessorAsof {
into: tv.0
}
}
}
}
}

#[pg_operator(immutable, parallel_safe)]
#[opname(->)]
pub fn arrow_timevector_asof<'a>(
series: Timevector_TSTZ_F64<'a>,
accessor: toolkit_experimental::AccessorAsof<'a>,
) -> TableIterator<
'a,
(
name!(time, crate::raw::TimestampTz),
name!(value1, Option<f64>),
name!(value2, f64),
),
> {
toolkit_experimental::asof_join(series, accessor.into.clone().into())
}

#[cfg(any(test, feature = "pg_test"))]
#[pg_schema]
mod tests {
Expand Down Expand Up @@ -644,4 +729,81 @@ mod tests {
assert_eq!(tvec, expected);
})
}

#[pg_test]
fn test_asof_join() {
Spi::execute(|client| {
client.select("SET timezone TO 'UTC'", None, None);

let mut result = client.select(
"WITH s as (
SELECT timevector(time, value) AS v1 FROM
(VALUES
('2022-10-1 1:00 UTC'::TIMESTAMPTZ, 20.0),
('2022-10-1 2:00 UTC'::TIMESTAMPTZ, 30.0),
('2022-10-1 3:00 UTC'::TIMESTAMPTZ, 40.0)
) as v(time, value)),
t as (
SELECT timevector(time, value) AS v2 FROM
(VALUES
('2022-10-1 0:30 UTC'::TIMESTAMPTZ, 15.0),
('2022-10-1 2:00 UTC'::TIMESTAMPTZ, 45.0),
('2022-10-1 3:30 UTC'::TIMESTAMPTZ, 60.0)
) as v(time, value))
SELECT toolkit_experimental.asof_join(v1, v2)::TEXT
FROM s, t;",
None,
None,
);

assert_eq!(
result.next().unwrap()[1].value(),
Some("(\"2022-10-01 00:30:00+00\",,15)")
);
assert_eq!(
result.next().unwrap()[1].value(),
Some("(\"2022-10-01 02:00:00+00\",30,45)")
);
assert_eq!(
result.next().unwrap()[1].value(),
Some("(\"2022-10-01 03:30:00+00\",40,60)")
);
assert!(result.next().is_none());

let mut result = client.select(
"WITH s as (
SELECT timevector(time, value) AS v1 FROM
(VALUES
('2022-10-1 1:00 UTC'::TIMESTAMPTZ, 20.0),
('2022-10-1 2:00 UTC'::TIMESTAMPTZ, 30.0),
('2022-10-1 3:00 UTC'::TIMESTAMPTZ, 40.0)
) as v(time, value)),
t as (
SELECT timevector(time, value) AS v2 FROM
(VALUES
('2022-10-1 0:30 UTC'::TIMESTAMPTZ, 15.0),
('2022-10-1 2:00 UTC'::TIMESTAMPTZ, 45.0),
('2022-10-1 3:30 UTC'::TIMESTAMPTZ, 60.0)
) as v(time, value))
SELECT (v1 -> toolkit_experimental.asof(v2))::TEXT
FROM s, t;",
None,
None,
);

assert_eq!(
result.next().unwrap()[1].value(),
Some("(\"2022-10-01 00:30:00+00\",,15)")
);
assert_eq!(
result.next().unwrap()[1].value(),
Some("(\"2022-10-01 02:00:00+00\",30,45)")
);
assert_eq!(
result.next().unwrap()[1].value(),
Some("(\"2022-10-01 03:30:00+00\",40,60)")
);
assert!(result.next().is_none());
})
}
}

0 comments on commit b6c3c36

Please sign in to comment.