From 6d227d823b7288d7dbe8d2f5db3aba862fdff7e4 Mon Sep 17 00:00:00 2001 From: HarrisChu <1726587+HarrisChu@users.noreply.github.com> Date: Mon, 21 Feb 2022 14:52:38 +0800 Subject: [PATCH 1/2] replace nebula-go with ccore --- client.go | 312 ------------------------------------ go.mod | 2 +- go.sum | 5 +- csv.go => pkg/common/csv.go | 28 ++-- pkg/common/types.go | 46 ++++++ pkg/nebulagraph/client.go | 312 ++++++++++++++++++++++++++++++++++++ register.go | 5 +- types.go | 1 + 8 files changed, 381 insertions(+), 330 deletions(-) delete mode 100644 client.go rename csv.go => pkg/common/csv.go (87%) create mode 100644 pkg/common/types.go create mode 100644 pkg/nebulagraph/client.go create mode 100644 types.go diff --git a/client.go b/client.go deleted file mode 100644 index dea304a..0000000 --- a/client.go +++ /dev/null @@ -1,312 +0,0 @@ -package nebulagraph - -import ( - "crypto/tls" - "fmt" - "strconv" - "strings" - "sync" - "time" - - nebula "github.com/vesoft-inc/nebula-go/v3" -) - -type ( - // NebulaPool nebula connection pool - NebulaPool struct { - HostList []nebula.HostAddress - Pool *nebula.ConnectionPool - Log nebula.Logger - DataChs []chan Data - OutoptCh chan []string - csvStrategy csvReaderStrategy - initialized bool - sessions []*nebula.Session - channelBufferSize int - sslconfig *sslConfig - mutex sync.Mutex - } - - // NebulaSession a wrapper for nebula session, could read data from DataCh - NebulaSession struct { - Session *nebula.Session - Pool *NebulaPool - DataCh chan Data - } - - // Response a wrapper for nebula resultset - Response struct { - *nebula.ResultSet - ResponseTime int32 - } - - csvReaderStrategy int - - sslConfig struct { - rootCAPath string - certPath string - privateKeyPath string - } - - // Data data in csv file - Data []string - - output struct { - timeStamp int64 - nGQL string - latency int64 - responseTime int32 - isSucceed bool - rows int32 - errorMsg string - } -) - -const ( - // AllInOne all the vus use the same DataCh - AllInOne csvReaderStrategy = iota - // Separate each vu has a seprate DataCh - Separate -) - -func formatOutput(o *output) []string { - return []string{ - strconv.FormatInt(o.timeStamp, 10), - o.nGQL, - strconv.Itoa(int(o.latency)), - strconv.Itoa(int(o.responseTime)), - strconv.FormatBool(o.isSucceed), - strconv.Itoa(int(o.rows)), - o.errorMsg, - } -} - -var outputHeader []string = []string{ - "timestamp", - "nGQL", - "latency", - "responseTime", - "isSucceed", - "rows", - "errorMsg", -} - -// New for k6 initialization. -func New() *NebulaPool { - return &NebulaPool{ - Log: nebula.DefaultLogger{}, - initialized: false, - } -} - -// NewSSLConfig return sslConfig -func (np *NebulaPool) NewSSLConfig(rootCAPath, certPath, privateKeyPath string) { - np.sslconfig = &sslConfig{ - rootCAPath: rootCAPath, - certPath: certPath, - privateKeyPath: privateKeyPath, - } -} - -// Init init nebula pool with address and concurrent, by default the buffersize is 20000 -func (np *NebulaPool) Init(address string, concurrent int) (*NebulaPool, error) { - return np.InitWithSize(address, concurrent, 20000) -} - -// InitWithSize init nebula pool with channel buffer size -func (np *NebulaPool) InitWithSize(address string, concurrent int, size int) (*NebulaPool, error) { - np.mutex.Lock() - defer np.mutex.Unlock() - // k6 run in concurrent thread. - if np.initialized { - return np, nil - } - np.Log.Info("begin init the nebula pool") - var ( - sslConfig *tls.Config - err error - pool *nebula.ConnectionPool - ) - - if np.sslconfig != nil { - sslConfig, err = nebula.GetDefaultSSLConfig( - np.sslconfig.rootCAPath, - np.sslconfig.certPath, - np.sslconfig.privateKeyPath, - ) - if err != nil { - return nil, err - } - // skip insecure verification for stress testing. - sslConfig.InsecureSkipVerify = true - } - err = np.initAndVerifyPool(address, concurrent, size) - if err != nil { - return nil, err - } - conf := np.getDefaultConf(concurrent) - if sslConfig != nil { - pool, err = nebula.NewSslConnectionPool(np.HostList, *conf, sslConfig, np.Log) - - } else { - pool, err = nebula.NewConnectionPool(np.HostList, *conf, np.Log) - } - - if err != nil { - return nil, err - } - np.Pool = pool - np.Log.Info("finish init the pool") - np.initialized = true - return np, nil -} - -func (np *NebulaPool) initAndVerifyPool(address string, concurrent int, size int) error { - - addrs := strings.Split(address, ",") - var hosts []nebula.HostAddress - for _, addr := range addrs { - hostPort := strings.Split(addr, ":") - if len(hostPort) != 2 { - return fmt.Errorf("Invalid address: %s", addr) - } - port, err := strconv.Atoi(hostPort[1]) - if err != nil { - return err - } - host := hostPort[0] - hostAddr := nebula.HostAddress{Host: host, Port: port} - hosts = append(hosts, hostAddr) - np.HostList = hosts - } - np.sessions = make([]*nebula.Session, concurrent) - np.channelBufferSize = size - np.OutoptCh = make(chan []string, np.channelBufferSize) - return nil -} - -func (np *NebulaPool) getDefaultConf(concurrent int) *nebula.PoolConfig { - conf := nebula.PoolConfig{ - TimeOut: 0, - IdleTime: 0, - MaxConnPoolSize: concurrent, - MinConnPoolSize: 1, - } - return &conf -} - -// ConfigCsvStrategy set csv reader strategy -func (np *NebulaPool) ConfigCsvStrategy(strategy int) { - np.csvStrategy = csvReaderStrategy(strategy) -} - -// ConfigCSV config the csv file to be read -func (np *NebulaPool) ConfigCSV(path, delimiter string, withHeader bool) error { - for _, dataCh := range np.DataChs { - reader := NewCsvReader(path, delimiter, withHeader, dataCh) - if err := reader.ReadForever(); err != nil { - return err - } - } - return nil -} - -// ConfigOutput config the output file, would write the execution outputs -func (np *NebulaPool) ConfigOutput(path string) error { - writer := NewCsvWriter(path, ",", outputHeader, np.OutoptCh) - if err := writer.WriteForever(); err != nil { - return err - } - return nil -} - -// Close close the nebula pool -func (np *NebulaPool) Close() error { - np.mutex.Lock() - defer np.mutex.Unlock() - if !np.initialized { - return nil - } - np.Log.Info("begin close the nebula pool") - for _, s := range np.sessions { - if s != nil { - s.Release() - } - } - np.initialized = false - return nil -} - -// GetSession get the session from pool -func (np *NebulaPool) GetSession(user, password string) (*NebulaSession, error) { - session, err := np.Pool.GetSession(user, password) - if err != nil { - return nil, err - } - np.mutex.Lock() - defer np.mutex.Unlock() - np.sessions = append(np.sessions, session) - s := &NebulaSession{Session: session, Pool: np} - s.prepareCsvReader() - - return s, nil -} - -func (s *NebulaSession) prepareCsvReader() error { - np := s.Pool - if np.csvStrategy == AllInOne { - if len(np.DataChs) == 0 { - dataCh := make(chan Data, np.channelBufferSize) - np.DataChs = append(np.DataChs, dataCh) - } - s.DataCh = np.DataChs[0] - } else { - dataCh := make(chan Data, np.channelBufferSize) - np.DataChs = append(np.DataChs, dataCh) - s.DataCh = dataCh - } - return nil -} - -// GetData get data from csv reader -func (s *NebulaSession) GetData() (Data, error) { - if s.DataCh != nil && len(s.DataCh) != 0 { - if d, ok := <-s.DataCh; ok { - return d, nil - } - } - return nil, fmt.Errorf("no Data at all") -} - -// Execute execute nebula query -func (s *NebulaSession) Execute(stmt string) (*Response, error) { - start := time.Now() - rs, err := s.Session.Execute(stmt) - // us - responseTime := int32(time.Since(start) / 1000) - if err != nil { - return nil, err - } - - // output - if s.Pool.OutoptCh != nil && len(s.Pool.OutoptCh) != cap(s.Pool.OutoptCh) { - o := &output{ - timeStamp: start.Unix(), - nGQL: stmt, - latency: rs.GetLatency(), - responseTime: responseTime, - isSucceed: rs.IsSucceed(), - rows: int32(rs.GetRowSize()), - errorMsg: rs.GetErrorMsg(), - } - s.Pool.OutoptCh <- formatOutput(o) - - } - - return &Response{ResultSet: rs, ResponseTime: responseTime}, nil -} - -// GetResponseTime GetResponseTime -func (r *Response) GetResponseTime() int32 { - return r.ResponseTime -} diff --git a/go.mod b/go.mod index fcaec7d..2203050 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/vesoft-inc/k6-plugin go 1.16 require ( - github.com/vesoft-inc/nebula-go/v3 v3.0.0-20220119024722-ab348afbb79d + github.com/vesoft-inc/nebula-http-gateway/ccore v0.0.0-20220215025312-993ec26095cb go.k6.io/k6 v0.33.0 gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect ) diff --git a/go.sum b/go.sum index c0f96f6..a36a011 100644 --- a/go.sum +++ b/go.sum @@ -276,6 +276,7 @@ github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6So github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= +github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= github.com/serenize/snaker v0.0.0-20171204205717-a683aaf2d516/go.mod h1:Yow6lPLSAXx2ifx470yD/nUe22Dv5vBvxK/UK9UUTVs= github.com/serenize/snaker v0.0.0-20201027110005-a7ad2135616e h1:zWKUYT07mGmVBH+9UgnHXd/ekCK99C8EbDSAt5qsjXE= @@ -324,8 +325,8 @@ github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLY github.com/urfave/negroni v0.3.1-0.20180130044549-22c5532ea862/go.mod h1:Meg73S6kFm/4PpbYdq35yYWoCZ9mS/YSx+lKnmiohz4= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/valyala/fasttemplate v0.0.0-20170224212429-dcecefd839c4/go.mod h1:50wTf68f99/Zt14pr046Tgt3Lp2vLyFZKzbFXTOabXw= -github.com/vesoft-inc/nebula-go/v3 v3.0.0-20220119024722-ab348afbb79d h1:spO7OAtYI1wiqBiJ9417pKhqx0IkqFAFdFQFPm4JIrs= -github.com/vesoft-inc/nebula-go/v3 v3.0.0-20220119024722-ab348afbb79d/go.mod h1:+sXv05jYQBARdTbTcIEsWVXCnF/6ttOlDK35xQ6m54s= +github.com/vesoft-inc/nebula-http-gateway/ccore v0.0.0-20220215025312-993ec26095cb h1:cX6Ghc0soBHbpPUlFgUnX4saVa3cBJTL2+SwiRrZKu4= +github.com/vesoft-inc/nebula-http-gateway/ccore v0.0.0-20220215025312-993ec26095cb/go.mod h1:sFEvE+cY4TgwqWx6H6msOqAUzRhsEHHKaaMgIZENHuQ= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= diff --git a/csv.go b/pkg/common/csv.go similarity index 87% rename from csv.go rename to pkg/common/csv.go index 69fe586..ece8670 100644 --- a/csv.go +++ b/pkg/common/csv.go @@ -1,4 +1,4 @@ -package nebulagraph +package common import ( "encoding/csv" @@ -6,19 +6,21 @@ import ( "os" ) -type CSVReader struct { - Path string - Delimiter string - WithHeader bool - DataCh chan<- Data -} +type ( + CSVReader struct { + Path string + Delimiter string + WithHeader bool + DataCh chan<- Data + } -type CSVWriter struct { - Path string - Header []string - Delimiter string - DataCh <-chan []string -} + CSVWriter struct { + Path string + Header []string + Delimiter string + DataCh <-chan []string + } +) func NewCsvReader(path, delimiter string, withHeader bool, dataCh chan<- Data) *CSVReader { return &CSVReader{ diff --git a/pkg/common/types.go b/pkg/common/types.go new file mode 100644 index 0000000..25d7c47 --- /dev/null +++ b/pkg/common/types.go @@ -0,0 +1,46 @@ +package common + +type ( + // Data data in csv file + Data []string + + // IClient common client + IClient interface { + Open() error + Close() error + } + + // IClientPool common client pool + IClientPool interface { + Close() error + } + + // IGraphClient graph client + IGraphClient interface { + IClient + GetData() (Data, error) + Execute(stat string) (IGraphResponse, error) + } + + // IGraphResponse graph response, just support 3 functions to user. + IGraphResponse interface { + IsSucceed() bool + GetLatency() int64 + GetResponseTime() int32 + } + + // IGraphClientPool graph client pool. + IGraphClientPool interface { + IClientPool + GetSession(username, password string) (IGraphClient, error) + // Init initialize the poop with default channel buffersize + Init(address string, concurrent int) (IGraphClientPool, error) + InitWithSize(address string, concurrent int, size int) (IGraphClientPool, error) + ConfigCSV(path, delimiter string, withHeader bool) error + ConfigOutput(path string) error + // ConfigCsvStrategy, csv strategy + // 0 means all vus have the same csv reader. + // 1 means each vu has a separate csv reader. + ConfigCsvStrategy(strategy int) + } +) diff --git a/pkg/nebulagraph/client.go b/pkg/nebulagraph/client.go new file mode 100644 index 0000000..a4c27e8 --- /dev/null +++ b/pkg/nebulagraph/client.go @@ -0,0 +1,312 @@ +package nebulagraph + +import ( + "fmt" + "strconv" + "strings" + "sync" + "time" + + "github.com/vesoft-inc/k6-plugin/pkg/common" + "github.com/vesoft-inc/nebula-http-gateway/ccore/nebula" + nerrors "github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/errors" + "github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/types" + "github.com/vesoft-inc/nebula-http-gateway/ccore/nebula/wrapper" +) + +type ( + // GraphPool nebula connection pool + GraphPool struct { + DataChs []chan common.Data + OutoptCh chan []string + Version string + csvStrategy csvReaderStrategy + initialized bool + clients []nebula.GraphClient + channelBufferSize int + hosts []string + mutex sync.Mutex + clientGetter graphClientGetter + } + + graphClientGetter func(endpoint, username, password string) (nebula.GraphClient, error) + + // GraphClient a wrapper for nebula client, could read data from DataCh + GraphClient struct { + Client nebula.GraphClient + Pool *GraphPool + DataCh chan common.Data + username string + password string + } + + // Response a wrapper for nebula resultset + Response struct { + *wrapper.ResultSet + ResponseTime int32 + codeErr nerrors.CodeError + } + + csvReaderStrategy int + + output struct { + timeStamp int64 + nGQL string + latency int64 + responseTime int32 + isSucceed bool + rows int32 + errorMsg string + } +) + +var _ common.IGraphClient = &GraphClient{} +var _ common.IGraphClientPool = &GraphPool{} + +const ( + // AllInOne all the vus use the same DataCh + AllInOne csvReaderStrategy = iota + // Separate each vu has a seprate DataCh + Separate +) + +func formatOutput(o *output) []string { + return []string{ + strconv.FormatInt(o.timeStamp, 10), + o.nGQL, + strconv.Itoa(int(o.latency)), + strconv.Itoa(int(o.responseTime)), + strconv.FormatBool(o.isSucceed), + strconv.Itoa(int(o.rows)), + o.errorMsg, + } +} + +var outputHeader []string = []string{ + "timestamp", + "nGQL", + "latency", + "responseTime", + "isSucceed", + "rows", + "errorMsg", +} + +// NewNebulaGraph New for k6 initialization. +func NewNebulaGraph() *GraphPool { + return &GraphPool{ + clientGetter: func(endpoint string, username, password string) (nebula.GraphClient, error) { + // ccore just use the first host in list + return nebula.NewGraphClient([]string{endpoint}, username, password) + }, + } +} + +// Init init nebula pool with address and concurrent, by default the buffersize is 20000 +func (gp *GraphPool) Init(address string, concurrent int) (common.IGraphClientPool, error) { + return gp.InitWithSize(address, concurrent, 20000) +} + +// InitWithSize init nebula pool with channel buffer size +func (gp *GraphPool) InitWithSize(address string, concurrent int, chanSize int) (common.IGraphClientPool, error) { + gp.mutex.Lock() + defer gp.mutex.Unlock() + if gp.initialized { + return gp, nil + } + var err error + + err = gp.initAndVerifyPool(address, concurrent, chanSize) + if err != nil { + return nil, err + } + gp.initialized = true + return gp, nil +} + +func (gp *GraphPool) initAndVerifyPool(address string, concurrent int, chanSize int) error { + addrs := strings.Split(address, ",") + for _, addr := range addrs { + hostPort := strings.Split(addr, ":") + if len(hostPort) != 2 { + return fmt.Errorf("Invalid address: %s", addr) + } + _, err := strconv.Atoi(hostPort[1]) + if err != nil { + return err + } + gp.hosts = append(gp.hosts, addr) + } + gp.clients = make([]nebula.GraphClient, 0) + gp.channelBufferSize = chanSize + gp.OutoptCh = make(chan []string, gp.channelBufferSize) + return nil +} + +// ConfigCsvStrategy set csv reader strategy +func (gp *GraphPool) ConfigCsvStrategy(strategy int) { + gp.csvStrategy = csvReaderStrategy(strategy) +} + +// ConfigCSV config the csv file to be read +func (gp *GraphPool) ConfigCSV(path, delimiter string, withHeader bool) error { + for _, dataCh := range gp.DataChs { + reader := common.NewCsvReader(path, delimiter, withHeader, dataCh) + if err := reader.ReadForever(); err != nil { + return err + } + } + return nil +} + +// ConfigOutput config the output file, would write the execution outputs +func (gp *GraphPool) ConfigOutput(path string) error { + writer := common.NewCsvWriter(path, ",", outputHeader, gp.OutoptCh) + if err := writer.WriteForever(); err != nil { + return err + } + return nil +} + +// Close close the nebula pool +func (gp *GraphPool) Close() error { + gp.mutex.Lock() + defer gp.mutex.Unlock() + if !gp.initialized { + return nil + } + // gp.Log.Println("begin close the nebula pool") + for _, s := range gp.clients { + if s != nil { + s.Close() + } + } + gp.initialized = false + return nil +} + +// GetSession get the session from pool +func (gp *GraphPool) GetSession(username, password string) (common.IGraphClient, error) { + gp.mutex.Lock() + defer gp.mutex.Unlock() + // balancer, ccore just use the first endpoint + index := len(gp.clients) % len(gp.hosts) + client, err := gp.clientGetter(gp.hosts[index], username, password) + if gp.Version == "" { + gp.Version = string(client.Version()) + } + if err != nil { + return nil, err + } + err = client.Open() + if err != nil { + return nil, err + } + + gp.clients = append(gp.clients, client) + s := &GraphClient{Client: client, Pool: gp} + s.prepareCsvReader() + + return s, nil +} + +func (gc *GraphClient) Open() error { + return gc.Client.Open() +} +func (gc *GraphClient) Auth() error { + _, err := gc.Client.Authenticate(gc.username, gc.password) + return err +} +func (gc *GraphClient) Close() error { + return gc.Client.Close() +} + +func (gc *GraphClient) prepareCsvReader() error { + np := gc.Pool + + if np.csvStrategy == AllInOne { + if len(np.DataChs) == 0 { + dataCh := make(chan common.Data, np.channelBufferSize) + np.DataChs = append(np.DataChs, dataCh) + } + gc.DataCh = np.DataChs[0] + } else { + dataCh := make(chan common.Data, np.channelBufferSize) + np.DataChs = append(np.DataChs, dataCh) + gc.DataCh = dataCh + } + return nil +} + +// GetData get data from csv reader +func (gc *GraphClient) GetData() (common.Data, error) { + if gc.DataCh != nil && len(gc.DataCh) != 0 { + if d, ok := <-gc.DataCh; ok { + return d, nil + } + } + return nil, fmt.Errorf("no Data at all") +} + +// Execute execute nebula query +func (gc *GraphClient) Execute(stmt string) (common.IGraphResponse, error) { + start := time.Now() + resp, err := gc.Client.Execute([]byte(stmt)) + var ( + codeErr nerrors.CodeError + ok bool + rows int32 + rs *wrapper.ResultSet + latency int64 + ) + if err != nil { + codeErr, ok = nerrors.AsCodeError(err) + if !ok { + return nil, err + } + rows = 0 + latency = 0 + } else { + rs, _ = wrapper.GenResultSet(resp, gc.Client.Factory(), types.TimezoneInfo{}) + rows = int32(rs.GetRowSize()) + latency = resp.GetLatencyInUs() + } + + responseTime := int32(time.Since(start) / 1000) + // output + if gc.Pool.OutoptCh != nil && len(gc.Pool.OutoptCh) != cap(gc.Pool.OutoptCh) { + o := &output{ + timeStamp: start.Unix(), + nGQL: stmt, + latency: latency, + responseTime: responseTime, + isSucceed: codeErr.GetErrorCode() == nerrors.ErrorCode_SUCCEEDED, + rows: rows, + errorMsg: codeErr.GetErrorMsg(), + } + gc.Pool.OutoptCh <- formatOutput(o) + + } + return &Response{ResultSet: rs, ResponseTime: responseTime, codeErr: codeErr}, nil +} + +// GetResponseTime GetResponseTime +func (r *Response) GetResponseTime() int32 { + return r.ResponseTime +} + +// IsSucceed IsSucceed +func (r *Response) IsSucceed() bool { + if r.codeErr != nil && r.codeErr.GetErrorCode() != nerrors.ErrorCode_SUCCEEDED { + return false + } + return true +} + +// GetLatency GetLatency +func (r *Response) GetLatency() int64 { + if r.ResultSet != nil { + return r.ResultSet.GetLatency() + } + return 0 +} diff --git a/register.go b/register.go index 486464d..b86ddec 100644 --- a/register.go +++ b/register.go @@ -1,9 +1,10 @@ -package nebulagraph +package k6plugin import ( + "github.com/vesoft-inc/k6-plugin/pkg/nebulagraph" "go.k6.io/k6/js/modules" ) func init() { - modules.Register("k6/x/nebulagraph", New()) + modules.Register("k6/x/nebulagraph", nebulagraph.NewNebulaGraph()) } diff --git a/types.go b/types.go new file mode 100644 index 0000000..eaf754a --- /dev/null +++ b/types.go @@ -0,0 +1 @@ +package k6plugin From 29e4446eaf27e035477ff680d0a94d3ca7057fd4 Mon Sep 17 00:00:00 2001 From: HarrisChu <1726587+HarrisChu@users.noreply.github.com> Date: Mon, 21 Feb 2022 17:07:31 +0800 Subject: [PATCH 2/2] fix doc --- pkg/nebulagraph/client.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/pkg/nebulagraph/client.go b/pkg/nebulagraph/client.go index a4c27e8..bfa73b3 100644 --- a/pkg/nebulagraph/client.go +++ b/pkg/nebulagraph/client.go @@ -102,12 +102,12 @@ func NewNebulaGraph() *GraphPool { } } -// Init init nebula pool with address and concurrent, by default the buffersize is 20000 +// Init initializes nebula pool with address and concurrent, by default the buffersize is 20000 func (gp *GraphPool) Init(address string, concurrent int) (common.IGraphClientPool, error) { return gp.InitWithSize(address, concurrent, 20000) } -// InitWithSize init nebula pool with channel buffer size +// InitWithSize initializes nebula pool with channel buffer size func (gp *GraphPool) InitWithSize(address string, concurrent int, chanSize int) (common.IGraphClientPool, error) { gp.mutex.Lock() defer gp.mutex.Unlock() @@ -143,12 +143,12 @@ func (gp *GraphPool) initAndVerifyPool(address string, concurrent int, chanSize return nil } -// ConfigCsvStrategy set csv reader strategy +// ConfigCsvStrategy sets csv reader strategy func (gp *GraphPool) ConfigCsvStrategy(strategy int) { gp.csvStrategy = csvReaderStrategy(strategy) } -// ConfigCSV config the csv file to be read +// ConfigCSV makes the read csv file configuration func (gp *GraphPool) ConfigCSV(path, delimiter string, withHeader bool) error { for _, dataCh := range gp.DataChs { reader := common.NewCsvReader(path, delimiter, withHeader, dataCh) @@ -159,7 +159,7 @@ func (gp *GraphPool) ConfigCSV(path, delimiter string, withHeader bool) error { return nil } -// ConfigOutput config the output file, would write the execution outputs +// ConfigOutput makes the output file configuration, would write the execution outputs func (gp *GraphPool) ConfigOutput(path string) error { writer := common.NewCsvWriter(path, ",", outputHeader, gp.OutoptCh) if err := writer.WriteForever(); err != nil { @@ -168,7 +168,7 @@ func (gp *GraphPool) ConfigOutput(path string) error { return nil } -// Close close the nebula pool +// Close closes the nebula pool func (gp *GraphPool) Close() error { gp.mutex.Lock() defer gp.mutex.Unlock() @@ -185,7 +185,7 @@ func (gp *GraphPool) Close() error { return nil } -// GetSession get the session from pool +// GetSession gets the session from pool func (gp *GraphPool) GetSession(username, password string) (common.IGraphClient, error) { gp.mutex.Lock() defer gp.mutex.Unlock() @@ -248,7 +248,7 @@ func (gc *GraphClient) GetData() (common.Data, error) { return nil, fmt.Errorf("no Data at all") } -// Execute execute nebula query +// Execute executes nebula query func (gc *GraphClient) Execute(stmt string) (common.IGraphResponse, error) { start := time.Now() resp, err := gc.Client.Execute([]byte(stmt))