Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Overhaul logging to avoid package level var #963

Merged
merged 1 commit into from
May 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cmd/leave.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cmd

import (
"github.com/distribworks/dkron/v3/dkron"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)

Expand All @@ -23,7 +24,8 @@ var leaveCmd = &cobra.Command{
},
RunE: func(cmd *cobra.Command, args []string) error {
var gc dkron.DkronGRPCClient
gc = dkron.NewGRPCClient(nil, nil)
log := logrus.NewEntry(logrus.New())
gc = dkron.NewGRPCClient(nil, nil, log)

if err := gc.Leave(ip); err != nil {
return err
Expand Down
7 changes: 5 additions & 2 deletions cmd/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/distribworks/dkron/v3/dkron"
"github.com/ryanuber/columnize"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)

Expand All @@ -29,7 +30,8 @@ var raftListCmd = &cobra.Command{
Short: "Command to list raft peers",
Long: ``,
RunE: func(cmd *cobra.Command, args []string) error {
gc := dkron.NewGRPCClient(nil, nil)
log := logrus.NewEntry(logrus.New())
gc := dkron.NewGRPCClient(nil, nil, log)

reply, err := gc.RaftGetConfiguration(ip)
if err != nil {
Expand Down Expand Up @@ -60,7 +62,8 @@ var raftRemovePeerCmd = &cobra.Command{
Short: "Command to list raft peers",
Long: ``,
RunE: func(cmd *cobra.Command, args []string) error {
gc := dkron.NewGRPCClient(nil, nil)
log := logrus.NewEntry(logrus.New())
gc := dkron.NewGRPCClient(nil, nil, log)

if err := gc.RaftRemovePeerByID(ip, peerID); err != nil {
return err
Expand Down
94 changes: 49 additions & 45 deletions dkron/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ type Agent struct {
activeExecutions sync.Map

listener net.Listener

// logger is the log entry to use for all logging calls
logger *logrus.Entry
}

// ProcessorFactory is a function type that creates a new instance
Expand Down Expand Up @@ -145,7 +148,8 @@ func NewAgent(config *Config, options ...AgentOption) *Agent {
// Start the current agent by running all the necessary
// checks and server or client routines.
func (a *Agent) Start() error {
InitLogger(a.config.LogLevel, a.config.NodeName)
log := InitLogger(a.config.LogLevel, a.config.NodeName)
a.logger = log

// Normalize configured addresses
a.config.normalizeAddrs()
Expand All @@ -164,7 +168,7 @@ func (a *Agent) Start() error {
}

if err := initMetrics(a); err != nil {
log.Fatal("agent: Can not setup metrics")
a.logger.Fatal("agent: Can not setup metrics")
}

// Expose the node name
Expand All @@ -179,7 +183,7 @@ func (a *Agent) Start() error {
addr := a.bindRPCAddr()
l, err := net.Listen("tcp", addr)
if err != nil {
log.Fatal(err)
a.logger.Fatal(err)
}
a.listener = l

Expand All @@ -193,13 +197,13 @@ func (a *Agent) Start() error {
}

grpcServer := grpc.NewServer(opts...)
as := NewAgentServer(a)
as := NewAgentServer(a, a.logger)
proto.RegisterAgentServer(grpcServer, as)
go grpcServer.Serve(l)
}

if a.GRPCClient == nil {
a.GRPCClient = NewGRPCClient(nil, a)
a.GRPCClient = NewGRPCClient(nil, a, a.logger)
}

tags := a.serf.LocalMember().Tags
Expand Down Expand Up @@ -233,7 +237,7 @@ func (a *Agent) JoinLAN(addrs []string) (int, error) {
// was participating in leader election or not (local storage).
// Then actually leave the cluster.
func (a *Agent) Stop() error {
log.Info("agent: Called member stop, now stopping")
a.logger.Info("agent: Called member stop, now stopping")

if a.config.Server {
a.raft.Shutdown()
Expand Down Expand Up @@ -264,8 +268,8 @@ func (a *Agent) setupRaft() error {
}

logger := ioutil.Discard
if log.Logger.Level == logrus.DebugLevel {
logger = log.Logger.Writer()
if a.logger.Logger.Level == logrus.DebugLevel {
logger = a.logger.Logger.Writer()
}

transport := raft.NewNetworkTransport(a.raftLayer, 3, raftTimeout, logger)
Expand Down Expand Up @@ -324,25 +328,25 @@ func (a *Agent) setupRaft() error {
// Check for peers.json file for recovery
peersFile := filepath.Join(a.config.DataDir, "raft", "peers.json")
if _, err := os.Stat(peersFile); err == nil {
log.Info("found peers.json file, recovering Raft configuration...")
a.logger.Info("found peers.json file, recovering Raft configuration...")
var configuration raft.Configuration
configuration, err = raft.ReadConfigJSON(peersFile)
if err != nil {
return fmt.Errorf("recovery failed to parse peers.json: %v", err)
}
store, err := NewStore()
store, err := NewStore(a.logger)
if err != nil {
log.WithError(err).Fatal("dkron: Error initializing store")
a.logger.WithError(err).Fatal("dkron: Error initializing store")
}
tmpFsm := newFSM(store, nil)
tmpFsm := newFSM(store, nil, a.logger)
if err := raft.RecoverCluster(config, tmpFsm,
logStore, stableStore, snapshots, transport, configuration); err != nil {
return fmt.Errorf("recovery failed: %v", err)
}
if err := os.Remove(peersFile); err != nil {
return fmt.Errorf("recovery failed to delete peers.json, please delete manually (see peers.info for details): %v", err)
}
log.Info("deleted peers.json file after successful recovery")
a.logger.Info("deleted peers.json file after successful recovery")
}
}

Expand Down Expand Up @@ -370,7 +374,7 @@ func (a *Agent) setupRaft() error {

// Instantiate the Raft systems. The second parameter is a finite state machine
// which stores the actual kv pairs and is operated upon through Apply().
fsm := newFSM(a.Store, a.ProAppliers)
fsm := newFSM(a.Store, a.ProAppliers, a.logger)
rft, err := raft.NewRaft(config, fsm, logStore, stableStore, snapshots, transport)
if err != nil {
return fmt.Errorf("new raft: %s", err)
Expand Down Expand Up @@ -451,19 +455,19 @@ func (a *Agent) setupSerf() (*serf.Serf, error) {
serfConfig.ReconnectTimeout, err = time.ParseDuration(config.SerfReconnectTimeout)

if err != nil {
log.Fatal(err)
a.logger.Fatal(err)
}

// Create a channel to listen for events from Serf
a.eventCh = make(chan serf.Event, 2048)
serfConfig.EventCh = a.eventCh

// Start Serf
log.Info("agent: Dkron agent starting")
a.logger.Info("agent: Dkron agent starting")

if log.Logger.Level == logrus.DebugLevel {
serfConfig.LogOutput = log.Logger.Writer()
serfConfig.MemberlistConfig.LogOutput = log.Logger.Writer()
if a.logger.Logger.Level == logrus.DebugLevel {
serfConfig.LogOutput = a.logger.Logger.Writer()
serfConfig.MemberlistConfig.LogOutput = a.logger.Logger.Writer()
} else {
serfConfig.LogOutput = ioutil.Discard
serfConfig.MemberlistConfig.LogOutput = ioutil.Discard
Expand All @@ -472,7 +476,7 @@ func (a *Agent) setupSerf() (*serf.Serf, error) {
// Create serf first
serf, err := serf.Create(serfConfig)
if err != nil {
log.Error(err)
a.logger.Error(err)
return nil, err
}
return serf, nil
Expand All @@ -491,17 +495,17 @@ func (a *Agent) SetConfig(c *Config) {
// StartServer launches a new dkron server process
func (a *Agent) StartServer() {
if a.Store == nil {
s, err := NewStore()
s, err := NewStore(a.logger)
if err != nil {
log.WithError(err).Fatal("dkron: Error initializing store")
a.logger.WithError(err).Fatal("dkron: Error initializing store")
}
a.Store = s
}

a.sched = NewScheduler()
a.sched = NewScheduler(a.logger)

if a.HTTPTransport == nil {
a.HTTPTransport = NewTransport(a)
a.HTTPTransport = NewTransport(a, a.logger)
}
a.HTTPTransport.ServeHTTP()

Expand Down Expand Up @@ -529,12 +533,12 @@ func (a *Agent) StartServer() {

go func() {
if err := tlsm.Serve(); err != nil {
log.Fatal(err)
a.logger.Fatal(err)
}
}()
} else {
// Declare a plain RaftLayer
a.raftLayer = NewRaftLayer()
a.raftLayer = NewRaftLayer(a.logger)

// Declare the match for gRPC
grpcl = tcpm.MatchWithWriters(cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc"))
Expand All @@ -544,25 +548,25 @@ func (a *Agent) StartServer() {
}

if a.GRPCServer == nil {
a.GRPCServer = NewGRPCServer(a)
a.GRPCServer = NewGRPCServer(a, a.logger)
}

if err := a.GRPCServer.Serve(grpcl); err != nil {
log.WithError(err).Fatal("agent: RPC server failed to start")
a.logger.WithError(err).Fatal("agent: RPC server failed to start")
}

if err := a.raftLayer.Open(raftl); err != nil {
log.Fatal(err)
a.logger.Fatal(err)
}

if err := a.setupRaft(); err != nil {
log.WithError(err).Fatal("agent: Raft layer failed to start")
a.logger.WithError(err).Fatal("agent: Raft layer failed to start")
}

// Start serving everything
go func() {
if err := tcpm.Serve(); err != nil {
log.Fatal(err)
a.logger.Fatal(err)
}
}()
go a.monitorLeadership()
Expand Down Expand Up @@ -629,17 +633,17 @@ func (a *Agent) LocalServers() (members []*ServerParts) {
// Listens to events from Serf and handles each event.
func (a *Agent) eventLoop() {
serfShutdownCh := a.serf.ShutdownCh()
log.Info("agent: Listen for events")
a.logger.Info("agent: Listen for events")
for {
select {
case e := <-a.eventCh:
log.WithField("event", e.String()).Info("agent: Received event")
a.logger.WithField("event", e.String()).Info("agent: Received event")
metrics.IncrCounter([]string{"agent", "event_received", e.String()}, 1)

// Log all member events
if me, ok := e.(serf.MemberEvent); ok {
for _, member := range me.Members {
log.WithFields(logrus.Fields{
a.logger.WithFields(logrus.Fields{
"node": a.config.NodeName,
"member": member.Name,
"event": e.EventType(),
Expand All @@ -662,26 +666,26 @@ func (a *Agent) eventLoop() {
a.localMemberEvent(me)
case serf.EventMemberUpdate, serf.EventUser, serf.EventQuery: // Ignore
default:
log.WithField("event", e.String()).Warn("agent: Unhandled serf event")
a.logger.WithField("event", e.String()).Warn("agent: Unhandled serf event")
}
}

case <-serfShutdownCh:
log.Warn("agent: Serf shutdown detected, quitting")
a.logger.Warn("agent: Serf shutdown detected, quitting")
return
}
}
}

// Join asks the Serf instance to join. See the Serf.Join function.
func (a *Agent) join(addrs []string, replay bool) (n int, err error) {
log.Infof("agent: joining: %v replay: %v", addrs, replay)
a.logger.Infof("agent: joining: %v replay: %v", addrs, replay)
n, err = a.serf.Join(addrs, !replay)
if n > 0 {
log.Infof("agent: joined: %d nodes", n)
a.logger.Infof("agent: joined: %d nodes", n)
}
if err != nil {
log.Warnf("agent: error joining: %v", err)
a.logger.Warnf("agent: error joining: %v", err)
}
return
}
Expand Down Expand Up @@ -881,11 +885,11 @@ func (a *Agent) checkAndSelectServer() (string, error) {
}

for _, peer := range peers {
log.WithField("peer", peer).Debug("Checking peer")
a.logger.WithField("peer", peer).Debug("Checking peer")
conn, err := net.DialTimeout("tcp", peer, 1*time.Second)
if err == nil {
conn.Close()
log.WithField("peer", peer).Debug("Found good peer")
a.logger.WithField("peer", peer).Debug("Found good peer")
return peer, nil
}
}
Expand All @@ -894,27 +898,27 @@ func (a *Agent) checkAndSelectServer() (string, error) {

func (a *Agent) startReporter() {
if a.config.DisableUsageStats || a.config.DevMode {
log.Info("agent: usage report client disabled")
a.logger.Info("agent: usage report client disabled")
return
}

clusterID, err := a.config.Hash()
if err != nil {
log.Warning("agent: unable to hash the service configuration:", err.Error())
a.logger.Warn("agent: unable to hash the service configuration:", err.Error())
return
}

go func() {
serverID, _ := uuid.GenerateUUID()
log.Info(fmt.Sprintf("agent: registering usage stats for cluster ID '%s'", clusterID))
a.logger.Info(fmt.Sprintf("agent: registering usage stats for cluster ID '%s'", clusterID))

if err := client.StartReporter(context.Background(), client.Options{
ClusterID: clusterID,
ServerID: serverID,
URL: "https://stats.dkron.io",
Version: fmt.Sprintf("%s %s", Name, Version),
}); err != nil {
log.Warning("agent: unable to create the usage report client:", err.Error())
a.logger.Warn("agent: unable to create the usage report client:", err.Error())
}
}()
}
3 changes: 2 additions & 1 deletion dkron/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package dkron
import (
"fmt"
"io/ioutil"
"log"
"os"
"testing"
"time"
Expand Down Expand Up @@ -104,7 +105,7 @@ func TestAgentCommand_runForElection(t *testing.T) {
// Wait until a follower steps up as leader
time.Sleep(2 * time.Second)
assert.True(t, (a2.IsLeader() || a3.IsLeader()))
log.Info(a3.IsLeader())
log.Println(a3.IsLeader())

a2.Stop()
a3.Stop()
Expand Down
Loading