Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable agent to send custom headers to kibana/ES #26275

Merged
merged 33 commits into from
Jun 18, 2021
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
becc30b
add custom headers
michalpristas Jun 10, 2021
8d91386
Merge branch 'master' of github.com:elastic/beats into feat/custom-he…
michalpristas Jun 14, 2021
4515d5e
headers
michalpristas Jun 14, 2021
de598b3
changelog
michalpristas Jun 14, 2021
9ebe416
rename
michalpristas Jun 14, 2021
e92ef42
add kibana headers for setup
michalpristas Jun 14, 2021
d9c0e99
missing headers def
michalpristas Jun 14, 2021
b159ebd
missing headers def
michalpristas Jun 14, 2021
8695fed
headers
michalpristas Jun 15, 2021
449476d
rename
michalpristas Jun 15, 2021
029203b
Update enroll.go
michalpristas Jun 15, 2021
c711673
[Oracle] Fixing default values for paths in config template (#26276)
P1llus Jun 14, 2021
8859a2b
Change xml processor names in script processor to match convention (#…
andrewkroh Jun 14, 2021
5f9e7a5
[filebeat] Add preserve_original_event option to o365audit input (#26…
marc-gr Jun 14, 2021
ef933b4
Report total and free CPU for vSphere virtual machines (#26167)
jsoriano Jun 14, 2021
2b053a3
Update go version to 1.16.5 (#26186)
michel-laterman Jun 14, 2021
553be20
Add support note (#26283)
Jun 14, 2021
54981fe
Add log.flags and object metadata to aws-s3 input events (#26267)
andrewkroh Jun 14, 2021
2d44d87
Handle data returned with io.EOF in LineReader (#26260)
andrewkroh Jun 14, 2021
da028ac
Disable packaging-arm in elastic-agent (#26240)
kaiyan-sheng Jun 14, 2021
6f6302e
First refactor of the system module - system/cpu and system/core (#25…
fearful-symmetry Jun 14, 2021
01f6530
Don't include full ES index template in errors (#25743)
andrewkroh Jun 15, 2021
ee2777d
Change link to snapshots in README (#26317)
jsoriano Jun 15, 2021
2d8b720
[Packetbeat] Add `url.extension` to Packetbeat HTTP events (#25999)
legoguy1000 Jun 15, 2021
e5e74e3
Osquerybeat: Align with the rest of the beats, set the ECS version (#…
aleksmaus Jun 15, 2021
5c5e7d8
Updated filter expression for filtering 86 artifacts (#26313)
michalpristas Jun 15, 2021
77deb96
Forward port 7.13.2 changelog to master (#26323)
andresrc Jun 15, 2021
e608bb6
Update go-structform to 0.0.9 (#26251)
Jun 15, 2021
939964b
[Filebeat] [MongoDB] Support MongoDB 4.4 json logs (#24774)
tetianakravchenko Jun 16, 2021
12a2fd0
update envoyproxy ECS version (#26277)
P1llus Jun 16, 2021
255e83c
update threatintel ECS version (#26274)
P1llus Jun 16, 2021
b9ee11a
rename
michalpristas Jun 17, 2021
58f3526
conflicts
michalpristas Jun 17, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions x-pack/elastic-agent/CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -111,3 +111,4 @@
- Keep http and logging config during enroll {pull}25132[25132]
- Log output of container to $LOGS_PATH/elastic-agent-start.log when LOGS_PATH set {pull}25150[25150]
- Use `filestream` input for internal log collection. {pull}25660[25660]
- Enable agent to send custom headers to kibana/ES {pull}26275[26275]
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const maxRetriesloadAgentInfo = 5

type persistentAgentInfo struct {
ID string `json:"id" yaml:"id" config:"id"`
CustomHeaders map[string]string `json:"custom_headers" yaml:"custom_headers" config:"custom_headers"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why prefix with custom? Why not just "headers"?

That would match the elasticsearch configuration - https://www.elastic.co/guide/en/beats/filebeat/current/elasticsearch-output.html#_headers

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

following naming from cloud

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are asking for the feature but we should align the naming with what is in beats to easy the transition to our users from beats to Elastic Agent. Unless we feel that headers is not a good name and now is the time to change it. I feel that headers is a better name over custom_headers.

LogLevel string `json:"logging.level,omitempty" yaml:"logging.level,omitempty" config:"logging.level,omitempty"`
MonitoringHTTP *monitoringConfig.MonitoringHTTPConfig `json:"monitoring.http,omitempty" yaml:"monitoring.http,omitempty" config:"monitoring.http,omitempty"`
}
Expand Down
15 changes: 11 additions & 4 deletions x-pack/elastic-agent/pkg/agent/application/info/agent_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ import (

// AgentInfo is a collection of information about agent.
type AgentInfo struct {
agentID string
logLevel string
agentID string
logLevel string
customHeaders map[string]string
}

// NewAgentInfoWithLog creates a new agent information.
Expand All @@ -28,8 +29,9 @@ func NewAgentInfoWithLog(level string, createAgentID bool) (*AgentInfo, error) {
}

return &AgentInfo{
agentID: agentInfo.ID,
logLevel: agentInfo.LogLevel,
agentID: agentInfo.ID,
logLevel: agentInfo.LogLevel,
customHeaders: agentInfo.CustomHeaders,
}, nil
}

Expand Down Expand Up @@ -84,3 +86,8 @@ func (*AgentInfo) Version() string {
func (*AgentInfo) Snapshot() bool {
return release.Snapshot()
}

// CustomHeaders returns custom headers used to communicate with elasticsearch.
func (i *AgentInfo) CustomHeaders() map[string]string {
return i.customHeaders
}
142 changes: 31 additions & 111 deletions x-pack/elastic-agent/pkg/agent/cmd/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func runContainerCmd(streams *cli.IOStreams, cmd *cobra.Command, cfg setupConfig
}

if cfg.Kibana.Fleet.Setup {
client, err = kibanaClient(cfg.Kibana)
client, err = kibanaClient(cfg.Kibana, cfg.FleetServer.CustomKibanaHeaders)
if err != nil {
return err
}
Expand All @@ -286,7 +286,7 @@ func runContainerCmd(streams *cli.IOStreams, cmd *cobra.Command, cfg setupConfig
token := cfg.Fleet.EnrollmentToken
if token == "" && !cfg.FleetServer.Enable {
if client == nil {
client, err = kibanaClient(cfg.Kibana)
client, err = kibanaClient(cfg.Kibana, cfg.FleetServer.CustomKibanaHeaders)
if err != nil {
return err
}
Expand Down Expand Up @@ -363,6 +363,11 @@ func buildEnrollArgs(cfg setupConfig, token string, policyID string) ([]string,
if cfg.FleetServer.CertKey != "" {
args = append(args, "--fleet-server-cert-key", cfg.FleetServer.CertKey)
}

for k, v := range cfg.FleetServer.CustomHeaders {
args = append(args, "--header", k+"="+v)
}

if cfg.Fleet.URL != "" {
args = append(args, "--url", cfg.Fleet.URL)
}
Expand Down Expand Up @@ -444,19 +449,21 @@ func kibanaFetchToken(cfg setupConfig, client *kibana.Client, policy *kibanaPoli
return keyDetail.Item.APIKey, nil
}

func kibanaClient(cfg kibanaConfig) (*kibana.Client, error) {
func kibanaClient(cfg kibanaConfig, headers map[string]string) (*kibana.Client, error) {
var tls *tlscommon.Config
if cfg.Fleet.CA != "" {
tls = &tlscommon.Config{
CAs: []string{cfg.Fleet.CA},
}
}

return kibana.NewClientWithConfig(&kibana.ClientConfig{
Host: cfg.Fleet.Host,
Username: cfg.Fleet.Username,
Password: cfg.Fleet.Password,
IgnoreVersion: true,
TLS: tls,
Headers: headers,
})
}

Expand Down Expand Up @@ -518,6 +525,27 @@ func envBool(keys ...string) bool {
return false
}

func envMap(key string) map[string]string {
m := make(map[string]string)
prefix := key + "="
for _, env := range os.Environ() {
if !strings.HasPrefix(env, prefix) {
continue
}

envVal := strings.TrimPrefix(env, prefix)

keyValue := strings.SplitN(envVal, "=", 2)
if len(keyValue) != 2 {
continue
}

m[keyValue[0]] = keyValue[1]
}

return m
}

func isTrue(val string) bool {
trueVals := []string{"1", "true", "yes", "y"}
val = strings.ToLower(val)
Expand Down Expand Up @@ -815,114 +843,6 @@ type kibanaAPIKeyDetail struct {
Item kibanaAPIKey `json:"item"`
}

// setup configuration

type setupConfig struct {
Fleet fleetConfig `config:"fleet"`
FleetServer fleetServerConfig `config:"fleet_server"`
Kibana kibanaConfig `config:"kibana"`
}

type elasticsearchConfig struct {
CA string `config:"ca"`
Host string `config:"host"`
Username string `config:"username"`
Password string `config:"password"`
ServiceToken string `config:"service_token"`
}

type fleetConfig struct {
CA string `config:"ca"`
Enroll bool `config:"enroll"`
EnrollmentToken string `config:"enrollment_token"`
Force bool `config:"force"`
Insecure bool `config:"insecure"`
TokenName string `config:"token_name"`
TokenPolicyName string `config:"token_policy_name"`
URL string `config:"url"`
}

type fleetServerConfig struct {
Cert string `config:"cert"`
CertKey string `config:"cert_key"`
Elasticsearch elasticsearchConfig `config:"elasticsearch"`
Enable bool `config:"enable"`
Host string `config:"host"`
InsecureHTTP bool `config:"insecure_http"`
PolicyID string `config:"policy_id"`
Port string `config:"port"`
}

type kibanaConfig struct {
Fleet kibanaFleetConfig `config:"fleet"`
RetrySleepDuration time.Duration `config:"retry_sleep_duration"`
RetryMaxCount int `config:"retry_max_count"`
}

type kibanaFleetConfig struct {
CA string `config:"ca"`
Host string `config:"host"`
Password string `config:"password"`
Setup bool `config:"setup"`
Username string `config:"username"`
}

func defaultAccessConfig() (setupConfig, error) {
retrySleepDuration, err := envDurationWithDefault(defaultRequestRetrySleep, requestRetrySleepEnv)
if err != nil {
return setupConfig{}, err
}

retryMaxCount, err := envIntWithDefault(defaultMaxRequestRetries, maxRequestRetriesEnv)
if err != nil {
return setupConfig{}, err
}

cfg := setupConfig{
Fleet: fleetConfig{
CA: envWithDefault("", "FLEET_CA", "KIBANA_CA", "ELASTICSEARCH_CA"),
Enroll: envBool("FLEET_ENROLL", "FLEET_SERVER_ENABLE"),
EnrollmentToken: envWithDefault("", "FLEET_ENROLLMENT_TOKEN"),
Force: envBool("FLEET_FORCE"),
Insecure: envBool("FLEET_INSECURE"),
TokenName: envWithDefault("Default", "FLEET_TOKEN_NAME"),
TokenPolicyName: envWithDefault("", "FLEET_TOKEN_POLICY_NAME"),
URL: envWithDefault("", "FLEET_URL"),
},
FleetServer: fleetServerConfig{
Cert: envWithDefault("", "FLEET_SERVER_CERT"),
CertKey: envWithDefault("", "FLEET_SERVER_CERT_KEY"),
Elasticsearch: elasticsearchConfig{
Host: envWithDefault("http://elasticsearch:9200", "FLEET_SERVER_ELASTICSEARCH_HOST", "ELASTICSEARCH_HOST"),
Username: envWithDefault("elastic", "FLEET_SERVER_ELASTICSEARCH_USERNAME", "ELASTICSEARCH_USERNAME"),
Password: envWithDefault("changeme", "FLEET_SERVER_ELASTICSEARCH_PASSWORD", "ELASTICSEARCH_PASSWORD"),
ServiceToken: envWithDefault("", "FLEET_SERVER_SERVICE_TOKEN"),
CA: envWithDefault("", "FLEET_SERVER_ELASTICSEARCH_CA", "ELASTICSEARCH_CA"),
},
Enable: envBool("FLEET_SERVER_ENABLE"),
Host: envWithDefault("", "FLEET_SERVER_HOST"),
InsecureHTTP: envBool("FLEET_SERVER_INSECURE_HTTP"),
PolicyID: envWithDefault("", "FLEET_SERVER_POLICY_ID", "FLEET_SERVER_POLICY"),
Port: envWithDefault("", "FLEET_SERVER_PORT"),
},
Kibana: kibanaConfig{
Fleet: kibanaFleetConfig{
// Remove FLEET_SETUP in 8.x
// The FLEET_SETUP environment variable boolean is a fallback to the old name. The name was updated to
// reflect that its setting up Fleet in Kibana versus setting up Fleet Server.
Setup: envBool("KIBANA_FLEET_SETUP", "FLEET_SETUP"),
Host: envWithDefault("http://kibana:5601", "KIBANA_FLEET_HOST", "KIBANA_HOST"),
Username: envWithDefault("elastic", "KIBANA_FLEET_USERNAME", "KIBANA_USERNAME", "ELASTICSEARCH_USERNAME"),
Password: envWithDefault("changeme", "KIBANA_FLEET_PASSWORD", "KIBANA_PASSWORD", "ELASTICSEARCH_PASSWORD"),
CA: envWithDefault("", "KIBANA_FLEET_CA", "KIBANA_CA", "ELASTICSEARCH_CA"),
},
RetrySleepDuration: retrySleepDuration,
RetryMaxCount: retryMaxCount,
},
}
return cfg, nil
}

func envDurationWithDefault(defVal string, keys ...string) (time.Duration, error) {
valStr := defVal
for _, key := range keys {
Expand Down
24 changes: 24 additions & 0 deletions x-pack/elastic-agent/pkg/agent/cmd/enroll.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"os"
"os/signal"
"strconv"
"strings"
"syscall"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/paths"
Expand Down Expand Up @@ -59,6 +60,7 @@ func addEnrollFlags(cmd *cobra.Command) {
cmd.Flags().Uint16P("fleet-server-port", "", 0, "Fleet Server HTTP binding port (overrides the policy)")
cmd.Flags().StringP("fleet-server-cert", "", "", "Certificate to use for exposed Fleet Server HTTPS endpoint")
cmd.Flags().StringP("fleet-server-cert-key", "", "", "Private key to use for exposed Fleet Server HTTPS endpoint")
cmd.Flags().StringSliceP("header", "", []string{}, "App auth token used for elasticsearch")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Documentation for this flag is incorrect, seems like a copy/paste error.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for noticing

cmd.Flags().BoolP("fleet-server-insecure-http", "", false, "Expose Fleet Server over HTTP (not recommended; insecure)")
cmd.Flags().StringP("certificate-authorities", "a", "", "Comma separated list of root certificate for server verifications")
cmd.Flags().StringP("ca-sha256", "p", "", "Comma separated list of certificate authorities hash pins used for certificate verifications")
Expand All @@ -81,6 +83,7 @@ func buildEnrollmentFlags(cmd *cobra.Command, url string, token string) []string
fPort, _ := cmd.Flags().GetUint16("fleet-server-port")
fCert, _ := cmd.Flags().GetString("fleet-server-cert")
fCertKey, _ := cmd.Flags().GetString("fleet-server-cert-key")
fCustomHeaders, _ := cmd.Flags().GetStringSlice("header")
fInsecure, _ := cmd.Flags().GetBool("fleet-server-insecure-http")
ca, _ := cmd.Flags().GetString("certificate-authorities")
sha256, _ := cmd.Flags().GetString("ca-sha256")
Expand Down Expand Up @@ -128,6 +131,12 @@ func buildEnrollmentFlags(cmd *cobra.Command, url string, token string) []string
args = append(args, "--fleet-server-cert-key")
args = append(args, fCertKey)
}

for k, v := range mapFromEnvList(fCustomHeaders) {
args = append(args, "--header")
args = append(args, k+"="+v)
}

if fInsecure {
args = append(args, "--fleet-server-insecure-http")
}
Expand Down Expand Up @@ -211,6 +220,7 @@ func enroll(streams *cli.IOStreams, cmd *cobra.Command, args []string) error {
enrollmentToken, _ := cmd.Flags().GetString("enrollment-token")
fServer, _ := cmd.Flags().GetString("fleet-server-es")
fElasticSearchCA, _ := cmd.Flags().GetString("fleet-server-es-ca")
fCustomHeaders, _ := cmd.Flags().GetStringSlice("header")
fServiceToken, _ := cmd.Flags().GetString("fleet-server-service-token")
fPolicy, _ := cmd.Flags().GetString("fleet-server-policy")
fHost, _ := cmd.Flags().GetString("fleet-server-host")
Expand Down Expand Up @@ -246,6 +256,7 @@ func enroll(streams *cli.IOStreams, cmd *cobra.Command, args []string) error {
CertKey: fCertKey,
Insecure: fInsecure,
SpawnAgent: !fromInstall,
CustomHeaders: mapFromEnvList(fCustomHeaders),
},
}

Expand Down Expand Up @@ -285,3 +296,16 @@ func handleSignal(ctx context.Context) context.Context {

return ctx
}

func mapFromEnvList(envList []string) map[string]string {
m := make(map[string]string)
for _, kv := range envList {
keyValue := strings.SplitN(kv, "=", 2)
if len(keyValue) != 2 {
continue
}

m[keyValue[0]] = keyValue[1]
}
return m
}
31 changes: 26 additions & 5 deletions x-pack/elastic-agent/pkg/agent/cmd/enroll_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ type enrollCmdFleetServerOption struct {
CertKey string
Insecure bool
SpawnAgent bool
CustomHeaders map[string]string
}

// enrollCmdOption define all the supported enrollment option.
Expand Down Expand Up @@ -233,7 +234,8 @@ func (c *enrollCmd) fleetServerBootstrap(ctx context.Context) (string, error) {
c.options.FleetServer.ConnStr, c.options.FleetServer.ServiceToken,
c.options.FleetServer.PolicyID,
c.options.FleetServer.Host, c.options.FleetServer.Port,
c.options.FleetServer.Cert, c.options.FleetServer.CertKey, c.options.FleetServer.ElasticsearchCA)
c.options.FleetServer.Cert, c.options.FleetServer.CertKey, c.options.FleetServer.ElasticsearchCA,
c.options.FleetServer.CustomHeaders)
if err != nil {
return "", err
}
Expand Down Expand Up @@ -413,7 +415,7 @@ func (c *enrollCmd) enroll(ctx context.Context, persistentConfig map[string]inte
return err
}

agentConfig, err := c.createAgentConfig(resp.Item.ID, persistentConfig)
agentConfig, err := c.createAgentConfig(resp.Item.ID, persistentConfig, c.options.FleetServer.CustomHeaders)
if err != nil {
return err
}
Expand All @@ -423,7 +425,8 @@ func (c *enrollCmd) enroll(ctx context.Context, persistentConfig map[string]inte
c.options.FleetServer.ConnStr, c.options.FleetServer.ServiceToken,
c.options.FleetServer.PolicyID,
c.options.FleetServer.Host, c.options.FleetServer.Port,
c.options.FleetServer.Cert, c.options.FleetServer.CertKey, c.options.FleetServer.ElasticsearchCA)
c.options.FleetServer.Cert, c.options.FleetServer.CertKey, c.options.FleetServer.ElasticsearchCA,
c.options.FleetServer.CustomHeaders)
if err != nil {
return err
}
Expand Down Expand Up @@ -718,7 +721,12 @@ func storeAgentInfo(s saver, reader io.Reader) error {
return nil
}

func createFleetServerBootstrapConfig(connStr string, serviceToken string, policyID string, host string, port uint16, cert string, key string, esCA string) (*configuration.FleetAgentConfig, error) {
func createFleetServerBootstrapConfig(
connStr, serviceToken, policyID, host string,
port uint16,
cert, key, esCA string,
headers map[string]string,
) (*configuration.FleetAgentConfig, error) {
es, err := configuration.ElasticsearchFromConnStr(connStr, serviceToken)
if err != nil {
return nil, err
Expand All @@ -734,6 +742,15 @@ func createFleetServerBootstrapConfig(connStr string, serviceToken string, polic
if port == 0 {
port = defaultFleetServerPort
}
if len(headers) > 0 {
if es.Headers == nil {
es.Headers = make(map[string]string)
}
// overwrites previously set headers
for k, v := range headers {
es.Headers[k] = v
}
}
cfg := configuration.DefaultFleetAgentConfig()
cfg.Enabled = true
cfg.Server = &configuration.FleetServerConfig{
Expand Down Expand Up @@ -774,11 +791,15 @@ func createFleetConfigFromEnroll(accessAPIKey string, cli remote.Config) (*confi
return cfg, nil
}

func (c *enrollCmd) createAgentConfig(agentID string, pc map[string]interface{}) (map[string]interface{}, error) {
func (c *enrollCmd) createAgentConfig(agentID string, pc map[string]interface{}, headers map[string]string) (map[string]interface{}, error) {
agentConfig := map[string]interface{}{
"id": agentID,
}

if len(headers) > 0 {
agentConfig["custom_headers"] = headers
}

if c.options.Staging != "" {
staging := fmt.Sprintf("https://staging.elastic.co/%s-%s/downloads/", release.Version(), c.options.Staging[:8])
agentConfig["download"] = map[string]interface{}{
Expand Down
Loading