Skip to content

Commit

Permalink
Add dynamic provisioning support for CSI Migration
Browse files Browse the repository at this point in the history
Signed-off-by: Deep Debroy <ddebroy@docker.com>
  • Loading branch information
ddebroy committed Mar 15, 2019
1 parent a3aa646 commit ae0c72e
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 39 deletions.
29 changes: 23 additions & 6 deletions cmd/csi-provisioner/csi-provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/workqueue"
csitranslationlib "k8s.io/csi-translation-lib"

utilfeature "k8s.io/apiserver/pkg/util/feature"
utilflag "k8s.io/apiserver/pkg/util/flag"
Expand Down Expand Up @@ -155,19 +156,35 @@ func init() {
timeStamp := time.Now().UnixNano() / int64(time.Millisecond)
identity := strconv.FormatInt(timeStamp, 10) + "-" + strconv.Itoa(rand.Intn(10000)) + "-" + provisionerName

provisionerOptions := []func(*controller.ProvisionController) error{
controller.LeaderElection(*enableLeaderElection),
controller.FailedProvisionThreshold(0),
controller.FailedDeleteThreshold(0),
controller.RateLimiter(workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax)),
controller.Threadiness(int(*workerThreads)),
}

handlesMigrationFromInTreePlugin := false
handlesMigrationFromInTreePluginName := ""
if csitranslationlib.IsMigratedCSIDriverByName(provisionerName) {
handlesMigrationFromInTreePluginName, err = csitranslationlib.GetInTreeNameFromCSIName(provisionerName)
klog.V(2).Infof("Perform CSI migration for %s to %s", provisionerName, handlesMigrationFromInTreePluginName)
if err != nil {
klog.Fatalf("Failed to get InTree plugin name for migrated CSI plugin %s: %s", provisionerName, err)
}
provisionerOptions = append(provisionerOptions, controller.AdditionalProvisionerNames([]string{handlesMigrationFromInTreePluginName}))
handlesMigrationFromInTreePlugin = true
}

// Create the provisioner: it implements the Provisioner interface expected by
// the controller
csiProvisioner := ctrl.NewCSIProvisioner(clientset, csiAPIClient, *operationTimeout, identity, *volumeNamePrefix, *volumeNameUUIDLength, grpcClient, snapClient, provisionerName)
csiProvisioner := ctrl.NewCSIProvisioner(clientset, csiAPIClient, *operationTimeout, identity, *volumeNamePrefix, *volumeNameUUIDLength, grpcClient, snapClient, handlesMigrationFromInTreePlugin, handlesMigrationFromInTreePluginName, provisionerName)
provisionController = controller.NewProvisionController(
clientset,
provisionerName,
csiProvisioner,
serverVersion.GitVersion,
controller.LeaderElection(*enableLeaderElection),
controller.FailedProvisionThreshold(0),
controller.FailedDeleteThreshold(0),
controller.RateLimiter(workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax)),
controller.Threadiness(int(*workerThreads)),
provisionerOptions...,
)
}

Expand Down
88 changes: 60 additions & 28 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
csiclientset "k8s.io/csi-api/pkg/client/clientset/versioned"
csitranslationlib "k8s.io/csi-translation-lib"
"k8s.io/klog"

"google.golang.org/grpc"
Expand Down Expand Up @@ -143,17 +144,19 @@ var (

// CSIProvisioner struct
type csiProvisioner struct {
client kubernetes.Interface
csiClient csi.ControllerClient
csiAPIClient csiclientset.Interface
grpcClient *grpc.ClientConn
snapshotClient snapclientset.Interface
timeout time.Duration
identity string
volumeNamePrefix string
volumeNameUUIDLength int
config *rest.Config
driverName string
client kubernetes.Interface
csiClient csi.ControllerClient
csiAPIClient csiclientset.Interface
grpcClient *grpc.ClientConn
snapshotClient snapclientset.Interface
timeout time.Duration
identity string
volumeNamePrefix string
volumeNameUUIDLength int
config *rest.Config
driverName string
handlesMigrationFromInTreePlugin bool
handlesMigrationFromInTreePluginName string
}

const (
Expand Down Expand Up @@ -238,20 +241,24 @@ func NewCSIProvisioner(client kubernetes.Interface,
volumeNameUUIDLength int,
grpcClient *grpc.ClientConn,
snapshotClient snapclientset.Interface,
handlesMigrationFromInTreePlugin bool,
handlesMigrationFromInTreePluginName string,
driverName string) controller.Provisioner {

csiClient := csi.NewControllerClient(grpcClient)
provisioner := &csiProvisioner{
client: client,
grpcClient: grpcClient,
csiClient: csiClient,
csiAPIClient: csiAPIClient,
snapshotClient: snapshotClient,
timeout: connectionTimeout,
identity: identity,
volumeNamePrefix: volumeNamePrefix,
volumeNameUUIDLength: volumeNameUUIDLength,
driverName: driverName,
client: client,
grpcClient: grpcClient,
csiClient: csiClient,
csiAPIClient: csiAPIClient,
snapshotClient: snapshotClient,
timeout: connectionTimeout,
identity: identity,
volumeNamePrefix: volumeNamePrefix,
volumeNameUUIDLength: volumeNameUUIDLength,
handlesMigrationFromInTreePlugin: handlesMigrationFromInTreePlugin,
handlesMigrationFromInTreePluginName: handlesMigrationFromInTreePluginName,
driverName: driverName,
}
return provisioner
}
Expand Down Expand Up @@ -362,6 +369,24 @@ func (p *csiProvisioner) Provision(options controller.VolumeOptions) (*v1.Persis
return nil, fmt.Errorf("claim Selector is not supported")
}

createVolumeRequestParameters := options.Parameters
performInTreeTranslation := false
if p.handlesMigrationFromInTreePlugin {
storageClassName := options.PVC.Spec.StorageClassName
storageClass, err := p.client.StorageV1().StorageClasses().Get(*storageClassName, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("failed to get storage class named %s: %v", *storageClassName, err)
}
if storageClass.Provisioner == p.handlesMigrationFromInTreePluginName {
klog.V(2).Infof("Perform CSI migration for intree plugin %s", storageClass.Provisioner)
createVolumeRequestParameters, err = csitranslationlib.TranslateInTreeStorageClassParametersToCSI(p.handlesMigrationFromInTreePluginName, options.Parameters)
if err != nil {
return nil, fmt.Errorf("failed to translate storage class parameters: %v", err)
}
performInTreeTranslation = true
}
}

var needSnapshotSupport bool
if options.PVC.Spec.DataSource != nil {
// PVC.Spec.DataSource.Name is the name of the VolumeSnapshot API object
Expand All @@ -388,7 +413,7 @@ func (p *csiProvisioner) Provision(options controller.VolumeOptions) (*v1.Persis

fsTypesFound := 0
fsType := ""
for k, v := range options.Parameters {
for k, v := range createVolumeRequestParameters {
if strings.ToLower(k) == "fstype" || k == prefixedFsTypeKey {
fsType = v
fsTypesFound++
Expand Down Expand Up @@ -416,7 +441,7 @@ func (p *csiProvisioner) Provision(options controller.VolumeOptions) (*v1.Persis
// Create a CSI CreateVolumeRequest and Response
req := csi.CreateVolumeRequest{
Name: pvName,
Parameters: options.Parameters,
Parameters: createVolumeRequestParameters,
VolumeCapabilities: volumeCaps,
CapacityRange: &csi.CapacityRange{
RequiredBytes: int64(volSizeBytes),
Expand Down Expand Up @@ -452,7 +477,7 @@ func (p *csiProvisioner) Provision(options controller.VolumeOptions) (*v1.Persis

// Resolve provision secret credentials.
// No PVC is provided when resolving provision/delete secret names, since the PVC may or may not exist at delete time.
provisionerSecretRef, err := getSecretReference(provisionerSecretParams, options.Parameters, pvName, nil)
provisionerSecretRef, err := getSecretReference(provisionerSecretParams, createVolumeRequestParameters, pvName, nil)
if err != nil {
return nil, err
}
Expand All @@ -463,20 +488,20 @@ func (p *csiProvisioner) Provision(options controller.VolumeOptions) (*v1.Persis
req.Secrets = provisionerCredentials

// Resolve controller publish, node stage, node publish secret references
controllerPublishSecretRef, err := getSecretReference(controllerPublishSecretParams, options.Parameters, pvName, options.PVC)
controllerPublishSecretRef, err := getSecretReference(controllerPublishSecretParams, createVolumeRequestParameters, pvName, options.PVC)
if err != nil {
return nil, err
}
nodeStageSecretRef, err := getSecretReference(nodeStageSecretParams, options.Parameters, pvName, options.PVC)
nodeStageSecretRef, err := getSecretReference(nodeStageSecretParams, createVolumeRequestParameters, pvName, options.PVC)
if err != nil {
return nil, err
}
nodePublishSecretRef, err := getSecretReference(nodePublishSecretParams, options.Parameters, pvName, options.PVC)
nodePublishSecretRef, err := getSecretReference(nodePublishSecretParams, createVolumeRequestParameters, pvName, options.PVC)
if err != nil {
return nil, err
}

req.Parameters, err = removePrefixedParameters(options.Parameters)
req.Parameters, err = removePrefixedParameters(createVolumeRequestParameters)
if err != nil {
return nil, fmt.Errorf("failed to strip CSI Parameters of prefixed keys: %v", err)
}
Expand Down Expand Up @@ -551,6 +576,13 @@ func (p *csiProvisioner) Provision(options controller.VolumeOptions) (*v1.Persis
pv.Spec.PersistentVolumeSource.CSI.FSType = fsType
}

if performInTreeTranslation {
pv, err = csitranslationlib.TranslateCSIPVToInTree(pv)
if err != nil {
return nil, err
}
}

klog.Infof("successfully created PV %+v", pv.Spec.PersistentVolumeSource)

return pv, nil
Expand Down
10 changes: 5 additions & 5 deletions pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,7 @@ func TestCreateDriverReturnsInvalidCapacityDuringProvision(t *testing.T) {
defer mockController.Finish()
defer driver.Stop()

csiProvisioner := NewCSIProvisioner(nil, nil, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName)
csiProvisioner := NewCSIProvisioner(nil, nil, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, false, "", driverName)

// Requested PVC with requestedBytes storage
opts := controller.VolumeOptions{
Expand Down Expand Up @@ -1402,7 +1402,7 @@ func runProvisionTest(t *testing.T, k string, tc provisioningTestcase, requested
clientSet = fakeclientset.NewSimpleClientset()
}

csiProvisioner := NewCSIProvisioner(clientSet, nil, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName)
csiProvisioner := NewCSIProvisioner(clientSet, nil, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, false, "", driverName)

out := &csi.CreateVolumeResponse{
Volume: &csi.Volume{
Expand Down Expand Up @@ -1759,7 +1759,7 @@ func TestProvisionFromSnapshot(t *testing.T) {
return true, content, nil
})

csiProvisioner := NewCSIProvisioner(clientSet, nil, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, client, driverName)
csiProvisioner := NewCSIProvisioner(clientSet, nil, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, client, false, "", driverName)

out := &csi.CreateVolumeResponse{
Volume: &csi.Volume{
Expand Down Expand Up @@ -1857,7 +1857,7 @@ func TestProvisionWithTopology(t *testing.T) {

clientSet := fakeclientset.NewSimpleClientset()
csiClientSet := fakecsiclientset.NewSimpleClientset()
csiProvisioner := NewCSIProvisioner(clientSet, csiClientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName)
csiProvisioner := NewCSIProvisioner(clientSet, csiClientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, false, "", driverName)

out := &csi.CreateVolumeResponse{
Volume: &csi.Volume{
Expand Down Expand Up @@ -1898,7 +1898,7 @@ func TestProvisionWithMountOptions(t *testing.T) {

clientSet := fakeclientset.NewSimpleClientset()
csiClientSet := fakecsiclientset.NewSimpleClientset()
csiProvisioner := NewCSIProvisioner(clientSet, csiClientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName)
csiProvisioner := NewCSIProvisioner(clientSet, csiClientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, false, "", driverName)

out := &csi.CreateVolumeResponse{
Volume: &csi.Volume{
Expand Down

0 comments on commit ae0c72e

Please sign in to comment.