cherry pick pingcap#2483 to release-1.1
Signed-off-by: sre-bot <sre-bot@pingcap.com>
Yisaer authored and sre-bot committed Jun 8, 2020
1 parent 4bade93 commit e35a68e
Showing 9 changed files with 224 additions and 22 deletions.
12 changes: 12 additions & 0 deletions docs/api-references/docs.md
@@ -15299,6 +15299,18 @@ string
<p>Name is the name of TidbMonitor object</p>
</td>
</tr>
<tr>
<td>
<code>grafanaEnabled</code></br>
<em>
bool
</em>
</td>
<td>
<em>(Optional)</em>
<p>GrafanaEnabled indicates whether Grafana is enabled for this target TidbMonitor</p>
</td>
</tr>
</tbody>
</table>
<h3 id="tidbmonitorspec">TidbMonitorSpec</h3>
2 changes: 2 additions & 0 deletions manifests/crd.yaml
@@ -6127,6 +6127,8 @@ spec:
type: string
monitor:
properties:
grafanaEnabled:
type: boolean
name:
type: string
namespace:
7 changes: 7 additions & 0 deletions pkg/apis/pingcap/v1alpha1/openapi_generated.go

Some generated files are not rendered by default.

4 changes: 4 additions & 0 deletions pkg/apis/pingcap/v1alpha1/tidbclusterautoscaler_types.go
@@ -166,6 +166,10 @@ type TidbMonitorRef struct {

// Name is the name of TidbMonitor object
Name string `json:"name"`

// GrafanaEnabled indicates whether Grafana is enabled for this target TidbMonitor
// +optional
GrafanaEnabled bool `json:"grafanaEnabled,omitempty"`
}

// +k8s:openapi-gen=true
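For orientation, here is a sketch of how a caller might consume the new field once it lands in a TidbCluster status; the function name is hypothetical and not part of this commit.

package example

import (
    "fmt"

    "github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
)

// describeMonitorRef summarizes tc.Status.Monitor, including the new
// GrafanaEnabled flag. Hypothetical helper, shown for illustration only.
func describeMonitorRef(tc *v1alpha1.TidbCluster) string {
    ref := tc.Status.Monitor
    if ref == nil {
        return "no TidbMonitor references this cluster"
    }
    return fmt.Sprintf("monitored by %s/%s (grafana enabled: %v)",
        ref.Namespace, ref.Name, ref.GrafanaEnabled)
}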
2 changes: 1 addition & 1 deletion pkg/controller/tidbcluster/tidb_cluster_controller.go
@@ -226,7 +226,7 @@ func NewController(
setControl,
),
mm.NewTidbDiscoveryManager(typedControl),
- mm.NewTidbClusterStatusManager(cli),
+ mm.NewTidbClusterStatusManager(kubeCli, cli),
podRestarter,
&tidbClusterConditionUpdater{},
recorder,
143 changes: 133 additions & 10 deletions pkg/manager/member/tidbcluster_status_manager.go
@@ -14,37 +14,160 @@
package member

import (
"encoding/json"
"fmt"

"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/client/clientset/versioned"
"github.com/pingcap/tidb-operator/pkg/pdapi"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/klog"
)

const (
prometheusComponent = "prometheus"
grafanaComponent = "grafana"
//TODO support AlertManager, move to UCP
alertmanager = "alertmanager"
componentPrefix = "/topology"
)

type TidbClusterStatusManager struct {
- cli versioned.Interface
+ cli       versioned.Interface
+ pdControl pdapi.PDControlInterface
}

- func NewTidbClusterStatusManager(cli versioned.Interface) *TidbClusterStatusManager {
+ func NewTidbClusterStatusManager(kubeCli kubernetes.Interface, cli versioned.Interface) *TidbClusterStatusManager {
return &TidbClusterStatusManager{
- cli: cli,
+ cli:       cli,
+ pdControl: pdapi.NewDefaultPDControl(kubeCli),
}
}

func (tcsm *TidbClusterStatusManager) Sync(tc *v1alpha1.TidbCluster) error {
return tcsm.syncTidbMonitorRefAndKey(tc)
}

func (tcsm *TidbClusterStatusManager) syncTidbMonitorRefAndKey(tc *v1alpha1.TidbCluster) error {
tm, err := tcsm.syncTidbMonitorRef(tc)
if err != nil {
return err
}
return tcsm.syncDashboardMetricStorage(tc, tm)
}

func (tcsm *TidbClusterStatusManager) syncTidbMonitorRef(tc *v1alpha1.TidbCluster) (*v1alpha1.TidbMonitor, error) {
if tc.Status.Monitor == nil {
return nil, nil
}
tmRef := tc.Status.Monitor
tm, err := tcsm.cli.PingcapV1alpha1().TidbMonitors(tmRef.Namespace).Get(tmRef.Name, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
tc.Status.Monitor = nil
err = nil
}
return nil, err
}
tcRef := tm.Spec.Clusters
if len(tcRef) < 1 {
tc.Status.Monitor = nil
return nil, nil
}
if len(tcRef[0].Namespace) < 1 {
tcRef[0].Namespace = tm.Namespace
}
if tcRef[0].Name != tc.Name || tcRef[0].Namespace != tc.Namespace {
tc.Status.Monitor = nil
return nil, nil
}
tc.Status.Monitor.GrafanaEnabled = true
if tm.Spec.Grafana == nil {
tc.Status.Monitor.GrafanaEnabled = false
}

return tm, nil
}
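The rule implemented above, in short: the status reference survives only when the first cluster listed in the TidbMonitor spec resolves to this TidbCluster, with an empty target namespace defaulting to the monitor's own namespace. A standalone restatement of that predicate (a sketch, not code from this commit):

package example

import "github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"

// monitorTargetsCluster mirrors the matching logic of syncTidbMonitorRef
// as a pure function. Illustration only.
func monitorTargetsCluster(tm *v1alpha1.TidbMonitor, tc *v1alpha1.TidbCluster) bool {
    if len(tm.Spec.Clusters) < 1 {
        return false
    }
    target := tm.Spec.Clusters[0]
    ns := target.Namespace
    if len(ns) < 1 {
        ns = tm.Namespace // empty namespace defaults to the monitor's
    }
    return target.Name == tc.Name && ns == tc.Namespace
}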

func (tcsm *TidbClusterStatusManager) syncDashboardMetricStorage(tc *v1alpha1.TidbCluster, tm *v1alpha1.TidbMonitor) error {
pdEtcdClient, err := tcsm.pdControl.GetPDEtcdClient(pdapi.Namespace(tc.Namespace), tc.Name, tc.IsTLSClusterEnabled())
if err != nil {
return err
}
var prometheusExist bool
var grafanaExist bool
if tc.Status.Monitor != nil {
prometheusExist = true
grafanaExist = tc.Status.Monitor.GrafanaEnabled
} else {
prometheusExist = false
grafanaExist = false
}

// sync prometheus key
err = syncComponent(prometheusExist, tm, prometheusComponent, 9090, pdEtcdClient)
if err != nil {
return err
}

// sync grafana key
err = syncComponent(grafanaExist, tm, grafanaComponent, 3000, pdEtcdClient)
if err != nil {
return err
}
return nil
}

func syncComponent(exist bool, tm *v1alpha1.TidbMonitor, componentName string, port int, etcdClient pdapi.PDEtcdClient) error {
key := buildComponentKey(componentName)
if exist {
v, err := buildComponentValue(tm, componentName, port)
if err != nil {
klog.Error(err.Error())
return err
}
err = etcdClient.PutKey(key, v)
if err != nil {
klog.Error(err.Error())
return err
}
} else {
err := etcdClient.DeleteKey(key)
if err != nil {
klog.Error(err.Error())
return err
}
}
return nil
}

func buildComponentKey(component string) string {
return fmt.Sprintf("%s/%s", componentPrefix, component)
}

func buildComponentValue(tm *v1alpha1.TidbMonitor, componentName string, port int) (string, error) {
return buildEtcdValue(fmt.Sprintf("%s-%s.%s.svc", tm.Name, componentName, tm.Namespace), port)
}

type componentTopology struct {
IP string `json:"ip"`
Port int `json:"port"`
}

func buildEtcdValue(host string, port int) (string, error) {
topology := componentTopology{
IP: host,
Port: port,
}
data, err := json.Marshal(topology)
if err != nil {
return "", err
}
return string(data), nil
}
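Concretely, for a hypothetical TidbMonitor named basic in namespace monitoring, the two entries written to PD's etcd are /topology/prometheus and /topology/grafana, each holding a small JSON document, evidently for the dashboard's metric-storage discovery per the syncDashboardMetricStorage name. A runnable sketch of the value shape, with made-up names:

package main

import (
    "encoding/json"
    "fmt"
)

// Mirrors buildComponentKey and buildEtcdValue above with made-up names.
func main() {
    value, _ := json.Marshal(map[string]interface{}{
        "ip":   "basic-grafana.monitoring.svc",
        "port": 3000,
    })
    fmt.Printf("/topology/grafana -> %s\n", value)
    // Output: /topology/grafana -> {"ip":"basic-grafana.monitoring.svc","port":3000}
}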

type FakeTidbClusterStatusManager struct {
}

9 changes: 7 additions & 2 deletions pkg/monitor/monitor/monitor_manager.go
@@ -387,11 +387,16 @@ func (mm *MonitorManager) patchTidbClusterStatus(tcRef *v1alpha1.TidbClusterRef,
}
var mergePatch []byte
if tcRef != nil {
grafanaEnabled := true
if monitor.Spec.Grafana == nil {
grafanaEnabled = false
}
mergePatch, err = json.Marshal(map[string]interface{}{
"status": map[string]interface{}{
"monitor": map[string]interface{}{
"name": monitor.Name,
"namespace": monitor.Namespace,
"name": monitor.Name,
"namespace": monitor.Namespace,
"grafanaEnabled": grafanaEnabled,
},
},
})
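For a hypothetical TidbMonitor basic in namespace default with a Grafana section, the merge patch marshaled above comes out as the JSON below; json.Marshal sorts map keys, hence the field order.

package main

import (
    "encoding/json"
    "fmt"
)

func main() {
    patch, _ := json.Marshal(map[string]interface{}{
        "status": map[string]interface{}{
            "monitor": map[string]interface{}{
                "name":           "basic",
                "namespace":      "default",
                "grafanaEnabled": true,
            },
        },
    })
    fmt.Println(string(patch))
    // {"status":{"monitor":{"grafanaEnabled":true,"name":"basic","namespace":"default"}}}
}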
46 changes: 42 additions & 4 deletions pkg/pdapi/pdapi.go
@@ -46,18 +46,22 @@ type Namespace string
type PDControlInterface interface {
// GetPDClient provides PDClient of the tidb cluster.
GetPDClient(Namespace, string, bool) PDClient
// GetPDEtcdClient provides the PD etcd client of the tidb cluster.
GetPDEtcdClient(namespace Namespace, tcName string, tlsEnabled bool) (PDEtcdClient, error)
}

// defaultPDControl is the default implementation of PDControlInterface.
type defaultPDControl struct {
- mutex sync.Mutex
- kubeCli kubernetes.Interface
- pdClients map[string]PDClient
+ mutex         sync.Mutex
+ etcdmutex     sync.Mutex
+ kubeCli       kubernetes.Interface
+ pdClients     map[string]PDClient
+ pdEtcdClients map[string]PDEtcdClient
}

// NewDefaultPDControl returns a defaultPDControl instance
func NewDefaultPDControl(kubeCli kubernetes.Interface) PDControlInterface {
- return &defaultPDControl{kubeCli: kubeCli, pdClients: map[string]PDClient{}}
+ return &defaultPDControl{kubeCli: kubeCli, pdClients: map[string]PDClient{}, pdEtcdClients: map[string]PDEtcdClient{}}
}

// GetTLSConfig returns *tls.Config for given TiDB cluster.
@@ -72,6 +76,32 @@ func GetTLSConfig(kubeCli kubernetes.Interface, namespace Namespace, tcName stri
return crypto.LoadTlsConfigFromSecret(secret, caCert)
}

func (pdc *defaultPDControl) GetPDEtcdClient(namespace Namespace, tcName string, tlsEnabled bool) (PDEtcdClient, error) {
pdc.etcdmutex.Lock()
defer pdc.etcdmutex.Unlock()

var tlsConfig *tls.Config
var err error

if tlsEnabled {
tlsConfig, err = GetTLSConfig(pdc.kubeCli, namespace, tcName, nil)
if err != nil {
klog.Errorf("Unable to get tls config for tidb cluster %q, pd etcd client may not work: %v", tcName, err)
return nil, err
}
return NewPdEtcdClient(PDEtcdClientURL(namespace, tcName), DefaultTimeout, tlsConfig)
}
key := pdEtcdClientKey(namespace, tcName)
if _, ok := pdc.pdEtcdClients[key]; !ok {
pdetcdClient, err := NewPdEtcdClient(PDEtcdClientURL(namespace, tcName), DefaultTimeout, nil)
if err != nil {
return nil, err
}
pdc.pdEtcdClients[key] = pdetcdClient
}
return pdc.pdEtcdClients[key], nil
}
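Note the asymmetry above: non-TLS etcd clients are cached per cluster, while TLS-enabled clients are rebuilt on every call, presumably so that rotated certificates are picked up, at the cost of a new connection each sync. A usage sketch with made-up names:

package example

import "github.com/pingcap/tidb-operator/pkg/pdapi"

// writeTopology shows a hypothetical caller of the new method; the
// namespace, cluster name, key, and value are all made up.
func writeTopology(pdControl pdapi.PDControlInterface) error {
    etcdCli, err := pdControl.GetPDEtcdClient(pdapi.Namespace("default"), "basic", false)
    if err != nil {
        return err
    }
    return etcdCli.PutKey("/topology/grafana", `{"ip":"basic-grafana.default.svc","port":3000}`)
}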

// GetPDClient provides a PDClient of a real pd cluster; if the PDClient does not exist, it will create a new one.
func (pdc *defaultPDControl) GetPDClient(namespace Namespace, tcName string, tlsEnabled bool) PDClient {
pdc.mutex.Lock()
@@ -104,11 +134,19 @@ func pdClientKey(scheme string, namespace Namespace, clusterName string) string
return fmt.Sprintf("%s.%s.%s", scheme, clusterName, string(namespace))
}

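// pdEtcdClientKey returns the cache key of the PD etcd client for a cluster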
func pdEtcdClientKey(namespace Namespace, clusterName string) string {
return fmt.Sprintf("%s.%s", clusterName, string(namespace))
}

// PdClientURL builds the URL of the pd client
func PdClientURL(namespace Namespace, clusterName string, scheme string) string {
return fmt.Sprintf("%s://%s-pd.%s:2379", scheme, clusterName, string(namespace))
}

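// PDEtcdClientURL builds the etcd endpoint of the PD service (PD serves etcd on its client port 2379)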
func PDEtcdClientURL(namespace Namespace, clusterName string) string {
return fmt.Sprintf("%s-pd.%s:2379", clusterName, string(namespace))
}

// PDClient provides pd server's api
type PDClient interface {
// GetHealth returns the PD's health info
21 changes: 16 additions & 5 deletions pkg/pdapi/pdetcd.go
@@ -19,11 +19,13 @@ import (
"time"

etcdclientv3 "github.com/coreos/etcd/clientv3"
etcdclientv3util "github.com/coreos/etcd/clientv3/clientv3util"
)

- type PDEtcdApi interface {
+ type PDEtcdClient interface {
// PutKey will put key to the target pd etcd cluster
PutKey(key, value string) error

// DeleteKey will delete key from the target pd etcd cluster
DeleteKey(key string) error
}
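Since the interface now has a name suited to mocking, an in-memory fake for unit tests might look like this (a sketch, not part of the commit):

package example

import "sync"

// fakePDEtcdClient is a hypothetical in-memory PDEtcdClient for tests.
type fakePDEtcdClient struct {
    mu   sync.Mutex
    data map[string]string
}

func newFakePDEtcdClient() *fakePDEtcdClient {
    return &fakePDEtcdClient{data: map[string]string{}}
}

// PutKey stores the value under key.
func (f *fakePDEtcdClient) PutKey(key, value string) error {
    f.mu.Lock()
    defer f.mu.Unlock()
    f.data[key] = value
    return nil
}

// DeleteKey removes key; deleting a missing key is a no-op, matching etcd.
func (f *fakePDEtcdClient) DeleteKey(key string) error {
    f.mu.Lock()
    defer f.mu.Unlock()
    delete(f.data, key)
    return nil
}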

Expand All @@ -32,7 +34,7 @@ type pdEtcdClient struct {
etcdClient *etcdclientv3.Client
}

- func NewPdEtcdClient(url string, timeout time.Duration, tlsConfig *tls.Config) (PDEtcdApi, error) {
+ func NewPdEtcdClient(url string, timeout time.Duration, tlsConfig *tls.Config) (PDEtcdClient, error) {
etcdClient, err := etcdclientv3.New(etcdclientv3.Config{
Endpoints: []string{url},
DialTimeout: timeout,
@@ -57,6 +59,15 @@ func (pec *pdEtcdClient) PutKey(key, value string) error {
func (pec *pdEtcdClient) DeleteKey(key string) error {
ctx, cancel := context.WithTimeout(context.Background(), pec.timeout)
defer cancel()
- _, err := pec.etcdClient.Delete(ctx, key)
- return err
+ kvc := etcdclientv3.NewKV(pec.etcdClient)
+
+ // perform a delete only if key already exists
+ _, err := kvc.Txn(ctx).
+ If(etcdclientv3util.KeyExists(key)).
+ Then(etcdclientv3.OpDelete(key)).
+ Commit()
+ if err != nil {
+ return err
+ }
+ return nil
}
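One behavioral note on the transactional delete: etcd's plain Delete already succeeds when the key is absent, so the KeyExists guard does not change error handling; it avoids issuing a delete operation for a key that is not there. A minimal caller, with a made-up endpoint and key:

package example

import "github.com/pingcap/tidb-operator/pkg/pdapi"

// cleanupTopology sketches direct use of the client constructor; the
// endpoint and key are hypothetical, and a nil tls.Config disables TLS.
func cleanupTopology() error {
    cli, err := pdapi.NewPdEtcdClient("basic-pd.default:2379", pdapi.DefaultTimeout, nil)
    if err != nil {
        return err
    }
    return cli.DeleteKey("/topology/grafana")
}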
