-
Notifications
You must be signed in to change notification settings - Fork 9.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Election RPC service #7634
Election RPC service #7634
Changes from all commits
a6cab69
4b5bb7f
d1ae4cd
80c1b9c
4b4f5be
135a407
9ba69ff
dc8115a
bf047ed
78422ea
5f366db
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
// Copyright 2017 The etcd Authors | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
// Package v3election provides a v3 election service from an etcdserver. | ||
package v3election |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,123 @@ | ||
// Copyright 2017 The etcd Authors | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package v3election | ||
|
||
import ( | ||
"golang.org/x/net/context" | ||
|
||
"github.com/coreos/etcd/clientv3" | ||
"github.com/coreos/etcd/clientv3/concurrency" | ||
epb "github.com/coreos/etcd/etcdserver/api/v3election/v3electionpb" | ||
) | ||
|
||
type electionServer struct { | ||
c *clientv3.Client | ||
} | ||
|
||
func NewElectionServer(c *clientv3.Client) epb.ElectionServer { | ||
return &electionServer{c} | ||
} | ||
|
||
func (es *electionServer) Campaign(ctx context.Context, req *epb.CampaignRequest) (*epb.CampaignResponse, error) { | ||
s, err := es.session(ctx, req.Lease) | ||
if err != nil { | ||
return nil, err | ||
} | ||
e := concurrency.NewElection(s, string(req.Name)) | ||
if err = e.Campaign(ctx, string(req.Value)); err != nil { | ||
return nil, err | ||
} | ||
return &epb.CampaignResponse{ | ||
Header: e.Header(), | ||
Leader: &epb.LeaderKey{ | ||
Name: req.Name, | ||
Key: []byte(e.Key()), | ||
Rev: e.Rev(), | ||
Lease: int64(s.Lease()), | ||
}, | ||
}, nil | ||
} | ||
|
||
func (es *electionServer) Proclaim(ctx context.Context, req *epb.ProclaimRequest) (*epb.ProclaimResponse, error) { | ||
s, err := es.session(ctx, req.Leader.Lease) | ||
if err != nil { | ||
return nil, err | ||
} | ||
e := concurrency.ResumeElection(s, string(req.Leader.Name), string(req.Leader.Key), req.Leader.Rev) | ||
if err := e.Proclaim(ctx, string(req.Value)); err != nil { | ||
return nil, err | ||
} | ||
return &epb.ProclaimResponse{Header: e.Header()}, nil | ||
} | ||
|
||
func (es *electionServer) Observe(req *epb.LeaderRequest, stream epb.Election_ObserveServer) error { | ||
s, err := es.session(stream.Context(), -1) | ||
if err != nil { | ||
return err | ||
} | ||
e := concurrency.NewElection(s, string(req.Name)) | ||
ch := e.Observe(stream.Context()) | ||
for stream.Context().Err() == nil { | ||
select { | ||
case <-stream.Context().Done(): | ||
case resp, ok := <-ch: | ||
if !ok { | ||
return nil | ||
} | ||
lresp := &epb.LeaderResponse{Header: resp.Header, Kv: resp.Kvs[0]} | ||
if err := stream.Send(lresp); err != nil { | ||
return err | ||
} | ||
} | ||
} | ||
return stream.Context().Err() | ||
} | ||
|
||
func (es *electionServer) Leader(ctx context.Context, req *epb.LeaderRequest) (*epb.LeaderResponse, error) { | ||
s, err := es.session(ctx, -1) | ||
if err != nil { | ||
return nil, err | ||
} | ||
l, lerr := concurrency.NewElection(s, string(req.Name)).Leader(ctx) | ||
if lerr != nil { | ||
return nil, lerr | ||
} | ||
return &epb.LeaderResponse{Header: l.Header, Kv: l.Kvs[0]}, nil | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are we expecting the client that this api to parse leader name out of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No. It's returning the full kv so it's possible to do txns on the leader key and revision. The client shouldn't care about what's in the key name in particular, just that it has some way to refer to the key by name. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is it the user responsibility to understand what
Just a thought. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The assumption is the caller for The metadata is still useful since creation revision, modification revision, and version can all be used to reason about the status of the leader. I don't understand what the advantage is to having a totally separate data type for this-- it's a new data type that can't be used like an ordinary key and it lacks key metadata. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe separate data structure is not useful. However, my main concern is that would the caller/user know exactly what to do with the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK, can add some examples for the Lock and Election RPCs in another PR. |
||
} | ||
|
||
func (es *electionServer) Resign(ctx context.Context, req *epb.ResignRequest) (*epb.ResignResponse, error) { | ||
s, err := es.session(ctx, req.Leader.Lease) | ||
if err != nil { | ||
return nil, err | ||
} | ||
e := concurrency.ResumeElection(s, string(req.Leader.Name), string(req.Leader.Key), req.Leader.Rev) | ||
if err := e.Resign(ctx); err != nil { | ||
return nil, err | ||
} | ||
return &epb.ResignResponse{Header: e.Header()}, nil | ||
} | ||
|
||
func (es *electionServer) session(ctx context.Context, lease int64) (*concurrency.Session, error) { | ||
s, err := concurrency.NewSession( | ||
es.c, | ||
concurrency.WithLease(clientv3.LeaseID(lease)), | ||
concurrency.WithContext(ctx), | ||
) | ||
if err != nil { | ||
return nil, err | ||
} | ||
s.Orphan() | ||
return s, nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
update the comment to reflect the change? I don't think leader just return leader value anymore.