-
Notifications
You must be signed in to change notification settings - Fork 77
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
kafkabp: Support AWS IMDS v2 for rack id #657
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -5,6 +5,7 @@ import ( | |||||
"encoding" | ||||||
"fmt" | ||||||
"io" | ||||||
"log/slog" | ||||||
"net/http" | ||||||
"strings" | ||||||
"sync" | ||||||
|
@@ -88,7 +89,7 @@ func FixedRackID(id string) RackIDFunc { | |||||
// Default values for SimpleHTTPRackIDConfig. | ||||||
const ( | ||||||
SimpleHTTPRackIDDefaultLimit = 1024 | ||||||
SimpleHTTPRackIDDefaultTimeout = time.Second | ||||||
SimpleHTTPRackIDDefaultTimeout = 1 * time.Second | ||||||
) | ||||||
|
||||||
// SimpleHTTPRackIDConfig defines the config to be used in SimpleHTTPRackID. | ||||||
|
@@ -126,55 +127,104 @@ func SimpleHTTPRackID(cfg SimpleHTTPRackIDConfig) RackIDFunc { | |||||
} | ||||||
|
||||||
return func() string { | ||||||
client := http.Client{ | ||||||
Timeout: cfg.Timeout, | ||||||
} | ||||||
resp, err := client.Get(cfg.URL) | ||||||
ctx, cancel := context.WithTimeout(context.Background(), cfg.Timeout) | ||||||
defer cancel() | ||||||
|
||||||
req, err := http.NewRequestWithContext(ctx, cfg.URL, http.MethodGet, http.NoBody) | ||||||
if err != nil { | ||||||
cfg.Logger.Log(context.Background(), fmt.Sprintf( | ||||||
cfg.Logger.Log(ctx, fmt.Sprintf( | ||||||
"Failed to get rack id from %s: %v", | ||||||
cfg.URL, | ||||||
err, | ||||||
)) | ||||||
return "" | ||||||
} | ||||||
|
||||||
defer func() { | ||||||
io.Copy(io.Discard, resp.Body) | ||||||
resp.Body.Close() | ||||||
}() | ||||||
content, err := io.ReadAll(io.LimitReader(resp.Body, cfg.Limit)) | ||||||
content, err := doHTTP(req, cfg.Limit) | ||||||
if err != nil { | ||||||
cfg.Logger.Log(context.Background(), fmt.Sprintf( | ||||||
"Failed to read rack id response from %s: %v", | ||||||
"Failed to get rack id from %s: %v", | ||||||
cfg.URL, | ||||||
err, | ||||||
)) | ||||||
return "" | ||||||
} | ||||||
if resp.StatusCode >= 400 { | ||||||
cfg.Logger.Log(context.Background(), fmt.Sprintf( | ||||||
"Rack id URL %s returned status code %d: %s", | ||||||
cfg.URL, | ||||||
resp.StatusCode, | ||||||
content, | ||||||
)) | ||||||
return "" | ||||||
} | ||||||
return strings.TrimSpace(string(content)) | ||||||
return content | ||||||
} | ||||||
} | ||||||
|
||||||
// Global cache for AWSAvailabilityZoneRackID. | ||||||
var ( | ||||||
awsCachedRackID string | ||||||
awsRackIDOnce sync.Once | ||||||
) | ||||||
var client http.Client | ||||||
|
||||||
// doHTTP executes http request, reads the body up to the limit given, and | ||||||
// return the body read as string with whitespace trimmed. | ||||||
func doHTTP(r *http.Request, readLimit int64) (string, error) { | ||||||
resp, err := client.Do(r) | ||||||
if err != nil { | ||||||
return "", fmt.Errorf("kafkabp.doHTTP: request failed: %w", err) | ||||||
} | ||||||
|
||||||
defer func() { | ||||||
io.Copy(io.Discard, resp.Body) | ||||||
resp.Body.Close() | ||||||
}() | ||||||
|
||||||
content, err := io.ReadAll(io.LimitReader(resp.Body, readLimit)) | ||||||
if err != nil { | ||||||
return "", fmt.Errorf("kafkabp.doHTTP: failed to read response body: %w", err) | ||||||
} | ||||||
|
||||||
body := strings.TrimSpace(string(content)) | ||||||
if resp.StatusCode >= 400 { | ||||||
return "", fmt.Errorf("kafkabp.doHTTP: got http response with code %d and body %q", resp.StatusCode, body) | ||||||
} | ||||||
|
||||||
return body, nil | ||||||
} | ||||||
|
||||||
// References: | ||||||
// https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/instancedata-data-retrieval.html | ||||||
// https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/instancedata-data-categories.html | ||||||
const awsAZurl = "http://169.254.169.254/latest/meta-data/placement/availability-zone" | ||||||
var awsRackID = sync.OnceValues(func() (string, error) { | ||||||
const ( | ||||||
// References: | ||||||
// https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/instancedata-data-retrieval.html | ||||||
// https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/instancedata-data-categories.html | ||||||
tokenURL = "http://169.254.169.254/latest/api/token" | ||||||
azURL = "http://169.254.169.254/latest/meta-data/placement/availability-zone" | ||||||
|
||||||
timeout = time.Second | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🔕 I think units should always come with a scalar too for readability, even if it's
Suggested change
|
||||||
readLimit = 1024 | ||||||
) | ||||||
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), timeout) | ||||||
defer cancel() | ||||||
|
||||||
token, err := func(ctx context.Context) (string, error) { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same comments as the |
||||||
req, err := http.NewRequestWithContext(ctx, http.MethodPut, tokenURL, http.NoBody) | ||||||
if err != nil { | ||||||
return "", fmt.Errorf("kafkabp.awsRackID: failed to create request from url %q: %w", tokenURL, err) | ||||||
} | ||||||
req.Header.Set("X-aws-ec2-metadata-token-ttl-seconds", "21600") | ||||||
|
||||||
token, err := doHTTP(req, readLimit) | ||||||
if err != nil { | ||||||
return "", fmt.Errorf("kafkabp.awsRackID: failed to get AWS IMDS v2 token from url %q: %w", tokenURL, err) | ||||||
} | ||||||
return token, nil | ||||||
}(ctx) | ||||||
if err != nil { | ||||||
return "", err | ||||||
} | ||||||
|
||||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, azURL, http.NoBody) | ||||||
if err != nil { | ||||||
return "", fmt.Errorf("kafkabp.awsRackID: failed to create request from url %q: %w", azURL, err) | ||||||
} | ||||||
req.Header.Set("X-aws-ec2-metadata-token", token) | ||||||
|
||||||
id, err := doHTTP(req, readLimit) | ||||||
if err != nil { | ||||||
err = fmt.Errorf("kafkabp.awsRackID: failed to get AWS availability zone from url %q: %w", azURL, err) | ||||||
} | ||||||
return id, err | ||||||
}) | ||||||
|
||||||
// AWSAvailabilityZoneRackID is a RackIDFunc implementation that returns AWS | ||||||
// availability zone as the rack id. | ||||||
|
@@ -194,18 +244,18 @@ const awsAZurl = "http://169.254.169.254/latest/meta-data/placement/availability | |||||
// // other configs | ||||||
// }) | ||||||
// | ||||||
// It uses SimpleHTTPRackIDConfig underneath with log.DefaultWrapper with a | ||||||
// prometheus counter of kafkabp_aws_rack_id_failure_total and default | ||||||
// Limit & Timeout. | ||||||
// It uses AWS instance metadata HTTP API with 1second overall timeout and 1024 | ||||||
// HTTP response read limits.. | ||||||
// | ||||||
// If there was an error retrieving rack id through AWS instance metadata API, | ||||||
// the same error will be logged at slog's warning level every time | ||||||
// AWSAvailabilityZoneRackID is called. | ||||||
func AWSAvailabilityZoneRackID() string { | ||||||
awsRackIDOnce.Do(func() { | ||||||
awsCachedRackID = SimpleHTTPRackID(SimpleHTTPRackIDConfig{ | ||||||
URL: awsAZurl, | ||||||
Logger: log.CounterWrapper( | ||||||
nil, // delegate, let it fallback to DefaultWrapper | ||||||
awsRackFailure, | ||||||
), | ||||||
})() | ||||||
}) | ||||||
return awsCachedRackID | ||||||
id, err := awsRackID() | ||||||
if err != nil { | ||||||
awsRackFailure.Inc() | ||||||
slog.Warn("Failed to get AWS availability zone as rack id", "err", err) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🔕 is v0 ready for slog? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we want to gradually shift v0 to use slog directly, which will use ctxlog if services have ctxlog set up in their main (as v0 cannot use ctxlog directly) |
||||||
return "" | ||||||
} | ||||||
return id | ||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🔕 is there a reference for the IMDSv2 / token API to add?