Replace deprecated protobuf package
vcastellm committed Mar 31, 2024
1 parent fbac360 commit ede6770
Showing 8 changed files with 75 additions and 76 deletions.
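The diff below applies one mechanical pattern throughout: imports from the deprecated github.com/golang/protobuf module (ptypes, ptypes/empty) are replaced by their google.golang.org/protobuf counterparts (timestamppb, emptypb, proto). As a quick reference, here is a minimal, self-contained sketch of that migration covering the API calls touched in this commit; it is illustrative only and is not part of the changed files.

```go
package main

import (
	"fmt"

	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/types/known/emptypb"
	"google.golang.org/protobuf/types/known/timestamppb"
)

func main() {
	// Old: ptypes.TimestampNow() from the deprecated ptypes package.
	// New: timestamppb.Now() from the maintained protobuf module.
	now := timestamppb.Now()

	// Old: empty.Empty from ptypes/empty.
	// New: emptypb.Empty from types/known/emptypb.
	e := &emptypb.Empty{}

	// Old: pb.Marshal with pb aliasing the legacy proto package.
	// New: proto.Marshal from google.golang.org/protobuf/proto.
	b, err := proto.Marshal(e)
	if err != nil {
		panic(err)
	}
	fmt.Println(now.AsTime(), len(b))
}
```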
builtin/bins/dkron-processor-files/files_output_test.go (4 changes: 2 additions & 2 deletions)
@@ -6,12 +6,12 @@ import (

"github.com/distribworks/dkron/v4/plugin"
"github.com/distribworks/dkron/v4/types"
"github.com/golang/protobuf/ptypes"
"github.com/stretchr/testify/assert"
"google.golang.org/protobuf/types/known/timestamppb"
)

func TestProcess(t *testing.T) {
- now := ptypes.TimestampNow()
+ now := timestamppb.Now()

pa := &plugin.ProcessorArgs{
Execution: types.Execution{
builtin/bins/dkron-processor-log/log_output_test.go (4 changes: 2 additions & 2 deletions)
@@ -5,12 +5,12 @@ import (

"github.com/distribworks/dkron/v4/plugin"
"github.com/distribworks/dkron/v4/types"
"github.com/golang/protobuf/ptypes"
"github.com/stretchr/testify/assert"
"google.golang.org/protobuf/types/known/timestamppb"
)

func TestProcess(t *testing.T) {
- now := ptypes.TimestampNow()
+ now := timestamppb.Now()

pa := &plugin.ProcessorArgs{
Execution: types.Execution{
builtin/bins/dkron-processor-syslog/syslog_output_test.go (2 changes: 1 addition & 1 deletion)
@@ -5,8 +5,8 @@ import (

"github.com/distribworks/dkron/v4/plugin"
"github.com/distribworks/dkron/v4/types"
"github.com/golang/protobuf/ptypes"
"github.com/stretchr/testify/assert"
"google.golang.org/protobuf/ptypes"
)

func TestProcess(t *testing.T) {
dkron/grpc.go (70 changes: 35 additions & 35 deletions)
@@ -10,14 +10,14 @@ import (

metrics "github.com/armon/go-metrics"
"github.com/distribworks/dkron/v4/plugin"
proto "github.com/distribworks/dkron/v4/types"
"github.com/golang/protobuf/ptypes/empty"
"github.com/distribworks/dkron/v4/types"
"github.com/hashicorp/raft"
"github.com/hashicorp/serf/serf"
"github.com/sirupsen/logrus"
"golang.org/x/net/context"
"google.golang.org/grpc"
pb "google.golang.org/protobuf/proto"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/emptypb"
)

var (
@@ -35,13 +35,13 @@

// DkronGRPCServer defines the basics that a gRPC server should implement.
type DkronGRPCServer interface {
- proto.DkronServer
+ types.DkronServer
Serve(net.Listener) error
}

// GRPCServer is the local implementation of the gRPC server interface.
type GRPCServer struct {
- proto.DkronServer
+ types.DkronServer
agent *Agent
logger *logrus.Entry
}
@@ -57,10 +57,10 @@ func NewGRPCServer(agent *Agent, logger *logrus.Entry) DkronGRPCServer {
// Serve creates and start a new gRPC dkron server
func (grpcs *GRPCServer) Serve(lis net.Listener) error {
grpcServer := grpc.NewServer()
- proto.RegisterDkronServer(grpcServer, grpcs)
+ types.RegisterDkronServer(grpcServer, grpcs)

as := NewAgentServer(grpcs.agent, grpcs.logger)
- proto.RegisterAgentServer(grpcServer, as)
+ types.RegisterAgentServer(grpcServer, as)
go grpcServer.Serve(lis)

return nil
@@ -70,7 +70,7 @@ func (grpcs *GRPCServer) Serve(lis net.Listener) error {
func Encode(t MessageType, msg interface{}) ([]byte, error) {
var buf bytes.Buffer
buf.WriteByte(uint8(t))
- m, err := pb.Marshal(msg.(pb.Message))
+ m, err := proto.Marshal(msg.(proto.Message))
if err != nil {
return nil, err
}
@@ -81,7 +81,7 @@ func Encode(t MessageType, msg interface{}) ([]byte, error) {
// SetJob broadcast a state change to the cluster members that will store the job.
// Then restart the scheduler
// This only works on the leader
- func (grpcs *GRPCServer) SetJob(ctx context.Context, setJobReq *proto.SetJobRequest) (*proto.SetJobResponse, error) {
+ func (grpcs *GRPCServer) SetJob(ctx context.Context, setJobReq *types.SetJobRequest) (*types.SetJobResponse, error) {
defer metrics.MeasureSince([]string{"grpc", "set_job"}, time.Now())
grpcs.logger.WithFields(logrus.Fields{
"job": setJobReq.Job.Name,
@@ -98,12 +98,12 @@ func (grpcs *GRPCServer) SetJob(ctx context.Context, setJobReq *proto.SetJobRequ
return nil, err
}

- return &proto.SetJobResponse{}, nil
+ return &types.SetJobResponse{}, nil
}

// DeleteJob broadcast a state change to the cluster members that will delete the job.
// This only works on the leader
- func (grpcs *GRPCServer) DeleteJob(ctx context.Context, delJobReq *proto.DeleteJobRequest) (*proto.DeleteJobResponse, error) {
+ func (grpcs *GRPCServer) DeleteJob(ctx context.Context, delJobReq *types.DeleteJobRequest) (*types.DeleteJobResponse, error) {
defer metrics.MeasureSince([]string{"grpc", "delete_job"}, time.Now())
grpcs.logger.WithField("job", delJobReq.GetJobName()).Debug("grpc: Received DeleteJob")

@@ -128,11 +128,11 @@ func (grpcs *GRPCServer) DeleteJob(ctx context.Context, delJobReq *proto.DeleteJ
grpcs.logger.WithField("job", job.Name).Info("grpc: Done deleting ephemeral job")
}

- return &proto.DeleteJobResponse{Job: jpb}, nil
+ return &types.DeleteJobResponse{Job: jpb}, nil
}

// GetJob loads the job from the datastore
- func (grpcs *GRPCServer) GetJob(ctx context.Context, getJobReq *proto.GetJobRequest) (*proto.GetJobResponse, error) {
+ func (grpcs *GRPCServer) GetJob(ctx context.Context, getJobReq *types.GetJobRequest) (*types.GetJobResponse, error) {
defer metrics.MeasureSince([]string{"grpc", "get_job"}, time.Now())
grpcs.logger.WithField("job", getJobReq.JobName).Debug("grpc: Received GetJob")

@@ -141,8 +141,8 @@ func (grpcs *GRPCServer) GetJob(ctx context.Context, getJobReq *proto.GetJobRequ
return nil, err
}

- gjr := &proto.GetJobResponse{
- Job: &proto.Job{},
+ gjr := &types.GetJobResponse{
+ Job: &types.Job{},
}

// Copy the data structure
@@ -154,7 +154,7 @@ }
}

// ExecutionDone saves the execution to the store
- func (grpcs *GRPCServer) ExecutionDone(ctx context.Context, execDoneReq *proto.ExecutionDoneRequest) (*proto.ExecutionDoneResponse, error) {
+ func (grpcs *GRPCServer) ExecutionDone(ctx context.Context, execDoneReq *types.ExecutionDoneRequest) (*types.ExecutionDoneResponse, error) {
defer metrics.MeasureSince([]string{"grpc", "execution_done"}, time.Now())
grpcs.logger.WithFields(logrus.Fields{
"group": execDoneReq.Execution.Group,
@@ -230,7 +230,7 @@ func (grpcs *GRPCServer) ExecutionDone(ctx context.Context, execDoneReq *proto.E
return nil, err
}

- return &proto.ExecutionDoneResponse{
+ return &types.ExecutionDoneResponse{
From: grpcs.agent.config.NodeName,
Payload: []byte("retry"),
}, nil
@@ -266,45 +266,45 @@ func (grpcs *GRPCServer) ExecutionDone(ctx context.Context, execDoneReq *proto.E
}

if job.Ephemeral && job.Status == StatusSuccess {
- if _, err := grpcs.DeleteJob(ctx, &proto.DeleteJobRequest{JobName: job.Name}); err != nil {
+ if _, err := grpcs.DeleteJob(ctx, &types.DeleteJobRequest{JobName: job.Name}); err != nil {
return nil, err
}
- return &proto.ExecutionDoneResponse{
+ return &types.ExecutionDoneResponse{
From: grpcs.agent.config.NodeName,
Payload: []byte("deleted"),
}, nil
}

- return &proto.ExecutionDoneResponse{
+ return &types.ExecutionDoneResponse{
From: grpcs.agent.config.NodeName,
Payload: []byte("saved"),
}, nil
}

// Leave calls the Stop method, stopping everything in the server
- func (grpcs *GRPCServer) Leave(ctx context.Context, in *empty.Empty) (*empty.Empty, error) {
+ func (grpcs *GRPCServer) Leave(ctx context.Context, in *emptypb.Empty) (*emptypb.Empty, error) {
return in, grpcs.agent.Stop()
}

// RunJob runs a job in the cluster
- func (grpcs *GRPCServer) RunJob(ctx context.Context, req *proto.RunJobRequest) (*proto.RunJobResponse, error) {
+ func (grpcs *GRPCServer) RunJob(ctx context.Context, req *types.RunJobRequest) (*types.RunJobResponse, error) {
ex := NewExecution(req.JobName)
job, err := grpcs.agent.Run(req.JobName, ex)
if err != nil {
return nil, err
}
jpb := job.ToProto()

- return &proto.RunJobResponse{Job: jpb}, nil
+ return &types.RunJobResponse{Job: jpb}, nil
}

// ToggleJob toggle the enablement of a job
- func (grpcs *GRPCServer) ToggleJob(ctx context.Context, getJobReq *proto.ToggleJobRequest) (*proto.ToggleJobResponse, error) {
+ func (grpcs *GRPCServer) ToggleJob(ctx context.Context, getJobReq *types.ToggleJobRequest) (*types.ToggleJobResponse, error) {
return nil, nil
}

// RaftGetConfiguration get raft config
- func (grpcs *GRPCServer) RaftGetConfiguration(ctx context.Context, in *empty.Empty) (*proto.RaftGetConfigurationResponse, error) {
+ func (grpcs *GRPCServer) RaftGetConfiguration(ctx context.Context, in *emptypb.Empty) (*types.RaftGetConfigurationResponse, error) {
// We can't fetch the leader and the configuration atomically with
// the current Raft API.
future := grpcs.agent.raft.GetConfiguration()
@@ -326,7 +326,7 @@ func (grpcs *GRPCServer) RaftGetConfiguration(ctx context.Context, in *empty.Emp

// Fill out the reply.
leader := grpcs.agent.raft.Leader()
- reply := &proto.RaftGetConfigurationResponse{}
+ reply := &types.RaftGetConfigurationResponse{}
reply.Index = future.Index()
for _, server := range future.Configuration().Servers {
node := "(unknown)"
@@ -338,7 +338,7 @@ func (grpcs *GRPCServer) RaftGetConfiguration(ctx context.Context, in *empty.Emp
}
}

- entry := &proto.RaftServer{
+ entry := &types.RaftServer{
Id: string(server.ID),
Node: node,
Address: string(server.Address),
@@ -355,7 +355,7 @@ func (grpcs *GRPCServer) RaftGetConfiguration(ctx context.Context, in *empty.Emp
// quorum but no longer known to Serf or the catalog) by address in the form of
// "IP:port". The reply argument is not used, but is required to fulfill the RPC
// interface.
- func (grpcs *GRPCServer) RaftRemovePeerByID(ctx context.Context, in *proto.RaftRemovePeerByIDRequest) (*empty.Empty, error) {
+ func (grpcs *GRPCServer) RaftRemovePeerByID(ctx context.Context, in *types.RaftRemovePeerByIDRequest) (*emptypb.Empty, error) {
// Since this is an operation designed for humans to use, we will return
// an error if the supplied id isn't among the peers since it's
// likely they screwed up.
@@ -388,28 +388,28 @@ REMOVE:
}

grpcs.logger.WithField("peer", in.Id).Warn("removed Raft peer")
- return new(empty.Empty), nil
+ return new(emptypb.Empty), nil
}

// GetActiveExecutions returns the active executions on the server node
- func (grpcs *GRPCServer) GetActiveExecutions(ctx context.Context, in *empty.Empty) (*proto.GetActiveExecutionsResponse, error) {
+ func (grpcs *GRPCServer) GetActiveExecutions(ctx context.Context, in *emptypb.Empty) (*types.GetActiveExecutionsResponse, error) {
defer metrics.MeasureSince([]string{"grpc", "agent_run"}, time.Now())

- var executions []*proto.Execution
+ var executions []*types.Execution
grpcs.agent.activeExecutions.Range(func(k, v interface{}) bool {
- e := v.(*proto.Execution)
+ e := v.(*types.Execution)
executions = append(executions, e)
return true
})

- return &proto.GetActiveExecutionsResponse{
+ return &types.GetActiveExecutionsResponse{
Executions: executions,
}, nil
}

// SetExecution broadcast a state change to the cluster members that will store the execution.
// This only works on the leader
- func (grpcs *GRPCServer) SetExecution(ctx context.Context, execution *proto.Execution) (*empty.Empty, error) {
+ func (grpcs *GRPCServer) SetExecution(ctx context.Context, execution *types.Execution) (*emptypb.Empty, error) {
defer metrics.MeasureSince([]string{"grpc", "set_execution"}, time.Now())
grpcs.logger.WithFields(logrus.Fields{
"execution": execution.Key(),
Expand All @@ -426,5 +426,5 @@ func (grpcs *GRPCServer) SetExecution(ctx context.Context, execution *proto.Exec
return nil, err
}

- return new(empty.Empty), nil
+ return new(emptypb.Empty), nil
}
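As a side note, the Encode helper touched in the dkron/grpc.go hunk above (a one-byte message type followed by the proto-marshalled payload) maps directly onto the new proto.Marshal API. The sketch below reproduces that pattern in isolation under stated assumptions: MessageType and the emptypb.Empty payload are stand-ins here, not dkron's actual generated types, and the signature is narrowed to proto.Message for clarity.

```go
package main

import (
	"bytes"
	"fmt"

	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/types/known/emptypb"
)

// MessageType is a stand-in for dkron's internal message type enum.
type MessageType uint8

// Encode mirrors the pattern in dkron/grpc.go: write a one-byte message
// type, then append the proto-marshalled payload.
func Encode(t MessageType, msg proto.Message) ([]byte, error) {
	var buf bytes.Buffer
	buf.WriteByte(uint8(t))
	m, err := proto.Marshal(msg)
	if err != nil {
		return nil, err
	}
	buf.Write(m)
	return buf.Bytes(), nil
}

func main() {
	b, err := Encode(1, &emptypb.Empty{})
	if err != nil {
		panic(err)
	}
	fmt.Printf("%d byte(s), type byte %d\n", len(b), b[0])
}
```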