diff --git a/.gitignore b/.gitignore index 5ce7f227..67306f50 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,5 @@ store.db vendor/* !vendor/vendor.json .idea -broker/**/*.log -broker/**/*.index - +*.log +*.index diff --git a/broker/broker.go b/broker/broker.go index d8cfaede..f58fea48 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -18,7 +18,8 @@ import ( ) var ( - ErrTopicExists = errors.New("topic exists already") + ErrTopicExists = errors.New("topic exists already") + ErrInvalidArgument = errors.New("no logger set") ) // Broker represents a broker in a Jocko cluster, like a broker in a Kafka cluster. @@ -53,6 +54,10 @@ func New(id int32, opts ...BrokerFn) (*Broker, error) { o(b) } + if b.logger == nil { + return nil, ErrInvalidArgument + } + port, err := addrPort(b.brokerAddr) if err != nil { return nil, err diff --git a/broker/broker_test.go b/broker/broker_test.go index af353e53..c21cb8ba 100644 --- a/broker/broker_test.go +++ b/broker/broker_test.go @@ -16,27 +16,32 @@ import ( func TestNew(t *testing.T) { tests := []struct { - name string - fields fields - alterFields func(f *fields) - want *Broker - wantErr bool + name string + fields fields + setFields func(f *fields) + wantErr bool }{ { - name: "new broker ok", - fields: newFields(), + name: "broker ok", + }, + { + name: "no logger error", + wantErr: true, + setFields: func(f *fields) { + f.logger = nil + }, }, { - name: "new broker port error", + name: "no broker addr error", wantErr: true, - alterFields: func(f *fields) { + setFields: func(f *fields) { f.brokerAddr = "" }, }, { - name: "new broker raft port error", + name: "no raft addr error", wantErr: true, - alterFields: func(f *fields) { + setFields: func(f *fields) { f.raft = &mock.Raft{ AddrFn: func() string { return "" @@ -45,11 +50,9 @@ func TestNew(t *testing.T) { }, }, { - // TODO: Possible bug. If a logger is not set with logger option, - // line will fail if serf.Bootstrap returns error - name: "new broker serf bootstrap error", + name: "serf bootstrap error", wantErr: true, - alterFields: func(f *fields) { + setFields: func(f *fields) { f.serf = &mock.Serf{ BootstrapFn: func(n *jocko.ClusterMember, rCh chan<- *jocko.ClusterMember) error { return errors.New("mock serf bootstrap error") @@ -58,9 +61,9 @@ func TestNew(t *testing.T) { }, }, { - name: "new broker raft bootstrap error", + name: "raft bootstrap error", wantErr: true, - alterFields: func(f *fields) { + setFields: func(f *fields) { f.raft = &mock.Raft{ AddrFn: f.raft.AddrFn, BootstrapFn: func(s jocko.Serf, sCh <-chan *jocko.ClusterMember, cCh chan<- jocko.RaftCommand) error { @@ -73,10 +76,10 @@ func TestNew(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { tt.fields = newFields() - if tt.alterFields != nil { - tt.alterFields(&tt.fields) + if tt.setFields != nil { + tt.setFields(&tt.fields) } - tt.want = &Broker{ + want := &Broker{ logger: tt.fields.logger, id: tt.fields.id, topicMap: tt.fields.topicMap, @@ -106,10 +109,10 @@ func TestNew(t *testing.T) { if got != nil && got.shutdownCh == nil { t.Errorf("got.shutdownCh is nil") } else if got != nil { - tt.want.shutdownCh = got.shutdownCh + want.shutdownCh = got.shutdownCh } - if !reflect.DeepEqual(got, tt.want) { - t.Errorf("New() = %v, want %v", got, tt.want) + if !reflect.DeepEqual(got, want) { + t.Errorf("New() = %v, want %v", got, want) } }) } @@ -177,22 +180,22 @@ func TestBroker_Join(t *testing.T) { } err := errors.New("mock serf join error") tests := []struct { - name string - fields fields - alterFields func(f *fields) - args args - want protocol.Error + name string + fields fields + setFields func(f *fields) + args args + want protocol.Error }{ { - name: "joins with serf", + name: "ok", fields: newFields(), args: args{addrs: []string{"localhost:9082"}}, want: protocol.ErrNone, }, { - name: "join with serf error", + name: "serf errr", fields: newFields(), - alterFields: func(f *fields) { + setFields: func(f *fields) { f.serf.JoinFn = func(addrs ...string) (int, error) { return -1, err } @@ -203,8 +206,8 @@ func TestBroker_Join(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if tt.alterFields != nil { - tt.alterFields(&tt.fields) + if tt.setFields != nil { + tt.setFields(&tt.fields) } b := &Broker{ logger: tt.fields.logger, @@ -657,11 +660,10 @@ func TestBroker_startReplica(t *testing.T) { Leader: 1, } tests := []struct { - name string - fields fields - alterFields func(f *fields) - args args - want protocol.Error + name string + setFields func(f *fields) + args args + want protocol.Error }{ { name: "started replica as leader", @@ -684,7 +686,7 @@ func TestBroker_startReplica(t *testing.T) { }, { name: "started replica with existing topic", - alterFields: func(f *fields) { + setFields: func(f *fields) { f.topicMap["existing-topic"] = []*jocko.Partition{ { ID: 1, @@ -709,7 +711,7 @@ func TestBroker_startReplica(t *testing.T) { // }, { name: "started replica with commitlog error", - alterFields: func(f *fields) { + setFields: func(f *fields) { f.logDir = "" }, args: args{ @@ -720,22 +722,21 @@ func TestBroker_startReplica(t *testing.T) { } for _, tt := range tests { fields := newFields() - if tt.alterFields != nil { - tt.alterFields(&fields) + if tt.setFields != nil { + tt.setFields(&fields) } - tt.fields = fields t.Run(tt.name, func(t *testing.T) { b := &Broker{ - logger: tt.fields.logger, - id: tt.fields.id, - topicMap: tt.fields.topicMap, - replicators: tt.fields.replicators, - brokerAddr: tt.fields.brokerAddr, - logDir: tt.fields.logDir, - raft: tt.fields.raft, - serf: tt.fields.serf, - shutdownCh: tt.fields.shutdownCh, - shutdown: tt.fields.shutdown, + logger: fields.logger, + id: fields.id, + topicMap: fields.topicMap, + replicators: fields.replicators, + brokerAddr: fields.brokerAddr, + logDir: fields.logDir, + raft: fields.raft, + serf: fields.serf, + shutdownCh: fields.shutdownCh, + shutdown: fields.shutdown, } if got := b.startReplica(tt.args.partition); got.Error() != tt.want.Error() { t.Errorf("Broker.startReplica() = %v, want %v", got, tt.want)