Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[wip] Linearizable LeaseLeases #13882

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion Documentation/dev-guide/apispec/swagger/rpc.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -2213,7 +2213,13 @@
}
},
"etcdserverpbLeaseLeasesRequest": {
"type": "object"
"type": "object",
"properties": {
"linearizable": {
"type": "boolean",
"format": "boolean"
}
}
},
"etcdserverpbLeaseLeasesResponse": {
"type": "object",
Expand Down
596 changes: 319 additions & 277 deletions api/etcdserverpb/rpc.pb.go

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions api/etcdserverpb/rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -929,6 +929,8 @@ message LeaseTimeToLiveResponse {

message LeaseLeasesRequest {
option (versionpb.etcd_version_msg) = "3.3";

bool linearizable = 1 [(versionpb.etcd_version_field)="3.6"];
}

message LeaseStatus {
Expand Down
6 changes: 3 additions & 3 deletions client/v3/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ type Lease interface {
TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error)

// Leases retrieves all leases.
Leases(ctx context.Context) (*LeaseLeasesResponse, error)
Leases(ctx context.Context, linearizable bool) (*LeaseLeasesResponse, error)

// KeepAlive attempts to keep the given lease alive forever. If the keepalive responses posted
// to the channel are not consumed promptly the channel may become full. When full, the lease
Expand Down Expand Up @@ -251,8 +251,8 @@ func (l *lessor) TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption
return gresp, nil
}

func (l *lessor) Leases(ctx context.Context) (*LeaseLeasesResponse, error) {
resp, err := l.remote.LeaseLeases(ctx, &pb.LeaseLeasesRequest{}, l.callOpts...)
func (l *lessor) Leases(ctx context.Context, linearizable bool) (*LeaseLeasesResponse, error) {
resp, err := l.remote.LeaseLeases(ctx, &pb.LeaseLeasesRequest{Linearizable: linearizable}, l.callOpts...)
if err == nil {
leases := make([]LeaseStatus, len(resp.Leases))
for i := range resp.Leases {
Expand Down
5 changes: 4 additions & 1 deletion etcdctl/ctlv3/command/lease_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,19 +132,22 @@ func leaseTimeToLiveCommandFunc(cmd *cobra.Command, args []string) {
display.TimeToLive(*resp, timeToLiveKeys)
}

var linearizableLeaseList bool

// NewLeaseListCommand returns the cobra command for "lease list".
func NewLeaseListCommand() *cobra.Command {
lc := &cobra.Command{
Use: "list",
Short: "List all active leases",
Run: leaseListCommandFunc,
}
lc.Flags().BoolVar(&linearizableLeaseList, "linearizable", false, "Whether we should make a linearizable request for the list of leases")
return lc
}

// leaseListCommandFunc executes the "lease list" command.
func leaseListCommandFunc(cmd *cobra.Command, args []string) {
resp, rerr := mustClientFromCmd(cmd).Leases(context.TODO())
resp, rerr := mustClientFromCmd(cmd).Leases(context.TODO(), linearizableLeaseList)
if rerr != nil {
cobrautl.ExitWithError(cobrautl.ExitBadConnection, rerr)
}
Expand Down
6 changes: 6 additions & 0 deletions server/etcdserver/v3_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,12 @@ func (s *EtcdServer) LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveR
}

func (s *EtcdServer) LeaseLeases(ctx context.Context, r *pb.LeaseLeasesRequest) (*pb.LeaseLeasesResponse, error) {
if r.Linearizable {
if err := s.linearizableReadNotify(ctx); err != nil {
return nil, err
}
}

ls := s.lessor.Leases()
lss := make([]*pb.LeaseStatus, len(ls))
for i := range ls {
Expand Down
4 changes: 2 additions & 2 deletions server/proxy/grpcproxy/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (

pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
"go.etcd.io/etcd/client/v3"
clientv3 "go.etcd.io/etcd/client/v3"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -115,7 +115,7 @@ func (lp *leaseProxy) LeaseTimeToLive(ctx context.Context, rr *pb.LeaseTimeToLiv
}

func (lp *leaseProxy) LeaseLeases(ctx context.Context, rr *pb.LeaseLeasesRequest) (*pb.LeaseLeasesResponse, error) {
r, err := lp.lessor.Leases(ctx)
r, err := lp.lessor.Leases(ctx, rr.Linearizable)
if err != nil {
return nil, err
}
Expand Down
86 changes: 84 additions & 2 deletions tests/common/lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestLeaseGrantTimeToLive(t *testing.T) {
}
}

func TestLeaseGrantAndList(t *testing.T) {
func TestLeaseGrantAndList_Serializable(t *testing.T) {
testRunner.BeforeTest(t)

tcs := []struct {
Expand Down Expand Up @@ -141,7 +141,7 @@ func TestLeaseGrantAndList(t *testing.T) {
// or by hitting an up to date member.
leases := []clientv3.LeaseStatus{}
require.Eventually(t, func() bool {
resp, err := cc.LeaseList()
resp, err := cc.LeaseList(false)
if err != nil {
return false
}
Expand All @@ -161,6 +161,88 @@ func TestLeaseGrantAndList(t *testing.T) {
}
}

func TestLeaseGrantAndList_Linearizable(t *testing.T) {
testRunner.BeforeTest(t)

tcs := []struct {
name string
config config.ClusterConfig
}{
{
name: "NoTLS",
config: config.ClusterConfig{ClusterSize: 1},
},
{
name: "PeerTLS",
config: config.ClusterConfig{ClusterSize: 3, PeerTLS: config.ManualTLS},
},
{
name: "PeerAutoTLS",
config: config.ClusterConfig{ClusterSize: 3, PeerTLS: config.AutoTLS},
},
{
name: "ClientTLS",
config: config.ClusterConfig{ClusterSize: 1, ClientTLS: config.ManualTLS},
},
{
name: "ClientAutoTLS",
config: config.ClusterConfig{ClusterSize: 1, ClientTLS: config.AutoTLS},
},
}
for _, tc := range tcs {
nestedCases := []struct {
name string
leaseCount int
}{
{
name: "no_leases",
leaseCount: 0,
},
{
name: "one_lease",
leaseCount: 1,
},
{
name: "many_leases",
leaseCount: 3,
},
}

for _, nc := range nestedCases {
t.Run(tc.name+"/"+nc.name, func(t *testing.T) {
t.Logf("Creating cluster...")
clus := testRunner.NewCluster(t, tc.config)
defer clus.Close()
cc := clus.Client()
t.Logf("Created cluster and client")
testutils.ExecuteWithTimeout(t, 10*time.Second, func() {
createdLeases := []clientv3.LeaseID{}
lastRev := int64(0)
for i := 0; i < nc.leaseCount; i++ {
leaseResp, err := cc.Grant(10)
t.Logf("Grant returned: resp:%s err:%v", leaseResp.String(), err)
require.NoError(t, err)
createdLeases = append(createdLeases, leaseResp.ID)
lastRev = leaseResp.GetRevision()
}

resp, err := cc.LeaseList(true)
require.NoError(t, err)
leases := resp.Leases
require.GreaterOrEqual(t, resp.GetRevision(), lastRev)

returnedLeases := make([]clientv3.LeaseID, 0, nc.leaseCount)
for _, status := range leases {
returnedLeases = append(returnedLeases, status.ID)
}

require.ElementsMatch(t, createdLeases, returnedLeases)
})
})
}
}
}

func TestLeaseGrantTimeToLiveExpired(t *testing.T) {
testRunner.BeforeTest(t)

Expand Down
4 changes: 2 additions & 2 deletions tests/framework/e2e/etcdctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,9 +299,9 @@ func (ctl *EtcdctlV3) Defragment(o config.DefragOption) error {
return err
}

func (ctl *EtcdctlV3) LeaseList() (*clientv3.LeaseLeasesResponse, error) {
func (ctl *EtcdctlV3) LeaseList(linearizable bool) (*clientv3.LeaseLeasesResponse, error) {
args := ctl.cmdArgs()
args = append(args, "lease", "list", "-w", "json")
args = append(args, "lease", "list", "--linearizable", strconv.FormatBool(linearizable), "-w", "json")
cmd, err := SpawnCmd(args, nil)
if err != nil {
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions tests/framework/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,10 +272,10 @@ func (c integrationClient) TimeToLive(id clientv3.LeaseID, o config.LeaseOption)
return c.Client.TimeToLive(ctx, id, leaseOpts...)
}

func (c integrationClient) LeaseList() (*clientv3.LeaseLeasesResponse, error) {
func (c integrationClient) LeaseList(linearizable bool) (*clientv3.LeaseLeasesResponse, error) {
ctx := context.Background()

return c.Client.Leases(ctx)
return c.Client.Leases(ctx, linearizable)
}

func (c integrationClient) LeaseKeepAliveOnce(id clientv3.LeaseID) (*clientv3.LeaseKeepAliveResponse, error) {
Expand Down
2 changes: 1 addition & 1 deletion tests/framework/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type Client interface {
AlarmDisarm(alarmMember *clientv3.AlarmMember) (*clientv3.AlarmResponse, error)
Grant(ttl int64) (*clientv3.LeaseGrantResponse, error)
TimeToLive(id clientv3.LeaseID, opts config.LeaseOption) (*clientv3.LeaseTimeToLiveResponse, error)
LeaseList() (*clientv3.LeaseLeasesResponse, error)
LeaseList(linearizable bool) (*clientv3.LeaseLeasesResponse, error)
LeaseKeepAliveOnce(id clientv3.LeaseID) (*clientv3.LeaseKeepAliveResponse, error)
LeaseRevoke(id clientv3.LeaseID) (*clientv3.LeaseRevokeResponse, error)
}
4 changes: 2 additions & 2 deletions tests/integration/clientv3/lease/lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"time"

"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
"go.etcd.io/etcd/client/v3"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
integration2 "go.etcd.io/etcd/tests/v3/framework/integration"
)
Expand Down Expand Up @@ -637,7 +637,7 @@ func TestLeaseLeases(t *testing.T) {
ids = append(ids, resp.ID)
}

resp, err := cli.Leases(context.Background())
resp, err := cli.Leases(context.Background(), false)
if err != nil {
t.Fatal(err)
}
Expand Down