Skip to content

Commit

Permalink
refactor: fixup fvm IPLD flush logic
Browse files Browse the repository at this point in the history
In preparation for reachability analysis (we're going to re-use this
same code).

- Make it less generic (performance).

- Remove blocks from the write buffer as we write them to avoid
  duplicate writes.

- Simplify some of the checks around what is allowed. For example, I'm
  now allowing CBOR + Identity hash which should have been allowed
  previously but wasn't (we don't use it but still, it should have been
  allowed).

- Remove the explicit 100 byte CID length check. The `Cid` type already
  validates that the digest can be no longer than 64 bytes.

- Be less strict on DagCBOR validation. Counter-intuitively, being
  overly strict here is dangerous as it gives us more points where
  implementations can disagree and fork. Instead, we enforce the
  following rules for DAG_CBOR:

  1. Blocks must have a valid CBOR structure, but _values_ aren't
     validated. E.g., no utf-8 validation, no float validation, no
     minimum encoding requirements, no canonical ordering requirements,
     etc.

  2. All CBOR values tagged with 42 must be valid CIDs. I.e., a CBOR
     byte string starting with a 0x0 byte followed by a valid CID with
     at most a 64 byte digest.
  • Loading branch information
Stebalien committed Jul 7, 2023
1 parent 8f33aa6 commit bdaf6a3
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 119 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion fvm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ filecoin-proofs-api = { version = "14", default-features = false }
rayon = "1"
num_cpus = "1.15.0"
log = "0.4.19"
byteorder = "1.4.3"
fvm-wasm-instrument = "0.4.0"
yastl = "0.1.2"
arbitrary = { version = "1.3.0", optional = true, features = ["derive"] }
Expand Down
201 changes: 84 additions & 117 deletions fvm/src/blockstore/buffered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,12 @@

use std::cell::RefCell;
use std::collections::HashMap;
use std::io::{Cursor, Read, Seek};
use std::io::Read;

use anyhow::{anyhow, Result};
use byteorder::{BigEndian, ByteOrder, ReadBytesExt};
use cid::Cid;
use fvm_ipld_blockstore::{Blockstore, Buffered};
use fvm_ipld_encoding::{CBOR, DAG_CBOR};
use fvm_ipld_encoding::{CBOR, DAG_CBOR, IPLD_RAW};
use fvm_shared::commcid::{FIL_COMMITMENT_SEALED, FIL_COMMITMENT_UNSEALED};

/// Wrapper around `Blockstore` to limit and have control over when values are written.
Expand Down Expand Up @@ -43,15 +42,10 @@ where
{
/// Flushes the buffered cache based on the root node.
/// This will recursively traverse the cache and write all data connected by links to this
/// root Cid. Calling flush will not reset the write buffer.
/// root Cid, moving the reachable blocks from the write buffer to the backing store.
fn flush(&self, root: &Cid) -> Result<()> {
let mut buffer = Vec::new();
let s = self.write.borrow();
copy_rec(&s, *root, &mut buffer)?;

self.base.put_many_keyed(buffer)?;

Ok(())
self.base
.put_many_keyed(take_reachable(&mut self.write.borrow_mut(), root)?)
}
}

Expand All @@ -62,86 +56,63 @@ where
This was implemented because the CBOR library we use does not expose low-level
methods like this, requiring us to deserialize the whole CBOR payload, which
is unnecessary and quite inefficient for our usecase here.
fn cbor_read_header_buf<B: Read>(br: &mut B, scratch: &mut [u8]) -> anyhow::Result<(u8, usize)> {
let first = br.read_u8()?;
fn cbor_read_header_buf<B: Read>(br: &mut B) -> anyhow::Result<(u8, u64)> {
#[inline(always)]
pub fn read_fixed<const N: usize>(r: &mut impl Read) -> std::io::Result<[u8; N]> {
let mut buf = [0; N];
r.read_exact(&mut buf).map(|_| buf)
}

let first = read_fixed::<1>(br)?[0];
let maj = (first & 0xe0) >> 5;
let low = first & 0x1f;

if low < 24 {
Ok((maj, low as usize))
} else if low == 24 {
let val = br.read_u8()?;
if val < 24 {
return Err(anyhow!(
"cbor input was not canonical (lval 24 with value < 24)"
));
}
Ok((maj, val as usize))
} else if low == 25 {
br.read_exact(&mut scratch[..2])?;
let val = BigEndian::read_u16(&scratch[..2]);
if val <= u8::MAX as u16 {
return Err(anyhow!(
"cbor input was not canonical (lval 25 with value <= MaxUint8)"
));
}
Ok((maj, val as usize))
} else if low == 26 {
br.read_exact(&mut scratch[..4])?;
let val = BigEndian::read_u32(&scratch[..4]);
if val <= u16::MAX as u32 {
return Err(anyhow!(
"cbor input was not canonical (lval 26 with value <= MaxUint16)"
));
}
Ok((maj, val as usize))
} else if low == 27 {
br.read_exact(&mut scratch[..8])?;
let val = BigEndian::read_u64(&scratch[..8]);
if val <= u32::MAX as u64 {
return Err(anyhow!(
"cbor input was not canonical (lval 27 with value <= MaxUint32)"
));
}
Ok((maj, val as usize))
} else {
Err(anyhow!("invalid header cbor_read_header_buf"))
}
let val = match low {
..=23 => low.into(),
24 => read_fixed::<1>(br)?[0].into(),
25 => u16::from_be_bytes(read_fixed(br)?).into(),
26 => u32::from_be_bytes(read_fixed(br)?).into(),
27 => u64::from_be_bytes(read_fixed(br)?),
_ => return Err(anyhow!("invalid header cbor_read_header_buf")),
};
Ok((maj, val))
}

/// Given a CBOR serialized IPLD buffer, read through all of it and return all the Links.
/// This function is useful because it is quite a bit faster than doing this recursively on a
/// deserialized IPLD object.
fn scan_for_links<B: Read + Seek, F>(buf: &mut B, mut callback: F) -> Result<()>
where
F: FnMut(Cid) -> anyhow::Result<()>,
{
let mut scratch: [u8; 100] = [0; 100];
fn scan_for_links(mut buf: &[u8], out: &mut Vec<Cid>) -> Result<()> {
let mut remaining = 1;
while remaining > 0 {
let (maj, extra) = cbor_read_header_buf(buf, &mut scratch)?;
let (maj, extra) = cbor_read_header_buf(&mut buf)?;
match maj {
// MajUnsignedInt, MajNegativeInt, MajOther
0 | 1 | 7 => {}
// MajByteString, MajTextString
2 | 3 => {
buf.seek(std::io::SeekFrom::Current(extra as i64))?;
if extra > buf.len() as u64 {
return Err(anyhow!("unexpected end of cbor stream"));
}
buf = &buf[extra as usize..];
}
// MajTag
6 => {
// Check if the tag refers to a CID
if extra == 42 {
let (maj, extra) = cbor_read_header_buf(buf, &mut scratch)?;
let (maj, extra) = cbor_read_header_buf(&mut buf)?;
// The actual CID is expected to be a byte string
if maj != 2 {
return Err(anyhow!("expected cbor type byte string in input"));
}
if extra > 100 {
return Err(anyhow!("string in cbor input too long"));
if extra > buf.len() as u64 {
return Err(anyhow!("unexpected end of cbor stream"));
}
buf.read_exact(&mut scratch[..extra])?;
let c = Cid::try_from(&scratch[1..extra])?;
callback(c)?;
if buf.first() != Some(&0u8) {
return Err(anyhow!("DagCBOR CID does not start with a 0x byte"));
}
let cid_buf;
(cid_buf, buf) = buf.split_at(extra as usize);
out.push(Cid::try_from(&cid_buf[1..])?);
} else {
remaining += 1;
}
Expand All @@ -154,22 +125,18 @@ where
5 => {
remaining += extra * 2;
}
_ => {
return Err(anyhow!("unhandled cbor type: {}", maj));
8.. => {
// This case is statically impossible unless `cbor_read_header_buf` makes a mistake.
return Err(anyhow!("invalid cbor tag exceeds 3 bits: {}", maj));
}
}
remaining -= 1;
}
Ok(())
}

/// Copies the IPLD DAG under `root` from the cache to the base store.
fn copy_rec<'a>(
cache: &'a HashMap<Cid, Vec<u8>>,
root: Cid,
buffer: &mut Vec<(Cid, &'a [u8])>,
) -> Result<()> {
const DAG_RAW: u64 = 0x55;
/// Moves the IPLD DAG under `root` from the cache to the base store.
fn take_reachable(cache: &mut HashMap<Cid, Vec<u8>>, root: &Cid) -> Result<Vec<(Cid, Vec<u8>)>> {
const BLAKE2B_256: u64 = 0xb220;
const BLAKE2B_LEN: u8 = 32;
const IDENTITY: u64 = 0x0;
Expand All @@ -180,53 +147,53 @@ fn copy_rec<'a>(
// 2. We always write-back new blocks, even if the client already has them. We haven't noticed a
// perf impact.

// TODO(M2): Make this not cbor specific.
// TODO(M2): Allow CBOR (not just DAG_CBOR).
match (root.codec(), root.hash().code(), root.hash().size()) {
// Allow non-truncated blake2b-256 raw/cbor (code/state)
(DAG_RAW | DAG_CBOR | CBOR, BLAKE2B_256, BLAKE2B_LEN) => (),
// Ignore raw identity cids (fake code cids)
(DAG_RAW, IDENTITY, _) => return Ok(()),
// Copy links from cbor identity cids.
// We shouldn't be creating these at the moment, but lotus' vm.Copy supports them.
(DAG_CBOR, IDENTITY, _) => {
return scan_for_links(&mut Cursor::new(root.hash().digest()), |link| {
copy_rec(cache, link, buffer)
})
let mut stack = vec![*root];
let mut result = Vec::new();

while let Some(k) = stack.pop() {
// Check the codec.
match k.codec() {
// We ignore piece commitment CIDs.
FIL_COMMITMENT_UNSEALED | FIL_COMMITMENT_SEALED => continue,
// We allow raw, cbor, and dag cbor.
IPLD_RAW | DAG_CBOR | CBOR => (),
// Everything else is rejected.
codec => return Err(anyhow!("cid {k} has unexpected codec ({codec})")),
}
// Ignore commitments (not even going to check the hash function).
(FIL_COMMITMENT_UNSEALED | FIL_COMMITMENT_SEALED, _, _) => return Ok(()),
// Fail on anything else. We usually want to continue on error, but there's really no going
// back from here.
(codec, hash, length) => {
return Err(anyhow!(
"cid {root} has unexpected codec ({codec}), hash ({hash}), or length ({length})"
))
// Check the hash construction.
match (k.hash().code(), k.hash().size()) {
// Allow non-truncated blake2b-256 and identity hashes.
(BLAKE2B_256, BLAKE2B_LEN) | (IDENTITY, _) => (),
// Reject everything else.
(hash, length) => {
return Err(anyhow!(
"cid {k} has unexpected multihash (code={hash}, len={length})"
))
}
}
}

// If we don't have the block, we assume it's already in the datastore.
//
// The alternative would be to check if it's in the datastore, but that's likely even more
// expensive. And there wouldn't be much we could do at that point but abort the block.
let block = match cache.get(&root) {
Some(blk) => blk,
None => return Ok(()),
};
if k.hash().code() == IDENTITY {
if k.codec() == DAG_CBOR {
scan_for_links(k.hash().digest(), &mut stack)?;
}
} else {
// If we don't have the block, we assume it and its children are already in the
// datastore.
//
// The alternative would be to check if it's in the datastore, but that's likely even more
// expensive. And there wouldn't be much we could do at that point but abort the block.
let Some(block) = cache.remove(&k) else { continue };

// At the moment, only DAG_CBOR can link to other blocks.
if k.codec() == DAG_CBOR {
scan_for_links(&block, &mut stack)?;
}

// At the moment, we only expect dag-cbor and raw.
// In M2, we'll need to copy explicitly.
if root.codec() == DAG_CBOR {
// TODO(M2): Make this non-recursive.
scan_for_links(&mut Cursor::new(block), |link| {
copy_rec(cache, link, buffer)
})?;
// Record the block so we can write it back.
result.push((k, block));
};
}

// Finally, push the block. We do this _last_ so that we always write children before their parents.
buffer.push((root, block));

Ok(())
Ok(result)
}

impl<BS> Blockstore for BufferedBlockstore<BS>
Expand Down

0 comments on commit bdaf6a3

Please sign in to comment.