Skip to content

Commit

Permalink
able to read and write across segments
Browse files Browse the repository at this point in the history
  • Loading branch information
travisjeffery committed Oct 10, 2016
1 parent 37146b3 commit cfc462a
Show file tree
Hide file tree
Showing 5 changed files with 193 additions and 46 deletions.
80 changes: 50 additions & 30 deletions commitlog/commitlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,56 +11,51 @@ import (
)

type CommitLog struct {
Options
name string
path string
mu sync.RWMutex
segments []*segment
vActiveSegment atomic.Value
}

type Reader struct {
segment *segment
mu sync.Mutex
offset int64
type Options struct {
Path string
SegmentBytes int64
}

func (r *Reader) Read(p []byte) (n int, err error) {
r.mu.Lock()
defer r.mu.Unlock()

return r.segment.ReadAt(p, r.offset)
}

func (l *CommitLog) NewReader(offset int64) (r *Reader) {
return &Reader{
segment: l.segments[0],
offset: offset,
func New(opts Options) (*CommitLog, error) {
if opts.Path == "" {
return nil, errors.New("path is empty")
}
}

func New(path string) (*CommitLog, error) {
err := os.MkdirAll(path, 0755)
if err != nil {
return nil, errors.Wrap(err, "mkdir failed")
if opts.SegmentBytes == 0 {
// TODO default here
}

path, _ = filepath.Abs(path)
path, _ := filepath.Abs(opts.Path)
l := &CommitLog{
name: filepath.Base(path),
path: path,
Options: opts,
name: filepath.Base(path),
}
err = l.open()

return l, err
return l, nil
}

func (l *CommitLog) init() error {
err := os.MkdirAll(l.Path, 0755)
if err != nil {
return errors.Wrap(err, "mkdir failed")
}
return nil
}

func (l *CommitLog) open() error {
_, err := ioutil.ReadDir(l.path)
_, err := ioutil.ReadDir(l.Path)
if err != nil {
return errors.Wrap(err, "read dif failed")
return errors.Wrap(err, "read dir failed")
}

activeSegment, err := NewSegment(l.path, 0)
activeSegment, err := NewSegment(l.Path, 0, l.SegmentBytes)
if err != nil {
return err
}
Expand All @@ -72,12 +67,19 @@ func (l *CommitLog) open() error {
}

func (l *CommitLog) deleteAll() error {
return os.RemoveAll(l.path)
return os.RemoveAll(l.Path)
}

func (l *CommitLog) Write(p []byte) (n int, err error) {
l.mu.Lock()
defer l.mu.Unlock()

if l.checkSplit() {
if err = l.split(); err != nil {
return 0, err
}
}

return l.activeSegment().Write(p)
}

Expand All @@ -87,6 +89,24 @@ func (l *CommitLog) Read(p []byte) (n int, err error) {
return l.activeSegment().Read(p)
}

func (l *CommitLog) checkSplit() bool {
return l.activeSegment().IsFull()
}

func (l *CommitLog) split() error {
seg, err := NewSegment(l.Path, l.newestOffset(), l.SegmentBytes)
if err != nil {
return err
}
l.segments = append(l.segments, seg)
l.vActiveSegment.Store(seg)
return nil
}

func (l *CommitLog) newestOffset() int64 {
return l.activeSegment().NewestOffset()
}

func (l *CommitLog) activeSegment() *segment {
return l.vActiveSegment.Load().(*segment)
}
17 changes: 14 additions & 3 deletions commitlog/commitlog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,19 @@ import (
)

func TestNewCommitLog(t *testing.T) {
l, err := New(filepath.Join(os.TempDir(), fmt.Sprintf("commitlogtest%d", rand.Int63())))
defer l.deleteAll()
path := filepath.Join(os.TempDir(), fmt.Sprintf("commitlogtest%d", rand.Int63()))
fmt.Println(path)
opts := Options{
Path: path,
SegmentBytes: 3,
}
l, err := New(opts)

// remove old data
l.deleteAll()

l.init()
l.open()

if err != nil {
t.Fatal(err)
Expand All @@ -28,7 +39,7 @@ func TestNewCommitLog(t *testing.T) {
t.Error(err)
}

r := l.NewReader(0)
r, err := l.NewReader(0)
if err != nil {
t.Error(err)
}
Expand Down
54 changes: 54 additions & 0 deletions commitlog/reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package commitlog

import (
"io"
"sync"

"github.com/pkg/errors"
)

type Reader struct {
segment *segment
segments []*segment
idx int
mu sync.Mutex
offset int64
}

func (r *Reader) Read(p []byte) (n int, err error) {
r.mu.Lock()
defer r.mu.Unlock()

var readSize int
for {
readSize, err = r.segment.ReadAt(p[n:], r.offset)
n += readSize
if err != io.EOF {
break
}
r.idx++
if len(r.segments) <= r.idx {
err = io.EOF
break
}
r.segment = r.segments[r.idx]
r.offset = 0
}

return n, err
}

func (l *CommitLog) NewReader(offset int64) (r *Reader, err error) {
segment, idx := findSegment(l.segments, offset)

if segment == nil {
return nil, errors.Wrap(err, "segment not found")
}

return &Reader{
segment: segment,
segments: l.segments,
idx: idx,
offset: offset,
}, nil
}
73 changes: 60 additions & 13 deletions commitlog/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,37 +15,84 @@ const (
)

type segment struct {
writer io.Writer
reader io.Reader
file *os.File
startOffset int64
writer io.Writer
reader io.Reader
log *os.File
index *os.File
baseOffset int64
newestOffset int64
bytes int64
maxBytes int64
}

func NewSegment(path string, startOffset int64) (*segment, error) {
filePath := filepath.Join(path, fmt.Sprintf(logNameFormat, startOffset))
file, err := os.OpenFile(filePath, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
func NewSegment(path string, baseOffset int64, maxBytes int64) (*segment, error) {
logPath := filepath.Join(path, fmt.Sprintf(logNameFormat, baseOffset))
log, err := os.OpenFile(logPath, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
if err != nil {
return nil, errors.Wrap(err, "open file failed")
}

fi, err := log.Stat()
if err != nil {
return nil, errors.Wrap(err, "file stat failed")
}

indexPath := filepath.Join(path, fmt.Sprintf(indexNameFormat, baseOffset))
index, err := os.OpenFile(indexPath, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
if err != nil {
return nil, errors.Wrap(err, "open file failed")
}

s := &segment{
file: file,
writer: file,
reader: file,
startOffset: startOffset,
log: log,
index: index,
writer: log,
reader: log,
bytes: fi.Size(),
maxBytes: maxBytes,
baseOffset: baseOffset,
newestOffset: baseOffset,
}

return s, nil
}

func (s *segment) NewestOffset() int64 {
return s.newestOffset
}

func (s *segment) IsFull() bool {
return s.bytes >= s.maxBytes
}

func (s *segment) Write(p []byte) (n int, err error) {
return s.writer.Write(p)
n, err = s.writer.Write(p)
if err != nil {
return n, errors.Wrap(err, "log write failed")
}

_, err = s.index.Write([]byte(fmt.Sprintf("%d,%d\n", s.newestOffset, s.bytes)))
if err != nil {
return 0, errors.Wrap(err, "index write failed")
}

s.newestOffset += 1
s.bytes += int64(n)

return n, nil
}

func (s *segment) Read(p []byte) (n int, err error) {
return s.reader.Read(p)
}

func (s *segment) ReadAt(p []byte, off int64) (n int, err error) {
return s.file.ReadAt(p, off)
return s.log.ReadAt(p, off)
}

func (s *segment) Close() error {
if err := s.log.Close(); err != nil {
return err
}
return s.index.Close()
}
15 changes: 15 additions & 0 deletions commitlog/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package commitlog

import "sort"

func findSegment(segments []*segment, offset int64) (*segment, int) {
idx := sort.Search(len(segments), func(i int) bool {
return segments[i].baseOffset > offset
}) - 1

if idx < 0 {
return nil, idx
}

return segments[idx], idx
}

0 comments on commit cfc462a

Please sign in to comment.