Skip to content

Commit

Permalink
fix lock use
Browse files Browse the repository at this point in the history
  • Loading branch information
travisjeffery committed Sep 23, 2017
1 parent 49dd62f commit 1ad1648
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 3 deletions.
1 change: 1 addition & 0 deletions broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ func (b *Broker) IsController() bool {
// TopicPartitions is used to get the partitions for the given topic.
func (b *Broker) TopicPartitions(topic string) (found []*jocko.Partition, err *jocko.Error) {
b.mu.RLock()
defer b.mu.RUnlock()
if p, ok := b.topics[topic]; !ok {
return nil, &jocko.Error{ErrorCode: protocol.ErrUnknownTopicOrPartition}
} else {
Expand Down
4 changes: 2 additions & 2 deletions broker/replication_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func newReplicationManager() *replicationManager {

func (rm *replicationManager) BecomeFollower(topic string, pid int32, command *protocol.PartitionState) error {
p, err := rm.Partition(topic, pid)
if err != nil {
if err != protocol.ErrNone {
return err
}
// stop replicator to current leader
Expand All @@ -43,7 +43,7 @@ func (rm *replicationManager) BecomeFollower(topic string, pid int32, command *p

func (rm *replicationManager) BecomeLeader(topic string, pid int32, command *protocol.PartitionState) error {
p, err := rm.Partition(topic, pid)
if err != nil {
if err != protocol.ErrNone {
return err
}
if r, ok := rm.replicators[p]; ok {
Expand Down
3 changes: 2 additions & 1 deletion examples/sarama/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/Shopify/sarama"
"github.com/travisjeffery/jocko/broker"
"github.com/travisjeffery/jocko/protocol"
"github.com/travisjeffery/jocko/raft"
"github.com/travisjeffery/jocko/serf"
"github.com/travisjeffery/jocko/server"
Expand Down Expand Up @@ -143,7 +144,7 @@ func setup() func() {
}

// creating/deleting topic directly since Sarama doesn't support it
if err := store.CreateTopic(topic, numPartitions, 1); err != nil && err != broker.ErrTopicExists {
ir err := store.CreateTopic(topic, numPartitions, 1); err != protocol.ErrNone {
panic(err)
}

Expand Down

0 comments on commit 1ad1648

Please sign in to comment.