Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Snapshot Controller #7

Merged
merged 30 commits into from
Aug 30, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
2663b13
Add Snapshot Controller
xing-yang Jul 12, 2018
8a08d42
Handle Secrets in CreateSnapshot
xing-yang Aug 9, 2018
c227372
Add VolumeSnapshotClass to VolumeSnapshotContent
xing-yang Aug 13, 2018
da5647a
Add generated file
xing-yang Aug 13, 2018
2c3b68f
Handle Secrets in DeleteSnapshot
xing-yang Aug 13, 2018
337564a
Address review comments in the APIs.
xing-yang Aug 15, 2018
afd80c5
Add review comments in cmd and controller
xing-yang Aug 15, 2018
ce56c87
Merge pull request #9 from xing-yang/snapshot_controller_snapclass
xing-yang Aug 15, 2018
1ee6dd2
Address review comments in controller
xing-yang Aug 16, 2018
faf16a6
Change ResourceList to int64 in API
xing-yang Aug 16, 2018
3e12fd6
Update generated deepcopy file
xing-yang Aug 16, 2018
870fd8e
Handle snapshot error, get default storage class, and other small
jingxu97 Aug 17, 2018
bfb7c69
Set VolumeSnapshotClass in checkandBindSnapshotContent
xing-yang Aug 18, 2018
9eb5892
Use resource.Quantity for Size in API
xing-yang Aug 19, 2018
db9e975
Add generated deepcopy file
xing-yang Aug 19, 2018
fb866ef
Use CreationName in CSIVolumeSnapshotSource in controller
xing-yang Aug 19, 2018
84fc75e
Modify controller to use resource.Quantity as size
xing-yang Aug 20, 2018
233b717
Rename Size to RestoreSize
xing-yang Aug 22, 2018
66b982d
Re-generate deepcopy file
xing-yang Aug 22, 2018
61c67ae
Change Size to RestoreSize in snapshot controller
xing-yang Aug 22, 2018
d95ff46
Split GetClassFromVolumeSnapshot to two functions
xing-yang Aug 22, 2018
7baa5bf
Change VolumeSnapshotClassName to pointer to a string
xing-yang Aug 23, 2018
1893402
Re-generate deepcopy file after VolumeSnapshotClass change
xing-yang Aug 23, 2018
7140b77
Change controller to use VolumeSnapshotClassName as pointer
xing-yang Aug 23, 2018
de25b16
Fix a typo
xing-yang Aug 23, 2018
a9dd5c8
Address review comments
xing-yang Aug 23, 2018
4f5aec4
Merge pull request #11 from xing-yang/fixes
xing-yang Aug 23, 2018
25be5fd
shouldProcessSnapshot should return *VolumeSnapshot
xing-yang Aug 24, 2018
9f3146b
Fix error when checking and updating snapshotclass
xing-yang Aug 24, 2018
17c7e1b
Allow new discovered error to show up
xing-yang Aug 24, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 122 additions & 0 deletions cmd/csi-snapshotter/create_crd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
"reflect"

"github.com/golang/glog"
crdv1 "github.com/kubernetes-csi/external-snapshotter/pkg/apis/volumesnapshot/v1alpha1"
apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/client-go/rest"
)

const (
// SnapshotPVCAnnotation is "snapshot.alpha.kubernetes.io/snapshot"
SnapshotPVCAnnotation = "volumesnapshot.csi.k8s.io/snapshot"
)

// NewClient creates a new RESTClient
func NewClient(cfg *rest.Config) (*rest.RESTClient, *runtime.Scheme, error) {
scheme := runtime.NewScheme()
if err := crdv1.AddToScheme(scheme); err != nil {
return nil, nil, err
}

config := *cfg
config.GroupVersion = &crdv1.SchemeGroupVersion
config.APIPath = "/apis"
config.ContentType = runtime.ContentTypeJSON
config.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: serializer.NewCodecFactory(scheme)}

client, err := rest.RESTClientFor(&config)
if err != nil {
return nil, nil, err
}

return client, scheme, nil
}

// CreateCRD creates CustomResourceDefinition
func CreateCRD(clientset apiextensionsclient.Interface) error {
crd := &apiextensionsv1beta1.CustomResourceDefinition{
ObjectMeta: metav1.ObjectMeta{
Name: crdv1.VolumeSnapshotClassResourcePlural + "." + crdv1.GroupName,
},
Spec: apiextensionsv1beta1.CustomResourceDefinitionSpec{
Group: crdv1.GroupName,
Version: crdv1.SchemeGroupVersion.Version,
Scope: apiextensionsv1beta1.ClusterScoped,
Names: apiextensionsv1beta1.CustomResourceDefinitionNames{
Plural: crdv1.VolumeSnapshotClassResourcePlural,
Kind: reflect.TypeOf(crdv1.VolumeSnapshotClass{}).Name(),
},
},
}
res, err := clientset.ApiextensionsV1beta1().CustomResourceDefinitions().Create(crd)

if err != nil && !apierrors.IsAlreadyExists(err) {
glog.Fatalf("failed to create VolumeSnapshotResource: %#v, err: %#v",
res, err)
}

crd = &apiextensionsv1beta1.CustomResourceDefinition{
ObjectMeta: metav1.ObjectMeta{
Name: crdv1.VolumeSnapshotContentResourcePlural + "." + crdv1.GroupName,
},
Spec: apiextensionsv1beta1.CustomResourceDefinitionSpec{
Group: crdv1.GroupName,
Version: crdv1.SchemeGroupVersion.Version,
Scope: apiextensionsv1beta1.ClusterScoped,
Names: apiextensionsv1beta1.CustomResourceDefinitionNames{
Plural: crdv1.VolumeSnapshotContentResourcePlural,
Kind: reflect.TypeOf(crdv1.VolumeSnapshotContent{}).Name(),
},
},
}
res, err = clientset.ApiextensionsV1beta1().CustomResourceDefinitions().Create(crd)

if err != nil && !apierrors.IsAlreadyExists(err) {
glog.Fatalf("failed to create VolumeSnapshotContentResource: %#v, err: %#v",
res, err)
}

crd = &apiextensionsv1beta1.CustomResourceDefinition{
ObjectMeta: metav1.ObjectMeta{
Name: crdv1.VolumeSnapshotResourcePlural + "." + crdv1.GroupName,
},
Spec: apiextensionsv1beta1.CustomResourceDefinitionSpec{
Group: crdv1.GroupName,
Version: crdv1.SchemeGroupVersion.Version,
Scope: apiextensionsv1beta1.NamespaceScoped,
Names: apiextensionsv1beta1.CustomResourceDefinitionNames{
Plural: crdv1.VolumeSnapshotResourcePlural,
Kind: reflect.TypeOf(crdv1.VolumeSnapshot{}).Name(),
},
},
}
res, err = clientset.ApiextensionsV1beta1().CustomResourceDefinitions().Create(crd)

if err != nil && !apierrors.IsAlreadyExists(err) {
glog.Fatalf("failed to create VolumeSnapshotResource: %#v, err: %#v",
res, err)
}

return nil
}
197 changes: 195 additions & 2 deletions cmd/csi-snapshotter/main.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,200 @@
/*
Copyright 2018 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import "fmt"
import (
"context"
"flag"
"fmt"
"os"
"os/signal"
"time"

"github.com/golang/glog"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"

"github.com/kubernetes-csi/external-snapshotter/pkg/connection"
"github.com/kubernetes-csi/external-snapshotter/pkg/controller"

clientset "github.com/kubernetes-csi/external-snapshotter/pkg/client/clientset/versioned"
informers "github.com/kubernetes-csi/external-snapshotter/pkg/client/informers/externalversions"
apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
)

const (
// Number of worker threads
threads = 10

// Default timeout of short CSI calls like GetPluginInfo
csiTimeout = time.Second
)

// Command line flags
var (
snapshotter = flag.String("snapshotter", "", "Name of the snapshotter. The snapshotter will only create snapshot content for snapshot that requests a VolumeSnapshotClass with a snapshotter field set equal to this name.")
kubeconfig = flag.String("kubeconfig", "", "Absolute path to the kubeconfig file. Required only when running out of cluster.")
connectionTimeout = flag.Duration("connection-timeout", 1*time.Minute, "Timeout for waiting for CSI driver socket.")
csiAddress = flag.String("csi-address", "/run/csi/socket", "Address of the CSI driver socket.")
createSnapshotContentRetryCount = flag.Int("create-snapshotcontent-retrycount", 5, "Number of retries when we create a snapshot content object for a snapshot.")
createSnapshotContentInterval = flag.Duration("create-snapshotcontent-interval", 10*time.Second, "Interval between retries when we create a snapshot content object for a snapshot.")
resyncPeriod = flag.Duration("resync-period", 60*time.Second, "Resync interval of the controller.")
snapshotNamePrefix = flag.String("snapshot-name-prefix", "snapshot", "Prefix to apply to the name of a created snapshot")
snapshotNameUUIDLength = flag.Int("snapshot-name-uuid-length", -1, "Length in characters for the generated uuid of a created snapshot. Defaults behavior is to NOT truncate.")
)

func main() {
fmt.Println("vim-go")
flag.Set("logtostderr", "true")
flag.Parse()

// Create the client config. Use kubeconfig if given, otherwise assume in-cluster.
config, err := buildConfig(*kubeconfig)
if err != nil {
glog.Error(err.Error())
os.Exit(1)
}

kubeClient, err := kubernetes.NewForConfig(config)
if err != nil {
glog.Error(err.Error())
os.Exit(1)
}

snapClient, err := clientset.NewForConfig(config)
if err != nil {
glog.Errorf("Error building snapshot clientset: %s", err.Error())
os.Exit(1)
}

factory := informers.NewSharedInformerFactory(snapClient, *resyncPeriod)

// Create CRD resource
aeclientset, err := apiextensionsclient.NewForConfig(config)
if err != nil {
glog.Error(err.Error())
os.Exit(1)
}

// initialize CRD resource if it does not exist
err = CreateCRD(aeclientset)
if err != nil {
glog.Error(err.Error())
os.Exit(1)
}

// Connect to CSI.
csiConn, err := connection.New(*csiAddress, *connectionTimeout)
if err != nil {
glog.Error(err.Error())
os.Exit(1)
}

// Pass a context with a timeout
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
defer cancel()

// Find driver name
if *snapshotter == "" {
*snapshotter, err = csiConn.GetDriverName(ctx)
if err != nil {
glog.Error(err.Error())
os.Exit(1)
}
}
glog.V(2).Infof("CSI driver name: %q", *snapshotter)

// Check it's ready
if err = waitForDriverReady(csiConn, *connectionTimeout); err != nil {
glog.Error(err.Error())
os.Exit(1)
}

// Find out if the driver supports create/delete snapshot.
supportsCreateSnapshot, err := csiConn.SupportsControllerCreateSnapshot(ctx)
if err != nil {
glog.Error(err.Error())
os.Exit(1)
}
if !supportsCreateSnapshot {
glog.Errorf("CSI driver %s does not support ControllerCreateSnapshot", *snapshotter)
os.Exit(1)
}

if len(*snapshotNamePrefix) == 0 {
glog.Error("Snapshot name prefix cannot be of length 0")
os.Exit(1)
}

glog.V(2).Infof("Start NewCSISnapshotController with snapshotter [%s] kubeconfig [%s] connectionTimeout [%+v] csiAddress [%s] createSnapshotContentRetryCount [%d] createSnapshotContentInterval [%+v] resyncPeriod [%+v] snapshotNamePrefix [%s] snapshotNameUUIDLength [%d]", *snapshotter, *kubeconfig, *connectionTimeout, *csiAddress, createSnapshotContentRetryCount, *createSnapshotContentInterval, *resyncPeriod, *snapshotNamePrefix, snapshotNameUUIDLength)

ctrl := controller.NewCSISnapshotController(
snapClient,
kubeClient,
*snapshotter,
factory.Volumesnapshot().V1alpha1().VolumeSnapshots(),
factory.Volumesnapshot().V1alpha1().VolumeSnapshotContents(),
factory.Volumesnapshot().V1alpha1().VolumeSnapshotClasses(),
*createSnapshotContentRetryCount,
*createSnapshotContentInterval,
csiConn,
*connectionTimeout,
*resyncPeriod,
*snapshotNamePrefix,
*snapshotNameUUIDLength,
)

// run...
stopCh := make(chan struct{})
factory.Start(stopCh)
go ctrl.Run(threads, stopCh)

// ...until SIGINT
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
<-c
close(stopCh)
}

func buildConfig(kubeconfig string) (*rest.Config, error) {
if kubeconfig != "" {
return clientcmd.BuildConfigFromFlags("", kubeconfig)
}
return rest.InClusterConfig()
}

func waitForDriverReady(csiConn connection.CSIConnection, timeout time.Duration) error {
now := time.Now()
finish := now.Add(timeout)
var err error
for {
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
defer cancel()
err = csiConn.Probe(ctx)
if err == nil {
glog.V(2).Infof("Probe succeeded")
return nil
}
glog.V(2).Infof("Probe failed with %s", err)

now := time.Now()
if now.After(finish) {
return fmt.Errorf("failed to probe the controller: %s", err)
}
time.Sleep(time.Second)
}
}
Loading