Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
travisjeffery committed Dec 20, 2016
1 parent d0a4ecf commit bd29860
Show file tree
Hide file tree
Showing 16 changed files with 488 additions and 384 deletions.
201 changes: 121 additions & 80 deletions broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@ import (
"io"
"net"
"os"
"path"
"path/filepath"
"sync"
"time"

"github.com/hashicorp/raft"
raftboltdb "github.com/hashicorp/raft-boltdb"
"github.com/pkg/errors"
"github.com/travisjeffery/jocko/cluster"
"github.com/travisjeffery/jocko/commitlog"
"github.com/travisjeffery/jocko/jocko"
"github.com/travisjeffery/simplelog"
)

Expand All @@ -21,6 +23,8 @@ const (
waitDelay = 100 * time.Millisecond
)

type CmdType int

const (
addPartition CmdType = iota
addBroker
Expand All @@ -32,8 +36,6 @@ var (
ErrTopicExists = errors.New("topic exists already")
)

type CmdType int

type command struct {
Cmd CmdType `json:"type"`
Data *json.RawMessage `json:"data"`
Expand All @@ -52,89 +54,101 @@ func newCommand(cmd CmdType, data interface{}) (c command, err error) {
}, nil
}

type Options struct {
DataDir string
RaftAddr string
TCPAddr string
LogDir string
ID int
Logger *simplelog.Logger
type Broker struct {
mu sync.RWMutex
logger *simplelog.Logger

id int32
host string
port string
topics map[string][]*jocko.Partition
brokers []*jocko.BrokerConn

DefaultNumPartitions int
Brokers []*cluster.Broker
dataDir string
raftAddr string
tcpAddr string
logDir string

peerStore raft.PeerStore
transport raft.Transport
raft *raft.Raft
store *raftboltdb.BoltStore
}

type broker struct {
ID int `json:"id"`
Host string `json:"host"`
Port int `json:"port"`
func New(id int32,
dataDir string,
logDir string,
raftAddr string,
tcpAddr string,
brokers []*jocko.BrokerConn, logger *simplelog.Logger) *Broker {
return &Broker{
id: id,
dataDir: dataDir,
logDir: logDir,
raftAddr: raftAddr,
tcpAddr: tcpAddr,
topics: make(map[string][]*jocko.Partition),
brokers: brokers,
logger: logger,
}
}

type Broker struct {
Options

mu sync.RWMutex

// state for fsm
topics map[string][]*cluster.TopicPartition
peers []*cluster.Broker

peerStore raft.PeerStore
transport raft.Transport
func (b *Broker) ID() int32 {
return b.id
}

raft *raft.Raft
store *raftboltdb.BoltStore
func (b *Broker) Host() string {
return b.host
}

func New(options Options) *Broker {
if options.DefaultNumPartitions == 0 {
options.DefaultNumPartitions = 4
}
func (b *Broker) Port() string {
return b.port
}

return &Broker{
topics: make(map[string][]*cluster.TopicPartition),
Options: options,
}
func (b *Broker) Cluster() []*jocko.BrokerConn {
return b.brokers
}

func (s *Broker) Open() error {
host, port, err := net.SplitHostPort(s.TCPAddr)
host, port, err := net.SplitHostPort(s.tcpAddr)
if err != nil {
return err
}
s.Brokers = append(s.Brokers, &cluster.Broker{

s.host = host
s.port = port

s.brokers = append(s.brokers, &jocko.BrokerConn{
Host: host,
Port: port,
RaftAddr: s.RaftAddr,
ID: s.ID,
RaftAddr: s.raftAddr,
ID: s.id,
})

conf := raft.DefaultConfig()

addr, err := net.ResolveTCPAddr("tcp", s.RaftAddr)
addr, err := net.ResolveTCPAddr("tcp", s.raftAddr)
if err != nil {
return errors.Wrap(err, "resolve bind addr failed")
}

if s.transport == nil {
s.transport, err = raft.NewTCPTransport(s.RaftAddr, addr, 3, timeout, os.Stderr)
s.transport, err = raft.NewTCPTransport(s.raftAddr, addr, 3, timeout, os.Stderr)
if err != nil {
return errors.Wrap(err, "tcp transport failed")
}
}

if err = os.MkdirAll(s.DataDir, 0755); err != nil {
if err = os.MkdirAll(s.dataDir, 0755); err != nil {
return errors.Wrap(err, "data directory mkdir failed")
}
s.peerStore = raft.NewJSONPeers(s.DataDir, s.transport)
s.peerStore = raft.NewJSONPeers(s.dataDir, s.transport)

if len(s.Brokers) == 1 {
if len(s.brokers) == 1 {
conf.EnableSingleNode = true
} else {
var peers []string
for _, b := range s.Brokers {
for _, b := range s.brokers {
peers = append(peers, b.RaftAddr)
}
err = s.peerStore.SetPeers(peers)
Expand All @@ -143,12 +157,12 @@ func (s *Broker) Open() error {
}
}

snapshots, err := raft.NewFileSnapshotStore(s.DataDir, 2, os.Stderr)
snapshots, err := raft.NewFileSnapshotStore(s.dataDir, 2, os.Stderr)
if err != nil {
return err
}

boltStore, err := raftboltdb.NewBoltStore(filepath.Join(s.DataDir, "store.db"))
boltStore, err := raftboltdb.NewBoltStore(filepath.Join(s.dataDir, "store.db"))
if err != nil {
return errors.Wrap(err, "bolt store failed")
}
Expand All @@ -175,30 +189,30 @@ func (s *Broker) ControllerID() string {
return s.raft.Leader()
}

func (s *Broker) PartitionsForTopic(topic string) (found []*cluster.TopicPartition, err error) {
func (s *Broker) TopicPartitions(topic string) (found []*jocko.Partition, err error) {
s.mu.RLock()
defer s.mu.RUnlock()
return s.topics[topic], nil
}

func (s *Broker) Partition(topic string, partition int32) (*cluster.TopicPartition, error) {
found, err := s.PartitionsForTopic(topic)
func (s *Broker) Partition(topic string, partition int32) (*jocko.Partition, error) {
found, err := s.TopicPartitions(topic)
if err != nil {
return nil, err
}
for _, f := range found {
if f.Partition == partition {
if f.ID == partition {
return f, nil
}
}
return nil, errors.New("partition not found")
}

func (s *Broker) AddPartition(partition cluster.TopicPartition) error {
func (s *Broker) AddPartition(partition *jocko.Partition) error {
return s.apply(addPartition, partition)
}

func (s *Broker) AddBroker(broker cluster.Broker) error {
func (s *Broker) AddBroker(broker jocko.BrokerConn) error {
return s.apply(addBroker, broker)
}

Expand All @@ -215,33 +229,62 @@ func (s *Broker) apply(cmdType CmdType, data interface{}) error {
return f.Error()
}

func (s *Broker) addPartition(partition *cluster.TopicPartition) {
func (s *Broker) BecomeLeader(pid int32) error {
return nil
}

func (s *Broker) BecomeFollower(topic string, pid int32, leader int32) error {
return nil
}

func (s *Broker) Broker(id int32) *jocko.BrokerConn {
for _, b := range s.brokers {
if b.ID == id {
return b
}
}
return nil
}

func (s *Broker) addPartition(partition *jocko.Partition) {
s.mu.RLock()
if v, ok := s.topics[partition.Topic]; ok {
s.topics[partition.Topic] = append(v, partition)
} else {
s.topics[partition.Topic] = []*cluster.TopicPartition{partition}
s.topics[partition.Topic] = []*jocko.Partition{partition}
}
s.mu.RUnlock()
if s.IsLeaderOfPartition(partition) {
if err := partition.OpenCommitLog(s.LogDir); err != nil {
if s.IsLeaderOfPartition(partition.Topic, partition.ID, partition.LeaderID()) {
commitLog, err := commitlog.New(commitlog.Options{
Path: path.Join(s.logDir, partition.String()),
MaxSegmentBytes: 1024,
MaxLogBytes: -1,
})
if err != nil {
panic(err)
}
if err = commitLog.Init(); err != nil {
panic(err)
}
if err = commitLog.Open(); err != nil {
panic(err)
}
partition.CommitLog = commitLog
}
}

func (s *Broker) addBroker(broker *cluster.Broker) {
func (s *Broker) addBroker(broker *jocko.BrokerConn) {
s.mu.Lock()
defer s.mu.Unlock()
s.Brokers = append(s.Brokers, broker)
s.brokers = append(s.brokers, broker)
}

func (s *Broker) IsLeaderOfPartition(partition *cluster.TopicPartition) bool {
func (s *Broker) IsLeaderOfPartition(topic string, pid int32, lid int32) bool {
s.mu.RLock()
defer s.mu.RUnlock()
for _, p := range s.topics[partition.Topic] {
if p.Partition == partition.Partition {
if partition.Leader.ID == s.ID {
for _, p := range s.topics[topic] {
if p.ID == pid {
if lid == s.id {
return true
}
break
Expand All @@ -258,7 +301,7 @@ func (s *Broker) Topics() []string {
return topics
}

func (s *Broker) Join(id int, addr string) error {
func (s *Broker) Join(id int32, addr string) error {
f := s.raft.AddPeer(addr)
return f.Error()
}
Expand All @@ -268,10 +311,10 @@ func (s *Broker) Apply(l *raft.Log) interface{} {
if err := json.Unmarshal(l.Data, &c); err != nil {
panic(errors.Wrap(err, "json unmarshal failed"))
}
s.Logger.Debug("broker/apply cmd [%d]", c.Cmd)
s.logger.Debug("broker/apply cmd [%d]", c.Cmd)
switch c.Cmd {
case addBroker:
broker := new(cluster.Broker)
broker := new(jocko.BrokerConn)
b, err := c.Data.MarshalJSON()
if err != nil {
panic(errors.Wrap(err, "json unmarshal failed"))
Expand All @@ -281,7 +324,7 @@ func (s *Broker) Apply(l *raft.Log) interface{} {
}
s.addBroker(broker)
case addPartition:
p := new(cluster.TopicPartition)
p := new(jocko.Partition)
b, err := c.Data.MarshalJSON()
if err != nil {
panic(errors.Wrap(err, "json unmarshal failed"))
Expand All @@ -291,7 +334,7 @@ func (s *Broker) Apply(l *raft.Log) interface{} {
}
s.addPartition(p)
case deleteTopic:
p := new(cluster.TopicPartition)
p := new(jocko.Partition)
b, err := c.Data.MarshalJSON()
if err != nil {
panic(errors.Wrap(err, "json unmarshal failed"))
Expand All @@ -313,15 +356,15 @@ func (s *Broker) CreateTopic(topic string, partitions int32) error {
return ErrTopicExists
}
}
brokers := s.Brokers
brokers := s.brokers
for i := 0; i < int(partitions); i++ {
broker := brokers[i%len(brokers)]
partition := cluster.TopicPartition{
Partition: int32(i),
partition := &jocko.Partition{
Topic: topic,
ID: int32(i),
Leader: broker,
PreferredLeader: broker,
Replicas: []*cluster.Broker{broker},
Replicas: []*jocko.BrokerConn{broker},
}
if err := s.AddPartition(partition); err != nil {
return err
Expand All @@ -342,18 +385,16 @@ func (s *Broker) DeleteTopics(topics ...string) error {
}

func (s *Broker) DeleteTopic(topic string) error {
return s.apply(deleteTopic, cluster.TopicPartition{
Topic: topic,
})
return s.apply(deleteTopic, &jocko.Partition{Topic: topic})
}

func (s *Broker) deleteTopic(tp *cluster.TopicPartition) error {
partitions, err := s.PartitionsForTopic(tp.Topic)
func (s *Broker) deleteTopic(tp *jocko.Partition) error {
partitions, err := s.TopicPartitions(tp.Topic)
if err != nil {
return err
}
for _, p := range partitions {
if err := p.CommitLog.DeleteAll(); err != nil {
if err := p.Delete(); err != nil {
return err
}
}
Expand Down
Loading

0 comments on commit bd29860

Please sign in to comment.