Skip to content

Commit

Permalink
lid bench: Make cassandra put much more robust (#1318)
Browse files Browse the repository at this point in the history
  • Loading branch information
magik6k committed Mar 22, 2023
1 parent b3bf8c5 commit 48a1b3b
Showing 1 changed file with 26 additions and 14 deletions.
40 changes: 26 additions & 14 deletions extern/boostd-data/bench/cassandra.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,19 @@ import (
"github.com/multiformats/go-multihash"
"github.com/urfave/cli/v2"
"strings"
"time"
)

var cassandraCmd = &cli.Command{
Name: "cassandra",
Before: before,
Flags: commonFlags,
Flags: append(commonFlags, &cli.StringFlag{
Name: "connect-string",
Value: "localhost",
}),
Action: func(cctx *cli.Context) error {
ctx := cliutil.ReqContext(cctx)
db, err := NewCassandraDB()
db, err := NewCassandraDB(cctx.String("connect-string"))
if err != nil {
return err
}
Expand All @@ -34,15 +38,15 @@ var cassandraCmd = &cli.Command{
}

func createCassandra(ctx context.Context, connectString string) (BenchDB, error) {
return NewCassandraDB()
return NewCassandraDB(connectString)
}

type CassandraDB struct {
session *gocql.Session
}

func NewCassandraDB() (*CassandraDB, error) {
cluster := gocql.NewCluster("localhost")
func NewCassandraDB(connectString string) (*CassandraDB, error) {
cluster := gocql.NewCluster(connectString)
cluster.Consistency = gocql.Quorum
session, err := cluster.CreateSession()
if err != nil {
Expand Down Expand Up @@ -155,23 +159,31 @@ func (c *CassandraDB) AddIndexRecords(ctx context.Context, pieceCid cid.Cid, rec
// Cassandra has a 50k limit on batch statements. Keeping batch size small
// makes sure we're under the limit.
const batchSize = 128
var batchIdx int
var batch *gocql.Batch
for allIdx, entry := range batchEntries {
if batchIdx == 0 {
batch = c.session.NewBatch(gocql.UnloggedBatch).WithContext(ctx)
if batch == nil {
batch = c.session.NewBatch(gocql.UnloggedBatch).WithContext(ctx).RetryPolicy(&gocql.ExponentialBackoffRetryPolicy{Max: 120 * time.Second, NumRetries: 25})
}
if allIdx == len(batchEntries)-1 || batchIdx == batchSize {
err := c.session.ExecuteBatch(batch)

batch.Entries = append(batch.Entries, entry)

if allIdx == len(batchEntries)-1 || len(batch.Entries) == batchSize {
var err error
for i := 0; i < 30; i++ {
err = c.session.ExecuteBatch(batch)
if err == nil {
break
}

log.Warnf("error executing batch: %s", err)
time.Sleep(1 * time.Second)
}
if err != nil {
return fmt.Errorf("adding index records for piece %s: %w", pieceCid, err)
}
batchIdx = 0
batch = nil
continue
}

batch.Entries = append(batch.Entries, entry)
batchIdx++
}
return nil
}
Expand Down

0 comments on commit 48a1b3b

Please sign in to comment.