Skip to content

Commit

Permalink
Enable intproxy logs by default. (#2764)
Browse files Browse the repository at this point in the history
* Enable intproxy logs by default.

* changelog
  • Loading branch information
meowjesty committed Sep 18, 2024
1 parent e713731 commit 53e8d0a
Show file tree
Hide file tree
Showing 12 changed files with 64 additions and 37 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions changelog.d/2750.changed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Have intproxy log to a file in /tmp by default.
2 changes: 1 addition & 1 deletion mirrord/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ tokio-stream = { workspace = true, features = ["net"] }
tokio-retry = "0.3"
regex.workspace = true
mid = "2.1.0"

rand.workspace = true

[target.'cfg(target_os = "macos")'.dependencies]
mirrord-sip = { path = "../sip" }
Expand Down
59 changes: 39 additions & 20 deletions mirrord/cli/src/internal_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ use std::{
fs::OpenOptions,
io,
net::{Ipv4Addr, SocketAddr},
time::Duration,
path::PathBuf,
time::{Duration, SystemTime},
};

use mirrord_analytics::{AnalyticsReporter, CollectAnalytics, Reporter};
Expand All @@ -27,6 +28,7 @@ use mirrord_intproxy::{
};
use mirrord_protocol::{ClientMessage, DaemonMessage, LogLevel, LogMessage};
use nix::sys::resource::{setrlimit, Resource};
use rand::{distributions::Alphanumeric, Rng};
use tokio::net::TcpListener;
use tracing::{warn, Level};
use tracing_subscriber::EnvFilter;
Expand All @@ -53,25 +55,42 @@ pub(crate) async fn proxy(watch: drain::Watch) -> Result<(), InternalProxyError>

tracing::info!(?config, "internal_proxy starting");

if let Some(log_destination) = config.internal_proxy.log_destination.as_ref() {
let output_file = OpenOptions::new()
.create(true)
.append(true)
.open(log_destination)
.map_err(|e| InternalProxyError::OpenLogFile(log_destination.clone(), e))?;

let tracing_registry = tracing_subscriber::fmt()
.with_writer(output_file)
.with_ansi(false);

if let Some(log_level) = config.internal_proxy.log_level.as_ref() {
tracing_registry
.with_env_filter(EnvFilter::builder().parse_lossy(log_level))
.init();
} else {
tracing_registry.init();
}
}
// Setting up default logging for intproxy.
let log_destination = config
.internal_proxy
.log_destination
.as_ref()
.map(PathBuf::from)
.unwrap_or_else(|| {
let random_name: String = rand::thread_rng()
.sample_iter(&Alphanumeric)
.take(7)
.map(char::from)
.collect();
let timestamp = SystemTime::UNIX_EPOCH.elapsed().unwrap().as_secs();

PathBuf::from(format!("/tmp/mirrord-intproxy-{timestamp}-{random_name}"))
});

let output_file = OpenOptions::new()
.create(true)
.append(true)
.open(&log_destination)
.map_err(|fail| {
InternalProxyError::OpenLogFile(log_destination.to_string_lossy().to_string(), fail)
})?;

let log_level = config
.internal_proxy
.log_level
.as_deref()
.unwrap_or("mirrord=info");

tracing_subscriber::fmt()
.with_writer(output_file)
.with_ansi(false)
.with_env_filter(EnvFilter::builder().parse_lossy(log_level))
.init();

// According to https://wilsonmar.github.io/maximum-limits/ this is the limit on macOS
// so we assume Linux can be higher and set to that.
Expand Down
3 changes: 2 additions & 1 deletion mirrord/intproxy/src/agent_conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use tokio::{
},
};
use tokio_rustls::TlsConnector;
use tracing::Level;

use crate::{
background_tasks::{BackgroundTask, MessageBus},
Expand Down Expand Up @@ -181,7 +182,7 @@ impl AgentConnection {
Ok(Self { agent_tx, agent_rx })
}

#[tracing::instrument(level = "trace", name = "send_agent_message", skip(self), ret)]
#[tracing::instrument(level = Level::TRACE, name = "send_agent_message", skip(self), ret)]
async fn send(&self, msg: ClientMessage) -> Result<(), AgentChannelError> {
self.agent_tx.send(msg).await.map_err(|_| AgentChannelError)
}
Expand Down
3 changes: 2 additions & 1 deletion mirrord/intproxy/src/layer_conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use tokio::net::{
tcp::{OwnedReadHalf, OwnedWriteHalf},
TcpStream,
};
use tracing::Level;

use crate::{
background_tasks::{BackgroundTask, MessageBus},
Expand Down Expand Up @@ -35,7 +36,7 @@ impl LayerConnection {
}
}

#[tracing::instrument(level = "trace", name = "send_layer_message", skip(self), fields(layer_id = self.layer_id.0), ret)]
#[tracing::instrument(level = Level::TRACE, name = "send_layer_message", skip(self), fields(layer_id = self.layer_id.0), ret)]
async fn send_and_flush(
&mut self,
msg: &LocalMessage<ProxyToLayerMessage>,
Expand Down
4 changes: 2 additions & 2 deletions mirrord/intproxy/src/layer_initializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use mirrord_intproxy_protocol::{
};
use thiserror::Error;
use tokio::net::{TcpListener, TcpStream};
use tracing::info;
use tracing::{info, Level};

use crate::{
background_tasks::{BackgroundTask, MessageBus},
Expand Down Expand Up @@ -43,7 +43,7 @@ impl LayerInitializer {
}

/// Initialize connection with the new layer, assigning fresh [`LayerId`].
#[tracing::instrument(level = "trace" ret)]
#[tracing::instrument(level = Level::TRACE, ret)]
async fn handle_new_stream(
&mut self,
stream: TcpStream,
Expand Down
5 changes: 3 additions & 2 deletions mirrord/intproxy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use proxies::{
simple::{SimpleProxy, SimpleProxyMessage},
};
use tokio::{net::TcpListener, time};
use tracing::Level;

use crate::{
agent_conn::AgentConnection, background_tasks::TaskError, error::IntProxyError,
Expand Down Expand Up @@ -255,7 +256,7 @@ impl IntProxy {

/// Routes most messages from the agent to the correct background task.
/// Some messages are handled here.
#[tracing::instrument(level = "trace", skip(self), ret)]
#[tracing::instrument(level = Level::TRACE, skip(self), ret)]
async fn handle_agent_message(&mut self, message: DaemonMessage) -> Result<(), IntProxyError> {
match message {
DaemonMessage::Pong => self.task_txs.ping_pong.send(AgentSentPong).await,
Expand Down Expand Up @@ -332,7 +333,7 @@ impl IntProxy {
}

/// Routes a message from the layer to the correct background task.
#[tracing::instrument(level = "trace", skip(self), ret)]
#[tracing::instrument(level = Level::TRACE, skip(self), ret)]
async fn handle_layer_message(&self, message: FromLayer) -> Result<(), IntProxyError> {
let FromLayer {
message_id,
Expand Down
8 changes: 4 additions & 4 deletions mirrord/intproxy/src/proxies/incoming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ impl IncomingProxy {
const CHANNEL_SIZE: usize = 512;

/// Tries to register the new subscription in the [`SubscriptionsManager`].
#[tracing::instrument(level = "trace", skip(self, message_bus))]
#[tracing::instrument(level = Level::TRACE, skip(self, message_bus))]
async fn handle_port_subscribe(
&mut self,
message_id: MessageId,
Expand All @@ -199,7 +199,7 @@ impl IncomingProxy {
}

/// Tries to unregister the subscription from the [`SubscriptionsManager`].
#[tracing::instrument(level = "trace", skip(self, message_bus))]
#[tracing::instrument(level = Level::TRACE, skip(self, message_bus))]
async fn handle_port_unsubscribe(
&mut self,
layer_id: LayerId,
Expand All @@ -216,7 +216,7 @@ impl IncomingProxy {
/// Retrieves or creates an [`Interceptor`] for the given [`HttpRequestFallback`].
/// The request may or may not belong to an existing connection (when stealing with an http
/// filter, connections are created implicitly).
#[tracing::instrument(level = "trace", skip(self))]
#[tracing::instrument(level = Level::TRACE, skip(self))]
fn get_interceptor_for_http_request(
&mut self,
request: &HttpRequestFallback,
Expand Down Expand Up @@ -259,7 +259,7 @@ impl IncomingProxy {
}

/// Handles all agent messages.
#[tracing::instrument(level = "trace", skip(self, message_bus))]
#[tracing::instrument(level = Level::TRACE, skip(self, message_bus))]
async fn handle_agent_message(
&mut self,
message: DaemonTcp,
Expand Down
3 changes: 2 additions & 1 deletion mirrord/intproxy/src/proxies/incoming/subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use mirrord_intproxy_protocol::{
IncomingResponse, LayerId, MessageId, PortSubscribe, PortUnsubscribe, ProxyToLayerMessage,
};
use mirrord_protocol::{BlockedAction, ClientMessage, Port, RemoteResult, ResponseError};
use tracing::Level;

use super::{port_subscription_ext::PortSubscriptionExt, IncomingProxyError};
use crate::{
Expand Down Expand Up @@ -227,7 +228,7 @@ impl SubscriptionsManager {

/// Notifies this struct about agent's response.
/// Returns messages to be sent to the layers.
#[tracing::instrument(level = "trace", ret, skip(self))]
#[tracing::instrument(level = Level::TRACE, ret, skip(self))]
pub fn agent_responded(
&mut self,
result: RemoteResult<Port>,
Expand Down
7 changes: 4 additions & 3 deletions mirrord/intproxy/src/proxies/outgoing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use mirrord_protocol::{
ConnectionId, RemoteResult, ResponseError,
};
use thiserror::Error;
use tracing::Level;

use self::interceptor::Interceptor;
use crate::{
Expand Down Expand Up @@ -103,7 +104,7 @@ impl OutgoingProxy {
/// Passes the data to the correct [`Interceptor`] task.
/// Fails when the agent sends an error, because this error cannot be traced back to an exact
/// connection.
#[tracing::instrument(level = "trace", skip(self))]
#[tracing::instrument(level = Level::TRACE, skip(self))]
async fn handle_agent_read(
&mut self,
read: RemoteResult<DaemonRead>,
Expand Down Expand Up @@ -134,7 +135,7 @@ impl OutgoingProxy {
/// Handles agent's response to a connection request.
/// Prepares a local socket and registers a new [`Interceptor`] task for this connection.
/// Replies to the layer's request.
#[tracing::instrument(level = "trace", skip(self, message_bus))]
#[tracing::instrument(level = Level::TRACE, skip(self, message_bus))]
async fn handle_connect_response(
&mut self,
connect: RemoteResult<DaemonConnect>,
Expand Down Expand Up @@ -194,7 +195,7 @@ impl OutgoingProxy {
}

/// Saves the layer's request id and sends the connection request to the agent.
#[tracing::instrument(level = "trace", skip(self, message_bus))]
#[tracing::instrument(level = Level::TRACE, skip(self, message_bus))]
async fn handle_connect_request(
&mut self,
message_id: MessageId,
Expand Down
5 changes: 3 additions & 2 deletions mirrord/intproxy/src/request_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use std::{collections::VecDeque, fmt};

use mirrord_intproxy_protocol::{LayerId, MessageId};
use thiserror::Error;
use tracing::Level;

/// Erorr returned when the proxy attempts to retrieve [`MessageId`] and [`LayerId`] of a request
/// corresponding to a response received from the agent, but the [`RequestQueue`] is empty. This
Expand Down Expand Up @@ -43,13 +44,13 @@ impl fmt::Debug for RequestQueue {

impl RequestQueue {
/// Save the request at the end of this queue.
#[tracing::instrument(level = "trace")]
#[tracing::instrument(level = Level::TRACE)]
pub fn insert(&mut self, message_id: MessageId, layer_id: LayerId) {
self.inner.push_back((message_id, layer_id));
}

/// Retrieve and remove a request from the front of this queue.
#[tracing::instrument(level = "trace")]
#[tracing::instrument(level = Level::TRACE)]
pub fn get(&mut self) -> Result<(MessageId, LayerId), RequestQueueEmpty> {
self.inner.pop_front().ok_or(RequestQueueEmpty)
}
Expand Down

0 comments on commit 53e8d0a

Please sign in to comment.