feat: LID benchmarking tool
dirkmc committed Mar 21, 2023
1 parent 47ae992 commit 2a1bdfa
Showing 12 changed files with 1,123 additions and 11 deletions.
230 changes: 230 additions & 0 deletions extern/boostd-data/bench/cassandra.go
@@ -0,0 +1,230 @@
package main

import (
"context"
_ "embed"
"fmt"
"github.com/filecoin-project/boostd-data/model"
"github.com/filecoin-project/boostd-data/shared/cliutil"
"github.com/gocql/gocql"
"github.com/ipfs/go-cid"
"github.com/ipld/go-car/v2/index"
"github.com/multiformats/go-multihash"
"github.com/urfave/cli/v2"
"strings"
)

var cassandraCmd = &cli.Command{
Name: "cassandra",
Before: before,
Flags: commonFlags,
Action: func(cctx *cli.Context) error {
ctx := cliutil.ReqContext(cctx)
db, err := NewCassandraDB()
if err != nil {
return err
}
return run(ctx, db, runOptsFromCctx(cctx))
},
Subcommands: []*cli.Command{
loadCmd(createCassandra),
bitswapCmd(createCassandra),
graphsyncCmd(createCassandra),
},
}

func createCassandra(ctx context.Context, connectString string) (BenchDB, error) {
return NewCassandraDB()
}

type CassandraDB struct {
session *gocql.Session
}

func NewCassandraDB() (*CassandraDB, error) {
cluster := gocql.NewCluster("localhost")
cluster.Consistency = gocql.Quorum
session, err := cluster.CreateSession()
if err != nil {
return nil, fmt.Errorf("creating cluster: %w", err)
}
return &CassandraDB{session: session}, nil
}

func (c *CassandraDB) Name() string {
return "Cassandra DB"
}

//go:embed create_tables.cql
var createTablesCQL string

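// Init drops any existing bench tables, then recreates the keyspace and
// tables by executing the statements embedded from create_tables.cql.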
func (c *CassandraDB) Init(ctx context.Context) error {
tables := []string{`PayloadToPieces`, `PieceBlockOffsetSize`}
for _, tbl := range tables {
qry := `drop table if exists bench.` + tbl
log.Debug(qry)
err := c.session.Query(qry).WithContext(ctx).Exec()
if err != nil {
log.Warn(err)
}
}

createTablesLines := strings.Split(createTablesCQL, ";")
for _, line := range createTablesLines {
line = strings.Trim(line, "\n \t")
if line == "" {
continue
}
log.Debug(line)
err := c.session.Query(line).WithContext(ctx).Exec()
if err != nil {
return fmt.Errorf("creating tables: executing\n%s\n%w", line, err)
}
}

return nil
}

func (c *CassandraDB) Cleanup(ctx context.Context) error {
_ = c.session.Query(`drop table if exists bench.PayloadToPieces`).WithContext(ctx).Exec()
_ = c.session.Query(`drop table if exists bench.PieceBlockOffsetSize`).WithContext(ctx).Exec()
_ = c.session.Query(`drop keyspace bench`).WithContext(ctx).Exec()
c.session.Close()
return nil
}

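// GetBlockSample returns up to count (piece cid, payload multihash) pairs
// read from the PieceBlockOffsetSize table.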
func (c *CassandraDB) GetBlockSample(ctx context.Context, count int) ([]pieceBlock, error) {
// TODO: randomize order
qry := `SELECT PieceCid, PayloadMultihash FROM bench.PieceBlockOffsetSize LIMIT ?`
iter := c.session.Query(qry, count).WithContext(ctx).Iter()

var pieceCidBz, payloadMHBz []byte
pbs := make([]pieceBlock, 0, count)
for iter.Scan(&pieceCidBz, &payloadMHBz) {
_, pcid, err := cid.CidFromBytes(pieceCidBz)
if err != nil {
return nil, fmt.Errorf("scanning piece cid: %w", err)
}
_, pmh, err := multihash.MHFromBytes(payloadMHBz)
if err != nil {
return nil, fmt.Errorf("scanning mulithash: %w", err)
}

pbs = append(pbs, pieceBlock{
PieceCid: pcid,
PayloadMultihash: pmh,
})
}
if err := iter.Close(); err != nil {
return nil, err
}

//log.Debug("got pbs:")
//for _, pb := range pbs {
// log.Debugf(" %s %s", pb.PieceCid, pb.PayloadMultihash)
//}

return pbs, nil
}

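// AddIndexRecords writes one PayloadToPieces row and one PieceBlockOffsetSize
// row per record, grouped into unlogged batches.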
func (c *CassandraDB) AddIndexRecords(ctx context.Context, pieceCid cid.Cid, recs []model.Record) error {
if len(recs) == 0 {
return nil
}

batchEntries := make([]gocql.BatchEntry, 0, 2*len(recs))

// Add payload to pieces index
for _, rec := range recs {
batchEntries = append(batchEntries, gocql.BatchEntry{
Stmt: `INSERT INTO bench.PayloadToPieces (PayloadMultihash, PieceCids) VALUES (?, ?)`,
Args: []interface{}{rec.Cid.Hash(), pieceCid.Bytes()},
Idempotent: true,
})
}

// Add piece to block info index
for _, rec := range recs {
batchEntries = append(batchEntries, gocql.BatchEntry{
Stmt: `INSERT INTO bench.PieceBlockOffsetSize (PieceCid, PayloadMultihash, BlockOffset, BlockSize) VALUES (?, ?, ?, ?)`,
Args: []interface{}{pieceCid.Bytes(), rec.Cid.Hash(), rec.Offset, rec.Size},
Idempotent: true,
})
}

// Cassandra limits the total size of a batch (50kb by default), so flush
// in small batches to stay under the limit.
const batchSize = 128
var batch *gocql.Batch
for allIdx, entry := range batchEntries {
if batch == nil {
batch = c.session.NewBatch(gocql.UnloggedBatch).WithContext(ctx)
}
batch.Entries = append(batch.Entries, entry)

// Execute the batch once it is full, or once the last entry has been added
if allIdx == len(batchEntries)-1 || len(batch.Entries) == batchSize {
err := c.session.ExecuteBatch(batch)
if err != nil {
return fmt.Errorf("adding index records for piece %s: %w", pieceCid, err)
}
batch = nil
}
}
return nil
}

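// PiecesContainingMultihash returns the CIDs of the pieces that contain a
// block with the given multihash, decoded from the PieceCids column.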
func (c *CassandraDB) PiecesContainingMultihash(ctx context.Context, m multihash.Multihash) ([]cid.Cid, error) {
var bz []byte
qry := `SELECT PieceCids FROM bench.PayloadToPieces WHERE PayloadMultihash = ?`
err := c.session.Query(qry, m).WithContext(ctx).Scan(&bz)
if err != nil {
return nil, fmt.Errorf("getting pieces containing multihash: %w", err)
}

return cidsFromBytes(bz)
}

func (c *CassandraDB) GetOffsetSize(ctx context.Context, pieceCid cid.Cid, hash multihash.Multihash) (*model.OffsetSize, error) {
var offset, size uint64
qry := `SELECT BlockOffset, BlockSize FROM bench.PieceBlockOffsetSize WHERE PieceCid = ? AND PayloadMultihash = ?`
err := c.session.Query(qry, pieceCid.Bytes(), hash).WithContext(ctx).Scan(&offset, &size)
if err != nil {
return nil, fmt.Errorf("getting offset / size: %w", err)
}

return &model.OffsetSize{Offset: offset, Size: size}, nil
}

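// GetIterableIndex reads back every (multihash, offset) pair for a piece and
// loads them into a CAR multihash-sorted index.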
func (c *CassandraDB) GetIterableIndex(ctx context.Context, pieceCid cid.Cid) (index.IterableIndex, error) {
qry := `SELECT PayloadMultihash, BlockOffset FROM bench.PieceBlockOffsetSize WHERE PieceCid = ?`
iter := c.session.Query(qry, pieceCid.Bytes()).WithContext(ctx).Iter()

var records []index.Record
var payloadMHBz []byte
var offset uint64
for iter.Scan(&payloadMHBz, &offset) {
_, pmh, err := multihash.MHFromBytes(payloadMHBz)
if err != nil {
return nil, fmt.Errorf("scanning mulithash: %w", err)
}

records = append(records, index.Record{
Cid: cid.NewCidV1(cid.Raw, pmh),
Offset: offset,
})
}
if err := iter.Close(); err != nil {
return nil, err
}

mis := make(index.MultihashIndexSorted)
err := mis.Load(records)
if err != nil {
return nil, err
}

return &mis, nil
}
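
CassandraDB is one backend behind the benchmark's BenchDB abstraction; the interface itself is defined elsewhere in the bench package and is not part of this file. Inferred purely from the methods implemented above, it presumably looks something like the sketch below (same package and imports as the file above; the real definition may differ):

// BenchDB (hypothetical sketch, inferred from CassandraDB's method set)
type BenchDB interface {
	Name() string
	Init(ctx context.Context) error
	Cleanup(ctx context.Context) error
	AddIndexRecords(ctx context.Context, pieceCid cid.Cid, recs []model.Record) error
	GetBlockSample(ctx context.Context, count int) ([]pieceBlock, error)
	PiecesContainingMultihash(ctx context.Context, m multihash.Multihash) ([]cid.Cid, error)
	GetOffsetSize(ctx context.Context, pieceCid cid.Cid, hash multihash.Multihash) (*model.OffsetSize, error)
	GetIterableIndex(ctx context.Context, pieceCid cid.Cid) (index.IterableIndex, error)
}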
44 changes: 44 additions & 0 deletions extern/boostd-data/bench/cidbytes.go
@@ -0,0 +1,44 @@
package main

import (
"fmt"
"github.com/ipfs/go-cid"
"math/rand"
"sync"
)

func cidsFromBytes(bz []byte) ([]cid.Cid, error) {
var bytesIdx int
var cids []cid.Cid
for bytesIdx < len(bz) {
readCount, pcid, err := cid.CidFromBytes(bz[bytesIdx:])
if err != nil {
return nil, fmt.Errorf("parsing bytes to cid: %w", err)
}

bytesIdx += readCount
cids = append(cids, pcid)
}
return cids, nil
}

var rndlk sync.Mutex

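// generateRandomCid copies baseCid and randomizes the final 8 bytes of the
// copy, yielding a structurally valid CID that is (almost certainly)
// distinct from the original.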
func generateRandomCid(baseCid []byte) (cid.Cid, error) {
buff := make([]byte, len(baseCid))
copy(buff, baseCid)

rndlk.Lock()
_, err := rand.Read(buff[len(buff)-8:])
rndlk.Unlock()
if err != nil {
return cid.Undef, err
}

_, c, err := cid.CidFromBytes(buff)
if err != nil {
return cid.Undef, fmt.Errorf("generating cid: %w", err)
}

return c, nil
}
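
cidsFromBytes assumes its input is simply the raw byte encodings of one or more CIDs appended back to back, and generateRandomCid relies on the fact that randomizing only the tail of the digest keeps the CID prefix parseable. A small standalone sketch of the concatenate-then-parse round trip (the input strings here are made up for illustration):

package main

import (
	"fmt"

	"github.com/ipfs/go-cid"
	"github.com/multiformats/go-multihash"
)

func main() {
	// Build two CIDs from known data
	var cids []cid.Cid
	for _, s := range []string{"piece-1", "piece-2"} {
		mh, err := multihash.Sum([]byte(s), multihash.SHA2_256, -1)
		if err != nil {
			panic(err)
		}
		cids = append(cids, cid.NewCidV1(cid.Raw, mh))
	}

	// Concatenate their raw byte forms, the layout cidsFromBytes expects
	var bz []byte
	for _, c := range cids {
		bz = append(bz, c.Bytes()...)
	}

	// Parse them back out, advancing by the number of bytes each CID consumed
	var idx int
	for idx < len(bz) {
		n, c, err := cid.CidFromBytes(bz[idx:])
		if err != nil {
			panic(err)
		}
		idx += n
		fmt.Println(c)
	}
}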
46 changes: 46 additions & 0 deletions extern/boostd-data/bench/common.go
@@ -0,0 +1,46 @@
package main

import "github.com/urfave/cli/v2"

var commonFlags = []cli.Flag{
&cli.IntFlag{
Name: "piece-add-parallelism",
Value: 2,
},
&cli.IntFlag{
Name: "piece-count",
Value: 30,
},
&cli.IntFlag{
Name: "blocks-per-piece",
Value: 1024,
},
&cli.IntFlag{
Name: "bs-fetch-count",
Value: 100,
},
&cli.IntFlag{
Name: "bs-fetch-parallelism",
Value: 10,
},
&cli.IntFlag{
Name: "gs-fetch-count",
Value: 10,
},
&cli.IntFlag{
Name: "gs-fetch-parallelism",
Value: 3,
},
}

func runOptsFromCctx(cctx *cli.Context) runOpts {
return runOpts{
pieceParallelism: cctx.Int("piece-add-parallelism"),
blocksPerPiece: cctx.Int("blocks-per-piece"),
pieceCount: cctx.Int("piece-count"),
bitswapFetchCount: cctx.Int("bs-fetch-count"),
bitswapFetchParallelism: cctx.Int("bs-fetch-parallelism"),
graphsyncFetchCount: cctx.Int("gs-fetch-count"),
graphsyncFetchParallelism: cctx.Int("gs-fetch-parallelism"),
}
}
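
These flags shape a benchmark run: how many pieces to index, how many blocks each synthetic piece contains, and how many bitswap-style and graphsync-style lookups to issue at what parallelism. A hypothetical invocation against the Cassandra backend with every flag spelled out at its default (the binary name and build path are assumptions; the command is built from the bench package in this diff, e.g. from the extern/boostd-data directory):

go run ./bench cassandra \
  --piece-add-parallelism 2 \
  --piece-count 30 \
  --blocks-per-piece 1024 \
  --bs-fetch-count 100 \
  --bs-fetch-parallelism 10 \
  --gs-fetch-count 10 \
  --gs-fetch-parallelism 3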
14 changes: 14 additions & 0 deletions extern/boostd-data/bench/create_tables.cql
@@ -0,0 +1,14 @@
create keyspace if not exists bench with replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };

CREATE TABLE bench.PayloadToPieces (
PayloadMultihash BLOB PRIMARY KEY,
PieceCids BLOB
);

CREATE TABLE bench.PieceBlockOffsetSize (
PieceCid BLOB,
PayloadMultihash BLOB,
BlockOffset BIGINT,
BlockSize BIGINT,
PRIMARY KEY (PieceCid, PayloadMultihash)
);
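
The schema mirrors the two lookups the benchmark measures: PayloadToPieces maps a block multihash to the piece(s) that contain it, and PieceBlockOffsetSize, keyed by (PieceCid, PayloadMultihash), maps a block within a piece to its offset and size. The keyspace uses SimpleStrategy with a replication factor of 1, which suits a single-node benchmark rather than production. The statements issued against these tables in cassandra.go above are, in effect:

SELECT PieceCids FROM bench.PayloadToPieces WHERE PayloadMultihash = ?;
SELECT BlockOffset, BlockSize FROM bench.PieceBlockOffsetSize WHERE PieceCid = ? AND PayloadMultihash = ?;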
12 changes: 12 additions & 0 deletions extern/boostd-data/bench/create_tables.sql
@@ -0,0 +1,12 @@
CREATE TABLE PayloadToPieces (
PayloadMultihash bytea PRIMARY KEY,
PieceCids bytea
);

CREATE TABLE PieceBlockOffsetSize (
PieceCid bytea,
PayloadMultihash bytea,
BlockOffset BIGINT,
BlockSize BIGINT,
PRIMARY KEY (PieceCid, PayloadMultihash)
);