Skip to content

Commit

Permalink
Integrate backend::hooks with consistent_index.
Browse files Browse the repository at this point in the history
Every transaction committed to backend is writing most recent consistent_index.
Makes sure that even automatically trigger commits of batch-transactions
stays "really" consistent a.d. the most recent WAL log index applied.
  • Loading branch information
ptabor committed Apr 12, 2021
1 parent 0ab2e4e commit cec56af
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 20 deletions.
15 changes: 8 additions & 7 deletions server/etcdserver/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
"go.uber.org/zap"
)

func newBackend(cfg config.ServerConfig) backend.Backend {
func newBackend(cfg config.ServerConfig, hooks backend.Hooks) backend.Backend {
bcfg := backend.DefaultBackendConfig()
bcfg.Path = cfg.BackendPath()
bcfg.UnsafeNoFsync = cfg.UnsafeNoFsync
Expand All @@ -50,28 +50,29 @@ func newBackend(cfg config.ServerConfig) backend.Backend {
// permit 10% excess over quota for disarm
bcfg.MmapSize = uint64(cfg.QuotaBackendBytes + cfg.QuotaBackendBytes/10)
}
bcfg.Hooks = hooks
return backend.New(bcfg)
}

// openSnapshotBackend renames a snapshot db to the current etcd db and opens it.
func openSnapshotBackend(cfg config.ServerConfig, ss *snap.Snapshotter, snapshot raftpb.Snapshot) (backend.Backend, error) {
func openSnapshotBackend(cfg config.ServerConfig, ss *snap.Snapshotter, snapshot raftpb.Snapshot, hooks backend.Hooks) (backend.Backend, error) {
snapPath, err := ss.DBFilePath(snapshot.Metadata.Index)
if err != nil {
return nil, fmt.Errorf("failed to find database snapshot file (%v)", err)
}
if err := os.Rename(snapPath, cfg.BackendPath()); err != nil {
return nil, fmt.Errorf("failed to rename database snapshot file (%v)", err)
}
return openBackend(cfg), nil
return openBackend(cfg, hooks), nil
}

// openBackend returns a backend using the current etcd db.
func openBackend(cfg config.ServerConfig) backend.Backend {
func openBackend(cfg config.ServerConfig, hooks backend.Hooks) backend.Backend {
fn := cfg.BackendPath()

now, beOpened := time.Now(), make(chan backend.Backend)
go func() {
beOpened <- newBackend(cfg)
beOpened <- newBackend(cfg, hooks)
}()

select {
Expand All @@ -94,7 +95,7 @@ func openBackend(cfg config.ServerConfig) backend.Backend {
// before updating the backend db after persisting raft snapshot to disk,
// violating the invariant snapshot.Metadata.Index < db.consistentIndex. In this
// case, replace the db with the snapshot db sent by the leader.
func recoverSnapshotBackend(cfg config.ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot, beExist bool) (backend.Backend, error) {
func recoverSnapshotBackend(cfg config.ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot, beExist bool, hooks backend.Hooks) (backend.Backend, error) {
consistentIndex := uint64(0)
if beExist {
ci := cindex.NewConsistentIndex(oldbe.BatchTx())
Expand All @@ -104,5 +105,5 @@ func recoverSnapshotBackend(cfg config.ServerConfig, oldbe backend.Backend, snap
return oldbe, nil
}
oldbe.Close()
return openSnapshotBackend(cfg, snap.New(cfg.Logger, cfg.SnapDir()), snapshot)
return openSnapshotBackend(cfg, snap.New(cfg.Logger, cfg.SnapDir()), snapshot, hooks)
}
20 changes: 10 additions & 10 deletions server/etcdserver/cindex/cindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,11 @@ type consistentIndex struct {
// it caches the "consistent_index" key's value. Accessed
// through atomics so must be 64-bit aligned.
consistentIndex uint64
// bytesBuf8 is a byte slice of length 8
// to avoid a repetitive allocation in saveIndex.
bytesBuf8 []byte
mutex sync.Mutex
mutex sync.Mutex
}

func NewConsistentIndex(tx backend.BatchTx) ConsistentIndexer {
return &consistentIndex{tx: tx, bytesBuf8: make([]byte, 8)}
return &consistentIndex{tx: tx}
}

func (ci *consistentIndex) ConsistentIndex() uint64 {
Expand All @@ -85,11 +82,14 @@ func (ci *consistentIndex) SetConsistentIndex(v uint64) {
}

func (ci *consistentIndex) UnsafeSave(tx backend.BatchTx) {
bs := ci.bytesBuf8
binary.BigEndian.PutUint64(bs, ci.consistentIndex)
// put the index into the underlying backend
// tx has been locked in TxnBegin, so there is no need to lock it again
tx.UnsafePut(metaBucketName, consistentIndexKeyName, bs)
index := atomic.LoadUint64(&ci.consistentIndex)
if index > 0 {
bs := make([]byte, 8) // this is kept on stack (not heap) so its quick.
binary.BigEndian.PutUint64(bs, index)
// put the index into the underlying backend
// tx has been locked in TxnBegin, so there is no need to lock it again
tx.UnsafePut(metaBucketName, consistentIndexKeyName, bs)
}
}

func (ci *consistentIndex) SetBatchTx(tx backend.BatchTx) {
Expand Down
26 changes: 23 additions & 3 deletions server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ type EtcdServer struct {
lessor lease.Lessor
bemu sync.Mutex
be backend.Backend
beHooks backend.Hooks
authStore auth.AuthStore
alarmStore *v3alarm.AlarmStore

Expand Down Expand Up @@ -294,6 +295,17 @@ type EtcdServer struct {
*AccessController
}

type backendHooks struct {
indexer cindex.ConsistentIndexer
lg *zap.Logger
}

func (bh *backendHooks) OnPreCommitUnsafe(tx backend.BatchTx) {
if bh.indexer != nil {
bh.indexer.UnsafeSave(tx)
}
}

// NewServer creates a new EtcdServer from the supplied configuration. The
// configuration is considered static for the lifetime of the EtcdServer.
func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
Expand Down Expand Up @@ -342,7 +354,9 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {

bepath := cfg.BackendPath()
beExist := fileutil.Exist(bepath)
be := openBackend(cfg)

beHooks := &backendHooks{lg: cfg.Logger}
be := openBackend(cfg, beHooks)

defer func() {
if err != nil {
Expand Down Expand Up @@ -460,7 +474,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
zap.String("snapshot-size", humanize.Bytes(uint64(snapshot.Size()))),
)

if be, err = recoverSnapshotBackend(cfg, be, *snapshot, beExist); err != nil {
if be, err = recoverSnapshotBackend(cfg, be, *snapshot, beExist, beHooks); err != nil {
cfg.Logger.Panic("failed to recover v3 backend from snapshot", zap.Error(err))
}
s1, s2 := be.Size(), be.SizeInUse()
Expand Down Expand Up @@ -534,6 +548,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
srv.applyV2 = NewApplierV2(cfg.Logger, srv.v2store, srv.cluster)

srv.be = be
srv.beHooks = beHooks
minTTL := time.Duration((3*cfg.ElectionTicks)/2) * heartbeat

// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
Expand All @@ -560,6 +575,10 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
return nil, err
}
srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, srv.consistIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit})

// Backend should contain 'meta' bucket going forward.
beHooks.indexer = srv.consistIndex

kvindex := srv.consistIndex.ConsistentIndex()
srv.lg.Debug("restore consistentIndex", zap.Uint64("index", kvindex))
if beExist {
Expand Down Expand Up @@ -1161,7 +1180,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
// wait for raftNode to persist snapshot onto the disk
<-apply.notifyc

newbe, err := openSnapshotBackend(s.Cfg, s.snapshotter, apply.snapshot)
newbe, err := openSnapshotBackend(s.Cfg, s.snapshotter, apply.snapshot, s.beHooks)
if err != nil {
lg.Panic("failed to open snapshot backend", zap.Error(err))
}
Expand Down Expand Up @@ -2096,6 +2115,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
return
}
s.lg.Debug("applyEntryNormal", zap.Stringer("raftReq", &raftReq))

if raftReq.V2 != nil {
req := (*RequestV2)(raftReq.V2)
s.w.Trigger(req.ID, s.applyV2Request(req))
Expand Down

0 comments on commit cec56af

Please sign in to comment.