From 97c6fab57a0c8f70092699b8d3e950b8050f0b1c Mon Sep 17 00:00:00 2001 From: Max Melentyev Date: Tue, 2 Apr 2024 12:15:18 +0100 Subject: [PATCH 1/2] Call describe-instance API concurrently Signed-off-by: Max Melentyev --- pkg/controller/ec2/instance/controller.go | 137 +++++++++++------- .../ec2/instance/controller_test.go | 6 + 2 files changed, 92 insertions(+), 51 deletions(-) diff --git a/pkg/controller/ec2/instance/controller.go b/pkg/controller/ec2/instance/controller.go index e5d5a78526..bb68755a87 100644 --- a/pkg/controller/ec2/instance/controller.go +++ b/pkg/controller/ec2/instance/controller.go @@ -18,6 +18,8 @@ package instance import ( "context" + "sync" + "sync/atomic" "github.com/aws/aws-sdk-go-v2/aws" awsec2 "github.com/aws/aws-sdk-go-v2/service/ec2" @@ -129,61 +131,14 @@ func (e *external) Observe(ctx context.Context, mgd resource.Managed) (managed.E }, nil } - response, err := e.client.DescribeInstances(ctx, - &awsec2.DescribeInstancesInput{ - InstanceIds: []string{meta.GetExternalName(cr)}, - }) - - // deleted instances that have not yet been cleaned up from the cluster return a - // 200 OK with a nil response.Reservations slice - if err == nil && len(response.Reservations) == 0 { - return managed.ExternalObservation{}, nil - } - - if err != nil { - return managed.ExternalObservation{}, - errorutils.Wrap(resource.Ignore(ec2.IsInstanceNotFoundErr, err), errDescribe) - } - - // in a successful response, there should be one and only one object - if len(response.Reservations[0].Instances) != 1 { - return managed.ExternalObservation{}, errors.New(errMultipleItems) + instancePtr, o, err := e.describeInstance(ctx, meta.GetExternalName(cr)) + if err != nil || instancePtr == nil { + return managed.ExternalObservation{}, err } - - observed := response.Reservations[0].Instances[0] + observed := *instancePtr // update the CRD spec for any new values from provider current := cr.Spec.ForProvider.DeepCopy() - - o := awsec2.DescribeInstanceAttributeOutput{} - - for _, input := range []types.InstanceAttributeName{ - types.InstanceAttributeNameDisableApiTermination, - types.InstanceAttributeNameInstanceInitiatedShutdownBehavior, - types.InstanceAttributeNameUserData, - } { - r, err := e.client.DescribeInstanceAttribute(ctx, &awsec2.DescribeInstanceAttributeInput{ - InstanceId: aws.String(meta.GetExternalName(cr)), - Attribute: input, - }) - - if err != nil { - return managed.ExternalObservation{}, errorutils.Wrap(err, errDescribe) - } - - if r.DisableApiTermination != nil { - o.DisableApiTermination = r.DisableApiTermination - } - - if r.InstanceInitiatedShutdownBehavior != nil { - o.InstanceInitiatedShutdownBehavior = r.InstanceInitiatedShutdownBehavior - } - - if r.UserData != nil { - o.UserData = r.UserData - } - } - ec2.LateInitializeInstance(&cr.Spec.ForProvider, &observed, &o) if !cmp.Equal(current, &cr.Spec.ForProvider) { @@ -221,6 +176,86 @@ func (e *external) Observe(ctx context.Context, mgd resource.Managed) (managed.E }, nil } +func (e *external) describeInstance(ctx context.Context, instanceId string) ( + *types.Instance, + awsec2.DescribeInstanceAttributeOutput, + error, +) { + wg := sync.WaitGroup{} + + var describeOutput *awsec2.DescribeInstancesOutput + var describeError error + wg.Add(1) + go func() { + describeOutput, describeError = e.client.DescribeInstances(ctx, &awsec2.DescribeInstancesInput{ + InstanceIds: []string{instanceId}, + }) + wg.Done() + }() + + attrs := awsec2.DescribeInstanceAttributeOutput{} + attrsErr := atomic.Pointer[error]{} + descAttr := func(attr types.InstanceAttributeName) (*awsec2.DescribeInstanceAttributeOutput, error) { + return e.client.DescribeInstanceAttribute(ctx, &awsec2.DescribeInstanceAttributeInput{ + InstanceId: &instanceId, + Attribute: attr, + }) + } + + wg.Add(1) + go func() { + if r, err := descAttr(types.InstanceAttributeNameDisableApiTermination); err != nil { + attrsErr.Store(&err) + } else { + attrs.DisableApiTermination = r.DisableApiTermination + } + wg.Done() + }() + + wg.Add(1) + go func() { + if r, err := descAttr(types.InstanceAttributeNameInstanceInitiatedShutdownBehavior); err != nil { + attrsErr.Store(&err) + } else { + attrs.InstanceInitiatedShutdownBehavior = r.InstanceInitiatedShutdownBehavior + } + wg.Done() + }() + + wg.Add(1) + go func() { + if r, err := descAttr(types.InstanceAttributeNameUserData); err != nil { + attrsErr.Store(&err) + } else { + attrs.UserData = r.UserData + } + wg.Done() + }() + + wg.Wait() + + if describeError != nil { + return nil, attrs, + errorutils.Wrap(resource.Ignore(ec2.IsInstanceNotFoundErr, describeError), errDescribe) + } + + // deleted instances that have not yet been cleaned up from the cluster return a + // 200 OK with a nil response.Reservations slice + if len(describeOutput.Reservations) == 0 { + return nil, attrs, nil + } + + // in a successful response, there should be one and only one object + if len(describeOutput.Reservations[0].Instances) != 1 { + return nil, attrs, errors.New(errMultipleItems) + } + + if err := attrsErr.Load(); err != nil { + return nil, attrs, errorutils.Wrap(*err, errDescribe) + } + return &describeOutput.Reservations[0].Instances[0], attrs, nil +} + func (e *external) Create(ctx context.Context, mgd resource.Managed) (managed.ExternalCreation, error) { cr, ok := mgd.(*svcapitypes.Instance) if !ok { diff --git a/pkg/controller/ec2/instance/controller_test.go b/pkg/controller/ec2/instance/controller_test.go index 96be6c0b07..4eb557cf05 100644 --- a/pkg/controller/ec2/instance/controller_test.go +++ b/pkg/controller/ec2/instance/controller_test.go @@ -166,6 +166,9 @@ func TestObserve(t *testing.T) { }}, }, nil }, + MockDescribeInstanceAttribute: func(ctx context.Context, input *awsec2.DescribeInstanceAttributeInput, opts []func(*awsec2.Options)) (*awsec2.DescribeInstanceAttributeOutput, error) { + return &awsec2.DescribeInstanceAttributeOutput{}, nil + }, }, cr: instance(withSpec(manualv1alpha1.InstanceParameters{ InstanceType: string(types.InstanceTypeM1Small), @@ -187,6 +190,9 @@ func TestObserve(t *testing.T) { MockDescribeInstances: func(ctx context.Context, input *awsec2.DescribeInstancesInput, opts []func(*awsec2.Options)) (*awsec2.DescribeInstancesOutput, error) { return &awsec2.DescribeInstancesOutput{}, errBoom }, + MockDescribeInstanceAttribute: func(ctx context.Context, input *awsec2.DescribeInstanceAttributeInput, opts []func(*awsec2.Options)) (*awsec2.DescribeInstanceAttributeOutput, error) { + return &awsec2.DescribeInstanceAttributeOutput{}, nil + }, }, cr: instance(withSpec(manualv1alpha1.InstanceParameters{ InstanceType: string(types.InstanceTypeM1Small), From 6482b8953fa95105622531cc8a627db648844fed Mon Sep 17 00:00:00 2001 From: Max Melentyev Date: Mon, 29 Apr 2024 13:36:04 -0400 Subject: [PATCH 2/2] Use golang.org/x/sync/errgroup Signed-off-by: Max Melentyev --- go.mod | 2 +- pkg/controller/ec2/instance/controller.go | 58 ++++++++++------------- 2 files changed, 27 insertions(+), 33 deletions(-) diff --git a/go.mod b/go.mod index 3af7f1df42..e3f343ad62 100644 --- a/go.mod +++ b/go.mod @@ -42,6 +42,7 @@ require ( github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.16.0 go.uber.org/zap v1.26.0 + golang.org/x/sync v0.7.0 gopkg.in/alecthomas/kingpin.v2 v2.2.6 k8s.io/api v0.28.3 k8s.io/apiextensions-apiserver v0.28.3 @@ -125,7 +126,6 @@ require ( golang.org/x/mod v0.13.0 // indirect golang.org/x/net v0.17.0 // indirect golang.org/x/oauth2 v0.11.0 // indirect - golang.org/x/sync v0.7.0 // indirect golang.org/x/sys v0.15.0 // indirect golang.org/x/term v0.15.0 // indirect golang.org/x/text v0.14.0 // indirect diff --git a/pkg/controller/ec2/instance/controller.go b/pkg/controller/ec2/instance/controller.go index bb68755a87..70acc880b3 100644 --- a/pkg/controller/ec2/instance/controller.go +++ b/pkg/controller/ec2/instance/controller.go @@ -18,8 +18,6 @@ package instance import ( "context" - "sync" - "sync/atomic" "github.com/aws/aws-sdk-go-v2/aws" awsec2 "github.com/aws/aws-sdk-go-v2/service/ec2" @@ -33,6 +31,7 @@ import ( "github.com/crossplane/crossplane-runtime/pkg/resource" "github.com/google/go-cmp/cmp" "github.com/pkg/errors" + "golang.org/x/sync/errgroup" "k8s.io/utils/ptr" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -181,20 +180,18 @@ func (e *external) describeInstance(ctx context.Context, instanceId string) ( awsec2.DescribeInstanceAttributeOutput, error, ) { - wg := sync.WaitGroup{} + eg := errgroup.Group{} var describeOutput *awsec2.DescribeInstancesOutput var describeError error - wg.Add(1) - go func() { + eg.Go(func() error { describeOutput, describeError = e.client.DescribeInstances(ctx, &awsec2.DescribeInstancesInput{ InstanceIds: []string{instanceId}, }) - wg.Done() - }() + return nil + }) attrs := awsec2.DescribeInstanceAttributeOutput{} - attrsErr := atomic.Pointer[error]{} descAttr := func(attr types.InstanceAttributeName) (*awsec2.DescribeInstanceAttributeOutput, error) { return e.client.DescribeInstanceAttribute(ctx, &awsec2.DescribeInstanceAttributeInput{ InstanceId: &instanceId, @@ -202,37 +199,34 @@ func (e *external) describeInstance(ctx context.Context, instanceId string) ( }) } - wg.Add(1) - go func() { - if r, err := descAttr(types.InstanceAttributeNameDisableApiTermination); err != nil { - attrsErr.Store(&err) + eg.Go(func() error { + if res, err := descAttr(types.InstanceAttributeNameDisableApiTermination); err != nil { + return errorutils.Wrap(err, "fetching DisableApiTermination") } else { - attrs.DisableApiTermination = r.DisableApiTermination + attrs.DisableApiTermination = res.DisableApiTermination + return nil } - wg.Done() - }() + }) - wg.Add(1) - go func() { - if r, err := descAttr(types.InstanceAttributeNameInstanceInitiatedShutdownBehavior); err != nil { - attrsErr.Store(&err) + eg.Go(func() error { + if res, err := descAttr(types.InstanceAttributeNameInstanceInitiatedShutdownBehavior); err != nil { + return errorutils.Wrap(err, "fetching InstanceInitiatedShutdownBehavior") } else { - attrs.InstanceInitiatedShutdownBehavior = r.InstanceInitiatedShutdownBehavior + attrs.InstanceInitiatedShutdownBehavior = res.InstanceInitiatedShutdownBehavior + return nil } - wg.Done() - }() + }) - wg.Add(1) - go func() { - if r, err := descAttr(types.InstanceAttributeNameUserData); err != nil { - attrsErr.Store(&err) + eg.Go(func() error { + if res, err := descAttr(types.InstanceAttributeNameUserData); err != nil { + return errorutils.Wrap(err, "fetching UserData") } else { - attrs.UserData = r.UserData + attrs.UserData = res.UserData + return nil } - wg.Done() - }() + }) - wg.Wait() + attrsErr := eg.Wait() if describeError != nil { return nil, attrs, @@ -250,8 +244,8 @@ func (e *external) describeInstance(ctx context.Context, instanceId string) ( return nil, attrs, errors.New(errMultipleItems) } - if err := attrsErr.Load(); err != nil { - return nil, attrs, errorutils.Wrap(*err, errDescribe) + if attrsErr != nil { + return nil, attrs, errorutils.Wrap(attrsErr, errDescribe) } return &describeOutput.Reservations[0].Instances[0], attrs, nil }