Basic on-the-fly zstd compression
Prefix every bin-key value with a single uint64 varint containing:
  (3bit store-type)            // 0 for as-is, 1 for basic zstd compression, remainder reserved
    +
  (one reserved bit)     << 3  // always 0 for now
    +
  (3bit compressibility) << 4  // 0 if store-type is 0, otherwise `c := (origLen-compLen) * 8 / origLen`
                               // (using integer math, `origLen > compLen > 0` holds for any non-0 store-type)
    +
  (IPLD block data size) << 7  // 0 if store-type is 0

Include a rudimentary, dictionary-less zstd compressor as the first
non-verbatim storage type (store-type 1).
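
For illustration, a minimal sketch of how such a prefix could be packed and
unpacked (packPrefix/unpackPrefix are hypothetical helpers, not part of this
change; the real encoder is encodeValue in the diff below):

  // sketch only, mirroring the layout described above
  func packPrefix(storeType uint64, origLen, compLen int) uint64 {
  	if storeType == 0 {
  		return 0 // stored as-is: every other field is 0
  	}
  	// 3-bit compressibility; origLen > compLen > 0 keeps this in 0..7
  	c := uint64(origLen-compLen) * 8 / uint64(origLen)
  	return storeType&0b111 | // bits 0-2: store type (bit 3 reserved, always 0)
  		c<<4 | // bits 4-6: compressibility
  		uint64(origLen)<<7 // bits 7+: IPLD block data size
  }

  func unpackPrefix(pref uint64) (storeType, compressibility, origLen uint64) {
  	return pref & 0b111, (pref >> 4) & 0b111, pref >> 7
  }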
ribasushi committed Apr 28, 2024
1 parent f681ea8 commit 4e41f1f
Showing 4 changed files with 237 additions and 20 deletions.
242 changes: 226 additions & 16 deletions blockstore/badger/blockstore.go
@@ -11,6 +11,7 @@ import (
"sync"
"time"

"github.com/DataDog/zstd"
"github.com/dgraph-io/badger/v2"
"github.com/dgraph-io/badger/v2/options"
badgerstruct "github.com/dgraph-io/badger/v2/pb"
@@ -23,7 +24,9 @@ import (
"github.com/multiformats/go-multicodec"
"github.com/multiformats/go-multihash"
"github.com/multiformats/go-varint"
zstdv "github.com/valyala/gozstd"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"golang.org/x/xerrors"

"github.com/filecoin-project/lotus/blockstore"
@@ -60,6 +63,18 @@ const (
binkeyLen = binkeyBits / 8
)

var (
ZstdCompressionWorkers = 0 // defaults to between(2, 32, runtime.NumCPU()/2)
ZstdCompressionLevel = 18 // https://github.com/facebook/zstd/pull/3895#issuecomment-2067746853
)
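// e.g. a caller could tune these before opening the store
// (importing-package alias hypothetical):
//   badgerbs.ZstdCompressionWorkers = 8
//   badgerbs.ZstdCompressionLevel = 10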

func zstdCompressionWorkersActual() int {
if ZstdCompressionWorkers != 0 {
return ZstdCompressionWorkers
}
return between(2, 32, runtime.NumCPU()/2)
}

var (
// ErrBlockstoreClosed is returned from blockstore operations after
// the blockstore has been closed.
@@ -495,10 +510,19 @@ func symlink(path, linkTo string) error {
}

func findCidForPartialKV(kv *badgerstruct.KV) (c cid.Cid, smh supportedMultihash, err error) {
decV, wasPooled, decErr := decodeValue(kv.Value, true)
if decErr != nil {
err = decErr
return
}
if wasPooled {
defer pool.Put(decV)
}

// this is so so SO nasty... 🤮
for _, pref := range tryOrder {
trySmh := supportedMultihashes[pref]
c, err = trySmh.cidMaker.Sum(kv.Value)
c, err = trySmh.cidMaker.Sum(decV)
if err != nil {
return
}
@@ -509,6 +533,7 @@ func findCidForPartialKV(kv *badgerstruct.KV) (c cid.Cid, smh supportedMultihash
}
c = cid.Undef
}

if !c.Defined() {
err = xerrors.Errorf("none of the available mutihashers produced a hash starting with 0x%X", kv.Key)
}
@@ -537,8 +562,12 @@ func (b *Blockstore) doCopy(ctx context.Context, from, to *badger.DB, jrnlFh io.
defer pool.Put(jrnlSlab)
jrnl := jrnlSlab[:0]

eg, _ := errgroup.WithContext(ctx)
eg.SetLimit(zstdCompressionWorkersActual())

for _, kv := range kvs {

// pass through already encoded value
if len(kv.Key) == binkeyLen {
// nasty way to recreate the hash from the payload alone 🤮
// worth it however for maintaining journal consistency
@@ -561,7 +590,9 @@ func (b *Blockstore) doCopy(ctx context.Context, from, to *badger.DB, jrnlFh io.
continue
}

// this is a legacy key: remake it regardless of b.opts.QueryLegacyKeys
// we have a legacy non-encoded key: remake it and then asynchronously encode its value
// do this regardless of b.opts.QueryLegacyKeys
//
// can not use a pooled decode buffer, as it gets held by the badger batch
mh := make([]byte, varint.MaxLenUvarint63+supportedHashLen)
n, err := base32.RawStdEncoding.Decode(mh, kv.Key[b.prefixLen:])
@@ -574,24 +605,33 @@ func (b *Blockstore) doCopy(ctx context.Context, from, to *badger.DB, jrnlFh io.
return xerrors.Errorf("unsupported multihash for key 0x%X: %w", kv.Key[b.prefixLen:], err)
}

if err := batch.Set(
mh[n-supportedHashLen:n-supportedHashLen+binkeyLen:n-supportedHashLen+binkeyLen], // we checked the multihash digest is hashLen (256bits) long
kv.Value,
); err != nil {
return err
}

// add a journal record
// NOTE: this could very well result in duplicates
// there isn't much we can do about this right now...
jrnl = append(jrnl, smh.journalShortCode)
jrnl = append(jrnl, mh[len(mh)-supportedHashLen:]...)

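// capture this iteration's value for the closure: kv is reused by the loop (pre-Go 1.22 semantics)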
v := kv.Value
eg.Go(func() error {
var err error
v, err = encodeValue(v, mh)
if err != nil {
return err
}
return batch.Set(
mh[len(mh)-supportedHashLen:len(mh)-supportedHashLen+binkeyLen:len(mh)-supportedHashLen+binkeyLen], // we checked the multihash digest is hashLen (256bits) long
v,
)
})

}

if err := eg.Wait(); err != nil {
return err
}
if _, err := jrnlFh.Write(jrnl); err != nil {
return xerrors.Errorf("failed to write multihashes to journal: %w", err)
}

return nil
})
}
@@ -816,6 +856,79 @@ func isMultihashSupported(mh []byte) (supportedMultihash, error) {
return smh, nil
}

// https://tools.ietf.org/html/rfc8478#section-3.1.1
const (
zstdMagic = "\x28\xB5\x2F\xFD"
zstdMagicLen = len(zstdMagic)
)

/*
Each value keyed by a new-style shortened multihash receives a single varint prefix, with the uint64 value:
(3bit store-type) // 0 for as-is, 1 for basic zstd compression, remainder reserved
+
(one reserved bit) << 3 // always 0 for now
+
(3bit compressibility) << 4 // 0 if store-type is 0, otherwise `c := (origLen-compLen) * 8 / origLen`
// (using integer math, `origLen > compLen > 0` holds for any non-0 store-type)
+
(IPLD block data size) << 7 // 0 if store-type is 0
*/

func encodeValue(input, k []byte) ([]byte, error) {
if len(input) == 0 {
return []byte{0}, nil
}

maxClen := zstd.CompressBound(len(input))

// figure out pooling in a future commit
buf := make(
[]byte,
varint.MaxLenUvarint63+maxClen,
)

// a scope to contain the Compress output on purpose: ensures we always operate within buf
var compLen int
{
// temporarily use zstdv until we get datadog 1.5.6
cOut := zstdv.CompressLevel(
buf[varint.MaxLenUvarint63-zstdMagicLen:varint.MaxLenUvarint63-zstdMagicLen], // arrange so that the zstdMagic ends where max-varint would end
input,
ZstdCompressionLevel,
)
if hdr := buf[varint.MaxLenUvarint63-zstdMagicLen : varint.MaxLenUvarint63]; !bytes.Equal(hdr, []byte(zstdMagic)) {
return nil, xerrors.Errorf("zstandard compressor produced stream prefixed with 0x%X instead of the expected 0x%X for key 0x%X", hdr, zstdMagic, k)
}
if len(cOut) > maxClen {
return nil, xerrors.Errorf("zstandard compressor produced stream of %d bytes larger than the %d indicated CompressBound() for key 0x%X", len(cOut), maxClen, k)
}
compLen = len(cOut) - zstdMagicLen
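// note: the 4-byte zstd magic is never written to disk; decodeValue re-prepends it before decompressing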
}

// check if compression is worthwhile at all
if compLen >= len(input) {
buf[0] = 0 // stored as-is
copy(buf[1:], input)
return buf[0 : len(input)+1 : len(input)+1], nil
}

pref := uint64(
1 + // store-type 1
(((len(input) - compLen) * 8 / len(input)) << 4) +
(len(input) << 7),
)

n := varint.PutUvarint(
// place the varint so it ends where the data (post zstdMagic) begins
buf[varint.MaxLenUvarint63-varint.UvarintSize(pref):],
pref,
)

return buf[varint.MaxLenUvarint63-n : varint.MaxLenUvarint63+compLen : varint.MaxLenUvarint63+compLen], nil

}

// badgerGet is a basic tri-state: value+nil nil+nil nil+err
func (b *Blockstore) badgerGet(t *badger.Txn, mk badgerMultiKey) (*valueItem, error) {
switch item, err := t.Get(mk.binKey()); err {
Expand Down Expand Up @@ -845,20 +958,62 @@ type valueItem struct {
}

func (vi *valueItem) size() (int, error) {
return int(vi.badgerItem.ValueSize()), nil
if len(vi.currentKey) != binkeyLen {
// legacy key
return int(vi.badgerItem.ValueSize()), nil
}

sz := int(-1)
err := vi.badgerItem.Value(func(val []byte) error {
if len(val) == 0 {
return xerrors.New("unexpected zero-length record")
}

if val[0] == 0 { // storage type 0
sz = len(val) - 1
return nil
}

pref, _, err := varint.FromUvarint(val)
if err != nil {
return err
}
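// bits 7+ of the prefix carry the original (uncompressed) IPLD block size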
sz = int(pref >> 7)
return nil
})

return sz, err
}
func (vi *valueItem) block(c cid.Cid) (blocks.Block, error) {
payload, err := vi.badgerItem.ValueCopy(nil)
if err != nil {
return nil, err
}
if len(vi.currentKey) == binkeyLen {
var err error
payload, _, err = decodeValue(payload, false)
if err != nil {
return nil, err
}
}
if err := checkHash(c, payload, vi.currentKey); err != nil {
return nil, err
}
return blocks.NewBlockWithCid(payload, c)
}
func (vi *valueItem) view(c cid.Cid, f func(val []byte) error) error {
return vi.badgerItem.Value(func(payload []byte) error {
if len(vi.currentKey) == binkeyLen {
var isPooled bool
var err error
payload, isPooled, err = decodeValue(payload, true) // may return a pooled buffer, released by the defer below
if err != nil {
return err
}
if isPooled {
defer pool.Put(payload)
}
}
if err := checkHash(c, payload, vi.currentKey); err != nil {
return err
}
@@ -875,6 +1030,49 @@ func checkHash(c cid.Cid, b, k []byte) error {
return xerrors.Errorf("multihash mismatch for cid %s (badger key 0x%X): value hashes to 0x%X, but expected multihash 0x%X", c, k, []byte(rehash.Hash()), []byte(c.Hash()))
}
return nil

}
func decodeValue(in []byte, pooledReturn bool) ([]byte, bool, error) {
if len(in) == 0 {
return nil, false, xerrors.New("corrupted zero-length record")
}
if in[0] == 0 {
return in[1:], false, nil
}

pref, n, err := varint.FromUvarint(in)
if err != nil {
return nil, false, err
}

storType := pref & 0b111
if storType != 1 {
return nil, false, xerrors.Errorf("unexpected StorageType %d", storType)
}

var bufOut []byte
if pooledReturn {
bufOut = pool.Get(int(pref >> 7))
} else {
bufOut = make([]byte, int(pref>>7))
}
bufDec := pool.Get(zstdMagicLen + len(in) - n)
defer pool.Put(bufDec)

// eventually switch to a pooled zstd.NewCtx()
// https://github.com/DataDog/zstd/pull/130#issuecomment-2067955171
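// the stored stream omits the 4-byte magic (see encodeValue): re-prepend it before decompressing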
_, err = zstd.DecompressInto(
bufOut,
append(append(bufDec[:0], zstdMagic...), in[n:]...),
)
if err != nil {
if pooledReturn {
pool.Put(bufOut)
}
return nil, false, xerrors.Errorf("decompression failed: %w", err)
}

return bufOut, pooledReturn, nil
}

// View implements blockstore.Viewer, which leverages zero-copy read-only
@@ -1073,21 +1271,28 @@ func (b *Blockstore) PutMany(ctx context.Context, blocks []blocks.Block) error {
defer pool.Put(jrnlSlab)
jrnl := jrnlSlab[:0]

var eg errgroup.Group
eg.SetLimit(zstdCompressionWorkersActual())

if err := b.db.View(func(txn *badger.Txn) error {
for i := range kvs {
val, err := b.badgerGet(txn, kvs[i].mk)
if err != nil {
// Something is actually wrong
return err
} else if val == nil {
// Got to insert that, check it is supported, write journal
// Got to insert that, check it is supported, write journal, encode value
mh := blocks[i].Cid().Hash()
smh, err := isMultihashSupported(mh)
if err != nil {
return xerrors.Errorf("unsupported multihash for cid %s: %w", blocks[i].Cid(), err)
}

kvs[i].val = blocks[i].RawData()
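// pin the loop index for the closure (pre-Go 1.22 loop-variable semantics)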
i := i
eg.Go(func() error {
kvs[i].val, err = encodeValue(blocks[i].RawData(), kvs[i].mk)
return err
})

// add a journal record
jrnl = append(jrnl, smh.journalShortCode)
@@ -1099,6 +1304,10 @@ func (b *Blockstore) PutMany(ctx context.Context, blocks []blocks.Block) error {
return err
}

if err := eg.Wait(); err != nil {
return err
}

put := func(db *badger.DB, mhj flushWriter) error {
batch := db.NewWriteBatch()
defer batch.Cancel()
@@ -1254,15 +1463,15 @@ func (b *Blockstore) ForEachKey(f func(cid.Cid) error) error {
b.lockDB()
defer b.unlockDB()

var err error
var c cid.Cid

return iterateBadger(context.Background(), b.db, func(kvs []*badgerstruct.KV) error {
if !b.isOpen() {
return ErrBlockstoreClosed
}

for _, kv := range kvs {

var c cid.Cid

if len(kv.Key) != binkeyLen {
if !b.opts.QueryLegacyKeys {
continue
@@ -1274,6 +1483,7 @@ }
}
c = cid.NewCidV1(uint64(multicodec.Raw), mhBuf[:n])
} else {
var err error
c, _, err = findCidForPartialKV(kv)
if err != nil {
return err