Skip to content

Commit

Permalink
fix: Logging prefix in grpc agent
Browse files Browse the repository at this point in the history
  • Loading branch information
Victor Castell committed May 28, 2020
1 parent 8d78706 commit b83bde1
Showing 1 changed file with 12 additions and 7 deletions.
19 changes: 12 additions & 7 deletions dkron/grpc_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
metrics "github.com/armon/go-metrics"
"github.com/distribworks/dkron/v3/plugin/types"
"github.com/golang/protobuf/ptypes"
"github.com/sirupsen/logrus"
)

const (
Expand Down Expand Up @@ -46,19 +47,23 @@ func NewAgentServer(agent *Agent) types.AgentServer {
// AgentRun is called when an agent starts running a job and lasts all execution,
// the agent will stream execution progress to the server.
func (as *AgentServer) AgentRun(req *types.AgentRunRequest, stream types.Agent_AgentRunServer) error {
defer metrics.MeasureSince([]string{"grpc", "agent_run"}, time.Now())
defer metrics.MeasureSince([]string{"grpc_agent", "agent_run"}, time.Now())

job := req.Job
execution := req.Execution

log.WithFields(logrus.Fields{
"job": job.Name,
}).Info("grpc_agent: Starting job")

output, _ := circbuf.NewBuffer(maxBufSize)

var success bool

jex := job.Executor
exc := job.ExecutorConfig
if jex == "" {
return errors.New("invoke: No executor defined, nothing to do")
return errors.New("grpc_agent: No executor defined, nothing to do")
}

// Send the first update with the initial execution state to be stored in the server
Expand All @@ -73,7 +78,7 @@ func (as *AgentServer) AgentRun(req *types.AgentRunRequest, stream types.Agent_A

// Check if executor exists
if executor, ok := as.agent.ExecutorPlugins[jex]; ok {
log.WithField("plugin", jex).Debug("invoke: calling executor plugin")
log.WithField("plugin", jex).Debug("grpc_agent: calling executor plugin")
runningExecutions.Store(execution.GetGroup(), execution)
out, err := executor.Execute(&types.ExecuteRequest{
JobName: job.Name,
Expand All @@ -87,7 +92,7 @@ func (as *AgentServer) AgentRun(req *types.AgentRunRequest, stream types.Agent_A
err = errors.New(out.Error)
}
if err != nil {
log.WithError(err).WithField("job", job.Name).WithField("plugin", executor).Error("invoke: command error output")
log.WithError(err).WithField("job", job.Name).WithField("plugin", executor).Error("grpc_agent: command error output")
success = false
output.Write([]byte(err.Error() + "\n"))
} else {
Expand All @@ -98,8 +103,8 @@ func (as *AgentServer) AgentRun(req *types.AgentRunRequest, stream types.Agent_A
output.Write(out.Output)
}
} else {
log.WithField("executor", jex).Error("invoke: Specified executor is not present")
output.Write([]byte("invoke: Specified executor is not present"))
log.WithField("executor", jex).Error("grpc_agent: Specified executor is not present")
output.Write([]byte("grpc_agent: Specified executor is not present"))
}

execution.FinishedAt = ptypes.TimestampNow()
Expand All @@ -113,7 +118,7 @@ func (as *AgentServer) AgentRun(req *types.AgentRunRequest, stream types.Agent_A
Execution: execution,
}); err != nil {
// In case of error means that maybe the server is gone so fallback to ExecutionDone
log.WithError(err).WithField("job", job.Name).Error("invoke: error sending the final execution, falling back to ExecutionDone")
log.WithError(err).WithField("job", job.Name).Error("grpc_agent: error sending the final execution, falling back to ExecutionDone")
rpcServer, err := as.agent.checkAndSelectServer()
if err != nil {
return err
Expand Down

0 comments on commit b83bde1

Please sign in to comment.