From 8b55ab89e511112a11c78772bc665e2a0a9f58bd Mon Sep 17 00:00:00 2001 From: Peng Liu Date: Fri, 24 May 2019 18:56:48 +0800 Subject: [PATCH] Drain node and reboot --- Gopkg.lock | 17 + .../sriovnetworknodepolicy_controller.go | 3 - pkg/daemon/daemon.go | 81 ++- pkg/daemon/utils.go | 2 +- vendor/github.com/go-log/log/LICENSE | 19 + .../go-log/log/appengine/appengine.go | 27 + vendor/github.com/go-log/log/fmt/fmt.go | 25 + vendor/github.com/go-log/log/log.go | 30 + vendor/github.com/go-log/log/log/log.go | 25 + vendor/github.com/go-log/log/logrus/logrus.go | 38 ++ .../openshift/kubernetes-drain/LICENSE | 202 ++++++ .../openshift/kubernetes-drain/drain.go | 594 ++++++++++++++++++ 12 files changed, 1058 insertions(+), 5 deletions(-) create mode 100644 vendor/github.com/go-log/log/LICENSE create mode 100644 vendor/github.com/go-log/log/appengine/appengine.go create mode 100644 vendor/github.com/go-log/log/fmt/fmt.go create mode 100644 vendor/github.com/go-log/log/log.go create mode 100644 vendor/github.com/go-log/log/log/log.go create mode 100644 vendor/github.com/go-log/log/logrus/logrus.go create mode 100644 vendor/github.com/openshift/kubernetes-drain/LICENSE create mode 100644 vendor/github.com/openshift/kubernetes-drain/drain.go diff --git a/Gopkg.lock b/Gopkg.lock index 294a01f26..679e942f9 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -175,6 +175,14 @@ revision = "0ca9ea5df5451ffdf184b4428c902747c2c11cd7" version = "v1.0.0" +[[projects]] + digest = "1:ef6b5f0c2e8e7ebbe6be27c5cb3cac15281ffe319b8676990cdbce1ec92092c2" + name = "github.com/go-log/log" + packages = ["."] + pruneopts = "NT" + revision = "37e2e1f19306361e1fc152a1839f1236149cb4e4" + version = "v0.1.0" + [[projects]] digest = "1:d421af4c4fe51d399667d573982d663fe1fa67020a88d3ae43466ebfe8e2b5c9" name = "github.com/go-logr/logr" @@ -515,6 +523,14 @@ revision = "90e289841c1ed79b7a598a7cd9959750cb5e89e2" version = "v1.5.0" +[[projects]] + branch = "master" + digest = "1:f7646c654e93258958dba300641f8f674d5a9ed015c11119793ba1156e2acbe9" + name = "github.com/openshift/kubernetes-drain" + packages = ["."] + pruneopts = "NT" + revision = "c2e51be1758efa30d71a4d30dc4e2db86b70a4df" + [[projects]] branch = "master" digest = "1:29bf43ecd02d43977bb41f4e8d86b3bac5e6d4380c888c9503b9f4a288cd342a" @@ -1281,6 +1297,7 @@ "github.com/intel/sriov-network-device-plugin/pkg/utils", "github.com/jaypipes/ghw", "github.com/onsi/gomega", + "github.com/openshift/kubernetes-drain", "github.com/operator-framework/operator-sdk/pkg/k8sutil", "github.com/operator-framework/operator-sdk/pkg/leader", "github.com/operator-framework/operator-sdk/pkg/log/zap", diff --git a/pkg/controller/sriovnetworknodepolicy/sriovnetworknodepolicy_controller.go b/pkg/controller/sriovnetworknodepolicy/sriovnetworknodepolicy_controller.go index 9a8945f22..072825912 100644 --- a/pkg/controller/sriovnetworknodepolicy/sriovnetworknodepolicy_controller.go +++ b/pkg/controller/sriovnetworknodepolicy/sriovnetworknodepolicy_controller.go @@ -267,9 +267,6 @@ func (r *ReconcileSriovNetworkNodePolicy) syncAllSriovNetworkNodeStates(dp *srio for _, node := range nl.Items { logger.Info("Sync SriovNetworkNodeState CR", "name", node.Name) - //////////////////////////////// - // TODO: drain node before sync - //////////////////////////////// ns := &sriovnetworkv1.SriovNetworkNodeState{} ns.Name = 
node.Name
 		ns.Namespace = Namespace
diff --git a/pkg/daemon/daemon.go b/pkg/daemon/daemon.go
index 3a2acea08..537457480 100644
--- a/pkg/daemon/daemon.go
+++ b/pkg/daemon/daemon.go
@@ -2,13 +2,17 @@ package daemon
 
 import (
 	"os"
+	"os/exec"
 	"reflect"
 	"time"
 
 	"github.com/golang/glog"
+	drain "github.com/openshift/kubernetes-drain"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/wait"
 	// "k8s.io/client-go/informers"
+	corev1 "k8s.io/api/core/v1"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/tools/cache"
 	// "k8s.io/client-go/kubernetes/scheme"
@@ -108,12 +112,40 @@ func (dn *Daemon) nodeStateChangeHandler(old, new interface{}) {
 		glog.V(2).Infof("nodeStateChangeHandler(): Interface not changed")
 		return
 	}
+
+	node, err := dn.kubeClient.CoreV1().Nodes().Get(oldState.GetName(), metav1.GetOptions{})
+	if err != nil {
+		glog.Errorf("failed to get node: %v", err)
+		return
+	}
+
+	if needRestartNode(oldState, newState) {
+		dn.drainNode(node)
+		////////////////////////////
+		/// TODO: call vendor plugin
+		////////////////////////////
+		cmd := exec.Command("systemctl", "reboot")
+		if err := cmd.Run(); err != nil {
+			glog.Errorf("failed to reboot node: %v", err)
+		}
+	} else {
+		if err := drain.Uncordon(dn.kubeClient.CoreV1().Nodes(), node, nil); err != nil {
+			glog.Errorf("failed to uncordon node: %v", err)
+		}
+	}
+
+	restartDP := needRestartDevicePlugin(oldState, newState)
+	if restartDP {
+		dn.drainNode(node)
+	}
+
 	glog.V(2).Infof("nodeStateChangeHandler(): sync %s", newState.GetName())
 	if err := syncNodeState(newState); err != nil {
 		glog.Warningf("nodeStateChangeHandler(): Failed to sync newNodeState. ERR: %s", err)
 		return
 	}
-	if needRestartDevicePlugin(oldState, newState) {
+
+	if restartDP {
 		glog.V(2).Infof("nodeStateChangeHandler(): Need to restart device plugin pod")
 		pods, err := dn.kubeClient.CoreV1().Pods(namespace).List(metav1.ListOptions{
 			LabelSelector: "app=sriov-device-plugin",
@@ -129,10 +161,47 @@ func (dn *Daemon) nodeStateChangeHandler(old, new interface{}) {
 			glog.Warningf("nodeStateChangeHandler(): Failed to delete device plugin pod. ERR: %s", err)
 			return
 		}
+		if err := drain.Uncordon(dn.kubeClient.CoreV1().Nodes(), node, nil); err != nil {
+			glog.Errorf("failed to uncordon node: %v", err)
+		}
 	}
 	dn.refreshCh <- struct{}{}
 }
 
+func (dn *Daemon) drainNode(node *corev1.Node) {
+	glog.Info("Update prepared; beginning drain")
+
+	backoff := wait.Backoff{
+		Steps:    5,
+		Duration: 10 * time.Second,
+		Factor:   2,
+	}
+	var lastErr error
+
+	if err := wait.ExponentialBackoff(backoff, func() (bool, error) {
+		err := drain.Drain(dn.kubeClient, []*corev1.Node{node}, &drain.DrainOptions{
+			DeleteLocalData:    true,
+			Force:              true,
+			GracePeriodSeconds: 600,
+			IgnoreDaemonsets:   true,
+		})
+		if err == nil {
+			return true, nil
+		}
+		lastErr = err
+		glog.Infof("Draining failed with: %v, retrying", err)
+		return false, nil
+	}); err != nil {
+		if err == wait.ErrWaitTimeout {
+			glog.Errorf("failed to drain node (%d tries): %v: %v", backoff.Steps, err, lastErr)
+		} else {
+			glog.Errorf("failed to drain node: %v", err)
+		}
+		return
+	}
+	glog.Info("drain complete")
+}
+
 func needRestartDevicePlugin(oldState, newState *sriovnetworkv1.SriovNetworkNodeState) bool {
 	var found bool
 	for _, in := range newState.Spec.Interfaces {
@@ -151,3 +220,17 @@ func needRestartDevicePlugin(oldState, newState *sriovnetworkv1.SriovNetworkNode
 	}
 	return false
 }
+
+func needRestartNode(oldState, newState *sriovnetworkv1.SriovNetworkNodeState) bool {
+	for _, in := range newState.Spec.Interfaces {
+		for _, io := range oldState.Status.Interfaces {
+			if in.PciAddress == io.PciAddress {
+				// Mellanox (vendor ID 0x15b3) NICs configure VFs in firmware; reducing the VF count only takes effect after a reboot.
+				if io.Vendor == "0x15b3" && in.NumVfs < io.TotalVfs {
+					return true
+				}
+			}
+		}
+	}
+	return false
+}
diff --git a/pkg/daemon/utils.go b/pkg/daemon/utils.go
index a027981cd..2ae1f8ad7 100644
--- a/pkg/daemon/utils.go
+++ b/pkg/daemon/utils.go
@@ -313,4 +313,4 @@ func LoadKernelModule(name string) error {
 		return err
 	}
 	return nil
-}
\ No newline at end of file
+}
diff --git a/vendor/github.com/go-log/log/LICENSE b/vendor/github.com/go-log/log/LICENSE
new file mode 100644
index 000000000..0522e076e
--- /dev/null
+++ b/vendor/github.com/go-log/log/LICENSE
@@ -0,0 +1,19 @@
+Copyright (c) 2017 Go Log
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
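
Note: the daemon passes nil for the drain library's logger (in DrainOptions and to drain.Uncordon), so progress messages such as "drained node %q" are discarded. A minimal sketch of an adapter that satisfies the vendored go-log Logger interface and forwards to glog — a hypothetical helper, not part of this patch:

package daemon

import "github.com/golang/glog"

// glogLogger adapts glog to the go-log Logger interface (Log/Logf)
// expected by github.com/openshift/kubernetes-drain.
// Hypothetical helper; not part of this patch.
type glogLogger struct{}

func (glogLogger) Log(v ...interface{})                 { glog.Info(v...) }
func (glogLogger) Logf(format string, v ...interface{}) { glog.Infof(format, v...) }

Setting Logger: glogLogger{} in the DrainOptions above would surface the drain progress in the daemon log.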
diff --git a/vendor/github.com/go-log/log/appengine/appengine.go b/vendor/github.com/go-log/log/appengine/appengine.go new file mode 100644 index 000000000..215da2f89 --- /dev/null +++ b/vendor/github.com/go-log/log/appengine/appengine.go @@ -0,0 +1,27 @@ +package appengine + +import ( + "errors" + + "golang.org/x/net/context" + appenginelog "google.golang.org/appengine/log" +) + +type appengineLogger struct { + context context.Context +} + +func (a *appengineLogger) Log(v ...interface{}) { + appenginelog.Debugf(a.context, "%v", v...) +} + +func (a *appengineLogger) Logf(format string, v ...interface{}) { + appenginelog.Debugf(a.context, format, v...) +} + +func New(ctx context.Context) (*appengineLogger, error) { + if ctx == nil { + return nil, errors.New("appengine context required") + } + return &appengineLogger{context: ctx}, nil +} diff --git a/vendor/github.com/go-log/log/fmt/fmt.go b/vendor/github.com/go-log/log/fmt/fmt.go new file mode 100644 index 000000000..97bd04771 --- /dev/null +++ b/vendor/github.com/go-log/log/fmt/fmt.go @@ -0,0 +1,25 @@ +package fmt + +import ( + "fmt" + + "github.com/go-log/log" +) + +type fmtLogger struct{} + +var ( + _ log.Logger = New() +) + +func (t *fmtLogger) Log(v ...interface{}) { + fmt.Print(v...) +} + +func (t *fmtLogger) Logf(format string, v ...interface{}) { + fmt.Printf(format, v...) +} + +func New() *fmtLogger { + return &fmtLogger{} +} diff --git a/vendor/github.com/go-log/log/log.go b/vendor/github.com/go-log/log/log.go new file mode 100644 index 000000000..443864ffb --- /dev/null +++ b/vendor/github.com/go-log/log/log.go @@ -0,0 +1,30 @@ +// Package log provides a log interface +package log + +// Logger is a generic logging interface +type Logger interface { + Log(v ...interface{}) + Logf(format string, v ...interface{}) +} + +var ( + // The global default logger + DefaultLogger Logger = &noOpLogger{} +) + +// noOpLogger is used as a placeholder for the default logger +type noOpLogger struct{} + +func (n *noOpLogger) Log(v ...interface{}) {} + +func (n *noOpLogger) Logf(format string, v ...interface{}) {} + +// Log logs using the default logger +func Log(v ...interface{}) { + DefaultLogger.Log(v...) +} + +// Logf logs formatted using the default logger +func Logf(format string, v ...interface{}) { + DefaultLogger.Logf(format, v...) +} diff --git a/vendor/github.com/go-log/log/log/log.go b/vendor/github.com/go-log/log/log/log.go new file mode 100644 index 000000000..38c238210 --- /dev/null +++ b/vendor/github.com/go-log/log/log/log.go @@ -0,0 +1,25 @@ +package log + +import ( + golog "log" + + "github.com/go-log/log" +) + +type logLogger struct{} + +var ( + _ log.Logger = New() +) + +func (t *logLogger) Log(v ...interface{}) { + golog.Print(v...) +} + +func (t *logLogger) Logf(format string, v ...interface{}) { + golog.Printf(format, v...) 
+} + +func New() *logLogger { + return &logLogger{} +} diff --git a/vendor/github.com/go-log/log/logrus/logrus.go b/vendor/github.com/go-log/log/logrus/logrus.go new file mode 100644 index 000000000..dcac62d58 --- /dev/null +++ b/vendor/github.com/go-log/log/logrus/logrus.go @@ -0,0 +1,38 @@ +package logrus + +import ( + "github.com/go-log/log" + "github.com/sirupsen/logrus" +) + +type logrusLogger struct { + *logrus.Entry +} + +var ( + _ log.Logger = New() +) + +func (l *logrusLogger) Log(v ...interface{}) { + if l.Entry != nil { + l.Entry.Print(v...) + } else { + logrus.Print(v...) + } +} + +func (l *logrusLogger) Logf(format string, v ...interface{}) { + if l.Entry != nil { + l.Entry.Printf(format, v...) + } else { + logrus.Printf(format, v...) + } +} + +func WithFields(f logrus.Fields) log.Logger { + return &logrusLogger{logrus.WithFields(f)} +} + +func New() *logrusLogger { + return &logrusLogger{} +} diff --git a/vendor/github.com/openshift/kubernetes-drain/LICENSE b/vendor/github.com/openshift/kubernetes-drain/LICENSE new file mode 100644 index 000000000..d64569567 --- /dev/null +++ b/vendor/github.com/openshift/kubernetes-drain/LICENSE @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. 
+ + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. 
You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. 
In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/vendor/github.com/openshift/kubernetes-drain/drain.go b/vendor/github.com/openshift/kubernetes-drain/drain.go new file mode 100644 index 000000000..a6e21ee77 --- /dev/null +++ b/vendor/github.com/openshift/kubernetes-drain/drain.go @@ -0,0 +1,594 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/
+
+package drain
+
+import (
+	"errors"
+	"fmt"
+	"math"
+	"sort"
+	"strings"
+	"time"
+
+	golog "github.com/go-log/log"
+
+	corev1 "k8s.io/api/core/v1"
+	policyv1beta1 "k8s.io/api/policy/v1beta1"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/fields"
+	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/apimachinery/pkg/types"
+	utilerrors "k8s.io/apimachinery/pkg/util/errors"
+	"k8s.io/apimachinery/pkg/util/sets"
+	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/client-go/kubernetes"
+	typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
+	typedextensionsv1beta1 "k8s.io/client-go/kubernetes/typed/extensions/v1beta1"
+	typedpolicyv1beta1 "k8s.io/client-go/kubernetes/typed/policy/v1beta1"
+)
+
+type DrainOptions struct {
+	// Continue even if there are pods not managed by a ReplicationController, ReplicaSet, Job, DaemonSet or StatefulSet.
+	Force bool
+
+	// Ignore DaemonSet-managed pods.
+	IgnoreDaemonsets bool
+
+	// Period of time in seconds given to each pod to terminate
+	// gracefully. If negative, the default value specified in the pod
+	// will be used.
+	GracePeriodSeconds int
+
+	// The length of time to wait before giving up on deletion or
+	// eviction. Zero means infinite.
+	Timeout time.Duration
+
+	// Continue even if there are pods using emptyDir (local data that
+	// will be deleted when the node is drained).
+	DeleteLocalData bool
+
+	// Namespace to filter pods on the node.
+	Namespace string
+
+	// Label selector to filter pods on the node.
+	Selector labels.Selector
+
+	// Logger allows callers to plug in their preferred logger.
+	Logger golog.Logger
+}
+
+// Takes a pod and returns a bool indicating whether or not to operate on the
+// pod, an optional warning message, and an optional fatal error.
+type podFilter func(corev1.Pod) (include bool, w *warning, f *fatal)
+type warning struct {
+	string
+}
+type fatal struct {
+	string
+}
+
+const (
+	EvictionKind        = "Eviction"
+	EvictionSubresource = "pods/eviction"
+
+	kDaemonsetFatal      = "DaemonSet-managed pods (use IgnoreDaemonsets to ignore)"
+	kDaemonsetWarning    = "ignoring DaemonSet-managed pods"
+	kLocalStorageFatal   = "pods with local storage (use DeleteLocalData to override)"
+	kLocalStorageWarning = "deleting pods with local storage"
+	kUnmanagedFatal      = "pods not managed by ReplicationController, ReplicaSet, Job, DaemonSet or StatefulSet (use Force to override)"
+	kUnmanagedWarning    = "deleting pods not managed by ReplicationController, ReplicaSet, Job, DaemonSet or StatefulSet"
+)
+
+// GetNodes looks up the nodes (either given by name as arguments or
+// by the Selector option).
+func GetNodes(client typedcorev1.NodeInterface, nodes []string, selector string) (out []*corev1.Node, err error) {
+	if len(nodes) == 0 && len(selector) == 0 {
+		return nil, nil
+	}
+
+	if len(selector) > 0 && len(nodes) > 0 {
+		return nil, errors.New("cannot specify both node names and a selector option")
+	}
+
+	out = []*corev1.Node{}
+
+	for _, node := range nodes {
+		node, err := client.Get(node, metav1.GetOptions{})
+		if err != nil {
+			return nil, err
+		}
+		out = append(out, node)
+	}
+
+	if len(selector) > 0 {
+		nodes, err := client.List(metav1.ListOptions{
+			LabelSelector: selector,
+		})
+		if err != nil {
+			return nil, err
+		}
+		// Index into the slice rather than taking the address of the
+		// loop variable, which is reused on every iteration.
+		for i := range nodes.Items {
+			out = append(out, &nodes.Items[i])
+		}
+	}
+
+	return out, nil
+}
+
+// Drain nodes in preparation for maintenance.
+// +// The given nodes will be marked unschedulable to prevent new pods from arriving. +// Drain evicts the pods if the APIServer supports eviction +// (http://kubernetes.io/docs/admin/disruptions/). Otherwise, it will use normal DELETE +// to delete the pods. +// Drain evicts or deletes all pods except mirror pods (which cannot be deleted through +// the API server). If there are DaemonSet-managed pods, Drain will not proceed +// without IgnoreDaemonsets, and regardless it will not delete any +// DaemonSet-managed pods, because those pods would be immediately replaced by the +// DaemonSet controller, which ignores unschedulable markings. If there are any +// pods that are neither mirror pods nor managed by ReplicationController, +// ReplicaSet, DaemonSet, StatefulSet or Job, then Drain will not delete any pods unless you +// use Force. Force will also allow deletion to proceed if the managing resource of one +// or more pods is missing. +// +// Drain waits for graceful termination. You should not operate on the machine until +// the command completes. +// +// When you are ready to put the nodes back into service, use Uncordon, which +// will make the nodes schedulable again. +// +// ![Workflow](http://kubernetes.io/images/docs/kubectl_drain.svg) +func Drain(client kubernetes.Interface, nodes []*corev1.Node, options *DrainOptions) (err error) { + nodeInterface := client.CoreV1().Nodes() + for _, node := range nodes { + if err := Cordon(nodeInterface, node, options.Logger); err != nil { + return err + } + } + + drainedNodes := sets.NewString() + var fatal error + + for _, node := range nodes { + err := DeleteOrEvictPods(client, node, options) + if err == nil { + drainedNodes.Insert(node.Name) + logf(options.Logger, "drained node %q", node.Name) + } else { + log(options.Logger, err) + logf(options.Logger, "unable to drain node %q", node.Name) + remainingNodes := []string{} + fatal = err + for _, remainingNode := range nodes { + if drainedNodes.Has(remainingNode.Name) { + continue + } + remainingNodes = append(remainingNodes, remainingNode.Name) + } + + if len(remainingNodes) > 0 { + sort.Strings(remainingNodes) + logf(options.Logger, "there are pending nodes to be drained: %s", strings.Join(remainingNodes, ",")) + } + } + } + + return fatal +} + +// DeleteOrEvictPods deletes or (where supported) evicts pods from the +// target node and waits until the deletion/eviction completes, +// Timeout elapses, or an error occurs. 
+func DeleteOrEvictPods(client kubernetes.Interface, node *corev1.Node, options *DrainOptions) error { + pods, err := getPodsForDeletion(client, node, options) + if err != nil { + return err + } + + err = deleteOrEvictPods(client, pods, options) + if err != nil { + pendingPods, newErr := getPodsForDeletion(client, node, options) + if newErr != nil { + return newErr + } + pendingNames := make([]string, len(pendingPods)) + for i, pendingPod := range pendingPods { + pendingNames[i] = pendingPod.Name + } + sort.Strings(pendingNames) + logf(options.Logger, "failed to evict pods from node %q (pending pods: %s): %v", node.Name, strings.Join(pendingNames, ","), err) + } + return err +} + +func getPodController(pod corev1.Pod) *metav1.OwnerReference { + return metav1.GetControllerOf(&pod) +} + +func (o *DrainOptions) unreplicatedFilter(pod corev1.Pod) (bool, *warning, *fatal) { + // any finished pod can be removed + if pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed { + return true, nil, nil + } + + controllerRef := getPodController(pod) + if controllerRef != nil { + return true, nil, nil + } + if o.Force { + return true, &warning{kUnmanagedWarning}, nil + } + + return false, nil, &fatal{kUnmanagedFatal} +} + +type DaemonSetFilterOptions struct { + client typedextensionsv1beta1.ExtensionsV1beta1Interface + force bool + ignoreDaemonSets bool +} + +func (o *DaemonSetFilterOptions) daemonSetFilter(pod corev1.Pod) (bool, *warning, *fatal) { + // Note that we return false in cases where the pod is DaemonSet managed, + // regardless of flags. We never delete them, the only question is whether + // their presence constitutes an error. + // + // The exception is for pods that are orphaned (the referencing + // management resource - including DaemonSet - is not found). + // Such pods will be deleted if Force is used. + controllerRef := getPodController(pod) + if controllerRef == nil || controllerRef.Kind != "DaemonSet" { + return true, nil, nil + } + + if _, err := o.client.DaemonSets(pod.Namespace).Get(controllerRef.Name, metav1.GetOptions{}); err != nil { + // remove orphaned pods with a warning if Force is used + if apierrors.IsNotFound(err) && o.force { + return true, &warning{err.Error()}, nil + } + return false, nil, &fatal{err.Error()} + } + + if !o.ignoreDaemonSets { + return false, nil, &fatal{kDaemonsetFatal} + } + + return false, &warning{kDaemonsetWarning}, nil +} + +func mirrorPodFilter(pod corev1.Pod) (bool, *warning, *fatal) { + if _, found := pod.ObjectMeta.Annotations[corev1.MirrorPodAnnotationKey]; found { + return false, nil, nil + } + return true, nil, nil +} + +func hasLocalStorage(pod corev1.Pod) bool { + for _, volume := range pod.Spec.Volumes { + if volume.EmptyDir != nil { + return true + } + } + + return false +} + +func (o *DrainOptions) localStorageFilter(pod corev1.Pod) (bool, *warning, *fatal) { + if !hasLocalStorage(pod) { + return true, nil, nil + } + if !o.DeleteLocalData { + return false, nil, &fatal{kLocalStorageFatal} + } + return true, &warning{kLocalStorageWarning}, nil +} + +// Map of status message to a list of pod names having that status. +type podStatuses map[string][]string + +func (ps podStatuses) message() string { + msgs := []string{} + + for key, pods := range ps { + msgs = append(msgs, fmt.Sprintf("%s: %s", key, strings.Join(pods, ", "))) + } + return strings.Join(msgs, "; ") +} + +// getPodsForDeletion receives resource info for a node, and returns all the pods from the given node that we +// are planning on deleting. 
If there are any pods preventing us from deleting, we return that list in an error.
+func getPodsForDeletion(client kubernetes.Interface, node *corev1.Node, options *DrainOptions) (pods []corev1.Pod, err error) {
+	listOptions := metav1.ListOptions{
+		FieldSelector: fields.SelectorFromSet(fields.Set{"spec.nodeName": node.Name}).String(),
+	}
+	if options.Selector != nil {
+		listOptions.LabelSelector = options.Selector.String()
+	}
+	podList, err := client.CoreV1().Pods(options.Namespace).List(listOptions)
+	if err != nil {
+		return pods, err
+	}
+
+	ws := podStatuses{}
+	fs := podStatuses{}
+
+	daemonSetOptions := &DaemonSetFilterOptions{
+		client:           client.ExtensionsV1beta1(),
+		force:            options.Force,
+		ignoreDaemonSets: options.IgnoreDaemonsets,
+	}
+
+	for _, pod := range podList.Items {
+		podOk := true
+		for _, filt := range []podFilter{daemonSetOptions.daemonSetFilter, mirrorPodFilter, options.localStorageFilter, options.unreplicatedFilter} {
+			filterOk, w, f := filt(pod)
+
+			podOk = podOk && filterOk
+			if w != nil {
+				ws[w.string] = append(ws[w.string], pod.Name)
+			}
+			if f != nil {
+				fs[f.string] = append(fs[f.string], pod.Name)
+			}
+
+			// short-circuit as soon as pod not ok
+			// at that point, there is no reason to run pod
+			// through any additional filters
+			if !podOk {
+				break
+			}
+		}
+		if podOk {
+			pods = append(pods, pod)
+		}
+	}
+
+	if len(fs) > 0 {
+		return []corev1.Pod{}, errors.New(fs.message())
+	}
+	if len(ws) > 0 {
+		log(options.Logger, ws.message())
+	}
+	return pods, nil
+}
+
+func evictPod(client typedpolicyv1beta1.PolicyV1beta1Interface, pod corev1.Pod, policyGroupVersion string, gracePeriodSeconds int) error {
+	deleteOptions := &metav1.DeleteOptions{}
+	if gracePeriodSeconds >= 0 {
+		gracePeriod := int64(gracePeriodSeconds)
+		deleteOptions.GracePeriodSeconds = &gracePeriod
+	}
+	eviction := &policyv1beta1.Eviction{
+		TypeMeta: metav1.TypeMeta{
+			APIVersion: policyGroupVersion,
+			Kind:       EvictionKind,
+		},
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      pod.Name,
+			Namespace: pod.Namespace,
+		},
+		DeleteOptions: deleteOptions,
+	}
+	return client.Evictions(eviction.Namespace).Evict(eviction)
+}
+
+// deleteOrEvictPods deletes or evicts the pods on the api server
+func deleteOrEvictPods(client kubernetes.Interface, pods []corev1.Pod, options *DrainOptions) error {
+	if len(pods) == 0 {
+		return nil
+	}
+
+	policyGroupVersion, err := SupportEviction(client)
+	if err != nil {
+		return err
+	}
+
+	getPodFn := func(namespace, name string) (*corev1.Pod, error) {
+		return client.CoreV1().Pods(options.Namespace).Get(name, metav1.GetOptions{})
+	}
+
+	if len(policyGroupVersion) > 0 {
+		// Remember to change the URL manipulation func when Eviction's version changes
+		return evictPods(client.PolicyV1beta1(), pods, policyGroupVersion, options, getPodFn)
+	} else {
+		return deletePods(client.CoreV1(), pods, options, getPodFn)
+	}
+}
+
+func evictPods(client typedpolicyv1beta1.PolicyV1beta1Interface, pods []corev1.Pod, policyGroupVersion string, options *DrainOptions, getPodFn func(namespace, name string) (*corev1.Pod, error)) error {
+	returnCh := make(chan error, 1)
+
+	for _, pod := range pods {
+		go func(pod corev1.Pod, returnCh chan error) {
+			var err error
+			for {
+				err = evictPod(client, pod, policyGroupVersion, options.GracePeriodSeconds)
+				if err == nil {
+					break
+				} else if apierrors.IsNotFound(err) {
+					returnCh <- nil
+					return
+				} else if apierrors.IsTooManyRequests(err) {
+					logf(options.Logger, "error when evicting pod %q (will retry after 5s): %v", pod.Name, err)
+
time.Sleep(5 * time.Second) + } else { + returnCh <- fmt.Errorf("error when evicting pod %q: %v", pod.Name, err) + return + } + } + podArray := []corev1.Pod{pod} + _, err = waitForDelete(podArray, 1*time.Second, time.Duration(math.MaxInt64), true, options.Logger, getPodFn) + if err == nil { + returnCh <- nil + } else { + returnCh <- fmt.Errorf("error when waiting for pod %q terminating: %v", pod.Name, err) + } + }(pod, returnCh) + } + + doneCount := 0 + var errors []error + + // 0 timeout means infinite, we use MaxInt64 to represent it. + var globalTimeout time.Duration + if options.Timeout == 0 { + globalTimeout = time.Duration(math.MaxInt64) + } else { + globalTimeout = options.Timeout + } + globalTimeoutCh := time.After(globalTimeout) + numPods := len(pods) + for doneCount < numPods { + select { + case err := <-returnCh: + doneCount++ + if err != nil { + errors = append(errors, err) + } + case <-globalTimeoutCh: + return fmt.Errorf("Drain did not complete within %v", globalTimeout) + } + } + return utilerrors.NewAggregate(errors) +} + +func deletePods(client typedcorev1.CoreV1Interface, pods []corev1.Pod, options *DrainOptions, getPodFn func(namespace, name string) (*corev1.Pod, error)) error { + // 0 timeout means infinite, we use MaxInt64 to represent it. + var globalTimeout time.Duration + if options.Timeout == 0 { + globalTimeout = time.Duration(math.MaxInt64) + } else { + globalTimeout = options.Timeout + } + deleteOptions := &metav1.DeleteOptions{} + if options.GracePeriodSeconds >= 0 { + gracePeriodSeconds := int64(options.GracePeriodSeconds) + deleteOptions.GracePeriodSeconds = &gracePeriodSeconds + } + for _, pod := range pods { + err := client.Pods(pod.Namespace).Delete(pod.Name, deleteOptions) + if err != nil && !apierrors.IsNotFound(err) { + return err + } + } + _, err := waitForDelete(pods, 1*time.Second, globalTimeout, false, options.Logger, getPodFn) + return err +} + +func waitForDelete(pods []corev1.Pod, interval, timeout time.Duration, usingEviction bool, logger golog.Logger, getPodFn func(string, string) (*corev1.Pod, error)) ([]corev1.Pod, error) { + var verbStr string + if usingEviction { + verbStr = "evicted" + } else { + verbStr = "deleted" + } + + err := wait.PollImmediate(interval, timeout, func() (bool, error) { + pendingPods := []corev1.Pod{} + for i, pod := range pods { + p, err := getPodFn(pod.Namespace, pod.Name) + if apierrors.IsNotFound(err) || (p != nil && p.ObjectMeta.UID != pod.ObjectMeta.UID) { + logf(logger, "pod %q removed (%s)", pod.Name, verbStr) + continue + } else if err != nil { + return false, err + } else { + pendingPods = append(pendingPods, pods[i]) + } + } + pods = pendingPods + if len(pendingPods) > 0 { + return false, nil + } + return true, nil + }) + return pods, err +} + +// SupportEviction uses Discovery API to find out if the server +// supports the eviction subresource. If supported, it will return +// its groupVersion; otherwise it will return an empty string. 
+func SupportEviction(clientset kubernetes.Interface) (string, error) { + discoveryClient := clientset.Discovery() + groupList, err := discoveryClient.ServerGroups() + if err != nil { + return "", err + } + foundPolicyGroup := false + var policyGroupVersion string + for _, group := range groupList.Groups { + if group.Name == "policy" { + foundPolicyGroup = true + policyGroupVersion = group.PreferredVersion.GroupVersion + break + } + } + if !foundPolicyGroup { + return "", nil + } + resourceList, err := discoveryClient.ServerResourcesForGroupVersion("v1") + if err != nil { + return "", err + } + for _, resource := range resourceList.APIResources { + if resource.Name == EvictionSubresource && resource.Kind == EvictionKind { + return policyGroupVersion, nil + } + } + return "", nil +} + +// Cordon marks a node "Unschedulable". This method is idempotent. +func Cordon(client typedcorev1.NodeInterface, node *corev1.Node, logger golog.Logger) error { + return cordonOrUncordon(client, node, logger, true) +} + +// Uncordon marks a node "Schedulable". This method is idempotent. +func Uncordon(client typedcorev1.NodeInterface, node *corev1.Node, logger golog.Logger) error { + return cordonOrUncordon(client, node, logger, false) +} + +func cordonOrUncordon(client typedcorev1.NodeInterface, node *corev1.Node, logger golog.Logger, desired bool) error { + unsched := node.Spec.Unschedulable + if unsched == desired { + return nil + } + + patch := []byte(fmt.Sprintf("{\"spec\":{\"unschedulable\":%t}}", desired)) + _, err := client.Patch(node.Name, types.StrategicMergePatchType, patch) + if err == nil { + verbStr := "cordoned" + if !desired { + verbStr = "un" + verbStr + } + logf(logger, "%s node %q", verbStr, node.Name) + } + return err +} + +func log(logger golog.Logger, v ...interface{}) { + if logger != nil { + logger.Log(v...) + } +} + +func logf(logger golog.Logger, format string, v ...interface{}) { + if logger != nil { + logger.Logf(format, v...) + } +}