Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(no)StoreV2 (Part 4): Backend hooks: precommit updates consistency_index #12855

Merged
merged 8 commits into from
May 8, 2021
4 changes: 1 addition & 3 deletions etcdctl/ctlv2/command/backup_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,9 +322,7 @@ func saveDB(lg *zap.Logger, destDB, srcDB string, idx uint64, desired *desiredCl
tx.Lock()
defer tx.Unlock()
cindex.UnsafeCreateMetaBucket(tx)
ci := cindex.NewConsistentIndex(tx)
ci.SetConsistentIndex(idx)
ci.UnsafeSave(tx)
cindex.UnsafeUpdateConsistentIndex(tx, idx, false)
} else {
// Thanks to translateWAL not moving entries, but just replacing them with
// 'empty', there is no need to update the consistency index.
Expand Down
2 changes: 1 addition & 1 deletion etcdctl/ctlv3/command/migrate_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func migrateCommandFunc(cmd *cobra.Command, args []string) {
}()

readKeys(reader, be)
cindex.UpdateConsistentIndex(be.BatchTx(), index)
cindex.UpdateConsistentIndex(be.BatchTx(), index, true)
err := <-errc
if err != nil {
fmt.Println("failed to transform keys")
Expand Down
5 changes: 2 additions & 3 deletions etcdctl/snapshot/v3_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,8 +478,7 @@ func (s *v3Manager) saveWALAndSnap() (*raftpb.HardState, error) {
func (s *v3Manager) updateCIndex(commit uint64) error {
be := backend.NewDefaultBackend(s.outDbPath())
defer be.Close()
ci := cindex.NewConsistentIndex(be.BatchTx())
ci.SetConsistentIndex(commit)
ci.UnsafeSave(be.BatchTx())

cindex.UpdateConsistentIndex(be.BatchTx(), commit, false)
return nil
}
23 changes: 1 addition & 22 deletions server/auth/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"go.etcd.io/etcd/api/v3/authpb"
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
"go.etcd.io/etcd/server/v3/etcdserver/cindex"
"go.etcd.io/etcd/server/v3/mvcc/backend"

"go.uber.org/zap"
Expand Down Expand Up @@ -215,7 +214,6 @@ type authStore struct {

tokenProvider TokenProvider
bcryptCost int // the algorithm cost / strength for hashing auth passwords
ci cindex.ConsistentIndexer
}

func (as *authStore) AuthEnable() error {
Expand Down Expand Up @@ -266,7 +264,6 @@ func (as *authStore) AuthDisable() {
tx.Lock()
tx.UnsafePut(authBucketName, enableFlagKey, authDisabled)
as.commitRevision(tx)
as.saveConsistentIndex(tx)
tx.Unlock()
b.ForceCommit()

Expand Down Expand Up @@ -424,7 +421,6 @@ func (as *authStore) UserAdd(r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse,
putUser(as.lg, tx, newUser)

as.commitRevision(tx)
as.saveConsistentIndex(tx)

as.lg.Info("added a user", zap.String("user-name", r.Name))
return &pb.AuthUserAddResponse{}, nil
Expand All @@ -448,7 +444,6 @@ func (as *authStore) UserDelete(r *pb.AuthUserDeleteRequest) (*pb.AuthUserDelete
delUser(tx, r.Name)

as.commitRevision(tx)
as.saveConsistentIndex(tx)

as.invalidateCachedPerm(r.Name)
as.tokenProvider.invalidateUser(r.Name)
Expand Down Expand Up @@ -491,7 +486,6 @@ func (as *authStore) UserChangePassword(r *pb.AuthUserChangePasswordRequest) (*p
putUser(as.lg, tx, updatedUser)

as.commitRevision(tx)
as.saveConsistentIndex(tx)

as.invalidateCachedPerm(r.Name)
as.tokenProvider.invalidateUser(r.Name)
Expand Down Expand Up @@ -540,7 +534,6 @@ func (as *authStore) UserGrantRole(r *pb.AuthUserGrantRoleRequest) (*pb.AuthUser
as.invalidateCachedPerm(r.User)

as.commitRevision(tx)
as.saveConsistentIndex(tx)

as.lg.Info(
"granted a role to a user",
Expand Down Expand Up @@ -619,7 +612,6 @@ func (as *authStore) UserRevokeRole(r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUs
as.invalidateCachedPerm(r.Name)

as.commitRevision(tx)
as.saveConsistentIndex(tx)

as.lg.Info(
"revoked a role from a user",
Expand Down Expand Up @@ -690,7 +682,6 @@ func (as *authStore) RoleRevokePermission(r *pb.AuthRoleRevokePermissionRequest)
as.clearCachedPerm()

as.commitRevision(tx)
as.saveConsistentIndex(tx)

as.lg.Info(
"revoked a permission on range",
Expand Down Expand Up @@ -742,7 +733,6 @@ func (as *authStore) RoleDelete(r *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDelete
}

as.commitRevision(tx)
as.saveConsistentIndex(tx)

as.lg.Info("deleted a role", zap.String("role-name", r.Role))
return &pb.AuthRoleDeleteResponse{}, nil
Expand All @@ -769,7 +759,6 @@ func (as *authStore) RoleAdd(r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse,
putRole(as.lg, tx, newRole)

as.commitRevision(tx)
as.saveConsistentIndex(tx)

as.lg.Info("created a role", zap.String("role-name", r.Name))
return &pb.AuthRoleAddResponse{}, nil
Expand Down Expand Up @@ -829,7 +818,6 @@ func (as *authStore) RoleGrantPermission(r *pb.AuthRoleGrantPermissionRequest) (
as.clearCachedPerm()

as.commitRevision(tx)
as.saveConsistentIndex(tx)

as.lg.Info(
"granted/updated a permission to a user",
Expand Down Expand Up @@ -1021,7 +1009,7 @@ func (as *authStore) IsAuthEnabled() bool {
}

// NewAuthStore creates a new AuthStore.
func NewAuthStore(lg *zap.Logger, be backend.Backend, ci cindex.ConsistentIndexer, tp TokenProvider, bcryptCost int) *authStore {
func NewAuthStore(lg *zap.Logger, be backend.Backend, tp TokenProvider, bcryptCost int) *authStore {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could we consider a non-breaking change here?

Copy link
Contributor Author

@ptabor ptabor Apr 30, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's neither client/v3/... nor server/embed API

Are you aware about any cases of usage of it outside of etcd ?

If I understand right, it should be part of "internal" package ?
Also customers (if they do exist) need to adopt due to package etcd/server/(v3/)auth rename.

if lg == nil {
lg = zap.NewNop()
}
Expand Down Expand Up @@ -1056,7 +1044,6 @@ func NewAuthStore(lg *zap.Logger, be backend.Backend, ci cindex.ConsistentIndexe
revision: getRevision(tx),
lg: lg,
be: be,
ci: ci,
enabled: enabled,
rangePermCache: make(map[string]*unifiedRangePermissions),
tokenProvider: tp,
Expand Down Expand Up @@ -1317,14 +1304,6 @@ func (as *authStore) BcryptCost() int {
return as.bcryptCost
}

func (as *authStore) saveConsistentIndex(tx backend.BatchTx) {
if as.ci != nil {
as.ci.UnsafeSave(tx)
} else {
as.lg.Error("failed to save consistentIndex,consistentIndexer is nil")
}
}

func (as *authStore) setupMetricsReporter() {
reportCurrentAuthRevMu.Lock()
reportCurrentAuthRev = func() float64 {
Expand Down
16 changes: 8 additions & 8 deletions server/auth/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func TestNewAuthStoreRevision(t *testing.T) {
if err != nil {
t.Fatal(err)
}
as := NewAuthStore(zap.NewExample(), b, nil, tp, bcrypt.MinCost)
as := NewAuthStore(zap.NewExample(), b, tp, bcrypt.MinCost)
err = enableAuthAndCreateRoot(as)
if err != nil {
t.Fatal(err)
Expand All @@ -64,7 +64,7 @@ func TestNewAuthStoreRevision(t *testing.T) {
// no changes to commit
b2 := backend.NewDefaultBackend(tPath)
defer b2.Close()
as = NewAuthStore(zap.NewExample(), b2, nil, tp, bcrypt.MinCost)
as = NewAuthStore(zap.NewExample(), b2, tp, bcrypt.MinCost)
defer as.Close()
new := as.Revision()

Expand All @@ -85,7 +85,7 @@ func TestNewAuthStoreBcryptCost(t *testing.T) {

invalidCosts := [2]int{bcrypt.MinCost - 1, bcrypt.MaxCost + 1}
for _, invalidCost := range invalidCosts {
as := NewAuthStore(zap.NewExample(), b, nil, tp, invalidCost)
as := NewAuthStore(zap.NewExample(), b, tp, invalidCost)
defer as.Close()
if as.BcryptCost() != bcrypt.DefaultCost {
t.Fatalf("expected DefaultCost when bcryptcost is invalid")
Expand All @@ -105,7 +105,7 @@ func setupAuthStore(t *testing.T) (store *authStore, teardownfunc func(t *testin
if err != nil {
t.Fatal(err)
}
as := NewAuthStore(zap.NewExample(), b, nil, tp, bcrypt.MinCost)
as := NewAuthStore(zap.NewExample(), b, tp, bcrypt.MinCost)
err = enableAuthAndCreateRoot(as)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -657,7 +657,7 @@ func TestAuthInfoFromCtxRace(t *testing.T) {
if err != nil {
t.Fatal(err)
}
as := NewAuthStore(zap.NewExample(), b, nil, tp, bcrypt.MinCost)
as := NewAuthStore(zap.NewExample(), b, tp, bcrypt.MinCost)
defer as.Close()

donec := make(chan struct{})
Expand Down Expand Up @@ -730,7 +730,7 @@ func TestRecoverFromSnapshot(t *testing.T) {
if err != nil {
t.Fatal(err)
}
as2 := NewAuthStore(zap.NewExample(), as.be, nil, tp, bcrypt.MinCost)
as2 := NewAuthStore(zap.NewExample(), as.be, tp, bcrypt.MinCost)
defer as2.Close()

if !as2.IsAuthEnabled() {
Expand Down Expand Up @@ -811,7 +811,7 @@ func TestRolesOrder(t *testing.T) {
if err != nil {
t.Fatal(err)
}
as := NewAuthStore(zap.NewExample(), b, nil, tp, bcrypt.MinCost)
as := NewAuthStore(zap.NewExample(), b, tp, bcrypt.MinCost)
defer as.Close()
err = enableAuthAndCreateRoot(as)
if err != nil {
Expand Down Expand Up @@ -867,7 +867,7 @@ func testAuthInfoFromCtxWithRoot(t *testing.T, opts string) {
if err != nil {
t.Fatal(err)
}
as := NewAuthStore(zap.NewExample(), b, nil, tp, bcrypt.MinCost)
as := NewAuthStore(zap.NewExample(), b, tp, bcrypt.MinCost)
defer as.Close()

if err = enableAuthAndCreateRoot(as); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion server/etcdserver/api/v3rpc/maintenance.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
)

type KVGetter interface {
KV() mvcc.ConsistentWatchableKV
KV() mvcc.WatchableKV
}

type BackendGetter interface {
Expand Down
19 changes: 9 additions & 10 deletions server/etcdserver/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
"go.uber.org/zap"
)

func newBackend(cfg config.ServerConfig) backend.Backend {
func newBackend(cfg config.ServerConfig, hooks backend.Hooks) backend.Backend {
bcfg := backend.DefaultBackendConfig()
bcfg.Path = cfg.BackendPath()
bcfg.UnsafeNoFsync = cfg.UnsafeNoFsync
Expand All @@ -51,29 +51,29 @@ func newBackend(cfg config.ServerConfig) backend.Backend {
bcfg.MmapSize = uint64(cfg.QuotaBackendBytes + cfg.QuotaBackendBytes/10)
}
bcfg.Mlock = cfg.ExperimentalMemoryMlock

bcfg.Hooks = hooks
return backend.New(bcfg)
}

// openSnapshotBackend renames a snapshot db to the current etcd db and opens it.
func openSnapshotBackend(cfg config.ServerConfig, ss *snap.Snapshotter, snapshot raftpb.Snapshot) (backend.Backend, error) {
func openSnapshotBackend(cfg config.ServerConfig, ss *snap.Snapshotter, snapshot raftpb.Snapshot, hooks backend.Hooks) (backend.Backend, error) {
snapPath, err := ss.DBFilePath(snapshot.Metadata.Index)
if err != nil {
return nil, fmt.Errorf("failed to find database snapshot file (%v)", err)
}
if err := os.Rename(snapPath, cfg.BackendPath()); err != nil {
return nil, fmt.Errorf("failed to rename database snapshot file (%v)", err)
}
return openBackend(cfg), nil
return openBackend(cfg, hooks), nil
}

// openBackend returns a backend using the current etcd db.
func openBackend(cfg config.ServerConfig) backend.Backend {
func openBackend(cfg config.ServerConfig, hooks backend.Hooks) backend.Backend {
fn := cfg.BackendPath()

now, beOpened := time.Now(), make(chan backend.Backend)
go func() {
beOpened <- newBackend(cfg)
beOpened <- newBackend(cfg, hooks)
}()

select {
Expand All @@ -96,15 +96,14 @@ func openBackend(cfg config.ServerConfig) backend.Backend {
// before updating the backend db after persisting raft snapshot to disk,
// violating the invariant snapshot.Metadata.Index < db.consistentIndex. In this
// case, replace the db with the snapshot db sent by the leader.
func recoverSnapshotBackend(cfg config.ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot, beExist bool) (backend.Backend, error) {
func recoverSnapshotBackend(cfg config.ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot, beExist bool, hooks backend.Hooks) (backend.Backend, error) {
consistentIndex := uint64(0)
if beExist {
ci := cindex.NewConsistentIndex(oldbe.BatchTx())
consistentIndex = ci.ConsistentIndex()
consistentIndex = cindex.ReadConsistentIndex(oldbe.BatchTx())
}
if snapshot.Metadata.Index <= consistentIndex {
return oldbe, nil
}
oldbe.Close()
return openSnapshotBackend(cfg, snap.New(cfg.Logger, cfg.SnapDir()), snapshot)
return openSnapshotBackend(cfg, snap.New(cfg.Logger, cfg.SnapDir()), snapshot, hooks)
}
Loading