Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

broker tests #67 #68

Merged
merged 10 commits into from
Oct 25, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,6 @@ store.db
vendor/*
!vendor/vendor.json
.idea
broker/**/*.log
broker/**/*.index

262 changes: 216 additions & 46 deletions broker/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,34 +6,94 @@ import (
"reflect"
"testing"

"github.com/pkg/errors"

"github.com/travisjeffery/jocko"
"github.com/travisjeffery/jocko/protocol"
"github.com/travisjeffery/jocko/testutil/mock"
"github.com/travisjeffery/simplelog"
)

func TestNew(t *testing.T) {
type args struct {
id int32
opts []BrokerFn
}
tests := []struct {
name string
args args
fields fields
want *Broker
wantErr bool
name string
fields fields
alterFields func(f *fields)
want *Broker
wantErr bool
}{
{
name: "bootstrap raft and serf",
wantErr: false,
fields: newFields(),
name: "new broker ok",
fields: newFields(),
},
{
name: "new broker port error",
wantErr: true,
alterFields: func(f *fields) {
f.brokerAddr = ""
},
},
{
name: "new broker raft port error",
wantErr: true,
alterFields: func(f *fields) {
f.raft = &mock.Raft{
AddrFn: func() string {
return ""
},
}
},
},
{
// TODO: Possible bug. If a logger is not set with logger option,
// line will fail if serf.Bootstrap returns error
name: "new broker serf bootstrap error",
wantErr: true,
alterFields: func(f *fields) {
f.serf = &mock.Serf{
BootstrapFn: func(n *jocko.ClusterMember, rCh chan<- *jocko.ClusterMember) error {
return errors.New("mock serf bootstrap error")
},
}
},
},
{
name: "new broker raft bootstrap error",
wantErr: true,
alterFields: func(f *fields) {
f.raft = &mock.Raft{
AddrFn: f.raft.AddrFn,
BootstrapFn: func(s jocko.Serf, sCh <-chan *jocko.ClusterMember, cCh chan<- jocko.RaftCommand) error {
return errors.New("mock raft bootstrap error")
},
}
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_, err := New(tt.fields.id, Addr(tt.fields.addr), Serf(tt.fields.serf), Raft(tt.fields.raft), LogDir(tt.fields.logDir))
if (err != nil) != tt.wantErr {
tt.fields = newFields()
if tt.alterFields != nil {
tt.alterFields(&tt.fields)
}
tt.want = &Broker{
logger: tt.fields.logger,
id: tt.fields.id,
topicMap: tt.fields.topicMap,
replicators: tt.fields.replicators,
brokerAddr: tt.fields.brokerAddr,
logDir: tt.fields.logDir,
raft: tt.fields.raft,
serf: tt.fields.serf,
shutdownCh: tt.fields.shutdownCh,
shutdown: tt.fields.shutdown,
}

got, err := New(tt.fields.id, Addr(tt.fields.brokerAddr), Serf(tt.fields.serf), Raft(tt.fields.raft), Logger(tt.fields.logger), LogDir(tt.fields.logDir))

if err != nil && tt.wantErr {
return
} else if (err != nil) != tt.wantErr {
t.Errorf("New() error = %v, wantErr %v", err, tt.wantErr)
return
}
Expand All @@ -43,6 +103,14 @@ func TestNew(t *testing.T) {
if !tt.fields.raft.BootstrapInvoked {
t.Error("expected raft bootstrap invoked; did not")
}
if got != nil && got.shutdownCh == nil {
t.Errorf("got.shutdownCh is nil")
} else if got != nil {
tt.want.shutdownCh = got.shutdownCh
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("New() = %v, want %v", got, tt.want)
}
})
}
}
Expand Down Expand Up @@ -107,21 +175,37 @@ func TestBroker_Join(t *testing.T) {
type args struct {
addrs []string
}
err := errors.New("mock serf join error")
tests := []struct {
name string
fields fields
args args
want protocol.Error
name string
fields fields
alterFields func(f *fields)
args args
want protocol.Error
}{
{
name: "joins with serf",
fields: newFields(),
args: args{addrs: []string{"localhost:9082"}},
want: protocol.ErrNone,
},
{
name: "join with serf error",
fields: newFields(),
alterFields: func(f *fields) {
f.serf.JoinFn = func(addrs ...string) (int, error) {
return -1, err
}
},
args: args{addrs: []string{"localhost:9082"}},
want: protocol.ErrUnknown.WithErr(err),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if tt.alterFields != nil {
tt.alterFields(&tt.fields)
}
b := &Broker{
logger: tt.fields.logger,
id: tt.fields.id,
Expand Down Expand Up @@ -366,7 +450,8 @@ func TestBroker_topics(t *testing.T) {
func TestBroker_partition(t *testing.T) {
f := newFields()
f.topicMap = map[string][]*jocko.Partition{
"the-topic": []*jocko.Partition{{ID: 1}},
"the-topic": []*jocko.Partition{{ID: 1}},
"empty-topic": []*jocko.Partition{},
}
type args struct {
topic string
Expand Down Expand Up @@ -399,6 +484,16 @@ func TestBroker_partition(t *testing.T) {
want: nil,
wanterr: protocol.ErrUnknownTopicOrPartition,
},
{
name: "empty partitions",
fields: f,
args: args{
topic: "empty-topic",
partition: 1,
},
want: nil,
wanterr: protocol.ErrUnknownTopicOrPartition,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down Expand Up @@ -553,30 +648,82 @@ func TestBroker_clusterMember(t *testing.T) {
}

func TestBroker_startReplica(t *testing.T) {
f := newFields()
type args struct {
partition *jocko.Partition
}
partition := &jocko.Partition{
Topic: "the-topic",
ID: 1,
Topic: "the-topic",
ID: 1,
Leader: 1,
}
tests := []struct {
name string
fields fields
args args
want protocol.Error
name string
fields fields
alterFields func(f *fields)
args args
want protocol.Error
}{
{
name: "started replica",
fields: f,
name: "started replica as leader",
args: args{
partition: partition,
},
want: protocol.ErrNone,
},
{
name: "started replica as follower",
args: args{
partition: &jocko.Partition{
ID: 1,
Topic: "replica-topic",
Replicas: []int32{1},
Leader: 2,
},
},
want: protocol.ErrNone,
},
{
name: "started replica with existing topic",
alterFields: func(f *fields) {
f.topicMap["existing-topic"] = []*jocko.Partition{
{
ID: 1,
Topic: "existing-topic",
},
}
},
args: args{
partition: &jocko.Partition{ID: 2, Topic: "existing-topic"},
},
want: protocol.ErrNone,
},
// TODO: Possible bug. If a duplicate partition is added,
// the partition will be appended to the partitions as a duplicate.
// {
// name: "started replica with dupe partition",
// fields: f,
// args: args{
// partition: &jocko.Partition{ID: 1, Topic: "existing-topic"},
// },
// want: protocol.ErrNone,
// },
{
name: "started replica with commitlog error",
alterFields: func(f *fields) {
f.logDir = ""
},
args: args{
partition: &jocko.Partition{Leader: 1},
},
want: protocol.ErrUnknown.WithErr(errors.New("mkdir failed: mkdir /0: permission denied")),
},
}
for _, tt := range tests {
fields := newFields()
if tt.alterFields != nil {
tt.alterFields(&fields)
}
tt.fields = fields
t.Run(tt.name, func(t *testing.T) {
b := &Broker{
logger: tt.fields.logger,
Expand All @@ -590,13 +737,20 @@ func TestBroker_startReplica(t *testing.T) {
shutdownCh: tt.fields.shutdownCh,
shutdown: tt.fields.shutdown,
}
if got := b.startReplica(tt.args.partition); !reflect.DeepEqual(got, tt.want) {
if got := b.startReplica(tt.args.partition); got.Error() != tt.want.Error() {
t.Errorf("Broker.startReplica() = %v, want %v", got, tt.want)
}
got, err := b.partition(partition.Topic, partition.ID)
if !reflect.DeepEqual(got, partition) {
got, err := b.partition(tt.args.partition.Topic, tt.args.partition.ID)
if !reflect.DeepEqual(got, tt.args.partition) {
t.Errorf("Broker.partition() = %v, want %v", got, partition)
}
parts := map[int32]*jocko.Partition{}
for _, p := range b.topicMap[tt.args.partition.Topic] {
if _, ok := parts[p.ID]; ok {
t.Errorf("Broker.topicPartition contains dupes, dupe %v", p)
}
parts[p.ID] = p
}
if err != protocol.ErrNone {
t.Errorf("Broker.partition() err = %v, want %v", err, protocol.ErrNone)
}
Expand Down Expand Up @@ -901,7 +1055,6 @@ type fields struct {
id int32
serf *mock.Serf
raft *mock.Raft
addr string
logger *simplelog.Logger
topicMap map[string][]*jocko.Partition
replicators map[*jocko.Partition]*Replicator
Expand All @@ -913,31 +1066,48 @@ type fields struct {

func newFields() fields {
serf := &mock.Serf{
BootstrapFn: func(n *jocko.ClusterMember, rCh chan<- *jocko.ClusterMember) error {
if n == nil {
return errors.New("*jocko.ClusterMember is nil")
}
if rCh == nil {
return errors.New("chan<- *jocko.ClusterMember is nil")
}
return nil
},
JoinFn: func(addrs ...string) (int, error) {
return 1, nil
},
BootstrapFn: func(node *jocko.ClusterMember, reconcileCh chan<- *jocko.ClusterMember) error {
return nil
},
MemberFn: func(id int32) *jocko.ClusterMember {
return nil
MemberFn: func(memberID int32) *jocko.ClusterMember {
return &jocko.ClusterMember{ID: 2}
},
}
raft := &mock.Raft{
BootstrapFn: func(serf jocko.Serf, serfEventCh <-chan *jocko.ClusterMember, commandCh chan<- jocko.RaftCommand) error {
return nil
},
AddrFn: func() string {
return "localhost:9093"
},
BootstrapFn: func(s jocko.Serf, sCh <-chan *jocko.ClusterMember, cCh chan<- jocko.RaftCommand) error {
if s == nil {
return errors.New("jocko.Serf is nil")
}
if sCh == nil {
return errors.New("<-chan *jocko.ClusterMember is nil")
}
if cCh == nil {
return errors.New("chan<- jocko.RaftCommand is nil")
}
return nil
},
}
return fields{
topicMap: make(map[string][]*jocko.Partition),
serf: serf,
raft: raft,
addr: "localhost:9092",
logDir: "/tmp/jocko",
id: 1,
topicMap: make(map[string][]*jocko.Partition),
replicators: make(map[*jocko.Partition]*Replicator),
logger: simplelog.New(nopReaderWriter{}, simplelog.DEBUG, "TestNew"),
logDir: "/tmp/jocko",
serf: serf,
raft: raft,
brokerAddr: "localhost:9092",
id: 1,
}
}

Expand Down