Skip to content

Commit

Permalink
go: indexed message iterator: guard against bad offsets in file
Browse files Browse the repository at this point in the history
  • Loading branch information
james-rms committed Sep 12, 2024
1 parent a7aac13 commit fb386a0
Showing 1 changed file with 24 additions and 4 deletions.
28 changes: 24 additions & 4 deletions go/mcap/indexed_message_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,21 @@ import (
"bufio"
"bytes"
"encoding/binary"
"errors"
"fmt"
"io"
"math/bits"
"slices"
"sort"

"math"

"github.com/klauspost/compress/zstd"
"github.com/pierrec/lz4/v4"
)

var ErrBadOffset = errors.New("cannot seek to offset")

const (
chunkBufferGrowthMultiple = 1.2
)
Expand Down Expand Up @@ -54,6 +59,7 @@ type indexedMessageIterator struct {
attachmentIndexes []*AttachmentIndex
metadataIndexes []*MetadataIndex
footer *Footer
fileSize int64

curChunkIndex int
messageIndexes []messageIndexWithChunkSlot
Expand All @@ -68,14 +74,28 @@ type indexedMessageIterator struct {
metadataCallback func(*Metadata) error
}

func (it *indexedMessageIterator) seekTo(offset uint64) (int64, error) {
if offset > uint64(math.MaxInt64) {
return 0, fmt.Errorf("%w: %d > int64 max", ErrBadOffset, offset)
}
signedOffset := int64(offset)
if signedOffset >= it.fileSize {
return 0, fmt.Errorf("%w: %d past file end %d", ErrBadOffset, offset, it.fileSize)
}
pos, err := it.rs.Seek(signedOffset, io.SeekStart)
return pos, err
}

// parseIndexSection parses the index section of the file and populates the
// related fields of the structure. It must be called prior to any of the other
// access methods.
func (it *indexedMessageIterator) parseSummarySection() error {
_, err := it.rs.Seek(-8-4-8-8, io.SeekEnd) // magic, plus 20 bytes footer
const footerStartOffsetFromEnd = +8 + 4 + 8 + 8 // magic, plus 20 bytes footer
footerStartPos, err := it.rs.Seek(-footerStartOffsetFromEnd, io.SeekEnd)
if err != nil {
return err
}
it.fileSize = footerStartPos + footerStartOffsetFromEnd
buf := make([]byte, 8+20)
_, err = io.ReadFull(it.rs, buf)
if err != nil {
Expand All @@ -96,7 +116,7 @@ func (it *indexedMessageIterator) parseSummarySection() error {
it.hasReadSummarySection = true
return nil
}
_, err = it.rs.Seek(int64(footer.SummaryStart), io.SeekStart)
_, err = it.seekTo(footer.SummaryStart)
if err != nil {
return fmt.Errorf("failed to seek to summary start")
}
Expand Down Expand Up @@ -188,7 +208,7 @@ func (it *indexedMessageIterator) parseSummarySection() error {
// loadChunk seeks to and decompresses a chunk into a chunk slot, then populates it.messageIndexes
// with the offsets of messages in that chunk.
func (it *indexedMessageIterator) loadChunk(chunkIndex *ChunkIndex) error {
_, err := it.rs.Seek(int64(chunkIndex.ChunkStartOffset), io.SeekStart)
_, err := it.seekTo(chunkIndex.ChunkStartOffset)
if err != nil {
return err
}
Expand Down Expand Up @@ -377,7 +397,7 @@ func (it *indexedMessageIterator) NextInto(msg *Message) (*Schema, *Channel, *Me
// take care of the metadata here
if it.metadataCallback != nil {
for _, idx := range it.metadataIndexes {
_, err := it.rs.Seek(int64(idx.Offset), io.SeekStart)
_, err := it.seekTo(idx.Offset)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to seek to metadata: %w", err)
}
Expand Down

0 comments on commit fb386a0

Please sign in to comment.