Skip to content

Commit

Permalink
use raft for partition consensus
Browse files Browse the repository at this point in the history
  • Loading branch information
travisjeffery committed Oct 21, 2016
1 parent af87a57 commit 89196da
Show file tree
Hide file tree
Showing 4 changed files with 274 additions and 44 deletions.
41 changes: 41 additions & 0 deletions cluster/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package cluster

import (
"errors"

"github.com/travisjeffery/jocko/store"
)

type Controller struct {
store *store.Store
}

func (c *Controller) CreateTopic(topic string) error {
for _, t := range c.store.Topics() {
if t == topic {
return errors.New("topic exists already")
}
}

numPartitions, err := c.store.NumPartitions()
if err != nil {
return err
}
brokers, err := c.store.Brokers()
if err != nil {
return err
}

for i := 0; i < numPartitions; i++ {
broker := brokers[len(brokers)%i]
partition := store.TopicPartition{
Topic: topic,
Leader: broker,
PreferredLeader: broker,
Replicas: []string{broker},
}
if err := c.store.AddPartition(partition); err != nil {
return err
}
}
}
13 changes: 7 additions & 6 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,16 @@ type MetadataRequest struct {
}

type Broker struct {
ID int `json:"id"`
ID string `json:"id"`
Host string `json:"host"`
Port int `json:"port"`
}

type PartitionMetadata struct {
ErrorCode int `json:"error_code"`
ID int `json:"id"`
Leader int `json:"leader"`
Replicas []int `json:"replicas"`
ErrorCode int `json:"error_code"`
ID string `json:"id"`
Leader string `json:"leader"`
Replicas []string `json:"replicas"`
}

type TopicMetadata struct {
Expand All @@ -46,7 +46,7 @@ type TopicMetadata struct {

type MetadataResponse struct {
Brokers []Broker `json:"brokers"`
ControllerID int `json:"controller_id"`
ControllerID string `json:"controller_id"`
TopicMetadata []TopicMetadata `json:"topic_metadata"`
}

Expand Down Expand Up @@ -135,6 +135,7 @@ func (s *Server) handleMetadata(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusBadRequest)
return
}

}

// Addr returns the address on which the Server is listening
Expand Down
206 changes: 172 additions & 34 deletions store/store.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package store

import (
"encoding/json"
"fmt"
"io"
"net"
Expand All @@ -9,38 +10,74 @@ import (
"sync"
"time"

msgpack "gopkg.in/vmihailenco/msgpack.v2"

"github.com/hashicorp/raft"
raftboltdb "github.com/hashicorp/raft-boltdb"
"github.com/pkg/errors"
)

const (
timeout = 10 * time.Second
timeout = 10 * time.Second
waitDelay = 100 * time.Millisecond
)

const (
addPartition CmdType = iota
)

type CmdType int

type command struct {
Op []byte `msgpack:"op"`
Key []byte `msgpack:"key"`
Value []byte `msgpack:"value"`
Cmd CmdType `json:"type"`
Data *json.RawMessage `json:"data"`
}

func newCommand(cmd CmdType, data interface{}) (c command, err error) {
var b []byte
b, err = json.Marshal(data)
if err != nil {
return c, err
}
r := json.RawMessage(b)
return command{
Cmd: cmd,
Data: &r,
}, nil
}

type TopicPartition struct {
Topic string `json:"topic"`
Partition int `json:"partition"`

// broker ids
Replicas []string `json:"replicas"`
Leader string `json:"leader"`
PreferredLeader string `json:"preferred_leader"`
}

type Options struct {
dataDir string
bindAddr string
numPartitions int
transport raft.Transport
}

type Store struct {
dataDir string
bindAddr string
Options

mu sync.Mutex

partitions []*TopicPartition
topics []string

peerStore raft.PeerStore
transport raft.Transport
raft *raft.Raft
store *raftboltdb.BoltStore
}

func New(dataDir, bindAddr string) *Store {
func New(options Options) *Store {
return &Store{
dataDir: dataDir,
bindAddr: bindAddr,
Options: options,
}
}

Expand All @@ -54,12 +91,14 @@ func (s *Store) Open() error {
return errors.Wrap(err, "resolve bind addr failed")
}

transport, err := raft.NewTCPTransport(s.bindAddr, addr, 3, timeout, os.Stderr)
if err != nil {
return errors.Wrap(err, "tcp transport failede")
if s.transport == nil {
s.transport, err = raft.NewTCPTransport(s.bindAddr, addr, 3, timeout, os.Stderr)
if err != nil {
return errors.Wrap(err, "tcp transport failede")
}
}

s.peerStore = raft.NewJSONPeers(s.dataDir, transport)
s.peerStore = raft.NewJSONPeers(s.dataDir, s.transport)

snapshots, err := raft.NewFileSnapshotStore(s.dataDir, 2, os.Stderr)
if err != nil {
Expand All @@ -72,49 +111,110 @@ func (s *Store) Open() error {
}
s.store = boltStore

raft, err := raft.NewRaft(conf, s, boltStore, boltStore, snapshots, s.peerStore, transport)
raft, err := raft.NewRaft(conf, s, boltStore, boltStore, snapshots, s.peerStore, s.transport)
if err != nil {
return errors.Wrap(err, "raft failed")
}

s.raft = raft

return nil
}

func (s *Store) Get(key []byte) ([]byte, error) {
// add cache
return s.store.Get(key)
func (s *Store) Close() error {
return s.raft.Shutdown().Error()
}

func (s *Store) IsController() bool {
return s.raft.State() == raft.Leader
}

func (s *Store) ControllerID() string {
return s.raft.Leader()
}

func (s *Store) BrokerID() string {
return s.transport.LocalAddr()
}

func (s *Store) Brokers() ([]string, error) {
return s.peerStore.Peers()
}

func (s *Store) Partitions() ([]*TopicPartition, error) {
return s.partitions, nil
}

func (s *Store) NumPartitions() (int, error) {
// TODO: need to get to get from store
return s.numPartitions, nil
}

func (s *Store) Set(key, value []byte) error {
c := &command{
Op: []byte("set"),
Key: key,
Value: value,
func (s *Store) AddPartition(partition TopicPartition) error {
return s.apply(addPartition, partition)
}

func (s *Store) apply(cmdType CmdType, data interface{}) error {
c, err := newCommand(cmdType, data)
if err != nil {
return err
}
b, err := msgpack.Marshal(c)
b, err := json.Marshal(c)
if err != nil {
return errors.Wrap(err, "msgpack failed")
return err
}
f := s.raft.Apply(b, timeout)
return f.Error()
}

func (s *Store) Delete(key []byte) error {
return nil
func (s *Store) addPartition(partition TopicPartition) {
s.mu.Lock()
defer s.mu.Unlock()
s.partitions = append(s.partitions, &partition)
}

func (s *Store) Join(addr []byte) error {
f := s.raft.AddPeer(string(addr))
func (s *Store) IsLeaderOfPartition(partition TopicPartition) bool {
// TODO: switch this to a map for perf
s.mu.Lock()
defer s.mu.Unlock()
for _, p := range s.partitions {
if p.Topic == partition.Topic && p.Partition == partition.Partition {
if partition.Leader == s.BrokerID() {
return true
}
break
}
}
return false
}

func (s *Store) Topics() []string {
return s.topics
}

func (s *Store) Join(addr string) error {
f := s.raft.AddPeer(addr)
return f.Error()
}

func (s *Store) Apply(l *raft.Log) interface{} {
return nil
}
var c command
if err := json.Unmarshal(l.Data, &c); err != nil {
panic(errors.Wrap(err, "json unmarshal failed"))
}

switch c.Cmd {
case addPartition:
var p TopicPartition
b, err := c.Data.MarshalJSON()
if err != nil {
panic(errors.Wrap(err, "json unmarshal failed"))
}
if err := json.Unmarshal(b, &p); err != nil {
panic(errors.Wrap(err, "json unmarshal failed"))
}
s.addPartition(p)
}

func (s *Store) applySet(k, v []byte) interface{} {
return nil
}

Expand All @@ -134,3 +234,41 @@ func (f *FSMSnapshot) Release() {}
func (s *Store) Snapshot() (raft.FSMSnapshot, error) {
return &FSMSnapshot{}, nil
}

func (s *Store) WaitForLeader(timeout time.Duration) (string, error) {
tick := time.NewTicker(waitDelay)
defer tick.Stop()

timer := time.NewTimer(timeout)
defer timer.Stop()

for {
select {
case <-tick.C:
l := s.raft.Leader()
if l != "" {
return l, nil
}
case <-timer.C:
return "", fmt.Errorf("timeout expired")
}
}
}

func (s *Store) WaitForAppliedIndex(idx uint64, timeout time.Duration) error {
tick := time.NewTicker(waitDelay)
defer tick.Stop()
timer := time.NewTimer(timeout)
defer timer.Stop()

for {
select {
case <-tick.C:
if s.raft.AppliedIndex() >= idx {
return nil
}
case <-timer.C:
return fmt.Errorf("timeout expired")
}
}
}
Loading

0 comments on commit 89196da

Please sign in to comment.