Skip to content

Commit

Permalink
add initial serf calls
Browse files Browse the repository at this point in the history
  • Loading branch information
travisjeffery committed Dec 23, 2016
1 parent 180154f commit 48fb383
Show file tree
Hide file tree
Showing 9 changed files with 273 additions and 72 deletions.
151 changes: 103 additions & 48 deletions broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package broker

import (
"encoding/json"
"fmt"
"io"
"net"
"os"
Expand All @@ -12,6 +13,7 @@ import (

"github.com/hashicorp/raft"
raftboltdb "github.com/hashicorp/raft-boltdb"
"github.com/hashicorp/serf/serf"
"github.com/pkg/errors"
"github.com/travisjeffery/jocko/commitlog"
"github.com/travisjeffery/jocko/jocko"
Expand Down Expand Up @@ -70,114 +72,166 @@ type Broker struct {
tcpAddr string
logDir string

peerStore raft.PeerStore
transport raft.Transport
raft *raft.Raft
store *raftboltdb.BoltStore
raft *raft.Raft
raftPeers raft.PeerStore
raftTransport *raft.NetworkTransport
raftStore *raftboltdb.BoltStore
raftLeaderCh chan bool

serf *serf.Serf
serfReconcileCh chan serf.Member
serfEventCh chan serf.Event

reconcileInterval time.Duration
reconcileCh chan serf.Member

left bool
shutdownCh chan struct{}
shutdown bool
shutdownLock sync.Mutex
}

func New(id int32, opts ...Option) *Broker {
const (
raftState = "raft/"
serfSnapshot = "serf/snapshot"
)

func New(id int32, opts ...Option) (*Broker, error) {
var err error
b := &Broker{
replicationManager: newReplicationManager(),
id: id,
topics: make(map[string][]*jocko.Partition),
serfReconcileCh: make(chan serf.Member, 32),
serfEventCh: make(chan serf.Event, 256),
reconcileInterval: time.Second * 60,
}
for _, o := range opts {
o.modifyBroker(b)
}
return b
}

func (b *Broker) ID() int32 {
return b.id
}

func (b *Broker) Host() string {
return b.host
}

func (b *Broker) Port() string {
return b.port
}

func (b *Broker) Cluster() []*jocko.BrokerConn {
return b.brokers
}
serfConfig := serf.DefaultConfig()
b.serf, err = b.setupSerf(serfConfig, b.serfEventCh, serfSnapshot)
if err != nil {
return nil, err
}

func (s *Broker) Open() error {
host, port, err := net.SplitHostPort(s.tcpAddr)
host, port, err := net.SplitHostPort(b.tcpAddr)
if err != nil {
return err
return nil, err
}

s.host = host
s.port = port
b.host = host
b.port = port

s.brokers = append(s.brokers, &jocko.BrokerConn{
b.brokers = append(b.brokers, &jocko.BrokerConn{
Host: host,
Port: port,
RaftAddr: s.raftAddr,
ID: s.id,
RaftAddr: b.raftAddr,
ID: b.id,
})

if err = b.setupRaft(); err != nil {
return nil, err
}

return b, nil
}

func (b *Broker) setupRaft() error {
conf := raft.DefaultConfig()

addr, err := net.ResolveTCPAddr("tcp", s.raftAddr)
addr, err := net.ResolveTCPAddr("tcp", b.raftAddr)
if err != nil {
return errors.Wrap(err, "resolve bind addr failed")
}

if s.transport == nil {
s.transport, err = raft.NewTCPTransport(s.raftAddr, addr, 3, timeout, os.Stderr)
if b.raftTransport == nil {
b.raftTransport, err = raft.NewTCPTransport(b.raftAddr, addr, 3, timeout, os.Stderr)
if err != nil {
return errors.Wrap(err, "tcp transport failed")
}
}

if err = os.MkdirAll(s.dataDir, 0755); err != nil {
path := filepath.Join(b.dataDir, raftState)
if err = os.MkdirAll(path, 0755); err != nil {
return errors.Wrap(err, "data directory mkdir failed")
}
s.peerStore = raft.NewJSONPeers(s.dataDir, s.transport)

if len(s.brokers) == 1 {
b.raftPeers = raft.NewJSONPeers(path, b.raftTransport)

if len(b.brokers) == 1 {
conf.EnableSingleNode = true
} else {
var peers []string
for _, b := range s.brokers {
for _, b := range b.brokers {
peers = append(peers, b.RaftAddr)
}
err = s.peerStore.SetPeers(peers)
err = b.raftPeers.SetPeers(peers)
if err != nil {
return errors.Wrap(err, "set peers failed")
}
}

snapshots, err := raft.NewFileSnapshotStore(s.dataDir, 2, os.Stderr)
snapshots, err := raft.NewFileSnapshotStore(path, 2, os.Stderr)
if err != nil {
return err
}

boltStore, err := raftboltdb.NewBoltStore(filepath.Join(s.dataDir, "store.db"))
boltStore, err := raftboltdb.NewBoltStore(filepath.Join(path, "raft.db"))
if err != nil {
return errors.Wrap(err, "bolt store failed")
}
s.store = boltStore
b.raftStore = boltStore

leaderCh := make(chan bool, 1)
b.raftLeaderCh = leaderCh
conf.NotifyCh = leaderCh

raft, err := raft.NewRaft(conf, s, boltStore, boltStore, snapshots, s.peerStore, s.transport)
raft, err := raft.NewRaft(conf, b, boltStore, boltStore, snapshots, b.raftPeers, b.raftTransport)
if err != nil {
if b.raftStore != nil {
b.raftStore.Close()
}
b.raftTransport.Close()
return errors.Wrap(err, "raft failed")
}
s.raft = raft
b.raft = raft

return nil
}

func (b *Broker) setupSerf(conf *serf.Config, eventCh chan serf.Event, serfSnapshot string) (*serf.Serf, error) {
conf.Init()
conf.NodeName = fmt.Sprintf("%d", b.id)
conf.EventCh = eventCh
conf.EnableNameConflictResolution = false
return serf.Create(conf)
}

func (b *Broker) ID() int32 {
return b.id
}

func (b *Broker) Host() string {
return b.host
}

func (b *Broker) Port() string {
return b.port
}

func (b *Broker) Cluster() []*jocko.BrokerConn {
return b.brokers
}

func (s *Broker) Close() error {
return s.raft.Shutdown().Error()
}

func (s *Broker) IsController() (bool, error) {
return s.raft.State() == raft.Leader, nil
// IsController checks if this broker is the cluster controller
func (s *Broker) IsController() bool {
return s.raft.State() == raft.Leader
}

func (s *Broker) ControllerID() string {
Expand Down Expand Up @@ -288,9 +342,10 @@ func (s *Broker) Topics() []string {
return topics
}

func (s *Broker) Join(id int32, addr string) error {
f := s.raft.AddPeer(addr)
return f.Error()
// Join is used to have the broker join the gossip ring
// The target address should be another broker listening on the Serf address
func (s *Broker) Join(id int32, addrs ...string) (int, error) {
return s.serf.Join(addrs, true)
}

func (s *Broker) Apply(l *raft.Log) interface{} {
Expand Down
13 changes: 6 additions & 7 deletions broker/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestStoreOpen(t *testing.T) {
}

logger := simplelog.New(os.Stdout, simplelog.INFO, "jocko/broker_test")
s0 := New(
s0, err := New(
0,
OptionDataDir(filepath.Join(dataDir, "0")),
OptionLogDir(filepath.Join(dataDir, "0")),
Expand All @@ -49,13 +49,12 @@ func TestStoreOpen(t *testing.T) {
OptionBrokers([]*jocko.BrokerConn{b1, b2}),
OptionLogger(logger),
)
assert.NoError(t, err)
assert.NotNil(t, s0)

err := s0.Open()
assert.NoError(t, err)
defer s0.Close()

s1 := New(
s1, err := New(
1,
OptionDataDir(filepath.Join(dataDir, "1")),
OptionLogDir(filepath.Join(dataDir, "1")),
Expand All @@ -64,11 +63,11 @@ func TestStoreOpen(t *testing.T) {
OptionBrokers([]*jocko.BrokerConn{b0, b2}),
OptionLogger(logger),
)
err = s1.Open()
assert.NoError(t, err)

defer s1.Close()

s2 := New(
s2, err := New(
2,
OptionDataDir(filepath.Join(dataDir, "2")),
OptionLogDir(filepath.Join(dataDir, "2")),
Expand All @@ -77,8 +76,8 @@ func TestStoreOpen(t *testing.T) {
OptionBrokers([]*jocko.BrokerConn{b0, b1}),
OptionLogger(logger),
)
err = s2.Open()
assert.NoError(t, err)

defer s2.Close()

l, err := s0.WaitForLeader(10 * time.Second)
Expand Down
Loading

0 comments on commit 48fb383

Please sign in to comment.