Skip to content

Commit

Permalink
fix some concurrency issues
Browse files Browse the repository at this point in the history
  • Loading branch information
travisjeffery committed Nov 22, 2016
1 parent c0719ab commit ca770ad
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 36 deletions.
16 changes: 10 additions & 6 deletions broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ type broker struct {
type Broker struct {
Options

mu sync.Mutex
mu sync.RWMutex

// state for fsm
topics map[string][]*cluster.TopicPartition
Expand Down Expand Up @@ -169,6 +169,8 @@ func (s *Broker) ControllerID() string {
}

func (s *Broker) PartitionsForTopic(topic string) (found []*cluster.TopicPartition, err error) {
s.mu.RLock()
defer s.mu.RUnlock()
return s.topics[topic], nil
}

Expand Down Expand Up @@ -207,13 +209,13 @@ func (s *Broker) apply(cmdType CmdType, data interface{}) error {
}

func (s *Broker) addPartition(partition *cluster.TopicPartition) {
s.mu.Lock()
defer s.mu.Unlock()
s.mu.RLock()
if v, ok := s.topics[partition.Topic]; ok {
s.topics[partition.Topic] = append(v, partition)
} else {
s.topics[partition.Topic] = []*cluster.TopicPartition{partition}
}
s.mu.RUnlock()
if s.IsLeaderOfPartition(partition) {
if err := partition.OpenCommitLog(s.LogDir); err != nil {
panic(err)
Expand All @@ -228,7 +230,8 @@ func (s *Broker) addBroker(broker *cluster.Broker) {
}

func (s *Broker) IsLeaderOfPartition(partition *cluster.TopicPartition) bool {
// TODO: switch this to a map for perf
s.mu.RLock()
defer s.mu.RUnlock()
for _, p := range s.topics[partition.Topic] {
if p.Partition == partition.Partition {
if partition.Leader == s.ID {
Expand Down Expand Up @@ -258,6 +261,7 @@ func (s *Broker) Apply(l *raft.Log) interface{} {
if err := json.Unmarshal(l.Data, &c); err != nil {
panic(errors.Wrap(err, "json unmarshal failed"))
}
s.Logger.Debug("broker/apply cmd [%d]", c.Cmd)
switch c.Cmd {
case addBroker:
broker := new(cluster.Broker)
Expand Down Expand Up @@ -335,8 +339,6 @@ func (s *Broker) DeleteTopic(topic string) error {
}

func (s *Broker) deleteTopic(tp *cluster.TopicPartition) error {
s.mu.Lock()
defer s.mu.Unlock()
partitions, err := s.PartitionsForTopic(tp.Topic)
if err != nil {
return err
Expand All @@ -346,7 +348,9 @@ func (s *Broker) deleteTopic(tp *cluster.TopicPartition) error {
return err
}
}
s.mu.Lock()
delete(s.topics, tp.Topic)
s.mu.Unlock()
return nil
}

Expand Down
51 changes: 33 additions & 18 deletions commitlog/commitlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,7 @@ func (l *CommitLog) Open() error {
return nil
}

func (l *CommitLog) DeleteAll() error {
return os.RemoveAll(l.Path)
}

func (l *CommitLog) Append(ms MessageSet) (offset int64, err error) {
l.mu.Lock()
defer l.mu.Unlock()
if l.checkSplit() {
if err := l.split(); err != nil {
return offset, err
Expand All @@ -99,6 +93,37 @@ func (l *CommitLog) Read(p []byte) (n int, err error) {
defer l.mu.Unlock()
return l.activeSegment().Read(p)
}
func (l *CommitLog) NewestOffset() int64 {
return l.activeSegment().NextOffset
}

func (l *CommitLog) OldestOffset() int64 {
l.mu.RLock()
defer l.mu.RUnlock()
return l.segments[0].BaseOffset
}

func (l *CommitLog) activeSegment() *Segment {
return l.vActiveSegment.Load().(*Segment)
}

func (l *CommitLog) Close() error {
l.mu.Lock()
defer l.mu.Unlock()
for _, segment := range l.segments {
if err := segment.Close(); err != nil {
return err
}
}
return nil
}

func (l *CommitLog) DeleteAll() error {
if err := l.Close(); err != nil {
return err
}
return os.RemoveAll(l.Path)
}

func (l *CommitLog) checkSplit() bool {
return l.activeSegment().IsFull()
Expand All @@ -109,19 +134,9 @@ func (l *CommitLog) split() error {
if err != nil {
return err
}
l.mu.Lock()
l.segments = append(l.segments, seg)
l.mu.Unlock()
l.vActiveSegment.Store(seg)
return nil
}

func (l *CommitLog) NewestOffset() int64 {
return l.activeSegment().NextOffset
}

func (l *CommitLog) OldestOffset() int64 {
return l.segments[0].BaseOffset
}

func (l *CommitLog) activeSegment() *Segment {
return l.vActiveSegment.Load().(*Segment)
}
12 changes: 12 additions & 0 deletions commitlog/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"encoding/binary"
"os"
"sync"

"github.com/pkg/errors"

Expand All @@ -24,6 +25,7 @@ type index struct {
options
mmap gommap.MMap
file *os.File
mu sync.RWMutex
offset int64
}

Expand Down Expand Up @@ -101,7 +103,9 @@ func (idx *index) WriteEntry(entry Entry) (err error) {
return errors.Wrap(err, "binary write failed")
}
idx.WriteAt(b.Bytes(), idx.offset)
idx.mu.Lock()
idx.offset += entryWidth
idx.mu.Unlock()
return nil
}

Expand All @@ -114,11 +118,15 @@ func (idx *index) ReadEntry(e *Entry, offset int64) error {
if err != nil {
return errors.Wrap(err, "binar read failed")
}
idx.mu.RLock()
rel.fill(e, idx.baseOffset)
idx.mu.RUnlock()
return nil
}

func (idx *index) ReadAt(p []byte, offset int64) (n int) {
idx.mu.RLock()
defer idx.mu.RUnlock()
return copy(p, idx.mmap[offset:offset+entryWidth])
}

Expand All @@ -127,10 +135,14 @@ func (idx *index) Write(p []byte) (n int, err error) {
}

func (idx *index) WriteAt(p []byte, offset int64) (n int) {
idx.mu.Lock()
defer idx.mu.Unlock()
return copy(idx.mmap[offset:offset+entryWidth], p)
}

func (idx *index) Sync() error {
idx.mu.Lock()
defer idx.mu.Unlock()
if err := idx.file.Sync(); err != nil {
return errors.Wrap(err, "file sync failed")
}
Expand Down
4 changes: 4 additions & 0 deletions commitlog/message_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ func (ms MessageSet) Offset() int64 {
return int64(big.Uint64(ms[offsetPos : offsetPos+8]))
}

func (ms MessageSet) PutOffset(offset int64) {
big.PutUint64(ms[offsetPos:offsetPos+8], uint64(offset))
}

func (ms MessageSet) Size() int32 {
return int32(big.Uint32(ms[sizePos:sizePos+4]) + msgSetHeaderLen)
}
Expand Down
4 changes: 0 additions & 4 deletions protocol/offsets_request.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package protocol

import "fmt"

type OffsetsPartition struct {
Partition int32
Timestamp int64 // -1 to receive latest offset, -2 to receive earliest offset
Expand Down Expand Up @@ -51,10 +49,8 @@ func (r *OffsetsRequest) Decode(d PacketDecoder) error {
}
topicCount, err := d.ArrayLength()
if err != nil {
fmt.Println("errror!", err)
return err
}
fmt.Println("topic count:", topicCount)
r.Topics = make([]*OffsetsTopic, topicCount)
for i := range r.Topics {
ot := new(OffsetsTopic)
Expand Down
16 changes: 8 additions & 8 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"net/http"
"os"
"strconv"
"sync"
"time"

"github.com/gorilla/handlers"
Expand All @@ -27,9 +28,9 @@ type Broker struct {
}

type Server struct {
addr string
ln *net.TCPListener

addr string
ln *net.TCPListener
mu sync.Mutex
logger *simplelog.Logger
broker *broker.Broker
}
Expand Down Expand Up @@ -70,7 +71,8 @@ func (s *Server) Start() error {
for {
conn, err := s.ln.Accept()
if err != nil {
panic(errors.Wrap(err, "Listener accept failed"))
s.logger.Debug("listener accept failed: %v", err)
continue
}

go s.handleRequest(conn)
Expand Down Expand Up @@ -123,7 +125,6 @@ func (s *Server) handleRequest(conn net.Conn) {
continue
}

s.logger.Debug("request with size: %d", size)
b := make([]byte, size+4) //+4 since we're going to copy the size into b
copy(b, p)

Expand Down Expand Up @@ -182,7 +183,6 @@ func (s *Server) decode(header *protocol.RequestHeader, req protocol.Decoder, d
if err != nil {
return err
}
// s.logger.Debug("[%d], request: %s", header.CorrelationID, spew.Sdump(req))
return nil
}

Expand All @@ -206,6 +206,7 @@ func (s *Server) handleCreateTopic(conn net.Conn, header *protocol.RequestHeader
}
}
} else {
s.logger.Info("Failed to create topic %s: %v", errors.New("broker is not controller"))
// cID := s.broker.ControllerID()
// send the request to the controller
return
Expand Down Expand Up @@ -312,7 +313,6 @@ func (s *Server) handleMetadata(conn net.Conn, header *protocol.RequestHeader, r
}

func (s *Server) write(conn net.Conn, header *protocol.RequestHeader, e protocol.Encoder) error {
// s.logger.Debug("correlation id [%d], response: %s", header.CorrelationID, spew.Sdump(e))
b, err := protocol.Encode(e)
if err != nil {
return err
Expand Down Expand Up @@ -376,7 +376,7 @@ func (s *Server) handleProduce(conn net.Conn, header *protocol.RequestHeader, re
}
offset, err := partition.CommitLog.Append(p.RecordSet)
if err != nil {
s.logger.Info("commitlog append failed: %s", err)
s.logger.Info("commitlog/append failed: %s", err)
presp.ErrorCode = protocol.ErrUnknown
}
presp.Partition = p.Partition
Expand Down

0 comments on commit ca770ad

Please sign in to comment.