Skip to content

Commit

Permalink
etcdserver: support read index
Browse files Browse the repository at this point in the history
Use read index to achieve linearizable reads (l-read).
  • Loading branch information
xiang90 committed Sep 23, 2016
1 parent cfe717e commit 89a5378
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 17 deletions.
11 changes: 11 additions & 0 deletions etcdserver/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ type raftNode struct {
// a chan to send out apply
applyc chan apply

// a chan to send out readState
readStateC chan raft.ReadState

// TODO: remove the etcdserver related logic from raftNode
// TODO: add a state machine interface to apply the commit entries
// and do snapshot/recover
Expand Down Expand Up @@ -196,6 +199,14 @@ func (r *raftNode) start(s *EtcdServer) {
}
}

if len(rd.ReadStates) != 0 {
select {
case r.readStateC <- rd.ReadStates[len(rd.ReadStates)-1]:
case <-r.stopped:
return
}
}

raftDone := make(chan struct{}, 1)
ap := apply{
entries: rd.CommittedEntries,
Expand Down
14 changes: 13 additions & 1 deletion etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,16 @@ type EtcdServer struct {

snapCount uint64

w wait.Wait
w wait.Wait

readMu sync.RWMutex
// read routine notifies etcd server that it waits for reading by sending an empty struct to
// readwaitc
readwaitc chan struct{}
// readNotifier is used to notify the read routine that it can process the request
// when there is no error
readNotifier *notifier

stop chan struct{}
done chan struct{}
errorc chan error
Expand Down Expand Up @@ -384,6 +393,7 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
ticker: time.Tick(time.Duration(cfg.TickMs) * time.Millisecond),
raftStorage: s,
storage: NewStorage(w, ss),
readStateC: make(chan raft.ReadState, 1),
},
id: id,
attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
Expand Down Expand Up @@ -471,6 +481,7 @@ func (s *EtcdServer) Start() {
go s.purgeFile()
go monitorFileDescriptor(s.done)
go s.monitorVersions()
go s.linearizableReadLoop()
}

// start prepares and starts server in a new goroutine. It is no longer safe to
Expand All @@ -485,6 +496,7 @@ func (s *EtcdServer) start() {
s.applyWait = wait.NewTimeList()
s.done = make(chan struct{})
s.stop = make(chan struct{})
s.readwaitc = make(chan struct{}, 1)
if s.ClusterVersion() != nil {
plog.Infof("starting server... [version: %v, cluster version: %v]", version.Version, version.Cluster(s.ClusterVersion().String()))
} else {
Expand Down
16 changes: 16 additions & 0 deletions etcdserver/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,19 @@ func longestConnected(tp rafthttp.Transporter, membs []types.ID) (types.ID, bool
}
return longest, true
}

// notifier pairs a signal channel with an error value. Its close method
// stores the error and then closes the channel, so a receiver that observes
// the channel being closed can read err afterwards.
type notifier struct {
	// c is closed by close to wake every goroutine receiving from it.
	c chan struct{}
	// err is the outcome recorded by close; nil means no error.
	err error
}

// newNotifier returns a notifier with an open, unbuffered signal channel and
// a nil error.
func newNotifier() *notifier {
	n := new(notifier)
	n.c = make(chan struct{})
	return n
}

// close records err on the notifier and then closes its channel, releasing
// every goroutine blocked receiving from it. The error is stored before the
// channel is closed so that a receiver woken by the close sees the final
// value of err.
func (n *notifier) close(err error) {
	n.err = err
	close(n.c)
}
113 changes: 97 additions & 16 deletions etcdserver/v3_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
package etcdserver

import (
"bytes"
"encoding/binary"
"fmt"
"strconv"
"strings"
"time"
Expand All @@ -26,6 +29,7 @@ import (
"github.com/coreos/etcd/lease/leasehttp"
"github.com/coreos/etcd/lease/leasepb"
"github.com/coreos/etcd/mvcc"
"github.com/coreos/etcd/raft"
"golang.org/x/net/context"
"google.golang.org/grpc/metadata"
)
Expand Down Expand Up @@ -86,26 +90,22 @@ type Authenticator interface {
}

func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
if r.Serializable {
var resp *pb.RangeResponse
var err error
chk := func(ai *auth.AuthInfo) error {
return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd)
}
get := func() { resp, err = s.applyV3Base.Range(noTxn, r) }
if serr := s.doSerialize(ctx, chk, get); serr != nil {
return nil, serr
if !r.Serializable {
err := s.linearizableReadNotify(ctx)
if err != nil {
return nil, err
}
return resp, err
}
result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{Range: r})
if err != nil {
return nil, err
var resp *pb.RangeResponse
var err error
chk := func(ai *auth.AuthInfo) error {
return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd)
}
if result.err != nil {
return nil, result.err
get := func() { resp, err = s.applyV3Base.Range(noTxn, r) }
if serr := s.doSerialize(ctx, chk, get); serr != nil {
return nil, serr
}
return result.resp.(*pb.RangeResponse), nil
return resp, err
}

func (s *EtcdServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
Expand Down Expand Up @@ -641,3 +641,84 @@ func (s *EtcdServer) processInternalRaftRequest(ctx context.Context, r pb.Intern

// Watchable returns the result of s.KV() as an mvcc.WatchableKV, i.e. the
// watchable key-value interface attached to this etcd server.
func (s *EtcdServer) Watchable() mvcc.WatchableKV {
	return s.KV()
}

// linearizableReadLoop serves linearizable read (l-read) requests in batches.
//
// Each iteration:
//  1. waits for a signal on s.readwaitc,
//  2. swaps s.readNotifier for a fresh notifier so that requests arriving
//     afterwards join the next batch,
//  3. asks raft for a read index tagged with a unique 8-byte request context,
//  4. waits until the applied index has reached that read index, and
//  5. closes the swapped-out notifier with a nil error.
//
// If any step fails (ReadIndex error, mismatched context, or a one second
// timeout waiting for the read state) the swapped-out notifier is closed with
// that error instead. The loop returns when s.done fires or when raft reports
// raft.ErrStopped.
func (s *EtcdServer) linearizableReadLoop() {
	var rs raft.ReadState

	ctx := make([]byte, 8)

	// linearizableReadNotify reads readNotifier under readMu, so the initial
	// notifier is published under the same lock.
	s.readMu.Lock()
	s.readNotifier = newNotifier()
	s.readMu.Unlock()

	for {
		// Tag this round with a fresh request ID so the matching read state
		// can be recognized below.
		binary.BigEndian.PutUint64(ctx, s.reqIDGen.Next())

		select {
		case <-s.readwaitc:
		case <-s.done:
			return
		}

		nextnr := newNotifier()

		// Take ownership of the current batch and install the next one before
		// requesting the read index.
		s.readMu.Lock()
		nr := s.readNotifier
		s.readNotifier = nextnr
		s.readMu.Unlock()

		if err := s.r.ReadIndex(context.Background(), ctx); err != nil {
			if err == raft.ErrStopped {
				return
			}
			plog.Errorf("failed to get read index from raft: %v", err)
			nr.close(err)
			continue
		}

		select {
		case rs = <-s.r.readStateC:
			if !bytes.Equal(rs.RequestCtx, ctx) {
				// ctx is what this round sent; rs.RequestCtx is what came back.
				err := fmt.Errorf("unexpected read index context value (want %v, got %v)", ctx, rs.RequestCtx)
				plog.Errorf("%v", err)
				nr.close(err)
				continue
			}
		case <-time.After(time.Second):
			plog.Warningf("time out waiting for read index response")
			nr.close(context.DeadlineExceeded)
			continue
		case <-s.done:
			return
		}

		if ai := s.getAppliedIndex(); ai < rs.Index {
			// Wait for the read index itself to be applied. Waiting on the
			// already-applied index ai would not guarantee that the entry at
			// rs.Index has been applied before readers are released.
			select {
			case <-s.applyWait.Wait(rs.Index):
			case <-s.done:
				return
			}
		}
		// unblock all l-reads requested at indices before rs.Index
		nr.close(nil)
	}
}

// linearizableReadNotify blocks until the notifier that was current when the
// call started has been closed, and returns the error stored on it. It
// returns ctx.Err() if ctx is done first, or ErrStopped if s.done fires first.
func (s *EtcdServer) linearizableReadNotify(ctx context.Context) error {
	// Capture the current notifier before signaling, so this request waits
	// on the notifier that was installed when it arrived.
	s.readMu.RLock()
	pending := s.readNotifier
	s.readMu.RUnlock()

	// Non-blocking send to the read loop: if the send cannot proceed
	// immediately, the default case skips it instead of waiting.
	select {
	case s.readwaitc <- struct{}{}:
	default:
	}

	// Block until the batch completes, the caller gives up, or the server
	// stops.
	select {
	case <-s.done:
		return ErrStopped
	case <-ctx.Done():
		return ctx.Err()
	case <-pending.c:
		return pending.err
	}
}

0 comments on commit 89a5378

Please sign in to comment.