Skip to content

Commit

Permalink
Merge #524
Browse files Browse the repository at this point in the history
524: Updating Toolkit To Start Using Cargo Fmt r=thatzopoulos a=thatzopoulos

Part 1 of updating toolkit to use cargo fmt. 
Once this is merged, we will need to store the merge commit from this PR in a file in the repo so that we can set up git blame to ignore this commit:

Create file .git-blame-ignore-revs:
```
# Example Commit
864343a
```
You can use git config to set git to always use the ignoreRevsFile in a project with the following command:

```git config blame.ignoreRevsFile .git-blame-ignore-revs```

You could also just pass it in as a flag to git blame but then you have to remember to always include the flag.
```
git blame --ignore-revs-file .git-blame-ignore-revs
```

Co-authored-by: Thomas Hatzopoulos <thomas@timescale.com>
  • Loading branch information
bors[bot] and thatzopoulos authored Sep 12, 2022
2 parents b6baa47 + 11edf5b commit b743334
Show file tree
Hide file tree
Showing 59 changed files with 5,270 additions and 3,811 deletions.
62 changes: 34 additions & 28 deletions crates/asap/src/fft.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@

// based on https://github.com/stanford-futuredata/ASAP/blob/8b39db4bc92590cbe5b44ddace9b7bb1d677248b/ASAP-optimized.js
// original copyright notice as follows
//
//
// Free FFT and convolution (JavaScript)
//
//
// Copyright (c) 2014 Project Nayuki
// https://www.nayuki.io/page/free-small-fft-in-multiple-languages
//
//
// (MIT License)
// Permission is hereby granted, free of charge, to any person obtaining a copy of
// this software and associated documentation files (the "Software"), to deal in
Expand All @@ -29,7 +28,7 @@

use std::f64::consts::PI;

/*
/*
* Computes the discrete Fourier transform (DFT) of the given complex vector, storing the result back into the vector.
* The vector can have any length. This is a wrapper function.
*/
Expand All @@ -38,46 +37,45 @@ pub fn transform(real: &mut [f64], imag: &mut [f64]) {

let n = real.len();
if n == 0 {
}
else if n & (n-1) == 0 { // Is power of 2
} else if n & (n - 1) == 0 {
// Is power of 2
transform_radix2(real, imag);
}
else { // More complicated algorithm for arbitrary sizes
} else {
// More complicated algorithm for arbitrary sizes
transform_bluestein(real, imag);
}
}


/*
/*
* Computes the inverse discrete Fourier transform (IDFT) of the given complex vector, storing the result back into the vector.
* The vector can have any length. This is a wrapper function. This transform does not perform scaling, so the inverse is not a true inverse.
*/
pub fn inverse_transform(real: &mut [f64], imag: &mut [f64]) {
    // Running the forward transform with the real and imaginary parts
    // swapped computes the (unscaled) inverse DFT.
    transform(imag, real);
}


/*
/*
* Computes the discrete Fourier transform (DFT) of the given complex vector, storing the result back into the vector.
* The vector's length must be a power of 2. Uses the Cooley-Tukey decimation-in-time radix-2 algorithm.
*/
fn transform_radix2(real: &mut [f64], imag: &mut [f64]) {
// Initialization
let n = real.len();
if n == 1 { // Trivial transform
if n == 1 {
// Trivial transform
return;
}
let mut levels = 100;
for i in 0..32 {
if 1 << i == n {
levels = i; // Equal to log2(n)
levels = i; // Equal to log2(n)
}
}
debug_assert!(levels < 32);

let mut cos_table = vec![0.0; n / 2];
let mut sin_table = vec![0.0; n / 2];
for i in 0..n/2 {
for i in 0..n / 2 {
cos_table[i] = (2.0 * PI * i as f64 / n as f64).cos();
sin_table[i] = (2.0 * PI * i as f64 / n as f64).sin();
}
Expand All @@ -99,9 +97,9 @@ fn transform_radix2(real: &mut [f64], imag: &mut [f64]) {
for i in (0..n).step_by(size) {
let mut j = i;
let mut k = 0;
while j < i + halfsize {
let tpre = real[j+halfsize] * cos_table[k] + imag[j+halfsize] * sin_table[k];
let tpim = -real[j+halfsize] * sin_table[k] + imag[j+halfsize] * cos_table[k];
while j < i + halfsize {
let tpre = real[j + halfsize] * cos_table[k] + imag[j + halfsize] * sin_table[k];
let tpim = -real[j + halfsize] * sin_table[k] + imag[j + halfsize] * cos_table[k];
real[j + halfsize] = real[j] - tpre;
imag[j + halfsize] = imag[j] - tpim;
real[j] += tpre;
Expand Down Expand Up @@ -142,7 +140,7 @@ fn transform_bluestein(real: &mut [f64], imag: &mut [f64]) {
let mut cos_table = vec![0.0; n];
let mut sin_table = vec![0.0; n];
for i in 0..n {
let j = (i * i % (n * 2)) as f64; // This is more accurate than j = i * i
let j = (i * i % (n * 2)) as f64; // This is more accurate than j = i * i
cos_table[i] = (PI * j / n as f64).cos();
sin_table[i] = (PI * j / n as f64).sin();
}
Expand All @@ -151,7 +149,7 @@ fn transform_bluestein(real: &mut [f64], imag: &mut [f64]) {
let mut areal = vec![0.0; m];
let mut aimag = vec![0.0; m];
for i in 0..n {
areal[i] = real[i] * cos_table[i] + imag[i] * sin_table[i];
areal[i] = real[i] * cos_table[i] + imag[i] * sin_table[i];
aimag[i] = -real[i] * sin_table[i] + imag[i] * cos_table[i];
}
for i in n..m {
Expand All @@ -169,24 +167,25 @@ fn transform_bluestein(real: &mut [f64], imag: &mut [f64]) {
bimag[i] = sin_table[i];
bimag[m - i] = sin_table[i];
}
for i in n..=(m-n) {
for i in n..=(m - n) {
breal[i] = 0.0;
bimag[i] = 0.0;
}

// Convolution
let mut creal = vec![0.0; m];
let mut cimag = vec![0.0; m];
convolve_complex(&mut areal, &mut aimag, &mut breal, &mut bimag, &mut creal, &mut cimag);
convolve_complex(
&mut areal, &mut aimag, &mut breal, &mut bimag, &mut creal, &mut cimag,
);

// Postprocessing
for i in 0..n {
real[i] = creal[i] * cos_table[i] + cimag[i] * sin_table[i];
real[i] = creal[i] * cos_table[i] + cimag[i] * sin_table[i];
imag[i] = -creal[i] * sin_table[i] + cimag[i] * cos_table[i];
}
}


// /*
// * Computes the circular convolution of the given real vectors. Each vector's length must be the same.
// */
Expand All @@ -199,11 +198,17 @@ fn transform_bluestein(real: &mut [f64], imag: &mut [f64]) {
// convolve_complex(x, zeros, y, zeros.slice(), out, zeros.slice());
// }


// /*
// * Computes the circular convolution of the given complex vectors. Each vector's length must be the same.
// */
fn convolve_complex(xreal: &mut [f64], ximag: &mut [f64], yreal: &mut [f64], yimag: &mut [f64], outreal: &mut [f64], outimag: &mut [f64]) {
fn convolve_complex(
xreal: &mut [f64],
ximag: &mut [f64],
yreal: &mut [f64],
yimag: &mut [f64],
outreal: &mut [f64],
outimag: &mut [f64],
) {
let n = xreal.len();

transform(xreal, ximag);
Expand All @@ -214,7 +219,8 @@ fn convolve_complex(xreal: &mut [f64], ximag: &mut [f64], yreal: &mut [f64], yim
xreal[i] = temp;
}
inverse_transform(xreal, ximag);
for i in 0..n { // Scaling (because this FFT implementation omits it)
for i in 0..n {
// Scaling (because this FFT implementation omits it)
outreal[i] = xreal[i] / n as f64;
outimag[i] = ximag[i] / n as f64;
}
Expand Down
75 changes: 46 additions & 29 deletions crates/asap/src/lib.rs

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion crates/count-min-sketch/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ pub struct CountMinHashFn {
const SEED: u64 = 0x517cc1b727220a95; // from FxHash

impl CountMinHashFn {

/// Creates a new CountMinHashFn whose hash function key is equal to `key`.
pub fn with_key(key: u64) -> Self {
Self { key }
Expand Down
84 changes: 44 additions & 40 deletions crates/counter-agg/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@

use tspoint::TSPoint;
use stats_agg::{XYPair, stats2d::StatsSummary2D};
use serde::{Deserialize, Serialize};
use stats_agg::{stats2d::StatsSummary2D, XYPair};
use std::fmt;

use tspoint::TSPoint;

pub mod range;

#[cfg(test)]
mod tests;

/// Errors produced while accumulating counter points.
#[derive(Debug, PartialEq, Eq)]
pub enum CounterError {
    /// Points were submitted out of time order.
    OrderError,
    /// Bounds are missing or unusable for the requested calculation.
    BoundsInvalid,
}
Expand Down Expand Up @@ -51,23 +49,23 @@ pub struct MetricSummary {
// you can always subtract a common near value from all your times, then add it back in, the regression analysis will be unchanged.
// Note that convert the timestamp into seconds rather than microseconds here so that the slope and any other regression analysis, is done on a per-second basis.
// For instance, the slope will be the per-second slope, not the per-microsecond slope. The x intercept value will need to be converted back to microseconds so you get a timestamp out.
fn ts_to_xy(pt: TSPoint) -> XYPair{
XYPair{
fn ts_to_xy(pt: TSPoint) -> XYPair {
XYPair {
x: to_seconds(pt.ts as f64),
y: pt.val,
}
}

/// Converts a duration/timestamp expressed in microseconds to seconds.
fn to_seconds(t: f64) -> f64 {
    t / 1_000_000_f64 // by default postgres timestamps have microsecond precision
}

/// MetricSummary tracks monotonically increasing counters that may reset, ie every time the value decreases
/// it is treated as a reset of the counter and the previous value is added to the "true value" of the
/// counter at that timestamp.
impl MetricSummary {
pub fn new(pt: &TSPoint, bounds:Option<range::I64Range>) -> MetricSummary {
let mut n = MetricSummary{
pub fn new(pt: &TSPoint, bounds: Option<range::I64Range>) -> MetricSummary {
let mut n = MetricSummary {
first: *pt,
second: *pt,
penultimate: *pt,
Expand All @@ -83,15 +81,14 @@ impl MetricSummary {
}

fn reset(&mut self, incoming: &TSPoint) {
if incoming.val < self.last.val {
if incoming.val < self.last.val {
self.reset_sum += self.last.val;
self.num_resets += 1;
}
}

// expects time-ordered input
fn add_point(&mut self, incoming: &TSPoint) -> Result<(), CounterError>{

fn add_point(&mut self, incoming: &TSPoint) -> Result<(), CounterError> {
if incoming.ts < self.last.ts {
return Err(CounterError::OrderError);
}
Expand Down Expand Up @@ -143,7 +140,12 @@ impl MetricSummary {
}
let mut stats = incoming.stats;
// have to offset based on our reset_sum, including the amount we added based on any resets that happened at the boundary (but before we add in the incoming reset_sum)
stats.offset(XYPair{x:0.0, y: self.reset_sum}).unwrap();
stats
.offset(XYPair {
x: 0.0,
y: self.reset_sum,
})
.unwrap();
self.last = incoming.last;
self.reset_sum += incoming.reset_sum;
self.num_resets += incoming.num_resets;
Expand All @@ -154,7 +156,7 @@ impl MetricSummary {
Ok(())
}

pub fn time_delta(&self) -> f64{
pub fn time_delta(&self) -> f64 {
to_seconds((self.last.ts - self.first.ts) as f64)
}

Expand Down Expand Up @@ -187,15 +189,15 @@ impl MetricSummary {
}
}

pub fn irate_left(&self) -> Option<f64>{
if self.single_value(){
pub fn irate_left(&self) -> Option<f64> {
if self.single_value() {
None
} else {
Some(self.idelta_left() / to_seconds((self.second.ts - self.first.ts) as f64))
}
}

pub fn irate_right(&self) -> Option<f64>{
pub fn irate_right(&self) -> Option<f64> {
if self.single_value() {
None
} else {
Expand All @@ -204,16 +206,16 @@ impl MetricSummary {
}

pub fn bounds_valid(&self) -> bool {
match self.bounds{
None => true, // unbounded contains everything
Some(b) => b.contains(self.last.ts) && b.contains(self.first.ts)
match self.bounds {
None => true, // unbounded contains everything
Some(b) => b.contains(self.last.ts) && b.contains(self.first.ts),
}
}

fn bounds_extend(&mut self, in_bounds:Option<range::I64Range>){
fn bounds_extend(&mut self, in_bounds: Option<range::I64Range>) {
match (self.bounds, in_bounds) {
(None, _) => {self.bounds = in_bounds},
(_, None) => {},
(None, _) => self.bounds = in_bounds,
(_, None) => {}
(Some(mut a), Some(b)) => {
a.extend(&b);
self.bounds = Some(a);
Expand All @@ -223,23 +225,26 @@ impl MetricSummary {

// based on: https://github.com/timescale/promscale_extension/blob/d51a0958442f66cb78d38b584a10100f0d278298/src/lib.rs#L208,
// which is based on: // https://github.com/prometheus/prometheus/blob/e5ffa8c9a08a5ee4185271c8c26051ddc1388b7a/promql/functions.go#L59
pub fn prometheus_delta(&self) -> Result<Option<f64>, CounterError>{
if self.bounds.is_none() || !self.bounds_valid() || self.bounds.unwrap().has_infinite() {
pub fn prometheus_delta(&self) -> Result<Option<f64>, CounterError> {
if self.bounds.is_none() || !self.bounds_valid() || self.bounds.unwrap().has_infinite() {
return Err(CounterError::BoundsInvalid);
}
//must have at least 2 values
if self.single_value() || self.bounds.unwrap().is_singleton() { //technically, the is_singleton check is redundant, it's included for clarity (any singleton bound that is valid can only be one point)
if self.single_value() || self.bounds.unwrap().is_singleton() {
//technically, the is_singleton check is redundant, it's included for clarity (any singleton bound that is valid can only be one point)
return Ok(None);
}

let mut result_val = self.delta();

// all calculated durations in seconds in Prom implementation, so we'll do that here.
// we can unwrap all of the bounds accesses as they are guaranteed to be there from the checks above
let mut duration_to_start = to_seconds((self.first.ts - self.bounds.unwrap().left.unwrap()) as f64);
let mut duration_to_start =
to_seconds((self.first.ts - self.bounds.unwrap().left.unwrap()) as f64);

/* bounds stores [L,H), but Prom takes the duration using the inclusive range [L, H-1ms]. Subtract an extra ms, ours is in microseconds. */
let duration_to_end = to_seconds((self.bounds.unwrap().right.unwrap() - self.last.ts - 1_000) as f64);
let duration_to_end =
to_seconds((self.bounds.unwrap().right.unwrap() - self.last.ts - 1_000) as f64);
let sampled_interval = self.time_delta();
let avg_duration_between_samples = sampled_interval / (self.stats.n - 1) as f64; // don't have to worry about divide by zero because we know we have at least 2 values from the above.

Expand Down Expand Up @@ -276,15 +281,14 @@ impl MetricSummary {
Ok(Some(result_val))
}

pub fn prometheus_rate(&self) -> Result<Option<f64>, CounterError>{
let delta = self.prometheus_delta()?;
pub fn prometheus_rate(&self) -> Result<Option<f64>, CounterError> {
let delta = self.prometheus_delta()?;
if delta.is_none() {
return Ok(None);
}
let delta = delta.unwrap();
let bounds = self.bounds.unwrap() ; // if we got through delta without error then we have bounds
/* bounds stores [L,H), but Prom takes the duration using the inclusive range [L, H-1ms]. So subtract an extra ms from the duration*/
let duration = bounds.duration().unwrap() - 1_000;
let bounds = self.bounds.unwrap(); // if we got through delta without error then we have bounds
let duration = bounds.duration().unwrap() - 1_000; // bounds stores [L,H), but Prom takes the duration using the inclusive range [L, H-1ms]. So subtract an extra ms from the duration
if duration <= 0 {
return Ok(None); // if we have a total duration under a ms, it's less than prom could deal with so we return none.
}
Expand All @@ -293,13 +297,13 @@ impl MetricSummary {
}

impl fmt::Display for CounterError {
fn fmt(&self, f: &mut fmt::Formatter<'_>)
-> Result<(), fmt::Error> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
match self {
CounterError::OrderError =>
write!(f, "out of order points: points must be submitted in time-order"),
CounterError::BoundsInvalid =>
write!(f, "cannot calculate delta without valid bounds"),
CounterError::OrderError => write!(
f,
"out of order points: points must be submitted in time-order"
),
CounterError::BoundsInvalid => write!(f, "cannot calculate delta without valid bounds"),
}
}
}
Expand Down
Loading

0 comments on commit b743334

Please sign in to comment.