Skip to content

Commit

Permalink
add log cleaner
Browse files Browse the repository at this point in the history
  • Loading branch information
travisjeffery committed Dec 10, 2016
1 parent 154ba08 commit a98f5b2
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 11 deletions.
1 change: 1 addition & 0 deletions cluster/topic_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func (partition *TopicPartition) OpenCommitLog(logDir string) error {
partition.CommitLog, err = commitlog.New(commitlog.Options{
Path: path.Join(logDir, partition.String()),
MaxSegmentBytes: 1024,
MaxLogBytes: -1,
})
if err != nil {
return err
Expand Down
45 changes: 45 additions & 0 deletions commitlog/cleaner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package commitlog

type Cleaner interface {
Clean([]*Segment) ([]*Segment, error)
}

type DeleteCleaner struct {
Retention struct {
Bytes int64
}
}

func NewDeleteCleaner(bytes int64) *DeleteCleaner {
c := &DeleteCleaner{}
c.Retention.Bytes = bytes
return c
}

func (c *DeleteCleaner) Clean(segments []*Segment) ([]*Segment, error) {
if len(segments) == 0 || c.Retention.Bytes == -1 {
return segments, nil
}
cleanedSegments := []*Segment{segments[len(segments)-1]}
totalBytes := cleanedSegments[0].Position
if len(segments) > 1 {
var i int
for i = len(segments) - 2; i > -1; i-- {
s := segments[i]
totalBytes += s.Position
if totalBytes > c.Retention.Bytes {
break
}
cleanedSegments = append([]*Segment{s}, cleanedSegments...)
}
if i > -1 {
for ; i != 0; i-- {
s := segments[i]
if err := s.Delete(); err != nil {
return nil, err
}
}
}
}
return cleanedSegments, nil
}
8 changes: 8 additions & 0 deletions commitlog/commitlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ var (

type CommitLog struct {
Options
cleaner Cleaner
name string
mu sync.RWMutex
segments []*Segment
Expand All @@ -27,6 +28,7 @@ type CommitLog struct {
type Options struct {
Path string
MaxSegmentBytes int64
MaxLogBytes int64
}

func New(opts Options) (*CommitLog, error) {
Expand All @@ -42,6 +44,7 @@ func New(opts Options) (*CommitLog, error) {
l := &CommitLog{
Options: opts,
name: filepath.Base(path),
cleaner: NewDeleteCleaner(opts.MaxLogBytes),
}

return l, nil
Expand Down Expand Up @@ -176,6 +179,11 @@ func (l *CommitLog) split() error {
}
l.mu.Lock()
l.segments = append(l.segments, seg)
segments, err := l.cleaner.Clean(l.segments)
if err != nil {
return err
}
l.segments = segments
l.mu.Unlock()
l.vActiveSegment.Store(seg)
return nil
Expand Down
45 changes: 34 additions & 11 deletions commitlog/commitlog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,14 @@ func TestNewCommitLog(t *testing.T) {

func TestNewCommitLogExisting(t *testing.T) {
var err error
l0 := setup(t)
l := setup(t)
defer cleanup(t)

for _, msgSet := range msgSets {
_, err = l0.Append(msgSet)
_, err = l.Append(msgSet)
assert.NoError(t, err)
}
assert.Equal(t, int64(2), l0.NewestOffset())
assert.Equal(t, int64(2), l.NewestOffset())

l1 := setup(t)
msgs1 := []Message{
Expand Down Expand Up @@ -108,28 +108,28 @@ func TestNewCommitLogExisting(t *testing.T) {

func TestTruncateTo(t *testing.T) {
var err error
l0 := setup(t)
l := setup(t)
defer cleanup(t)

for _, msgSet := range msgSets {
_, err = l0.Append(msgSet)
_, err = l.Append(msgSet)
assert.NoError(t, err)
}
assert.Equal(t, int64(2), l0.NewestOffset())
assert.Equal(t, 2, len(l0.Segments()))
assert.Equal(t, int64(2), l.NewestOffset())
assert.Equal(t, 2, len(l.Segments()))

err = l0.TruncateTo(int64(1))
err = l.TruncateTo(int64(1))
assert.NoError(t, err)
assert.Equal(t, 1, len(l0.Segments()))
assert.Equal(t, 1, len(l.Segments()))

maxBytes := msgSets[0].Size()
_, err = l0.NewReader(ReaderOptions{
_, err = l.NewReader(ReaderOptions{
Offset: 0,
MaxBytes: maxBytes,
})
assert.Error(t, err)

r, err := l0.NewReader(ReaderOptions{
r, err := l.NewReader(ReaderOptions{
Offset: 1,
MaxBytes: maxBytes,
})
Expand All @@ -152,6 +152,28 @@ func TestTruncateTo(t *testing.T) {
}
}

func TestCleaner(t *testing.T) {
var err error
l := setup(t)
defer cleanup(t)

for _, msgSet := range msgSets {
_, err = l.Append(msgSet)
assert.NoError(t, err)
}
segments := l.Segments()
assert.Equal(t, 2, len(segments))

for _, msgSet := range msgSets {
_, err = l.Append(msgSet)
assert.NoError(t, err)
}
assert.Equal(t, 2, len(l.Segments()))
for i, s := range l.Segments() {
assert.NotEqual(t, s, segments[i])
}
}

func check(t *testing.T, got, want []byte) {
if !bytes.Equal(got, want) {
t.Errorf("got = %s, want %s", string(got), string(want))
Expand All @@ -162,6 +184,7 @@ func setup(t *testing.T) *CommitLog {
opts := Options{
Path: path,
MaxSegmentBytes: 6,
MaxLogBytes: 30,
}
l, err := New(opts)
assert.NoError(t, err)
Expand Down

0 comments on commit a98f5b2

Please sign in to comment.