-
Notifications
You must be signed in to change notification settings - Fork 495
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support Dashboard Metrics Storage Key #2483
Changes from 10 commits
1a4b29d
0bd336e
c9875a7
c7a5472
dbecff3
18d7e3e
48172b4
d4b6d6b
b9873cc
1cb413c
6a1ef6f
89d2bb9
37be87b
9ec5db3
84b2938
6d4d700
8929a5c
081243d
516e320
7448896
66a5673
3c0ded9
8e16fd6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -46,18 +46,21 @@ type Namespace string | |
type PDControlInterface interface { | ||
// GetPDClient provides PDClient of the tidb cluster. | ||
GetPDClient(Namespace, string, bool) PDClient | ||
// GetPDEtcdClient provides PD etcd Client of the tidb cluster. | ||
GetPDEtcdClient(namespace Namespace, tcName string, tlsEnabled bool) (PDEtcdClient, error) | ||
} | ||
|
||
// defaultPDControl is the default implementation of PDControlInterface. | ||
type defaultPDControl struct { | ||
mutex sync.Mutex | ||
kubeCli kubernetes.Interface | ||
pdClients map[string]PDClient | ||
mutex sync.Mutex | ||
kubeCli kubernetes.Interface | ||
pdClients map[string]PDClient | ||
pdEtcdClients map[string]PDEtcdClient | ||
} | ||
|
||
// NewDefaultPDControl returns a defaultPDControl instance | ||
func NewDefaultPDControl(kubeCli kubernetes.Interface) PDControlInterface { | ||
return &defaultPDControl{kubeCli: kubeCli, pdClients: map[string]PDClient{}} | ||
return &defaultPDControl{kubeCli: kubeCli, pdClients: map[string]PDClient{}, pdEtcdClients: map[string]PDEtcdClient{}} | ||
} | ||
|
||
// GetTLSConfig returns *tls.Config for given TiDB cluster. | ||
|
@@ -72,6 +75,32 @@ func GetTLSConfig(kubeCli kubernetes.Interface, namespace Namespace, tcName stri | |
return crypto.LoadTlsConfigFromSecret(secret, caCert) | ||
} | ||
|
||
func (pdc *defaultPDControl) GetPDEtcdClient(namespace Namespace, tcName string, tlsEnabled bool) (PDEtcdClient, error) { | ||
pdc.mutex.Lock() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Suggest define separate mutex here, it's an independent client with existing PDClient. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. updated. |
||
defer pdc.mutex.Unlock() | ||
|
||
var tlsConfig *tls.Config | ||
var err error | ||
|
||
if tlsEnabled { | ||
tlsConfig, err = GetTLSConfig(pdc.kubeCli, namespace, tcName, nil) | ||
if err != nil { | ||
klog.Errorf("Unable to get tls config for tidb cluster %q, pd etcd client may not work: %v", tcName, err) | ||
return nil, err | ||
} | ||
return NewPdEtcdClient(PDEtcdClientURL(namespace, tcName), DefaultTimeout, tlsConfig) | ||
} | ||
key := pdEtcdClientKey(namespace, tcName) | ||
if _, ok := pdc.pdEtcdClients[key]; !ok { | ||
pdetcdClient, err := NewPdEtcdClient(PDEtcdClientURL(namespace, tcName), DefaultTimeout, nil) | ||
if err != nil { | ||
return nil, err | ||
} | ||
pdc.pdEtcdClients[key] = pdetcdClient | ||
} | ||
return pdc.pdEtcdClients[key], nil | ||
} | ||
|
||
// GetPDClient provides a PDClient of real pd cluster,if the PDClient not existing, it will create new one. | ||
func (pdc *defaultPDControl) GetPDClient(namespace Namespace, tcName string, tlsEnabled bool) PDClient { | ||
pdc.mutex.Lock() | ||
|
@@ -104,11 +133,19 @@ func pdClientKey(scheme string, namespace Namespace, clusterName string) string | |
return fmt.Sprintf("%s.%s.%s", scheme, clusterName, string(namespace)) | ||
} | ||
|
||
func pdEtcdClientKey(namespace Namespace, clusterName string) string { | ||
return fmt.Sprintf("%s.%s", clusterName, string(namespace)) | ||
} | ||
|
||
// pdClientUrl builds the url of pd client | ||
func PdClientURL(namespace Namespace, clusterName string, scheme string) string { | ||
return fmt.Sprintf("%s://%s-pd.%s:2379", scheme, clusterName, string(namespace)) | ||
} | ||
|
||
func PDEtcdClientURL(namespace Namespace, clusterName string) string { | ||
return fmt.Sprintf("%s-pd.%s:2379", clusterName, string(namespace)) | ||
} | ||
|
||
// PDClient provides pd server's api | ||
type PDClient interface { | ||
// GetHealth returns the PD's health info | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,11 +19,13 @@ import ( | |
"time" | ||
|
||
etcdclientv3 "github.com/coreos/etcd/clientv3" | ||
etcdclientv3util "github.com/coreos/etcd/clientv3/clientv3util" | ||
) | ||
|
||
type PDEtcdApi interface { | ||
type PDEtcdClient interface { | ||
// PutKey would put key to the target pd etcd cluster | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the comment is misleading, I should use |
||
PutKey(key, value string) error | ||
|
||
// DeleteKey would delete key from the target pd etcd cluster | ||
Yisaer marked this conversation as resolved.
Show resolved
Hide resolved
|
||
DeleteKey(key string) error | ||
} | ||
|
||
|
@@ -32,7 +34,7 @@ type pdEtcdClient struct { | |
etcdClient *etcdclientv3.Client | ||
} | ||
|
||
func NewPdEtcdClient(url string, timeout time.Duration, tlsConfig *tls.Config) (PDEtcdApi, error) { | ||
func NewPdEtcdClient(url string, timeout time.Duration, tlsConfig *tls.Config) (PDEtcdClient, error) { | ||
etcdClient, err := etcdclientv3.New(etcdclientv3.Config{ | ||
Endpoints: []string{url}, | ||
DialTimeout: timeout, | ||
|
@@ -57,6 +59,15 @@ func (pec *pdEtcdClient) PutKey(key, value string) error { | |
func (pec *pdEtcdClient) DeleteKey(key string) error { | ||
ctx, cancel := context.WithTimeout(context.Background(), pec.timeout) | ||
defer cancel() | ||
_, err := pec.etcdClient.Delete(ctx, key) | ||
return err | ||
kvc := etcdclientv3.NewKV(pec.etcdClient) | ||
|
||
// perform a delete only if key already exists | ||
_, err := kvc.Txn(ctx). | ||
If(etcdclientv3util.KeyExists(key)). | ||
Then(etcdclientv3.OpDelete(key)). | ||
Commit() | ||
if err != nil { | ||
return err | ||
} | ||
return nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Status manager should focus on the status update, can we move the etcd key operation to a separate function out of the status manager?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently, i think it is not necessary as we only sync etcd key for
MonitorRef
. If there are more new actions for ectd operation, it will be ok for me to split it as an independent manager.