Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ship logs to http endpoint #1228

Merged
merged 41 commits into from
Jun 28, 2023
Merged
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
c756dab
rough http buffer for to use for logger
James-Pickett Jun 13, 2023
b6f17ef
splits send buffer into own package, adds httpsenderlog
James-Pickett Jun 16, 2023
15d8ea9
added send mutex unlock after purge failure
James-Pickett Jun 16, 2023
8e1cf84
Merge branch 'main' into james/log-shipping
James-Pickett Jun 16, 2023
14989f9
addes log shipper, subscribes to control server auth token updates
James-Pickett Jun 20, 2023
43d07df
tweaks
James-Pickett Jun 20, 2023
0aa445e
few more tweaks
James-Pickett Jun 20, 2023
99d9661
Merge branch 'main' into james/log-shipping
James-Pickett Jun 20, 2023
73f7a66
add new flag for trace specific ingest url
James-Pickett Jun 21, 2023
6e81014
updates logshipper to use scheme based on obserability tls flag
James-Pickett Jun 21, 2023
da1ea15
update mocks
James-Pickett Jun 21, 2023
3fb0ef9
Merge branch 'main' into james/log-shipping
James-Pickett Jun 21, 2023
bec6850
drop kvstore from send buffer
James-Pickett Jun 22, 2023
ca7dff8
updates var names in send buffer to be more clear
James-Pickett Jun 22, 2023
09d5312
more var renaming
James-Pickett Jun 22, 2023
1e09c8d
fix tests broken by update to trace url name
James-Pickett Jun 22, 2023
11360e1
test clean up
James-Pickett Jun 22, 2023
00f55cf
now using blocking run style for send buffer
James-Pickett Jun 22, 2023
6cc4fd5
adds test for log shipper, updates based on feedback
James-Pickett Jun 22, 2023
4f4ecb9
update misspelled file name
James-Pickett Jun 22, 2023
28513d4
adds some logic to now stop launcher start up if logshipping init errors
James-Pickett Jun 22, 2023
3e28bd2
Merge branch 'main' into james/log-shipping
James-Pickett Jun 22, 2023
8f9df5f
remove context from log shipper constructor
James-Pickett Jun 23, 2023
355a4c8
no creating ctx and cancel func on run call
James-Pickett Jun 23, 2023
4d95623
fix test
James-Pickett Jun 23, 2023
edc9ed4
fixing tests
James-Pickett Jun 23, 2023
ab8ce04
Merge branch 'main' into james/log-shipping
James-Pickett Jun 26, 2023
850cb1b
only start logshipping if we have observerabiltiy ingest url
James-Pickett Jun 26, 2023
6e16b99
adds flag for log shipping enabled
James-Pickett Jun 26, 2023
847a940
update mocks for log shipper test
James-Pickett Jun 26, 2023
b1a3f19
better func names
James-Pickett Jun 26, 2023
c16d9fa
update flags mock
James-Pickett Jun 27, 2023
4ed2bd7
add mutext to delete all data, add test
James-Pickett Jun 27, 2023
5152ea7
fix log shipper url parsing
James-Pickett Jun 27, 2023
d69267c
fix broken test
James-Pickett Jun 27, 2023
6365497
cache is enabled booleaon on logshipper update
James-Pickett Jun 27, 2023
c434308
use explicit log_ingest_url flag instead of observability_ingest_url
James-Pickett Jun 27, 2023
ad75750
drop log shipping enabled flag and just rely on presence of log inges…
James-Pickett Jun 27, 2023
40dabe3
feedback
James-Pickett Jun 27, 2023
890a9c0
consolidated endpoint and token updating, added to tests
James-Pickett Jun 28, 2023
11dc04a
fix tests
James-Pickett Jun 28, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions cmd/launcher/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ import (
"github.com/kolide/launcher/pkg/debug"
"github.com/kolide/launcher/pkg/launcher"
"github.com/kolide/launcher/pkg/log/checkpoint"
"github.com/kolide/launcher/pkg/log/logshipper"
"github.com/kolide/launcher/pkg/log/teelogger"
"github.com/kolide/launcher/pkg/osquery"
osqueryInstanceHistory "github.com/kolide/launcher/pkg/osquery/runtime/history"
"github.com/kolide/launcher/pkg/service"
Expand Down Expand Up @@ -144,6 +146,19 @@ func runLauncher(ctx context.Context, cancel func(), opts *launcher.Options) err
flagController := flags.NewFlagController(logger, stores[storage.AgentFlagsStore], fcOpts...)
k := knapsack.New(stores, flagController, db)

// Need to set up the log shipper so that we can get the logger early
// and pass it to the various systems.
isShippingLogs := k.ObservabilityIngestServerURL() != ""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably okay to start with it. Interesting question is whether it's possible to enable/disable/change the URL from the control server

var logShipper *logshipper.LogShipper
if isShippingLogs {
logShipper, err = logshipper.New(k)
if err != nil {
logger.Log("msg", "failed to create log shipper", "err", err)
} else {
logger = teelogger.New(logger, logShipper)
}
}

// construct the appropriate http client based on security settings
httpClient := http.DefaultClient
if k.InsecureTLS() {
Expand Down Expand Up @@ -305,6 +320,13 @@ func runLauncher(ctx context.Context, cancel func(), opts *launcher.Options) err
runGroup.Add(exp.Execute, exp.Interrupt)
controlService.RegisterSubscriber(authTokensSubsystemName, exp)
}

// begin log shipping and subsribe to token updates
// nil check incase it failed to create for some reason
if isShippingLogs && logShipper != nil {
runGroup.Add(logShipper.Run, logShipper.Stop)
controlService.RegisterSubscriber(authTokensSubsystemName, logShipper)
}
}

runEECode := k.ControlServerURL() != "" || k.IAmBreakingEELicense()
Expand Down
4 changes: 3 additions & 1 deletion cmd/launcher/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ func parseOptions(subcommandName string, args []string) (*launcher.Options, erro
flConfigFilePath = flagset.String("config", defaultConfigFilePath, "config file to parse options from (optional)")
flExportTraces = flagset.Bool("export_traces", false, "Whether to export traces")
flTraceSamplingRate = flagset.Float64("trace_sampling_rate", 0.0, "What fraction of traces should be sampled")
flIngestServerURL = flagset.String("ingest_url", "", "Where to export traces and logs")
flIngestServerURL = flagset.String("ingest_url", "", "Where to export logs and other observability data")
flTraceIngestServerURL = flagset.String("trace_ingest_url", "", "Where to export traces")
flDisableIngestTLS = flagset.Bool("disable_ingest_tls", false, "Disable TLS for observability ingest server communication")

// osquery TLS endpoints
Expand Down Expand Up @@ -249,6 +250,7 @@ func parseOptions(subcommandName string, args []string) (*launcher.Options, erro
EnrollSecretPath: *flEnrollSecretPath,
ExportTraces: *flExportTraces,
ObservabilityIngestServerURL: *flIngestServerURL,
TraceIngestServerURL: *flTraceIngestServerURL,
DisableObservabilityIngestTLS: *flDisableIngestTLS,
AutoloadedExtensions: flAutoloadedExtensions,
IAmBreakingEELicense: *flIAmBreakingEELicense,
Expand Down
9 changes: 9 additions & 0 deletions pkg/agent/flags/flag_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,15 @@ func (fc *FlagController) ObservabilityIngestServerURL() string {
).get(fc.getControlServerValue(keys.ObservabilityIngestServerURL))
}

func (fc *FlagController) SetTraceIngestServerURL(url string) error {
return fc.setControlServerValue(keys.TraceIngestServerURL, []byte(url))
}
func (fc *FlagController) TraceIngestServerURL() string {
return NewStringFlagValue(
WithDefaultString(fc.cmdLineOpts.TraceIngestServerURL),
).get(fc.getControlServerValue(keys.TraceIngestServerURL))
}

func (fc *FlagController) SetDisableObservabilityIngestTLS(enabled bool) error {
return fc.setControlServerValue(keys.DisableObservabilityIngestTLS, boolToBytes(enabled))
}
Expand Down
1 change: 1 addition & 0 deletions pkg/agent/flags/keys/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ const (
ExportTraces FlagKey = "export_traces"
TraceSamplingRate FlagKey = "trace_sampling_rate"
ObservabilityIngestServerURL FlagKey = "ingest_url"
TraceIngestServerURL FlagKey = "trace_ingest_url"
DisableObservabilityIngestTLS FlagKey = "disable_ingest_tls"
)

Expand Down
7 changes: 7 additions & 0 deletions pkg/agent/knapsack/knapsack.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,13 @@ func (k *knapsack) ObservabilityIngestServerURL() string {
return k.flags.ObservabilityIngestServerURL()
}

func (k *knapsack) SetTraceIngestServerURL(url string) error {
return k.flags.SetTraceIngestServerURL(url)
}
func (k *knapsack) TraceIngestServerURL() string {
return k.flags.ObservabilityIngestServerURL()
}

func (k *knapsack) SetDisableObservabilityIngestTLS(enabled bool) error {
James-Pickett marked this conversation as resolved.
Show resolved Hide resolved
return k.flags.SetDisableObservabilityIngestTLS(enabled)
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/agent/types/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,10 +177,14 @@ type Flags interface {
SetTraceSamplingRate(rate float64) error
TraceSamplingRate() float64

// ObservabilityIngestServerURL is the URL of the ingest server for logs and traces
// ObservabilityIngestServerURL is the URL of the ingest server for logs and other observability data
SetObservabilityIngestServerURL(url string) error
ObservabilityIngestServerURL() string

// TraceIngestServerURL is the URL of the ingest server for traces
SetTraceIngestServerURL(url string) error
TraceIngestServerURL() string

// DisableObservabilityIngestTLS disables TLS for observability ingest server communication
SetDisableObservabilityIngestTLS(enabled bool) error
DisableObservabilityIngestTLS() bool
Expand Down
39 changes: 33 additions & 6 deletions pkg/agent/types/mocks/flags.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

39 changes: 33 additions & 6 deletions pkg/agent/types/mocks/knapsack.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion pkg/launcher/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,10 @@ type Options struct {
ExportTraces bool
// TraceSamplingRate is a number between 0.0 and 1.0 that indicates what fraction of traces should be sampled.
TraceSamplingRate float64
// ObservabilityIngestServerURL is the URL that traces and logs will be exported to
// ObservabilityIngestServerURL is the URL that logs and other observability data will be exported to
ObservabilityIngestServerURL string
// TraceIngestServerURL is the URL that traces will be exported to
TraceIngestServerURL string
// DisableObservabilityIngestTLS allows for disabling TLS when connecting to the observability ingest server
DisableObservabilityIngestTLS bool

Expand Down
49 changes: 49 additions & 0 deletions pkg/log/logshipper/authedhttpsender.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package logshipper

import (
"fmt"
"io"
"net/http"
"time"
)

type authedHttpSender struct {
endpoint string
authtoken string
client *http.Client
}

func newAuthHttpSender(endpoint, authtoken string) *authedHttpSender {
return &authedHttpSender{
endpoint: endpoint,
authtoken: authtoken,
client: &http.Client{
Timeout: 30 * time.Second,
},
}
}

func (a *authedHttpSender) Send(r io.Reader) error {
req, err := http.NewRequest("POST", a.endpoint, r)
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/octet-stream")
req.Header.Set("authorization", a.authtoken)

resp, err := a.client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()

if resp.StatusCode >= 300 {
bodyData, err := io.ReadAll(resp.Request.Body)
if err != nil {
return fmt.Errorf("received non 200 http status code: %d, error reading body response body %w", resp.StatusCode, err)
}

return fmt.Errorf("received non 200 http status code: %d, response body: %s", resp.StatusCode, bodyData)
}
return nil
}
47 changes: 47 additions & 0 deletions pkg/log/logshipper/authedhttpsender_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package logshipper

import (
"bytes"
"net/http"
"net/http/httptest"
"sync"
"testing"

"github.com/kolide/kit/ulid"
"github.com/stretchr/testify/require"
)

func Test_authedHttpSender_Send(t *testing.T) {
t.Parallel()
tests := []struct {
name string
}{
{
name: "happy path",
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()

dataToSend := []byte(ulid.New())
token := ulid.New()

wg := sync.WaitGroup{}
wg.Add(1)
// create http test server with handle func that returns 200 OK
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
require.Equal(t, token, r.Header.Get("authorization"))
w.WriteHeader(http.StatusOK)
wg.Done()
}))
defer ts.Close()

authedSender := newAuthHttpSender(ts.URL, token)
authedSender.Send(bytes.NewBuffer(dataToSend))

wg.Wait()
})
}
}
Loading