Skip to content

Commit

Permalink
enhance logger (#217)
Browse files Browse the repository at this point in the history
* enhance logger

* fix

Co-authored-by: Yichen Wang <18348405+Aiee@users.noreply.github.com>
  • Loading branch information
HarrisChu and Aiee committed Oct 28, 2022
1 parent b0d32fa commit 3cd1335
Show file tree
Hide file tree
Showing 12 changed files with 115 additions and 98 deletions.
2 changes: 1 addition & 1 deletion pkg/client/clientmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func NewNebulaClientMgr(settings *config.NebulaClientSettings, statsCh chan<- ba
mgr.pool = pool
}

mgr.runnerLogger.Infof("Create %d Nebula Graph clients", mgr.GetNumConnections())
logger.Log.Infof("Create %d Nebula Graph clients", mgr.GetNumConnections())

return &mgr, nil
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/client/clientpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (p *ClientPool) Close() {
if p.preStop != nil && p.preStop.Commands != nil {
if i := p.getActiveConnIdx(); i != -1 {
if err := p.exec(i, *p.preStop.Commands); err != nil {
p.runnerLogger.Errorf("%s", err.Error())
logger.Log.Errorf("%s", err.Error())
}
}
}
Expand Down Expand Up @@ -155,7 +155,7 @@ func (p *ClientPool) Init() error {
func (p *ClientPool) startWorker(i int) {
stmt := fmt.Sprintf("USE `%s`;", p.space)
if err := p.exec(i, stmt); err != nil {
p.runnerLogger.Error(err.Error())
logger.Log.Error(err.Error())
return
}
for {
Expand Down
103 changes: 52 additions & 51 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ func isSupportedVersion(ver string) bool {
return false
}

func Parse(filename string, runnerLogger *logger.RunnerLogger) (*YAMLConfig, error) {
func Parse(filename string, runnerLogger logger.Logger) (*YAMLConfig, error) {
logger.SetLogger(runnerLogger)
content, err := ioutil.ReadFile(filename)
if err != nil {
return nil, ierrors.Wrap(ierrors.InvalidConfigPathOrFormat, err)
Expand Down Expand Up @@ -158,32 +159,32 @@ func Parse(filename string, runnerLogger *logger.RunnerLogger) (*YAMLConfig, err
}
}

if err = conf.ValidateAndReset(path, runnerLogger); err != nil {
if err = conf.ValidateAndReset(path); err != nil {
return nil, ierrors.Wrap(ierrors.ConfigError, err)
}

return &conf, nil
}

func (config *YAMLConfig) ValidateAndReset(dir string, runnerLogger *logger.RunnerLogger) error {
func (config *YAMLConfig) ValidateAndReset(dir string) error {
if config.NebulaClientSettings == nil {
return errors.New("please configure clientSettings")
}
if err := config.NebulaClientSettings.validateAndReset("clientSettings", runnerLogger); err != nil {
if err := config.NebulaClientSettings.validateAndReset("clientSettings"); err != nil {
return err
}

if config.RemoveTempFiles == nil {
removeTempFiles := false
config.RemoveTempFiles = &removeTempFiles
runnerLogger.Warnf("You have not configured whether to remove generated temporary files, reset to default value. removeTempFiles: %v",
logger.Log.Warnf("You have not configured whether to remove generated temporary files, reset to default value. removeTempFiles: %v",
*config.RemoveTempFiles)
}

if config.LogPath == nil {
defaultPath := filepath.Join(os.TempDir(), fmt.Sprintf("nebula-importer-%d.log", time.Now().UnixNano()))
config.LogPath = &defaultPath
runnerLogger.Warnf("You have not configured the log file path in: logPath, reset to default path: %s", *config.LogPath)
logger.Log.Warnf("You have not configured the log file path in: logPath, reset to default path: %s", *config.LogPath)
}
if !filepath.IsAbs(*config.LogPath) {
absPath := filepath.Join(dir, *config.LogPath)
Expand All @@ -196,26 +197,26 @@ func (config *YAMLConfig) ValidateAndReset(dir string, runnerLogger *logger.Runn

//TODO(yuyu): check each item in config.Files
// if item is a directory, iter this directory and replace this directory config section by filename config section
if err := config.expandDirectoryToFiles(dir, runnerLogger); err != nil {
runnerLogger.Errorf("%s", err)
if err := config.expandDirectoryToFiles(dir); err != nil {
logger.Log.Errorf("%s", err)
return err
}
for i := range config.Files {
if err := config.Files[i].validateAndReset(dir, fmt.Sprintf("files[%d]", i), runnerLogger); err != nil {
if err := config.Files[i].validateAndReset(dir, fmt.Sprintf("files[%d]", i)); err != nil {
return err
}
}

return nil
}

func (config *YAMLConfig) expandDirectoryToFiles(dir string, runnerLogger *logger.RunnerLogger) (err error) {
func (config *YAMLConfig) expandDirectoryToFiles(dir string) (err error) {
var newFiles []*File

for _, file := range config.Files {
err, files := file.expandFiles(dir, runnerLogger)
err, files := file.expandFiles(dir)
if err != nil {
runnerLogger.Errorf("error when expand file: %s", err)
logger.Log.Errorf("error when expand file: %s", err)
return err
}
for _, f := range files {
Expand All @@ -241,33 +242,33 @@ func (n *NebulaPostStart) validateAndReset(prefix string) error {
return nil
}

func (n *NebulaClientSettings) validateAndReset(prefix string, runnerLogger *logger.RunnerLogger) error {
func (n *NebulaClientSettings) validateAndReset(prefix string) error {
if n.Space == nil {
return fmt.Errorf("Please configure the space name in: %s.space", prefix)
}

if n.Retry == nil {
retry := 1
n.Retry = &retry
runnerLogger.Warnf("Invalid retry option in %s.retry, reset to %d ", prefix, *n.Retry)
logger.Log.Warnf("Invalid retry option in %s.retry, reset to %d ", prefix, *n.Retry)
}

if n.Concurrency == nil {
d := 10
n.Concurrency = &d
runnerLogger.Warnf("Invalid client concurrency in %s.concurrency, reset to %d", prefix, *n.Concurrency)
logger.Log.Warnf("Invalid client concurrency in %s.concurrency, reset to %d", prefix, *n.Concurrency)
}

if n.ChannelBufferSize == nil {
d := 128
n.ChannelBufferSize = &d
runnerLogger.Warnf("Invalid client channel buffer size in %s.channelBufferSize, reset to %d", prefix, *n.ChannelBufferSize)
logger.Log.Warnf("Invalid client channel buffer size in %s.channelBufferSize, reset to %d", prefix, *n.ChannelBufferSize)
}

if n.Connection == nil {
return fmt.Errorf("Please configure the connection information in: %s.connection", prefix)
}
if err := n.Connection.validateAndReset(fmt.Sprintf("%s.connection", prefix), runnerLogger); err != nil {
if err := n.Connection.validateAndReset(fmt.Sprintf("%s.connection", prefix)); err != nil {
return err
}

Expand All @@ -277,20 +278,20 @@ func (n *NebulaClientSettings) validateAndReset(prefix string, runnerLogger *log
return nil
}

func (c *NebulaClientConnection) validateAndReset(prefix string, runnerLogger *logger.RunnerLogger) error {
func (c *NebulaClientConnection) validateAndReset(prefix string) error {
if c.Address == nil {
c.Address = &kDefaultConnAddr
runnerLogger.Warnf("%s.address: %s", prefix, *c.Address)
logger.Log.Warnf("%s.address: %s", prefix, *c.Address)
}

if c.User == nil {
c.User = &kDefaultUser
runnerLogger.Warnf("%s.user: %s", prefix, *c.User)
logger.Log.Warnf("%s.user: %s", prefix, *c.User)
}

if c.Password == nil {
c.Password = &kDefaultPassword
runnerLogger.Warnf("%s.password: %s", prefix, *c.Password)
logger.Log.Warnf("%s.password: %s", prefix, *c.Password)
}
return nil
}
Expand All @@ -299,7 +300,7 @@ func (f *File) IsInOrder() bool {
return (f.InOrder != nil && *f.InOrder) || (f.CSV != nil && f.CSV.WithLabel != nil && *f.CSV.WithLabel)
}

func (f *File) validateAndReset(dir, prefix string, runnerLogger *logger.RunnerLogger) error {
func (f *File) validateAndReset(dir, prefix string) error {
if f.Path == nil {
return fmt.Errorf("Please configure file path in: %s.path", prefix)
}
Expand All @@ -316,7 +317,7 @@ func (f *File) validateAndReset(dir, prefix string, runnerLogger *logger.RunnerL
if f.FailDataPath == nil {
failDataPath := filepath.Join(os.TempDir(), fmt.Sprintf("nebula-importer-err-data-%d", time.Now().UnixNano()))
f.FailDataPath = &failDataPath
runnerLogger.Warnf("You have not configured the failed data output file path in: %s.failDataPath, reset to tmp path: %s",
logger.Log.Warnf("You have not configured the failed data output file path in: %s.failDataPath, reset to tmp path: %s",
prefix, *f.FailDataPath)
}
} else {
Expand All @@ -331,7 +332,7 @@ func (f *File) validateAndReset(dir, prefix string, runnerLogger *logger.RunnerL
if f.FailDataPath == nil {
p := filepath.Join(filepath.Dir(*f.Path), "err", filepath.Base(*f.Path))
f.FailDataPath = &p
runnerLogger.Warnf("You have not configured the failed data output file path in: %s.failDataPath, reset to default path: %s",
logger.Log.Warnf("You have not configured the failed data output file path in: %s.failDataPath, reset to default path: %s",
prefix, *f.FailDataPath)
} else {
if !filepath.IsAbs(*f.FailDataPath) {
Expand All @@ -343,7 +344,7 @@ func (f *File) validateAndReset(dir, prefix string, runnerLogger *logger.RunnerL

if f.BatchSize == nil {
f.BatchSize = &kDefaultBatchSize
runnerLogger.Infof("Invalid batch size in file(%s), reset to %d", *f.Path, *f.BatchSize)
logger.Log.Infof("Invalid batch size in file(%s), reset to %d", *f.Path, *f.BatchSize)
}

if f.InOrder == nil {
Expand All @@ -357,7 +358,7 @@ func (f *File) validateAndReset(dir, prefix string, runnerLogger *logger.RunnerL
}

if f.CSV != nil {
err := f.CSV.validateAndReset(fmt.Sprintf("%s.csv", prefix), runnerLogger)
err := f.CSV.validateAndReset(fmt.Sprintf("%s.csv", prefix))
if err != nil {
return err
}
Expand All @@ -366,10 +367,10 @@ func (f *File) validateAndReset(dir, prefix string, runnerLogger *logger.RunnerL
if f.Schema == nil {
return fmt.Errorf("Please configure file schema: %s.schema", prefix)
}
return f.Schema.validateAndReset(fmt.Sprintf("%s.schema", prefix), runnerLogger)
return f.Schema.validateAndReset(fmt.Sprintf("%s.schema", prefix))
}

func (f *File) expandFiles(dir string, runnerLogger *logger.RunnerLogger) (err error, files []*File) {
func (f *File) expandFiles(dir string) (err error, files []*File) {
if base.HasHttpPrefix(*f.Path) {
files = append(files, f)
} else {
Expand All @@ -380,7 +381,7 @@ func (f *File) expandFiles(dir string, runnerLogger *logger.RunnerLogger) (err e

fileNames, err := filepath.Glob(*f.Path)
if err != nil || len(fileNames) == 0 {
runnerLogger.Errorf("error file path: %s", *f.Path)
logger.Log.Errorf("error file path: %s", *f.Path)
return err, files
}

Expand All @@ -390,30 +391,30 @@ func (f *File) expandFiles(dir string, runnerLogger *logger.RunnerLogger) (err e
base := filepath.Base(fileNames[i])
tmp := filepath.Join(*f.FailDataPath, base)
failedDataPath = &tmp
runnerLogger.Infof("Failed data path: %v", *failedDataPath)
logger.Log.Infof("Failed data path: %v", *failedDataPath)
}
eachConf := *f
eachConf.Path = &fileNames[i]
eachConf.FailDataPath = failedDataPath
files = append(files, &eachConf)
runnerLogger.Infof("find file: %v", *eachConf.Path)
logger.Log.Infof("find file: %v", *eachConf.Path)
}
}

return err, files
}

func (c *CSVConfig) validateAndReset(prefix string, runnerLogger *logger.RunnerLogger) error {
func (c *CSVConfig) validateAndReset(prefix string) error {
if c.WithHeader == nil {
h := false
c.WithHeader = &h
runnerLogger.Infof("%s.withHeader: %v", prefix, false)
logger.Log.Infof("%s.withHeader: %v", prefix, false)
}

if c.WithLabel == nil {
l := false
c.WithLabel = &l
runnerLogger.Infof("%s.withLabel: %v", prefix, false)
logger.Log.Infof("%s.withLabel: %v", prefix, false)
}

if c.Delimiter != nil {
Expand Down Expand Up @@ -457,20 +458,20 @@ func (s *Schema) CollectEmptyPropsTagNames() []string {
return tagNames
}

func (s *Schema) validateAndReset(prefix string, runnerLogger *logger.RunnerLogger) error {
func (s *Schema) validateAndReset(prefix string) error {
var err error = nil
switch strings.ToLower(*s.Type) {
case "edge":
if s.Edge != nil {
err = s.Edge.validateAndReset(fmt.Sprintf("%s.edge", prefix), runnerLogger)
err = s.Edge.validateAndReset(fmt.Sprintf("%s.edge", prefix))
} else {
runnerLogger.Infof("%s.edge is nil", prefix)
logger.Log.Infof("%s.edge is nil", prefix)
}
case "vertex":
if s.Vertex != nil {
err = s.Vertex.validateAndReset(fmt.Sprintf("%s.vertex", prefix), runnerLogger)
err = s.Vertex.validateAndReset(fmt.Sprintf("%s.vertex", prefix))
} else {
runnerLogger.Infof("%s.vertex is nil", prefix)
logger.Log.Infof("%s.vertex is nil", prefix)
}
default:
err = fmt.Errorf("Error schema type(%s) in %s.type only edge and vertex are supported", *s.Type, prefix)
Expand Down Expand Up @@ -553,7 +554,7 @@ func (v *VID) checkFunction(prefix string) error {
return nil
}

func (v *VID) validateAndReset(prefix string, defaultVal int, runnerLogger *logger.RunnerLogger) error {
func (v *VID) validateAndReset(prefix string, defaultVal int) error {
if v.Index == nil {
v.Index = &defaultVal
}
Expand All @@ -570,7 +571,7 @@ func (v *VID) validateAndReset(prefix string, defaultVal int, runnerLogger *logg
}
} else {
v.Type = &kDefaultVidType
runnerLogger.Warnf("Not set %s.Type, reset to default value `%s'", prefix, *v.Type)
logger.Log.Warnf("Not set %s.Type, reset to default value `%s'", prefix, *v.Type)
}
return nil
}
Expand Down Expand Up @@ -673,20 +674,20 @@ func (e *Edge) String() string {
return strings.Join(cells, ",")
}

func (e *Edge) validateAndReset(prefix string, runnerLogger *logger.RunnerLogger) error {
func (e *Edge) validateAndReset(prefix string) error {
if e.Name == nil {
return fmt.Errorf("Please configure edge name in: %s.name", prefix)
}
if e.SrcVID != nil {
if err := e.SrcVID.validateAndReset(fmt.Sprintf("%s.srcVID", prefix), 0, runnerLogger); err != nil {
if err := e.SrcVID.validateAndReset(fmt.Sprintf("%s.srcVID", prefix), 0); err != nil {
return err
}
} else {
index := 0
e.SrcVID = &VID{Index: &index, Type: &kDefaultVidType}
}
if e.DstVID != nil {
if err := e.DstVID.validateAndReset(fmt.Sprintf("%s.dstVID", prefix), 1, runnerLogger); err != nil {
if err := e.DstVID.validateAndReset(fmt.Sprintf("%s.dstVID", prefix), 1); err != nil {
return err
}
} else {
Expand All @@ -712,7 +713,7 @@ func (e *Edge) validateAndReset(prefix string, runnerLogger *logger.RunnerLogger
return err
}
} else {
runnerLogger.Errorf("prop %d of edge %s is nil", i, *e.Name)
logger.Log.Errorf("prop %d of edge %s is nil", i, *e.Name)
}
}
return nil
Expand Down Expand Up @@ -775,12 +776,12 @@ func (v *Vertex) String() string {
return strings.Join(cells, ",")
}

func (v *Vertex) validateAndReset(prefix string, runnerLogger *logger.RunnerLogger) error {
func (v *Vertex) validateAndReset(prefix string) error {
// if v.Tags == nil {
// return fmt.Errorf("Please configure %.tags", prefix)
// }
if v.VID != nil {
if err := v.VID.validateAndReset(fmt.Sprintf("%s.vid", prefix), 0, runnerLogger); err != nil {
if err := v.VID.validateAndReset(fmt.Sprintf("%s.vid", prefix), 0); err != nil {
return err
}
} else {
Expand All @@ -790,12 +791,12 @@ func (v *Vertex) validateAndReset(prefix string, runnerLogger *logger.RunnerLogg
j := 1
for i := range v.Tags {
if v.Tags[i] != nil {
if err := v.Tags[i].validateAndReset(fmt.Sprintf("%s.tags[%d]", prefix, i), j, runnerLogger); err != nil {
if err := v.Tags[i].validateAndReset(fmt.Sprintf("%s.tags[%d]", prefix, i), j); err != nil {
return err
}
j = j + len(v.Tags[i].Props)
} else {
runnerLogger.Errorf("tag %d is nil", i)
logger.Log.Errorf("tag %d is nil", i)
}
}
return nil
Expand Down Expand Up @@ -870,7 +871,7 @@ func (t *Tag) FormatValues(record base.Record) (string, bool, error) {
return strings.Join(cells, ","), noProps, nil
}

func (t *Tag) validateAndReset(prefix string, start int, runnerLogger *logger.RunnerLogger) error {
func (t *Tag) validateAndReset(prefix string, start int) error {
if t.Name == nil {
return fmt.Errorf("Please configure the vertex tag name in: %s.name", prefix)
}
Expand All @@ -881,7 +882,7 @@ func (t *Tag) validateAndReset(prefix string, start int, runnerLogger *logger.Ru
return err
}
} else {
runnerLogger.Errorf("prop %d of tag %s is nil", i, *t.Name)
logger.Log.Errorf("prop %d of tag %s is nil", i, *t.Name)
}
}
return nil
Expand Down
Loading

0 comments on commit 3cd1335

Please sign in to comment.