diff --git a/pkg/webhook/admission_hooks.go b/pkg/webhook/admission_hooks.go index 33e99722f5e..82249c66e49 100644 --- a/pkg/webhook/admission_hooks.go +++ b/pkg/webhook/admission_hooks.go @@ -19,15 +19,19 @@ import ( "time" "github.com/pingcap/advanced-statefulset/pkg/apis/apps/v1/helper" + "github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1" "github.com/pingcap/tidb-operator/pkg/features" "github.com/pingcap/tidb-operator/pkg/pdapi" "github.com/pingcap/tidb-operator/pkg/webhook/pod" "github.com/pingcap/tidb-operator/pkg/webhook/strategy" "github.com/pingcap/tidb-operator/pkg/webhook/util" admission "k8s.io/api/admission/v1beta1" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/kubernetes" + eventv1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/rest" + "k8s.io/client-go/tools/record" "k8s.io/klog" asappsv1 "github.com/pingcap/advanced-statefulset/pkg/apis/apps/v1" @@ -148,7 +152,15 @@ func (a *AdmissionHook) Initialize(cfg *rest.Config, stopCh <-chan struct{}) err // init pdControl pdControl := pdapi.NewDefaultPDControl(kubeCli) - pc := pod.NewPodAdmissionControl(kubeCli, cli, pdControl, strings.Split(a.ExtraServiceAccounts, ","), a.EvictRegionLeaderTimeout) + + //init recorder + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartLogging(klog.Infof) + eventBroadcaster.StartRecordingToSink(&eventv1.EventSinkImpl{ + Interface: eventv1.New(kubeCli.CoreV1().RESTClient()).Events("")}) + recorder := eventBroadcaster.NewRecorder(v1alpha1.Scheme, corev1.EventSource{Component: "tidb-admission-controller"}) + + pc := pod.NewPodAdmissionControl(kubeCli, cli, pdControl, strings.Split(a.ExtraServiceAccounts, ","), a.EvictRegionLeaderTimeout, recorder) a.podAC = pc klog.Info("pod admission webhook initialized successfully") a.stsAC = statefulset.NewStatefulSetAdmissionControl(cli) diff --git a/pkg/webhook/pod/pd_deleter.go b/pkg/webhook/pod/pd_deleter.go index 5fed10f5684..f03959e60b2 100644 --- a/pkg/webhook/pod/pd_deleter.go +++ b/pkg/webhook/pod/pd_deleter.go @@ -21,6 +21,7 @@ import ( operatorUtils "github.com/pingcap/tidb-operator/pkg/util" "github.com/pingcap/tidb-operator/pkg/webhook/util" admission "k8s.io/api/admission/v1beta1" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/klog" @@ -93,6 +94,10 @@ func (pc *PodAdmissionControl) admitDeletePdPods(payload *admitPayload) *admissi return pc.transferPDLeader(payload) } + if isUpgrading { + pc.recorder.Event(payload.tc, corev1.EventTypeNormal, pdUpgradeReason, podDeleteEventMessage(name)) + } + klog.Infof("pod[%s/%s] is not pd-leader,admit to delete", namespace, name) return util.ARSuccess() } @@ -127,6 +132,7 @@ func (pc *PodAdmissionControl) admitDeleteNonPDMemberPod(payload *admitPayload) pvc, err := pc.kubeCli.CoreV1().PersistentVolumeClaims(namespace).Get(pvcName, meta.GetOptions{}) if err != nil { if errors.IsNotFound(err) { + pc.recorder.Event(payload.tc, corev1.EventTypeNormal, pdScaleInReason, podDeleteEventMessage(name)) return util.ARSuccess() } return util.ARFail(err) @@ -136,10 +142,10 @@ func (pc *PodAdmissionControl) admitDeleteNonPDMemberPod(payload *admitPayload) klog.Infof("tc[%s/%s]'s pod[%s/%s] failed to update pvc,%v", namespace, tcName, namespace, name, err) return util.ARFail(err) } + pc.recorder.Event(payload.tc, corev1.EventTypeNormal, pdScaleInReason, podDeleteEventMessage(name)) } klog.Infof("pd pod[%s/%s] is not member of tc[%s/%s],admit to delete", namespace, name, namespace, tcName) return util.ARSuccess() - } err = payload.pdClient.DeleteMember(name) if err != nil { diff --git a/pkg/webhook/pod/pods.go b/pkg/webhook/pod/pods.go index c4ac21050a2..c7af2ef8d5f 100644 --- a/pkg/webhook/pod/pods.go +++ b/pkg/webhook/pod/pods.go @@ -32,6 +32,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/record" "k8s.io/klog" ) @@ -44,17 +45,24 @@ type PodAdmissionControl struct { pdControl pdapi.PDControlInterface // the map of the service account from the request which should be checked by webhook serviceAccounts sets.String + // recorder to send event + recorder record.EventRecorder } const ( stsControllerServiceAccounts = "system:serviceaccount:kube-system:statefulset-controller" + podDeleteMsgPattern = "pod [%s] deleted" + pdScaleInReason = "PDScaleIn" + pdUpgradeReason = "PDUpgrade" + tikvScaleInReason = "TiKVScaleIn" + tikvUpgradeReason = "TiKVUpgrade" ) var ( AstsControllerServiceAccounts string ) -func NewPodAdmissionControl(kubeCli kubernetes.Interface, operatorCli versioned.Interface, PdControl pdapi.PDControlInterface, extraServiceAccounts []string, evictRegionLeaderTimeout time.Duration) *PodAdmissionControl { +func NewPodAdmissionControl(kubeCli kubernetes.Interface, operatorCli versioned.Interface, PdControl pdapi.PDControlInterface, extraServiceAccounts []string, evictRegionLeaderTimeout time.Duration, recorder record.EventRecorder) *PodAdmissionControl { serviceAccounts := sets.NewString(stsControllerServiceAccounts) for _, sa := range extraServiceAccounts { @@ -69,6 +77,7 @@ func NewPodAdmissionControl(kubeCli kubernetes.Interface, operatorCli versioned. operatorCli: operatorCli, pdControl: PdControl, serviceAccounts: serviceAccounts, + recorder: recorder, } } diff --git a/pkg/webhook/pod/pods_test.go b/pkg/webhook/pod/pods_test.go index 2586ddb73fd..d9b62a27d90 100644 --- a/pkg/webhook/pod/pods_test.go +++ b/pkg/webhook/pod/pods_test.go @@ -16,8 +16,6 @@ package pod import ( "testing" - "k8s.io/client-go/kubernetes" - . "github.com/onsi/gomega" "github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1" operatorClifake "github.com/pingcap/tidb-operator/pkg/client/clientset/versioned/fake" @@ -28,7 +26,9 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes" kubefake "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/tools/record" "k8s.io/utils/pointer" ) @@ -130,10 +130,12 @@ func newAdmissionRequest() *admission.AdmissionRequest { func newPodAdmissionControl(kubeCli kubernetes.Interface) *PodAdmissionControl { operatorCli := operatorClifake.NewSimpleClientset() pdControl := pdapi.NewFakePDControl(kubeCli) + recorder := record.NewFakeRecorder(10) return &PodAdmissionControl{ kubeCli: kubeCli, operatorCli: operatorCli, pdControl: pdControl, + recorder: recorder, } } diff --git a/pkg/webhook/pod/tikv_deleter.go b/pkg/webhook/pod/tikv_deleter.go index fb5fe443bdb..917595e2f07 100644 --- a/pkg/webhook/pod/tikv_deleter.go +++ b/pkg/webhook/pod/tikv_deleter.go @@ -25,6 +25,7 @@ import ( operatorUtils "github.com/pingcap/tidb-operator/pkg/util" "github.com/pingcap/tidb-operator/pkg/webhook/util" admission "k8s.io/api/admission/v1beta1" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/klog" @@ -133,6 +134,7 @@ func (pc *PodAdmissionControl) admitDeleteUselessTiKVPod(payload *admitPayload) pvc, err := pc.kubeCli.CoreV1().PersistentVolumeClaims(namespace).Get(pvcName, meta.GetOptions{}) if err != nil { if errors.IsNotFound(err) { + pc.recorder.Event(payload.tc, corev1.EventTypeNormal, tikvScaleInReason, podDeleteEventMessage(name)) return util.ARSuccess() } return util.ARFail(err) @@ -142,6 +144,7 @@ func (pc *PodAdmissionControl) admitDeleteUselessTiKVPod(payload *admitPayload) klog.Infof("tc[%s/%s]'s tikv pod[%s/%s] failed to delete,%v", namespace, tcName, namespace, name, err) return util.ARFail(err) } + pc.recorder.Event(payload.tc, corev1.EventTypeNormal, tikvScaleInReason, podDeleteEventMessage(name)) } return util.ARSuccess() @@ -215,6 +218,7 @@ func (pc *PodAdmissionControl) admitDeleteUpTiKVPodDuringUpgrading(payload *admi } } + pc.recorder.Event(payload.tc, corev1.EventTypeNormal, tikvUpgradeReason, podDeleteEventMessage(name)) return util.ARSuccess() } @@ -231,5 +235,10 @@ func (pc *PodAdmissionControl) admitDeleteDownTikvPod(payload *admitPayload) *ad if !isInOrdinal { return pc.rejectDeleteTiKVPod() } + name := payload.pod.Name + isUpgrading := operatorUtils.IsStatefulSetUpgrading(payload.ownerStatefulSet) + if isUpgrading { + pc.recorder.Event(payload.tc, corev1.EventTypeNormal, tikvUpgradeReason, podDeleteEventMessage(name)) + } return util.ARSuccess() } diff --git a/pkg/webhook/pod/util.go b/pkg/webhook/pod/util.go index 16ea0cdaf82..951367ecd14 100644 --- a/pkg/webhook/pod/util.go +++ b/pkg/webhook/pod/util.go @@ -203,3 +203,7 @@ func appendExtraLabelsENVForTiKV(labels map[string]string, container *core.Conta }) } } + +func podDeleteEventMessage(name string) string { + return fmt.Sprintf(podDeleteMsgPattern, name) +}