diff --git a/Cargo.toml b/Cargo.toml index 9f24a301f..bc5cf567d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,24 +5,24 @@ authors = ["little-dude "] edition = "2018" [dependencies] -uuid = { version = "0.8.1", features = ["v4", "serde"] } -log = "0.4.8" -futures = "0.3.4" -tokio = { version = "0.2.11", features = ["full"] } -warp = "0.2.1" -derive_more = { version = "0.99.2", default-features = false, features = [ "display", "from" ] } -rand = "0.7.3" -env_logger = "0.7.1" -tarpc = { version = "0.20.0", features = [ "full" ] } -serde = { version = "1.0.104", features = [ "derive" ] } -pyo3 = "0.9.0-alpha.1" -tokio-serde = { version = "0.6.0", features = [ "json" ] } -futures-retry = "0.4.0" -stubborn-io = "0.1.8" -bytes = "0.5.4" -config = "0.10.1" -clap = "2.33.0" -influxdb = "0.0.6" +uuid = { version = "0.8.1", features = ["v4", "serde"] } # Apache-2.0 OR MIT +log = "0.4.8" # Apache-2.0 OR MIT +futures = "0.3.4" # Apache-2.0 OR MIT +tokio = { version = "0.2.11", features = ["full"] } # MIT +warp = "0.2.1" # MIT +derive_more = { version = "0.99.2", default-features = false, features = [ "display", "from" ] } # MIT +rand = "0.7.3" # Apache-2.0 OR MIT +env_logger = "0.7.1" # Apache-2.0 OR MIT +tarpc = { version = "0.20.0", features = [ "full" ] } # MIT +serde = { version = "1.0.104", features = [ "derive" ] } # Apache-2.0 OR MIT +pyo3 = "0.9.0-alpha.1" # Apache-2.0 +tokio-serde = { version = "0.6.0", features = [ "json" ] } # Apache-2.0 OR MIT +futures-retry = "0.4.0" # Apache-2.0 OR MIT +stubborn-io = "0.1.8" # MIT +bytes = "0.5.4" # MIT +config = "0.10.1" # Apache-2.0 OR MIT +clap = "2.33.0" # MIT +influxdb = "0.0.6" # MIT [[bin]] name = "coordinator" diff --git a/src/coordinator/core/protocol.rs b/src/coordinator/core/protocol.rs index fb5e4bd48..85cd818e8 100644 --- a/src/coordinator/core/protocol.rs +++ b/src/coordinator/core/protocol.rs @@ -330,6 +330,10 @@ impl Protocol { pub fn next_event(&mut self) -> Option { self.events.pop_front() } + + pub fn get_current_round(&self) -> u32 { + self.current_round + } } impl FederatedLearningSettings { diff --git a/src/coordinator/core/service.rs b/src/coordinator/core/service.rs index 6755de929..3f4fd0a69 100644 --- a/src/coordinator/core/service.rs +++ b/src/coordinator/core/service.rs @@ -292,6 +292,10 @@ where let metric_store = self.metric_store.clone(); let counter_selected = self.protocol.counters().selected; let counter_waiting = self.protocol.counters().waiting; + let counter_done = self.protocol.counters().done; + let counter_done_inactive = self.protocol.counters().done_and_inactive; + let counter_ignored = self.protocol.counters().ignored; + let current_round = self.protocol.get_current_round(); tokio::spawn(async move { metric_store @@ -299,13 +303,29 @@ where MetricOwner::Coordinator, vec![ ( - String::from("number_of_selected_participants"), + "number_of_selected_participants", Type::SignedInteger(counter_selected.try_into().unwrap()), ), ( - String::from("number_of_waiting_participants"), + "number_of_waiting_participants", Type::SignedInteger(counter_waiting.try_into().unwrap()), ), + ( + "number_of_done_participants", + Type::SignedInteger(counter_done.try_into().unwrap()), + ), + ( + "number_of_done_inactive_participants", + Type::SignedInteger(counter_done_inactive.try_into().unwrap()), + ), + ( + "number_of_ignored_participants", + Type::SignedInteger(counter_ignored.try_into().unwrap()), + ), + ( + "round", + Type::UnsignedInteger(current_round as u64), + ), ], ) .await; diff --git a/src/metric_store/metric_store.rs b/src/metric_store/metric_store.rs index c0536dd85..c7b025020 100644 --- a/src/metric_store/metric_store.rs +++ b/src/metric_store/metric_store.rs @@ -32,7 +32,7 @@ impl InfluxDBMetricStore { } } - pub async fn write(&self, metrics_owner: MetricOwner, fields: Vec<(String, Type)>) -> () { + pub async fn write(&self, metrics_owner: MetricOwner, fields: Vec<(&'static str, Type)>) -> () { let mut write_query: WriteQuery = Query::write_query(Timestamp::Now, metrics_owner.to_string());