From 1ad164874c7e1cdbfcf49e67ea9d38b5591086a8 Mon Sep 17 00:00:00 2001 From: Travis Jeffery Date: Sat, 23 Sep 2017 18:10:44 -0500 Subject: [PATCH] fix lock use --- broker/broker.go | 1 + broker/replication_manager.go | 4 ++-- examples/sarama/main.go | 3 ++- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/broker/broker.go b/broker/broker.go index 34614643..124bb13a 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -98,6 +98,7 @@ func (b *Broker) IsController() bool { // TopicPartitions is used to get the partitions for the given topic. func (b *Broker) TopicPartitions(topic string) (found []*jocko.Partition, err *jocko.Error) { b.mu.RLock() + defer b.mu.RUnlock() if p, ok := b.topics[topic]; !ok { return nil, &jocko.Error{ErrorCode: protocol.ErrUnknownTopicOrPartition} } else { diff --git a/broker/replication_manager.go b/broker/replication_manager.go index 56f368b9..8ca16cc4 100644 --- a/broker/replication_manager.go +++ b/broker/replication_manager.go @@ -19,7 +19,7 @@ func newReplicationManager() *replicationManager { func (rm *replicationManager) BecomeFollower(topic string, pid int32, command *protocol.PartitionState) error { p, err := rm.Partition(topic, pid) - if err != nil { + if err != protocol.ErrNone { return err } // stop replicator to current leader @@ -43,7 +43,7 @@ func (rm *replicationManager) BecomeFollower(topic string, pid int32, command *p func (rm *replicationManager) BecomeLeader(topic string, pid int32, command *protocol.PartitionState) error { p, err := rm.Partition(topic, pid) - if err != nil { + if err != protocol.ErrNone { return err } if r, ok := rm.replicators[p]; ok { diff --git a/examples/sarama/main.go b/examples/sarama/main.go index 9eaba5e3..9ad3d774 100644 --- a/examples/sarama/main.go +++ b/examples/sarama/main.go @@ -8,6 +8,7 @@ import ( "github.com/Shopify/sarama" "github.com/travisjeffery/jocko/broker" + "github.com/travisjeffery/jocko/protocol" "github.com/travisjeffery/jocko/raft" "github.com/travisjeffery/jocko/serf" "github.com/travisjeffery/jocko/server" @@ -143,7 +144,7 @@ func setup() func() { } // creating/deleting topic directly since Sarama doesn't support it - if err := store.CreateTopic(topic, numPartitions, 1); err != nil && err != broker.ErrTopicExists { + ir err := store.CreateTopic(topic, numPartitions, 1); err != protocol.ErrNone { panic(err) }