Skip to content

Commit

Permalink
Merge pull request #5 from pengweisong/add-service
Browse files Browse the repository at this point in the history
add ExistDir & ServiceStatus rpc
  • Loading branch information
critical27 committed Feb 8, 2022
2 parents 0c060ef + ef7bbee commit ca17311
Show file tree
Hide file tree
Showing 11 changed files with 1,048 additions and 478 deletions.
37 changes: 34 additions & 3 deletions internal/clients/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package clients
import (
"fmt"
"os/exec"
"strings"

log "github.com/sirupsen/logrus"
pb "github.com/vesoft-inc/nebula-agent/pkg/proto"
Expand All @@ -11,7 +12,6 @@ import (
type ServiceName string

const (
ServiceName_All ServiceName = "all"
ServiceName_Metad ServiceName = "metad"
ServiceName_Storaged ServiceName = "storaged"
ServiceName_Graphd ServiceName = "graphd"
Expand All @@ -20,8 +20,6 @@ const (

func toName(r pb.ServiceRole) ServiceName {
switch r {
case pb.ServiceRole_ALL:
return ServiceName_All
case pb.ServiceRole_META:
return ServiceName_Metad
case pb.ServiceRole_STORAGE:
Expand Down Expand Up @@ -55,6 +53,13 @@ func FromStopReq(req *pb.StopServiceRequest) *Service {
}
}

func FromStatusReq(req *pb.ServiceStatusRequest) *Service {
return &Service{
name: toName(req.GetRole()),
dir: req.GetDir(),
}
}

// ServiceDaemon will start/stop metad/storaged/graphd in the service machine
// through scripts providing by the nebula
type ServiceDaemon struct {
Expand Down Expand Up @@ -98,3 +103,29 @@ func (d *ServiceDaemon) Stop() error {
}
return nil
}

func (d *ServiceDaemon) Status() (pb.Status, error) {
cmdStr := fmt.Sprintf("cd %s && scripts/nebula.service status %s", d.s.dir, d.s.name)
log.WithField("cmd", cmdStr).Debug("Try to get service's status")
cmd := exec.Command("bash", "-c", cmdStr)
outByte, err := cmd.Output()
if err != nil {
log.WithError(err).Errorf("Get status of service %s failed", d.s.name)
return pb.Status_UNKNOWN_STATUS, err
}

// Note: depend on the nebula scripts output now.
outStr := string(outByte)

// an example: [INFO] nebula-graphd(46b2aac66): Exited
if strings.Contains(outStr, "Exit") {
return pb.Status_EXITED, nil
}

// an example: [INFO] nebula-metad(46b2aac66): Running as 25859, Listening on 29559
if strings.Contains(outStr, "Run") {
return pb.Status_RUNNING, nil
}

return pb.Status_UNKNOWN_STATUS, fmt.Errorf("unrecognized output: '%s'", outStr)
}
4 changes: 2 additions & 2 deletions internal/clients/daemon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ func TestDaemon(t *testing.T) {
assert := assert.New(t)
rootDir := "/tmp/nebula-install"
startReq := &pb.StartServiceRequest{
Role: pb.ServiceRole_ALL,
Role: pb.ServiceRole_STORAGE,
Dir: rootDir,
}
s := FromStartReq(startReq)
assert.Equal(s.name, ServiceName_All)
assert.Equal(s.name, ServiceName_Storaged)
assert.Equal(s.dir, rootDir)
d, err := NewDaemon(s)
assert.Nil(err)
Expand Down
19 changes: 19 additions & 0 deletions internal/server/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,25 @@ func (a *AgentServer) StopService(ctx context.Context, req *pb.StopServiceReques
return resp, d.Stop()
}

// ServiceStatus return the status(exit or running) of metad/storaged/graphd/all service in agent machine
func (a *AgentServer) ServiceStatus(ctx context.Context, req *pb.ServiceStatusRequest) (*pb.ServiceStatusResponse, error) {
resp := &pb.ServiceStatusResponse{
Status: pb.Status_UNKNOWN_STATUS,
}

d, err := clients.NewDaemon(clients.FromStatusReq(req))
if err != nil {
return resp, fmt.Errorf("create service daemon failed when get service status: %w", err)
}

resp.Status, err = d.Status()
if err != nil {
return resp, fmt.Errorf("get %s status by daemon failed: %w", req.Role, err)
}

return resp, nil
}

// TODO(spw): should call graphd's corresponding interface
func (a *AgentServer) BanReadWrite(context.Context, *pb.BanReadWriteRequest) (*pb.BanReadWriteResponse, error) {
return nil, nil
Expand Down
27 changes: 26 additions & 1 deletion internal/server/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,15 @@ func (ss *StorageServer) MoveDir(ctx context.Context, req *pb.MoveDirRequest) (*
log.WithField("src", req.GetSrcPath()).WithField("dst", req.GetDstPath()).Debug("Rename dir")
res := &pb.MoveDirResponse{}

err := os.Rename(req.GetSrcPath(), req.GetDstPath())
_, err := os.Stat(req.GetSrcPath())
if err != nil {
if os.IsNotExist(err) {
return res, fmt.Errorf("%s does not exist", req.GetSrcPath())
}
return res, fmt.Errorf("get %s status failed: %w", req.GetSrcPath(), err)
}

err = os.Rename(req.GetSrcPath(), req.GetDstPath())
if err != nil {
return res, err
}
Expand All @@ -120,3 +128,20 @@ func (ss *StorageServer) RemoveDir(ctx context.Context, req *pb.RemoveDirRequest

return res, nil
}

// ExistDir check if file/dir in agent machine
func (ss *StorageServer) ExistDir(ctx context.Context, req *pb.ExistDirRequest) (*pb.ExistDirResponse, error) {
log.WithField("path", req.GetPath()).Debug("Check if dir exist")
res := &pb.ExistDirResponse{Exist: false}

_, err := os.Stat(req.GetPath())
if err == nil {
res.Exist = true
return res, nil
}
if os.IsNotExist(err) {
return res, nil
}

return res, err
}
22 changes: 22 additions & 0 deletions pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@ type Client interface {
DownloadFile(req *pb.DownloadFileRequest) (*pb.DownloadFileResponse, error)
StartService(req *pb.StartServiceRequest) (*pb.StartServiceResponse, error)
StopService(req *pb.StopServiceRequest) (*pb.StopServiceResponse, error)
ServiceStatus(req *pb.ServiceStatusRequest) (*pb.ServiceStatusResponse, error)
BanReadWrite(req *pb.BanReadWriteRequest) (*pb.BanReadWriteResponse, error)
AllowReadWrite(req *pb.AllowReadWriteRequest) (*pb.AllowReadWriteResponse, error)
MoveDir(req *pb.MoveDirRequest) (*pb.MoveDirResponse, error)
RemoveDir(req *pb.RemoveDirRequest) (*pb.RemoveDirResponse, error)
ExistDir(req *pb.ExistDirRequest) (*pb.ExistDirResponse, error)
}

func genSessionId() string {
Expand Down Expand Up @@ -102,6 +104,16 @@ func (c *client) RemoveDir(req *pb.RemoveDirRequest) (resp *pb.RemoveDirResponse
return c.storage.RemoveDir(c.ctx, req)
}

func (c *client) ExistDir(req *pb.ExistDirRequest) (resp *pb.ExistDirResponse, err error) {
defer func() {
if err != nil {
err = fmt.Errorf("agent, check dir exist failed: %w", err)
}
}()

return c.storage.ExistDir(c.ctx, req)
}

func (c *client) StartService(req *pb.StartServiceRequest) (resp *pb.StartServiceResponse, err error) {
defer func() {
if err != nil {
Expand All @@ -122,6 +134,16 @@ func (c *client) StopService(req *pb.StopServiceRequest) (resp *pb.StopServiceRe
return c.agent.StopService(c.ctx, req)
}

func (c *client) ServiceStatus(req *pb.ServiceStatusRequest) (resp *pb.ServiceStatusResponse, err error) {
defer func() {
if err != nil {
err = fmt.Errorf("agent, get service status failed: %w", err)
}
}()

return c.agent.ServiceStatus(c.ctx, req)
}

func (c *client) BanReadWrite(req *pb.BanReadWriteRequest) (resp *pb.BanReadWriteResponse, err error) {
defer func() {
if err != nil {
Expand Down
Loading

0 comments on commit ca17311

Please sign in to comment.