Skip to content

Commit

Permalink
lid bench: Add retry to postgres put (#1316)
Browse files Browse the repository at this point in the history
  • Loading branch information
magik6k committed Mar 22, 2023
1 parent 015724f commit b3bf8c5
Showing 1 changed file with 71 additions and 47 deletions.
118 changes: 71 additions & 47 deletions extern/boostd-data/bench/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/multiformats/go-multihash"
"github.com/urfave/cli/v2"
"strings"
"time"
)

var postgresCmd = &cli.Command{
Expand Down Expand Up @@ -159,67 +160,90 @@ func (db *Postgres) AddIndexRecords(ctx context.Context, pieceCid cid.Cid, recs
return nil
}

tx, err := db.db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Commit()
var err error
for attempt := 0; attempt < 5; attempt++ {
err = func() error {
tx, err := db.db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback()

// Add payload to pieces index
if _, err := tx.Exec(`
// Add payload to pieces index
if _, err := tx.Exec(`
create temp table PayloadToPiecesTmp (like PayloadToPieces excluding constraints) on commit drop;
`); err != nil {
return fmt.Errorf("create PayloadToPiecesTemp: %w", err)
}

stmt, err := tx.Prepare(`copy PayloadToPiecesTmp (PayloadMultihash, PieceCids) from stdin `)
if err != nil {
return fmt.Errorf("prepare copy PayloadToPiecesTemp: %w", err)
}

for _, rec := range recs {
if _, err := stmt.Exec(rec.Cid.Hash(), pieceCid.Bytes()); err != nil {
return fmt.Errorf("exec copy PayloadToPiecesTemp: %w", err)
}
}
if err := stmt.Close(); err != nil {
return fmt.Errorf("close PayloadToPiecesTemp statement: %w", err)
}

if _, err := tx.Exec(`
return fmt.Errorf("create PayloadToPiecesTemp: %w", err)
}

stmt, err := tx.Prepare(`copy PayloadToPiecesTmp (PayloadMultihash, PieceCids) from stdin `)
if err != nil {
return fmt.Errorf("prepare copy PayloadToPiecesTemp: %w", err)
}

for _, rec := range recs {
if _, err := stmt.Exec(rec.Cid.Hash(), pieceCid.Bytes()); err != nil {
return fmt.Errorf("exec copy PayloadToPiecesTemp: %w", err)
}
}
if err := stmt.Close(); err != nil {
return fmt.Errorf("close PayloadToPiecesTemp statement: %w", err)
}

if _, err := tx.Exec(`
insert into PayloadToPieces select * from PayloadToPiecesTmp on conflict do nothing
`); err != nil {
return fmt.Errorf("insert into PayloadToPieces: %w", err)
}
return fmt.Errorf("insert into PayloadToPieces: %w", err)
}

// Add piece to block info index
if _, err := tx.Exec(`
// Add piece to block info index
if _, err := tx.Exec(`
create temp table PieceBlockOffsetSizeTmp (like PieceBlockOffsetSize excluding constraints) on commit drop;
`); err != nil {
return fmt.Errorf("create PieceBlockOffsetSizeTmp: %w", err)
}
return fmt.Errorf("create PieceBlockOffsetSizeTmp: %w", err)
}

stmt, err = tx.Prepare(`copy PieceBlockOffsetSizeTmp (PieceCid, PayloadMultihash, BlockOffset, BlockSize) from stdin `)
if err != nil {
return fmt.Errorf("prepare copy PieceBlockOffsetSizeTmp: %w", err)
}

for _, rec := range recs {
if _, err := stmt.Exec(pieceCid.Bytes(), rec.Cid.Hash(), rec.Offset, rec.Size); err != nil {
return fmt.Errorf("exec copy PieceBlockOffsetSizeTmp: %w", err)
}
}
if err := stmt.Close(); err != nil {
return fmt.Errorf("close PieceBlockOffsetSizeTmp statement: %w", err)
}

if _, err := tx.Exec(`
insert into PieceBlockOffsetSize select * from PieceBlockOffsetSizeTmp on conflict do nothing
`); err != nil {
return fmt.Errorf("insert into PieceBlockOffsetSize: %w", err)
}

stmt, err = tx.Prepare(`copy PieceBlockOffsetSizeTmp (PieceCid, PayloadMultihash, BlockOffset, BlockSize) from stdin `)
if err != nil {
return fmt.Errorf("prepare copy PieceBlockOffsetSizeTmp: %w", err)
}
err = tx.Commit()
if err != nil {
return fmt.Errorf("commit: %w", err)
}

for _, rec := range recs {
if _, err := stmt.Exec(pieceCid.Bytes(), rec.Cid.Hash(), rec.Offset, rec.Size); err != nil {
return fmt.Errorf("exec copy PieceBlockOffsetSizeTmp: %w", err)
return nil
}()

if err == nil {
return nil
}
}
if err := stmt.Close(); err != nil {
return fmt.Errorf("close PieceBlockOffsetSizeTmp statement: %w", err)
}

if _, err := tx.Exec(`
insert into PieceBlockOffsetSize select * from PieceBlockOffsetSizeTmp on conflict do nothing
`); err != nil {
return fmt.Errorf("insert into PieceBlockOffsetSize: %w", err)
if strings.Contains(err.Error(), "Restart read required") {
time.Sleep(time.Duration(attempt*100) * time.Millisecond)
continue
}

return err
}

return nil
return err
}

func (db *Postgres) PiecesContainingMultihash(ctx context.Context, m multihash.Multihash) ([]cid.Cid, error) {
Expand Down

0 comments on commit b3bf8c5

Please sign in to comment.