diff --git a/cluster/controller.go b/cluster/controller.go new file mode 100644 index 00000000..f8f0a777 --- /dev/null +++ b/cluster/controller.go @@ -0,0 +1,41 @@ +package cluster + +import ( + "errors" + + "github.com/travisjeffery/jocko/store" +) + +type Controller struct { + store *store.Store +} + +func (c *Controller) CreateTopic(topic string) error { + for _, t := range c.store.Topics() { + if t == topic { + return errors.New("topic exists already") + } + } + + numPartitions, err := c.store.NumPartitions() + if err != nil { + return err + } + brokers, err := c.store.Brokers() + if err != nil { + return err + } + + for i := 0; i < numPartitions; i++ { + broker := brokers[len(brokers)%i] + partition := store.TopicPartition{ + Topic: topic, + Leader: broker, + PreferredLeader: broker, + Replicas: []string{broker}, + } + if err := c.store.AddPartition(partition); err != nil { + return err + } + } +} diff --git a/server/server.go b/server/server.go index 1133a190..1585ff99 100644 --- a/server/server.go +++ b/server/server.go @@ -26,16 +26,16 @@ type MetadataRequest struct { } type Broker struct { - ID int `json:"id"` + ID string `json:"id"` Host string `json:"host"` Port int `json:"port"` } type PartitionMetadata struct { - ErrorCode int `json:"error_code"` - ID int `json:"id"` - Leader int `json:"leader"` - Replicas []int `json:"replicas"` + ErrorCode int `json:"error_code"` + ID string `json:"id"` + Leader string `json:"leader"` + Replicas []string `json:"replicas"` } type TopicMetadata struct { @@ -46,7 +46,7 @@ type TopicMetadata struct { type MetadataResponse struct { Brokers []Broker `json:"brokers"` - ControllerID int `json:"controller_id"` + ControllerID string `json:"controller_id"` TopicMetadata []TopicMetadata `json:"topic_metadata"` } @@ -135,6 +135,7 @@ func (s *Server) handleMetadata(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusBadRequest) return } + } // Addr returns the address on which the Server is listening diff --git a/store/store.go b/store/store.go index ad149a48..de97cd8d 100644 --- a/store/store.go +++ b/store/store.go @@ -1,6 +1,7 @@ package store import ( + "encoding/json" "fmt" "io" "net" @@ -9,38 +10,74 @@ import ( "sync" "time" - msgpack "gopkg.in/vmihailenco/msgpack.v2" - "github.com/hashicorp/raft" raftboltdb "github.com/hashicorp/raft-boltdb" "github.com/pkg/errors" ) const ( - timeout = 10 * time.Second + timeout = 10 * time.Second + waitDelay = 100 * time.Millisecond +) + +const ( + addPartition CmdType = iota ) +type CmdType int + type command struct { - Op []byte `msgpack:"op"` - Key []byte `msgpack:"key"` - Value []byte `msgpack:"value"` + Cmd CmdType `json:"type"` + Data *json.RawMessage `json:"data"` +} + +func newCommand(cmd CmdType, data interface{}) (c command, err error) { + var b []byte + b, err = json.Marshal(data) + if err != nil { + return c, err + } + r := json.RawMessage(b) + return command{ + Cmd: cmd, + Data: &r, + }, nil +} + +type TopicPartition struct { + Topic string `json:"topic"` + Partition int `json:"partition"` + + // broker ids + Replicas []string `json:"replicas"` + Leader string `json:"leader"` + PreferredLeader string `json:"preferred_leader"` +} + +type Options struct { + dataDir string + bindAddr string + numPartitions int + transport raft.Transport } type Store struct { - dataDir string - bindAddr string + Options mu sync.Mutex + partitions []*TopicPartition + topics []string + peerStore raft.PeerStore + transport raft.Transport raft *raft.Raft store *raftboltdb.BoltStore } -func New(dataDir, bindAddr string) *Store { +func New(options Options) *Store { return &Store{ - dataDir: dataDir, - bindAddr: bindAddr, + Options: options, } } @@ -54,12 +91,14 @@ func (s *Store) Open() error { return errors.Wrap(err, "resolve bind addr failed") } - transport, err := raft.NewTCPTransport(s.bindAddr, addr, 3, timeout, os.Stderr) - if err != nil { - return errors.Wrap(err, "tcp transport failede") + if s.transport == nil { + s.transport, err = raft.NewTCPTransport(s.bindAddr, addr, 3, timeout, os.Stderr) + if err != nil { + return errors.Wrap(err, "tcp transport failede") + } } - s.peerStore = raft.NewJSONPeers(s.dataDir, transport) + s.peerStore = raft.NewJSONPeers(s.dataDir, s.transport) snapshots, err := raft.NewFileSnapshotStore(s.dataDir, 2, os.Stderr) if err != nil { @@ -72,49 +111,110 @@ func (s *Store) Open() error { } s.store = boltStore - raft, err := raft.NewRaft(conf, s, boltStore, boltStore, snapshots, s.peerStore, transport) + raft, err := raft.NewRaft(conf, s, boltStore, boltStore, snapshots, s.peerStore, s.transport) if err != nil { return errors.Wrap(err, "raft failed") } - s.raft = raft return nil } -func (s *Store) Get(key []byte) ([]byte, error) { - // add cache - return s.store.Get(key) +func (s *Store) Close() error { + return s.raft.Shutdown().Error() +} + +func (s *Store) IsController() bool { + return s.raft.State() == raft.Leader +} + +func (s *Store) ControllerID() string { + return s.raft.Leader() +} + +func (s *Store) BrokerID() string { + return s.transport.LocalAddr() +} + +func (s *Store) Brokers() ([]string, error) { + return s.peerStore.Peers() +} + +func (s *Store) Partitions() ([]*TopicPartition, error) { + return s.partitions, nil +} + +func (s *Store) NumPartitions() (int, error) { + // TODO: need to get to get from store + return s.numPartitions, nil } -func (s *Store) Set(key, value []byte) error { - c := &command{ - Op: []byte("set"), - Key: key, - Value: value, +func (s *Store) AddPartition(partition TopicPartition) error { + return s.apply(addPartition, partition) +} + +func (s *Store) apply(cmdType CmdType, data interface{}) error { + c, err := newCommand(cmdType, data) + if err != nil { + return err } - b, err := msgpack.Marshal(c) + b, err := json.Marshal(c) if err != nil { - return errors.Wrap(err, "msgpack failed") + return err } f := s.raft.Apply(b, timeout) return f.Error() } -func (s *Store) Delete(key []byte) error { - return nil +func (s *Store) addPartition(partition TopicPartition) { + s.mu.Lock() + defer s.mu.Unlock() + s.partitions = append(s.partitions, &partition) } -func (s *Store) Join(addr []byte) error { - f := s.raft.AddPeer(string(addr)) +func (s *Store) IsLeaderOfPartition(partition TopicPartition) bool { + // TODO: switch this to a map for perf + s.mu.Lock() + defer s.mu.Unlock() + for _, p := range s.partitions { + if p.Topic == partition.Topic && p.Partition == partition.Partition { + if partition.Leader == s.BrokerID() { + return true + } + break + } + } + return false +} + +func (s *Store) Topics() []string { + return s.topics +} + +func (s *Store) Join(addr string) error { + f := s.raft.AddPeer(addr) return f.Error() } func (s *Store) Apply(l *raft.Log) interface{} { - return nil -} + var c command + if err := json.Unmarshal(l.Data, &c); err != nil { + panic(errors.Wrap(err, "json unmarshal failed")) + } + + switch c.Cmd { + case addPartition: + var p TopicPartition + b, err := c.Data.MarshalJSON() + if err != nil { + panic(errors.Wrap(err, "json unmarshal failed")) + } + if err := json.Unmarshal(b, &p); err != nil { + panic(errors.Wrap(err, "json unmarshal failed")) + } + s.addPartition(p) + } -func (s *Store) applySet(k, v []byte) interface{} { return nil } @@ -134,3 +234,41 @@ func (f *FSMSnapshot) Release() {} func (s *Store) Snapshot() (raft.FSMSnapshot, error) { return &FSMSnapshot{}, nil } + +func (s *Store) WaitForLeader(timeout time.Duration) (string, error) { + tick := time.NewTicker(waitDelay) + defer tick.Stop() + + timer := time.NewTimer(timeout) + defer timer.Stop() + + for { + select { + case <-tick.C: + l := s.raft.Leader() + if l != "" { + return l, nil + } + case <-timer.C: + return "", fmt.Errorf("timeout expired") + } + } +} + +func (s *Store) WaitForAppliedIndex(idx uint64, timeout time.Duration) error { + tick := time.NewTicker(waitDelay) + defer tick.Stop() + timer := time.NewTimer(timeout) + defer timer.Stop() + + for { + select { + case <-tick.C: + if s.raft.AppliedIndex() >= idx { + return nil + } + case <-timer.C: + return fmt.Errorf("timeout expired") + } + } +} diff --git a/store/store_test.go b/store/store_test.go index a40386ce..98fbbae9 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -4,6 +4,7 @@ import ( "io/ioutil" "os" "testing" + "time" "github.com/stretchr/testify/assert" ) @@ -11,9 +12,58 @@ import ( func TestStoreOpen(t *testing.T) { dataDir, _ := ioutil.TempDir("", "storetest") defer os.RemoveAll(dataDir) - bindAddr := "127.0.0.1:0" + bindAddr := "127.0.0.1:4000" - s := New(dataDir, bindAddr) - assert.NotNil(t, s) - assert.NoError(t, s.Open()) + s0 := New(Options{ + dataDir: dataDir, + bindAddr: bindAddr, + numPartitions: 2, + }) + assert.NotNil(t, s0) + + err := s0.Open() + assert.NoError(t, err) + defer s0.Close() + + _, err = s0.WaitForLeader(10 * time.Second) + assert.NoError(t, err) + + dataDir, _ = ioutil.TempDir("", "storetest") + defer os.RemoveAll(dataDir) + bindAddr = "127.0.0.1:4001" + s1 := New(Options{ + dataDir: dataDir, + bindAddr: bindAddr, + numPartitions: 2, + }) + err = s1.Open() + assert.NoError(t, err) + defer s1.Close() + + err = s0.Join(s1.BrokerID()) + assert.NoError(t, err) + + tp := TopicPartition{ + Topic: "test", + Partition: 0, + Leader: s0.BrokerID(), + PreferredLeader: s0.BrokerID(), + Replicas: []string{s0.BrokerID()}, + } + + err = s0.AddPartition(tp) + assert.NoError(t, err) + + isLeader := s0.IsLeaderOfPartition(tp) + assert.True(t, isLeader) + + err = s1.WaitForAppliedIndex(2, 10*time.Second) + assert.NoError(t, err) + + ps, err := s1.Partitions() + assert.NoError(t, err) + for _, p := range ps { + assert.Equal(t, tp.Topic, p.Topic) + assert.Equal(t, tp.Leader, p.Leader) + } }