Skip to content

Commit

Permalink
Remove unary RPC and rename StreamCapture to Capture
Browse files Browse the repository at this point in the history
Signed-off-by: Chance Zibolski <chance.zibolski@gmail.com>
  • Loading branch information
chancez committed Apr 19, 2024
1 parent 489a769 commit 4f7fbca
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 188 deletions.
4 changes: 2 additions & 2 deletions cmd/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ type gateway struct {
connTimeout time.Duration
}

func (s *gateway) StreamCapture(req *capperpb.CaptureRequest, stream capperpb.Capper_StreamCaptureServer) error {
func (s *gateway) Capture(req *capperpb.CaptureRequest, stream capperpb.Capper_CaptureServer) error {
ctx := stream.Context()

var peers []string
Expand Down Expand Up @@ -118,7 +118,7 @@ func (s *gateway) StreamCapture(req *capperpb.CaptureRequest, stream capperpb.Ca
c := capperpb.NewCapperClient(conn)

s.log.Debug("starting stream", "peer", peer)
peerStream, err := c.StreamCapture(ctx, req)
peerStream, err := c.Capture(ctx, req)
if err != nil {
return fmt.Errorf("error creating stream: %w", err)
}
Expand Down
8 changes: 4 additions & 4 deletions cmd/remote_capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func remoteCapture(ctx context.Context, log *slog.Logger, addr string, connTimeo
}

log.Debug("creating capture stream")
stream, err := c.StreamCapture(reqCtx, req)
stream, err := c.Capture(reqCtx, req)
if err != nil {
return fmt.Errorf("error creating stream: %w", err)
}
Expand Down Expand Up @@ -145,7 +145,7 @@ func remoteCapture(ctx context.Context, log *slog.Logger, addr string, connTimeo
return err
}

func handleClientStream(ctx context.Context, handler capture.PacketHandler, stream capperpb.Capper_StreamCaptureClient) error {
func handleClientStream(ctx context.Context, handler capture.PacketHandler, stream capperpb.Capper_CaptureClient) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

Expand All @@ -170,13 +170,13 @@ func handleClientStream(ctx context.Context, handler capture.PacketHandler, stre
type clientStreamReader struct {
pipeReader *io.PipeReader
pipeWriter *io.PipeWriter
stream capperpb.Capper_StreamCaptureClient
stream capperpb.Capper_CaptureClient

pcapReader *pcapgo.Reader
errCh chan error
}

func newClientStreamReader(stream capperpb.Capper_StreamCaptureClient) (*clientStreamReader, error) {
func newClientStreamReader(stream capperpb.Capper_CaptureClient) (*clientStreamReader, error) {
r, w := io.Pipe()
rw := &clientStreamReader{
pipeReader: r,
Expand Down
31 changes: 4 additions & 27 deletions cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,30 +123,7 @@ func (s *server) getNetns(ctx context.Context, req *capperpb.CaptureRequest) (st
return netns, nil
}

func (s *server) Capture(ctx context.Context, req *capperpb.CaptureRequest) (*capperpb.CaptureResponse, error) {
netns, err := s.getNetns(ctx, req)
if err != nil {
return nil, fmt.Errorf("error getting netns: %w", err)
}

var buf bytes.Buffer
writeHandler := capture.NewPcapWriterHandler(&buf, uint32(req.GetSnaplen()))

conf := capture.Config{
Filter: req.GetFilter(),
Snaplen: int(req.GetSnaplen()),
Promisc: req.GetNoPromiscuousMode(),
NumPackets: req.GetNumPackets(),
CaptureDuration: req.GetDuration().AsDuration(),
Netns: netns,
}
err = capture.StartMulti(ctx, s.log, req.GetInterface(), conf, writeHandler)
if err != nil {
return nil, status.Errorf(codes.Internal, "error occurred while capturing packets: %s", err)
}
return &capperpb.CaptureResponse{Pcap: buf.Bytes()}, nil
}
func (s *server) StreamCapture(req *capperpb.CaptureRequest, stream capperpb.Capper_StreamCaptureServer) error {
func (s *server) Capture(req *capperpb.CaptureRequest, stream capperpb.Capper_CaptureServer) error {
ctx := stream.Context()
netns, err := s.getNetns(ctx, req)
if err != nil {
Expand All @@ -170,13 +147,13 @@ func (s *server) StreamCapture(req *capperpb.CaptureRequest, stream capperpb.Cap
}

// newStreamPacketHandler returns a PacketHandler which writes the packets as
// bytes to the given Capper_StreamCaptureServer stream.
func newStreamPacketHandler(snaplen uint32, stream capperpb.Capper_StreamCaptureServer) capture.PacketHandler {
// bytes to the given Capper_CaptureServer stream.
func newStreamPacketHandler(snaplen uint32, stream capperpb.Capper_CaptureServer) capture.PacketHandler {
var buf bytes.Buffer
writeHandler := capture.NewPcapWriterHandler(&buf, snaplen)
streamHandler := capture.PacketHandlerFunc(func(_ layers.LinkType, p gopacket.Packet) error {
// send the packet on the stream
if err := stream.Send(&capperpb.StreamCaptureResponse{
if err := stream.Send(&capperpb.CaptureResponse{
Data: buf.Bytes(),
}); err != nil {
errCode := status.Code(err)
Expand Down
113 changes: 22 additions & 91 deletions proto/capper/capper.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 1 addition & 6 deletions proto/capper/capper.proto
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ package capper;
option go_package = "github.com/chancez/capper/proto/capper";

service Capper {
rpc Capture(CaptureRequest) returns (CaptureResponse) {}
rpc StreamCapture(CaptureRequest) returns (stream StreamCaptureResponse) {}
rpc Capture(CaptureRequest) returns (stream CaptureResponse) {}
}

message CaptureRequest {
Expand All @@ -30,9 +29,5 @@ message K8sPodFilter {
}

message CaptureResponse {
bytes pcap = 1;
}

message StreamCaptureResponse {
bytes data = 1;
}
Loading

0 comments on commit 4f7fbca

Please sign in to comment.