diff --git a/filecoin-proofs/Cargo.toml b/filecoin-proofs/Cargo.toml
index ba90f2ca70..0cd7ef2b16 100644
--- a/filecoin-proofs/Cargo.toml
+++ b/filecoin-proofs/Cargo.toml
@@ -36,7 +36,6 @@ anyhow = "1.0.23"
 rand_xorshift = "0.2.0"
 sha2 = "0.9.1"
 typenum = "1.11.2"
-bitintr = "0.3.0"
 gperftools = { version = "0.2", optional = true }
 generic-array = "0.14.4"
 structopt = "0.3.12"
@@ -45,6 +44,7 @@ indicatif = "0.15.0"
 groupy = "0.3.0"
 dialoguer = "0.7.1"
 clap = "2.33.3"
+byte-slice-cast = "1.0.0"

 [dependencies.reqwest]
 version = "0.10"
diff --git a/filecoin-proofs/benches/preprocessing.rs b/filecoin-proofs/benches/preprocessing.rs
index defad71894..ad66aaf2db 100644
--- a/filecoin-proofs/benches/preprocessing.rs
+++ b/filecoin-proofs/benches/preprocessing.rs
@@ -2,7 +2,7 @@ use std::io::{self, Read};
 use std::time::Duration;

 use criterion::{criterion_group, criterion_main, Criterion, ParameterizedBenchmark, Throughput};
-use filecoin_proofs::fr32_reader::Fr32Reader;
+use filecoin_proofs::{add_piece, fr32_reader::Fr32Reader, PaddedBytesAmount, UnpaddedBytesAmount};
 use rand::{thread_rng, Rng};

 #[cfg(feature = "cpu-profile")]
@@ -52,6 +52,7 @@ fn preprocessing_benchmark(c: &mut Criterion) {
                     let mut reader = Fr32Reader::new(io::Cursor::new(&data));
                     reader.read_to_end(&mut buf).expect("in memory read error");
                     assert!(buf.len() >= data.len());
+                    buf.clear();
                 });
                 stop_profile();
             },
@@ -63,5 +64,37 @@ fn preprocessing_benchmark(c: &mut Criterion) {
     );
 }

-criterion_group!(benches, preprocessing_benchmark);
+fn add_piece_benchmark(c: &mut Criterion) {
+    c.bench(
+        "preprocessing",
+        ParameterizedBenchmark::new(
+            "add_piece",
+            |b, size| {
+                let padded_size = PaddedBytesAmount(*size as u64);
+                let unpadded_size: UnpaddedBytesAmount = padded_size.into();
+                let data = random_data(unpadded_size.0 as usize);
+                let mut buf = Vec::with_capacity(*size);
+
+                start_profile(&format!("add_piece_{}", *size));
+                b.iter(|| {
+                    add_piece(
+                        io::Cursor::new(&data),
+                        &mut buf,
+                        unpadded_size,
+                        &[unpadded_size][..],
+                    )
+                    .unwrap();
+                    buf.clear();
+                });
+                stop_profile();
+            },
+            vec![512, 256 * 1024, 512 * 1024, 1024 * 1024, 2 * 1024 * 1024],
+        )
+        .sample_size(10)
+        .throughput(|s| Throughput::Bytes(*s as u64))
+        .warm_up_time(Duration::from_secs(1)),
+    );
+}
+
+criterion_group!(benches, preprocessing_benchmark, add_piece_benchmark);
 criterion_main!(benches);
diff --git a/filecoin-proofs/src/fr32_reader.rs b/filecoin-proofs/src/fr32_reader.rs
index df6e5b3da1..f5de6faeb9 100644
--- a/filecoin-proofs/src/fr32_reader.rs
+++ b/filecoin-proofs/src/fr32_reader.rs
@@ -1,157 +1,105 @@
+use byte_slice_cast::*;
 use std::io;

-const DATA_BITS: u64 = 254;
-const TARGET_BITS: u64 = 256;
-
-#[derive(Debug)]
 pub struct Fr32Reader<R> {
     /// The source being padded.
     source: R,
-    /// How much of the target already was `read` from, in bits.
-    target_offset: u64,
-    /// Currently read byte.
-    buffer: Buffer,
+    /// Currently read block.
+    in_buffer: [u8; NUM_U128S_PER_BLOCK * 16],
+    /// Currently writing out block.
+    out_buffer: [u128; NUM_U128S_PER_BLOCK],
+    /// How many blocks are left in the buffers.
+    to_process: usize,
+    /// How many bytes of the out_buffer are already read out.
+    out_offset: usize,
     /// Are we done reading?
     done: bool,
 }

+const NUM_U128S_PER_BLOCK: usize = 8;
+const MASK_SKIP_HIGH_2: u128 = 0b0011_1111_1111_1111_1111_1111_1111_1111_1111_1111_1111_1111_1111_1111_1111_1111_1111_1111_1111_1111_1111_1111_1111_1111_1111_1111_1111_1111_1111_1111_1111_1111;
+
 impl<R: io::Read> Fr32Reader<R> {
     pub fn new(source: R) -> Self {
         Fr32Reader {
             source,
-            target_offset: 0,
-            buffer: Default::default(),
+            in_buffer: [0; NUM_U128S_PER_BLOCK * 16],
+            out_buffer: [0; NUM_U128S_PER_BLOCK],
+            to_process: 0,
+            out_offset: 0,
             done: false,
         }
     }

-    fn read_u8_no_pad(&mut self, target: &mut [u8]) -> io::Result<usize> {
-        target[0] = self.buffer.read_u8();
-        self.target_offset += 8;
-
-        Ok(1)
-    }
-
-    fn read_u16_no_pad(&mut self, target: &mut [u8]) -> io::Result<usize> {
-        self.buffer.read_u16_into(&mut target[..2]);
-        self.target_offset += 16;
-
-        Ok(2)
-    }
-
-    fn read_u32_no_pad(&mut self, target: &mut [u8]) -> io::Result<usize> {
-        self.buffer.read_u32_into(&mut target[..4]);
-        self.target_offset += 32;
-
-        Ok(4)
-    }
-
-    fn read_u64_no_pad(&mut self, target: &mut [u8]) -> io::Result<usize> {
-        self.buffer.read_u64_into(&mut target[..8]);
-        self.target_offset += 64;
-
-        Ok(8)
-    }
-
-    /// Read up to 8 bytes into the targets first element.
-    /// Assumes that target is not empty.
-    fn read_bytes(&mut self, target: &mut [u8]) -> io::Result<usize> {
-        let bit_pos = self.target_offset % TARGET_BITS;
-        let bits_to_padding = if bit_pos < DATA_BITS {
-            DATA_BITS as usize - bit_pos as usize
-        } else {
-            0
-        };
+    /// Processes a single block in in_buffer, writing the result to out_buffer.
+    fn process_block(&mut self) {
+        let in_buffer: &[u128] = self.in_buffer.as_slice_of::<u128>().unwrap();
+        let out_buffer = &mut self.out_buffer;

-        if bits_to_padding >= 8 {
-            self.fill_buffer()?;
+        // 0..254
+        {
+            out_buffer[0] = in_buffer[0];
+            out_buffer[1] = in_buffer[1] & MASK_SKIP_HIGH_2;
         }
-
-        let available = self.buffer.available();
-        if available > 0 {
-            let target_len = target.len();
-            // Try to avoid padding, and copy as much as possible over at once.
-
-            if bits_to_padding >= 64 && available >= 64 && target_len >= 8 {
-                return self.read_u64_no_pad(target);
-            }
-
-            if bits_to_padding >= 32 && available >= 32 && target_len >= 4 {
-                return self.read_u32_no_pad(target);
-            }
-
-            if bits_to_padding >= 16 && available >= 16 && target_len >= 2 {
-                return self.read_u16_no_pad(target);
-            }
-
-            if bits_to_padding >= 8 && available >= 8 && target_len >= 1 {
-                return self.read_u8_no_pad(target);
-            }
+        // 254..508
+        {
+            out_buffer[2] = in_buffer[1] >> 126; // top 2 bits
+            out_buffer[2] |= in_buffer[2] << 2; // low 126 bits
+            out_buffer[3] = in_buffer[2] >> 126; // top 2 bits
+            out_buffer[3] |= in_buffer[3] << 2; // low 124 bits
+            out_buffer[3] &= MASK_SKIP_HIGH_2; // zero high 2 bits
+        }
+        // 508..762
+        {
+            out_buffer[4] = in_buffer[3] >> 124; // top 4 bits
+            out_buffer[4] |= in_buffer[4] << 4; // low 124 bits
+            out_buffer[5] = in_buffer[4] >> 124; // top 4 bits
+            out_buffer[5] |= in_buffer[5] << 4; // low 122 bits
+            out_buffer[5] &= MASK_SKIP_HIGH_2; // zero high 2 bits
+        }
+        // 762..1016
+        {
+            out_buffer[6] = in_buffer[5] >> 122; // top 6 bits
+            out_buffer[6] |= in_buffer[6] << 6; // low 122 bits
+            out_buffer[7] = in_buffer[6] >> 122; // top 6 bits
+            out_buffer[7] |= in_buffer[7] << 6; // low 120 bits
+            out_buffer[7] &= MASK_SKIP_HIGH_2; // zero high 2 bits
         }
-
-        self.read_u8_padded(target, bits_to_padding, available)
     }

-    fn read_u8_padded(
-        &mut self,
-        target: &mut [u8],
-        bits_to_padding: usize,
-        available: u64,
-    ) -> io::Result<usize> {
-        target[0] = 0;
-
-        if available >= 6 {
-            match bits_to_padding {
-                6 => {
-                    target[0] = self.buffer.read_u8_range(6);
-                    self.target_offset += 8;
-                    return Ok(1);
+    fn fill_buffer(&mut self) -> io::Result<usize> {
+        let mut bytes_read = 0;
+        let mut buf = &mut self.in_buffer[..127];
+
+        while !buf.is_empty() {
+            match self.source.read(buf) {
+                Ok(0) => {
+                    break;
                 }
-                5 => {
-                    target[0] = self.buffer.read_u8_range(5);
-                    if self.buffer.read_bit() {
-                        set_bit(&mut target[0], 7);
-                    }
-                    self.target_offset += 8;
-                    return Ok(1);
+                Ok(n) => {
+                    let tmp = buf;
+                    buf = &mut tmp[n..];
+                    bytes_read += n;
                 }
-                _ => {}
+                Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
+                Err(e) => return Err(e),
             }
         }

-        for i in 0..8 {
-            if self.target_offset % TARGET_BITS < DATA_BITS {
-                if !self.fill_buffer()? {
-                    if i > 0 {
-                        return Ok(1);
-                    } else {
-                        return Ok(0);
-                    }
-                }
-
-                if self.buffer.read_bit() {
-                    set_bit(&mut target[0], i);
-                }
-            };
-
-            self.target_offset += 1;
+        // clear unfilled memory
+        for val in &mut self.in_buffer[bytes_read..127] {
+            *val = 0;
         }

-        Ok(1)
+        Ok(bytes_read)
     }
+}

-    /// Fill the inner buffer, only if necessary. Returns `true` if more data is available.
-    fn fill_buffer(&mut self) -> io::Result<bool> {
-        if self.buffer.available() > 0 {
-            // Nothing to do, already some data available.
-            return Ok(true);
-        }
-
-        let read = self.source.read(&mut self.buffer[..])?;
-        self.buffer.reset_available(read as u64 * 8);
-
-        Ok(read > 0)
-    }
+/// Division of x by y, rounding up.
+/// x must be > 0
+#[inline]
+const fn div_ceil(x: usize, y: usize) -> usize {
+    1 + ((x - 1) / y)
 }

 impl<R: io::Read> io::Read for Fr32Reader<R> {
@@ -160,146 +108,44 @@ impl<R: io::Read> io::Read for Fr32Reader<R> {
             return Ok(0);
         }

-        let mut read = 0;
-        while read < target.len() {
-            let current_read = self.read_bytes(&mut target[read..])?;
-            read += current_read;
-
-            if current_read == 0 {
-                self.done = true;
-                break;
-            }
-        }
+        let num_bytes = target.len();

-        Ok(read)
-    }
-}
-
-fn set_bit(x: &mut u8, bit: usize) {
-    *x |= 1 << bit
-}
-
-use std::ops::{Deref, DerefMut};
-
-#[derive(Debug, Default, Clone, Copy)]
-struct Buffer {
-    data: u64,
-    /// Bits already consumed.
-    pos: u64,
-    /// Bits available.
-    avail: u64,
-}
-
-impl Deref for Buffer {
-    type Target = [u8; 8];
-
-    fn deref(&self) -> &Self::Target {
-        unsafe { &*(&self.data as *const u64 as *const [u8; 8]) }
-    }
-}
-
-impl DerefMut for Buffer {
-    fn deref_mut(&mut self) -> &mut Self::Target {
-        unsafe { &mut *(&mut self.data as *mut u64 as *mut [u8; 8]) }
-    }
-}
-
-impl Buffer {
-    /// How many bits are available to read.
-    #[inline]
-    pub fn available(&self) -> u64 {
-        self.avail - self.pos
-    }
-
-    pub fn reset_available(&mut self, bits: u64) {
-        self.pos = 0;
-        self.avail = bits;
-    }
-
-    /// Read a single bit at the current position.
-    pub fn read_bit(&mut self) -> bool {
-        let res = self.data & (1 << self.pos) != 0;
-        debug_assert!(self.available() >= 1);
-        self.pos += 1;
-        res
-    }
-
-    #[cfg(target_endian = "little")]
-    pub fn read_u8_range(&mut self, len: u64) -> u8 {
-        use bitintr::Bextr;
-        debug_assert!(self.available() >= len);
-        let res = self.data.bextr(self.pos, len) as u8;
-        self.pos += len;
-        res
-    }
-
-    #[cfg(target_endian = "little")]
-    pub fn read_u8(&mut self) -> u8 {
-        use bitintr::Bextr;
-        debug_assert!(self.available() >= 8);
-        let res = self.data.bextr(self.pos, 8) as u8;
-        self.pos += 8;
-        res
-    }
+        let mut read = 0;

-    #[cfg(target_endian = "little")]
-    pub fn read_u16(&mut self) -> u16 {
-        debug_assert!(self.available() >= 16);
+        while read < num_bytes {
+            // load new block
+            if self.to_process == 0 {
+                let bytes_read = self.fill_buffer()?;

-        use bitintr::Bextr;
-        let res = self.data.bextr(self.pos, 16) as u16;
-        self.pos += 16;
-        res
-    }
+                // read all data, no new data in the buffer
+                if bytes_read == 0 {
+                    self.done = true;
+                    break;
+                }

-    #[cfg(target_endian = "little")]
-    pub fn read_u16_into(&mut self, target: &mut [u8]) {
-        assert!(target.len() >= 2);
+                self.process_block();
+                self.to_process += div_ceil(bytes_read * 8, 254);
+                self.out_offset = 0;
+            }

-        let value = self.read_u16().to_le_bytes();
-        target[0] = value[0];
-        target[1] = value[1];
-    }
+            // write out result
+            let available_bytes = self.to_process * (256 / 8);

-    #[cfg(target_endian = "little")]
-    pub fn read_u32(&mut self) -> u32 {
-        debug_assert!(self.available() >= 32);
+            let start = read;
+            let end = std::cmp::min(start + available_bytes, num_bytes);
+            let len = end - start;

-        use bitintr::Bextr;
-        let res = self.data.bextr(self.pos, 32) as u32;
-        self.pos += 32;
-        res
-    }
+            let out_start = self.out_offset;
+            let out_end = out_start + len;

-    #[cfg(target_endian = "little")]
-    pub fn read_u32_into(&mut self, target: &mut [u8]) {
-        assert!(target.len() >= 4);
-        let value = self.read_u32().to_le_bytes();
-        target[0] = value[0];
-        target[1] = value[1];
-        target[2] = value[2];
-        target[3] = value[3];
-    }
-
-    pub fn read_u64(&mut self) -> u64 {
-        debug_assert!(self.available() >= 64);
-
-        self.pos += 64;
-        self.data
-    }
+            target[start..end]
+                .copy_from_slice(&self.out_buffer.as_byte_slice()[out_start..out_end]);
+            read += len;
+            self.out_offset += len;
+            self.to_process -= div_ceil(len * 8, 256);
+        }

-    #[cfg(target_endian = "little")]
-    pub fn read_u64_into(&mut self, target: &mut [u8]) {
-        assert!(target.len() >= 8);
-        let value = self.read_u64().to_le_bytes();
-        target[0] = value[0];
-        target[1] = value[1];
-        target[2] = value[2];
-        target[3] = value[3];
-        target[4] = value[4];
-        target[5] = value[5];
-        target[6] = value[6];
-        target[7] = value[7];
+        Ok(read)
     }
 }

@@ -309,67 +155,8 @@ mod tests {
     use pretty_assertions::assert_eq;
     use std::io::Read;

-    #[test]
-    fn test_buffer_read_bit() {
-        let mut buffer = Buffer::default();
-        let val = 12345u64.to_le_bytes();
-        buffer.copy_from_slice(&val[..]);
-        buffer.reset_available(64);
-
-        for i in 0..8 {
-            assert_eq!(buffer.read_bit(), 0 != val[0] & (1 << i));
-        }
-    }
-
-    #[test]
-    fn test_buffer_read_u8() {
-        let mut buffer = Buffer::default();
-        let val = 12345u64.to_le_bytes();
-        buffer.copy_from_slice(&val[..]);
-        buffer.reset_available(64);
-
-        for (i, &byte) in val.iter().enumerate().take(8) {
-            let read = buffer.read_u8();
-            assert_eq!(read, byte, "failed to read byte {}", i);
-        }
-    }
-
-    #[test]
-    fn test_buffer_read_u16() {
-        let mut buffer = Buffer::default();
-        let val = 12345u64.to_le_bytes();
-        buffer.copy_from_slice(&val[..]);
-        buffer.reset_available(64);
-
-        for val in val.chunks(2) {
-            let read = buffer.read_u16();
-            assert_eq!(read, u16::from_le_bytes([val[0], val[1]]));
-        }
-    }
-
-    #[test]
-    fn test_buffer_read_u32() {
-        let mut buffer = Buffer::default();
-        let val = 12345u64.to_le_bytes();
-        buffer.copy_from_slice(&val[..]);
-        buffer.reset_available(64);
-
-        for val in val.chunks(4) {
-            let read = buffer.read_u32();
-            assert_eq!(read, u32::from_le_bytes([val[0], val[1], val[2], val[3]]));
-        }
-    }
-
-    #[test]
-    fn test_buffer_read_u64() {
-        let mut buffer = Buffer::default();
-        let val = 12345u64;
-        buffer.copy_from_slice(&val.to_le_bytes()[..]);
-        buffer.reset_available(64);
-
-        let read = buffer.read_u64();
-        assert_eq!(read, val);
-    }
+    const DATA_BITS: u64 = 254;
+    const TARGET_BITS: u64 = 256;

     #[test]
     fn test_simple_short() {
@@ -380,7 +167,8 @@ mod tests {
         reader
             .read_to_end(&mut padded)
             .expect("in-memory read failed");
-        assert_eq!(&data[..], &padded[..]);
+        assert_eq!(padded.len(), 32);
+        assert_eq!(&data[..], &padded[..30]);

         assert_eq!(padded.into_boxed_slice(), bit_vec_padding(data));
     }
@@ -397,9 +185,10 @@ mod tests {
         assert_eq!(&padded[0..31], &data[0..31]);
         assert_eq!(padded[31], 0b0011_1111);
         assert_eq!(padded[32], 0b0000_0011);
-        assert_eq!(padded.len(), 33);
-
-        assert_eq!(padded.into_boxed_slice(), bit_vec_padding(data));
+        assert_eq!(padded.len(), 64);
+        let bv = bit_vec_padding(data);
+        assert_eq!(bv.len(), 64);
+        assert_eq!(padded.into_boxed_slice(), bv);
     }

     #[test]
@@ -479,7 +268,7 @@ mod tests {
         let mut rng = rand::thread_rng();

         for i in 1..100 {
-            for j in 0..50 {
+            for j in 1..50 {
                 let mut data = vec![0u8; i * j];
                 rng.fill_bytes(&mut data);

@@ -487,16 +276,17 @@ mod tests {
                 let mut reader = Fr32Reader::new(io::Cursor::new(&data));
                 reader.read_to_end(&mut buf).expect("in-memory read failed");

-                assert_eq!(buf.clone().into_boxed_slice(), bit_vec_padding(data));
+                assert_eq!(
+                    buf.into_boxed_slice(),
+                    bit_vec_padding(data),
+                    "{} - {}",
+                    i,
+                    j
+                );
             }
         }
     }

-    // Simple (and slow) padder implementation using `BitVec`.
-    // It is technically not quite right to use `BitVec` to test this, since at
-    // the moment that function still uses
-    // it for some corner cases, but since largely this implementation
-    // has been replaced it seems reasonable.
     fn bit_vec_padding(raw_data: Vec<u8>) -> Box<[u8]> {
         use bitvec::{order::Lsb0 as LittleEndian, vec::BitVec};
         use itertools::Itertools;
@@ -517,6 +307,10 @@ mod tests {
             }
         }

+        while padded_data.len() % (TARGET_BITS as usize) != 0 {
+            padded_data.push(false);
+        }
+
         padded_data.into_boxed_slice()
     }

@@ -567,4 +361,87 @@ mod tests {

         assert_eq!(buf.into_boxed_slice(), bit_vec_padding(source));
     }
+
+    const BLOCK_SIZE: usize = 127 * 8;
+
+    #[test]
+    fn test_fast() {
+        // unpadded 127 * 8 bits => 4 * 254
+        // padded 128 * 8 bits => 4 * 256
+
+        // source: 127bytes
+        // [
+        //   0: 0..254 (0..31)
+        //   1: 254..508 (31..64)
+        //      254..318
+        //      318..382
+        //      382..446
+        //
+        //   2: 508..762 (64..96)
+        //   3: 762..1016 (96..)
+        // ]
+
+        // target: 128bytes
+
+        let data = [255u8; 127];
+        let mut padded = [0u8; 128];
+
+        let num_bits = data.len() * 8;
+        assert_eq!(num_bits % 127, 0);
+
+        let num_blocks = div_ceil(num_bits, BLOCK_SIZE);
+
+        let mut in_buffer_bytes = [0u8; 128];
+        let mut out_buffer = [0u128; NUM_U128S_PER_BLOCK];
+        for block in 0..num_blocks {
+            // load current block
+            let block_offset_start = block * 127;
+            let block_offset_end = std::cmp::min(block_offset_start + 128, data.len());
+            let end = block_offset_end - block_offset_start;
+            in_buffer_bytes[..end]
+                .copy_from_slice(&data[dbg!(block_offset_start)..dbg!(block_offset_end)]);
+            let in_buffer: &[u128] = in_buffer_bytes.as_slice_of::<u128>().unwrap();
+
+            // write out fr chunks
+
+            // 0..254
+            {
+                out_buffer[0] = in_buffer[0];
+                out_buffer[1] = in_buffer[1] & MASK_SKIP_HIGH_2;
+            }
+            // 254..508
+            {
+                out_buffer[2] = in_buffer[1] >> 126; // top 2 bits
+                out_buffer[2] |= in_buffer[2] << 2; // low 126 bits
+                out_buffer[3] = in_buffer[2] >> 126; // top 2 bits
+                out_buffer[3] |= in_buffer[3] << 2; // low 124 bits
+                out_buffer[3] &= MASK_SKIP_HIGH_2; // zero high 2 bits
+            }
+            // 508..762
+            {
+                out_buffer[4] = in_buffer[3] >> 124; // top 4 bits
+                out_buffer[4] |= in_buffer[4] << 4; // low 124 bits
+                out_buffer[5] = in_buffer[4] >> 124; // top 4 bits
+                out_buffer[5] |= in_buffer[5] << 4; // low 122 bits
+                out_buffer[5] &= MASK_SKIP_HIGH_2; // zero high 2 bits
+            }
+            // 762..1016
+            {
+                out_buffer[6] = in_buffer[5] >> 122; // top 6 bits
+                out_buffer[6] |= in_buffer[6] << 6; // low 122 bits
+                out_buffer[7] = in_buffer[6] >> 122; // top 6 bits
+                out_buffer[7] |= in_buffer[7] << 6; // low 120 bits
+                out_buffer[7] &= MASK_SKIP_HIGH_2; // zero high 2 bits
+            }
+            padded[block * 128..(block + 1) * 128].copy_from_slice(out_buffer.as_byte_slice());
+        }
+
+        assert_eq!(&padded[0..31], &data[0..31]);
+        assert_eq!(padded[31], 0b0011_1111);
+        assert_eq!(padded[32], 0b1111_1111);
+
+        assert_eq!(padded.len(), 128);
+
+        assert_eq!(&padded[..], &bit_vec_padding(data.to_vec())[..]);
+    }
 }
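Not part of the diff: a minimal usage sketch of the padded reader, relying only on `Fr32Reader::new` and the `std::io::Read` implementation exercised by the tests and benches above. It illustrates the layout the new block-based path produces: after every 254 data bits two zero bits are inserted, so a 127-byte input block expands to a 128-byte padded block whose 32-byte chunks each have their top two bits cleared.

use std::io::{self, Read};

use filecoin_proofs::fr32_reader::Fr32Reader;

fn main() -> io::Result<()> {
    // 127 bytes of all-ones input: exactly one unpadded block (4 * 254 bits).
    let data = vec![0xffu8; 127];

    // Wrap any `io::Read` source; padding is applied on the fly as the data is read.
    let mut reader = Fr32Reader::new(io::Cursor::new(&data));
    let mut padded = Vec::new();
    reader.read_to_end(&mut padded)?;

    // One 127-byte block becomes four 32-byte Fr chunks (128 bytes).
    assert_eq!(padded.len(), 128);
    // The first 254 bits pass through unchanged ...
    assert_eq!(&padded[..31], &data[..31]);
    // ... and the high two bits of the 32nd byte are the inserted zero padding.
    assert_eq!(padded[31], 0b0011_1111);

    Ok(())
}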