Skip to content

Commit

Permalink
Cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
darkwrat committed Mar 5, 2019
1 parent d6ea2d0 commit 49b8ed4
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 56 deletions.
74 changes: 37 additions & 37 deletions cmd/cctvc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,49 @@ package main
import (
"flag"
"fmt"
"github.com/mattn/go-mjpeg"
"golang.org/x/net/context"
"google.golang.org/grpc"
"log"
"net/http"
"time"

"github.com/mattn/go-mjpeg"
"golang.org/x/net/context"
"google.golang.org/grpc"

"github.com/darkwrat/cctvd/cctv"
)

var (
addr = flag.String("addr", "127.0.0.1:50051", "cctvd host:port")
addr = flag.String("addr", "127.0.0.1:50051", "cctvd host:port")
delay = flag.Duration("delay", 1*time.Second, "cctvd reconnect delay after failure")
)

func live(m map[int32]*mjpeg.Stream) error {
conn, err := grpc.Dial(*addr, grpc.WithInsecure())
if err != nil {
return fmt.Errorf("could not connect: %s", err)
}
defer conn.Close()

c := cctv.NewCCTVClient(conn)
feeds, err := c.Feeds(context.Background(), &cctv.Channels{Mask: 0xffff})
if err != nil {
return fmt.Errorf("could not subscribe: %s", err)
}

for {
frame, err := feeds.Recv()
if err != nil {
return fmt.Errorf("could not receive frame: %s", err)
}

if frame.Channel < 32 {
if err := m[frame.Channel].Update(frame.Image); err != nil {
return fmt.Errorf("could not update frame for channel `%d': %s", frame.Channel, err)
}
}
}
}

func main() {
flag.Parse()

Expand All @@ -31,41 +60,12 @@ func main() {
log.Fatal(http.ListenAndServe(":8000", nil))
}()

outer:
for {
conn, err := grpc.Dial(*addr, grpc.WithInsecure())
if err != nil {
log.Printf("could not connect: %s", err)
time.Sleep(1 * time.Second)
continue outer
if err := live(m); err != nil {
log.Print(err)
}

c := cctv.NewCCTVClient(conn)
feeds, err := c.Feeds(context.Background(), &cctv.Channels{Mask: 0xffff})
if err != nil {
log.Printf("could not subscribe: %s", err)
_ = conn.Close()
time.Sleep(1 * time.Second)
continue outer
}

for {
frame, err := feeds.Recv()
if err != nil {
log.Printf("could not receive frame: %s", err)
_ = conn.Close()
time.Sleep(1 * time.Second)
continue outer
}

if frame.Channel < 32 {
if err := m[frame.Channel].Update(frame.Image); err != nil {
log.Printf("could not update frame for channel `%d': %s", frame.Channel, err)
_ = conn.Close()
time.Sleep(1 * time.Second)
continue outer
}
}
}
log.Printf("sleeping for %v seconds before retry", delay.Seconds())
time.Sleep(*delay)
}
}
35 changes: 18 additions & 17 deletions cmd/cctvd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ package main
import (
"flag"
"fmt"
"google.golang.org/grpc"
"log"
"net"
"sync"
"time"

"google.golang.org/grpc"

"github.com/darkwrat/cctvd/cctv"
"github.com/darkwrat/cctvd/dvr"
)
Expand Down Expand Up @@ -40,9 +41,9 @@ func (s *server) unsubscribe(mask int32, sub chan *cctv.Frame) {
s.Lock()
defer s.Unlock()

for ch := uint8(0); ch < 32; ch++ {
if v := mask & (1 << ch); v != 0 {
if c, ok := s.m[ch]; ok {
for feed := uint8(0); feed < 32; feed++ {
if v := mask & (1 << feed); v != 0 {
if c, ok := s.m[feed]; ok {
delete(c, sub)
}
}
Expand All @@ -53,8 +54,6 @@ func (s *server) send(frame *cctv.Frame) {
s.RLock()
defer s.RUnlock()

// todo fine grained lock
// todo handle client drop
ch := uint8(frame.GetChannel())
if c, ok := s.m[ch]; ok {
for sub := range c {
Expand Down Expand Up @@ -96,25 +95,27 @@ func (s *server) multicast(ch chan *dvr.Frame) {
}
}

var (
addr = flag.String("addr", "127.0.0.1:7620", "dvr host:port")
delay = flag.Duration("delay", 5*time.Second, "delay before relive after failure")
)

func live(opts dvr.ConnectOpts, ch chan *dvr.Frame) error {
c, err := dvr.Connect(opts)
if err != nil {
return fmt.Errorf("cannot connect to dvr: %s", err)
return fmt.Errorf("could not connect to dvr: %s", err)
}
defer c.Close()

if err := c.Live(ch); err != nil {
return fmt.Errorf("cannot stream anymore: %s", err)
return fmt.Errorf("could not continue stream: %s", err)
}

return nil
}

var (
addr = flag.String("addr", "127.0.0.1:7620", "dvr host:port")
user = flag.String("user", "ADMIN", "dvr username")
pass = flag.String("pass", "0000", "dvr password")
delay = flag.Duration("delay", 5*time.Second, "dvr reconnect delay after failure")
)

func main() {
flag.Parse()

Expand All @@ -131,14 +132,14 @@ func main() {
go csrv.multicast(ch)
go func() {
if err := s.Serve(lis); err != nil {
log.Fatalf("cannot serve grpc: %s", err)
log.Fatalf("could not serve grpc: %s", err)
}
}()

opts := dvr.ConnectOpts{
Addr: *addr,
User: "ADMIN",
Password: "0000",
Addr: *addr,
User: *user,
Pass: *pass,
}

for {
Expand Down
4 changes: 2 additions & 2 deletions dvr/dvr.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ type Client struct {
}

type ConnectOpts struct {
Addr, User, Password string
Addr, User, Pass string
}

func Connect(opts ConnectOpts) (*Client, error) {
Expand All @@ -256,7 +256,7 @@ func Connect(opts ConnectOpts) (*Client, error) {
return nil, fmt.Errorf("could not dial: %s", err)
}

if err := doAuth(conn, opts.User, opts.Password); err != nil {
if err := doAuth(conn, opts.User, opts.Pass); err != nil {
_ = conn.Close()
return nil, fmt.Errorf("auth failed: %s", err)
}
Expand Down

0 comments on commit 49b8ed4

Please sign in to comment.