Skip to content

Commit

Permalink
Move leadership from broker to raft (#36)
Browse files Browse the repository at this point in the history
  • Loading branch information
shwetabhgarg authored and Travis Jeffery committed May 11, 2017
1 parent 03a7547 commit 8394c6b
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 77 deletions.
41 changes: 7 additions & 34 deletions broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package broker
import (
"path"
"sync"
"time"

"github.com/pkg/errors"
"github.com/travisjeffery/jocko"
Expand All @@ -25,13 +24,8 @@ type Broker struct {
brokerAddr string
logDir string

raft jocko.Raft
leaderCh chan bool
commandCh chan jocko.RaftCommand

serf jocko.Serf
reconcileCh chan *jocko.ClusterMember
reconcileInterval time.Duration
raft jocko.Raft
serf jocko.Serf

shutdownCh chan struct{}
shutdown bool
Expand All @@ -43,11 +37,7 @@ func New(id int32, opts ...BrokerFn) (*Broker, error) {
replicationManager: newReplicationManager(),
id: id,
topics: make(map[string][]*jocko.Partition),
reconcileCh: make(chan *jocko.ClusterMember, 32),
reconcileInterval: time.Second * 5,
shutdownCh: make(chan struct{}),
leaderCh: make(chan bool, 1),
commandCh: make(chan jocko.RaftCommand, 16),
}

for _, o := range opts {
Expand All @@ -69,19 +59,18 @@ func New(id int32, opts ...BrokerFn) (*Broker, error) {
RaftPort: raftPort,
}

if err := b.serf.Bootstrap(conn, b.reconcileCh); err != nil {
reconcileCh := make(chan *jocko.ClusterMember, 32)
if err := b.serf.Bootstrap(conn, reconcileCh); err != nil {
b.logger.Info("failed to start serf: %s", err)
return nil, err
}

if err := b.raft.Bootstrap(b.serf.Cluster(), b.commandCh, b.leaderCh); err != nil {
commandCh := make(chan jocko.RaftCommand, 16)
if err := b.raft.Bootstrap(b.serf, reconcileCh, commandCh); err != nil {
return nil, err
}

// monitor leadership changes
go b.monitorLeadership()

go b.handleRaftCommmands()
go b.handleRaftCommmands(commandCh)

return b, nil
}
Expand All @@ -100,10 +89,6 @@ func (b *Broker) IsController() bool {
return b.raft.IsLeader()
}

func (b *Broker) ControllerID() string {
return b.raft.LeaderID()
}

func (b *Broker) TopicPartitions(topic string) (found []*jocko.Partition, err error) {
b.mu.RLock()
defer b.mu.RUnlock()
Expand Down Expand Up @@ -236,15 +221,6 @@ func (b *Broker) deleteTopic(tp *jocko.Partition) error {
return nil
}

// leave is used to prepare for a graceful shutdown of the server
func (b *Broker) leave() error {
b.logger.Info("broker starting to leave")

// TODO: handle case if we're the controller/leader

return nil
}

func (b *Broker) Shutdown() error {
b.logger.Info("shutting down broker")
b.shutdownLock.Lock()
Expand All @@ -255,9 +231,6 @@ func (b *Broker) Shutdown() error {
}

b.shutdown = true
if err := b.leave(); err != nil {
return err
}
close(b.shutdownCh)

if b.serf != nil {
Expand Down
4 changes: 2 additions & 2 deletions broker/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ func (s *Broker) raftApply(cmd jocko.RaftCmdType, data interface{}) error {
return s.raft.Apply(c)
}

func (s *Broker) handleRaftCommmands() {
func (s *Broker) handleRaftCommmands(commandCh <-chan jocko.RaftCommand) {
for {
select {
case cmd := <-s.commandCh:
case cmd := <-commandCh:
s.apply(cmd)
case <-s.shutdownCh:
return
Expand Down
7 changes: 2 additions & 5 deletions jocko.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ type Serf interface {
Member(memberID int32) *ClusterMember
Join(addrs ...string) (int, error)
Shutdown() error
Addr() string
ID() int32
}

type RaftCmdType int
Expand All @@ -159,13 +159,10 @@ type RaftCommand struct {
// Raft is the interface that wraps Raft's methods and is used to
// manage consensus for the Jocko cluster.
type Raft interface {
Bootstrap(peers []*ClusterMember, commandCh chan<- RaftCommand, leaderCh chan<- bool) (err error)
Bootstrap(serf Serf, serfEventCh <-chan *ClusterMember, commandCh chan<- RaftCommand) error
Apply(cmd RaftCommand) error
IsLeader() bool
LeaderID() string
WaitForBarrier() error
AddPeer(addr string) error
RemovePeer(addr string) error
Shutdown() error
Addr() string
}
Expand Down
39 changes: 20 additions & 19 deletions broker/leader.go → raft/leader.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package broker
package raft

import (
"net"
Expand All @@ -9,14 +9,14 @@ import (

// monitorLeadership is used to monitor if we acquire or lose our role as the
// leader in the Raft cluster.
func (b *Broker) monitorLeadership() {
func (b *Raft) monitorLeadership(raftEventCh <-chan bool, serfEventCh <-chan *jocko.ClusterMember) {
var stopCh chan struct{}
for {
select {
case isLeader := <-b.leaderCh:
case isLeader := <-raftEventCh:
if isLeader {
stopCh = make(chan struct{})
go b.leaderLoop(stopCh)
go b.leaderLoop(stopCh, serfEventCh)
b.logger.Info("cluster leadership acquired")
} else if stopCh != nil {
close(stopCh)
Expand All @@ -31,21 +31,21 @@ func (b *Broker) monitorLeadership() {

// revokeLeadership is invoked once we step down as leader.
// This is used to clean up any state that may be specific to the leader.
func (b *Broker) revokeLeadership() error {
func (b *Raft) revokeLeadership() error {
return nil
}

// leaderLoop runs as long as we are the leader to run maintenance duties
func (b *Broker) leaderLoop(stopCh chan struct{}) {
func (b *Raft) leaderLoop(stopCh chan struct{}, serfEventCh <-chan *jocko.ClusterMember) {
defer b.revokeLeadership()
var reconcileCh chan *jocko.ClusterMember
var reconcileCh <-chan *jocko.ClusterMember
establishedLeader := false

RECONCILE:
reconcileCh = nil
interval := time.After(b.reconcileInterval)

if err := b.raft.WaitForBarrier(); err != nil {
if err := b.waitForBarrier(); err != nil {
goto WAIT
}

Expand All @@ -62,7 +62,7 @@ RECONCILE:
goto WAIT
}

reconcileCh = b.reconcileCh
reconcileCh = serfEventCh

WAIT:
for {
Expand All @@ -74,22 +74,22 @@ WAIT:
case <-interval:
goto RECONCILE
case member := <-reconcileCh:
if b.IsController() {
if b.IsLeader() {
b.reconcileMember(member)
}
}
}
}

func (b *Broker) establishLeadership(stopCh chan struct{}) error {
func (b *Raft) establishLeadership(stopCh chan struct{}) error {
// start monitoring other brokers
// b.periodicDispatcher.SetEnabled(true)
// b.periodicDispatcher.Start()
return nil
}

func (b *Broker) reconcile() error {
members := b.Cluster()
func (b *Raft) reconcile() error {
members := b.serf.Cluster()
for _, member := range members {
if err := b.reconcileMember(member); err != nil {
return err
Expand All @@ -98,9 +98,9 @@ func (b *Broker) reconcile() error {
return nil
}

func (b *Broker) reconcileMember(member *jocko.ClusterMember) error {
func (b *Raft) reconcileMember(member *jocko.ClusterMember) error {
// don't reconcile ourself
if member.ID == b.id {
if member.ID == b.serf.ID() {
return nil
}
var err error
Expand All @@ -110,18 +110,19 @@ func (b *Broker) reconcileMember(member *jocko.ClusterMember) error {
case jocko.StatusLeft, jocko.StatusReap:
err = b.removeRaftPeer(member)
}

if err != nil {
b.logger.Info("failed to reconcile member: %v: %v", member, err)
return err
}
return nil
}

func (b *Broker) addRaftPeer(member *jocko.ClusterMember) error {
func (b *Raft) addRaftPeer(member *jocko.ClusterMember) error {
addr := &net.TCPAddr{IP: net.ParseIP(member.IP), Port: member.RaftPort}
return b.raft.AddPeer(addr.String())
return b.addPeer(addr.String())
}

func (b *Broker) removeRaftPeer(member *jocko.ClusterMember) error {
return b.raft.RemovePeer(member.IP)
func (b *Raft) removeRaftPeer(member *jocko.ClusterMember) error {
return b.removePeer(member.IP)
}
49 changes: 38 additions & 11 deletions raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,19 @@ type Raft struct {
dataDir string
addr string
devDisableBootstrap bool

serf jocko.Serf
reconcileInterval time.Duration
shutdownCh chan struct{}
}

// New Raft object
func New(opts ...OptionFn) (*Raft, error) {
r := &Raft{config: raft.DefaultConfig()}
r := &Raft{
config: raft.DefaultConfig(),
reconcileInterval: time.Second * 5,
shutdownCh: make(chan struct{}),
}

for _, o := range opts {
o(r)
Expand All @@ -44,9 +52,9 @@ func New(opts ...OptionFn) (*Raft, error) {
}

// Bootstrap the Raft agent using fsm and connect to peers
// Updates to leadership are returned on leaderCh channel
// Commands received by raft are returned on commandCh channel
func (b *Raft) Bootstrap(peers []*jocko.ClusterMember, commandCh chan<- jocko.RaftCommand, leaderCh chan<- bool) (err error) {
func (b *Raft) Bootstrap(serf jocko.Serf, serfEventCh <-chan *jocko.ClusterMember, commandCh chan<- jocko.RaftCommand) (err error) {
b.serf = serf
b.transport, err = raft.NewTCPTransport(b.addr, nil, 3, timeout, os.Stderr)
if err != nil {
return errors.Wrap(err, "tcp transport failed")
Expand All @@ -58,7 +66,7 @@ func (b *Raft) Bootstrap(peers []*jocko.ClusterMember, commandCh chan<- jocko.Ra
}

var peersAddrs []string
for _, p := range peers {
for _, p := range serf.Cluster() {
addr := &net.TCPAddr{IP: net.ParseIP(p.IP), Port: p.RaftPort}
peersAddrs = append(peersAddrs, addr.String())
}
Expand All @@ -78,7 +86,8 @@ func (b *Raft) Bootstrap(peers []*jocko.ClusterMember, commandCh chan<- jocko.Ra
}
b.store = boltStore

b.config.NotifyCh = leaderCh
raftNotifyCh := make(chan bool, 1)
b.config.NotifyCh = raftNotifyCh
b.config.StartAsLeader = !b.devDisableBootstrap

fsm := &fsm{
Expand All @@ -94,6 +103,9 @@ func (b *Raft) Bootstrap(peers []*jocko.ClusterMember, commandCh chan<- jocko.Ra
}
b.raft = raft

// monitor leadership changes
go b.monitorLeadership(raftNotifyCh, serfEventCh)

return nil
}

Expand Down Expand Up @@ -122,8 +134,8 @@ func (b *Raft) LeaderID() string {
return b.raft.Leader()
}

// WaitForBarrier to let fsm finish
func (b *Raft) WaitForBarrier() error {
// waitForBarrier to let fsm finish
func (b *Raft) waitForBarrier() error {
barrier := b.raft.Barrier(0)
if err := barrier.Error(); err != nil {
b.logger.Info("failed to wait for barrier: %v", err)
Expand All @@ -132,8 +144,8 @@ func (b *Raft) WaitForBarrier() error {
return nil
}

// AddPeer of given address to raft
func (b *Raft) AddPeer(addr string) error {
// addPeer of given address to raft
func (b *Raft) addPeer(addr string) error {
future := b.raft.AddPeer(addr)
if err := future.Error(); err != nil && err != raft.ErrKnownPeer {
b.logger.Info("failed to add raft peer: %v", err)
Expand All @@ -144,8 +156,8 @@ func (b *Raft) AddPeer(addr string) error {
return nil
}

// RemovePeer of given address from raft
func (b *Raft) RemovePeer(addr string) error {
// removePeer of given address from raft
func (b *Raft) removePeer(addr string) error {
future := b.raft.RemovePeer(addr)
if err := future.Error(); err != nil && err != raft.ErrUnknownPeer {
b.logger.Info("failed to remove raft peer: %v", err)
Expand All @@ -156,8 +168,23 @@ func (b *Raft) RemovePeer(addr string) error {
return nil
}

// leave is used to prepare for a graceful shutdown of the server.
// It is currently a placeholder: it only logs that the node is about to
// leave the raft peers and always returns nil.
func (b *Raft) leave() error {
b.logger.Info("preparing to leave raft peers")

// TODO: handle case if we're the controller/leader
// (no leadership hand-off is performed here yet)

return nil
}

// Shutdown raft agent
func (b *Raft) Shutdown() error {
close(b.shutdownCh)

if err := b.leave(); err != nil {
return err
}

if err := b.transport.Close(); err != nil {
return err
}
Expand Down
Loading

0 comments on commit 8394c6b

Please sign in to comment.