diff --git a/pkg/cloud/cloud.go b/pkg/cloud/cloud.go index 16407b9506..be40d03c04 100644 --- a/pkg/cloud/cloud.go +++ b/pkg/cloud/cloud.go @@ -21,6 +21,7 @@ import ( "errors" "fmt" "os" + "strings" "time" "github.com/aws/aws-sdk-go/aws" @@ -115,6 +116,9 @@ var ( // ErrAlreadyExists is returned when a resource is already existent. ErrAlreadyExists = errors.New("Resource already exists") + // ErrVolumeInUse is returned when a volume is already attached to an instance. + ErrVolumeInUse = errors.New("Request volume is already attached to an instance") + // ErrMultiSnapshots is returned when multiple snapshots are found // with the same ID ErrMultiSnapshots = errors.New("Multiple snapshots with the same name found") @@ -133,6 +137,7 @@ type Disk struct { AvailabilityZone string SnapshotID string OutpostArn string + Attachments []string } // DiskOptions represents parameters to create an EBS volume @@ -378,7 +383,7 @@ func (c *cloud) AttachDisk(ctx context.Context, volumeID, nodeID string) (string if err != nil { if awsErr, ok := err.(awserr.Error); ok { if awsErr.Code() == "VolumeInUse" { - return "", ErrAlreadyExists + return "", ErrVolumeInUse } } return "", fmt.Errorf("could not attach volume %q to node %q: %v", volumeID, nodeID, err) @@ -396,7 +401,9 @@ func (c *cloud) AttachDisk(ctx context.Context, volumeID, nodeID string) (string // TODO: Double check the attachment to be 100% sure we attached the correct volume at the correct mountpoint // It could happen otherwise that we see the volume attached from a previous/separate AttachVolume call, // which could theoretically be against a different device (or even instance). 
- + // TODO: Check volume capability matches for ALREADY_EXISTS + // This could happen when request volume already attached to request node, + // but is incompatible with the specified volume_capability or readonly flag return device.Path, nil } @@ -521,6 +528,7 @@ func (c *cloud) GetDiskByID(ctx context.Context, volumeID string) (*Disk, error) } volume, err := c.getVolume(ctx, request) + if err != nil { return nil, err } @@ -530,6 +538,7 @@ func (c *cloud) GetDiskByID(ctx context.Context, volumeID string) (*Disk, error) CapacityGiB: aws.Int64Value(volume.Size), AvailabilityZone: aws.StringValue(volume.AvailabilityZone), OutpostArn: aws.StringValue(volume.OutpostArn), + Attachments: getVolumeAttachmentsList(volume), }, nil } @@ -1044,3 +1053,14 @@ func volumeModificationDone(state string) bool { } return false } + +func getVolumeAttachmentsList(volume *ec2.Volume) []string { + var volumeAttachmentList []string + for _, attachment := range volume.Attachments { + if attachment.State != nil && strings.ToLower(aws.StringValue(attachment.State)) == "attached" { + volumeAttachmentList = append(volumeAttachmentList, aws.StringValue(attachment.InstanceId)) + } + } + + return volumeAttachmentList +} diff --git a/pkg/cloud/cloud_test.go b/pkg/cloud/cloud_test.go index 225ab76c4f..e13bbbdb5a 100644 --- a/pkg/cloud/cloud_test.go +++ b/pkg/cloud/cloud_test.go @@ -525,12 +525,14 @@ func TestGetDiskByID(t *testing.T) { volumeID string availabilityZone string outpostArn string + attachments *ec2.VolumeAttachment expErr error }{ { name: "success: normal", volumeID: "vol-test-1234", availabilityZone: expZone, + attachments: &ec2.VolumeAttachment{}, expErr: nil, }, { @@ -538,8 +540,19 @@ func TestGetDiskByID(t *testing.T) { volumeID: "vol-test-1234", availabilityZone: expZone, outpostArn: "arn:aws:outposts:us-west-2:111111111111:outpost/op-0aaa000a0aaaa00a0", + attachments: &ec2.VolumeAttachment{}, expErr: nil, }, + { + name: "success: attached instance list", + volumeID: 
"vol-test-1234", + availabilityZone: expZone, + outpostArn: "arn:aws:outposts:us-west-2:111111111111:outpost/op-0aaa000a0aaaa00a0", + attachments: &ec2.VolumeAttachment{ + InstanceId: aws.String("test-instance"), + State: aws.String("attached")}, + expErr: nil, + }, { name: "fail: DescribeVolumes returned generic error", volumeID: "vol-test-1234", @@ -561,6 +574,7 @@ func TestGetDiskByID(t *testing.T) { VolumeId: aws.String(tc.volumeID), AvailabilityZone: aws.String(tc.availabilityZone), OutpostArn: aws.String(tc.outpostArn), + Attachments: []*ec2.VolumeAttachment{tc.attachments}, }, }, }, @@ -585,6 +599,9 @@ func TestGetDiskByID(t *testing.T) { if disk.OutpostArn != tc.outpostArn { t.Fatalf("GetDisk() failed: expected outpostArn %q, got %q", tc.outpostArn, disk.OutpostArn) } + if len(disk.Attachments) > 0 && disk.Attachments[0] != aws.StringValue(tc.attachments.InstanceId) { + t.Fatalf("GetDisk() failed: expected attachment instance %q, got %q", aws.StringValue(tc.attachments.InstanceId), disk.Attachments[0]) + } } mockCtrl.Finish() diff --git a/pkg/driver/controller.go b/pkg/driver/controller.go index de2d362958..f0aff469b4 100644 --- a/pkg/driver/controller.go +++ b/pkg/driver/controller.go @@ -286,19 +286,21 @@ func (d *controllerService) ControllerPublishVolume(ctx context.Context, req *cs if !d.cloud.IsExistInstance(ctx, nodeID) { return nil, status.Errorf(codes.NotFound, "Instance %q not found", nodeID) } - - if _, err := d.cloud.GetDiskByID(ctx, volumeID); err != nil { + disk, err := d.cloud.GetDiskByID(ctx, volumeID) + if err != nil { if err == cloud.ErrNotFound { return nil, status.Error(codes.NotFound, "Volume not found") } return nil, status.Errorf(codes.Internal, "Could not get volume with ID %q: %v", volumeID, err) } + // If given volumeId already assigned to given node, will directly return current device path devicePath, err := d.cloud.AttachDisk(ctx, volumeID, nodeID) if err != nil { - if err == cloud.ErrAlreadyExists { - return nil, 
status.Error(codes.AlreadyExists, err.Error()) + if err == cloud.ErrVolumeInUse { + return nil, status.Error(codes.FailedPrecondition, strings.Join(disk.Attachments, ",")) } + // TODO: Check volume capability matches for ALREADY_EXISTS return nil, status.Errorf(codes.Internal, "Could not attach volume %q to node %q: %v", volumeID, nodeID, err) } klog.V(5).Infof("ControllerPublishVolume: volume %s attached to node %s through device %s", volumeID, nodeID, devicePath) diff --git a/pkg/driver/controller_test.go b/pkg/driver/controller_test.go index a3a1ead970..aa16c7cdcd 100644 --- a/pkg/driver/controller_test.go +++ b/pkg/driver/controller_test.go @@ -2669,8 +2669,12 @@ func TestControllerPublishVolume(t *testing.T) { }, }, { - name: "fail attach disk with already exists error", + name: "fail attach disk with volume already in use error", testFunc: func(t *testing.T) { + attachedInstanceId := "test-instance-id-attached" + disk := &cloud.Disk{ + Attachments: []string{attachedInstanceId}, + } req := &csi.ControllerPublishVolumeRequest{ VolumeId: "does-not-exist", NodeId: expInstanceID, @@ -2684,8 +2688,8 @@ func TestControllerPublishVolume(t *testing.T) { mockCloud := mocks.NewMockCloud(mockCtl) mockCloud.EXPECT().IsExistInstance(gomock.Eq(ctx), gomock.Eq(req.NodeId)).Return(true) - mockCloud.EXPECT().GetDiskByID(gomock.Eq(ctx), gomock.Any()).Return(&cloud.Disk{}, nil) - mockCloud.EXPECT().AttachDisk(gomock.Eq(ctx), gomock.Any(), gomock.Eq(req.NodeId)).Return("", cloud.ErrAlreadyExists) + mockCloud.EXPECT().GetDiskByID(gomock.Eq(ctx), gomock.Any()).Return(disk, nil) + mockCloud.EXPECT().AttachDisk(gomock.Eq(ctx), gomock.Any(), gomock.Eq(req.NodeId)).Return("", cloud.ErrVolumeInUse) awsDriver := controllerService{ cloud: mockCloud, @@ -2697,8 +2701,11 @@ func TestControllerPublishVolume(t *testing.T) { if !ok { t.Fatalf("Could not get error status code from error: %v", srvErr) } - if srvErr.Code() != codes.AlreadyExists { - t.Fatalf("Expected error code %d, got %d message 
%s", codes.AlreadyExists, srvErr.Code(), srvErr.Message()) + if srvErr.Code() != codes.FailedPrecondition { + t.Fatalf("Expected error code %d, got %d message %s", codes.FailedPrecondition, srvErr.Code(), srvErr.Message()) + } + if srvErr.Message() != attachedInstanceId { + t.Fatalf("Expected error message to contain previous attached instanceId %s, but got error message %s", attachedInstanceId, srvErr.Message()) + } } else { t.Fatalf("Expected error %v, got no error", codes.AlreadyExists)