Skip to content

Commit

Permalink
feat(rust): fixed tls tcp outlets and kafka outlets
Browse files Browse the repository at this point in the history
  • Loading branch information
davide-baldo committed May 30, 2024
1 parent aae3c41 commit bd88c5d
Show file tree
Hide file tree
Showing 7 changed files with 28 additions and 27 deletions.
2 changes: 1 addition & 1 deletion implementations/rust/ockam/ockam_api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ mockall = "0.12"
multimap = "0.10.0"
ockam_macros = { path = "../ockam_macros", features = ["std"] }
ockam_transport_core = { path = "../ockam_transport_core" }
ockam_transport_tcp = { path = "../ockam_transport_tcp" }
ockam_transport_tcp = { path = "../ockam_transport_tcp", default-features = false }
once_cell = { version = "1", default-features = false }
opentelemetry_sdk = { version = "0.22.1", features = ["logs", "metrics", "trace", "rt-tokio", "testing"], default-features = false }
pretty_assertions = "1.4.0"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use ockam_core::{Address, Result};
use ockam_node::Context;
use ockam_transport_tcp::HostnamePort;
use std::net::SocketAddr;
use std::str::FromStr;

type BrokerId = i32;

Expand Down Expand Up @@ -52,14 +53,14 @@ impl KafkaOutletController {
&self,
context: &Context,
broker_id: BrokerId,
socket_addr: SocketAddr,
address: String,
) -> Result<Address> {
let outlet_address = kafka_outlet_address(broker_id);
let mut inner = self.inner.lock().await;
if !inner.broker_map.contains_key(&broker_id) {
let socket_address = Self::request_outlet_creation(
context,
socket_addr,
address,
kafka_outlet_address(broker_id),
self.policy_expression.clone(),
self.tls,
Expand All @@ -72,12 +73,12 @@ impl KafkaOutletController {

async fn request_outlet_creation(
context: &Context,
socket_address: SocketAddr,
kafka_address: String,
worker_address: Address,
policy_expression: Option<PolicyExpression>,
tls: bool,
) -> Result<SocketAddr> {
let hostname_port = HostnamePort::from_socket_addr(socket_address)?;
let hostname_port = HostnamePort::from_str(&kafka_address)?;
let mut payload = CreateOutlet::new(hostname_port, tls, Some(worker_address), false);
if let Some(expr) = policy_expression {
payload.set_policy_expression(expr);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,8 @@ use ockam_node::Context;
use std::convert::TryFrom;
use std::io::{Error, ErrorKind};

use ockam_core::errcode::{Kind, Origin};
use ockam_core::flow_control::FlowControlId;
use tinyvec::alloc;
use tokio::net::lookup_host;
use tracing::warn;

use crate::kafka::portal_worker::InterceptError;
Expand Down Expand Up @@ -157,21 +155,9 @@ impl KafkaMessageInterceptor for OutletInterceptorImpl {

for (broker_id, metadata) in response.brokers {
let address = format!("{}:{}", metadata.host.as_str(), metadata.port);
let socket_addr = lookup_host(&address)
.await
.ok()
.and_then(|mut i| i.next())
.ok_or_else(|| {
InterceptError::Ockam(ockam_core::Error::new(
Origin::Ockam,
Kind::Invalid,
format!("cannot resolve broker {broker_id:?} address {address}"),
))
})?;

let outlet_address = self
.outlet_controller
.assert_outlet_for_broker(context, broker_id.0, socket_addr)
.assert_outlet_for_broker(context, broker_id.0, address)
.await
.map_err(InterceptError::Ockam)?;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,10 +192,11 @@ impl NodeManager {
.generate_worker_addr(worker_addr)
.await;

let socket_addr = hostname_port.to_socket_addr()?;
info!(
"Handling request to create outlet portal at {:?} with worker {:?}",
&socket_addr, worker_addr
"Handling request to create outlet portal at {}:{} with worker {:?}",
&hostname_port.hostname(),
hostname_port.port(),
worker_addr
);

// Check registry for a duplicated key
Expand Down Expand Up @@ -249,6 +250,7 @@ impl NodeManager {
}
};

let socket_addr = hostname_port.to_socket_addr()?;
let res = self
.tcp_transport
.create_tcp_outlet(worker_addr.clone(), hostname_port, options)
Expand Down
6 changes: 3 additions & 3 deletions implementations/rust/ockam/ockam_command/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ r3bl_tui = "0.5.3"
rand = "0.8"
regex = "1.10.4"
reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls-native-roots", "blocking"] }
rustls = "0.23.5"
rustls = { version = "0.23.5", default-features = false }
rustls-native-certs = "0.7.0"
rustls-pki-types = "1.7.0"
semver = "1.0.23"
Expand Down Expand Up @@ -114,5 +114,5 @@ time = { version = "0.3", default-features = false, features = ["std", "local-of
[features]
default = ["orchestrator", "rust-crypto"]
orchestrator = []
aws-lc = ["ockam_vault/aws-lc", "ockam_transport_tcp/aws-lc", "ockam_api/aws-lc"]
rust-crypto = ["ockam_vault/rust-crypto", "ockam_transport_tcp/ring", "ockam_api/rust-crypto"]
aws-lc = ["ockam_vault/aws-lc", "ockam_transport_tcp/aws-lc", "ockam_api/aws-lc", "rustls/aws-lc-rs"]
rust-crypto = ["ockam_vault/rust-crypto", "ockam_transport_tcp/ring", "ockam_api/rust-crypto", "rustls/ring"]
12 changes: 12 additions & 0 deletions implementations/rust/ockam/ockam_command/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,18 @@ impl OckamCommand {
}));
let options = CommandGlobalOpts::new(&arguments, &self.global_args, &self.subcommand)?;

// Setup the default rustls crypto provider, this is a required step when
// multiple backends ring/aws-lc are pulled in directly, or indirectly.
#[cfg(feature = "aws-lc")]
rustls::crypto::aws_lc_rs::default_provider()
.install_default()
.expect("Failed to install aws-lc crypto provider");

#[cfg(all(feature = "rust-crypto", not(feature = "aws-lc")))]
rustls::crypto::ring::default_provider()
.install_default()
.expect("Failed to install ring crypto provider");

if let Err(err) = check_if_an_upgrade_is_available(&options) {
warn!("Failed to check for upgrade, error={err}");
options
Expand Down
2 changes: 1 addition & 1 deletion implementations/rust/ockam/ockam_identity/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ tracing = { version = "0.1", default_features = false }
tracing-attributes = { version = "0.1", default_features = false }

[dev-dependencies]
ockam_transport_tcp = { path = "../ockam_transport_tcp" }
ockam_transport_tcp = { path = "../ockam_transport_tcp", default-features = false }
ockam_vault = { path = "../ockam_vault" }
ockam_vault_aws = { path = "../ockam_vault_aws" }
quickcheck = "1.0.3"
Expand Down

0 comments on commit bd88c5d

Please sign in to comment.