Skip to content

Commit

Permalink
wait until all the on going requests go to a stable stat before shuti…
Browse files Browse the repository at this point in the history
…ng down daemon

Signed-off-by: shenling.yyb <shenling.yyb@alibaba-inc.com>
  • Loading branch information
shenling.yyb authored and fuweid committed Oct 14, 2019
1 parent 5a83e1e commit 53f5b0e
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 1 deletion.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@ coverage.txt
bin/
coverage/
test/testdata/cp/*
.vscode
4 changes: 4 additions & 0 deletions apis/server/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"net/http"
"net/http/pprof"
"sync/atomic"
"time"

serverTypes "github.com/alibaba/pouch/apis/server/types"
Expand Down Expand Up @@ -177,6 +178,9 @@ func filter(handler serverTypes.Handler, s *Server) http.HandlerFunc {
ctx, cancel := context.WithCancel(pctx)
defer cancel()

atomic.AddInt32(&s.FlyingReq, 1)
defer atomic.AddInt32(&s.FlyingReq, -1)

s.lock.RLock()
if len(s.ManagerWhiteList) > 0 && req.TLS != nil && len(req.TLS.PeerCertificates) > 0 {
if _, isManager := s.ManagerWhiteList[req.TLS.PeerCertificates[0].Subject.CommonName]; !isManager {
Expand Down
21 changes: 21 additions & 0 deletions apis/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"net/http"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/alibaba/pouch/cri/stream"
Expand All @@ -33,6 +34,7 @@ type Server struct {
APIPlugin hookplugins.APIPlugin
ManagerWhiteList map[string]struct{}
lock sync.RWMutex
FlyingReq int32
}

// Start setup route table and listen to specified address which currently only supports unix socket and tcp address.
Expand Down Expand Up @@ -107,5 +109,24 @@ func (s *Server) Stop() error {
for _, one := range s.listeners {
one.Close()
}

// drain all requests on going or timeout after one minute
drain := make(chan struct{})
go func() {
for {
if atomic.LoadInt32(&s.FlyingReq) == 0 {
close(drain)
return
}
time.Sleep(time.Microsecond * 50)
}
}()

select {
case <-drain:
case <-time.After(60 * time.Second):
logrus.Errorf("stop pouch server after waited 60 seconds, on going request %d", atomic.LoadInt32(&s.FlyingReq))
}

return nil
}
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ func runDaemon(cmd *cobra.Command) error {
}

signal.Notify(signalCh, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGHUP)
sigHandles = append(sigHandles, d.ShutdownPlugin, d.Shutdown)
sigHandles = append(sigHandles, d.Shutdown, d.ShutdownPlugin)

go func() {
// FIXME: I think the Run() should always return error.
Expand Down

0 comments on commit 53f5b0e

Please sign in to comment.