Skip to content

Commit

Permalink
[Ingest Manager] Make agent retry values for bootstraping configurable (
Browse files Browse the repository at this point in the history
elastic#25163)

[Ingest Manager] Make agent retry values for bootstraping configurable (elastic#25163)
  • Loading branch information
michalpristas authored Apr 20, 2021
1 parent a951d19 commit b9e4022
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 26 deletions.
2 changes: 1 addition & 1 deletion x-pack/elastic-agent/CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
- Fix install command for Fleet Server bootstrap, remove need for --enrollment-token when using --fleet-server {pull}24981[24981]
- Respect host configuration for exposed processes endpoint {pull}25114[25114]
- Set --inscure in container when FLEET_SERVER_ENABLE and FLEET_INSECURE set {pull}25137[25137]

- Fixed: limit for retries to Kibana configurable {issue}25063[25063]
==== New features

- Prepare packaging for endpoint and asc files {pull}20186[20186]
Expand Down
98 changes: 73 additions & 25 deletions x-pack/elastic-agent/pkg/agent/cmd/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"os/exec"
"path/filepath"
"regexp"
"strconv"
"strings"
"sync"
"syscall"
Expand All @@ -40,9 +41,11 @@ import (
)

const (
requestRetrySleep = 1 * time.Second // sleep 1 sec between retries for HTTP requests
maxRequestRetries = 30 // maximum number of retries for HTTP requests
defaultStateDirectory = "/usr/share/elastic-agent/state" // directory that will hold the state data
requestRetrySleepEnv = "KIBANA_REQUEST_RETRY_SLEEP"
maxRequestRetriesEnv = "KIBANA_REQUEST_RETRY_COUNT"
defaultRequestRetrySleep = "1s" // sleep 1 sec between retries for HTTP requests
defaultMaxRequestRetries = "30" // maximum number of retries for HTTP requests
defaultStateDirectory = "/usr/share/elastic-agent/state" // directory that will hold the state data
)

var (
Expand Down Expand Up @@ -70,6 +73,8 @@ The following actions are possible and grouped based on the actions.
KIBANA_FLEET_USERNAME - kibana username to enable Fleet [$KIBANA_USERNAME]
KIBANA_FLEET_PASSWORD - kibana password to enable Fleet [$KIBANA_PASSWORD]
KIBANA_FLEET_CA - path to certificate authority to use with communicate with Kibana [$KIBANA_CA]
KIBANA_REQUEST_RETRY_SLEEP - specifies sleep duration taken when agent performs a request to kibana [default 1s]
KIBANA_REQUEST_RETRY_COUNT - specifies number of retries agent performs when executing a request to kibana [default 30]
* Bootstrapping Fleet Server
This bootstraps the Fleet Server to be run by this Elastic Agent. At least one Fleet Server is required in a Fleet
Expand Down Expand Up @@ -167,7 +172,11 @@ func containerCmd(streams *cli.IOStreams, cmd *cobra.Command) error {
// if not in cloud mode, always run the agent
runAgent := !elasticCloud
// create access configuration from ENV and config files
cfg := defaultAccessConfig()
cfg, err := defaultAccessConfig()
if err != nil {
return err
}

for _, f := range []string{"fleet-setup.yml", "credentials.yml"} {
c, err := config.LoadFile(filepath.Join(paths.Config(), f))
if err != nil && !os.IsNotExist(err) {
Expand Down Expand Up @@ -230,7 +239,6 @@ func containerCmd(streams *cli.IOStreams, cmd *cobra.Command) error {
}
}

var err error
if runAgent {
// run the main elastic-agent container command
err = runContainerCmd(streams, cmd, cfg)
Expand Down Expand Up @@ -260,7 +268,7 @@ func runContainerCmd(streams *cli.IOStreams, cmd *cobra.Command, cfg setupConfig
return err
}
logInfo(streams, "Performing setup of Fleet in Kibana\n")
err = kibanaSetup(client, streams)
err = kibanaSetup(cfg, client, streams)
if err != nil {
return err
}
Expand All @@ -275,11 +283,11 @@ func runContainerCmd(streams *cli.IOStreams, cmd *cobra.Command, cfg setupConfig
return err
}
}
policy, err = kibanaFetchPolicy(client, cfg, streams)
policy, err = kibanaFetchPolicy(cfg, client, streams)
if err != nil {
return err
}
token, err = kibanaFetchToken(client, policy, streams, cfg.Fleet.TokenName)
token, err = kibanaFetchToken(cfg, client, policy, streams, cfg.Fleet.TokenName)
if err != nil {
return err
}
Expand Down Expand Up @@ -391,30 +399,30 @@ func buildFleetServerConnStr(cfg fleetServerConfig) (string, error) {
return fmt.Sprintf("%s://%s:%s@%s%s", u.Scheme, cfg.Elasticsearch.Username, cfg.Elasticsearch.Password, u.Host, path), nil
}

func kibanaSetup(client *kibana.Client, streams *cli.IOStreams) error {
err := performPOST(client, "/api/fleet/setup", streams.Err, "Kibana Fleet setup")
func kibanaSetup(cfg setupConfig, client *kibana.Client, streams *cli.IOStreams) error {
err := performPOST(cfg, client, "/api/fleet/setup", streams.Err, "Kibana Fleet setup")
if err != nil {
return err
}
err = performPOST(client, "/api/fleet/agents/setup", streams.Err, "Kibana Fleet Agents setup")
err = performPOST(cfg, client, "/api/fleet/agents/setup", streams.Err, "Kibana Fleet Agents setup")
if err != nil {
return err
}
return nil
}

func kibanaFetchPolicy(client *kibana.Client, cfg setupConfig, streams *cli.IOStreams) (*kibanaPolicy, error) {
func kibanaFetchPolicy(cfg setupConfig, client *kibana.Client, streams *cli.IOStreams) (*kibanaPolicy, error) {
var policies kibanaPolicies
err := performGET(client, "/api/fleet/agent_policies", &policies, streams.Err, "Kibana fetch policy")
err := performGET(cfg, client, "/api/fleet/agent_policies", &policies, streams.Err, "Kibana fetch policy")
if err != nil {
return nil, err
}
return findPolicy(cfg, policies.Items)
}

func kibanaFetchToken(client *kibana.Client, policy *kibanaPolicy, streams *cli.IOStreams, tokenName string) (string, error) {
func kibanaFetchToken(cfg setupConfig, client *kibana.Client, policy *kibanaPolicy, streams *cli.IOStreams, tokenName string) (string, error) {
var keys kibanaAPIKeys
err := performGET(client, "/api/fleet/enrollment-api-keys", &keys, streams.Err, "Kibana fetch token")
err := performGET(cfg, client, "/api/fleet/enrollment-api-keys", &keys, streams.Err, "Kibana fetch token")
if err != nil {
return "", err
}
Expand All @@ -423,7 +431,7 @@ func kibanaFetchToken(client *kibana.Client, policy *kibanaPolicy, streams *cli.
return "", err
}
var keyDetail kibanaAPIKeyDetail
err = performGET(client, fmt.Sprintf("/api/fleet/enrollment-api-keys/%s", key.ID), &keyDetail, streams.Err, "Kibana fetch token detail")
err = performGET(cfg, client, fmt.Sprintf("/api/fleet/enrollment-api-keys/%s", key.ID), &keyDetail, streams.Err, "Kibana fetch token detail")
if err != nil {
return "", err
}
Expand Down Expand Up @@ -513,15 +521,15 @@ func isTrue(val string) bool {
return false
}

func performGET(client *kibana.Client, path string, response interface{}, writer io.Writer, msg string) error {
func performGET(cfg setupConfig, client *kibana.Client, path string, response interface{}, writer io.Writer, msg string) error {
var lastErr error
for i := 0; i < maxRequestRetries; i++ {
for i := 0; i < cfg.Kibana.RetryMaxCount; i++ {
code, result, err := client.Connection.Request("GET", path, nil, nil, nil)
if err != nil || code != 200 {
err = fmt.Errorf("http GET request to %s%s fails: %v. Response: %s",
client.Connection.URL, path, err, truncateString(result))
fmt.Fprintf(writer, "%s failed: %s\n", msg, err)
<-time.After(requestRetrySleep)
<-time.After(cfg.Kibana.RetrySleepDuration)
continue
}
if response == nil {
Expand All @@ -532,16 +540,16 @@ func performGET(client *kibana.Client, path string, response interface{}, writer
return lastErr
}

func performPOST(client *kibana.Client, path string, writer io.Writer, msg string) error {
func performPOST(cfg setupConfig, client *kibana.Client, path string, writer io.Writer, msg string) error {
var lastErr error
for i := 0; i < maxRequestRetries; i++ {
for i := 0; i < cfg.Kibana.RetryMaxCount; i++ {
code, result, err := client.Connection.Request("POST", path, nil, nil, nil)
if err != nil || code >= 400 {
err = fmt.Errorf("http POST request to %s%s fails: %v. Response: %s",
client.Connection.URL, path, err, truncateString(result))
lastErr = err
fmt.Fprintf(writer, "%s failed: %s\n", msg, err)
<-time.After(requestRetrySleep)
<-time.After(cfg.Kibana.RetrySleepDuration)
continue
}
return nil
Expand Down Expand Up @@ -778,7 +786,9 @@ type fleetServerConfig struct {
}

type kibanaConfig struct {
Fleet kibanaFleetConfig `config:"fleet"`
Fleet kibanaFleetConfig `config:"fleet"`
RetrySleepDuration time.Duration `config:"retry_sleep_duration"`
RetryMaxCount int `config:"retry_max_count"`
}

type kibanaFleetConfig struct {
Expand All @@ -789,7 +799,17 @@ type kibanaFleetConfig struct {
Username string `config:"username"`
}

func defaultAccessConfig() setupConfig {
func defaultAccessConfig() (setupConfig, error) {
retrySleepDuration, err := envDurationWithDefault(defaultRequestRetrySleep, requestRetrySleepEnv)
if err != nil {
return setupConfig{}, err
}

retryMaxCount, err := envIntWithDefault(defaultMaxRequestRetries, maxRequestRetriesEnv)
if err != nil {
return setupConfig{}, err
}

cfg := setupConfig{
Fleet: fleetConfig{
CA: envWithDefault("", "FLEET_CA", "KIBANA_CA", "ELASTICSEARCH_CA"),
Expand Down Expand Up @@ -829,7 +849,35 @@ func defaultAccessConfig() setupConfig {
Password: envWithDefault("changeme", "KIBANA_FLEET_PASSWORD", "KIBANA_PASSWORD", "ELASTICSEARCH_PASSWORD"),
CA: envWithDefault("", "KIBANA_FLEET_CA", "KIBANA_CA", "ELASTICSEARCH_CA"),
},
RetrySleepDuration: retrySleepDuration,
RetryMaxCount: retryMaxCount,
},
}
return cfg
return cfg, nil
}

func envDurationWithDefault(defVal string, keys ...string) (time.Duration, error) {
valStr := defVal
for _, key := range keys {
val, ok := os.LookupEnv(key)
if ok {
valStr = val
break
}
}

return time.ParseDuration(valStr)
}

func envIntWithDefault(defVal string, keys ...string) (int, error) {
valStr := defVal
for _, key := range keys {
val, ok := os.LookupEnv(key)
if ok {
valStr = val
break
}
}

return strconv.Atoi(valStr)
}

0 comments on commit b9e4022

Please sign in to comment.