Skip to content

Commit

Permalink
Added APIVersions handler
Browse files Browse the repository at this point in the history
Solved bug in SetupIndex function that prevents the index to be set up

Relates to #40 and #37
  • Loading branch information
mrcrgl committed Aug 18, 2017
1 parent 54ea894 commit 34c3fde
Show file tree
Hide file tree
Showing 5 changed files with 143 additions and 22 deletions.
12 changes: 11 additions & 1 deletion commitlog/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ func NewSegment(path string, baseOffset int64, maxBytes int64) (*Segment, error)
return s, err
}

// SetupIndex creates and initializes an index.
// Initialization is:
// - Sanity check of the loaded index
// - Truncates the index (clears it)
// - Reads the log file from the beginning and re-initializes the index
func (s *Segment) SetupIndex(path string) (err error) {
indexPath := filepath.Join(path, fmt.Sprintf(indexNameFormat, s.BaseOffset))
s.Index, err = newIndex(options{
Expand All @@ -62,7 +67,7 @@ func (s *Segment) SetupIndex(path string) (err error) {
if err != nil {
return err
}
if err = s.Index.SanityCheck(); err == nil {
if err = s.Index.SanityCheck(); err != nil {
return err
}
if err := s.Index.TruncateEntries(0); err != nil {
Expand Down Expand Up @@ -94,6 +99,9 @@ func (s *Segment) SetupIndex(path string) (err error) {
return err
}

// Reset the buffer to not get an overflow
b.Truncate(0)

err = s.Index.WriteEntry(Entry{
Offset: s.NextOffset,
Position: s.Position,
Expand All @@ -118,6 +126,8 @@ func (s *Segment) IsFull() bool {
return s.Position >= s.maxBytes
}

// Write writes a byte slice to the log at the current position.
// It increments the offset as well as sets the position to the new tail.
func (s *Segment) Write(p []byte) (n int, err error) {
s.Lock()
defer s.Unlock()
Expand Down
42 changes: 21 additions & 21 deletions protocol/api_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,25 @@ package protocol

// Protocol API keys. See: https://kafka.apache.org/protocol#protocol_api_keys
const (
ProduceKey = iota
FetchKey
OffsetsKey
MetadataKey
LeaderAndISRKey
StopReplicaKey
UpdateMetadataKey
ControlledShutdownKey
OffsetCommitKey
OffsetFetchKey
GroupCoordinatorKey
JoinGroupKey
HeartbeatKey
LeaveGroupKey
SyncGroupKey
DescribeGroupsKey
ListGroupsKey
SaslHandshakeKey
APIVersionsKey
CreateTopicsKey
DeleteTopicsKey
ProduceKey = iota // 0
FetchKey // 1
OffsetsKey // 2
MetadataKey // 3
LeaderAndISRKey // 4
StopReplicaKey // 5
UpdateMetadataKey // 6
ControlledShutdownKey // 7
OffsetCommitKey // 8
OffsetFetchKey // 9
GroupCoordinatorKey // 10
JoinGroupKey // 11
HeartbeatKey // 12
LeaveGroupKey // 13
SyncGroupKey // 14
DescribeGroupsKey // 15
ListGroupsKey // 16
SaslHandshakeKey // 17
APIVersionsKey // 18
CreateTopicsKey // 19
DeleteTopicsKey // 20
)
19 changes: 19 additions & 0 deletions protocol/api_versions_requests.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package protocol

type APIVersionsRequest struct{}

func (c *APIVersionsRequest) Encode(_ PacketEncoder) error {
return nil
}

func (c *APIVersionsRequest) Decode(_ PacketDecoder) error {
return nil
}

func (c *APIVersionsRequest) Key() int16 {
return APIVersionsKey
}

func (c *APIVersionsRequest) Version() int16 {
return 0
}
57 changes: 57 additions & 0 deletions protocol/api_versions_response.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package protocol

type APIVersionsResponse struct {
APIVersions []APIVersion
ErrorCode int16
}

type APIVersion struct {
APIKey int16
MinVersion int16
MaxVersion int16
}

func (c *APIVersionsResponse) Encode(e PacketEncoder) error {
e.PutInt16(c.ErrorCode)

if err := e.PutArrayLength(len(c.APIVersions)); err != nil {
return err
}
for _, av := range c.APIVersions {
e.PutInt16(av.APIKey)
e.PutInt16(av.MinVersion)
e.PutInt16(av.MaxVersion)
}
return nil
}

func (c *APIVersionsResponse) Decode(d PacketDecoder) error {
l, err := d.ArrayLength()
if err != nil {
return err
}
c.APIVersions = make([]APIVersion, l)
for i := range c.APIVersions {
key, err := d.Int16()
if err != nil {
return err
}

minVersion, err := d.Int16()
if err != nil {
return err
}

maxVersion, err := d.Int16()
if err != nil {
return err
}

c.APIVersions[i] = APIVersion{
APIKey: key,
MinVersion: minVersion,
MaxVersion: maxVersion,
}
}
return nil
}
35 changes: 35 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,12 @@ func (s *Server) handleRequest(conn net.Conn) {
s.logger.Debug("correlation id [%d], request size [%d], key [%d]", header.CorrelationID, size, header.APIKey)

switch header.APIKey {
case protocol.APIVersionsKey:
req := &protocol.APIVersionsRequest{}
s.decode(header, req, d)
if err = s.handleAPIVersions(conn, header, req); err != nil {
s.logger.Info("API Versions failed: %s", err)
}
case protocol.ProduceKey:
req := &protocol.ProduceRequest{}
s.decode(header, req, d)
Expand Down Expand Up @@ -191,6 +197,35 @@ func (s *Server) decode(header *protocol.RequestHeader, req protocol.Decoder, d
return nil
}

func (s *Server) handleAPIVersions(conn net.Conn, header *protocol.RequestHeader, req *protocol.APIVersionsRequest) (err error) {
resp := new(protocol.APIVersionsResponse)

resp.APIVersions = []protocol.APIVersion{
{APIKey: protocol.ProduceKey, MinVersion: 2, MaxVersion: 2},
{APIKey: protocol.FetchKey},
{APIKey: protocol.OffsetsKey},
{APIKey: protocol.MetadataKey},
{APIKey: protocol.LeaderAndISRKey},
{APIKey: protocol.StopReplicaKey},
{APIKey: protocol.GroupCoordinatorKey},
{APIKey: protocol.JoinGroupKey},
{APIKey: protocol.HeartbeatKey},
{APIKey: protocol.LeaveGroupKey},
{APIKey: protocol.SyncGroupKey},
{APIKey: protocol.DescribeGroupsKey},
{APIKey: protocol.ListGroupsKey},
{APIKey: protocol.APIVersionsKey},
{APIKey: protocol.CreateTopicsKey},
{APIKey: protocol.DeleteTopicsKey},
}

r := &protocol.Response{
CorrelationID: header.CorrelationID,
Body: resp,
}
return s.write(conn, header, r)
}

func (s *Server) handleCreateTopic(conn net.Conn, header *protocol.RequestHeader, reqs *protocol.CreateTopicRequests) (err error) {
resp := new(protocol.CreateTopicsResponse)
resp.TopicErrorCodes = make([]*protocol.TopicErrorCode, len(reqs.Requests))
Expand Down

0 comments on commit 34c3fde

Please sign in to comment.