Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ec2): Call describe-instance API concurrently #2047

Merged
merged 2 commits into from
May 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ require (
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.16.0
go.uber.org/zap v1.26.0
golang.org/x/sync v0.7.0
gopkg.in/alecthomas/kingpin.v2 v2.2.6
k8s.io/api v0.28.3
k8s.io/apiextensions-apiserver v0.28.3
Expand Down Expand Up @@ -125,7 +126,6 @@ require (
golang.org/x/mod v0.13.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/oauth2 v0.11.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/sys v0.15.0 // indirect
golang.org/x/term v0.15.0 // indirect
golang.org/x/text v0.14.0 // indirect
Expand Down
131 changes: 80 additions & 51 deletions pkg/controller/ec2/instance/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/crossplane/crossplane-runtime/pkg/resource"
"github.com/google/go-cmp/cmp"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -129,61 +130,14 @@ func (e *external) Observe(ctx context.Context, mgd resource.Managed) (managed.E
}, nil
}

response, err := e.client.DescribeInstances(ctx,
&awsec2.DescribeInstancesInput{
InstanceIds: []string{meta.GetExternalName(cr)},
})

// deleted instances that have not yet been cleaned up from the cluster return a
// 200 OK with a nil response.Reservations slice
if err == nil && len(response.Reservations) == 0 {
return managed.ExternalObservation{}, nil
}

if err != nil {
return managed.ExternalObservation{},
errorutils.Wrap(resource.Ignore(ec2.IsInstanceNotFoundErr, err), errDescribe)
}

// in a successful response, there should be one and only one object
if len(response.Reservations[0].Instances) != 1 {
return managed.ExternalObservation{}, errors.New(errMultipleItems)
instancePtr, o, err := e.describeInstance(ctx, meta.GetExternalName(cr))
if err != nil || instancePtr == nil {
return managed.ExternalObservation{}, err
}

observed := response.Reservations[0].Instances[0]
observed := *instancePtr

// update the CRD spec for any new values from provider
current := cr.Spec.ForProvider.DeepCopy()

o := awsec2.DescribeInstanceAttributeOutput{}

for _, input := range []types.InstanceAttributeName{
types.InstanceAttributeNameDisableApiTermination,
types.InstanceAttributeNameInstanceInitiatedShutdownBehavior,
types.InstanceAttributeNameUserData,
} {
r, err := e.client.DescribeInstanceAttribute(ctx, &awsec2.DescribeInstanceAttributeInput{
InstanceId: aws.String(meta.GetExternalName(cr)),
Attribute: input,
})

if err != nil {
return managed.ExternalObservation{}, errorutils.Wrap(err, errDescribe)
}

if r.DisableApiTermination != nil {
o.DisableApiTermination = r.DisableApiTermination
}

if r.InstanceInitiatedShutdownBehavior != nil {
o.InstanceInitiatedShutdownBehavior = r.InstanceInitiatedShutdownBehavior
}

if r.UserData != nil {
o.UserData = r.UserData
}
}

ec2.LateInitializeInstance(&cr.Spec.ForProvider, &observed, &o)

if !cmp.Equal(current, &cr.Spec.ForProvider) {
Expand Down Expand Up @@ -221,6 +175,81 @@ func (e *external) Observe(ctx context.Context, mgd resource.Managed) (managed.E
}, nil
}

func (e *external) describeInstance(ctx context.Context, instanceId string) (
*types.Instance,
awsec2.DescribeInstanceAttributeOutput,
error,
) {
eg := errgroup.Group{}

var describeOutput *awsec2.DescribeInstancesOutput
var describeError error
eg.Go(func() error {
describeOutput, describeError = e.client.DescribeInstances(ctx, &awsec2.DescribeInstancesInput{
InstanceIds: []string{instanceId},
})
return nil
})

attrs := awsec2.DescribeInstanceAttributeOutput{}
descAttr := func(attr types.InstanceAttributeName) (*awsec2.DescribeInstanceAttributeOutput, error) {
return e.client.DescribeInstanceAttribute(ctx, &awsec2.DescribeInstanceAttributeInput{
InstanceId: &instanceId,
Attribute: attr,
})
}
Comment on lines +195 to +200
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if this could be done more elegantly by iterating over an array of attribute keys and start a Go routine in parallel. I would also prefer to use https://pkg.go.dev/golang.org/x/sync/errgroup instead of standard WaitGroup.

Something like this:

func (e *external) describeInstanceAttributes(ctx context.Context, cr *svcapitypes.Instance) (*awsec2.DescribeInstanceAttributeOutput, error) {
	eg := errgroup.Group{}
	out := &awsec2.DescribeInstanceAttributeOutput{}
	describeAttribute := func(attr types.InstanceAttributeName, onSuccess func(res *awsec2.DescribeInstanceAttributeOutput)) {
		eg.Go(func() error {
			res, err := e.client.DescribeInstanceAttribute(ctx, &awsec2.DescribeInstanceAttributeInput{
				InstanceId: aws.String(meta.GetExternalName(cr)),
				Attribute: attr,
			})
			if err != nil {
				err = errors.Wrap(err, string(attr))
				return err
			}
			onSuccess(res)
			return nil
		})
	}

	describeAttribute(types.InstanceAttributeNameDisableApiTermination, func(res *awsec2.DescribeInstanceAttributeOutput) {
		out.DisableApiTermination = res.DisableApiTermination
	})
	describeAttribute(types.InstanceAttributeNameInstanceInitiatedShutdownBehavior, func(res *awsec2.DescribeInstanceAttributeOutput) {
		out.InstanceInitiatedShutdownBehavior = res.InstanceInitiatedShutdownBehavior
	})
	describeAttribute(types.InstanceAttributeNameUserData, func(res *awsec2.DescribeInstanceAttributeOutput) {
		out.UserData = res.UserData
	})
	
	err := eg.Wait()
	return out, err
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, cool. I didn't know about errgroup. It looks much better now.


eg.Go(func() error {
if res, err := descAttr(types.InstanceAttributeNameDisableApiTermination); err != nil {
return errorutils.Wrap(err, "fetching DisableApiTermination")
} else {
attrs.DisableApiTermination = res.DisableApiTermination
return nil
}
})
MisterMX marked this conversation as resolved.
Show resolved Hide resolved

eg.Go(func() error {
if res, err := descAttr(types.InstanceAttributeNameInstanceInitiatedShutdownBehavior); err != nil {
return errorutils.Wrap(err, "fetching InstanceInitiatedShutdownBehavior")
} else {
attrs.InstanceInitiatedShutdownBehavior = res.InstanceInitiatedShutdownBehavior
return nil
}
})

eg.Go(func() error {
if res, err := descAttr(types.InstanceAttributeNameUserData); err != nil {
return errorutils.Wrap(err, "fetching UserData")
} else {
attrs.UserData = res.UserData
return nil
}
})

attrsErr := eg.Wait()

if describeError != nil {
return nil, attrs,
errorutils.Wrap(resource.Ignore(ec2.IsInstanceNotFoundErr, describeError), errDescribe)
}

// deleted instances that have not yet been cleaned up from the cluster return a
// 200 OK with a nil response.Reservations slice
if len(describeOutput.Reservations) == 0 {
return nil, attrs, nil
}

// in a successful response, there should be one and only one object
if len(describeOutput.Reservations[0].Instances) != 1 {
return nil, attrs, errors.New(errMultipleItems)
}

if attrsErr != nil {
return nil, attrs, errorutils.Wrap(attrsErr, errDescribe)
}
return &describeOutput.Reservations[0].Instances[0], attrs, nil
}

func (e *external) Create(ctx context.Context, mgd resource.Managed) (managed.ExternalCreation, error) {
cr, ok := mgd.(*svcapitypes.Instance)
if !ok {
Expand Down
6 changes: 6 additions & 0 deletions pkg/controller/ec2/instance/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,9 @@ func TestObserve(t *testing.T) {
}},
}, nil
},
MockDescribeInstanceAttribute: func(ctx context.Context, input *awsec2.DescribeInstanceAttributeInput, opts []func(*awsec2.Options)) (*awsec2.DescribeInstanceAttributeOutput, error) {
return &awsec2.DescribeInstanceAttributeOutput{}, nil
},
},
cr: instance(withSpec(manualv1alpha1.InstanceParameters{
InstanceType: string(types.InstanceTypeM1Small),
Expand All @@ -187,6 +190,9 @@ func TestObserve(t *testing.T) {
MockDescribeInstances: func(ctx context.Context, input *awsec2.DescribeInstancesInput, opts []func(*awsec2.Options)) (*awsec2.DescribeInstancesOutput, error) {
return &awsec2.DescribeInstancesOutput{}, errBoom
},
MockDescribeInstanceAttribute: func(ctx context.Context, input *awsec2.DescribeInstanceAttributeInput, opts []func(*awsec2.Options)) (*awsec2.DescribeInstanceAttributeOutput, error) {
return &awsec2.DescribeInstanceAttributeOutput{}, nil
},
},
cr: instance(withSpec(manualv1alpha1.InstanceParameters{
InstanceType: string(types.InstanceTypeM1Small),
Expand Down
Loading