From a98f5b26ceed68dd71ae88c245ae653a45cfc254 Mon Sep 17 00:00:00 2001 From: Travis Jeffery Date: Sat, 10 Dec 2016 04:19:25 -0500 Subject: [PATCH] add log cleaner --- cluster/topic_partition.go | 1 + commitlog/cleaner.go | 45 +++++++++++++++++++++++++++++++++++++ commitlog/commitlog.go | 8 +++++++ commitlog/commitlog_test.go | 45 ++++++++++++++++++++++++++++--------- 4 files changed, 88 insertions(+), 11 deletions(-) create mode 100644 commitlog/cleaner.go diff --git a/cluster/topic_partition.go b/cluster/topic_partition.go index 1b2ec770..8ff03907 100644 --- a/cluster/topic_partition.go +++ b/cluster/topic_partition.go @@ -30,6 +30,7 @@ func (partition *TopicPartition) OpenCommitLog(logDir string) error { partition.CommitLog, err = commitlog.New(commitlog.Options{ Path: path.Join(logDir, partition.String()), MaxSegmentBytes: 1024, + MaxLogBytes: -1, }) if err != nil { return err diff --git a/commitlog/cleaner.go b/commitlog/cleaner.go new file mode 100644 index 00000000..90203eab --- /dev/null +++ b/commitlog/cleaner.go @@ -0,0 +1,45 @@ +package commitlog + +type Cleaner interface { + Clean([]*Segment) ([]*Segment, error) +} + +type DeleteCleaner struct { + Retention struct { + Bytes int64 + } +} + +func NewDeleteCleaner(bytes int64) *DeleteCleaner { + c := &DeleteCleaner{} + c.Retention.Bytes = bytes + return c +} + +func (c *DeleteCleaner) Clean(segments []*Segment) ([]*Segment, error) { + if len(segments) == 0 || c.Retention.Bytes == -1 { + return segments, nil + } + cleanedSegments := []*Segment{segments[len(segments)-1]} + totalBytes := cleanedSegments[0].Position + if len(segments) > 1 { + var i int + for i = len(segments) - 2; i > -1; i-- { + s := segments[i] + totalBytes += s.Position + if totalBytes > c.Retention.Bytes { + break + } + cleanedSegments = append([]*Segment{s}, cleanedSegments...) + } + if i > -1 { + for ; i != 0; i-- { + s := segments[i] + if err := s.Delete(); err != nil { + return nil, err + } + } + } + } + return cleanedSegments, nil +} diff --git a/commitlog/commitlog.go b/commitlog/commitlog.go index 45eb550b..8919d245 100644 --- a/commitlog/commitlog.go +++ b/commitlog/commitlog.go @@ -18,6 +18,7 @@ var ( type CommitLog struct { Options + cleaner Cleaner name string mu sync.RWMutex segments []*Segment @@ -27,6 +28,7 @@ type CommitLog struct { type Options struct { Path string MaxSegmentBytes int64 + MaxLogBytes int64 } func New(opts Options) (*CommitLog, error) { @@ -42,6 +44,7 @@ func New(opts Options) (*CommitLog, error) { l := &CommitLog{ Options: opts, name: filepath.Base(path), + cleaner: NewDeleteCleaner(opts.MaxLogBytes), } return l, nil @@ -176,6 +179,11 @@ func (l *CommitLog) split() error { } l.mu.Lock() l.segments = append(l.segments, seg) + segments, err := l.cleaner.Clean(l.segments) + if err != nil { + return err + } + l.segments = segments l.mu.Unlock() l.vActiveSegment.Store(seg) return nil diff --git a/commitlog/commitlog_test.go b/commitlog/commitlog_test.go index c0eef0a2..8fdb3fe5 100644 --- a/commitlog/commitlog_test.go +++ b/commitlog/commitlog_test.go @@ -61,14 +61,14 @@ func TestNewCommitLog(t *testing.T) { func TestNewCommitLogExisting(t *testing.T) { var err error - l0 := setup(t) + l := setup(t) defer cleanup(t) for _, msgSet := range msgSets { - _, err = l0.Append(msgSet) + _, err = l.Append(msgSet) assert.NoError(t, err) } - assert.Equal(t, int64(2), l0.NewestOffset()) + assert.Equal(t, int64(2), l.NewestOffset()) l1 := setup(t) msgs1 := []Message{ @@ -108,28 +108,28 @@ func TestNewCommitLogExisting(t *testing.T) { func TestTruncateTo(t *testing.T) { var err error - l0 := setup(t) + l := setup(t) defer cleanup(t) for _, msgSet := range msgSets { - _, err = l0.Append(msgSet) + _, err = l.Append(msgSet) assert.NoError(t, err) } - assert.Equal(t, int64(2), l0.NewestOffset()) - assert.Equal(t, 2, len(l0.Segments())) + assert.Equal(t, int64(2), l.NewestOffset()) + assert.Equal(t, 2, len(l.Segments())) - err = l0.TruncateTo(int64(1)) + err = l.TruncateTo(int64(1)) assert.NoError(t, err) - assert.Equal(t, 1, len(l0.Segments())) + assert.Equal(t, 1, len(l.Segments())) maxBytes := msgSets[0].Size() - _, err = l0.NewReader(ReaderOptions{ + _, err = l.NewReader(ReaderOptions{ Offset: 0, MaxBytes: maxBytes, }) assert.Error(t, err) - r, err := l0.NewReader(ReaderOptions{ + r, err := l.NewReader(ReaderOptions{ Offset: 1, MaxBytes: maxBytes, }) @@ -152,6 +152,28 @@ func TestTruncateTo(t *testing.T) { } } +func TestCleaner(t *testing.T) { + var err error + l := setup(t) + defer cleanup(t) + + for _, msgSet := range msgSets { + _, err = l.Append(msgSet) + assert.NoError(t, err) + } + segments := l.Segments() + assert.Equal(t, 2, len(segments)) + + for _, msgSet := range msgSets { + _, err = l.Append(msgSet) + assert.NoError(t, err) + } + assert.Equal(t, 2, len(l.Segments())) + for i, s := range l.Segments() { + assert.NotEqual(t, s, segments[i]) + } +} + func check(t *testing.T, got, want []byte) { if !bytes.Equal(got, want) { t.Errorf("got = %s, want %s", string(got), string(want)) @@ -162,6 +184,7 @@ func setup(t *testing.T) *CommitLog { opts := Options{ Path: path, MaxSegmentBytes: 6, + MaxLogBytes: 30, } l, err := New(opts) assert.NoError(t, err)