Skip to content

Commit

Permalink
rumqttd: adding optional native-tls support.
Browse files Browse the repository at this point in the history
Readjusting acceptor into the new task as well. That way this loop doesn't get blocked by
setting up the connection.
  • Loading branch information
jaredwolff committed Mar 4, 2021
1 parent fa4beab commit 0d994fe
Showing 1 changed file with 34 additions and 20 deletions.
54 changes: 34 additions & 20 deletions rumqttd/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ impl Server {
}

#[cfg(feature = "use-native-tls")]
fn tls(&self) -> Result<Option<TlsAcceptor>, Error> {
fn tls(&self) -> Result<Option<Arc<TlsAcceptor>>, Error> {
match (
self.config.pkcs12_path.clone(),
self.config.pkcs12_pass.clone(),
Expand All @@ -268,14 +268,14 @@ impl Server {

// Create acceptor
let acceptor = TlsAcceptor::from(builder);
Ok(Some(acceptor))
Ok(Some(Arc::new(acceptor)))
}
_ => Ok(None),
}
}

#[cfg(feature = "use-rustls")]
fn tls(&self) -> Result<Option<TlsAcceptor>, Error> {
fn tls(&self) -> Result<Option<Arc<TlsAcceptor>>, Error> {
let (certs, key) = match self.config.cert_path.clone() {
Some(cert) => {
// Get certificates
Expand Down Expand Up @@ -319,46 +319,60 @@ impl Server {

server_config.set_single_cert(certs, key)?;
let acceptor = TlsAcceptor::from(Arc::new(server_config));
Ok(Some(acceptor))
Ok(Some(Arc::new(acceptor)))
}

async fn start(&self) -> Result<(), Error> {
let addr = format!("0.0.0.0:{}", self.config.port);

let listener = TcpListener::bind(&addr).await?;
let delay = Duration::from_millis(self.config.next_connection_delay_ms);
let mut count = 0;
let mut count: u32 = 0;

let config = Arc::new(self.config.connections.clone());
let acceptor = self.tls()?;
let max_incoming_size = config.max_payload_size;
let acceptor = self.tls()?;

info!("Waiting for connections on {}. Server = {}", addr, self.id);
loop {
// Accept incoming connection
let (stream, addr) = listener.accept().await?;
let network = match &acceptor {
Some(acceptor) => {
info!("{}. Accepting TLS connection from: {}", count, addr);
let sock = acceptor.accept(stream).await?;
Network::new(sock, max_incoming_size)
}
None => {
info!("{}. Accepting TCP connection from: {}", count, addr);
Network::new(stream, max_incoming_size)
}
};

count += 1;
// Router tx needs to be outside
let router_tx = self.router_tx.clone();

// Acceptor cloned
let acceptor = acceptor.clone();

// Cloneconfig
let config = config.clone();
let router_tx = self.router_tx.clone();
task::spawn(async {

// Then spawn a new thread to handle the connection
task::spawn(async move {
let network = match acceptor {
Some(acceptor) => {
info!("{}. Accepting TLS connection from: {}", count, addr);
let sock = acceptor.accept(stream).await.unwrap();
Network::new(sock, max_incoming_size)
}
None => {
info!("{}. Accepting TCP connection from: {}", count, addr);
Network::new(stream, max_incoming_size)
}
};

let config = config.clone();

let connector = Connector::new(config, router_tx);
if let Err(e) = connector.new_connection(network).await {
error!("Dropping link task!! Result = {:?}", e);
}
});

// Increment count
count += 1;

// Wait a certain amount between connection attempts.
time::sleep(delay).await;
}
}
Expand Down

0 comments on commit 0d994fe

Please sign in to comment.