diff --git a/cmd/csi-snapshotter/create_crd.go b/cmd/csi-snapshotter/create_crd.go new file mode 100644 index 000000000..d2c64a3e4 --- /dev/null +++ b/cmd/csi-snapshotter/create_crd.go @@ -0,0 +1,139 @@ +/* +Copyright 2018 The Kubernetes Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "reflect" + "time" + + "github.com/golang/glog" + crdv1 "github.com/kubernetes-csi/external-snapshotter/pkg/apis/volumesnapshot/v1alpha1" + apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" + apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/rest" +) + +const ( + // SnapshotPVCAnnotation is "snapshot.alpha.kubernetes.io/snapshot" + SnapshotPVCAnnotation = "volumesnapshot.csi.k8s.io/snapshot" +) + +// NewClient creates a new RESTClient +func NewClient(cfg *rest.Config) (*rest.RESTClient, *runtime.Scheme, error) { + scheme := runtime.NewScheme() + if err := crdv1.AddToScheme(scheme); err != nil { + return nil, nil, err + } + + config := *cfg + config.GroupVersion = &crdv1.SchemeGroupVersion + config.APIPath = "/apis" + config.ContentType = runtime.ContentTypeJSON + config.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: serializer.NewCodecFactory(scheme)} + + client, err := rest.RESTClientFor(&config) + if err != nil { + return nil, nil, err + } + + return client, scheme, nil +} + +// CreateCRD creates CustomResourceDefinition +func CreateCRD(clientset apiextensionsclient.Interface) error { + crd := &apiextensionsv1beta1.CustomResourceDefinition{ + ObjectMeta: metav1.ObjectMeta{ + Name: crdv1.SnapshotClassResourcePlural + "." + crdv1.GroupName, + }, + Spec: apiextensionsv1beta1.CustomResourceDefinitionSpec{ + Group: crdv1.GroupName, + Version: crdv1.SchemeGroupVersion.Version, + Scope: apiextensionsv1beta1.ClusterScoped, + Names: apiextensionsv1beta1.CustomResourceDefinitionNames{ + Plural: crdv1.SnapshotClassResourcePlural, + Kind: reflect.TypeOf(crdv1.SnapshotClass{}).Name(), + }, + }, + } + res, err := clientset.ApiextensionsV1beta1().CustomResourceDefinitions().Create(crd) + + if err != nil && !apierrors.IsAlreadyExists(err) { + glog.Fatalf("failed to create VolumeSnapshotResource: %#v, err: %#v", + res, err) + } + + crd = &apiextensionsv1beta1.CustomResourceDefinition{ + ObjectMeta: metav1.ObjectMeta{ + Name: crdv1.VolumeSnapshotDataResourcePlural + "." + crdv1.GroupName, + }, + Spec: apiextensionsv1beta1.CustomResourceDefinitionSpec{ + Group: crdv1.GroupName, + Version: crdv1.SchemeGroupVersion.Version, + Scope: apiextensionsv1beta1.ClusterScoped, + Names: apiextensionsv1beta1.CustomResourceDefinitionNames{ + Plural: crdv1.VolumeSnapshotDataResourcePlural, + Kind: reflect.TypeOf(crdv1.VolumeSnapshotData{}).Name(), + }, + }, + } + res, err = clientset.ApiextensionsV1beta1().CustomResourceDefinitions().Create(crd) + + if err != nil && !apierrors.IsAlreadyExists(err) { + glog.Fatalf("failed to create VolumeSnapshotDataResource: %#v, err: %#v", + res, err) + } + + crd = &apiextensionsv1beta1.CustomResourceDefinition{ + ObjectMeta: metav1.ObjectMeta{ + Name: crdv1.VolumeSnapshotResourcePlural + "." + crdv1.GroupName, + }, + Spec: apiextensionsv1beta1.CustomResourceDefinitionSpec{ + Group: crdv1.GroupName, + Version: crdv1.SchemeGroupVersion.Version, + Scope: apiextensionsv1beta1.NamespaceScoped, + Names: apiextensionsv1beta1.CustomResourceDefinitionNames{ + Plural: crdv1.VolumeSnapshotResourcePlural, + Kind: reflect.TypeOf(crdv1.VolumeSnapshot{}).Name(), + }, + }, + } + res, err = clientset.ApiextensionsV1beta1().CustomResourceDefinitions().Create(crd) + + if err != nil && !apierrors.IsAlreadyExists(err) { + glog.Fatalf("failed to create VolumeSnapshotResource: %#v, err: %#v", + res, err) + } + + return nil +} + +// WaitForSnapshotResource waits for the snapshot resource +func WaitForSnapshotResource(snapshotClient *rest.RESTClient) error { + return wait.Poll(100*time.Millisecond, 60*time.Second, func() (bool, error) { + _, err := snapshotClient.Get(). + Resource(crdv1.VolumeSnapshotDataResourcePlural).DoRaw() + if err == nil { + return true, nil + } + if apierrors.IsNotFound(err) { + return false, nil + } + return false, err + }) +} diff --git a/cmd/csi-snapshotter/main.go b/cmd/csi-snapshotter/main.go index 50e8d8d39..080a36b4e 100644 --- a/cmd/csi-snapshotter/main.go +++ b/cmd/csi-snapshotter/main.go @@ -1,7 +1,195 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package main -import "fmt" +import ( + "context" + "flag" + "fmt" + "os" + "os/signal" + "time" + + "github.com/golang/glog" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" + + "github.com/kubernetes-csi/external-snapshotter/pkg/connection" + "github.com/kubernetes-csi/external-snapshotter/pkg/controller" + + clientset "github.com/kubernetes-csi/external-snapshotter/pkg/client/clientset/versioned" + informers "github.com/kubernetes-csi/external-snapshotter/pkg/client/informers/externalversions" + apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" +) + +const ( + // Number of worker threads + threads = 10 + + // Default timeout of short CSI calls like GetPluginInfo + csiTimeout = time.Second +) + +// Command line flags +var ( + snapshotter = flag.String("snapshotter", "", "Name of the snapshotter. The snapshotter will only create snapshot data for snapshot that request a StorageClass with a snapshotter field set equal to this name.") + kubeconfig = flag.String("kubeconfig", "", "Absolute path to the kubeconfig file. Required only when running out of cluster.") + resync = flag.Duration("resync", 10*time.Second, "Resync interval of the controller.") + connectionTimeout = flag.Duration("connection-timeout", 1*time.Minute, "Timeout for waiting for CSI driver socket.") + csiAddress = flag.String("csi-address", "/run/csi/socket", "Address of the CSI driver socket.") + createSnapshotDataRetryCount = flag.Int("createSnapshotDataRetryCount", 5, "Number of retries when we create a snapshot data object for a snapshot.") + createSnapshotDataInterval = flag.Duration("createSnapshotDataInterval", 10*time.Second, "Interval between retries when we create a snapshot data object for a snapshot.") + resyncPeriod = flag.Duration("resyncPeriod", 60*time.Second, "The period that should be used to re-sync the snapshot.") +) func main() { - fmt.Println("vim-go") + flag.Set("logtostderr", "true") + flag.Parse() + + // Create the client config. Use kubeconfig if given, otherwise assume in-cluster. + config, err := buildConfig(*kubeconfig) + if err != nil { + glog.Error(err.Error()) + os.Exit(1) + } + + kubeClient, err := kubernetes.NewForConfig(config) + if err != nil { + glog.Error(err.Error()) + os.Exit(1) + } + + snapClient, err := clientset.NewForConfig(config) + if err != nil { + glog.Errorf("Error building snapshot clientset: %s", err.Error()) + os.Exit(1) + } + + factory := informers.NewSharedInformerFactory(snapClient, *resync) + + // Create CRD resource + aeclientset, err := apiextensionsclient.NewForConfig(config) + if err != nil { + glog.Error(err.Error()) + os.Exit(1) + } + + // initialize CRD resource if it does not exist + err = CreateCRD(aeclientset) + if err != nil { + glog.Error(err.Error()) + os.Exit(1) + } + + // make a new config for our extension's API group, using the first config as a baseline + crdClient, _, err := NewClient(config) + if err != nil { + glog.Error(err.Error()) + os.Exit(1) + } + + // wait until CRD gets processed + err = WaitForSnapshotResource(crdClient) + if err != nil { + glog.Error(err.Error()) + os.Exit(1) + } + + // Connect to CSI. + csiConn, err := connection.New(*csiAddress, *connectionTimeout) + if err != nil { + glog.Error(err.Error()) + os.Exit(1) + } + + // Find driver name. + ctx, cancel := context.WithTimeout(context.Background(), csiTimeout) + defer cancel() + + // Check it's ready + if err = waitForDriverReady(csiConn, *connectionTimeout); err != nil { + glog.Error(err.Error()) + os.Exit(1) + } + + // Find out if the driver supports create/delete snapshot. + supportsCreateSnapshot, err := csiConn.SupportsControllerCreateSnapshot(ctx) + if err != nil { + glog.Error(err.Error()) + os.Exit(1) + } + if !supportsCreateSnapshot { + glog.Error("CSI driver does not support ControllerCreateSnapshot") + os.Exit(1) + } + + glog.V(2).Infof("Start NewCSISnapshotController with snapshotter %s", *snapshotter) + + ctrl := controller.NewCSISnapshotController( + snapClient, + kubeClient, + *snapshotter, + factory.Volumesnapshot().V1alpha1().VolumeSnapshots(), + factory.Volumesnapshot().V1alpha1().VolumeSnapshotDatas(), + *createSnapshotDataRetryCount, + *createSnapshotDataInterval, + csiConn, + *connectionTimeout, + *resyncPeriod, + ) + + // run... + stopCh := make(chan struct{}) + factory.Start(stopCh) + go ctrl.Run(threads, stopCh) + + // ...until SIGINT + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt) + <-c + close(stopCh) +} + +func buildConfig(kubeconfig string) (*rest.Config, error) { + if kubeconfig != "" { + return clientcmd.BuildConfigFromFlags("", kubeconfig) + } + return rest.InClusterConfig() +} + +func waitForDriverReady(csiConn connection.CSIConnection, timeout time.Duration) error { + now := time.Now() + finish := now.Add(timeout) + var err error + for { + ctx, cancel := context.WithTimeout(context.Background(), csiTimeout) + defer cancel() + err = csiConn.Probe(ctx) + if err == nil { + glog.V(2).Infof("Probe succeeded") + return nil + } + glog.V(2).Infof("Probe failed with %s", err) + + now := time.Now() + if now.After(finish) { + return fmt.Errorf("failed to probe the controller: %s", err) + } + time.Sleep(time.Second) + } } diff --git a/pkg/connection/connection.go b/pkg/connection/connection.go new file mode 100644 index 000000000..01e1b21b3 --- /dev/null +++ b/pkg/connection/connection.go @@ -0,0 +1,266 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package connection + +import ( + "context" + "fmt" + "net" + "strings" + "time" + + "github.com/container-storage-interface/spec/lib/go/csi/v0" + "github.com/golang/glog" + crdv1 "github.com/kubernetes-csi/external-snapshotter/pkg/apis/volumesnapshot/v1alpha1" + "google.golang.org/grpc" + "google.golang.org/grpc/connectivity" + "k8s.io/api/core/v1" +) + +// CSIConnection is gRPC connection to a remote CSI driver and abstracts all +// CSI calls. +type CSIConnection interface { + // GetDriverName returns driver name as discovered by GetPluginInfo() + // gRPC call. + GetDriverName(ctx context.Context) (string, error) + + // SupportsControllerCreateSnapshot returns true if the CSI driver reports + // CREATE_DELETE_SNAPSHOT in ControllerGetCapabilities() gRPC call. + SupportsControllerCreateSnapshot(ctx context.Context) (bool, error) + + // SupportsControllerListSnapshots returns true if the CSI driver reports + // LIST_SNAPSHOTS in ControllerGetCapabilities() gRPC call. + SupportsControllerListSnapshots(ctx context.Context) (bool, error) + + // CreateSnapshot creates a snapshot for a volume + CreateSnapshot(ctx context.Context, snapshot *crdv1.VolumeSnapshot, volume *v1.PersistentVolume, parameters map[string]string) (driverName string, snapshotId string, timestamp int64, status *csi.SnapshotStatus, err error) + + // DeleteSnapshot deletes a snapshot from a volume + DeleteSnapshot(ctx context.Context, snapshotID string) (err error) + + // ListSnapshots lists snapshot from a volume + ListSnapshots(ctx context.Context, snapshotID string) (*csi.SnapshotStatus, error) + + // Probe checks that the CSI driver is ready to process requests + Probe(ctx context.Context) error + + // Close the connection + Close() error +} + +type csiConnection struct { + conn *grpc.ClientConn +} + +var ( + _ CSIConnection = &csiConnection{} +) + +func New(address string, timeout time.Duration) (CSIConnection, error) { + conn, err := connect(address, timeout) + if err != nil { + return nil, err + } + return &csiConnection{ + conn: conn, + }, nil +} + +func connect(address string, timeout time.Duration) (*grpc.ClientConn, error) { + glog.V(2).Infof("Connecting to %s", address) + dialOptions := []grpc.DialOption{ + grpc.WithInsecure(), + grpc.WithBackoffMaxDelay(time.Second), + grpc.WithUnaryInterceptor(logGRPC), + } + if strings.HasPrefix(address, "/") { + dialOptions = append(dialOptions, grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) { + return net.DialTimeout("unix", addr, timeout) + })) + } + conn, err := grpc.Dial(address, dialOptions...) + + if err != nil { + return nil, err + } + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + for { + if !conn.WaitForStateChange(ctx, conn.GetState()) { + glog.V(4).Infof("Connection timed out") + // subsequent GetPluginInfo will show the real connection error + return conn, nil + } + if conn.GetState() == connectivity.Ready { + glog.V(3).Infof("Connected") + return conn, nil + } + glog.V(4).Infof("Still trying, connection is %s", conn.GetState()) + } +} + +func (c *csiConnection) GetDriverName(ctx context.Context) (string, error) { + client := csi.NewIdentityClient(c.conn) + + req := csi.GetPluginInfoRequest{} + + rsp, err := client.GetPluginInfo(ctx, &req) + if err != nil { + return "", err + } + name := rsp.GetName() + if name == "" { + return "", fmt.Errorf("name is empty") + } + return name, nil +} + +func (c *csiConnection) Probe(ctx context.Context) error { + client := csi.NewIdentityClient(c.conn) + + req := csi.ProbeRequest{} + + _, err := client.Probe(ctx, &req) + if err != nil { + return err + } + return nil +} + +func (c *csiConnection) SupportsControllerCreateSnapshot(ctx context.Context) (bool, error) { + client := csi.NewControllerClient(c.conn) + req := csi.ControllerGetCapabilitiesRequest{} + + rsp, err := client.ControllerGetCapabilities(ctx, &req) + if err != nil { + return false, err + } + caps := rsp.GetCapabilities() + for _, cap := range caps { + if cap == nil { + continue + } + rpc := cap.GetRpc() + if rpc == nil { + continue + } + if rpc.GetType() == csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT { + return true, nil + } + } + return false, nil +} + +func (c *csiConnection) SupportsControllerListSnapshots(ctx context.Context) (bool, error) { + client := csi.NewControllerClient(c.conn) + req := csi.ControllerGetCapabilitiesRequest{} + + rsp, err := client.ControllerGetCapabilities(ctx, &req) + if err != nil { + return false, err + } + caps := rsp.GetCapabilities() + for _, cap := range caps { + if cap == nil { + continue + } + rpc := cap.GetRpc() + if rpc == nil { + continue + } + if rpc.GetType() == csi.ControllerServiceCapability_RPC_LIST_SNAPSHOTS { + return true, nil + } + } + return false, nil +} + +func (c *csiConnection) CreateSnapshot(ctx context.Context, snapshot *crdv1.VolumeSnapshot, volume *v1.PersistentVolume, parameters map[string]string) (string, string, int64, *csi.SnapshotStatus, error) { + glog.V(5).Infof("CSI CreateSnapshot: %s", snapshot.Name) + if volume.Spec.CSI == nil { + return "", "", 0, nil, fmt.Errorf("CSIPersistentVolumeSource not defined in spec") + } + + client := csi.NewControllerClient(c.conn) + + driverName, err := c.GetDriverName(ctx) + if err != nil { + return "", "", 0, nil, err + } + + req := csi.CreateSnapshotRequest{ + SourceVolumeId: volume.Spec.CSI.VolumeHandle, + Name: snapshot.Name, + Parameters: parameters, + CreateSnapshotSecrets: nil, + } + + rsp, err := client.CreateSnapshot(ctx, &req) + if err != nil { + return "", "", 0, nil, err + } + + glog.V(5).Infof("CSI CreateSnapshot: %s driver name [%s] snapshot ID [%s] time stamp [%s] status [%s]", snapshot.Name, driverName, rsp.Snapshot.Id, rsp.Snapshot.CreatedAt, *rsp.Snapshot.Status) + return driverName, rsp.Snapshot.Id, rsp.Snapshot.CreatedAt, rsp.Snapshot.Status, nil +} + +func (c *csiConnection) DeleteSnapshot(ctx context.Context, snapshotID string) (err error) { + client := csi.NewControllerClient(c.conn) + + req := csi.DeleteSnapshotRequest{ + SnapshotId: snapshotID, + DeleteSnapshotSecrets: nil, + } + + if _, err := client.DeleteSnapshot(ctx, &req); err != nil { + return err + } + + return nil +} + +func (c *csiConnection) ListSnapshots(ctx context.Context, snapshotID string) (*csi.SnapshotStatus, error) { + client := csi.NewControllerClient(c.conn) + + req := csi.ListSnapshotsRequest{ + SnapshotId: snapshotID, + } + + rsp, err := client.ListSnapshots(ctx, &req) + if err != nil { + return nil, err + } + + if rsp.Entries == nil || len(rsp.Entries) == 0 { + return nil, fmt.Errorf("can not find snapshot for snapshotID %s", snapshotID) + } + + return rsp.Entries[0].Snapshot.Status, nil +} + +func (c *csiConnection) Close() error { + return c.conn.Close() +} + +func logGRPC(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + glog.V(5).Infof("GRPC call: %s", method) + glog.V(5).Infof("GRPC request: %+v", req) + err := invoker(ctx, method, req, reply, cc, opts...) + glog.V(5).Infof("GRPC response: %+v", reply) + glog.V(5).Infof("GRPC error: %v", err) + return err +} diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go new file mode 100644 index 000000000..6c4a35663 --- /dev/null +++ b/pkg/controller/controller.go @@ -0,0 +1,651 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "fmt" + "time" + + "github.com/golang/glog" + crdv1 "github.com/kubernetes-csi/external-snapshotter/pkg/apis/volumesnapshot/v1alpha1" + clientset "github.com/kubernetes-csi/external-snapshotter/pkg/client/clientset/versioned" + storageinformers "github.com/kubernetes-csi/external-snapshotter/pkg/client/informers/externalversions/volumesnapshot/v1alpha1" + storagelisters "github.com/kubernetes-csi/external-snapshotter/pkg/client/listers/volumesnapshot/v1alpha1" + "github.com/kubernetes-csi/external-snapshotter/pkg/connection" + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" + corev1 "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/workqueue" + "k8s.io/kubernetes/pkg/util/goroutinemap" + "k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff" +) + +var ( + // Statuses of snapshot creation process + statusReady string = "ready" + statusError string = "error" + statusUploading string = "uploading" + statusNew string = "new" +) + +type CSISnapshotController struct { + clientset clientset.Interface + client kubernetes.Interface + snapshotterName string + eventRecorder record.EventRecorder + vsQueue workqueue.RateLimitingInterface + vsdQueue workqueue.RateLimitingInterface + + vsLister storagelisters.VolumeSnapshotLister + vsListerSynced cache.InformerSynced + vsdLister storagelisters.VolumeSnapshotDataLister + vsdListerSynced cache.InformerSynced + + vsStore cache.Store + vsdStore cache.Store + + handler Handler + // Map of scheduled/running operations. + runningOperations goroutinemap.GoRoutineMap + + createSnapshotDataRetryCount int + createSnapshotDataInterval time.Duration + resyncPeriod time.Duration +} + +// NewCSISnapshotController returns a new *CSISnapshotController +func NewCSISnapshotController( + clientset clientset.Interface, + client kubernetes.Interface, + snapshotterName string, + volumeSnapshotInformer storageinformers.VolumeSnapshotInformer, + volumeSnapshotDataInformer storageinformers.VolumeSnapshotDataInformer, + createSnapshotDataRetryCount int, + createSnapshotDataInterval time.Duration, + conn connection.CSIConnection, + timeout time.Duration, + resyncPeriod time.Duration, +) *CSISnapshotController { + broadcaster := record.NewBroadcaster() + broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: client.Core().Events(v1.NamespaceAll)}) + var eventRecorder record.EventRecorder + eventRecorder = broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: fmt.Sprintf("csi-snapshotter %s", snapshotterName)}) + + ctrl := &CSISnapshotController{ + clientset: clientset, + client: client, + snapshotterName: snapshotterName, + eventRecorder: eventRecorder, + handler: NewCSIHandler(clientset, client, snapshotterName, eventRecorder, conn, timeout, createSnapshotDataRetryCount, createSnapshotDataInterval), + runningOperations: goroutinemap.NewGoRoutineMap(true), + createSnapshotDataRetryCount: createSnapshotDataRetryCount, + createSnapshotDataInterval: createSnapshotDataInterval, + resyncPeriod: resyncPeriod, + vsStore: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc), + vsdStore: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc), + vsQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "csi-snapshotter-vs"), + vsdQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "csi-snapshotter-vsd"), + } + + volumeSnapshotInformer.Informer().AddEventHandlerWithResyncPeriod( + cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { ctrl.enqueueVsWork(obj) }, + UpdateFunc: func(oldObj, newObj interface{}) { ctrl.enqueueVsWork(newObj) }, + DeleteFunc: func(obj interface{}) { ctrl.enqueueVsWork(obj) }, + }, + ctrl.resyncPeriod, + ) + ctrl.vsLister = volumeSnapshotInformer.Lister() + ctrl.vsListerSynced = volumeSnapshotInformer.Informer().HasSynced + + volumeSnapshotDataInformer.Informer().AddEventHandlerWithResyncPeriod( + cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { ctrl.enqueueVsdWork(obj) }, + UpdateFunc: func(oldObj, newObj interface{}) { ctrl.enqueueVsdWork(newObj) }, + DeleteFunc: func(obj interface{}) { ctrl.enqueueVsdWork(obj) }, + }, + ctrl.resyncPeriod, + ) + ctrl.vsdLister = volumeSnapshotDataInformer.Lister() + ctrl.vsdListerSynced = volumeSnapshotDataInformer.Informer().HasSynced + + return ctrl +} + +func (ctrl *CSISnapshotController) Run(workers int, stopCh <-chan struct{}) { + defer ctrl.vsQueue.ShutDown() + defer ctrl.vsdQueue.ShutDown() + + glog.Infof("Starting CSI snapshotter") + defer glog.Infof("Shutting CSI snapshotter") + + if !cache.WaitForCacheSync(stopCh, ctrl.vsListerSynced, ctrl.vsdListerSynced) { + glog.Errorf("Cannot sync caches") + return + } + + ctrl.initializeCaches(ctrl.vsLister, ctrl.vsdLister) + + for i := 0; i < workers; i++ { + go wait.Until(ctrl.vsWorker, 0, stopCh) + go wait.Until(ctrl.vsdWorker, 0, stopCh) + } + + <-stopCh +} + +// enqueueVsWork adds snapshot to given work queue. +func (ctrl *CSISnapshotController) enqueueVsWork(obj interface{}) { + // Beware of "xxx deleted" events + if unknown, ok := obj.(cache.DeletedFinalStateUnknown); ok && unknown.Obj != nil { + obj = unknown.Obj + } + if vs, ok := obj.(*crdv1.VolumeSnapshot); ok { + objName, err := cache.DeletionHandlingMetaNamespaceKeyFunc(vs) + if err != nil { + glog.Errorf("failed to get key from object: %v, %v", err, vs) + return + } + glog.V(5).Infof("enqueued %q for sync", objName) + ctrl.vsQueue.Add(objName) + } +} + +// enqueueVsdWork adds snapshot data to given work queue. +func (ctrl *CSISnapshotController) enqueueVsdWork(obj interface{}) { + // Beware of "xxx deleted" events + if unknown, ok := obj.(cache.DeletedFinalStateUnknown); ok && unknown.Obj != nil { + obj = unknown.Obj + } + if vsd, ok := obj.(*crdv1.VolumeSnapshotData); ok { + objName, err := cache.DeletionHandlingMetaNamespaceKeyFunc(vsd) + if err != nil { + glog.Errorf("failed to get key from object: %v, %v", err, vsd) + return + } + glog.V(5).Infof("enqueued %q for sync", objName) + ctrl.vsdQueue.Add(objName) + } +} + +// vsWorker processes items from vsQueue. It must run only once, +// syncSnapshot is not assured to be reentrant. +func (ctrl *CSISnapshotController) vsWorker() { + workFunc := func() bool { + keyObj, quit := ctrl.vsQueue.Get() + if quit { + return true + } + defer ctrl.vsQueue.Done(keyObj) + key := keyObj.(string) + glog.V(5).Infof("vsWorker[%s]", key) + + namespace, name, err := cache.SplitMetaNamespaceKey(key) + glog.V(5).Infof("vsWorker: snapshot namespace [%s] name [%s]", namespace, name) + if err != nil { + glog.V(4).Infof("error getting namespace & name of snapshot %q to get snapshot from informer: %v", key, err) + return false + } + snapshot, err := ctrl.vsLister.VolumeSnapshots(namespace).Get(name) + if err == nil { + if ctrl.shouldProcessVS(snapshot) { + // The volume snapshot still exists in informer cache, the event must have + // been add/update/sync + glog.V(4).Infof("should process snapshot") + ctrl.updateVs(snapshot) + } + return false + } + if err != nil && !errors.IsNotFound(err) { + glog.V(2).Infof("error getting snapshot %q from informer: %v", key, err) + return false + } + // The snapshot is not in informer cache, the event must have been "delete" + vsObj, found, err := ctrl.vsStore.GetByKey(key) + if err != nil { + glog.V(2).Infof("error getting snapshot %q from cache: %v", key, err) + return false + } + if !found { + // The controller has already processed the delete event and + // deleted the snapshot from its cache + glog.V(2).Infof("deletion of vs %q was already processed", key) + return false + } + snapshot, ok := vsObj.(*crdv1.VolumeSnapshot) + if !ok { + glog.Errorf("expected vs, got %+v", vsObj) + return false + } + if ctrl.shouldProcessVS(snapshot) { + ctrl.deleteVS(snapshot) + } + return false + } + + for { + if quit := workFunc(); quit { + glog.Infof("snapshot worker queue shutting down") + return + } + } +} + +// vsdWorker processes items from vsdQueue. It must run only once, +// syncVsd is not assured to be reentrant. +func (ctrl *CSISnapshotController) vsdWorker() { + workFunc := func() bool { + keyObj, quit := ctrl.vsdQueue.Get() + if quit { + return true + } + defer ctrl.vsdQueue.Done(keyObj) + key := keyObj.(string) + glog.V(5).Infof("vsdWorker[%s]", key) + + _, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + glog.V(4).Infof("error getting name of snapshotData %q to get snapshotData from informer: %v", key, err) + return false + } + vsd, err := ctrl.vsdLister.Get(name) + if err == nil { + // The volume still exists in informer cache, the event must have + // been add/update/sync + ctrl.updateVsd(vsd) + return false + } + if !errors.IsNotFound(err) { + glog.V(2).Infof("error getting vsd %q from informer: %v", key, err) + return false + } + + // The vsd is not in informer cache, the event must have been + // "delete" + vsdObj, found, err := ctrl.vsdStore.GetByKey(key) + if err != nil { + glog.V(2).Infof("error getting vsd %q from cache: %v", key, err) + return false + } + if !found { + // The controller has already processed the delete event and + // deleted the volume from its cache + glog.V(2).Infof("deletion of vsd %q was already processed", key) + return false + } + vsd, ok := vsdObj.(*crdv1.VolumeSnapshotData) + if !ok { + glog.Errorf("expected vsd, got %+v", vsd) + return false + } + ctrl.deleteVSD(vsd) + return false + } + + for { + if quit := workFunc(); quit { + glog.Infof("vsd worker queue shutting down") + return + } + } +} + +// shouldProcessVS detect if snapshotter in the SnapshotClass is the same as the snapshotter +// in external controller. +func (ctrl *CSISnapshotController) shouldProcessVS(snapshot *crdv1.VolumeSnapshot) bool { + class, err := ctrl.handler.GetClassFromVolumeSnapshot(snapshot) + if err != nil { + return false + } + glog.V(5).Infof("SnapshotClass Snapshotter [%s] Snapshot Controller snapshotterName [%s]", class.Snapshotter, ctrl.snapshotterName) + if class.Snapshotter != ctrl.snapshotterName { + glog.V(4).Infof("Skipping VolumeSnapshot %s for snapshotter [%s] in SnapshotClass because it does not match with the snapshotter for controller [%s]", vsToVsKey(snapshot), class.Snapshotter, ctrl.snapshotterName) + return false + } + return true +} + +// updateVs runs in worker thread and handles "vs added", +// "vs updated" and "periodic sync" events. +func (ctrl *CSISnapshotController) updateVs(vs *crdv1.VolumeSnapshot) { + // Store the new vs version in the cache and do not process it if this is + // an old version. + glog.V(5).Infof("updateVs %q", vsToVsKey(vs)) + newVS, err := ctrl.storeVSUpdate(vs) + if err != nil { + glog.Errorf("%v", err) + } + if !newVS { + return + } + err = ctrl.syncVS(vs) + if err != nil { + if errors.IsConflict(err) { + // Version conflict error happens quite often and the controller + // recovers from it easily. + glog.V(3).Infof("could not sync claim %q: %+v", vsToVsKey(vs), err) + } else { + glog.Errorf("could not sync volume %q: %+v", vsToVsKey(vs), err) + } + } +} + +// updateVsd runs in worker thread and handles "vsd added", +// "vsd updated" and "periodic sync" events. +func (ctrl *CSISnapshotController) updateVsd(vsd *crdv1.VolumeSnapshotData) { + // Store the new vs version in the cache and do not process it if this is + // an old version. + new, err := ctrl.storeVSDUpdate(vsd) + if err != nil { + glog.Errorf("%v", err) + } + if !new { + return + } + err = ctrl.syncVSD(vsd) + if err != nil { + if errors.IsConflict(err) { + // Version conflict error happens quite often and the controller + // recovers from it easily. + glog.V(3).Infof("could not sync vsd %q: %+v", vsd.Name, err) + } else { + glog.Errorf("could not sync vsd %q: %+v", vsd.Name, err) + } + } +} + +// syncVSD deals with one key off the queue. It returns false when it's time to quit. +func (ctrl *CSISnapshotController) syncVSD(vsd *crdv1.VolumeSnapshotData) error { + glog.V(4).Infof("synchronizing VolumeSnapshotData[%s]", vsd.Name) + + // VolumeSnapshotData is not bind to any VolumeSnapshot, this case rare and we just return err + if vsd.Spec.VolumeSnapshotRef == nil { + // Vsd is not bind + glog.V(4).Infof("synchronizing VolumeSnapshotData[%s]: vsd is not bind", vsd.Name) + return fmt.Errorf("volumeSnapshotData %s is not bind to any VolumeSnapshot", vsd.Name) + } else { + glog.V(4).Infof("synchronizing VolumeSnapshotData[%s]: vsd is bound to vs %s", vsd.Name, vsrefToVsKey(vsd.Spec.VolumeSnapshotRef)) + // Get the VS by _name_ + var vs *crdv1.VolumeSnapshot + vsName := vsrefToVsKey(vsd.Spec.VolumeSnapshotRef) + obj, found, err := ctrl.vsStore.GetByKey(vsName) + if err != nil { + return err + } + if !found { + glog.V(4).Infof("synchronizing VolumeSnapshotData[%s]: vs %s not found", vsd.Name, vsrefToVsKey(vsd.Spec.VolumeSnapshotRef)) + // Fall through with vs = nil + } else { + var ok bool + vs, ok = obj.(*crdv1.VolumeSnapshot) + if !ok { + return fmt.Errorf("cannot convert object from vs cache to vs %q!?: %#v", vsd.Name, obj) + } + glog.V(4).Infof("synchronizing VolumeSnapshotData[%s]: vs %s found", vsd.Name, vsrefToVsKey(vsd.Spec.VolumeSnapshotRef)) + } + if vs != nil && vs.UID != vsd.Spec.VolumeSnapshotRef.UID { + // The vs that the vsd was pointing to was deleted, and another + // with the same name created. + glog.V(4).Infof("synchronizing VolumeSnapshotData[%s]: vsd %s has different UID, the old one must have been deleted", vsd.Name, vsrefToVsKey(vsd.Spec.VolumeSnapshotRef)) + // Treat the volume as bound to a missing claim. + vs = nil + } + if vs == nil { + ctrl.deleteSnapshotData(vsd) + } + } + return nil +} + +// syncVS deals with one key off the queue. It returns false when it's time to quit. +func (ctrl *CSISnapshotController) syncVS(vs *crdv1.VolumeSnapshot) error { + var result *crdv1.VolumeSnapshot + var err error + uniqueSnapshotName := vsToVsKey(vs) + + // vs has not been bound + glog.V(5).Infof("syncVS %s", uniqueSnapshotName) + if vs.Spec.SnapshotDataName == "" { + var snapshotDataObj *crdv1.VolumeSnapshotData + // Check whether snapshotData object is already created or not. If yes, snapshot is already + // triggered through cloud provider, bind it and return pending state + if snapshotDataObj = ctrl.getMatchVsd(vs); snapshotDataObj != nil { + glog.Infof("Find snapshot data object %s from snapshot %s", snapshotDataObj.Name, uniqueSnapshotName) + if result, err = ctrl.handler.BindandUpdateVolumeSnapshot(snapshotDataObj, vs, nil); err != nil { + return err + } + _, err = ctrl.storeVSUpdate(result) + if err != nil { + // We will get an "snapshot update" event soon, this is not a big error + glog.V(4).Infof("syncVS [%s]: cannot update internal cache: %v", vsToVsKey(vs), err) + } + return nil + } + + glog.Infof("syncSnapshot: Creating snapshot %s ...", uniqueSnapshotName) + if err := ctrl.createSnapshot(vs); err != nil { + return err + } + return nil + } else { + obj, found, err := ctrl.vsdStore.GetByKey(vs.Spec.SnapshotDataName) + if err != nil { + return err + } + if !found { + // vs is bound to a non-existing vsd. + return fmt.Errorf("snapshot %s is bound to a non-existing vsd %s", uniqueSnapshotName, vs.Spec.SnapshotDataName) + } + vsd, ok := obj.(*crdv1.VolumeSnapshotData) + if !ok { + return fmt.Errorf("cannot convert object from vsd cache to vsd %q!?: %#v", vs.Spec.SnapshotDataName, obj) + } + status := ctrl.handler.GetSimplifiedSnapshotStatus(vs.Status.Conditions) + + switch status { + case statusReady: + glog.Infof("Snapshot %s created successfully.", uniqueSnapshotName) + return nil + case statusError: + glog.Infof("syncSnapshot: Error creating snapshot %s.", uniqueSnapshotName) + return fmt.Errorf("error creating snapshot %s", uniqueSnapshotName) + case statusUploading: + glog.V(4).Infof("syncSnapshot: Snapshot %s is Uploading.", uniqueSnapshotName) + // Query the driver for the status of the snapshot with snapshot id + // from VolumeSnapshotData object. + csiSnapshotStatus, err := ctrl.handler.ListSnapshots(vsd) + if err != nil { + return fmt.Errorf("failed to check snapshot state %s with error %v", uniqueSnapshotName, err) + } + snapCondition := ConvertSnapshotStatus(csiSnapshotStatus) + if result, err = ctrl.handler.UpdateVolumeSnapshotStatus(vs, &snapCondition); err != nil { + return err + } + _, err = ctrl.storeVSUpdate(result) + if err != nil { + // We will get an "snapshot update" event soon, this is not a big error + glog.V(4).Infof("syncVS [%s]: cannot update internal cache: %v", vsToVsKey(vs), err) + } + return nil + case statusNew: + glog.Infof("syncSnapshot: Binding snapshot %s ...", uniqueSnapshotName) + if result, err = ctrl.handler.BindandUpdateVolumeSnapshot(vsd, vs, &vs.Status); err != nil { + return err + } + _, err = ctrl.storeVSUpdate(result) + if err != nil { + // We will get an "snapshot update" event soon, this is not a big error + glog.V(4).Infof("syncVS [%s]: cannot update internal cache: %v", vsToVsKey(vs), err) + } + return nil + } + return fmt.Errorf("error occurred when creating snapshot %s, unknown status %s", uniqueSnapshotName, status) + } +} + +// getMatchVsd looks up VolumeSnapshotData for a VolumeSnapshot named snapshotName +func (ctrl *CSISnapshotController) getMatchVsd(vs *crdv1.VolumeSnapshot) *crdv1.VolumeSnapshotData { + var snapshotDataObj *crdv1.VolumeSnapshotData + var found bool + + objs := ctrl.vsdStore.List() + for _, obj := range objs { + vsd := obj.(*crdv1.VolumeSnapshotData) + if vsd.Spec.VolumeSnapshotRef != nil && + vsd.Spec.VolumeSnapshotRef.Name == vs.Name && + vsd.Spec.VolumeSnapshotRef.Namespace == vs.Namespace && + vsd.Spec.VolumeSnapshotRef.UID == vs.UID { + found = true + snapshotDataObj = vsd + break + } + } + + if !found { + glog.V(4).Infof("No VolumeSnapshotData for VolumeSnapshot %s found", vsToVsKey(vs)) + return nil + } + + return snapshotDataObj +} + +// deleteVS runs in worker thread and handles "snapshot deleted" event. +func (ctrl *CSISnapshotController) deleteVS(vs *crdv1.VolumeSnapshot) { + _ = ctrl.vsStore.Delete(vs) + glog.V(4).Infof("vs %q deleted", vsToVsKey(vs)) + + snapshotDataName := vs.Spec.SnapshotDataName + if snapshotDataName == "" { + glog.V(5).Infof("deleteVS[%q]: vsd not bound", vsToVsKey(vs)) + return + } + // sync the vsd when its vs is deleted. Explicitly sync'ing the + // vsd here in response to vs deletion prevents the vsd from + // waiting until the next sync period for its Release. + glog.V(5).Infof("deleteVS[%q]: scheduling sync of vsd %s", vsToVsKey(vs), snapshotDataName) + ctrl.vsdQueue.Add(snapshotDataName) +} + +// deleteVSD runs in worker thread and handles "snapshot deleted" event. +func (ctrl *CSISnapshotController) deleteVSD(vsd *crdv1.VolumeSnapshotData) { + _ = ctrl.vsdStore.Delete(vsd) + glog.V(4).Infof("vsd %q deleted", vsd.Name) + + snapshotName := vsrefToVsKey(vsd.Spec.VolumeSnapshotRef) + if snapshotName == "" { + glog.V(5).Infof("deleteVSD[%q]: vsd not bound", vsd.Name) + return + } + // sync the vs when its vs is deleted. Explicitly sync'ing the + // vs here in response to vsd deletion prevents the vs from + // waiting until the next sync period for its Release. + glog.V(5).Infof("deleteVSD[%q]: scheduling sync of vs %s", vsd.Name, snapshotName) + ctrl.vsdQueue.Add(snapshotName) +} + +// initializeCaches fills all controller caches with initial data from etcd in +// order to have the caches already filled when first addVS/addVSD to +// perform initial synchronization of the controller. +func (ctrl *CSISnapshotController) initializeCaches(vsLister storagelisters.VolumeSnapshotLister, vsdLister storagelisters.VolumeSnapshotDataLister) { + vsList, err := vsLister.List(labels.Everything()) + if err != nil { + glog.Errorf("CSISnapshotController can't initialize caches: %v", err) + return + } + for _, vs := range vsList { + vsClone := vs.DeepCopy() + if _, err = ctrl.storeVSUpdate(vsClone); err != nil { + glog.Errorf("error updating volume snapshot cache: %v", err) + } + } + + vsdList, err := vsdLister.List(labels.Everything()) + if err != nil { + glog.Errorf("CSISnapshotController can't initialize caches: %v", err) + return + } + for _, vsd := range vsdList { + vsdClone := vsd.DeepCopy() + if _, err = ctrl.storeVSUpdate(vsdClone); err != nil { + glog.Errorf("error updating volume snapshot cache: %v", err) + } + } + + glog.V(4).Infof("controller initialized") +} + +// deleteSnapshotData starts delete action. +func (ctrl *CSISnapshotController) deleteSnapshotData(vsd *crdv1.VolumeSnapshotData) { + operationName := fmt.Sprintf("delete-%s[%s]", vsd.Name, string(vsd.UID)) + glog.V(4).Infof("Snapshotter is about to delete volume snapshot and the operation named %s", operationName) + ctrl.scheduleOperation(operationName, func() error { + return ctrl.handler.DeleteSnapshotDataOperation(vsd) + }) +} + +// scheduleOperation starts given asynchronous operation on given volume. It +// makes sure the operation is already not running. +func (ctrl *CSISnapshotController) scheduleOperation(operationName string, operation func() error) { + glog.V(4).Infof("scheduleOperation[%s]", operationName) + + err := ctrl.runningOperations.Run(operationName, operation) + if err != nil { + switch { + case goroutinemap.IsAlreadyExists(err): + glog.V(4).Infof("operation %q is already running, skipping", operationName) + case exponentialbackoff.IsExponentialBackoff(err): + glog.V(4).Infof("operation %q postponed due to exponential backoff", operationName) + default: + glog.Errorf("error scheduling operation %q: %v", operationName, err) + } + } +} + +func (ctrl *CSISnapshotController) storeVSUpdate(vs interface{}) (bool, error) { + return storeObjectUpdate(ctrl.vsStore, vs, "vs") +} + +func (ctrl *CSISnapshotController) storeVSDUpdate(vsd interface{}) (bool, error) { + return storeObjectUpdate(ctrl.vsdStore, vsd, "vsd") +} + +// createSnapshot starts new asynchronous operation to create snapshot data for snapshot +func (ctrl *CSISnapshotController) createSnapshot(vs *crdv1.VolumeSnapshot) error { + glog.V(4).Infof("createSnapshot[%s]: started", vsToVsKey(vs)) + opName := fmt.Sprintf("create-%s[%s]", vsToVsKey(vs), string(vs.UID)) + ctrl.scheduleOperation(opName, func() error { + snapshotObj, err := ctrl.handler.CreateSnapshotOperation(vs) + if err != nil { + glog.Errorf("createSnapshot [%s]: error occurred in createSnapshotOperation: %v", vsToVsKey(snapshotObj), err) + return err + } + _, updateErr := ctrl.storeVSUpdate(snapshotObj) + if updateErr != nil { + // We will get an "snapshot update" event soon, this is not a big error + glog.V(4).Infof("createSnapshot [%s]: cannot update internal cache: %v", vsToVsKey(snapshotObj), updateErr) + } + + return nil + }) + return nil +} diff --git a/pkg/controller/csi_handler.go b/pkg/controller/csi_handler.go new file mode 100644 index 000000000..a1a2c4691 --- /dev/null +++ b/pkg/controller/csi_handler.go @@ -0,0 +1,447 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "context" + "fmt" + "time" + + "github.com/container-storage-interface/spec/lib/go/csi/v0" + "github.com/golang/glog" + crdv1 "github.com/kubernetes-csi/external-snapshotter/pkg/apis/volumesnapshot/v1alpha1" + clientset "github.com/kubernetes-csi/external-snapshotter/pkg/client/clientset/versioned" + "github.com/kubernetes-csi/external-snapshotter/pkg/connection" + "k8s.io/api/core/v1" + apierrs "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/record" +) + +// Handler is responsible for handling VolumeSnapshot events from informer. +type Handler interface { + CreateSnapshotOperation(snapshot *crdv1.VolumeSnapshot) (*crdv1.VolumeSnapshot, error) + DeleteSnapshotDataOperation(vsd *crdv1.VolumeSnapshotData) error + ListSnapshots(vsd *crdv1.VolumeSnapshotData) (*csi.SnapshotStatus, error) + BindandUpdateVolumeSnapshot(snapshotData *crdv1.VolumeSnapshotData, snapshot *crdv1.VolumeSnapshot, status *crdv1.VolumeSnapshotStatus) (*crdv1.VolumeSnapshot, error) + GetClassFromVolumeSnapshot(snapshot *crdv1.VolumeSnapshot) (*crdv1.SnapshotClass, error) + UpdateVolumeSnapshotStatus(snapshot *crdv1.VolumeSnapshot, condition *crdv1.VolumeSnapshotCondition) (*crdv1.VolumeSnapshot, error) + GetSimplifiedSnapshotStatus(conditions []crdv1.VolumeSnapshotCondition) string +} + +// csiHandler is a handler that calls CSI to create/delete volume snapshot. +type csiHandler struct { + clientset clientset.Interface + client kubernetes.Interface + snapshotterName string + eventRecorder record.EventRecorder + csiConnection connection.CSIConnection + timeout time.Duration + createSnapshotDataRetryCount int + createSnapshotDataInterval time.Duration +} + +func NewCSIHandler( + clientset clientset.Interface, + client kubernetes.Interface, + snapshotterName string, + eventRecorder record.EventRecorder, + csiConnection connection.CSIConnection, + timeout time.Duration, + createSnapshotDataRetryCount int, + createSnapshotDataInterval time.Duration, +) Handler { + return &csiHandler{ + clientset: clientset, + client: client, + snapshotterName: snapshotterName, + eventRecorder: eventRecorder, + csiConnection: csiConnection, + timeout: timeout, + createSnapshotDataRetryCount: createSnapshotDataRetryCount, + createSnapshotDataInterval: createSnapshotDataInterval, + } +} + +func (handler *csiHandler) takeSnapshot(snapshot *crdv1.VolumeSnapshot, + volume *v1.PersistentVolume, parameters map[string]string) (*crdv1.VolumeSnapshotData, *crdv1.VolumeSnapshotStatus, error) { + glog.V(5).Infof("takeSnapshot: [%s]", snapshot.Name) + ctx, cancel := context.WithTimeout(context.Background(), handler.timeout) + defer cancel() + + driverName, snapshotId, timestamp, csiSnapshotStatus, err := handler.csiConnection.CreateSnapshot(ctx, snapshot, volume, parameters) + if err != nil { + return nil, nil, fmt.Errorf("failed to take snapshot of the volume %s: %q", volume.Name, err) + } + + snapDataName := GetSnapshotDataNameForSnapshot(snapshot) + + // Create VolumeSnapshotData in the database + snapshotData := &crdv1.VolumeSnapshotData{ + ObjectMeta: metav1.ObjectMeta{ + Name: snapDataName, + }, + Spec: crdv1.VolumeSnapshotDataSpec{ + VolumeSnapshotRef: &v1.ObjectReference{ + Kind: "VolumeSnapshot", + Namespace: snapshot.Namespace, + Name: snapshot.Name, + UID: snapshot.UID, + APIVersion: "v1alpha1", + }, + PersistentVolumeRef: &v1.ObjectReference{ + Kind: "PersistentVolume", + Name: volume.Name, + }, + VolumeSnapshotSource: crdv1.VolumeSnapshotSource{ + CSI: &crdv1.CSIVolumeSnapshotSource{ + Driver: driverName, + SnapshotHandle: snapshotId, + CreatedAt: timestamp, + }, + }, + }, + } + + status := &crdv1.VolumeSnapshotStatus{ + Conditions: []crdv1.VolumeSnapshotCondition{ + ConvertSnapshotStatus(csiSnapshotStatus), + }, + } + + glog.V(5).Infof("takeSnapshot: Created snapshot [%s]. Snapshot object [%#v] Status [%#v]", snapshot.Name, snapshotData, status) + return snapshotData, status, nil +} + +func (handler *csiHandler) deleteSnapshot(vsd *crdv1.VolumeSnapshotData) error { + if vsd.Spec.CSI == nil { + return fmt.Errorf("CSISnapshot not defined in spec") + } + ctx, cancel := context.WithTimeout(context.Background(), handler.timeout) + defer cancel() + + err := handler.csiConnection.DeleteSnapshot(ctx, vsd.Spec.CSI.SnapshotHandle) + if err != nil { + return fmt.Errorf("failed to delete snapshot data %s: %q", vsd.Name, err) + } + + return nil +} + +func (handler *csiHandler) ListSnapshots(vsd *crdv1.VolumeSnapshotData) (*csi.SnapshotStatus, error) { + if vsd.Spec.CSI == nil { + return nil, fmt.Errorf("CSISnapshot not defined in spec") + } + ctx, cancel := context.WithTimeout(context.Background(), handler.timeout) + defer cancel() + + csiSnapshotStatus, err := handler.csiConnection.ListSnapshots(ctx, vsd.Spec.CSI.SnapshotHandle) + if err != nil { + return nil, fmt.Errorf("failed to list snapshot data %s: %q", vsd.Name, err) + } + + return csiSnapshotStatus, nil +} + +// The function goes through the whole snapshot creation process. +// 1. Update VolumeSnapshot metadata to include the snapshotted PV name, timestamp and snapshot uid, also generate tag for cloud provider +// 2. Trigger the snapshot through cloud provider and attach the tag to the snapshot. +// 3. Create the VolumeSnapshotData object with the snapshot id information returned from step 2. +// 4. Bind the VolumeSnapshot and VolumeSnapshotData object +// 5. Query the snapshot status through cloud provider and update the status until snapshot is ready or fails. +func (handler *csiHandler) CreateSnapshotOperation(snapshot *crdv1.VolumeSnapshot) (*crdv1.VolumeSnapshot, error) { + glog.Infof("createSnapshot: Creating snapshot %s through the plugin ...", vsToVsKey(snapshot)) + var result *crdv1.VolumeSnapshot + var err error + class, err := handler.GetClassFromVolumeSnapshot(snapshot) + if err != nil { + glog.Errorf("creatSnapshotOperation failed to getClassFromVolumeSnapshot %s", err) + return nil, err + } + + // A previous createSnapshot may just have finished while we were waiting for + // the locks. Check that snapshot data (with deterministic name) hasn't been created + // yet. + snapDataName := GetSnapshotDataNameForSnapshot(snapshot) + + vsd, err := handler.clientset.VolumesnapshotV1alpha1().VolumeSnapshotDatas().Get(snapDataName, metav1.GetOptions{}) + if err == nil && vsd != nil { + // Volume snapshot data has been already created, nothing to do. + glog.V(4).Infof("createSnapshot [%s]: volume snapshot data already exists, skipping", vsToVsKey(snapshot)) + return nil, nil + } + glog.V(5).Infof("createSnapshotOperation [%s]: VolumeSnapshotData does not exist yet", vsToVsKey(snapshot)) + + volume, err := handler.getVolumeFromVolumeSnapshot(snapshot) + if err != nil { + glog.Errorf("createSnapshotOperation failed [%s]: Error: [%#v]", snapshot.Name, err) + return nil, err + } + snapshotData, status, err := handler.takeSnapshot(snapshot, volume, class.Parameters) + if err != nil { + return nil, fmt.Errorf("failed to take snapshot of the volume %s: %q", volume.Name, err) + } + + // Try to create the VSD object several times + for i := 0; i < handler.createSnapshotDataRetryCount; i++ { + glog.V(4).Infof("createSnapshot [%s]: trying to save volume snapshot data %s", vsToVsKey(snapshot), snapshotData.Name) + if _, err = handler.clientset.VolumesnapshotV1alpha1().VolumeSnapshotDatas().Create(snapshotData); err == nil || apierrs.IsAlreadyExists(err) { + // Save succeeded. + if err != nil { + glog.V(3).Infof("volume snapshot data %q for snapshot %q already exists, reusing", snapshotData.Name, vsToVsKey(snapshot)) + err = nil + } else { + glog.V(3).Infof("volume snapshot data %q for snapshot %q saved", snapshotData.Name, vsToVsKey(snapshot)) + } + break + } + // Save failed, try again after a while. + glog.V(3).Infof("failed to save volume snapshot data %q for snapshot %q: %v", snapshotData.Name, vsToVsKey(snapshot), err) + time.Sleep(handler.createSnapshotDataInterval) + } + + if err != nil { + // Save failed. Now we have a storage asset outside of Kubernetes, + // but we don't have appropriate volumesnapshotdata object for it. + // Emit some event here and try to delete the storage asset several + // times. + strerr := fmt.Sprintf("Error creating volume snapshot data object for snapshot %s: %v. Deleting the snapshot data.", vsToVsKey(snapshot), err) + glog.Error(strerr) + handler.eventRecorder.Event(snapshot, v1.EventTypeWarning, "CreateSnapshotDataFailed", strerr) + + for i := 0; i < handler.createSnapshotDataRetryCount; i++ { + if err = handler.deleteSnapshot(snapshotData); err == nil { + // Delete succeeded + glog.V(4).Infof("createSnapshot [%s]: cleaning snapshot data %s succeeded", vsToVsKey(snapshot), snapshotData.Name) + break + } + // Delete failed, try again after a while. + glog.Infof("failed to delete snapshot data %q: %v", snapshotData.Name, err) + time.Sleep(handler.createSnapshotDataInterval) + } + + if err != nil { + // Delete failed several times. There is an orphaned volume snapshot data and there + // is nothing we can do about it. + strerr := fmt.Sprintf("Error cleaning volume snapshot data for snapshot %s: %v. Please delete manually.", vsToVsKey(snapshot), err) + glog.Error(strerr) + handler.eventRecorder.Event(snapshot, v1.EventTypeWarning, "SnapshotDataCleanupFailed", strerr) + } + } else { + // save succeeded, bind and update status for snapshot. + result, err = handler.BindandUpdateVolumeSnapshot(snapshotData, snapshot, status) + if err != nil { + return nil, err + } + } + + return result, err +} + +// Delete a snapshot +// 1. Find the SnapshotData corresponding to Snapshot +// 1a: Not found => finish (it's been deleted already) +// 2. Ask the backend to remove the snapshot device +// 3. Delete the SnapshotData object +// 4. Remove the Snapshot from vsStore +// 5. Finish +func (handler *csiHandler) DeleteSnapshotDataOperation(vsd *crdv1.VolumeSnapshotData) error { + glog.V(4).Infof("deleteSnapshotOperation [%s] started", vsd.Name) + + err := handler.deleteSnapshot(vsd) + if err != nil { + return fmt.Errorf("failed to delete snapshot %#v, err: %v", vsd.Name, err) + } + + err = handler.clientset.VolumesnapshotV1alpha1().VolumeSnapshotDatas().Delete(vsd.Name, &metav1.DeleteOptions{}) + if err != nil { + return fmt.Errorf("failed to delete VolumeSnapshotData %s from API server: %q", vsd.Name, err) + } + + return nil +} + +func (handler *csiHandler) BindandUpdateVolumeSnapshot(snapshotData *crdv1.VolumeSnapshotData, snapshot *crdv1.VolumeSnapshot, status *crdv1.VolumeSnapshotStatus) (*crdv1.VolumeSnapshot, error) { + glog.V(4).Infof("bindandUpdateVolumeSnapshot for snapshot [%s]: snapshotData [%s] status [%#v]", snapshot.Name, snapshotData.Name, status) + snapshotObj, err := handler.clientset.VolumesnapshotV1alpha1().VolumeSnapshots(snapshot.Namespace).Get(snapshot.Name, metav1.GetOptions{}) + if err != nil { + return nil, fmt.Errorf("error get snapshot %s from api server: %v", vsToVsKey(snapshot), err) + } + + // Copy the snapshot object before updating it + snapshotCopy := snapshotObj.DeepCopy() + var updateSnapshot *crdv1.VolumeSnapshot + if snapshotObj.Spec.SnapshotDataName == snapshotData.Name { + glog.Infof("bindVolumeSnapshotDataToVolumeSnapshot: VolumeSnapshot %s already bind to volumeSnapshotData [%s]", snapshot.Name, snapshotData.Name) + } else { + glog.Infof("bindVolumeSnapshotDataToVolumeSnapshot: before bind VolumeSnapshot %s to volumeSnapshotData [%s]", snapshot.Name, snapshotData.Name) + snapshotCopy.Spec.SnapshotDataName = snapshotData.Name + updateSnapshot, err = handler.clientset.VolumesnapshotV1alpha1().VolumeSnapshots(snapshot.Namespace).Update(snapshotCopy) + if err != nil { + glog.Infof("bindVolumeSnapshotDataToVolumeSnapshot: Error binding VolumeSnapshot %s to volumeSnapshotData [%s]. Error [%#v]", snapshot.Name, snapshotData.Name, err) + return nil, fmt.Errorf("error updating snapshot object %s on the API server: %v", vsToVsKey(updateSnapshot), err) + } + snapshotCopy = updateSnapshot + } + + if status != nil && status.Conditions != nil && len(status.Conditions) > 0 { + snapshotCopy.Status = *(status.DeepCopy()) + updateSnapshot2, err := handler.clientset.VolumesnapshotV1alpha1().VolumeSnapshots(snapshot.Namespace).Update(snapshotCopy) + if err != nil { + return nil, fmt.Errorf("error updating snapshot object %s on the API server: %v", vsToVsKey(snapshotCopy), err) + } + snapshotCopy = updateSnapshot2 + } + + glog.V(5).Infof("bindandUpdateVolumeSnapshot for snapshot completed [%#v]", snapshotCopy) + return snapshotCopy, nil +} + +// UpdateVolumeSnapshotStatus update VolumeSnapshot status if the condition is changed. +func (handler *csiHandler) UpdateVolumeSnapshotStatus(snapshot *crdv1.VolumeSnapshot, condition *crdv1.VolumeSnapshotCondition) (*crdv1.VolumeSnapshot, error) { + snapshotObj, err := handler.clientset.VolumesnapshotV1alpha1().VolumeSnapshots(snapshot.Namespace).Get(snapshot.Name, metav1.GetOptions{}) + if err != nil { + return nil, fmt.Errorf("error get volume snapshot %s from api server: %s", vsToVsKey(snapshot), err) + } + + oldStatus := snapshotObj.Status.DeepCopy() + status := snapshotObj.Status + isEqual := false + if oldStatus.Conditions == nil || + len(oldStatus.Conditions) == 0 || + condition.Type != oldStatus.Conditions[len(oldStatus.Conditions)-1].Type { + status.Conditions = append(status.Conditions, *condition) + } else { + oldCondition := oldStatus.Conditions[len(oldStatus.Conditions)-1] + if condition.Status == oldCondition.Status { + condition.LastTransitionTime = oldCondition.LastTransitionTime + } + status.Conditions[len(status.Conditions)-1] = *condition + isEqual = condition.Type == oldCondition.Type && + condition.Status == oldCondition.Status && + condition.Reason == oldCondition.Reason && + condition.Message == oldCondition.Message && + condition.LastTransitionTime.Equal(&oldCondition.LastTransitionTime) + } + + if !isEqual { + snapshotObj.Status = status + newSnapshotObj, err := handler.clientset.VolumesnapshotV1alpha1().VolumeSnapshots(snapshot.Namespace).Update(snapshotObj) + if err != nil { + return nil, fmt.Errorf("error update status for volume snapshot %s: %s", vsToVsKey(snapshot), err) + } + + glog.Infof("UpdateVolumeSnapshotStatus finishes %+v", newSnapshotObj) + return newSnapshotObj, nil + } + + return nil, nil +} + +// getSimplifiedSnapshotStatus get status for snapshot. +func (handler *csiHandler) GetSimplifiedSnapshotStatus(conditions []crdv1.VolumeSnapshotCondition) string { + if conditions == nil { + glog.Errorf("No conditions for this snapshot yet.") + return statusNew + } + if len(conditions) == 0 { + glog.Errorf("Empty condition.") + return statusNew + } + + //index := len(conditions) - 1 + lastCondition := conditions[len(conditions)-1] + switch lastCondition.Type { + case crdv1.VolumeSnapshotConditionReady: + if lastCondition.Status == v1.ConditionTrue { + return statusReady + } + case crdv1.VolumeSnapshotConditionError: + return statusError + case crdv1.VolumeSnapshotConditionUploading: + if lastCondition.Status == v1.ConditionTrue || + lastCondition.Status == v1.ConditionUnknown { + return statusUploading + } + } + return statusNew +} + +// getVolumeFromVolumeSnapshot is a helper function to get PV from VolumeSnapshot. +func (handler *csiHandler) getVolumeFromVolumeSnapshot(snapshot *crdv1.VolumeSnapshot) (*v1.PersistentVolume, error) { + pvc, err := handler.getClaimFromVolumeSnapshot(snapshot) + if err != nil { + return nil, err + } + + pvName := pvc.Spec.VolumeName + pv, err := handler.client.CoreV1().PersistentVolumes().Get(pvName, metav1.GetOptions{}) + if err != nil { + return nil, fmt.Errorf("failed to retrieve PV %s from the API server: %q", pvName, err) + } + + glog.V(5).Infof("getVolumeFromVolumeSnapshot: snapshot [%s] PV name [%s]", snapshot.Name, pvName) + + return pv, nil +} + +// getClassFromVolumeSnapshot is a helper function to get storage class from VolumeSnapshot. +func (handler *csiHandler) GetClassFromVolumeSnapshot(snapshot *crdv1.VolumeSnapshot) (*crdv1.SnapshotClass, error) { + className := snapshot.Spec.SnapshotClassName + glog.V(5).Infof("getClassFromVolumeSnapshot [%s]: SnapshotClassName [%s]", snapshot.Name, className) + class, err := handler.clientset.VolumesnapshotV1alpha1().SnapshotClasses().Get(className, metav1.GetOptions{}) + if err != nil { + glog.Errorf("failed to retrieve storage class %s from the API server: %q", className, err) + //return nil, fmt.Errorf("failed to retrieve storage class %s from the API server: %q", className, err) + } + return class, nil +} + +// getClaimFromVolumeSnapshot is a helper function to get PV from VolumeSnapshot. +func (handler *csiHandler) getClaimFromVolumeSnapshot(snapshot *crdv1.VolumeSnapshot) (*v1.PersistentVolumeClaim, error) { + pvcName := snapshot.Spec.PersistentVolumeClaimName + if pvcName == "" { + return nil, fmt.Errorf("the PVC name is not specified in snapshot %s", vsToVsKey(snapshot)) + } + + pvc, err := handler.client.CoreV1().PersistentVolumeClaims(snapshot.Namespace).Get(pvcName, metav1.GetOptions{}) + if err != nil { + return nil, fmt.Errorf("failed to retrieve PVC %s from the API server: %q", pvcName, err) + } + if pvc.Status.Phase != v1.ClaimBound { + return nil, fmt.Errorf("the PVC %s not yet bound to a PV, will not attempt to take a snapshot yet", pvcName) + } + + return pvc, nil +} + +// getSnapshotDataFromSnapshot looks up VolumeSnapshotData from a VolumeSnapshot. +func (handler *csiHandler) getSnapshotDataFromSnapshot(vs *crdv1.VolumeSnapshot) (*crdv1.VolumeSnapshotData, error) { + snapshotDataName := vs.Spec.SnapshotDataName + if snapshotDataName == "" { + return nil, fmt.Errorf("could not find snapshot data object for %s: SnapshotDataName in snapshot spec is empty", vsToVsKey(vs)) + } + + snapshotDataObj, err := handler.clientset.VolumesnapshotV1alpha1().VolumeSnapshotDatas().Get(snapshotDataName, metav1.GetOptions{}) + if err != nil { + glog.Errorf("Error retrieving the VolumeSnapshotData objects from API server: %v", err) + return nil, fmt.Errorf("could not get snapshot data object %s: %v", snapshotDataName, err) + } + + return snapshotDataObj, nil +} diff --git a/pkg/controller/util.go b/pkg/controller/util.go new file mode 100644 index 000000000..bd26f124e --- /dev/null +++ b/pkg/controller/util.go @@ -0,0 +1,152 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "fmt" + "github.com/container-storage-interface/spec/lib/go/csi/v0" + "github.com/golang/glog" + crdv1 "github.com/kubernetes-csi/external-snapshotter/pkg/apis/volumesnapshot/v1alpha1" + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/tools/cache" + "strconv" + "strings" +) + +var ( + keyFunc = cache.DeletionHandlingMetaNamespaceKeyFunc +) + +// GetNameAndNameSpaceFromSnapshotName retrieves the namespace and +// the short name of a snapshot from its full name +func GetNameAndNameSpaceFromSnapshotName(name string) (string, string, error) { + strs := strings.Split(name, "/") + if len(strs) != 2 { + return "", "", fmt.Errorf("invalid snapshot name") + } + return strs[0], strs[1], nil +} + +func vsToVsKey(vs *crdv1.VolumeSnapshot) string { + return fmt.Sprintf("%s/%s", vs.Namespace, vs.Name) +} + +func vsrefToVsKey(vsref *v1.ObjectReference) string { + return fmt.Sprintf("%s/%s", vsref.Namespace, vsref.Name) +} + +// storeObjectUpdate updates given cache with a new object version from Informer +// callback (i.e. with events from etcd) or with an object modified by the +// controller itself. Returns "true", if the cache was updated, false if the +// object is an old version and should be ignored. +func storeObjectUpdate(store cache.Store, obj interface{}, className string) (bool, error) { + objName, err := keyFunc(obj) + if err != nil { + return false, fmt.Errorf("Couldn't get key for object %+v: %v", obj, err) + } + oldObj, found, err := store.Get(obj) + if err != nil { + return false, fmt.Errorf("Error finding %s %q in controller cache: %v", className, objName, err) + } + + objAccessor, err := meta.Accessor(obj) + if err != nil { + return false, err + } + + if !found { + // This is a new object + glog.V(4).Infof("storeObjectUpdate: adding %s %q, version %s", className, objName, objAccessor.GetResourceVersion()) + if err = store.Add(obj); err != nil { + return false, fmt.Errorf("error adding %s %q to controller cache: %v", className, objName, err) + } + return true, nil + } + + oldObjAccessor, err := meta.Accessor(oldObj) + if err != nil { + return false, err + } + + objResourceVersion, err := strconv.ParseInt(objAccessor.GetResourceVersion(), 10, 64) + if err != nil { + return false, fmt.Errorf("error parsing ResourceVersion %q of %s %q: %s", objAccessor.GetResourceVersion(), className, objName, err) + } + oldObjResourceVersion, err := strconv.ParseInt(oldObjAccessor.GetResourceVersion(), 10, 64) + if err != nil { + return false, fmt.Errorf("error parsing old ResourceVersion %q of %s %q: %s", oldObjAccessor.GetResourceVersion(), className, objName, err) + } + + // Throw away only older version, let the same version pass - we do want to + // get periodic sync events. + if oldObjResourceVersion > objResourceVersion { + glog.V(4).Infof("storeObjectUpdate: ignoring %s %q version %s", className, objName, objAccessor.GetResourceVersion()) + return false, nil + } + + glog.V(4).Infof("storeObjectUpdate updating %s %q with version %s", className, objName, objAccessor.GetResourceVersion()) + if err = store.Update(obj); err != nil { + return false, fmt.Errorf("error updating %s %q in controller cache: %v", className, objName, err) + } + return true, nil +} + +// ConvertSnapshotStatus converts snapshot status to crdv1.VolumeSnapshotCondition +func ConvertSnapshotStatus(status *csi.SnapshotStatus) crdv1.VolumeSnapshotCondition { + var snapDataCondition crdv1.VolumeSnapshotCondition + + switch status.Type { + case csi.SnapshotStatus_READY: + snapDataCondition = crdv1.VolumeSnapshotCondition{ + Type: crdv1.VolumeSnapshotConditionReady, + Status: v1.ConditionTrue, + Message: status.Details, + LastTransitionTime: metav1.Now(), + } + case csi.SnapshotStatus_ERROR_UPLOADING: + snapDataCondition = crdv1.VolumeSnapshotCondition{ + Type: crdv1.VolumeSnapshotConditionError, + Status: v1.ConditionTrue, + Message: status.Details, + LastTransitionTime: metav1.Now(), + } + case csi.SnapshotStatus_UPLOADING: + snapDataCondition = crdv1.VolumeSnapshotCondition{ + Type: crdv1.VolumeSnapshotConditionUploading, + Status: v1.ConditionTrue, + Message: status.Details, + LastTransitionTime: metav1.Now(), + } + case csi.SnapshotStatus_UNKNOWN: + snapDataCondition = crdv1.VolumeSnapshotCondition{ + Type: crdv1.VolumeSnapshotConditionCreating, + Status: v1.ConditionUnknown, + Message: status.Details, + LastTransitionTime: metav1.Now(), + } + } + + return snapDataCondition +} + +// getSnapshotDataNameForSnapshot returns SnapshotData.Name for the create VolumeSnapshotData. +// The name must be unique. +func GetSnapshotDataNameForSnapshot(snapshot *crdv1.VolumeSnapshot) string { + return "snapdata-" + string(snapshot.UID) +}