Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LID benchmarking tool #1276

Merged
merged 30 commits into from
May 19, 2023
Merged
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
8e72cce
feat: LID benchmarking tool
dirkmc Mar 14, 2023
0713d3f
fix: bench thread safety
dirkmc Mar 21, 2023
c3e69a0
refactor: structured logging
dirkmc Mar 21, 2023
806b8ec
refactor: postgres bulk insert
dirkmc Mar 21, 2023
a66c57e
lid bench: Add foundationdb impl
magik6k Mar 21, 2023
482f842
lid fdb: Fix Tx sizing, parallel chunk puts
magik6k Mar 21, 2023
f2eeb12
lid fdb: More efficient sample generation
magik6k Mar 21, 2023
91fee5b
feat: array of piece count / blocks per piece (#1314)
dirkmc Mar 21, 2023
e10de81
lid bench: print add rate
magik6k Mar 21, 2023
1d0dfe7
lid bench: Add retry to postgres put (#1316)
magik6k Mar 22, 2023
366b58a
lid bench: Make cassandra put much more robust (#1318)
magik6k Mar 22, 2023
f8ca07d
instrumentation for bench tool (#1337)
nonsense Apr 5, 2023
313421f
feat: batch insert queries for postgres
dirkmc Apr 5, 2023
49e5105
feat: add flag to insert into postgres using tmp table
dirkmc Apr 5, 2023
e307ecf
refactor: merge changes from nonsense/lid-bench
dirkmc Apr 6, 2023
064f03b
refactor: just use one database (dont create bench database)
dirkmc Apr 6, 2023
f7fb109
refactor: remove unused params
dirkmc Apr 6, 2023
6102aa5
refactor: command structure
dirkmc Apr 7, 2023
d799fd4
fix: cassandra - dont use batch insert for PayloadToPieces
dirkmc Apr 13, 2023
b12415b
fix: create tables CQL
dirkmc Apr 13, 2023
5820c69
fix: increase payload to pieces insert parallelism
dirkmc Apr 14, 2023
8a1ad94
fix: use simple replication strategy
dirkmc Apr 14, 2023
4963af9
feat: use yugabyte cassandra driver
dirkmc Apr 14, 2023
36a80a3
fix: remove bench binary
dirkmc Apr 14, 2023
371d915
update metrics endpoint
nonsense Apr 13, 2023
c159cff
fix random generated piece cid
nonsense Apr 14, 2023
4add736
fixup
nonsense Apr 14, 2023
136f579
fix: cassandra bitswap benchmark
dirkmc Apr 14, 2023
6023cf7
remove foundationdb
nonsense May 19, 2023
d083ec9
Merge branch 'nonsense/piece-store' into feat/lid-bench
nonsense May 19, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
284 changes: 284 additions & 0 deletions extern/boostd-data/bench/cassandra.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,284 @@
package main

import (
"context"
_ "embed"
"fmt"
"strings"
"time"

"github.com/ethereum/go-ethereum/metrics"
"github.com/filecoin-project/boostd-data/model"
"github.com/ipfs/go-cid"
"github.com/ipld/go-car/v2/index"
"github.com/multiformats/go-multihash"
"github.com/urfave/cli/v2"
"github.com/yugabyte/gocql"
"golang.org/x/sync/errgroup"
)

var cassandraCmd = &cli.Command{
Name: "cassandra",
Before: before,
Subcommands: []*cli.Command{
initCmd(createCassandra),
dropCmd(createCassandra),
loadCmd(createCassandra, &cli.IntFlag{
Name: "payload-pieces-parallelism",
Value: 16,
}),
bitswapCmd(createCassandra),
graphsyncCmd(createCassandra),
},
}

func createCassandra(ctx context.Context, cctx *cli.Context) (BenchDB, error) {
return NewCassandraDB(cctx.String("connect-string"), cctx.Int("payload-pieces-parallelism"))
}

type CassandraDB struct {
session *gocql.Session
payloadPiecesParallelism int
}

func NewCassandraDB(connectString string, payloadPiecesParallelism int) (*CassandraDB, error) {
cluster := gocql.NewCluster(connectString)
cluster.Consistency = gocql.Quorum
cluster.Timeout = 30 * time.Second
cluster.ConnectTimeout = 30 * time.Second
session, err := cluster.CreateSession()
if err != nil {
return nil, fmt.Errorf("creating cluster: %w", err)
}
return &CassandraDB{
session: session,
payloadPiecesParallelism: payloadPiecesParallelism,
}, nil
}

func (c *CassandraDB) Name() string {
return "Cassandra DB"
}

//go:embed create_tables.cql
var createTablesCQL string

func (c *CassandraDB) Init(ctx context.Context, b bool) error {
tables := []string{`PayloadToPieces`, `PieceBlockOffsetSize`}
for _, tbl := range tables {
qry := `drop table if exists bench.` + tbl
log.Debug(qry)
err := c.session.Query(qry).WithContext(ctx).Exec()
if err != nil {
log.Warn(err)
}
}

createTablesLines := strings.Split(createTablesCQL, ";")
for _, line := range createTablesLines {
line = strings.Trim(line, "\n \t")
if line == "" {
continue
}
log.Debug(line)
err := c.session.Query(line).WithContext(ctx).Exec()
if err != nil {
return fmt.Errorf("creating tables: executing\n%s\n%w", line, err)
}
}

return nil
}

func (c *CassandraDB) Cleanup(ctx context.Context) error {
_ = c.session.Query(`drop table if exists bench.PayloadToPieces`).WithContext(ctx).Exec()
_ = c.session.Query(`drop table if exists bench.PieceBlockOffsetSize`).WithContext(ctx).Exec()
_ = c.session.Query(`drop keyspace bench`).WithContext(ctx).Exec()
c.session.Close()
return nil
}

func (c *CassandraDB) GetBlockSample(ctx context.Context, count int) ([]pieceBlock, error) {
// TODO: randomize order
qry := `SELECT PieceCid, PayloadMultihash FROM bench.PieceBlockOffsetSize LIMIT ?`
iter := c.session.Query(qry, count).WithContext(ctx).Iter()

var pieceCidBz, payloadMHBz []byte
pbs := make([]pieceBlock, 0, count)
for iter.Scan(&pieceCidBz, &payloadMHBz) {
_, pcid, err := cid.CidFromBytes(pieceCidBz)
if err != nil {
return nil, fmt.Errorf("scanning piece cid: %w", err)
}

cp := make([]byte, len(payloadMHBz))
copy(cp, payloadMHBz)
_, mh, err := multihash.MHFromBytes(cp)
if err != nil {
return nil, fmt.Errorf("scanning multihash: %w", err)
}

pbs = append(pbs, pieceBlock{
PieceCid: pcid,
PayloadMultihash: mh,
})
}
if err := iter.Close(); err != nil {
return nil, err
}

return pbs, nil
}

func (c *CassandraDB) AddIndexRecords(ctx context.Context, pieceCid cid.Cid, recs []model.Record) error {
if len(recs) == 0 {
return nil
}

timeQueryBind := time.Now()
// Add payload to pieces index
startPayloadToPieces := time.Now()
queue := make(chan []byte, len(recs))
for _, rec := range recs {
queue <- rec.Cid.Hash()
}
close(queue)

var eg errgroup.Group
for i := 0; i < c.payloadPiecesParallelism; i++ {
eg.Go(func() error {
for ctx.Err() == nil {
select {
case <-ctx.Done():
return ctx.Err()
case multihashBytes, ok := <-queue:
if !ok {
return nil
}

q := `INSERT INTO bench.PayloadToPieces (PayloadMultihash, PieceCids) VALUES (?, ?)`
err := c.session.Query(q, multihashBytes, pieceCid.Bytes()).Exec()
if err != nil {
return fmt.Errorf("inserting into PayloadToPieces: %w", err)
}
}
}

return ctx.Err()
})
}
err := eg.Wait()
if err != nil {
return err
}
duration := time.Since(startPayloadToPieces)
log.Debugw("insert payload-to-pieces complete", "duration", duration.String(), "duration-ms", duration.Milliseconds())

metrics.GetOrRegisterResettingTimer("csql.query.bind", nil).UpdateSince(timeQueryBind)

timePrepareInsert := time.Now()
// Add piece to block info index
startPieceBlockInfo := time.Now()
batchEntries := make([]gocql.BatchEntry, 0, len(recs))
insertPieceOffsetsQry := `INSERT INTO bench.PieceBlockOffsetSize (PieceCid, PayloadMultihash, BlockOffset, BlockSize) VALUES (?, ?, ?, ?)`
for _, rec := range recs {
batchEntries = append(batchEntries, gocql.BatchEntry{
Stmt: insertPieceOffsetsQry,
Args: []interface{}{pieceCid.Bytes(), rec.Cid.Hash(), rec.Offset, rec.Size},
Idempotent: true,
})
}
metrics.GetOrRegisterResettingTimer("csql.prepare.insert", nil).UpdateSince(timePrepareInsert)

// Cassandra has a 50k limit on batch statements. Keeping batch size small
// makes sure we're under the limit.
const batchSize = 49000
var batch *gocql.Batch
for allIdx, entry := range batchEntries {
if batch == nil {
timeNewBatch := time.Now()
batch = c.session.NewBatch(gocql.UnloggedBatch).WithContext(ctx).RetryPolicy(&gocql.ExponentialBackoffRetryPolicy{Max: 120 * time.Second, NumRetries: 25})
metrics.GetOrRegisterResettingTimer("csql.batch.new", nil).UpdateSince(timeNewBatch)
}

batch.Entries = append(batch.Entries, entry)

if allIdx == len(batchEntries)-1 || len(batch.Entries) == batchSize {
var err error
for i := 0; i < 30; i++ {
st := time.Now()
err = c.session.ExecuteBatch(batch)
if err == nil {
metrics.GetOrRegisterResettingTimer("csql.session.execute", nil).UpdateSince(st)
break
}
metrics.GetOrRegisterResettingTimer("csql.session.execute.err", nil).UpdateSince(st)

log.Warnf("error executing batch: %s", err)
time.Sleep(1 * time.Second)
}
if err != nil {
return fmt.Errorf("adding index records for piece %s: %w", pieceCid, err)
}
batch = nil
continue
}
}
duration = time.Since(startPieceBlockInfo)
log.Debugw("insert piece block info complete", "duration", duration.String(), "duration-ms", duration.Milliseconds())

return nil
}

func (c *CassandraDB) PiecesContainingMultihash(ctx context.Context, m multihash.Multihash) ([]cid.Cid, error) {
var bz []byte
qry := `SELECT PieceCids FROM bench.PayloadToPieces WHERE PayloadMultihash = ?`
err := c.session.Query(qry, m).WithContext(ctx).Scan(&bz)
if err != nil {
return nil, fmt.Errorf("getting pieces containing multihash: %w", err)
}

return cidsFromBytes(bz)
}

func (c *CassandraDB) GetOffsetSize(ctx context.Context, pieceCid cid.Cid, hash multihash.Multihash) (*model.OffsetSize, error) {
var offset, size uint64
qry := `SELECT BlockOffset, BlockSize FROM bench.PieceBlockOffsetSize WHERE PieceCid = ? AND PayloadMultihash = ?`
err := c.session.Query(qry, pieceCid.Bytes(), hash).WithContext(ctx).Scan(&offset, &size)
if err != nil {
return nil, fmt.Errorf("getting offset / size: %w", err)
}

return &model.OffsetSize{Offset: offset, Size: size}, nil
}

func (c *CassandraDB) GetIterableIndex(ctx context.Context, pieceCid cid.Cid) (index.IterableIndex, error) {
qry := `SELECT PayloadMultihash, BlockOffset FROM bench.PieceBlockOffsetSize WHERE PieceCid = ?`
iter := c.session.Query(qry, pieceCid.Bytes()).WithContext(ctx).Iter()

var records []index.Record
var payloadMHBz []byte
var offset uint64
for iter.Scan(&payloadMHBz, &offset) {
_, pmh, err := multihash.MHFromBytes(payloadMHBz)
if err != nil {
return nil, fmt.Errorf("scanning mulithash: %w", err)
}

records = append(records, index.Record{
Cid: cid.NewCidV1(cid.Raw, pmh),
Offset: offset,
})
}
if err := iter.Close(); err != nil {
return nil, err
}

mis := make(index.MultihashIndexSorted)
err := mis.Load(records)
if err != nil {
return nil, err
}

return &mis, nil
}
44 changes: 44 additions & 0 deletions extern/boostd-data/bench/cidbytes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package main

import (
"fmt"
"github.com/ipfs/go-cid"
"math/rand"
"sync"
)

func cidsFromBytes(bz []byte) ([]cid.Cid, error) {
var bytesIdx int
var cids []cid.Cid
for bytesIdx < len(bz) {
readCount, pcid, err := cid.CidFromBytes(bz[bytesIdx:])
if err != nil {
return nil, fmt.Errorf("parsing bytes to cid: %w", err)
}

bytesIdx += readCount
cids = append(cids, pcid)
}
return cids, nil
}

var rndlk sync.Mutex

func generateRandomCid(baseCid []byte) (cid.Cid, error) {
buff := make([]byte, len(baseCid))
copy(buff, baseCid)

rndlk.Lock()
_, err := rand.Read(buff[len(buff)-8:])
rndlk.Unlock()
if err != nil {
return cid.Undef, err
}

_, c, err := cid.CidFromBytes(buff)
if err != nil {
return cid.Undef, fmt.Errorf("generating cid: %w", err)
}

return c, nil
}
Loading