Skip to content

Commit

Permalink
Merge branch 'devel' into e35
Browse files Browse the repository at this point in the history
  • Loading branch information
AskAlexSharov committed Apr 18, 2024
2 parents e7ca122 + 3730cff commit 67eb9ad
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 25 deletions.
7 changes: 5 additions & 2 deletions cmd/downloader/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,10 +279,11 @@ var createTorrent = &cobra.Command{
Example: "go run ./cmd/downloader torrent_create --datadir=<your_datadir> --file=<relative_file_path>",
RunE: func(cmd *cobra.Command, args []string) error {
dirs := datadir.New(datadirCli)
err := downloader.BuildTorrentFilesIfNeed(cmd.Context(), dirs, downloader.NewAtomicTorrentFiles(dirs.Snap), chain, nil)
createdAmount, err := downloader.BuildTorrentFilesIfNeed(cmd.Context(), dirs, downloader.NewAtomicTorrentFiles(dirs.Snap), chain, nil)
if err != nil {
return err
}
log.Info("created .torent files", "amount", createdAmount)
return nil
},
}
Expand Down Expand Up @@ -517,9 +518,11 @@ func doPrintTorrentHashes(ctx context.Context, logger log.Logger) error {
return err
}
}
if err := downloader.BuildTorrentFilesIfNeed(ctx, dirs, tf, chain, nil); err != nil {
createdAmount, err := downloader.BuildTorrentFilesIfNeed(ctx, dirs, tf, chain, nil)
if err != nil {
return fmt.Errorf("BuildTorrentFilesIfNeed: %w", err)
}
log.Info("created .torent files", "amount", createdAmount)
}

res := map[string]string{}
Expand Down
4 changes: 2 additions & 2 deletions cmd/snapshots/torrents/torrents.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ func updateTorrents(ctx context.Context, srcSession *downloader.RCloneSession, f

defer os.Remove(filepath.Join(srcSession.LocalFsRoot(), file))

err = downloader.BuildTorrentIfNeed(gctx, file, srcSession.LocalFsRoot(), torrentFiles)
_, err = downloader.BuildTorrentIfNeed(gctx, file, srcSession.LocalFsRoot(), torrentFiles)

if err != nil {
return err
Expand Down Expand Up @@ -474,7 +474,7 @@ func verifyTorrents(ctx context.Context, srcSession *downloader.RCloneSession, f

defer os.Remove(filepath.Join(srcSession.LocalFsRoot(), file))

err = downloader.BuildTorrentIfNeed(gctx, file, srcSession.LocalFsRoot(), torrentFiles)
_, err = downloader.BuildTorrentIfNeed(gctx, file, srcSession.LocalFsRoot(), torrentFiles)

if err != nil {
return err
Expand Down
5 changes: 3 additions & 2 deletions erigon-lib/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2315,7 +2315,7 @@ func (d *Downloader) AddNewSeedableFile(ctx context.Context, name string) error
}

// if we don't have the torrent file we build it if we have the .seg file
err := BuildTorrentIfNeed(ctx, name, d.SnapDir(), d.torrentFiles)
_, err := BuildTorrentIfNeed(ctx, name, d.SnapDir(), d.torrentFiles)
if err != nil {
return fmt.Errorf("AddNewSeedableFile: %w", err)
}
Expand Down Expand Up @@ -2526,7 +2526,8 @@ func (d *Downloader) addTorrentFilesFromDisk(quiet bool) error {
return eg.Wait()
}
func (d *Downloader) BuildTorrentFilesIfNeed(ctx context.Context, chain string, ignore snapcfg.Preverified) error {
return BuildTorrentFilesIfNeed(ctx, d.cfg.Dirs, d.torrentFiles, chain, ignore)
_, err := BuildTorrentFilesIfNeed(ctx, d.cfg.Dirs, d.torrentFiles, chain, ignore)
return err
}
func (d *Downloader) Stats() AggStats {
d.lock.RLock()
Expand Down
12 changes: 6 additions & 6 deletions erigon-lib/downloader/downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,18 +55,18 @@ func TestNoEscape(t *testing.T) {

tf := NewAtomicTorrentFiles(dirs.Snap)
// allow adding files only if they are inside snapshots dir
err := BuildTorrentIfNeed(ctx, "a.seg", dirs.Snap, tf)
_, err := BuildTorrentIfNeed(ctx, "a.seg", dirs.Snap, tf)
require.NoError(err)
err = BuildTorrentIfNeed(ctx, "b/a.seg", dirs.Snap, tf)
_, err = BuildTorrentIfNeed(ctx, "b/a.seg", dirs.Snap, tf)
require.NoError(err)
err = BuildTorrentIfNeed(ctx, filepath.Join(dirs.Snap, "a.seg"), dirs.Snap, tf)
_, err = BuildTorrentIfNeed(ctx, filepath.Join(dirs.Snap, "a.seg"), dirs.Snap, tf)
require.NoError(err)
err = BuildTorrentIfNeed(ctx, filepath.Join(dirs.Snap, "b", "a.seg"), dirs.Snap, tf)
_, err = BuildTorrentIfNeed(ctx, filepath.Join(dirs.Snap, "b", "a.seg"), dirs.Snap, tf)
require.NoError(err)

// reject escaping snapshots dir
err = BuildTorrentIfNeed(ctx, filepath.Join(dirs.Chaindata, "b", "a.seg"), dirs.Snap, tf)
_, err = BuildTorrentIfNeed(ctx, filepath.Join(dirs.Chaindata, "b", "a.seg"), dirs.Snap, tf)
require.Error(err)
err = BuildTorrentIfNeed(ctx, "./../a.seg", dirs.Snap, tf)
_, err = BuildTorrentIfNeed(ctx, "./../a.seg", dirs.Snap, tf)
require.Error(err)
}
29 changes: 17 additions & 12 deletions erigon-lib/downloader/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,48 +133,49 @@ func ensureCantLeaveDir(fName, root string) (string, error) {
return fName, nil
}

func BuildTorrentIfNeed(ctx context.Context, fName, root string, torrentFiles *TorrentFiles) (err error) {
func BuildTorrentIfNeed(ctx context.Context, fName, root string, torrentFiles *TorrentFiles) (ok bool, err error) {
select {
case <-ctx.Done():
return ctx.Err()
return false, ctx.Err()
default:
}
fName, err = ensureCantLeaveDir(fName, root)
if err != nil {
return err
return false, err
}

if torrentFiles.Exists(fName) {
return nil
return false, nil
}

fPath := filepath.Join(root, fName)
if !dir2.FileExist(fPath) {
return nil
return false, nil
}

info := &metainfo.Info{PieceLength: downloadercfg.DefaultPieceSize, Name: fName}
if err := info.BuildFromFilePath(fPath); err != nil {
return fmt.Errorf("createTorrentFileFromSegment: %w", err)
return false, fmt.Errorf("createTorrentFileFromSegment: %w", err)
}
info.Name = fName

return CreateTorrentFileFromInfo(root, info, nil, torrentFiles)
return true, CreateTorrentFileFromInfo(root, info, nil, torrentFiles)
}

// BuildTorrentFilesIfNeed - create .torrent files from .seg files (big IO) - if .seg files were added manually
func BuildTorrentFilesIfNeed(ctx context.Context, dirs datadir.Dirs, torrentFiles *TorrentFiles, chain string, ignore snapcfg.Preverified) error {
func BuildTorrentFilesIfNeed(ctx context.Context, dirs datadir.Dirs, torrentFiles *TorrentFiles, chain string, ignore snapcfg.Preverified) (int, error) {
logEvery := time.NewTicker(20 * time.Second)
defer logEvery.Stop()

files, err := SeedableFiles(dirs, chain)
if err != nil {
return err
return 0, err
}

g, ctx := errgroup.WithContext(ctx)
g.SetLimit(runtime.GOMAXPROCS(-1) * 16)
var i atomic.Int32
var createdAmount atomic.Int32

for _, file := range files {
file := file
Expand All @@ -186,9 +187,13 @@ func BuildTorrentFilesIfNeed(ctx context.Context, dirs datadir.Dirs, torrentFile

g.Go(func() error {
defer i.Add(1)
if err := BuildTorrentIfNeed(ctx, file, dirs.Snap, torrentFiles); err != nil {
ok, err := BuildTorrentIfNeed(ctx, file, dirs.Snap, torrentFiles)
if err != nil {
return err
}
if ok {
createdAmount.Add(1)
}
return nil
})
}
Expand All @@ -206,9 +211,9 @@ Loop:
}
}
if err := g.Wait(); err != nil {
return err
return int(createdAmount.Load()), err
}
return nil
return int(createdAmount.Load()), nil
}

func CreateTorrentFileIfNotExists(root string, info *metainfo.Info, mi *metainfo.MetaInfo, torrentFiles *TorrentFiles) error {
Expand Down
2 changes: 1 addition & 1 deletion eth/stagedsync/stage_snapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -1180,7 +1180,7 @@ func (u *snapshotUploader) upload(ctx context.Context, logger log.Logger) {
g.Go(func() error {
defer i.Add(1)

err := downloader.BuildTorrentIfNeed(gctx, state.file, u.cfg.dirs.Snap, u.torrentFiles)
_, err := downloader.BuildTorrentIfNeed(gctx, state.file, u.cfg.dirs.Snap, u.torrentFiles)

state.Lock()
state.buildingTorrent = false
Expand Down

0 comments on commit 67eb9ad

Please sign in to comment.