Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Dashboard Metrics Storage Key #2483

Merged
merged 23 commits into from
May 26, 2020
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/controller/tidbcluster/tidb_cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func NewController(
tiflashUpgrader,
),
mm.NewTidbDiscoveryManager(typedControl),
mm.NewTidbClusterStatusManager(cli),
mm.NewTidbClusterStatusManager(kubeCli, cli),
podRestarter,
&tidbClusterConditionUpdater{},
recorder,
Expand Down
163 changes: 153 additions & 10 deletions pkg/manager/member/tidbcluster_status_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,37 +14,180 @@
package member

import (
"encoding/json"
"fmt"

"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/client/clientset/versioned"
"github.com/pingcap/tidb-operator/pkg/pdapi"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/klog"
)

// Keys under which monitoring component topology is stored in the PD etcd.
const (
// prometheusEtcdKey is the key written by putPrometheusKey and removed by cleanPrometheusKey.
prometheusEtcdKey = "/topology/prometheus"
// grafanaEtcdKey is the key written by putGrafanaKey and removed by cleanGrafanaKey.
grafanaEtcdKey = "/topology/grafana"
// TODO: support AlertManager, move to UCP
// alertManagerEtcdKey is declared for that future work; nothing in this file reads or writes it yet.
alertManagerEtcdKey = "/topology/alertmanager"
)

// TidbClusterStatusManager holds the clients used while syncing a TidbCluster:
// a versioned clientset (cli) for looking up TidbMonitor objects and a PD
// control interface (pdControl) for obtaining the cluster's PD etcd client.
type TidbClusterStatusManager struct {
cli versioned.Interface
cli versioned.Interface
pdControl pdapi.PDControlInterface
}

// NewTidbClusterStatusManager returns a TidbClusterStatusManager that uses cli
// as its versioned clientset and a default PD control built from kubeCli.
func NewTidbClusterStatusManager(cli versioned.Interface) *TidbClusterStatusManager {
func NewTidbClusterStatusManager(kubeCli kubernetes.Interface, cli versioned.Interface) *TidbClusterStatusManager {
return &TidbClusterStatusManager{
cli: cli,
cli: cli,
pdControl: pdapi.NewDefaultPDControl(kubeCli),
}
}

// Sync reconciles the monitor-related state of tc by delegating to
// syncTidbMonitorRefAndKey and returns whatever error that call reports.
func (tcsm *TidbClusterStatusManager) Sync(tc *v1alpha1.TidbCluster) error {
return tcsm.syncTidbMonitorRefAndKey(tc)
}

func (tcsm *TidbClusterStatusManager) syncTidbMonitorRefAndKey(tc *v1alpha1.TidbCluster) error {
tm, err := tcsm.syncTidbMonitorRef(tc)
if err != nil {
return err
}
return tcsm.syncDashboardMetricStorage(tc, tm)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The status manager should focus on the status update; can we move the etcd key operation to a separate function outside the status manager?

Copy link
Contributor Author

@Yisaer Yisaer May 18, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, I think it is not necessary, as we only sync the etcd key for MonitorRef. If more etcd operations are added later, it will be OK for me to split this out into an independent manager.

}

// syncTidbMonitorRef fetches the TidbMonitor referenced by tc.Status.Monitor
// and checks that it still points back at tc.
//
// It returns (nil, nil) when tc has no monitor reference. It also returns
// (nil, nil), after clearing tc.Status.Monitor, when the referenced monitor is
// not found, lists no clusters, or its first cluster entry does not name tc.
// Any other lookup error is returned unchanged. On success the fetched monitor
// is returned; an empty namespace on its first cluster entry is filled in with
// the monitor's own namespace before the comparison.
func (tcsm *TidbClusterStatusManager) syncTidbMonitorRef(tc *v1alpha1.TidbCluster) (*v1alpha1.TidbMonitor, error) {
	ref := tc.Status.Monitor
	if ref == nil {
		return nil, nil
	}

	monitor, err := tcsm.cli.PingcapV1alpha1().TidbMonitors(ref.Namespace).Get(ref.Name, metav1.GetOptions{})
	if err != nil {
		if !errors.IsNotFound(err) {
			return nil, err
		}
		// The referenced monitor is gone: drop the stale reference.
		tc.Status.Monitor = nil
		return nil, nil
	}

	clusters := monitor.Spec.Clusters
	if len(clusters) == 0 {
		tc.Status.Monitor = nil
		return nil, nil
	}

	// Only the first cluster entry is considered; default its namespace to
	// the monitor's namespace when it is left empty.
	if len(clusters[0].Namespace) == 0 {
		clusters[0].Namespace = monitor.Namespace
	}

	if clusters[0].Name == tc.Name && clusters[0].Namespace == tc.Namespace {
		return monitor, nil
	}

	// The monitor now targets a different cluster.
	tc.Status.Monitor = nil
	return nil, nil
}

// syncDashboardMetricStorage keeps the Prometheus and Grafana topology keys in
// the PD etcd of tc in line with the cluster's monitor state.
//
// tm is the TidbMonitor that tc.Status.Monitor refers to, or nil when there is
// none. When a monitor is present the Prometheus key is written, and the
// Grafana key is written only if tm.Spec.Grafana is set; every key whose
// component is absent is deleted instead.
//
// It returns the first error met while obtaining the PD etcd client, building
// a value, or writing/deleting a key. Errors from the build/put/delete steps
// are also logged through klog before being returned.
func (tcsm *TidbClusterStatusManager) syncDashboardMetricStorage(tc *v1alpha1.TidbCluster, tm *v1alpha1.TidbMonitor) error {
	pdEtcdClient, err := tcsm.pdControl.GetPDEtcdClient(pdapi.Namespace(tc.Namespace), tc.Name, tc.IsTLSClusterEnabled())
	if err != nil {
		return err
	}

	// tm is supplied by the caller, so it is used directly rather than being
	// looked up again. Requiring tm to be non-nil keeps a monitor reference
	// without a matching object from causing a nil dereference below.
	prometheusExist := tc.Status.Monitor != nil && tm != nil
	grafanaExist := prometheusExist && tm.Spec.Grafana != nil

	// sync prometheus key
	if prometheusExist {
		v, err := buildPrometheusEtcdValue(tm)
		if err != nil {
			klog.Error(err.Error())
			return err
		}
		// A failed put is an etcd error, not a Kubernetes API status error,
		// so it is logged and returned like every other failure here.
		if err := putPrometheusKey(pdEtcdClient, v); err != nil {
			klog.Error(err.Error())
			return err
		}
	} else if err := cleanPrometheusKey(pdEtcdClient); err != nil {
		klog.Error(err.Error())
		return err
	}

	// sync grafana key
	if grafanaExist {
		v, err := buildGrafanaEtcdValue(tm)
		if err != nil {
			klog.Error(err.Error())
			return err
		}
		if err := putGrafanaKey(pdEtcdClient, v); err != nil {
			klog.Error(err.Error())
			return err
		}
	} else if err := cleanGrafanaKey(pdEtcdClient); err != nil {
		klog.Error(err.Error())
		return err
	}
	return nil
}

func putGrafanaKey(etcdClient pdapi.PDEtcdClient, value string) error {
Yisaer marked this conversation as resolved.
Show resolved Hide resolved
return etcdClient.PutKey(grafanaEtcdKey, value)
}

// putPrometheusKey stores value under prometheusEtcdKey through etcdClient.PutKey
// and returns that call's error.
func putPrometheusKey(etcdClient pdapi.PDEtcdClient, value string) error {
return etcdClient.PutKey(prometheusEtcdKey, value)
}

// cleanPrometheusKey removes prometheusEtcdKey through etcdClient.DeleteKey
// and returns that call's error.
func cleanPrometheusKey(etcdClient pdapi.PDEtcdClient) error {
return etcdClient.DeleteKey(prometheusEtcdKey)
}

// cleanGrafanaKey removes grafanaEtcdKey through etcdClient.DeleteKey
// and returns that call's error.
func cleanGrafanaKey(etcdClient pdapi.PDEtcdClient) error {
return etcdClient.DeleteKey(grafanaEtcdKey)
}

// componentTopology is the value buildEtcdValue marshals to JSON for a
// monitoring component; it serializes as an object with "ip" and "port" fields.
type componentTopology struct {
// IP is serialized as "ip". buildEtcdValue fills it with the host string it is given.
IP string `json:"ip"`
// Port is serialized as "port".
Port int `json:"port"`
}

// buildGrafanaEtcdValue returns the JSON topology value for the Grafana of tm:
// host "<tm.Name>-grafana.<tm.Namespace>.svc" on port 3000.
func buildGrafanaEtcdValue(tm *v1alpha1.TidbMonitor) (string, error) {
	const grafanaPort = 3000
	host := fmt.Sprintf("%s-grafana.%s.svc", tm.Name, tm.Namespace)
	return buildEtcdValue(host, grafanaPort)
}

// buildPrometheusEtcdValue returns the JSON topology value for the Prometheus
// of tm: host "<tm.Name>-prometheus.<tm.Namespace>.svc" on port 9090.
func buildPrometheusEtcdValue(tm *v1alpha1.TidbMonitor) (string, error) {
	const prometheusPort = 9090
	host := fmt.Sprintf("%s-prometheus.%s.svc", tm.Name, tm.Namespace)
	return buildEtcdValue(host, prometheusPort)
}

// buildEtcdValue encodes host and port as a componentTopology JSON object and
// returns it as a string. On a marshalling failure it returns "" and the error.
func buildEtcdValue(host string, port int) (string, error) {
	encoded, err := json.Marshal(componentTopology{IP: host, Port: port})
	if err != nil {
		return "", err
	}
	return string(encoded), nil
}

type FakeTidbClusterStatusManager struct {
}

Expand Down
45 changes: 41 additions & 4 deletions pkg/pdapi/pdapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,18 +46,21 @@ type Namespace string
type PDControlInterface interface {
	// GetPDClient returns the PDClient for the tidb cluster tcName in
	// namespace; tlsEnabled states whether the cluster has TLS enabled.
	GetPDClient(namespace Namespace, tcName string, tlsEnabled bool) PDClient
	// GetPDEtcdClient returns the PD etcd client for the tidb cluster tcName
	// in namespace, or an error if the client cannot be provided.
	GetPDEtcdClient(namespace Namespace, tcName string, tlsEnabled bool) (PDEtcdClient, error)
}

// defaultPDControl is the default implementation of PDControlInterface.
type defaultPDControl struct {
mutex sync.Mutex
kubeCli kubernetes.Interface
pdClients map[string]PDClient
mutex sync.Mutex
kubeCli kubernetes.Interface
pdClients map[string]PDClient
pdEtcdClients map[string]PDEtcdClient
}

// NewDefaultPDControl returns a defaultPDControl instance
func NewDefaultPDControl(kubeCli kubernetes.Interface) PDControlInterface {
return &defaultPDControl{kubeCli: kubeCli, pdClients: map[string]PDClient{}}
return &defaultPDControl{kubeCli: kubeCli, pdClients: map[string]PDClient{}, pdEtcdClients: map[string]PDEtcdClient{}}
}

// GetTLSConfig returns *tls.Config for given TiDB cluster.
Expand All @@ -72,6 +75,32 @@ func GetTLSConfig(kubeCli kubernetes.Interface, namespace Namespace, tcName stri
return crypto.LoadTlsConfigFromSecret(secret, caCert)
}

func (pdc *defaultPDControl) GetPDEtcdClient(namespace Namespace, tcName string, tlsEnabled bool) (PDEtcdClient, error) {
pdc.mutex.Lock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest defining a separate mutex here; this client is independent of the existing PDClient.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated.

defer pdc.mutex.Unlock()

var tlsConfig *tls.Config
var err error

if tlsEnabled {
tlsConfig, err = GetTLSConfig(pdc.kubeCli, namespace, tcName, nil)
if err != nil {
klog.Errorf("Unable to get tls config for tidb cluster %q, pd etcd client may not work: %v", tcName, err)
return nil, err
}
return NewPdEtcdClient(PDEtcdClientURL(namespace, tcName), DefaultTimeout, tlsConfig)
}
key := pdEtcdClientKey(namespace, tcName)
if _, ok := pdc.pdEtcdClients[key]; !ok {
pdetcdClient, err := NewPdEtcdClient(PDEtcdClientURL(namespace, tcName), DefaultTimeout, nil)
if err != nil {
return nil, err
}
pdc.pdEtcdClients[key] = pdetcdClient
}
return pdc.pdEtcdClients[key], nil
}

// GetPDClient provides a PDClient of the real PD cluster; if the PDClient does not exist yet, it creates a new one.
func (pdc *defaultPDControl) GetPDClient(namespace Namespace, tcName string, tlsEnabled bool) PDClient {
pdc.mutex.Lock()
Expand Down Expand Up @@ -104,11 +133,19 @@ func pdClientKey(scheme string, namespace Namespace, clusterName string) string
return fmt.Sprintf("%s.%s.%s", scheme, clusterName, string(namespace))
}

// pdEtcdClientKey returns the cache key "<clusterName>.<namespace>" under which
// a cluster's PD etcd client is stored.
func pdEtcdClientKey(namespace Namespace, clusterName string) string {
	ns := string(namespace)
	return fmt.Sprintf("%s.%s", clusterName, ns)
}

// PdClientURL builds the URL of the PD client for a cluster:
// "<scheme>://<clusterName>-pd.<namespace>:2379".
func PdClientURL(namespace Namespace, clusterName string, scheme string) string {
	host := fmt.Sprintf("%s-pd.%s", clusterName, string(namespace))
	return fmt.Sprintf("%s://%s:2379", scheme, host)
}

// PDEtcdClientURL builds the endpoint of a cluster's PD etcd,
// "<clusterName>-pd.<namespace>:2379", without a scheme prefix.
func PDEtcdClientURL(namespace Namespace, clusterName string) string {
	ns := string(namespace)
	return fmt.Sprintf("%s-pd.%s:2379", clusterName, ns)
}

// PDClient provides pd server's api
type PDClient interface {
// GetHealth returns the PD's health info
Expand Down
21 changes: 16 additions & 5 deletions pkg/pdapi/pdetcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ import (
"time"

etcdclientv3 "github.com/coreos/etcd/clientv3"
etcdclientv3util "github.com/coreos/etcd/clientv3/clientv3util"
)

type PDEtcdApi interface {
type PDEtcdClient interface {
// PutKey puts the key with the given value into the target pd etcd cluster
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why use "would" here? Does it mean the put/delete operation may not have finished by the time the function returns?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the comment is misleading; I should use "will" here.

PutKey(key, value string) error

// DeleteKey deletes the key from the target pd etcd cluster
Yisaer marked this conversation as resolved.
Show resolved Hide resolved
DeleteKey(key string) error
}

Expand All @@ -32,7 +34,7 @@ type pdEtcdClient struct {
etcdClient *etcdclientv3.Client
}

func NewPdEtcdClient(url string, timeout time.Duration, tlsConfig *tls.Config) (PDEtcdApi, error) {
func NewPdEtcdClient(url string, timeout time.Duration, tlsConfig *tls.Config) (PDEtcdClient, error) {
etcdClient, err := etcdclientv3.New(etcdclientv3.Config{
Endpoints: []string{url},
DialTimeout: timeout,
Expand All @@ -57,6 +59,15 @@ func (pec *pdEtcdClient) PutKey(key, value string) error {
func (pec *pdEtcdClient) DeleteKey(key string) error {
ctx, cancel := context.WithTimeout(context.Background(), pec.timeout)
defer cancel()
_, err := pec.etcdClient.Delete(ctx, key)
return err
kvc := etcdclientv3.NewKV(pec.etcdClient)

// perform a delete only if key already exists
_, err := kvc.Txn(ctx).
If(etcdclientv3util.KeyExists(key)).
Then(etcdclientv3.OpDelete(key)).
Commit()
if err != nil {
return err
}
return nil
}