Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kafkabp: Support AWS IMDS v2 for rack id #657

Merged
merged 1 commit into from
Jun 11, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 94 additions & 44 deletions kafkabp/rack.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding"
"fmt"
"io"
"log/slog"
"net/http"
"strings"
"sync"
Expand Down Expand Up @@ -88,7 +89,7 @@ func FixedRackID(id string) RackIDFunc {
// Default values for SimpleHTTPRackIDConfig.
const (
SimpleHTTPRackIDDefaultLimit = 1024
SimpleHTTPRackIDDefaultTimeout = time.Second
SimpleHTTPRackIDDefaultTimeout = 1 * time.Second
)

// SimpleHTTPRackIDConfig defines the config to be used in SimpleHTTPRackID.
Expand Down Expand Up @@ -126,55 +127,104 @@ func SimpleHTTPRackID(cfg SimpleHTTPRackIDConfig) RackIDFunc {
}

return func() string {
client := http.Client{
Timeout: cfg.Timeout,
}
resp, err := client.Get(cfg.URL)
ctx, cancel := context.WithTimeout(context.Background(), cfg.Timeout)
defer cancel()

// http.NewRequestWithContext takes (ctx, method, url, body): the method must
// come before the URL, otherwise the URL is rejected as an invalid HTTP method.
req, err := http.NewRequestWithContext(ctx, http.MethodGet, cfg.URL, http.NoBody)
if err != nil {
cfg.Logger.Log(context.Background(), fmt.Sprintf(
cfg.Logger.Log(ctx, fmt.Sprintf(
"Failed to get rack id from %s: %v",
cfg.URL,
err,
))
return ""
}

defer func() {
io.Copy(io.Discard, resp.Body)
resp.Body.Close()
}()
content, err := io.ReadAll(io.LimitReader(resp.Body, cfg.Limit))
content, err := doHTTP(req, cfg.Limit)
if err != nil {
cfg.Logger.Log(context.Background(), fmt.Sprintf(
"Failed to read rack id response from %s: %v",
"Failed to get rack id from %s: %v",
cfg.URL,
err,
))
return ""
}
if resp.StatusCode >= 400 {
cfg.Logger.Log(context.Background(), fmt.Sprintf(
"Rack id URL %s returned status code %d: %s",
cfg.URL,
resp.StatusCode,
content,
))
return ""
}
return strings.TrimSpace(string(content))
return content
}
}

// Global cache for AWSAvailabilityZoneRackID.
var (
awsCachedRackID string
awsRackIDOnce sync.Once
)
// client is the package-level HTTP client used by doHTTP. It is a zero-value
// http.Client, so it configures no timeout of its own; any deadline has to
// come from the context attached to the request.
var client http.Client

// doHTTP sends r through the package-level client and returns the response
// body as a string with surrounding whitespace trimmed.
//
// At most readLimit bytes of the body are read into the result; whatever is
// left is drained and the body is closed before returning.
//
// An error is returned when the request itself fails, when reading the body
// fails, or when the response status code is 400 or above (in which case the
// trimmed body is included in the error message).
func doHTTP(r *http.Request, readLimit int64) (string, error) {
	resp, err := client.Do(r)
	if err != nil {
		return "", fmt.Errorf("kafkabp.doHTTP: request failed: %w", err)
	}
	// Deferred calls run last-in-first-out: the body is drained first and
	// closed afterwards.
	defer resp.Body.Close()
	defer io.Copy(io.Discard, resp.Body)

	limited := io.LimitReader(resp.Body, readLimit)
	raw, err := io.ReadAll(limited)
	if err != nil {
		return "", fmt.Errorf("kafkabp.doHTTP: failed to read response body: %w", err)
	}

	trimmed := strings.TrimSpace(string(raw))
	if resp.StatusCode < 400 {
		return trimmed, nil
	}
	return "", fmt.Errorf("kafkabp.doHTTP: got http response with code %d and body %q", resp.StatusCode, trimmed)
}

// References:
// https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/instancedata-data-retrieval.html
// https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/instancedata-data-categories.html
const awsAZurl = "http://169.254.169.254/latest/meta-data/placement/availability-zone"
var awsRackID = sync.OnceValues(func() (string, error) {
const (
// References:
// https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/instancedata-data-retrieval.html
// https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/instancedata-data-categories.html
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔕 is there a reference for the IMDSv2 / token API to add?

tokenURL = "http://169.254.169.254/latest/api/token"
azURL = "http://169.254.169.254/latest/meta-data/placement/availability-zone"

timeout = time.Second
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔕 I think units should always come with a scalar too for readability, even if it's 1

Suggested change
timeout = time.Second
timeout = 1*time.Second

readLimit = 1024
)

ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

token, err := func(ctx context.Context) (string, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comments as the rack API: not sure I am seeing the reason for the anonymous function, and that header should be normalized

req, err := http.NewRequestWithContext(ctx, http.MethodPut, tokenURL, http.NoBody)
if err != nil {
return "", fmt.Errorf("kafkabp.awsRackID: failed to create request from url %q: %w", tokenURL, err)
}
req.Header.Set("X-aws-ec2-metadata-token-ttl-seconds", "21600")

token, err := doHTTP(req, readLimit)
if err != nil {
return "", fmt.Errorf("kafkabp.awsRackID: failed to get AWS IMDS v2 token from url %q: %w", tokenURL, err)
}
return token, nil
}(ctx)
if err != nil {
return "", err
}

req, err := http.NewRequestWithContext(ctx, http.MethodGet, azURL, http.NoBody)
if err != nil {
return "", fmt.Errorf("kafkabp.awsRackID: failed to create request from url %q: %w", azURL, err)
}
req.Header.Set("X-aws-ec2-metadata-token", token)

id, err := doHTTP(req, readLimit)
if err != nil {
err = fmt.Errorf("kafkabp.awsRackID: failed to get AWS availability zone from url %q: %w", azURL, err)
}
return id, err
})

// AWSAvailabilityZoneRackID is a RackIDFunc implementation that returns AWS
// availability zone as the rack id.
Expand All @@ -194,18 +244,18 @@ const awsAZurl = "http://169.254.169.254/latest/meta-data/placement/availability
// // other configs
// })
//
// It uses SimpleHTTPRackIDConfig underneath with log.DefaultWrapper with a
// prometheus counter of kafkabp_aws_rack_id_failure_total and default
// Limit & Timeout.
// It uses the AWS instance metadata HTTP API with a 1-second overall timeout and
// a 1024-byte HTTP response read limit.
//
// If there was an error retrieving rack id through AWS instance metadata API,
// the same error will be logged at slog's warning level every time
// AWSAvailabilityZoneRackID is called.
func AWSAvailabilityZoneRackID() string {
awsRackIDOnce.Do(func() {
awsCachedRackID = SimpleHTTPRackID(SimpleHTTPRackIDConfig{
URL: awsAZurl,
Logger: log.CounterWrapper(
nil, // delegate, let it fallback to DefaultWrapper
awsRackFailure,
),
})()
})
return awsCachedRackID
id, err := awsRackID()
if err != nil {
awsRackFailure.Inc()
slog.Warn("Failed to get AWS availability zone as rack id", "err", err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔕 is v0 ready for slog?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we want to gradually shift v0 to use slog directly, which will use ctxlog if services have ctxlog set up in their main (as v0 cannot use ctxlog directly)

return ""
}
return id
}
Loading