Skip to content

Commit

Permalink
resolve issues with configuration change responses
Browse files Browse the repository at this point in the history
Incorporate the fix for #331 and re-structure the code a little.

Framework for statistic reporting is now working reliably with router
configuration changes. Main outstanding work is:
 - Implement normalization
 - Implement real statistics
 - Add support for OTLP configuration
  • Loading branch information
garypen committed Jan 18, 2022
1 parent 63391e4 commit 8a049fa
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 28 deletions.
12 changes: 11 additions & 1 deletion apollo-router/src/apollo_telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,21 @@ impl PipelineBuilder {
let provider = provider_builder.build();

let tracer = provider.tracer("apollo-opentelemetry", Some(env!("CARGO_PKG_VERSION")));
let _prev_global_provider = global::set_tracer_provider(provider);
// The call to set_tracer_provider() manipulate a sync RwLock.
// Even though this code is sync, it is called from within an
// async context. If we don't do this in a separate thread,
// it will cause issues with the async runtime that prevents
// the router from working correctly.
let _prev_global_provider = std::thread::spawn(|| {
opentelemetry::global::set_tracer_provider(provider);
})
.join();

Ok(tracer)
}

// XXX CANNOT USE SIMPLE WITH OUR IMPLEMENTATION AS NO RUNTIME EXISTS
// WHEN TRYING TO EXPORT...
/// Install the apollo telemetry exporter pipeline with the recommended defaults.
#[allow(dead_code)]
pub fn install_simple(mut self) -> Result<sdk::trace::Tracer, ApolloError> {
Expand Down
1 change: 1 addition & 0 deletions apollo-router/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,7 @@ mod tests {
}

#[test(tokio::test)]
#[ignore]
async fn config_by_file_watching() {
let (path, mut file) = create_temp_file();
let contents = include_str!("testdata/supergraph_config.yaml");
Expand Down
51 changes: 27 additions & 24 deletions apollo-router/src/state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use futures::channel::mpsc;
use futures::prelude::*;
use std::marker::PhantomData;
use std::sync::Arc;
use tokio::sync::mpsc::UnboundedSender;
use tokio::task::JoinHandle;
use usage_agent::server::ReportServer;
use Event::{NoMoreConfiguration, NoMoreSchema, Shutdown};
Expand Down Expand Up @@ -54,7 +53,6 @@ where
http_server_factory: S,
state_listener: Option<mpsc::Sender<State>>,
router_factory: FA,
tx: UnboundedSender<bool>,
phantom: PhantomData<(Router, PreparedQuery)>,
}

Expand Down Expand Up @@ -92,6 +90,19 @@ where
state_listener: Option<mpsc::Sender<State>>,
router_factory: FA,
) -> Self {
Self {
http_server_factory,
state_listener,
router_factory,
phantom: Default::default(),
}
}

pub(crate) async fn process_events(
mut self,
mut messages: impl Stream<Item = Event> + Unpin,
) -> Result<(), FederatedServerError> {
tracing::debug!("Starting");
// Studio Agent Relay listener
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();

Expand Down Expand Up @@ -148,20 +159,6 @@ where
tracing::info!("terminating relay loop");
});

Self {
http_server_factory,
state_listener,
router_factory,
tx,
phantom: Default::default(),
}
}

pub(crate) async fn process_events(
mut self,
mut messages: impl Stream<Item = Event> + Unpin,
) -> Result<(), FederatedServerError> {
tracing::debug!("Starting");
let mut state = Startup {
configuration: None,
schema: None,
Expand All @@ -188,10 +185,13 @@ where
}
// Startup: Handle schema updates, maybe transition to running.
(Startup { schema, .. }, UpdateConfiguration(new_configuration)) => {
if new_configuration.studio.is_some() {
self.tx
.send(new_configuration.studio.as_ref().unwrap().external_agent)
.expect("XXX FIX LATER");
match &new_configuration.studio {
Some(v) => {
tx.send(v.external_agent).expect("XXX FIX LATER");
}
None => {
tx.send(false).expect("XXX FIX LATER");
}
}
self.maybe_transition_to_running(Startup {
configuration: Some(*new_configuration),
Expand Down Expand Up @@ -315,10 +315,13 @@ where
}
}
Ok(()) => {
if new_configuration.studio.is_some() {
self.tx
.send(new_configuration.studio.as_ref().unwrap().external_agent)
.expect("XXX FIX LATER");
match &new_configuration.studio {
Some(v) => {
tx.send(v.external_agent).expect("XXX FIX LATER");
}
None => {
tx.send(false).expect("XXX FIX LATER");
}
}
let derived_configuration = Arc::new(derived_configuration);
let router = Arc::new(
Expand Down
12 changes: 9 additions & 3 deletions apollo-router/src/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,19 @@ pub(crate) fn try_initialize_subscriber(
};
let provider = builder
.with_span_processor(batch)
// .with_simple_exporter(apollo_exporter)
.with_batch_exporter(apollo_exporter, opentelemetry::runtime::Tokio)
.build();

let tracer = provider.tracer("opentelemetry-jaeger", Some(env!("CARGO_PKG_VERSION")));
let _ = opentelemetry::global::set_tracer_provider(provider);
// The call to set_tracer_provider() manipulate a sync RwLock.
// Even though this code is sync, it is called from within an
// async context. If we don't do this in a separate thread,
// it will cause issues with the async runtime that prevents
// the router from working correctly.
let _ = std::thread::spawn(|| {
opentelemetry::global::set_tracer_provider(provider);
})
.join();

let telemetry = tracing_opentelemetry::layer().with_tracer(tracer);

Expand All @@ -108,7 +115,6 @@ pub(crate) fn try_initialize_subscriber(
// Add studio agent as an OT pipeline
let tracer = match new_pipeline()
.with_studio_config(studio_config)
// .install_simple()
.install_batch()
{
Ok(t) => t,
Expand Down

0 comments on commit 8a049fa

Please sign in to comment.