Skip to content

Commit

Permalink
kafkabp: Support AWS IMDS v2 for rack id
Browse files Browse the repository at this point in the history
Also modernize the code a bit (use sync.OnceValues, etc.).
  • Loading branch information
fishy committed Jun 11, 2024
1 parent b7dafe2 commit 63f8545
Showing 1 changed file with 94 additions and 44 deletions.
138 changes: 94 additions & 44 deletions kafkabp/rack.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding"
"fmt"
"io"
"log/slog"
"net/http"
"strings"
"sync"
Expand Down Expand Up @@ -88,7 +89,7 @@ func FixedRackID(id string) RackIDFunc {
// Default values for SimpleHTTPRackIDConfig.
const (
SimpleHTTPRackIDDefaultLimit = 1024
SimpleHTTPRackIDDefaultTimeout = time.Second
SimpleHTTPRackIDDefaultTimeout = 1 * time.Second
)

// SimpleHTTPRackIDConfig defines the config to be used in SimpleHTTPRackID.
Expand Down Expand Up @@ -126,55 +127,104 @@ func SimpleHTTPRackID(cfg SimpleHTTPRackIDConfig) RackIDFunc {
}

return func() string {
client := http.Client{
Timeout: cfg.Timeout,
}
resp, err := client.Get(cfg.URL)
ctx, cancel := context.WithTimeout(context.Background(), cfg.Timeout)
defer cancel()

req, err := http.NewRequestWithContext(ctx, cfg.URL, http.MethodGet, http.NoBody)
if err != nil {
cfg.Logger.Log(context.Background(), fmt.Sprintf(
cfg.Logger.Log(ctx, fmt.Sprintf(
"Failed to get rack id from %s: %v",
cfg.URL,
err,
))
return ""
}

defer func() {
io.Copy(io.Discard, resp.Body)
resp.Body.Close()
}()
content, err := io.ReadAll(io.LimitReader(resp.Body, cfg.Limit))
content, err := doHTTP(req, cfg.Limit)
if err != nil {
cfg.Logger.Log(context.Background(), fmt.Sprintf(
"Failed to read rack id response from %s: %v",
"Failed to get rack id from %s: %v",
cfg.URL,
err,
))
return ""
}
if resp.StatusCode >= 400 {
cfg.Logger.Log(context.Background(), fmt.Sprintf(
"Rack id URL %s returned status code %d: %s",
cfg.URL,
resp.StatusCode,
content,
))
return ""
}
return strings.TrimSpace(string(content))
return content
}
}

// Global cache for AWSAvailabilityZoneRackID.
var (
awsCachedRackID string
awsRackIDOnce sync.Once
)
var client http.Client

// doHTTP executes http request, reads the body up to the limit given, and
// return the body read as string with whitespace trimmed.
func doHTTP(r *http.Request, readLimit int64) (string, error) {
resp, err := client.Do(r)
if err != nil {
return "", fmt.Errorf("kafkabp.doHTTP: request failed: %w", err)
}

defer func() {
io.Copy(io.Discard, resp.Body)
resp.Body.Close()
}()

content, err := io.ReadAll(io.LimitReader(resp.Body, readLimit))
if err != nil {
return "", fmt.Errorf("kafkabp.doHTTP: failed to read response body: %w", err)
}

body := strings.TrimSpace(string(content))
if resp.StatusCode >= 400 {
return "", fmt.Errorf("kafkabp.doHTTP: got http response with code %d and body %q", resp.StatusCode, body)
}

return body, nil
}

// References:
// https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/instancedata-data-retrieval.html
// https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/instancedata-data-categories.html
const awsAZurl = "http://169.254.169.254/latest/meta-data/placement/availability-zone"
var awsRackID = sync.OnceValues(func() (string, error) {
const (
// References:
// https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/instancedata-data-retrieval.html
// https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/instancedata-data-categories.html
tokenURL = "http://169.254.169.254/latest/api/token"
azURL = "http://169.254.169.254/latest/meta-data/placement/availability-zone"

timeout = time.Second
readLimit = 1024
)

ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

token, err := func(ctx context.Context) (string, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodPut, tokenURL, http.NoBody)
if err != nil {
return "", fmt.Errorf("kafkabp.awsRackID: failed to create request from url %q: %w", tokenURL, err)
}
req.Header.Set("X-aws-ec2-metadata-token-ttl-seconds", "21600")

token, err := doHTTP(req, readLimit)
if err != nil {
return "", fmt.Errorf("kafkabp.awsRackID: failed to get AWS IMDS v2 token from url %q: %w", tokenURL, err)
}
return token, nil
}(ctx)
if err != nil {
return "", err
}

req, err := http.NewRequestWithContext(ctx, http.MethodGet, azURL, http.NoBody)
if err != nil {
return "", fmt.Errorf("kafkabp.awsRackID: failed to create request from url %q: %w", azURL, err)
}
req.Header.Set("X-aws-ec2-metadata-token", token)

id, err := doHTTP(req, readLimit)
if err != nil {
err = fmt.Errorf("kafkabp.awsRackID: failed to get AWS availability zone from url %q: %w", azURL, err)
}
return id, err
})

// AWSAvailabilityZoneRackID is a RackIDFunc implementation that returns AWS
// availability zone as the rack id.
Expand All @@ -194,18 +244,18 @@ const awsAZurl = "http://169.254.169.254/latest/meta-data/placement/availability
// // other configs
// })
//
// It uses SimpleHTTPRackIDConfig underneath with log.DefaultWrapper with a
// prometheus counter of kafkabp_aws_rack_id_failure_total and default
// Limit & Timeout.
// It uses AWS instance metadata HTTP API with 1second overall timeout and 1024
// HTTP response read limits..
//
// If there was an error retrieving rack id through AWS instance metadata API,
// the same error will be logged at slog's warning level every time
// AWSAvailabilityZoneRackID is called.
func AWSAvailabilityZoneRackID() string {
awsRackIDOnce.Do(func() {
awsCachedRackID = SimpleHTTPRackID(SimpleHTTPRackIDConfig{
URL: awsAZurl,
Logger: log.CounterWrapper(
nil, // delegate, let it fallback to DefaultWrapper
awsRackFailure,
),
})()
})
return awsCachedRackID
id, err := awsRackID()
if err != nil {
awsRackFailure.Inc()
slog.Warn("Failed to get AWS availability zone as rack id", "err", err)
return ""
}
return id
}

0 comments on commit 63f8545

Please sign in to comment.