Skip to content
This repository has been archived by the owner on Aug 30, 2022. It is now read-only.

Upgrade InfluxDB to 0.1.0 #340

Merged
merged 7 commits into from
Mar 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ tracing-subscriber = "0.2.3"
tracing-log = "0.1.1"
tracing-futures = "0.2.3"

influxdb = { version = "0.0.6", optional = true }
influxdb = { version = "0.1.0", features = ["derive"], optional = true }
chrono = { version = "0.4", features = ["serde"], optional = true }
opentelemetry = { version = "0.2.0", optional = true }
tracing-opentelemetry = { version = "0.2.0", optional = true }
opentelemetry-jaeger = { version = "0.1.0", optional = true }
Expand All @@ -48,5 +49,5 @@ path = "src/bin/aggregator.rs"
[features]
default = []
telemetry = [ "opentelemetry", "tracing-opentelemetry", "opentelemetry-jaeger"]
influx_metrics = [ "influxdb" ]
influx_metrics = [ "influxdb" , "chrono"]
all = [ "telemetry", "influx_metrics"]
4 changes: 2 additions & 2 deletions rust/src/bin/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use tracing_futures::Instrument;

#[cfg(feature = "influx_metrics")]
use xain_fl::{
common::metric_store::influxdb::{run_metricstore, InfluxDBMetricStore},
common::metric_store::influxdb::{run_metricstore, InfluxDBConnector},
little-dude marked this conversation as resolved.
Show resolved Hide resolved
coordinator::settings::MetricStoreSettings,
};

Expand Down Expand Up @@ -100,7 +100,7 @@ async fn _main(
#[cfg(feature = "influx_metrics")]
let metric_sender = if let Some(metric_store) = metric_store {
// Start the metric store
let (influx_client, metric_sender) = InfluxDBMetricStore::new(
let (influx_client, metric_sender) = InfluxDBConnector::new(
&metric_store.database_url[..],
&metric_store.database_name[..],
);
Expand Down
103 changes: 72 additions & 31 deletions rust/src/common/metric_store/influxdb.rs
Original file line number Diff line number Diff line change
@@ -1,48 +1,84 @@
use influxdb::{Client, Query, Timestamp, Type, WriteQuery};
use chrono::{DateTime, Utc};
use influxdb::{Client, InfluxDbWriteable, Timestamp, WriteQuery};
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};

pub enum MetricOwner {
Coordinator,
Participant,
pub enum Measurement {
Round(RoundMeasurement),
Counters(CountersMeasurement),
}

impl From<&MetricOwner> for &'static str {
fn from(metric_owner: &MetricOwner) -> &'static str {
match metric_owner {
MetricOwner::Coordinator => "coordinator",
MetricOwner::Participant => "participant",
impl InfluxDbWriteable for Measurement {
fn into_query<I: Into<String>>(self, name: I) -> WriteQuery {
match self {
Self::Round(round) => round.into_query(name),
Self::Counters(counters) => counters.into_query(name),
}
}
}

impl ToString for MetricOwner {
fn to_string(&self) -> String {
Into::<&str>::into(self).into()
#[derive(InfluxDbWriteable)]
pub struct RoundMeasurement {
time: DateTime<Utc>,
round: u32,
}

impl RoundMeasurement {
pub fn new(round: u32) -> RoundMeasurement {
RoundMeasurement {
time: Timestamp::Now.into(),
round,
}
}
}

pub struct Metric(pub MetricOwner, pub Vec<(&'static str, Type)>);
impl From<RoundMeasurement> for Measurement {
fn from(value: RoundMeasurement) -> Self {
Self::Round(value)
}
}

pub struct InfluxDBMetricStore {
client: Client,
receiver: UnboundedReceiver<Metric>,
#[derive(InfluxDbWriteable)]
pub struct CountersMeasurement {
time: DateTime<Utc>,
number_of_selected_participants: u32,
number_of_waiting_participants: u32,
number_of_done_participants: u32,
number_of_done_inactive_participants: u32,
number_of_ignored_participants: u32,
}

pub async fn run_metricstore(mut influx_client: InfluxDBMetricStore) {
loop {
match influx_client.receiver.recv().await {
Some(Metric(metric_owner, fields)) => {
let mut write_query: WriteQuery =
Query::write_query(Timestamp::Now, metric_owner.to_string());
impl CountersMeasurement {
pub fn new(
number_of_selected_participants: u32,
number_of_waiting_participants: u32,
number_of_done_participants: u32,
number_of_done_inactive_participants: u32,
number_of_ignored_participants: u32,
) -> CountersMeasurement {
CountersMeasurement {
time: Timestamp::Now.into(),
number_of_selected_participants,
number_of_waiting_participants,
number_of_done_participants,
number_of_done_inactive_participants,
number_of_ignored_participants,
}
}
}

for (name, value) in fields {
write_query = write_query.add_field(name, value);
}
impl From<CountersMeasurement> for Measurement {
fn from(value: CountersMeasurement) -> Self {
Self::Counters(value)
}
}

// Submit the query to InfluxDB.
let _ = influx_client
pub async fn run_metricstore(mut influxdb_connector: InfluxDBConnector) {
loop {
match influxdb_connector.receiver.recv().await {
Some(measurement) => {
let _ = influxdb_connector
.client
.query(&write_query)
.query(&measurement.into_query("coordinator"))
.await
.map_err(|e| error!("{}", e));
}
Expand All @@ -54,11 +90,16 @@ pub async fn run_metricstore(mut influx_client: InfluxDBMetricStore) {
}
}

impl InfluxDBMetricStore {
pub fn new(host: &str, db_name: &str) -> (InfluxDBMetricStore, UnboundedSender<Metric>) {
pub struct InfluxDBConnector {
client: Client,
receiver: UnboundedReceiver<Measurement>,
}

impl InfluxDBConnector {
pub fn new(host: &str, db_name: &str) -> (InfluxDBConnector, UnboundedSender<Measurement>) {
let (sender, receiver) = unbounded_channel();
(
InfluxDBMetricStore {
InfluxDBConnector {
client: Client::new(host, db_name),
receiver,
},
Expand Down
48 changes: 14 additions & 34 deletions rust/src/coordinator/core/service.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#[cfg(feature = "influx_metrics")]
use crate::common::metric_store::influxdb::{Metric, MetricOwner};
use crate::common::metric_store::influxdb::{CountersMeasurement, Measurement, RoundMeasurement};
use crate::{
aggregator,
common::client::{ClientId, Credentials, Token},
Expand All @@ -14,8 +14,6 @@ use crate::{
};
use derive_more::From;
use futures::{ready, stream::Stream};
#[cfg(feature = "influx_metrics")]
use influxdb::Type;
use std::{
future::Future,
pin::Pin,
Expand Down Expand Up @@ -100,7 +98,7 @@ where

#[cfg(feature = "influx_metrics")]
///Metric Store
metrics_tx: Option<UnboundedSender<Metric>>,
metrics_tx: Option<UnboundedSender<Measurement>>,
}

impl<S> Service<S>
Expand All @@ -113,7 +111,7 @@ where
aggregator_url: String,
rpc_client: aggregator::rpc::Client,
requests: ServiceRequests,
#[cfg(feature = "influx_metrics")] metrics_tx: Option<UnboundedSender<Metric>>,
#[cfg(feature = "influx_metrics")] metrics_tx: Option<UnboundedSender<Measurement>>,
) -> Self {
let (heartbeat_expirations_tx, heartbeat_expirations_rx) = unbounded_channel();

Expand Down Expand Up @@ -416,41 +414,23 @@ where
#[cfg(feature = "influx_metrics")]
fn write_counter_metrics(&self) {
self.metrics_tx.as_ref().map(|tx| {
let _ = tx.send(Metric(
little-dude marked this conversation as resolved.
Show resolved Hide resolved
MetricOwner::Coordinator,
vec![
(
"number_of_selected_participants",
Type::SignedInteger(self.protocol.counters().selected as i64),
),
(
"number_of_waiting_participants",
Type::SignedInteger(self.protocol.counters().waiting as i64),
),
(
"number_of_done_participants",
Type::SignedInteger(self.protocol.counters().done as i64),
),
(
"number_of_done_inactive_participants",
Type::SignedInteger(self.protocol.counters().done_and_inactive as i64),
),
(
"number_of_ignored_participants",
Type::SignedInteger(self.protocol.counters().ignored as i64),
),
],
));
let _ = tx.send(
CountersMeasurement::new(
self.protocol.counters().selected,
self.protocol.counters().waiting,
self.protocol.counters().done,
self.protocol.counters().done_and_inactive,
self.protocol.counters().ignored,
)
.into(),
);
});
}

#[cfg(feature = "influx_metrics")]
fn write_round_metric(&self, round: u32) {
self.metrics_tx.as_ref().map(|tx| {
let _ = tx.send(Metric(
MetricOwner::Coordinator,
vec![("round", Type::UnsignedInteger(round as u64))],
));
let _ = tx.send(RoundMeasurement::new(round).into());
});
}

Expand Down