Skip to content

Commit

Permalink
Basic on-the-fly zstd compression
Browse files Browse the repository at this point in the history
Prefix every bin-key value with a single uint64 varint containing:
  (3bit store-type)            // 0 for as-is, 1 for basic zstd compression, remainder reserved
    +
  (one reserved bit)     << 3  // always 0 for now
    +
  (3bit compressibility) << 4  // 0 if store-type is 0, otherwise `c := (origLen-compLen) * 8 / origLen`
                               // (using integer math, `origLen > compLen > 0` holds for any non-0 store-type)
    +
  (IPLD block data size) << 7  // 0 if store-type is 0

Include a rudimentary, dictionary-less zstd compressor as the first
non-verbatim storage type 1
  • Loading branch information
ribasushi committed Apr 19, 2024
1 parent 664a128 commit 9f982cc
Show file tree
Hide file tree
Showing 4 changed files with 221 additions and 18 deletions.
231 changes: 217 additions & 14 deletions blockstore/badger/blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"sync"
"time"

"github.com/DataDog/zstd"
"github.com/dgraph-io/badger/v2"
"github.com/dgraph-io/badger/v2/options"
badgerstruct "github.com/dgraph-io/badger/v2/pb"
Expand All @@ -22,7 +23,9 @@ import (
"github.com/multiformats/go-base32"
"github.com/multiformats/go-multicodec"
"github.com/multiformats/go-multihash"
"github.com/multiformats/go-varint"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"golang.org/x/xerrors"

"github.com/filecoin-project/lotus/blockstore"
Expand All @@ -35,6 +38,18 @@ const (
binkeyBits = 128
)

var (
// ZstdCompressionWorkers is the concurrency limit for the errgroups that
// run encodeValue. The zero value means "choose automatically"; the
// effective number is computed by zstdCompressionWorkersActual.
ZstdCompressionWorkers = 0 // defaults to between( 2, 32, runtime.NumCPU/2 )
// ZstdCompressionLevel is the compression level encodeValue hands to
// zstd.CompressLevel.
ZstdCompressionLevel = 15
)

// zstdCompressionWorkersActual reports how many compression goroutines may
// run at once: the ZstdCompressionWorkers override whenever it is non-zero,
// otherwise between(2, 32, runtime.NumCPU()/2).
func zstdCompressionWorkersActual() int {
	if ZstdCompressionWorkers == 0 {
		// no explicit setting: derive the limit from the CPU count
		return between(2, 32, runtime.NumCPU()/2)
	}
	return ZstdCompressionWorkers
}

var (
// ErrBlockstoreClosed is returned from blockstore operations after
// the blockstore has been closed.
Expand Down Expand Up @@ -483,25 +498,42 @@ func (b *Blockstore) doCopy(ctx context.Context, from, to *badger.DB) (defErr er
if err := ctx.Err(); err != nil {
return err
}

eg, _ := errgroup.WithContext(ctx)
eg.SetLimit(zstdCompressionWorkersActual())

for _, kv := range kvs {
k := kv.Key
// pass through already encoded
if len(kv.Key) == binkeyLen {
if err := batch.Set(kv.Key, kv.Value); err != nil {
return err
}
continue
}

if len(k) != binkeyLen {
// this is a legacy key: remake it regardless of b.opts.QueryLegacyKeys
// we have a legacy non-encoded key: remake it and its value, asynchronously
// regardless of b.opts.QueryLegacyKeys
k := kv.Key
v := kv.Value
eg.Go(func() error {
// do not use a pooled buffer, as it messes with the badger batch
keyDec := make([]byte, maxMhPrefixLen+hashLen)
n, err := base32.RawStdEncoding.Decode(keyDec, k[b.prefixLen:])
if err != nil {
return err
}
k = keyDec[n-hashLen : n-hashLen+binkeyLen : n-hashLen+binkeyLen] // assume the multihash digest is hashLen (256bits) long
}

if err := batch.Set(k, kv.Value); err != nil {
return err
}
v, err = encodeValue(v, k)
if err != nil {
return err
}
return batch.Set(
keyDec[n-hashLen:n-hashLen+binkeyLen:n-hashLen+binkeyLen], // assume the multihash digest is hashLen (256bits) long
v,
)
})
}
return nil

return eg.Wait()
})
}

Expand Down Expand Up @@ -709,6 +741,75 @@ func (b *Blockstore) Size() (int64, error) {
return size, nil
}

// zstdMagic is the 4-byte magic number that opens a zstandard frame, see
// https://tools.ietf.org/html/rfc8478#section-3.1.1
//
// encodeValue verifies that the compressor output starts with these bytes and
// then leaves them out of the stored record (the varint prefix takes their
// place); decodeValue puts them back in front of the stored payload before
// handing it to the decompressor.
const (
zstdMagic = "\x28\xB5\x2F\xFD"
zstdMagicLen = len(zstdMagic)
)

/*
Each value keyed by a new-style shortened multihash receives a single varint prefix, with the uint64 value:
(3bit store-type) // 0 for as-is, 1 for basic zstd compression, remainder reserved
+
(one reserved bit) << 3 // always 0 for now
+
(3bit compressibility) << 4 // 0 if store-type is 0, otherwise `c := (origLen-compLen) * 8 / origLen`
// (using integer math, `origLen > compLen > 0` holds for any non-0 store-type)
+
(IPLD block data size) << 7 // 0 if store-type is 0
*/

// encodeValue turns a raw block payload into the record stored under a
// new-style bin-key: a uvarint prefix followed by the body.
//
//   - empty input yields the single byte 0 (store-type 0, no body)
//   - when zstd does not make the payload smaller, the result is a 0 byte
//     followed by the input verbatim (store-type 0)
//   - otherwise the result is the uvarint
//     1 + (compressibility << 4) + (len(input) << 7)
//     followed by the zstd frame with its 4-byte magic removed (store-type 1)
//
// k is only used to identify the record in error messages.
func encodeValue(input, k []byte) ([]byte, error) {
	origLen := len(input)
	if origLen == 0 {
		return []byte{0}, nil
	}

	// Layout of the scratch buffer:
	//   [0, payloadOff)         room for the largest possible uvarint prefix
	//   [magicOff, payloadOff)  where the compressor writes the zstd magic
	//   [payloadOff, ...)       the frame body that is actually stored
	// The magic ends exactly where a maximum-length varint would end, so the
	// prefix can later be slid right up against the frame body.
	payloadOff := varint.MaxLenUvarint63
	magicOff := payloadOff - zstdMagicLen
	bound := zstd.CompressBound(origLen)

	// figure out pooling in a future commit
	buf := make([]byte, payloadOff+bound)

	// Despite the cgo boundary cross, this is all 2x faster than using "github.com/klauspost/compress/zstd"
	cOut, err := zstd.CompressLevel(buf[magicOff:], input, ZstdCompressionLevel)
	if err != nil {
		return nil, err
	}
	// Everything below reads from buf rather than cOut; the magic check
	// doubles as proof that the compressor really wrote into buf.
	if hdr := buf[magicOff:payloadOff]; string(hdr) != zstdMagic {
		return nil, xerrors.Errorf("zstandard compressor produced stream prefixed with 0x%X instead of the expected 0x%X for key 0x%X", hdr, zstdMagic, k)
	}
	if len(cOut) > bound {
		return nil, xerrors.Errorf("zstandard compressor produced stream of %d bytes larger than the %d indicated CompressBound() for key 0x%X", len(cOut), bound, k)
	}
	compLen := len(cOut) - zstdMagicLen

	// check if compression is worthwhile at all
	if compLen >= origLen {
		recLen := origLen + 1
		buf[0] = 0 // stored as-is
		copy(buf[1:], input)
		return buf[:recLen:recLen], nil
	}

	// 3-bit compressibility: eighths of the original size that were saved
	ratio := (origLen - compLen) * 8 / origLen
	prefix := uint64(
		1 + // store-type 1
			(ratio << 4) +
			(origLen << 7),
	)

	// Encode the prefix at the front of buf, then move it so that it ends
	// where zstdMagic ends, overwriting the magic if it is long enough.
	n := varint.PutUvarint(buf, prefix)
	start, end := payloadOff-n, payloadOff+compLen
	copy(buf[start:payloadOff], buf[:n])

	return buf[start:end:end], nil
}

// badgerGet is a basic tri-state: value+nil nil+nil nil+err
func (b *Blockstore) badgerGet(t *badger.Txn, mk badgerMultiKey) (*valueItem, error) {
switch item, err := t.Get(mk.binKey()); err {
Expand Down Expand Up @@ -738,20 +839,62 @@ type valueItem struct {
}

// size reports the length of the original (decoded) block payload held by
// this item, without decompressing anything.
//
// Legacy keys (anything whose key length is not binkeyLen) hold the payload
// verbatim, so the badger value size is the answer. New-style records carry a
// uvarint prefix: a leading 0 byte means the rest of the value is the payload
// as-is, otherwise the payload size is stored in the prefix bits above bit 7.
//
// Returns -1 together with the error if the value cannot be read or parsed.
func (vi *valueItem) size() (int, error) {
// NOTE(review): the next line looks like the pre-commit body that the diff
// rendering fused with the added lines below — confirm against the real file.
return int(vi.badgerItem.ValueSize()), nil
if len(vi.currentKey) != binkeyLen {
// legacy key
return int(vi.badgerItem.ValueSize()), nil
}

// -1 is what the caller sees if the callback below fails before setting sz
sz := int(-1)
err := vi.badgerItem.Value(func(val []byte) error {
if len(val) == 0 {
return xerrors.New("unexpected zero-length record")
}

// a one-byte 0 prefix: everything after it is the verbatim payload
if val[0] == 0 { // storage type 0
sz = len(val) - 1
return nil
}

// any other prefix: the original payload size sits above the 7 flag bits
pref, _, err := varint.FromUvarint(val)
if err != nil {
return err
}
sz = int(pref >> 7)
return nil
})

return sz, err
}
// block materialises the item as a blocks.Block for cid c. The stored value
// is copied out of badger, decoded first when it sits under a new-style
// bin-key, and verified with checkHash before the block is constructed.
func (vi *valueItem) block(c cid.Cid) (blocks.Block, error) {
	stored, err := vi.badgerItem.ValueCopy(nil)
	if err != nil {
		return nil, err
	}

	data := stored
	if len(vi.currentKey) == binkeyLen {
		// new-style record: strip the prefix / decompress into a non-pooled buffer
		if data, _, err = decodeValue(stored, false); err != nil {
			return nil, err
		}
	}

	if err = checkHash(c, data, vi.currentKey); err != nil {
		return nil, err
	}
	return blocks.NewBlockWithCid(data, c)
}
func (vi *valueItem) view(c cid.Cid, f func(val []byte) error) error {
return vi.badgerItem.Value(func(payload []byte) error {
if len(vi.currentKey) == binkeyLen {
var isPooled bool
var err error
payload, isPooled, err = decodeValue(payload, true) // maybe pooled combined with a defer below
if err != nil {
return err
}
if isPooled {
defer pool.Put(payload)
}
}
if err := checkHash(c, payload, vi.currentKey); err != nil {
return err
}
Expand All @@ -768,6 +911,47 @@ func checkHash(c cid.Cid, b, k []byte) error {
return xerrors.Errorf("multihash mismatch for cid %s (badger key 0x%X): value hashes to 0x%X, but expected multihash 0x%X", c, k, []byte(rehash.Hash()), []byte(c.Hash()))
}
return nil

}
// decodeValue reverses encodeValue: it takes a record stored under a
// new-style bin-key and returns the original block payload.
//
// A record starting with a 0 byte (store-type 0) is returned as in[1:], which
// aliases the input and is never pooled. A store-type 1 record is
// zstd-decompressed into a fresh buffer whose size is taken from the record
// prefix; with pooledReturn set that buffer comes from pool and the caller
// must hand it back with pool.Put once done.
//
// The second result reports whether the returned slice is such a pooled
// buffer. Any other store-type, a malformed prefix, a decompression failure,
// or a decompressed size that disagrees with the prefix yields an error.
func decodeValue(in []byte, pooledReturn bool) ([]byte, bool, error) {
	if len(in) == 0 {
		return nil, false, xerrors.New("corrupted zero-length record")
	}
	if in[0] == 0 {
		return in[1:], false, nil
	}

	pref, n, err := varint.FromUvarint(in)
	if err != nil {
		return nil, false, err
	}

	storType := pref & 0b111
	if storType != 1 {
		return nil, false, xerrors.Errorf("unexpected StorageType %d", storType)
	}

	// original payload size as declared by the record prefix
	origLen := int(pref >> 7)

	var bufOut []byte
	if pooledReturn {
		bufOut = pool.Get(origLen)
	} else {
		bufOut = make([]byte, origLen)
	}

	// scratch space to rebuild a complete zstd frame: magic + stored body
	bufDec := pool.Get(zstdMagicLen + len(in) - n)
	defer pool.Put(bufDec)

	out, err := zstd.Decompress(
		bufOut,
		append(append(bufDec[:0], zstdMagic...), in[n:]...),
	)
	if err == nil && len(out) != origLen {
		// The frame decoded cleanly but does not match the size the record
		// claims: returning bufOut here would expose wrong or stale bytes.
		err = xerrors.Errorf("decompressed %d bytes, but record prefix declares %d", len(out), origLen)
	}
	if err != nil {
		if pooledReturn {
			pool.Put(bufOut)
		}
		return nil, false, xerrors.Errorf("decompression failed: %w", err)
	}

	// The decompressor may return a buffer of its own instead of writing
	// into bufOut; make sure the caller's buffer holds the data.
	if len(out) > 0 && &out[0] != &bufOut[0] {
		copy(bufOut, out)
	}

	return bufOut, pooledReturn, nil
}

// View implements blockstore.Viewer, which leverages zero-copy read-only
Expand Down Expand Up @@ -964,6 +1148,9 @@ func (b *Blockstore) PutMany(ctx context.Context, blocks []blocks.Block) error {
defer pool.Put(jrnlSlab)
jrnl := jrnlSlab[:0]

var eg errgroup.Group
eg.SetLimit(zstdCompressionWorkersActual())

if err := b.db.View(func(txn *badger.Txn) error {
for i := range kvs {
val, err := b.badgerGet(txn, kvs[i].mk)
Expand All @@ -977,14 +1164,23 @@ func (b *Blockstore) PutMany(ctx context.Context, blocks []blocks.Block) error {
jrnl = append(jrnl, make([]byte, pad)...)
}
jrnl = append(jrnl, mh...)
kvs[i].val = blocks[i].RawData()

i := i
eg.Go(func() error {
kvs[i].val, err = encodeValue(blocks[i].RawData(), kvs[i].mk)
return err
})
}
}
return nil
}); err != nil {
return err
}

if err := eg.Wait(); err != nil {
return err
}

put := func(db *badger.DB, mhj flushWriter) error {
batch := db.NewWriteBatch()
defer batch.Cancel()
Expand Down Expand Up @@ -1146,7 +1342,6 @@ func (b *Blockstore) ForEachKey(f func(cid.Cid) error) error {
b.lockDB()
defer b.unlockDB()

var err error
var c cid.Cid

return iterateBadger(context.Background(), b.db, func(kvs []*badgerstruct.KV) error {
Expand All @@ -1167,9 +1362,17 @@ func (b *Blockstore) ForEachKey(f func(cid.Cid) error) error {
c = cid.NewCidV1(uint64(multicodec.Raw), mhBuf[:n])
} else {

dec, isPooled, err := decodeValue(kv.Value, true)
if err != nil {
return err
}
if isPooled {
defer pool.Put(dec)
}

// this is so so SO nasty... 🤮
for _, maker := range cidMakers {
c, err = maker.Sum(kv.Value)
c, err = maker.Sum(dec)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion blockstore/badger/blockstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func testMove(t *testing.T, optsF func(string) Options) {

// add some blocks
for i := 0; i < 10; i++ {
blk := blocks.NewBlock([]byte(fmt.Sprintf("some data %d", i)))
blk := blocks.NewBlock([]byte(fmt.Sprintf("some data %d", i))) // compressible
err := db.Put(ctx, blk)
if err != nil {
t.Fatal(err)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ retract v1.20.2 // Wrongfully cherry picked PR, use v1.20.2+ instead.
require (
contrib.go.opencensus.io/exporter/prometheus v0.4.2
github.com/BurntSushi/toml v1.3.0
github.com/DataDog/zstd v1.4.5
github.com/DataDog/zstd v1.5.5
github.com/GeertJohan/go.rice v1.0.3
github.com/Gurpartap/async v0.0.0-20180927173644-4f7f499dd9ee
github.com/Kubuxu/imtui v0.0.0-20210401140320-41663d68d0fa
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ github.com/BurntSushi/toml v1.3.0 h1:Ws8e5YmnrGEHzZEzg0YvK/7COGYtTC5PbaH9oSSbgfA
github.com/BurntSushi/toml v1.3.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/DataDog/zstd v1.4.1/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
github.com/DataDog/zstd v1.4.5 h1:EndNeuB0l9syBZhut0wns3gV1hL8zX8LIu6ZiVHWLIQ=
github.com/DataDog/zstd v1.4.5/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
github.com/DataDog/zstd v1.5.5 h1:oWf5W7GtOLgp6bciQYDmhHHjdhYkALu6S/5Ni9ZgSvQ=
github.com/DataDog/zstd v1.5.5/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw=
github.com/GeertJohan/go.incremental v1.0.0 h1:7AH+pY1XUgQE4Y1HcXYaMqAI0m9yrFqo/jt0CW30vsg=
github.com/GeertJohan/go.incremental v1.0.0/go.mod h1:6fAjUhbVuX1KcMD3c8TEgVUqmo4seqhv0i0kdATSkM0=
github.com/GeertJohan/go.rice v1.0.3 h1:k5viR+xGtIhF61125vCE1cmJ5957RQGXG6dmbaWZSmI=
Expand Down

0 comments on commit 9f982cc

Please sign in to comment.