Skip to content

Commit

Permalink
Re-use Access point.
Browse files Browse the repository at this point in the history
  • Loading branch information
mskanth972 committed Jun 22, 2023
1 parent e914558 commit d09324f
Show file tree
Hide file tree
Showing 6 changed files with 442 additions and 58 deletions.
61 changes: 57 additions & 4 deletions pkg/cloud/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ import (
)

const (
AccessDeniedException = "AccessDeniedException"
AccessDeniedException = "AccessDeniedException"
AccessPointAlreadyExists = "AccessPointAlreadyExists"
PvcNameTagKey = "pvcName"
)

var (
Expand Down Expand Up @@ -88,7 +90,7 @@ type Efs interface {

type Cloud interface {
GetMetadata() MetadataService
CreateAccessPoint(ctx context.Context, volumeName string, accessPointOpts *AccessPointOptions) (accessPoint *AccessPoint, err error)
CreateAccessPoint(ctx context.Context, clientToken string, accessPointOpts *AccessPointOptions, usePvcName bool) (accessPoint *AccessPoint, err error)
DeleteAccessPoint(ctx context.Context, accessPointId string) (err error)
DescribeAccessPoint(ctx context.Context, accessPointId string) (accessPoint *AccessPoint, err error)
DescribeFileSystem(ctx context.Context, fileSystemId string) (fs *FileSystem, err error)
Expand Down Expand Up @@ -154,10 +156,28 @@ func (c *cloud) GetMetadata() MetadataService {
return c.metadata
}

func (c *cloud) CreateAccessPoint(ctx context.Context, volumeName string, accessPointOpts *AccessPointOptions) (accessPoint *AccessPoint, err error) {
func (c *cloud) CreateAccessPoint(ctx context.Context, clientToken string, accessPointOpts *AccessPointOptions, usePvcName bool) (accessPoint *AccessPoint, err error) {
efsTags := parseEfsTags(accessPointOpts.Tags)

//if usePvcName is true, check for AP with same Root Directory exists in efs
// if found reuse that AP
if usePvcName {
existingAP, err := c.findAccessPointByClientToken(ctx, clientToken, accessPointOpts)
if err != nil {
return nil, fmt.Errorf("failed to find access point: %v", err)
}
if existingAP != nil {
//AP path already exists
klog.V(2).Infof("Existing AccessPoint found : %+v", existingAP)
return &AccessPoint{
AccessPointId: existingAP.AccessPointId,
FileSystemId: existingAP.FileSystemId,
CapacityGiB: accessPointOpts.CapacityGiB,
}, nil
}
}
createAPInput := &efs.CreateAccessPointInput{
ClientToken: &volumeName,
ClientToken: &clientToken,
FileSystemId: &accessPointOpts.FileSystemId,
PosixUser: &efs.PosixUser{
Gid: &accessPointOpts.Gid,
Expand All @@ -182,6 +202,7 @@ func (c *cloud) CreateAccessPoint(ctx context.Context, volumeName string, access
}
return nil, fmt.Errorf("Failed to create access point: %v", err)
}
klog.V(5).Infof("Create AP response : %+v", res)

return &AccessPoint{
AccessPointId: *res.AccessPointId,
Expand Down Expand Up @@ -233,6 +254,38 @@ func (c *cloud) DescribeAccessPoint(ctx context.Context, accessPointId string) (
}, nil
}

func (c *cloud) findAccessPointByClientToken(ctx context.Context, clientToken string, accessPointOpts *AccessPointOptions) (accessPoint *AccessPoint, err error) {
klog.V(5).Infof("AccessPointOptions to find AP : %+v", accessPointOpts)
klog.V(2).Infof("Client to find AP : %s", clientToken)
describeAPInput := &efs.DescribeAccessPointsInput{
FileSystemId: &accessPointOpts.FileSystemId,
}
res, err := c.efs.DescribeAccessPointsWithContext(ctx, describeAPInput)
if err != nil {
if isAccessDenied(err) {
return
}
if isFileSystemNotFound(err) {
return
}
err = fmt.Errorf("failed to list Access Points of efs = %s : %v", accessPointOpts.FileSystemId, err)
return
}
for _, ap := range res.AccessPoints {
// check if AP exists with same client token
klog.V(5).Infof("ClientToken found : %s", aws.StringValue(ap.ClientToken))
if aws.StringValue(ap.ClientToken) == clientToken {
return &AccessPoint{
AccessPointId: *ap.AccessPointId,
FileSystemId: *ap.FileSystemId,
AccessPointRootDir: *ap.RootDirectory.Path,
}, nil
}
}
klog.V(2).Infof("Access point does not exist")
return nil, nil
}

func (c *cloud) DescribeFileSystem(ctx context.Context, fileSystemId string) (fs *FileSystem, err error) {
describeFsInput := &efs.DescribeFileSystemsInput{FileSystemId: &fileSystemId}
klog.V(5).Infof("Calling DescribeFileSystems with input: %+v", *describeFsInput)
Expand Down
133 changes: 129 additions & 4 deletions pkg/cloud/cloud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cloud
import (
"context"
"errors"
"reflect"
"testing"

"github.com/aws/aws-sdk-go/aws"
Expand All @@ -27,13 +28,14 @@ func TestCreateAccessPoint(t *testing.T) {
directoryPerms = "0777"
directoryPath = "/test"
volName = "volName"
clientToken = volName
)
testCases := []struct {
name string
testFunc func(t *testing.T)
}{
{
name: "Success",
name: "Success - AP does not exist",
testFunc: func(t *testing.T) {
mockCtl := gomock.NewController(t)
mockEfs := mocks.NewMockEfs(mockCtl)
Expand Down Expand Up @@ -72,9 +74,63 @@ func TestCreateAccessPoint(t *testing.T) {
},
}

describeAPOutput := &efs.DescribeAccessPointsOutput{
AccessPoints: nil,
}

ctx := context.Background()
mockEfs.EXPECT().DescribeAccessPointsWithContext(gomock.Eq(ctx), gomock.Any()).Return(describeAPOutput, nil)
mockEfs.EXPECT().CreateAccessPointWithContext(gomock.Eq(ctx), gomock.Any()).Return(output, nil)
res, err := c.CreateAccessPoint(ctx, volName, req)
res, err := c.CreateAccessPoint(ctx, clientToken, req, true)

if err != nil {
t.Fatalf("CreateAccessPointFailed is failed: %v", err)
}

if res == nil {
t.Fatal("Result is nil")
}

if accessPointId != res.AccessPointId {
t.Fatalf("AccessPointId mismatched. Expected: %v, Actual: %v", accessPointId, res.AccessPointId)
}

if fsId != res.FileSystemId {
t.Fatalf("FileSystemId mismatched. Expected: %v, Actual: %v", fsId, res.FileSystemId)
}
mockCtl.Finish()
},
},
{
name: "Success - AP already exists",
testFunc: func(t *testing.T) {
mockCtl := gomock.NewController(t)
mockEfs := mocks.NewMockEfs(mockCtl)
c := &cloud{
efs: mockEfs,
}

tags := make(map[string]string)
tags["cluster"] = "efs"

req := &AccessPointOptions{
FileSystemId: fsId,
Uid: uid,
Gid: gid,
DirectoryPerms: directoryPerms,
DirectoryPath: directoryPath,
Tags: tags,
}

describeAPOutput := &efs.DescribeAccessPointsOutput{
AccessPoints: []*efs.AccessPointDescription{
{AccessPointId: aws.String(accessPointId), FileSystemId: aws.String(fsId), ClientToken: aws.String(clientToken), RootDirectory: &efs.RootDirectory{Path: aws.String(directoryPath)}, Tags: []*efs.Tag{{Key: aws.String(PvcNameTagKey), Value: aws.String(volName)}}},
},
}

ctx := context.Background()
mockEfs.EXPECT().DescribeAccessPointsWithContext(gomock.Eq(ctx), gomock.Any()).Return(describeAPOutput, nil)
res, err := c.CreateAccessPoint(ctx, clientToken, req, true)

if err != nil {
t.Fatalf("CreateAccessPointFailed is failed: %v", err)
Expand Down Expand Up @@ -108,10 +164,14 @@ func TestCreateAccessPoint(t *testing.T) {
DirectoryPerms: directoryPerms,
DirectoryPath: directoryPath,
}
describeAPOutput := &efs.DescribeAccessPointsOutput{
AccessPoints: nil,
}

ctx := context.Background()
mockEfs.EXPECT().DescribeAccessPointsWithContext(gomock.Eq(ctx), gomock.Any()).Return(describeAPOutput, nil)
mockEfs.EXPECT().CreateAccessPointWithContext(gomock.Eq(ctx), gomock.Any()).Return(nil, errors.New("CreateAccessPointWithContext failed"))
_, err := c.CreateAccessPoint(ctx, volName, req)
_, err := c.CreateAccessPoint(ctx, clientToken, req, true)
if err == nil {
t.Fatalf("CreateAccessPoint did not fail")
}
Expand All @@ -135,7 +195,7 @@ func TestCreateAccessPoint(t *testing.T) {

ctx := context.Background()
mockEfs.EXPECT().CreateAccessPointWithContext(gomock.Eq(ctx), gomock.Any()).Return(nil, awserr.New(AccessDeniedException, "Access Denied", errors.New("Access Denied")))
_, err := c.CreateAccessPoint(ctx, volName, req)
_, err := c.CreateAccessPoint(ctx, clientToken, req, false)
if err == nil {
t.Fatalf("CreateAccessPoint did not fail")
}
Expand Down Expand Up @@ -743,3 +803,68 @@ func testResult(t *testing.T, funcName string, ret interface{}, err error, expec
}
}
}

func Test_findAccessPointByPath(t *testing.T) {
fsId := "testFsId"
clientToken := "testPvcName"
dirPath := "testPath"
diffClientToken := aws.String("diff")

mockctl := gomock.NewController(t)
defer mockctl.Finish()
mockEfs := mocks.NewMockEfs(mockctl)

expectedSingleAP := &AccessPoint{
AccessPointId: "testApId",
AccessPointRootDir: dirPath,
FileSystemId: fsId,
}

type args struct {
clientToken string
accessPointOpts *AccessPointOptions
}
tests := []struct {
name string
args args
prepare func(*mocks.MockEfs)
wantAccessPoint *AccessPoint
wantErr bool
}{
{name: "Expected_ClientToken_Not_Found", args: args{clientToken, &AccessPointOptions{FileSystemId: fsId, DirectoryPath: dirPath}}, prepare: func(mockEfs *mocks.MockEfs) {
mockEfs.EXPECT().DescribeAccessPointsWithContext(gomock.Any(), gomock.Any()).Return(&efs.DescribeAccessPointsOutput{
AccessPoints: []*efs.AccessPointDescription{{FileSystemId: aws.String(fsId), ClientToken: diffClientToken, AccessPointId: aws.String(expectedSingleAP.AccessPointId), RootDirectory: &efs.RootDirectory{Path: aws.String("differentPath")}}},
}, nil)
}, wantAccessPoint: nil, wantErr: false},
{name: "Expected_Path_Found_In_Multiple_APs_And_One_AP_Filtered_By_ClientToken", args: args{clientToken, &AccessPointOptions{FileSystemId: fsId, DirectoryPath: dirPath}}, prepare: func(mockEfs *mocks.MockEfs) {
mockEfs.EXPECT().DescribeAccessPointsWithContext(gomock.Any(), gomock.Any()).Return(&efs.DescribeAccessPointsOutput{
AccessPoints: []*efs.AccessPointDescription{
{FileSystemId: aws.String(fsId), ClientToken: diffClientToken, AccessPointId: aws.String("differentApId"), RootDirectory: &efs.RootDirectory{Path: aws.String(expectedSingleAP.AccessPointRootDir)}},
{FileSystemId: aws.String(fsId), ClientToken: &clientToken, AccessPointId: aws.String(expectedSingleAP.AccessPointId), RootDirectory: &efs.RootDirectory{Path: aws.String(expectedSingleAP.AccessPointRootDir)}},
},
}, nil)
}, wantAccessPoint: expectedSingleAP, wantErr: false},
{name: "Fail_DescribeAccessPoints", args: args{clientToken, &AccessPointOptions{FileSystemId: fsId, DirectoryPath: dirPath}}, prepare: func(mockEfs *mocks.MockEfs) {
mockEfs.EXPECT().DescribeAccessPointsWithContext(gomock.Any(), gomock.Any()).Return(nil, errors.New("access_denied"))
}, wantAccessPoint: nil, wantErr: true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := &cloud{efs: mockEfs}
ctx := context.Background()

if tt.prepare != nil {
tt.prepare(mockEfs)
}

gotAccessPoint, err := c.findAccessPointByClientToken(ctx, tt.args.clientToken, tt.args.accessPointOpts)
if (err != nil) != tt.wantErr {
t.Errorf("findAccessPointByClientToken() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(gotAccessPoint, tt.wantAccessPoint) {
t.Errorf("findAccessPointByClientToken() gotAccessPoint = %v, want %v", gotAccessPoint, tt.wantAccessPoint)
}
})
}
}
6 changes: 3 additions & 3 deletions pkg/cloud/fakes.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ func (c *FakeCloudProvider) GetMetadata() MetadataService {
return c.m
}

func (c *FakeCloudProvider) CreateAccessPoint(ctx context.Context, volumeName string, accessPointOpts *AccessPointOptions) (accessPoint *AccessPoint, err error) {
ap, exists := c.accessPoints[volumeName]
func (c *FakeCloudProvider) CreateAccessPoint(ctx context.Context, clientToken string, accessPointOpts *AccessPointOptions, usePvcName bool) (accessPoint *AccessPoint, err error) {
ap, exists := c.accessPoints[clientToken]
if exists {
if accessPointOpts.CapacityGiB == ap.CapacityGiB {
return ap, nil
Expand All @@ -45,7 +45,7 @@ func (c *FakeCloudProvider) CreateAccessPoint(ctx context.Context, volumeName st
CapacityGiB: accessPointOpts.CapacityGiB,
}

c.accessPoints[volumeName] = ap
c.accessPoints[clientToken] = ap
return ap, nil
}

Expand Down
24 changes: 21 additions & 3 deletions pkg/driver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ const (
RoleArn = "awsRoleArn"
TempMountPathPrefix = "/var/lib/csi/pv"
Uid = "uid"
UsePvcNameKey = "usePvcName"
PvcNameKey = "csi.storage.k8s.io/pvc/name"
)

var (
Expand All @@ -59,7 +61,25 @@ var (

func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
klog.V(4).Infof("CreateVolume: called with args %+v", *req)

var usePvcName bool
var err error
volumeParams := req.GetParameters()
volName := req.GetName()
clientToken := volName

// if true, then use pvcName as clientToken instead of PVC Id
// This allows users to reconnect to the same AP from different k8s cluster
if usePvcNameStr, ok := volumeParams[UsePvcNameKey]; ok {
usePvcName, err = strconv.ParseBool(usePvcNameStr)
if err != nil {
return nil, status.Error(codes.InvalidArgument, "Invalid value for usePvcName parameter")
}
if usePvcName {
givenPvcName := volumeParams[PvcNameKey]
clientToken = givenPvcName[:64]
}
}
if volName == "" {
return nil, status.Error(codes.InvalidArgument, "Volume name not provided")
}
Expand All @@ -83,7 +103,6 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
var (
azName string
basePath string
err error
gid int
gidMin int
gidMax int
Expand All @@ -94,7 +113,6 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
)

//Parse parameters
volumeParams := req.GetParameters()
if value, ok := volumeParams[ProvisioningMode]; ok {
provisioningMode = value
//TODO: Add FS provisioning mode check when implemented
Expand Down Expand Up @@ -243,7 +261,7 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
accessPointsOptions.Gid = int64(gid)
accessPointsOptions.DirectoryPath = rootDir

accessPointId, err := localCloud.CreateAccessPoint(ctx, volName, accessPointsOptions)
accessPointId, err := localCloud.CreateAccessPoint(ctx, clientToken, accessPointsOptions, usePvcName)
if err != nil {
if allocatedGid != 0 {
d.gidAllocator.releaseGid(accessPointsOptions.FileSystemId, gid)
Expand Down
Loading

0 comments on commit d09324f

Please sign in to comment.