Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Dashboard Metrics Storage Key #2483

Merged
merged 23 commits into from
May 26, 2020
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions docs/api-references/docs.md
Original file line number Diff line number Diff line change
Expand Up @@ -14880,6 +14880,18 @@ string
<p>Name is the name of TidbMonitor object</p>
</td>
</tr>
<tr>
<td>
<code>grafanaEnabled</code></br>
<em>
bool
</em>
</td>
<td>
<em>(Optional)</em>
<p>GrafanaEnabled indicates whether Grafana is enabled for the target TidbMonitor</p>
</td>
</tr>
</tbody>
</table>
<h3 id="tidbmonitorspec">TidbMonitorSpec</h3>
Expand Down
2 changes: 2 additions & 0 deletions manifests/crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6148,6 +6148,8 @@ spec:
type: string
monitor:
properties:
grafanaEnabled:
type: boolean
name:
type: string
namespace:
Expand Down
7 changes: 7 additions & 0 deletions pkg/apis/pingcap/v1alpha1/openapi_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions pkg/apis/pingcap/v1alpha1/tidbclusterautoscaler_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,10 @@ type TidbMonitorRef struct {

// Name is the name of TidbMonitor object
Name string `json:"name"`

// GrafanaEnabled indicates whether Grafana is enabled for the target TidbMonitor
// +optional
GrafanaEnabled bool `json:"grafanaEnabled,omitempty"`
}

// +k8s:openapi-gen=true
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/tidbcluster/tidb_cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ func NewController(
setControl,
),
mm.NewTidbDiscoveryManager(typedControl),
mm.NewTidbClusterStatusManager(cli),
mm.NewTidbClusterStatusManager(kubeCli, cli),
podRestarter,
&tidbClusterConditionUpdater{},
recorder,
Expand Down
143 changes: 133 additions & 10 deletions pkg/manager/member/tidbcluster_status_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,37 +14,160 @@
package member

import (
"encoding/json"
"fmt"

"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/client/clientset/versioned"
"github.com/pingcap/tidb-operator/pkg/pdapi"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/klog"
)

const (
	// prometheusComponent is the component name under which the Prometheus
	// endpoint is synced by syncDashboardMetricStorage.
	prometheusComponent = "prometheus"
	// grafanaComponent is the component name under which the Grafana
	// endpoint is synced by syncDashboardMetricStorage.
	grafanaComponent = "grafana"
	// TODO: support AlertManager, move to UCP
	alertmanager = "alertmanager"
	// componentPrefix is the leading path segment that buildComponentKey
	// puts in front of a component name.
	componentPrefix = "/topology"
)

// TidbClusterStatusManager keeps a TidbCluster's monitor reference in its
// status up to date and syncs the matching dashboard metrics storage keys
// into the cluster's PD etcd.
type TidbClusterStatusManager struct {
	// cli is the versioned clientset used to look up TidbMonitor objects.
	cli versioned.Interface
	// pdControl hands out the PD etcd client used to put and delete keys.
	pdControl pdapi.PDControlInterface
}

func NewTidbClusterStatusManager(cli versioned.Interface) *TidbClusterStatusManager {
func NewTidbClusterStatusManager(kubeCli kubernetes.Interface, cli versioned.Interface) *TidbClusterStatusManager {
return &TidbClusterStatusManager{
cli: cli,
cli: cli,
pdControl: pdapi.NewDefaultPDControl(kubeCli),
}
}

// Sync reconciles the status-related state of tc. At present that is only the
// TidbMonitor reference and the keys derived from it.
func (tcsm *TidbClusterStatusManager) Sync(tc *v1alpha1.TidbCluster) error {
	return tcsm.syncTidbMonitorRefAndKey(tc)
}

func (tcsm *TidbClusterStatusManager) syncTidbMonitorRefAndKey(tc *v1alpha1.TidbCluster) error {
tm, err := tcsm.syncTidbMonitorRef(tc)
if err != nil {
return err
}
return tcsm.syncDashboardMetricStorage(tc, tm)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The status manager should focus on status updates. Can we move the etcd key operation into a separate function outside the status manager?

Copy link
Contributor Author

@Yisaer Yisaer May 18, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, I think it is not necessary, as we only sync the etcd key for MonitorRef. If more etcd operations are added later, I'm fine with splitting this out into an independent manager.

}

// syncTidbMonitorRef checks that the TidbMonitor referenced by
// tc.Status.Monitor still exists and still targets tc.
//
// It returns (nil, nil) and clears tc.Status.Monitor when the monitor is gone,
// lists no clusters, or its first cluster reference points at a different
// TidbCluster. When the reference is valid it refreshes
// tc.Status.Monitor.GrafanaEnabled from the monitor spec and returns the
// monitor. Any lookup error other than NotFound is returned unchanged.
func (tcsm *TidbClusterStatusManager) syncTidbMonitorRef(tc *v1alpha1.TidbCluster) (*v1alpha1.TidbMonitor, error) {
	ref := tc.Status.Monitor
	if ref == nil {
		return nil, nil
	}

	monitor, err := tcsm.cli.PingcapV1alpha1().TidbMonitors(ref.Namespace).Get(ref.Name, metav1.GetOptions{})
	if err != nil {
		if !errors.IsNotFound(err) {
			return nil, err
		}
		// The referenced monitor no longer exists: drop the stale reference.
		tc.Status.Monitor = nil
		return nil, nil
	}

	clusters := monitor.Spec.Clusters
	if len(clusters) == 0 {
		tc.Status.Monitor = nil
		return nil, nil
	}

	// An empty namespace on the cluster reference defaults to the monitor's own.
	if clusters[0].Namespace == "" {
		clusters[0].Namespace = monitor.Namespace
	}
	if clusters[0].Name != tc.Name || clusters[0].Namespace != tc.Namespace {
		tc.Status.Monitor = nil
		return nil, nil
	}

	// Grafana counts as enabled exactly when the monitor spec configures it.
	ref.GrafanaEnabled = monitor.Spec.Grafana != nil

	return monitor, nil
}

// syncDashboardMetricStorage publishes or removes the Prometheus and Grafana
// keys in the PD etcd of tc.
//
// tm is the monitor returned by syncTidbMonitorRef and may be nil. The
// Prometheus key is written when tc still carries a monitor reference and tm
// was resolved; the Grafana key additionally requires
// tc.Status.Monitor.GrafanaEnabled. In every other case the key is deleted.
func (tcsm *TidbClusterStatusManager) syncDashboardMetricStorage(tc *v1alpha1.TidbCluster, tm *v1alpha1.TidbMonitor) error {
	pdEtcdClient, err := tcsm.pdControl.GetPDEtcdClient(pdapi.Namespace(tc.Namespace), tc.Name, tc.IsTLSClusterEnabled())
	if err != nil {
		return err
	}

	// The monitor was already looked up by syncTidbMonitorRef, so no second
	// Get is needed here. Requiring tm != nil keeps buildComponentValue from
	// dereferencing a nil monitor.
	prometheusExist := tc.Status.Monitor != nil && tm != nil
	grafanaExist := prometheusExist && tc.Status.Monitor.GrafanaEnabled

	// sync prometheus key
	if err := syncComponent(prometheusExist, tm, prometheusComponent, 9090, pdEtcdClient); err != nil {
		return err
	}

	// sync grafana key
	return syncComponent(grafanaExist, tm, grafanaComponent, 3000, pdEtcdClient)
}

// syncComponent makes the etcd key of componentName match the desired state.
//
// When exist is true the key is put with the value built from tm,
// componentName and port; otherwise the key is deleted. Errors from building
// the value or from the etcd client are logged and returned.
func syncComponent(exist bool, tm *v1alpha1.TidbMonitor, componentName string, port int, etcdClient pdapi.PDEtcdClient) error {
	key := buildComponentKey(componentName)

	if !exist {
		if err := etcdClient.DeleteKey(key); err != nil {
			klog.Error(err.Error())
			return err
		}
		return nil
	}

	v, err := buildComponentValue(tm, componentName, port)
	if err != nil {
		klog.Error(err.Error())
		return err
	}
	if err := etcdClient.PutKey(key, v); err != nil {
		klog.Error(err.Error())
		return err
	}
	return nil
}

// buildComponentKey returns the etcd key for component: componentPrefix and
// the component name joined by a slash.
func buildComponentKey(component string) string {
	return fmt.Sprint(componentPrefix, "/", component)
}

// buildComponentValue returns the etcd value for a monitor component: the
// host "<tm.Name>-<componentName>.<tm.Namespace>.svc" together with port,
// encoded by buildEtcdValue.
func buildComponentValue(tm *v1alpha1.TidbMonitor, componentName string, port int) (string, error) {
	svcHost := fmt.Sprintf("%s-%s.%s.svc", tm.Name, componentName, tm.Namespace)
	return buildEtcdValue(svcHost, port)
}

// componentTopology is the JSON document stored as the value of a component
// key. IP holds whatever host string the caller supplies.
type componentTopology struct {
	IP   string `json:"ip"`
	Port int    `json:"port"`
}

// buildEtcdValue encodes host and port as a componentTopology JSON string.
// It returns an empty string and the marshalling error if encoding fails.
func buildEtcdValue(host string, port int) (string, error) {
	encoded, err := json.Marshal(componentTopology{IP: host, Port: port})
	if err != nil {
		return "", err
	}
	return string(encoded), nil
}

// FakeTidbClusterStatusManager is a field-less status manager type.
// NOTE(review): presumably a test double; its constructor and methods are not
// visible in this excerpt.
type FakeTidbClusterStatusManager struct{}

Expand Down
9 changes: 7 additions & 2 deletions pkg/monitor/monitor/monitor_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,11 +366,16 @@ func (mm *MonitorManager) patchTidbClusterStatus(tcRef *v1alpha1.TidbClusterRef,
}
var mergePatch []byte
if tcRef != nil {
grafanaEnabled := true
if monitor.Spec.Grafana == nil {
grafanaEnabled = false
}
mergePatch, err = json.Marshal(map[string]interface{}{
"status": map[string]interface{}{
"monitor": map[string]interface{}{
"name": monitor.Name,
"namespace": monitor.Namespace,
"name": monitor.Name,
"namespace": monitor.Namespace,
"grafanaEnabled": grafanaEnabled,
},
},
})
Expand Down
46 changes: 42 additions & 4 deletions pkg/pdapi/pdapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,18 +46,22 @@ type Namespace string
// PDControlInterface hands out clients for the PD component of a tidb cluster.
type PDControlInterface interface {
	// GetPDClient provides the PDClient of the tidb cluster tcName in
	// namespace; tlsEnabled tells whether the cluster has TLS enabled.
	GetPDClient(namespace Namespace, tcName string, tlsEnabled bool) PDClient
	// GetPDEtcdClient provides the PD etcd client of the tidb cluster tcName
	// in namespace, or an error if the client cannot be created.
	GetPDEtcdClient(namespace Namespace, tcName string, tlsEnabled bool) (PDEtcdClient, error)
}

// defaultPDControl is the default implementation of PDControlInterface.
type defaultPDControl struct {
	// mutex is taken by GetPDClient.
	mutex sync.Mutex
	// etcdmutex is taken by GetPDEtcdClient and guards pdEtcdClients.
	etcdmutex sync.Mutex
	// kubeCli is used to load the TLS configuration of a cluster.
	kubeCli kubernetes.Interface
	// pdClients holds PDClient instances by string key.
	// NOTE(review): presumably keyed by pdClientKey; confirm in GetPDClient.
	pdClients map[string]PDClient
	// pdEtcdClients holds the non-TLS PD etcd clients, keyed by pdEtcdClientKey.
	pdEtcdClients map[string]PDEtcdClient
}

// NewDefaultPDControl returns a defaultPDControl instance backed by kubeCli.
// Both client maps are initialised so that GetPDClient and GetPDEtcdClient
// can store into them without writing to a nil map.
func NewDefaultPDControl(kubeCli kubernetes.Interface) PDControlInterface {
	return &defaultPDControl{
		kubeCli:       kubeCli,
		pdClients:     map[string]PDClient{},
		pdEtcdClients: map[string]PDEtcdClient{},
	}
}

// GetTLSConfig returns *tls.Config for given TiDB cluster.
Expand All @@ -72,6 +76,32 @@ func GetTLSConfig(kubeCli kubernetes.Interface, namespace Namespace, tcName stri
return crypto.LoadTlsConfigFromSecret(secret, caCert)
}

// GetPDEtcdClient provides the PD etcd client of the tidb cluster tcName in
// namespace.
//
// Without TLS the client is created once per cluster and then served from
// pdEtcdClients. With TLS a new client is built on every call from the
// cluster's current TLS configuration and is not stored.
// NOTE(review): nothing in this function closes the per-call TLS clients;
// confirm that they are released elsewhere.
func (pdc *defaultPDControl) GetPDEtcdClient(namespace Namespace, tcName string, tlsEnabled bool) (PDEtcdClient, error) {
	pdc.etcdmutex.Lock()
	defer pdc.etcdmutex.Unlock()

	if !tlsEnabled {
		cacheKey := pdEtcdClientKey(namespace, tcName)
		if cached, found := pdc.pdEtcdClients[cacheKey]; found {
			return cached, nil
		}
		created, err := NewPdEtcdClient(PDEtcdClientURL(namespace, tcName), DefaultTimeout, nil)
		if err != nil {
			return nil, err
		}
		pdc.pdEtcdClients[cacheKey] = created
		return created, nil
	}

	tlsConfig, err := GetTLSConfig(pdc.kubeCli, namespace, tcName, nil)
	if err != nil {
		klog.Errorf("Unable to get tls config for tidb cluster %q, pd etcd client may not work: %v", tcName, err)
		return nil, err
	}
	return NewPdEtcdClient(PDEtcdClientURL(namespace, tcName), DefaultTimeout, tlsConfig)
}

// GetPDClient provides a PDClient of real pd cluster,if the PDClient not existing, it will create new one.
func (pdc *defaultPDControl) GetPDClient(namespace Namespace, tcName string, tlsEnabled bool) PDClient {
pdc.mutex.Lock()
Expand Down Expand Up @@ -104,11 +134,19 @@ func pdClientKey(scheme string, namespace Namespace, clusterName string) string
return fmt.Sprintf("%s.%s.%s", scheme, clusterName, string(namespace))
}

// pdEtcdClientKey builds the map key of a PD etcd client in the form
// "<clusterName>.<namespace>".
func pdEtcdClientKey(namespace Namespace, clusterName string) string {
	return fmt.Sprint(clusterName, ".", string(namespace))
}

// PdClientURL builds the URL of the pd client in the form
// "<scheme>://<clusterName>-pd.<namespace>:2379".
func PdClientURL(namespace Namespace, clusterName string, scheme string) string {
	host := fmt.Sprintf("%s-pd.%s", clusterName, string(namespace))
	return fmt.Sprintf("%s://%s:2379", scheme, host)
}

// PDEtcdClientURL builds the scheme-less endpoint of the pd etcd client in
// the form "<clusterName>-pd.<namespace>:2379".
func PDEtcdClientURL(namespace Namespace, clusterName string) string {
	const pdClientPort = 2379
	return fmt.Sprintf("%s-pd.%s:%d", clusterName, string(namespace), pdClientPort)
}

// PDClient provides pd server's api
type PDClient interface {
// GetHealth returns the PD's health info
Expand Down
21 changes: 16 additions & 5 deletions pkg/pdapi/pdetcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ import (
"time"

etcdclientv3 "github.com/coreos/etcd/clientv3"
etcdclientv3util "github.com/coreos/etcd/clientv3/clientv3util"
)

// PDEtcdClient is the set of key operations performed against the etcd of a
// PD cluster.
type PDEtcdClient interface {
	// PutKey will put key to the target pd etcd cluster
	PutKey(key, value string) error

	// DeleteKey will delete key from the target pd etcd cluster
	DeleteKey(key string) error
}

Expand All @@ -32,7 +34,7 @@ type pdEtcdClient struct {
etcdClient *etcdclientv3.Client
}

func NewPdEtcdClient(url string, timeout time.Duration, tlsConfig *tls.Config) (PDEtcdApi, error) {
func NewPdEtcdClient(url string, timeout time.Duration, tlsConfig *tls.Config) (PDEtcdClient, error) {
etcdClient, err := etcdclientv3.New(etcdclientv3.Config{
Endpoints: []string{url},
DialTimeout: timeout,
Expand All @@ -57,6 +59,15 @@ func (pec *pdEtcdClient) PutKey(key, value string) error {
// DeleteKey removes key from the target pd etcd cluster. The delete runs
// inside a transaction guarded by a key-exists check, and the whole call is
// bounded by pec.timeout. Only the error from committing the transaction is
// returned; the transaction response is discarded.
func (pec *pdEtcdClient) DeleteKey(key string) error {
	ctx, cancel := context.WithTimeout(context.Background(), pec.timeout)
	defer cancel()

	// perform a delete only if key already exists
	_, err := etcdclientv3.NewKV(pec.etcdClient).Txn(ctx).
		If(etcdclientv3util.KeyExists(key)).
		Then(etcdclientv3.OpDelete(key)).
		Commit()
	return err
}