From bdaf6a316b8599f5a8ab79eb2b15f1fd14985081 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Thu, 6 Jul 2023 17:43:04 -0700 Subject: [PATCH] refactor: fixup fvm IPLD flush logic In preparation for reachability analysis (we're going to re-use this same code). - Make it less generic (performance). - Remove blocks from the write buffer as we write them to avoid duplicate writes. - Simplify some of the checks around what is allowed. For example, I'm now allowing CBOR + Identity hash which should have been allowed previously but wasn't (we don't use it but still, it should have been allowed). - Remove the explicit 100 byte CID length check. The `Cid` type already validates that the digest can be no longer than 64 bytes. - Be less strict on DagCBOR validation. Counter-intuitively, being overly strict here is dangerous as it gives us more points where implementations can disagree and fork. Instead, we enforce the following rules for DAG_CBOR: 1. Blocks must have a valid CBOR structure, but _values_ aren't validated. E.g., no utf-8 validation, no float validation, no minimum encoding requirements, no canonical ordering requirements, etc. 2. All CBOR values tagged with 42 must be valid CIDs. I.e., a CBOR byte string starting with a 0x0 byte followed by a valid CID with at most a 64 byte digest. --- Cargo.lock | 1 - fvm/Cargo.toml | 1 - fvm/src/blockstore/buffered.rs | 201 ++++++++++++++------------------- 3 files changed, 84 insertions(+), 119 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2f4de6e16..58ec33ade 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2326,7 +2326,6 @@ version = "3.5.0" dependencies = [ "anyhow", "arbitrary", - "byteorder", "cid 0.10.1", "derive_more", "filecoin-proofs-api", diff --git a/fvm/Cargo.toml b/fvm/Cargo.toml index 4bae60953..095eb751d 100644 --- a/fvm/Cargo.toml +++ b/fvm/Cargo.toml @@ -31,7 +31,6 @@ filecoin-proofs-api = { version = "14", default-features = false } rayon = "1" num_cpus = "1.15.0" log = "0.4.19" -byteorder = "1.4.3" fvm-wasm-instrument = "0.4.0" yastl = "0.1.2" arbitrary = { version = "1.3.0", optional = true, features = ["derive"] } diff --git a/fvm/src/blockstore/buffered.rs b/fvm/src/blockstore/buffered.rs index 17107b98b..4b7f0da22 100644 --- a/fvm/src/blockstore/buffered.rs +++ b/fvm/src/blockstore/buffered.rs @@ -4,13 +4,12 @@ use std::cell::RefCell; use std::collections::HashMap; -use std::io::{Cursor, Read, Seek}; +use std::io::Read; use anyhow::{anyhow, Result}; -use byteorder::{BigEndian, ByteOrder, ReadBytesExt}; use cid::Cid; use fvm_ipld_blockstore::{Blockstore, Buffered}; -use fvm_ipld_encoding::{CBOR, DAG_CBOR}; +use fvm_ipld_encoding::{CBOR, DAG_CBOR, IPLD_RAW}; use fvm_shared::commcid::{FIL_COMMITMENT_SEALED, FIL_COMMITMENT_UNSEALED}; /// Wrapper around `Blockstore` to limit and have control over when values are written. @@ -43,15 +42,10 @@ where { /// Flushes the buffered cache based on the root node. /// This will recursively traverse the cache and write all data connected by links to this - /// root Cid. Calling flush will not reset the write buffer. + /// root Cid, moving the reachable blocks from the write buffer to the backing store. fn flush(&self, root: &Cid) -> Result<()> { - let mut buffer = Vec::new(); - let s = self.write.borrow(); - copy_rec(&s, *root, &mut buffer)?; - - self.base.put_many_keyed(buffer)?; - - Ok(()) + self.base + .put_many_keyed(take_reachable(&mut self.write.borrow_mut(), root)?) } } @@ -62,86 +56,63 @@ where /// This was implemented because the CBOR library we use does not expose low /// methods like this, requiring us to deserialize the whole CBOR payload, which /// is unnecessary and quite inefficient for our usecase here. -fn cbor_read_header_buf(br: &mut B, scratch: &mut [u8]) -> anyhow::Result<(u8, usize)> { - let first = br.read_u8()?; +fn cbor_read_header_buf(br: &mut B) -> anyhow::Result<(u8, u64)> { + #[inline(always)] + pub fn read_fixed(r: &mut impl Read) -> std::io::Result<[u8; N]> { + let mut buf = [0; N]; + r.read_exact(&mut buf).map(|_| buf) + } + + let first = read_fixed::<1>(br)?[0]; let maj = (first & 0xe0) >> 5; let low = first & 0x1f; - if low < 24 { - Ok((maj, low as usize)) - } else if low == 24 { - let val = br.read_u8()?; - if val < 24 { - return Err(anyhow!( - "cbor input was not canonical (lval 24 with value < 24)" - )); - } - Ok((maj, val as usize)) - } else if low == 25 { - br.read_exact(&mut scratch[..2])?; - let val = BigEndian::read_u16(&scratch[..2]); - if val <= u8::MAX as u16 { - return Err(anyhow!( - "cbor input was not canonical (lval 25 with value <= MaxUint8)" - )); - } - Ok((maj, val as usize)) - } else if low == 26 { - br.read_exact(&mut scratch[..4])?; - let val = BigEndian::read_u32(&scratch[..4]); - if val <= u16::MAX as u32 { - return Err(anyhow!( - "cbor input was not canonical (lval 26 with value <= MaxUint16)" - )); - } - Ok((maj, val as usize)) - } else if low == 27 { - br.read_exact(&mut scratch[..8])?; - let val = BigEndian::read_u64(&scratch[..8]); - if val <= u32::MAX as u64 { - return Err(anyhow!( - "cbor input was not canonical (lval 27 with value <= MaxUint32)" - )); - } - Ok((maj, val as usize)) - } else { - Err(anyhow!("invalid header cbor_read_header_buf")) - } + let val = match low { + ..=23 => low.into(), + 24 => read_fixed::<1>(br)?[0].into(), + 25 => u16::from_be_bytes(read_fixed(br)?).into(), + 26 => u32::from_be_bytes(read_fixed(br)?).into(), + 27 => u64::from_be_bytes(read_fixed(br)?), + _ => return Err(anyhow!("invalid header cbor_read_header_buf")), + }; + Ok((maj, val)) } /// Given a CBOR serialized IPLD buffer, read through all of it and return all the Links. /// This function is useful because it is quite a bit more fast than doing this recursively on a /// deserialized IPLD object. -fn scan_for_links(buf: &mut B, mut callback: F) -> Result<()> -where - F: FnMut(Cid) -> anyhow::Result<()>, -{ - let mut scratch: [u8; 100] = [0; 100]; +fn scan_for_links(mut buf: &[u8], out: &mut Vec) -> Result<()> { let mut remaining = 1; while remaining > 0 { - let (maj, extra) = cbor_read_header_buf(buf, &mut scratch)?; + let (maj, extra) = cbor_read_header_buf(&mut buf)?; match maj { // MajUnsignedInt, MajNegativeInt, MajOther 0 | 1 | 7 => {} // MajByteString, MajTextString 2 | 3 => { - buf.seek(std::io::SeekFrom::Current(extra as i64))?; + if extra > buf.len() as u64 { + return Err(anyhow!("unexpected end of cbor stream")); + } + buf = &buf[extra as usize..]; } // MajTag 6 => { // Check if the tag refers to a CID if extra == 42 { - let (maj, extra) = cbor_read_header_buf(buf, &mut scratch)?; + let (maj, extra) = cbor_read_header_buf(&mut buf)?; // The actual CID is expected to be a byte string if maj != 2 { return Err(anyhow!("expected cbor type byte string in input")); } - if extra > 100 { - return Err(anyhow!("string in cbor input too long")); + if extra > buf.len() as u64 { + return Err(anyhow!("unexpected end of cbor stream")); } - buf.read_exact(&mut scratch[..extra])?; - let c = Cid::try_from(&scratch[1..extra])?; - callback(c)?; + if buf.first() != Some(&0u8) { + return Err(anyhow!("DagCBOR CID does not start with a 0x byte")); + } + let cid_buf; + (cid_buf, buf) = buf.split_at(extra as usize); + out.push(Cid::try_from(&cid_buf[1..])?); } else { remaining += 1; } @@ -154,8 +125,9 @@ where 5 => { remaining += extra * 2; } - _ => { - return Err(anyhow!("unhandled cbor type: {}", maj)); + 8.. => { + // This case is statically impossible unless `cbor_read_header_buf` makes a mistake. + return Err(anyhow!("invalid cbor tag exceeds 3 bits: {}", maj)); } } remaining -= 1; @@ -163,13 +135,8 @@ where Ok(()) } -/// Copies the IPLD DAG under `root` from the cache to the base store. -fn copy_rec<'a>( - cache: &'a HashMap>, - root: Cid, - buffer: &mut Vec<(Cid, &'a [u8])>, -) -> Result<()> { - const DAG_RAW: u64 = 0x55; +/// Moves the IPLD DAG under `root` from the cache to the base store. +fn take_reachable(cache: &mut HashMap>, root: &Cid) -> Result)>> { const BLAKE2B_256: u64 = 0xb220; const BLAKE2B_LEN: u8 = 32; const IDENTITY: u64 = 0x0; @@ -180,53 +147,53 @@ fn copy_rec<'a>( // 2. We always write-back new blocks, even if the client already has them. We haven't noticed a // perf impact. - // TODO(M2): Make this not cbor specific. - // TODO(M2): Allow CBOR (not just DAG_CBOR). - match (root.codec(), root.hash().code(), root.hash().size()) { - // Allow non-truncated blake2b-256 raw/cbor (code/state) - (DAG_RAW | DAG_CBOR | CBOR, BLAKE2B_256, BLAKE2B_LEN) => (), - // Ignore raw identity cids (fake code cids) - (DAG_RAW, IDENTITY, _) => return Ok(()), - // Copy links from cbor identity cids. - // We shouldn't be creating these at the moment, but lotus' vm.Copy supports them. - (DAG_CBOR, IDENTITY, _) => { - return scan_for_links(&mut Cursor::new(root.hash().digest()), |link| { - copy_rec(cache, link, buffer) - }) + let mut stack = vec![*root]; + let mut result = Vec::new(); + + while let Some(k) = stack.pop() { + // Check the codec. + match k.codec() { + // We ignore piece commitment CIDs. + FIL_COMMITMENT_UNSEALED | FIL_COMMITMENT_SEALED => continue, + // We allow raw, cbor, and dag cbor. + IPLD_RAW | DAG_CBOR | CBOR => (), + // Everything else is rejected. + codec => return Err(anyhow!("cid {k} has unexpected codec ({codec})")), } - // Ignore commitments (not even going to check the hash function. - (FIL_COMMITMENT_UNSEALED | FIL_COMMITMENT_SEALED, _, _) => return Ok(()), - // Fail on anything else. We usually want to continue on error, but there's really no going - // back from here. - (codec, hash, length) => { - return Err(anyhow!( - "cid {root} has unexpected codec ({codec}), hash ({hash}), or length ({length})" - )) + // Check the hash construction. + match (k.hash().code(), k.hash().size()) { + // Allow non-truncated blake2b-256 and identity hashes. + (BLAKE2B_256, BLAKE2B_LEN) | (IDENTITY, _) => (), + // Reject everything else. + (hash, length) => { + return Err(anyhow!( + "cid {k} has unexpected multihash (code={hash}, len={length})" + )) + } } - } - - // If we don't have the block, we assume it's already in the datastore. - // - // The alternative would be to check if it's in the datastore, but that's likely even more - // expensive. And there wouldn't be much we could do at that point but abort the block. - let block = match cache.get(&root) { - Some(blk) => blk, - None => return Ok(()), - }; + if k.hash().code() == IDENTITY { + if k.codec() == DAG_CBOR { + scan_for_links(k.hash().digest(), &mut stack)?; + } + } else { + // If we don't have the block, we assume it and it's children are already in the + // datastore. + // + // The alternative would be to check if it's in the datastore, but that's likely even more + // expensive. And there wouldn't be much we could do at that point but abort the block. + let Some(block) = cache.remove(&k) else { continue }; + + // At the moment, only DAG_CBOR can link to other blocks. + if k.codec() == DAG_CBOR { + scan_for_links(&block, &mut stack)?; + } - // At the moment, we only expect dag-cbor and raw. - // In M2, we'll need to copy explicitly. - if root.codec() == DAG_CBOR { - // TODO(M2): Make this non-recursive. - scan_for_links(&mut Cursor::new(block), |link| { - copy_rec(cache, link, buffer) - })?; + // Record the block so we can write it back. + result.push((k, block)); + }; } - // Finally, push the block. We do this _last_ so that we always include write before parents. - buffer.push((root, block)); - - Ok(()) + Ok(result) } impl Blockstore for BufferedBlockstore