Skip to content

Commit

Permalink
Merge branch 'master' into mx-psi/add-queue-factory
Browse files Browse the repository at this point in the history
  • Loading branch information
mx-psi committed Jan 7, 2021
2 parents c955e1d + a6e29ad commit ab42619
Show file tree
Hide file tree
Showing 5 changed files with 333 additions and 40 deletions.
74 changes: 74 additions & 0 deletions cmd/query/app/mocks/Watcher.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

34 changes: 21 additions & 13 deletions cmd/query/app/static_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/query/app/ui"
"github.com/jaegertracing/jaeger/pkg/fswatcher"
"github.com/jaegertracing/jaeger/pkg/version"
)

Expand Down Expand Up @@ -60,16 +61,18 @@ func RegisterStaticHandler(r *mux.Router, logger *zap.Logger, qOpts *QueryOption

// StaticAssetsHandler handles static assets
type StaticAssetsHandler struct {
options StaticAssetsHandlerOptions
indexHTML atomic.Value // stores []byte
assetsFS http.FileSystem
options StaticAssetsHandlerOptions
indexHTML atomic.Value // stores []byte
assetsFS http.FileSystem
newWatcher func() (fswatcher.Watcher, error)
}

// StaticAssetsHandlerOptions defines options for NewStaticAssetsHandler
type StaticAssetsHandlerOptions struct {
BasePath string
UIConfigPath string
Logger *zap.Logger
NewWatcher func() (fswatcher.Watcher, error)
}

// NewStaticAssetsHandler returns a StaticAssetsHandler
Expand All @@ -83,14 +86,19 @@ func NewStaticAssetsHandler(staticAssetsRoot string, options StaticAssetsHandler
options.Logger = zap.NewNop()
}

if options.NewWatcher == nil {
options.NewWatcher = fswatcher.NewWatcher
}

indexHTML, err := loadAndEnrichIndexHTML(assetsFS.Open, options)
if err != nil {
return nil, err
}

h := &StaticAssetsHandler{
options: options,
assetsFS: assetsFS,
options: options,
assetsFS: assetsFS,
newWatcher: options.NewWatcher,
}

h.indexHTML.Store(indexHTML)
Expand Down Expand Up @@ -134,10 +142,10 @@ func loadAndEnrichIndexHTML(open func(string) (http.File, error), options Static
return indexBytes, nil
}

func (sH *StaticAssetsHandler) configListener(watcher *fsnotify.Watcher) {
func (sH *StaticAssetsHandler) configListener(watcher fswatcher.Watcher) {
for {
select {
case event := <-watcher.Events:
case event := <-watcher.Events():
// ignore if the event filename is not the UI configuration
if filepath.Base(event.Name) != filepath.Base(sH.options.UIConfigPath) {
continue
Expand All @@ -157,7 +165,8 @@ func (sH *StaticAssetsHandler) configListener(watcher *fsnotify.Watcher) {
sH.options.Logger.Error("error while reloading the UI config", zap.Error(err))
}
sH.indexHTML.Store(content)
case err, ok := <-watcher.Errors:
sH.options.Logger.Info("reloaded UI config", zap.String("filename", sH.options.UIConfigPath))
case err, ok := <-watcher.Errors():
if !ok {
return
}
Expand All @@ -168,10 +177,11 @@ func (sH *StaticAssetsHandler) configListener(watcher *fsnotify.Watcher) {

func (sH *StaticAssetsHandler) watch() {
if sH.options.UIConfigPath == "" {
sH.options.Logger.Info("UI config path not provided, config file will not be watched")
return
}

watcher, err := fsnotify.NewWatcher()
watcher, err := sH.newWatcher()
if err != nil {
sH.options.Logger.Error("failed to create a new watcher for the UI config", zap.Error(err))
return
Expand All @@ -181,16 +191,14 @@ func (sH *StaticAssetsHandler) watch() {
sH.configListener(watcher)
}()

err = watcher.Add(sH.options.UIConfigPath)
if err != nil {
if err := watcher.Add(sH.options.UIConfigPath); err != nil {
sH.options.Logger.Error("error adding watcher to file", zap.String("file", sH.options.UIConfigPath), zap.Error(err))
} else {
sH.options.Logger.Info("watching", zap.String("file", sH.options.UIConfigPath))
}

dir := filepath.Dir(sH.options.UIConfigPath)
err = watcher.Add(dir)
if err != nil {
if err := watcher.Add(dir); err != nil {
sH.options.Logger.Error("error adding watcher to dir", zap.String("dir", dir), zap.Error(err))
} else {
sH.options.Logger.Info("watching", zap.String("dir", dir))
Expand Down
174 changes: 147 additions & 27 deletions cmd/query/app/static_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,22 @@ import (
"testing"
"time"

"github.com/fsnotify/fsnotify"
"github.com/gorilla/mux"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zaptest/observer"

"github.com/jaegertracing/jaeger/cmd/query/app/mocks"
"github.com/jaegertracing/jaeger/pkg/fswatcher"
"github.com/jaegertracing/jaeger/pkg/testutils"
)

//go:generate mockery -all -dir ../../../pkg/fswatcher

func TestNotExistingUiConfig(t *testing.T) {
handler, err := NewStaticAssetsHandler("/foo/bar", StaticAssetsHandlerOptions{})
require.Error(t, err)
Expand Down Expand Up @@ -115,50 +124,137 @@ func TestNewStaticAssetsHandlerErrors(t *testing.T) {
}
}

// This test is potentially intermittent
func TestWatcherError(t *testing.T) {
const totalWatcherAddCalls = 2

for _, tc := range []struct {
name string
errorOnNthAdd int
newWatcherErr error
watcherAddErr error
wantWatcherAddCalls int
}{
{
name: "NewWatcher error",
newWatcherErr: fmt.Errorf("new watcher error"),
},
{
name: "Watcher.Add first call error",
errorOnNthAdd: 0,
watcherAddErr: fmt.Errorf("add first error"),
wantWatcherAddCalls: 2,
},
{
name: "Watcher.Add second call error",
errorOnNthAdd: 1,
watcherAddErr: fmt.Errorf("add second error"),
wantWatcherAddCalls: 2,
},
} {
t.Run(tc.name, func(t *testing.T) {
// Prepare
zcore, logObserver := observer.New(zapcore.InfoLevel)
logger := zap.New(zcore)
defer func() {
if r := recover(); r != nil {
// Select loop exits without logging error, only containing previous error log.
assert.Equal(t, logObserver.FilterMessage("event").Len(), 1)
assert.Equal(t, "send on closed channel", fmt.Sprint(r))
}
}()

watcher := &mocks.Watcher{}
for i := 0; i < totalWatcherAddCalls; i++ {
var err error
if i == tc.errorOnNthAdd {
err = tc.watcherAddErr
}
watcher.On("Add", mock.Anything).Return(err).Once()
}
watcher.On("Events").Return(make(chan fsnotify.Event))
errChan := make(chan error)
watcher.On("Errors").Return(errChan)

// Test
_, err := NewStaticAssetsHandler("fixture", StaticAssetsHandlerOptions{
UIConfigPath: "fixture/ui-config-hotreload.json",
NewWatcher: func() (fswatcher.Watcher, error) {
return watcher, tc.newWatcherErr
},
Logger: logger,
})

// Validate

// Error logged but not returned
assert.NoError(t, err)
if tc.newWatcherErr != nil {
assert.Equal(t, logObserver.FilterField(zap.Error(tc.newWatcherErr)).Len(), 1)
} else {
assert.Zero(t, logObserver.FilterField(zap.Error(tc.newWatcherErr)).Len())
}

if tc.watcherAddErr != nil {
assert.Equal(t, logObserver.FilterField(zap.Error(tc.watcherAddErr)).Len(), 1)
} else {
assert.Zero(t, logObserver.FilterField(zap.Error(tc.watcherAddErr)).Len())
}

watcher.AssertNumberOfCalls(t, "Add", tc.wantWatcherAddCalls)

// Validate Events and Errors channels
if tc.newWatcherErr == nil {
errChan <- fmt.Errorf("first error")

waitUntil(t, func() bool {
return logObserver.FilterMessage("event").Len() > 0
}, 100, 10*time.Millisecond, "timed out waiting for error")
assert.Equal(t, logObserver.FilterMessage("event").Len(), 1)

close(errChan)
errChan <- fmt.Errorf("second error on closed chan")
}
})
}
}

func TestHotReloadUIConfigTempFile(t *testing.T) {
tmpfile, err := ioutil.TempFile("", "ui-config-hotreload.*.json")
assert.NoError(t, err)
dir, err := ioutil.TempDir("", "ui-config-hotreload-*")
require.NoError(t, err)
defer os.RemoveAll(dir)

tmpfile, err := ioutil.TempFile(dir, "*.json")
require.NoError(t, err)
tmpFileName := tmpfile.Name()
defer os.Remove(tmpFileName)

content, err := ioutil.ReadFile("fixture/ui-config-hotreload.json")
assert.NoError(t, err)
require.NoError(t, err)

err = ioutil.WriteFile(tmpFileName, content, 0644)
assert.NoError(t, err)
err = syncWrite(tmpFileName, content, 0644)
require.NoError(t, err)

zcore, logObserver := observer.New(zapcore.InfoLevel)
logger := zap.New(zcore)
h, err := NewStaticAssetsHandler("fixture", StaticAssetsHandlerOptions{
UIConfigPath: tmpFileName,
Logger: logger,
})
assert.NoError(t, err)
require.NoError(t, err)

c := string(h.indexHTML.Load().([]byte))
assert.Contains(t, c, "About Jaeger")

newContent := strings.Replace(string(content), "About Jaeger", "About a new Jaeger", 1)
err = ioutil.WriteFile(tmpFileName, []byte(newContent), 0644)
assert.NoError(t, err)

done := make(chan bool)
go func() {
for {
i := string(h.indexHTML.Load().([]byte))
err = syncWrite(tmpFileName, []byte(newContent), 0644)
require.NoError(t, err)

if strings.Contains(i, "About a new Jaeger") {
done <- true
}
time.Sleep(10 * time.Millisecond)
}
}()
waitUntil(t, func() bool {
return logObserver.FilterMessage("reloaded UI config").
FilterField(zap.String("filename", tmpFileName)).Len() > 0
}, 100, 10*time.Millisecond, "timed out waiting for the hot reload to kick in")

select {
case <-done:
assert.Contains(t, string(h.indexHTML.Load().([]byte)), "About a new Jaeger")
case <-time.After(time.Second):
assert.Fail(t, "timed out waiting for the hot reload to kick in")
}
i := string(h.indexHTML.Load().([]byte))
assert.Contains(t, i, "About a new Jaeger", logObserver.All())
}

func TestLoadUIConfig(t *testing.T) {
Expand Down Expand Up @@ -225,3 +321,27 @@ func TestLoadIndexHTMLReadError(t *testing.T) {
_, err := loadIndexHTML(open)
require.Error(t, err)
}

func waitUntil(t *testing.T, f func() bool, iterations int, sleepInterval time.Duration, timeoutErrMsg string) {
for i := 0; i < iterations; i++ {
if f() {
return
}
time.Sleep(sleepInterval)
}
require.Fail(t, timeoutErrMsg)
}

// syncWrite ensures data is written to the given filename and flushed to disk.
// This ensures that any watchers looking for file system changes can be reliably alerted.
func syncWrite(filename string, data []byte, perm os.FileMode) error {
f, err := os.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_TRUNC|os.O_SYNC, perm)
if err != nil {
return err
}
defer f.Close()
if _, err = f.Write(data); err != nil {
return err
}
return f.Sync()
}
Loading

0 comments on commit ab42619

Please sign in to comment.