Skip to content
This repository has been archived by the owner on Sep 7, 2024. It is now read-only.

Commit

Permalink
Configuring network::message::RawNetworkMessage.from_stream buffer si…
Browse files Browse the repository at this point in the history
…ze and iterations with special struct
  • Loading branch information
dr-orlovsky committed Feb 2, 2019
1 parent 4426a14 commit 48021db
Showing 1 changed file with 28 additions and 8 deletions.
36 changes: 28 additions & 8 deletions src/network/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,20 @@ impl<D: Decoder> Decodable<D> for CommandString {
}
}

#[derive(Debug)]
/// Struct used to configure stream reader function
pub struct StreamReaderConfig {
/// Number of attempts to read data from the stream if the reader returns 0 bytes
pub iterations: usize,
/// Size of allocated buffer for a single read opetaion
pub buffer_size: usize
}

/// Defining default values
impl Default for StreamReaderConfig {
fn default() -> Self { Self { iterations: 16, buffer_size: 64 * 1024 } }
}

#[derive(Debug)]
/// A Network message
pub struct RawNetworkMessage {
Expand Down Expand Up @@ -150,28 +164,34 @@ impl RawNetworkMessage {

/// Reads stream from a TCP socket and parses first message from it, returing
/// the rest of the unparsed buffer for later usage.
pub fn from_stream(stream: &mut Read, remaining_part: &mut Vec<u8>) -> Result<Self, encode::Error> {
let mut max_iterations = 16;
while max_iterations > 0 {
max_iterations -= 1;
pub fn from_stream(stream: &mut Read, remaining_part: &mut Vec<u8>,
StreamReaderConfig { iterations, buffer_size }: StreamReaderConfig) -> Result<Self, encode::Error> {
println!("Called with {} iterations and {} ubffer size", iterations, buffer_size);
let mut iterations = iterations;
while iterations > 0 {
iterations -= 1;

if remaining_part.len() > 0 {
match encode::deserialize_partial::<RawNetworkMessage>(&remaining_part) {
// In this case we just have an incomplete data, so we need to read more
Err(encode::Error::Io(ref err)) if err.kind() == io::ErrorKind::UnexpectedEof => (),
// All other types of errors should be passed up to the caller
Err(err) => return Err(err),
// We have successfully read from the buffer
Ok((message, index)) => {
println!("Deserialized {} bytes", index);
remaining_part.drain(..index);
return Ok(message)
},
}
}

let mut new_data = vec![0u8; 1024];
let mut new_data = vec![0u8; buffer_size];
let count = stream.read(&mut new_data)?;
if count > 0 {
remaining_part.extend(new_data[0..count].iter());
}
println!("Read {} bytes, remaining part now is {} bytes length", count, remaining_part.len());
}
Err(encode::Error::ParseFailed("Zero-length input"))
}
Expand Down Expand Up @@ -394,7 +414,7 @@ mod test {
tmpfile.seek(SeekFrom::Start(0)).unwrap();

let mut buffer = vec![];
let msg = RawNetworkMessage::from_stream(&mut tmpfile, &mut buffer);
let msg = RawNetworkMessage::from_stream(&mut tmpfile, &mut buffer, Default::default());
assert!(buffer.len() > 0);
assert!(msg.is_err());
}
Expand Down Expand Up @@ -430,7 +450,7 @@ mod test {
tmpfile.seek(SeekFrom::Start(0)).unwrap();

let mut buffer = vec![];
let msg = RawNetworkMessage::from_stream(&mut tmpfile, &mut buffer).unwrap();
let msg = RawNetworkMessage::from_stream(&mut tmpfile, &mut buffer, Default::default()).unwrap();
assert!(buffer.len() > 0);
assert_eq!(msg.magic, 0xd9b4bef9);
if let NetworkMessage::Version(version_msg) = msg.payload {
Expand All @@ -446,7 +466,7 @@ mod test {
}

println!("{:?}", &buffer);
let msg = RawNetworkMessage::from_stream(&mut tmpfile, &mut buffer).unwrap();
let msg = RawNetworkMessage::from_stream(&mut tmpfile, &mut buffer,Default::default()).unwrap();
assert_eq!(buffer.len(), 0);
assert_eq!(msg.magic, 0xd9b4bef9);
if let NetworkMessage::Ping(nonce) = msg.payload {
Expand Down

0 comments on commit 48021db

Please sign in to comment.