Skip to content

Commit

Permalink
Merge #590
Browse files Browse the repository at this point in the history
590: Adding min_n and max_n aggregates r=WireBaron a=WireBaron

This adds new aggregates `min_n` and `max_n` for getting the n largest or smallest values from a column.  It will work with `integer`, `float`, and `timestamptz` values.  These functions will return an aggregate object which can be combined with other such objects via `rollup`.  The data can be extracted from the aggregate via `into_array` or `into_values` methods, which return either an array of the values, or a table containing them.

It further adds `min_n_by` and `max_n_by` functions that take one of the above types plus an associated piece of data (takes this as an `AnyElement`, so can be any type).  This will behave the same as the `min_n`/`max_n` above, but will also return the associated data for the smallest or largest elements.  `into_array` is not implemented for these aggregates, as it's not clear what that array would look like, and `into_values` will require a value of the appropriate type as an input to allow postgres to determine the function output (suggested approach is to just cast a `NULL` to the type of the associated data).

Fixes #511 


Co-authored-by: Brian Rowe <brian@timescale.com>
  • Loading branch information
bors[bot] and Brian Rowe authored Oct 29, 2022
2 parents cbbca7b + 9fe5b8f commit 6791f1b
Show file tree
Hide file tree
Showing 18 changed files with 2,936 additions and 0 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ This changelog should be updated as part of a PR if the work is worth noting (mo

#### New experimental features

- New min_n/max_n functions and related min_n_by/max_n_by. The former is used to get the top N values from a column while the later will also track some additional data, such as another column or even the entire row. These should give the same results as a normal select with an order by and limit, except they can be composed and combined like other toolkit aggregates.

#### Stabilized features

#### Bug fixes
Expand Down
1 change: 1 addition & 0 deletions extension/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ approx = {version = "0.4.0", optional = true}
bincode = "1.3.1"
serde = { version = "1.0", features = ["derive"] }
once_cell = "1.8.0"
ordered-float = {version = "1.0", features = ["serde"] }
paste = "1.0"
rand = { version = "0.8.3", features = ["getrandom", "small_rng"] }
rand_distr = "0.4.0"
Expand Down
14 changes: 14 additions & 0 deletions extension/src/datum_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@ pub(crate) unsafe fn deep_copy_datum(datum: Datum, typoid: Oid) -> Datum {
}
}

// If datum is an alloced type, free the associated memory
pub(crate) unsafe fn free_datum(datum: Datum, typoid: Oid) {
let tentry = pg_sys::lookup_type_cache(typoid, 0_i32);
if !(*tentry).typbyval {
pg_sys::pfree(datum.cast_mut_ptr())
}
}

// TODO: is there a better place for this?
// Note that this requires an reference time to deal with variable length intervals (days or months)
pub fn interval_to_ms(ref_time: &crate::raw::TimestampTz, interval: &crate::raw::Interval) -> i64 {
Expand Down Expand Up @@ -513,6 +521,12 @@ impl<'a> DatumStore<'a> {
}
}
}

pub fn into_anyelement_iter(self) -> impl Iterator<Item = AnyElement> + 'a {
let oid: pg_sys::Oid = self.type_oid.into();
self.into_iter()
.map(move |x| unsafe { AnyElement::from_polymorphic_datum(x, false, oid) }.unwrap())
}
}

// This is essentially the same as the DatumStoreIterator except that it takes ownership of the DatumStore,
Expand Down
1 change: 1 addition & 0 deletions extension/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub mod frequency;
pub mod gauge_agg;
pub mod hyperloglog;
pub mod lttb;
pub mod nmost;
pub mod ohlc;
pub mod range;
pub mod saturation;
Expand Down
264 changes: 264 additions & 0 deletions extension/src/nmost.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,264 @@
use pgx::*;

use serde::{Deserialize, Serialize};

use crate::{
aggregate_utils::in_aggregate_context,
datum_utils::{deep_copy_datum, free_datum, DatumStore},
palloc::{Inner, Internal, InternalAsValue},
};

use std::collections::BinaryHeap;

mod max_float;
mod max_int;
mod max_time;
mod min_float;
mod min_int;
mod min_time;

mod max_by_float;
mod max_by_int;
mod max_by_time;
mod min_by_float;
mod min_by_int;
mod min_by_time;

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct NMostTransState<T: Ord> {
capacity: usize,
heap: BinaryHeap<T>,
}

impl<T: Ord> NMostTransState<T> {
fn new(capacity: usize, first_val: T) -> NMostTransState<T> {
let mut new_heap = NMostTransState {
capacity,
heap: BinaryHeap::with_capacity(capacity),
};

new_heap.new_entry(first_val);

new_heap
}

fn new_entry(&mut self, new_val: T) {
// If at capacity see if we need to replace something
if self.heap.len() == self.capacity {
if !self.belongs_in_heap(&new_val) {
return;
}

self.heap.pop();
}

self.heap.push(new_val)
}

fn belongs_in_heap(&self, val: &T) -> bool {
// Note that this will actually be '>' if T is a Reverse<...> type
val < self.heap.peek().unwrap()
}
}

impl<T: Ord + Copy> From<(&[T], usize)> for NMostTransState<T> {
fn from(input: (&[T], usize)) -> Self {
let (vals, capacity) = input;
let mut state = Self::new(capacity, vals[0]);
for val in vals[1..].iter() {
state.new_entry(*val);
}
state
}
}

fn nmost_trans_function<T: Ord>(
state: Option<Inner<NMostTransState<T>>>,
val: T,
capacity: usize,
fcinfo: pg_sys::FunctionCallInfo,
) -> Option<Inner<NMostTransState<T>>> {
unsafe {
in_aggregate_context(fcinfo, || {
if state.is_none() {
return Internal::new(NMostTransState::<T>::new(capacity, val)).to_inner();
}

let mut state = state.unwrap();
state.new_entry(val);
Some(state)
})
}
}

fn nmost_rollup_trans_function<T: Ord + Copy>(
state: Option<Inner<NMostTransState<T>>>,
sorted_vals: &[T],
capacity: usize,
fcinfo: pg_sys::FunctionCallInfo,
) -> Option<Inner<NMostTransState<T>>> {
unsafe {
in_aggregate_context(fcinfo, || {
if let Some(mut state) = state {
for val in sorted_vals {
// The values are sorted, so as soon as we find one that shouldn't be added, we're done
if !state.belongs_in_heap(val) {
return Some(state);
}
state.new_entry(*val);
}

Some(state)
} else {
Internal::new::<NMostTransState<T>>((sorted_vals, capacity).into()).to_inner()
}
})
}
}

fn nmost_trans_combine<T: Clone + Ord + Copy>(
first: Option<Inner<NMostTransState<T>>>,
second: Option<Inner<NMostTransState<T>>>,
) -> Option<Inner<NMostTransState<T>>> {
match (first, second) {
(None, None) => None,
(None, Some(only)) | (Some(only), None) => unsafe {
Internal::new(only.clone()).to_inner()
},
(Some(a), Some(b)) => {
let mut a = a.clone();
// This could be made more efficient by iterating in the appropriate order with an early exit, but would requiring ordering the other heap
for entry in b.heap.iter() {
a.new_entry(*entry);
}
unsafe { Internal::new(a).to_inner() }
}
}
}

// TODO: serialize and deserialize will need to be implemented with Datum handling code
#[derive(Clone, Debug)]
pub struct NMostByTransState<T: Ord> {
values: NMostTransState<(T, usize)>,
data: Vec<Datum>,
oid: pg_sys::Oid,
}

impl<T: Clone + Ord> NMostByTransState<T> {
fn new(capacity: usize, first_val: T, first_element: pgx::AnyElement) -> NMostByTransState<T> {
// first entry will always have index 0
let first_val = (first_val, 0);
NMostByTransState {
values: NMostTransState::new(capacity, first_val),
data: vec![unsafe { deep_copy_datum(first_element.datum(), first_element.oid()) }],
oid: first_element.oid(),
}
}

fn new_entry(&mut self, new_val: T, new_element: pgx::AnyElement) {
assert!(new_element.oid() == self.oid);
if self.data.len() < self.values.capacity {
// Not yet full, easy case
self.values.new_entry((new_val, self.data.len()));
self.data
.push(unsafe { deep_copy_datum(new_element.datum(), new_element.oid()) });
} else if self
.values
.belongs_in_heap(&(new_val.clone(), self.data.len()))
{
// Full and value belongs in the heap (using len() for this check just keeps us from
// succeeding if we tie the max heap element)

let (_, index_to_replace) = *self
.values
.heap
.peek()
.expect("Can't be empty in this case");
let old_datum = std::mem::replace(&mut self.data[index_to_replace], unsafe {
deep_copy_datum(new_element.datum(), new_element.oid())
});
unsafe { free_datum(old_datum, new_element.oid()) };
self.values.new_entry((new_val, index_to_replace));
}
}

// Sort the trans state and break it into a tuple of (capacity, values array, datum_store)
fn into_sorted_parts(self) -> (usize, Vec<T>, DatumStore<'static>) {
let values = self.values;
let heap = values.heap;
let (val_ary, idx_ary): (Vec<T>, Vec<usize>) = heap.into_sorted_vec().into_iter().unzip();

let mut mapped_data = vec![];
for i in idx_ary {
mapped_data.push(self.data[i]);
}

(
values.capacity,
val_ary,
DatumStore::from((self.oid, mapped_data)),
)
}
}

impl<T: Ord + Copy> From<(&[T], &DatumStore<'_>, usize)> for NMostByTransState<T> {
fn from(in_tuple: (&[T], &DatumStore, usize)) -> Self {
let (vals, data, capacity) = in_tuple;
let mut elements = data.clone().into_anyelement_iter();
let mut state = Self::new(capacity, vals[0], elements.next().unwrap());
for val in vals[1..].iter() {
state.new_entry(*val, elements.next().unwrap());
}
state
}
}

fn nmost_by_trans_function<T: Ord + Clone>(
state: Option<Inner<NMostByTransState<T>>>,
val: T,
data: pgx::AnyElement,
capacity: usize,
fcinfo: pg_sys::FunctionCallInfo,
) -> Option<Inner<NMostByTransState<T>>> {
unsafe {
in_aggregate_context(fcinfo, || {
if state.is_none() {
return Internal::new(NMostByTransState::<T>::new(capacity, val, data)).to_inner();
}

let mut state = state.unwrap();
state.new_entry(val, data);
Some(state)
})
}
}

fn nmost_by_rollup_trans_function<T: Ord + Copy>(
state: Option<Inner<NMostByTransState<T>>>,
sorted_vals: &[T],
datum_store: &DatumStore,
capacity: usize,
fcinfo: pg_sys::FunctionCallInfo,
) -> Option<Inner<NMostByTransState<T>>> {
unsafe {
in_aggregate_context(fcinfo, || {
if let Some(mut state) = state {
for (val, element) in sorted_vals
.iter()
.zip(datum_store.clone().into_anyelement_iter())
{
// The values are sorted, so as soon as we find one that shouldn't be added, we're done
if !state.values.belongs_in_heap(&(*val, state.values.capacity)) {
return Some(state);
}
state.new_entry(*val, element);
}

Some(state)
} else {
Internal::new::<NMostByTransState<T>>((sorted_vals, datum_store, capacity).into())
.to_inner()
}
})
}
}
Loading

0 comments on commit 6791f1b

Please sign in to comment.