diff --git a/extern/boostd-data/bench/cassandra.go b/extern/boostd-data/bench/cassandra.go index 71e7b0722..f54066a3f 100644 --- a/extern/boostd-data/bench/cassandra.go +++ b/extern/boostd-data/bench/cassandra.go @@ -12,15 +12,19 @@ import ( "github.com/multiformats/go-multihash" "github.com/urfave/cli/v2" "strings" + "time" ) var cassandraCmd = &cli.Command{ Name: "cassandra", Before: before, - Flags: commonFlags, + Flags: append(commonFlags, &cli.StringFlag{ + Name: "connect-string", + Value: "localhost", + }), Action: func(cctx *cli.Context) error { ctx := cliutil.ReqContext(cctx) - db, err := NewCassandraDB() + db, err := NewCassandraDB(cctx.String("connect-string")) if err != nil { return err } @@ -34,15 +38,15 @@ var cassandraCmd = &cli.Command{ } func createCassandra(ctx context.Context, connectString string) (BenchDB, error) { - return NewCassandraDB() + return NewCassandraDB(connectString) } type CassandraDB struct { session *gocql.Session } -func NewCassandraDB() (*CassandraDB, error) { - cluster := gocql.NewCluster("localhost") +func NewCassandraDB(connectString string) (*CassandraDB, error) { + cluster := gocql.NewCluster(connectString) cluster.Consistency = gocql.Quorum session, err := cluster.CreateSession() if err != nil { @@ -155,23 +159,31 @@ func (c *CassandraDB) AddIndexRecords(ctx context.Context, pieceCid cid.Cid, rec // Cassandra has a 50k limit on batch statements. Keeping batch size small // makes sure we're under the limit. const batchSize = 128 - var batchIdx int var batch *gocql.Batch for allIdx, entry := range batchEntries { - if batchIdx == 0 { - batch = c.session.NewBatch(gocql.UnloggedBatch).WithContext(ctx) + if batch == nil { + batch = c.session.NewBatch(gocql.UnloggedBatch).WithContext(ctx).RetryPolicy(&gocql.ExponentialBackoffRetryPolicy{Max: 120 * time.Second, NumRetries: 25}) } - if allIdx == len(batchEntries)-1 || batchIdx == batchSize { - err := c.session.ExecuteBatch(batch) + + batch.Entries = append(batch.Entries, entry) + + if allIdx == len(batchEntries)-1 || len(batch.Entries) == batchSize { + var err error + for i := 0; i < 30; i++ { + err = c.session.ExecuteBatch(batch) + if err == nil { + break + } + + log.Warnf("error executing batch: %s", err) + time.Sleep(1 * time.Second) + } if err != nil { return fmt.Errorf("adding index records for piece %s: %w", pieceCid, err) } - batchIdx = 0 + batch = nil continue } - - batch.Entries = append(batch.Entries, entry) - batchIdx++ } return nil }