From 4e41f1fef57b94e442d521069779882934fc1810 Mon Sep 17 00:00:00 2001 From: Peter Rabbitson Date: Sat, 6 Apr 2024 00:54:49 +0200 Subject: [PATCH] Basic on-the-fly zstd compression Prefix every bin-key value with a single uint64 varint containing: (3bit store-type) // 0 for as-is, 1 for basic zstd compression, remainder reserved + (one reserverd bit) << 3 // always 0 for now + (3bit compressability) << 4 // 0 if store-type is 0, otherwise `c := (origLen-compLen) * 8 / origLen` // (using integer math, `origLen > compLen > 0` holds for any non-0 store-type) + (IPLD block data size) << 7 // 0 if store-type is 0 Include a rudimentary, dictionary-less zstd compressor as the first non-verbatim storage type 1 --- blockstore/badger/blockstore.go | 242 +++++++++++++++++++++++++-- blockstore/badger/blockstore_test.go | 2 +- go.mod | 7 +- go.sum | 6 +- 4 files changed, 237 insertions(+), 20 deletions(-) diff --git a/blockstore/badger/blockstore.go b/blockstore/badger/blockstore.go index b51b2eb4e..98f37a294 100644 --- a/blockstore/badger/blockstore.go +++ b/blockstore/badger/blockstore.go @@ -11,6 +11,7 @@ import ( "sync" "time" + "github.com/DataDog/zstd" "github.com/dgraph-io/badger/v2" "github.com/dgraph-io/badger/v2/options" badgerstruct "github.com/dgraph-io/badger/v2/pb" @@ -23,7 +24,9 @@ import ( "github.com/multiformats/go-multicodec" "github.com/multiformats/go-multihash" "github.com/multiformats/go-varint" + zstdv "github.com/valyala/gozstd" "go.uber.org/zap" + "golang.org/x/sync/errgroup" "golang.org/x/xerrors" "github.com/filecoin-project/lotus/blockstore" @@ -60,6 +63,18 @@ const ( binkeyLen = binkeyBits / 8 ) +var ( + ZstdCompressionWorkers = 0 // defaults to between( 2, 32, runtime.NumCPU/2 ) + ZstdCompressionLevel = 18 // https://github.com/facebook/zstd/pull/3895#issuecomment-2067746853 +) + +func zstdCompressionWorkersActual() int { + if ZstdCompressionWorkers != 0 { + 
return ZstdCompressionWorkers + } + return between(2, 32, runtime.NumCPU()/2) +} + var ( // ErrBlockstoreClosed is returned from blockstore operations after // the blockstore has been closed. @@ -495,10 +510,19 @@ func symlink(path, linkTo string) error { } func findCidForPartialKV(kv *badgerstruct.KV) (c cid.Cid, smh supportedMultihash, err error) { + decV, wasPooled, decErr := decodeValue(kv.Value, true) + if decErr != nil { + err = decErr + return + } + if wasPooled { + defer pool.Put(decV) + } + // this is so so SO nasty... 🤮 for _, pref := range tryOrder { trySmh := supportedMultihashes[pref] - c, err = trySmh.cidMaker.Sum(kv.Value) + c, err = trySmh.cidMaker.Sum(decV) if err != nil { return } @@ -509,6 +533,7 @@ func findCidForPartialKV(kv *badgerstruct.KV) (c cid.Cid, smh supportedMultihash } c = cid.Undef } + if !c.Defined() { err = xerrors.Errorf("none of the available mutihashers produced a hash starting with 0x%X", kv.Key) } @@ -537,8 +562,12 @@ func (b *Blockstore) doCopy(ctx context.Context, from, to *badger.DB, jrnlFh io. defer pool.Put(jrnlSlab) jrnl := jrnlSlab[:0] + eg, _ := errgroup.WithContext(ctx) + eg.SetLimit(zstdCompressionWorkersActual()) + for _, kv := range kvs { + // pass through already encoded value if len(kv.Key) == binkeyLen { // nasty way to recreate the hash from the payload alone 🤮 // worth it however for maintaining journal consistency @@ -561,7 +590,9 @@ func (b *Blockstore) doCopy(ctx context.Context, from, to *badger.DB, jrnlFh io. 
continue } - // this is a legacy key: remake it regardless of b.opts.QueryLegacyKeys + // we have a legacy non-encoded key: remake it and then asynchronously encode its value + // do this regardless of b.opts.QueryLegacyKeys + // // can not use a pooled decode buffer, as it gets held by the badger batch mh := make([]byte, varint.MaxLenUvarint63+supportedHashLen) n, err := base32.RawStdEncoding.Decode(mh, kv.Key[b.prefixLen:]) @@ -574,24 +605,33 @@ func (b *Blockstore) doCopy(ctx context.Context, from, to *badger.DB, jrnlFh io. return xerrors.Errorf("unsupported multihash for key 0x%X: %w", kv.Key[b.prefixLen:], err) } - if err := batch.Set( - mh[n-supportedHashLen:n-supportedHashLen+binkeyLen:n-supportedHashLen+binkeyLen], // we checked the multihash digest is hashLen (256bits) long - kv.Value, - ); err != nil { - return err - } - // add a journal record // NOTE: this could very well result in duplicates // there isn't much we can do about this right now... jrnl = append(jrnl, smh.journalShortCode) jrnl = append(jrnl, mh[len(mh)-supportedHashLen:]...)
+ + v := kv.Value + eg.Go(func() error { + var err error + v, err = encodeValue(v, mh) + if err != nil { + return err + } + return batch.Set( + mh[len(mh)-supportedHashLen:len(mh)-supportedHashLen+binkeyLen:len(mh)-supportedHashLen+binkeyLen], // we checked the multihash digest is hashLen (256bits) long + v, + ) + }) + } + if err := eg.Wait(); err != nil { + return err + } if _, err := jrnlFh.Write(jrnl); err != nil { return xerrors.Errorf("failed to write multihashes to journal: %w", err) } - return nil }) } @@ -816,6 +856,79 @@ func isMultihashSupported(mh []byte) (supportedMultihash, error) { return smh, nil } +// https://tools.ietf.org/html/rfc8478#section-3.1.1 +const ( + zstdMagic = "\x28\xB5\x2F\xFD" + zstdMagicLen = len(zstdMagic) +) + +/* +Each value keyed by a new-style shortened multihash receives a single varint prefix, with the uint64 value: + + (3bit store-type) // 0 for as-is, 1 for basic zstd compression, remainder reserved + + + (one reserved bit) << 3 // always 0 for now + + + (3bit compressibility) << 4 // 0 if store-type is 0, otherwise `c := (origLen-compLen) * 8 / origLen` + // (using integer math, `origLen > compLen > 0` holds for any non-0 store-type) + + + (IPLD block data size) << 7 // 0 if store-type is 0 +*/ + +func encodeValue(input, k []byte) ([]byte, error) { + if len(input) == 0 { + return []byte{0}, nil + } + + maxClen := zstd.CompressBound(len(input)) + + // figure out pooling in a future commit + buf := make( + []byte, + varint.MaxLenUvarint63+maxClen, + ) + + // a scope to contain the Compress output on purpose: ensures we always operate within buf + var compLen int + { + // temporarily use zstdv until we get datadog 1.5.6 + cOut := zstdv.CompressLevel( + buf[varint.MaxLenUvarint63-zstdMagicLen:varint.MaxLenUvarint63-zstdMagicLen], // arrange so that the zstdMagic ends where max-varint would end + input, + ZstdCompressionLevel, + ) + if hdr := buf[varint.MaxLenUvarint63-zstdMagicLen : varint.MaxLenUvarint63]; !bytes.Equal(hdr,
[]byte(zstdMagic)) { + return nil, xerrors.Errorf("zstandard compressor produced stream prefixed with 0x%X instead of the expected 0x%X for key 0x%X", hdr, zstdMagic, k) + } + if len(cOut) > maxClen { + return nil, xerrors.Errorf("zstandard compressor produced stream of %d bytes larger than the %d indicated CompressBound() for key 0x%X", len(cOut), maxClen, k) + } + compLen = len(cOut) - zstdMagicLen + } + + // check if compression is worthwhile at all + if compLen >= len(input) { + buf[0] = 0 // stored as-is + copy(buf[1:], input) + return buf[0 : len(input)+1 : len(input)+1], nil + } + + pref := uint64( + 1 + // store-type 1 + (((len(input) - compLen) * 8 / len(input)) << 4) + + (len(input) << 7), + ) + + n := varint.PutUvarint( + // place the varint so it ends where the data (post zstdMagic) begins + buf[varint.MaxLenUvarint63-varint.UvarintSize(pref):], + pref, + ) + + return buf[varint.MaxLenUvarint63-n : varint.MaxLenUvarint63+compLen : varint.MaxLenUvarint63+compLen], nil + +} + // badgerGet is a basic tri-state: value+nil nil+nil nil+err func (b *Blockstore) badgerGet(t *badger.Txn, mk badgerMultiKey) (*valueItem, error) { switch item, err := t.Get(mk.binKey()); err { @@ -845,13 +958,44 @@ type valueItem struct { } func (vi *valueItem) size() (int, error) { - return int(vi.badgerItem.ValueSize()), nil + if len(vi.currentKey) != binkeyLen { + // legacy key + return int(vi.badgerItem.ValueSize()), nil + } + + sz := int(-1) + err := vi.badgerItem.Value(func(val []byte) error { + if len(val) == 0 { + return xerrors.New("unexpected zero-length record") + } + + if val[0] == 0 { // storage type 0 + sz = len(val) - 1 + return nil + } + + pref, _, err := varint.FromUvarint(val) + if err != nil { + return err + } + sz = int(pref >> 7) + return nil + }) + + return sz, err } func (vi *valueItem) block(c cid.Cid) (blocks.Block, error) { payload, err := vi.badgerItem.ValueCopy(nil) if err != nil { return nil, err } + if len(vi.currentKey) == binkeyLen { + var err error + 
payload, _, err = decodeValue(payload, false) + if err != nil { + return nil, err + } + } if err := checkHash(c, payload, vi.currentKey); err != nil { return nil, err } @@ -859,6 +1003,17 @@ func (vi *valueItem) block(c cid.Cid) (blocks.Block, error) { } func (vi *valueItem) view(c cid.Cid, f func(val []byte) error) error { return vi.badgerItem.Value(func(payload []byte) error { + if len(vi.currentKey) == binkeyLen { + var isPooled bool + var err error + payload, isPooled, err = decodeValue(payload, true) // maybe pooled combined with a defer below + if err != nil { + return err + } + if isPooled { + defer pool.Put(payload) + } + } if err := checkHash(c, payload, vi.currentKey); err != nil { return err } @@ -875,6 +1030,49 @@ func checkHash(c cid.Cid, b, k []byte) error { return xerrors.Errorf("multihash mismatch for cid %s (badger key 0x%X): value hashes to 0x%X, but expected multihash 0x%X", c, k, []byte(rehash.Hash()), []byte(c.Hash())) } return nil + +} +func decodeValue(in []byte, pooledReturn bool) ([]byte, bool, error) { + if len(in) == 0 { + return nil, false, xerrors.New("corrupted zero-length record") + } + if in[0] == 0 { + return in[1:], false, nil + } + + pref, n, err := varint.FromUvarint(in) + if err != nil { + return nil, false, err + } + + storType := pref & 0b111 + if storType != 1 { + return nil, false, xerrors.Errorf("unexpected StorageType %d", storType) + } + + var bufOut []byte + if pooledReturn { + bufOut = pool.Get(int(pref >> 7)) + } else { + bufOut = make([]byte, int(pref>>7)) + } + bufDec := pool.Get(zstdMagicLen + len(in) - n) + defer pool.Put(bufDec) + + // eventually switch to a pooled zstd.NewCtx() + // https://github.com/DataDog/zstd/pull/130#issuecomment-2067955171 + _, err = zstd.DecompressInto( + bufOut, + append(append(bufDec[:0], zstdMagic...), in[n:]...), + ) + if err != nil { + if pooledReturn { + pool.Put(bufOut) + } + return nil, false, xerrors.Errorf("decompression failed: %w", err) + } + + return bufOut, 
pooledReturn, nil } // View implements blockstore.Viewer, which leverages zero-copy read-only @@ -1073,6 +1271,9 @@ func (b *Blockstore) PutMany(ctx context.Context, blocks []blocks.Block) error { defer pool.Put(jrnlSlab) jrnl := jrnlSlab[:0] + var eg errgroup.Group + eg.SetLimit(zstdCompressionWorkersActual()) + if err := b.db.View(func(txn *badger.Txn) error { for i := range kvs { val, err := b.badgerGet(txn, kvs[i].mk) @@ -1080,14 +1281,18 @@ func (b *Blockstore) PutMany(ctx context.Context, blocks []blocks.Block) error { // Something is actually wrong return err } else if val == nil { - // Got to insert that, check it is supported, write journal + // Got to insert that, check it is supported, write journal, encode value mh := blocks[i].Cid().Hash() smh, err := isMultihashSupported(mh) if err != nil { return xerrors.Errorf("unsupported multihash for cid %s: %w", blocks[i].Cid(), err) } - kvs[i].val = blocks[i].RawData() + i := i + eg.Go(func() error { + kvs[i].val, err = encodeValue(blocks[i].RawData(), kvs[i].mk) + return err + }) // add a journal record jrnl = append(jrnl, smh.journalShortCode) @@ -1099,6 +1304,10 @@ func (b *Blockstore) PutMany(ctx context.Context, blocks []blocks.Block) error { return err } + if err := eg.Wait(); err != nil { + return err + } + put := func(db *badger.DB, mhj flushWriter) error { batch := db.NewWriteBatch() defer batch.Cancel() @@ -1254,15 +1463,15 @@ func (b *Blockstore) ForEachKey(f func(cid.Cid) error) error { b.lockDB() defer b.unlockDB() - var err error - var c cid.Cid - return iterateBadger(context.Background(), b.db, func(kvs []*badgerstruct.KV) error { if !b.isOpen() { return ErrBlockstoreClosed } for _, kv := range kvs { + + var c cid.Cid + if len(kv.Key) != binkeyLen { if !b.opts.QueryLegacyKeys { continue @@ -1274,6 +1483,7 @@ func (b *Blockstore) ForEachKey(f func(cid.Cid) error) error { } c = cid.NewCidV1(uint64(multicodec.Raw), mhBuf[:n]) } else { + var err error c, _, err = findCidForPartialKV(kv) if err != nil 
{ return err diff --git a/blockstore/badger/blockstore_test.go b/blockstore/badger/blockstore_test.go index d96591f83..3363603f4 100644 --- a/blockstore/badger/blockstore_test.go +++ b/blockstore/badger/blockstore_test.go @@ -77,7 +77,7 @@ func testMove(t *testing.T, optsF func(string) Options) { // add some blocks for i := 0; i < 10; i++ { - blk := blocks.NewBlock([]byte(fmt.Sprintf("some data %d", i))) + blk := blocks.NewBlock([]byte(fmt.Sprintf("some data %d", i))) // compressible err := db.Put(ctx, blk) if err != nil { t.Fatal(err) diff --git a/go.mod b/go.mod index b9e2925b2..6a84df6fb 100644 --- a/go.mod +++ b/go.mod @@ -6,10 +6,14 @@ retract v1.14.0 // Accidentally force-pushed tag, use v1.14.1+ instead. retract v1.20.2 // Wrongfully cherry picked PR, use v1.20.2+ instead. +// Temporary, to get zstd 1.5.6 and https://github.com/facebook/zstd/pull/3895#issuecomment-2067746853 +// Switch back fully to datadog when they upgrade ( valyala has panic()s in their code 😰 ) +replace github.com/valyala/gozstd => github.com/denisgolius/gozstd v1.20.2-0.20240327164824-1647c29138c6 + require ( contrib.go.opencensus.io/exporter/prometheus v0.4.2 github.com/BurntSushi/toml v1.3.0 - github.com/DataDog/zstd v1.4.5 + github.com/DataDog/zstd v1.5.6-0.20230824185856-869dae002e5e github.com/GeertJohan/go.rice v1.0.3 github.com/Gurpartap/async v0.0.0-20180927173644-4f7f499dd9ee github.com/Kubuxu/imtui v0.0.0-20210401140320-41663d68d0fa @@ -139,6 +143,7 @@ require ( github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 github.com/triplewz/poseidon v0.0.0-20220525065023-a7cdb0e183e7 github.com/urfave/cli/v2 v2.25.5 + github.com/valyala/gozstd v0.0.0-00010101000000-000000000000 github.com/whyrusleeping/bencher v0.0.0-20190829221104-bb6607aa8bba github.com/whyrusleeping/cbor-gen v0.1.1 github.com/whyrusleeping/multiaddr-filter 
v0.0.0-20160516205228-e903e4adabd7 diff --git a/go.sum b/go.sum index b5d02a21b..5a7c67fe8 100644 --- a/go.sum +++ b/go.sum @@ -50,8 +50,8 @@ github.com/BurntSushi/toml v1.3.0 h1:Ws8e5YmnrGEHzZEzg0YvK/7COGYtTC5PbaH9oSSbgfA github.com/BurntSushi/toml v1.3.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/DataDog/zstd v1.4.1/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo= -github.com/DataDog/zstd v1.4.5 h1:EndNeuB0l9syBZhut0wns3gV1hL8zX8LIu6ZiVHWLIQ= -github.com/DataDog/zstd v1.4.5/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo= +github.com/DataDog/zstd v1.5.6-0.20230824185856-869dae002e5e h1:ZIWapoIRN1VqT8GR8jAwb1Ie9GyehWjVcGh32Y2MznE= +github.com/DataDog/zstd v1.5.6-0.20230824185856-869dae002e5e/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw= github.com/GeertJohan/go.incremental v1.0.0 h1:7AH+pY1XUgQE4Y1HcXYaMqAI0m9yrFqo/jt0CW30vsg= github.com/GeertJohan/go.incremental v1.0.0/go.mod h1:6fAjUhbVuX1KcMD3c8TEgVUqmo4seqhv0i0kdATSkM0= github.com/GeertJohan/go.rice v1.0.3 h1:k5viR+xGtIhF61125vCE1cmJ5957RQGXG6dmbaWZSmI= @@ -215,6 +215,8 @@ github.com/decred/dcrd/crypto/blake256 v1.0.1/go.mod h1:2OfgNZ5wDpcsFmHmCK5gZTPc github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 h1:8UrgZ3GkP4i/CLijOJx79Yu+etlyjdBU4sfcs2WYQMs= github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0/go.mod h1:v57UDF4pDQJcEfFUCRop3lJL149eHGSe9Jvczhzjo/0= github.com/decred/dcrd/lru v1.0.0/go.mod h1:mxKOwFd7lFjN2GZYsiz/ecgqR6kkYAl+0pz0tEMk218= +github.com/denisgolius/gozstd v1.20.2-0.20240327164824-1647c29138c6 h1:QYYnfle/XmaADh8ehv2vYYZR4Qy+c3L6OLRhCIxwTVo= +github.com/denisgolius/gozstd v1.20.2-0.20240327164824-1647c29138c6/go.mod h1:y5Ew47GLlP37EkTB+B4s7r6A5rdaeB7ftbl9zoYiIPQ= 
github.com/detailyang/go-fallocate v0.0.0-20180908115635-432fa640bd2e h1:lj77EKYUpYXTd8CD/+QMIf8b6OIOTsfEBSXiAzuEHTU= github.com/detailyang/go-fallocate v0.0.0-20180908115635-432fa640bd2e/go.mod h1:3ZQK6DMPSz/QZ73jlWxBtUhNA8xZx7LzUFSq/OfP8vk= github.com/dgraph-io/badger v1.5.5-0.20190226225317-8115aed38f8f/go.mod h1:VZxzAIRPHRVNRKRo6AXrX9BJegn6il06VMTZVJYCIjQ=