From 0d994fe09195b17c3460b75f92010483b07ea0d5 Mon Sep 17 00:00:00 2001 From: Jared Wolff Date: Wed, 3 Mar 2021 11:57:06 -0500 Subject: [PATCH] rumqttd: adding optional native-tls support. Readjusting acceptor into the new task as well. That way this loop doesn't get blocked by setting up the connection. --- rumqttd/src/lib.rs | 54 +++++++++++++++++++++++++++++----------------- 1 file changed, 34 insertions(+), 20 deletions(-) diff --git a/rumqttd/src/lib.rs b/rumqttd/src/lib.rs index 74e94a3d..5e6eeb58 100644 --- a/rumqttd/src/lib.rs +++ b/rumqttd/src/lib.rs @@ -242,7 +242,7 @@ impl Server { } #[cfg(feature = "use-native-tls")] - fn tls(&self) -> Result, Error> { + fn tls(&self) -> Result>, Error> { match ( self.config.pkcs12_path.clone(), self.config.pkcs12_pass.clone(), @@ -268,14 +268,14 @@ impl Server { // Create acceptor let acceptor = TlsAcceptor::from(builder); - Ok(Some(acceptor)) + Ok(Some(Arc::new(acceptor))) } _ => Ok(None), } } #[cfg(feature = "use-rustls")] - fn tls(&self) -> Result, Error> { + fn tls(&self) -> Result>, Error> { let (certs, key) = match self.config.cert_path.clone() { Some(cert) => { // Get certificates @@ -319,7 +319,7 @@ impl Server { server_config.set_single_cert(certs, key)?; let acceptor = TlsAcceptor::from(Arc::new(server_config)); - Ok(Some(acceptor)) + Ok(Some(Arc::new(acceptor))) } async fn start(&self) -> Result<(), Error> { @@ -327,38 +327,52 @@ impl Server { let listener = TcpListener::bind(&addr).await?; let delay = Duration::from_millis(self.config.next_connection_delay_ms); - let mut count = 0; + let mut count: u32 = 0; let config = Arc::new(self.config.connections.clone()); - let acceptor = self.tls()?; let max_incoming_size = config.max_payload_size; + let acceptor = self.tls()?; info!("Waiting for connections on {}. Server = {}", addr, self.id); loop { + // Accept incoming connection let (stream, addr) = listener.accept().await?; - let network = match &acceptor { - Some(acceptor) => { - info!("{}. Accepting TLS connection from: {}", count, addr); - let sock = acceptor.accept(stream).await?; - Network::new(sock, max_incoming_size) - } - None => { - info!("{}. Accepting TCP connection from: {}", count, addr); - Network::new(stream, max_incoming_size) - } - }; - count += 1; + // Router tx needs to be outside + let router_tx = self.router_tx.clone(); + // Acceptor cloned + let acceptor = acceptor.clone(); + + // Cloneconfig let config = config.clone(); - let router_tx = self.router_tx.clone(); - task::spawn(async { + + // Then spawn a new thread to handle the connection + task::spawn(async move { + let network = match acceptor { + Some(acceptor) => { + info!("{}. Accepting TLS connection from: {}", count, addr); + let sock = acceptor.accept(stream).await.unwrap(); + Network::new(sock, max_incoming_size) + } + None => { + info!("{}. Accepting TCP connection from: {}", count, addr); + Network::new(stream, max_incoming_size) + } + }; + + let config = config.clone(); + let connector = Connector::new(config, router_tx); if let Err(e) = connector.new_connection(network).await { error!("Dropping link task!! Result = {:?}", e); } }); + // Increment count + count += 1; + + // Wait a certain amount between connection attempts. time::sleep(delay).await; } }