forked from bradfitz/http2
-
Notifications
You must be signed in to change notification settings - Fork 0
Anatomy of serve() loop
Viacheslav Biriukov edited this page Mar 8, 2015
·
2 revisions
serve() starts by sending the server's initial SETTINGS frame:
func (sc *serverConn) serve() {
... // some defers
sc.writeFrame(frameWriteMsg{
write: writeSettings{
{SettingMaxFrameSize, sc.srv.maxReadFrameSize()},
{SettingMaxConcurrentStreams, sc.advMaxStreams},
// TODO: more actual settings, notably
// SettingInitialWindowSize, but then we also
// want to bump up the conn window size the
// same amount here right after the settings
},
})
where frameWriteMsg
from writesched.go
:
// frameWriteMsg is a request to write a frame.
//
// Messages are queued through serverConn.writeFrame (which adds them to
// the writeScheduler) and are eventually written by writeFrameAsync.
type frameWriteMsg struct {
// write is the interface value that does the writing, once the
// writeScheduler (below) has decided to select this frame
// to write. The write functions are all defined in write.go.
write writeFramer
// stream is the stream the frame belongs to. startFrameWrite inspects
// its state before the frame is written.
stream *stream // used for prioritization. nil for non-stream frames.
// done, if non-nil, must be a buffered channel with space for
// 1 message and is sent the return value from write (or an
// earlier error) when the frame has been written.
// writeFrameAsync panics if the send on done would block.
done chan error
}
and writeSettings
from write.go
:
// writeSettings is a list of settings that is written to the peer as a
// single SETTINGS frame.
type writeSettings []Setting

// writeFrame writes the settings held in s as one SETTINGS frame using the
// Framer obtained from ctx, and returns the error reported by the Framer.
func (s writeSettings) writeFrame(ctx writeContext) error {
	framer := ctx.Framer()
	settings := []Setting(s)
	return framer.WriteSettings(settings...)
}
where Framer().WriteSettings
in frame.go
:
// WriteSettings writes a SETTINGS frame carrying the given settings (there
// may be none), with the ACK bit not set.
//
// Each setting is encoded as its ID (16 bits) followed by its value
// (32 bits), in the order given.
//
// It will perform exactly one Write to the underlying Writer.
// It is the caller's responsibility to not call other Write methods concurrently.
func (f *Framer) WriteSettings(settings ...Setting) error {
	f.startWrite(FrameSettings, 0, 0)
	for i := range settings {
		f.writeUint16(uint16(settings[i].ID))
		f.writeUint32(settings[i].Val)
	}
	return f.endWrite()
}
Next, we wait for the ClientPreface greeting:
// Read the client preface before serving anything else. If readPreface
// fails, report the error together with the peer's address through
// condlogf and give up on this connection.
if err := sc.readPreface(); err != nil {
sc.condlogf(err, "error reading preface from client %v: %v", sc.conn.RemoteAddr(), err)
return
}
where readPreface()
is:
// readPreface reads the ClientPreface greeting from the peer
// or returns an error on timeout or an invalid greeting.
func (sc *serverConn) readPreface() error {
...
}
Next, and importantly, we start the goroutine that reads frames from the socket:
go sc.readFrames() // closed by defer sc.conn.Close above
// readFrames is the loop that reads incoming frames and hands them to the
// serve goroutine over readFrameCh.
// It's run on its own goroutine.
//
// When ReadFrame fails, the error is sent on readFrameErrCh, readFrameCh is
// closed, and the loop ends.
func (sc *serverConn) readFrames() {
	g := make(gate, 1)
	for {
		frame, err := sc.framer.ReadFrame()
		if err == nil {
			sc.readFrameCh <- frameAndGate{frame, g}
			// The ReadFrame interface doesn't copy memory: the Frame
			// accessor methods read from the last frame's (shared)
			// buffer. Reading the next frame must therefore wait
			// until the serve goroutine signals that it has finished
			// with this one.
			g.Wait()
			continue
		}
		sc.readFrameErrCh <- err
		close(sc.readFrameCh)
		return
	}
}
Then we enter the infinite loop that serves the connection:
// Main loop of the serve goroutine: wait for the next event on any of the
// connection's channels and handle it. Returning from the loop ends serve().
for {
select {
// A frame write was requested over wantWriteFrameCh; queue it via
// writeFrame on this goroutine.
case wm := <-sc.wantWriteFrameCh:
sc.writeFrame(wm)
case pp := <-sc.pushPromiseCh:
sc.startPushPromise(pp)
// A frame write finished (writeFrameAsync signals this channel, and
// startFrameWrite does so for a skipped frame): clear the in-progress
// flag and let the scheduler pick the next frame.
case <-sc.wroteFrameCh:
sc.writingFrame = false
sc.scheduleFrameWrite()
case fg, ok := <-sc.readFrameCh:
if !ok {
// readFrames closed the channel after a read error. A nil channel
// is never ready in a select, so this case is disabled from now on.
sc.readFrameCh = nil
}
// processFrameFromReader reports whether the connection should be
// kept open.
if !sc.processFrameFromReader(fg, ok) {
return
}
// A frame was handled and the connection stays open: stop the
// settings timer and nil its channel so the timeout case below can
// no longer fire. (settingsTimer is set up outside this excerpt.)
if settingsTimer.C != nil {
settingsTimer.Stop()
settingsTimer.C = nil
}
case m := <-sc.bodyReadCh:
sc.noteBodyRead(m.st, m.n)
// The settings timer fired before it was stopped above.
case <-settingsTimer.C:
sc.logf("timeout waiting for SETTINGS frames from %v", sc.conn.RemoteAddr())
return
case <-sc.shutdownTimerCh:
sc.vlogf("GOAWAY close timer fired; closing conn from %v", sc.conn.RemoteAddr())
return
// Run the function received on testHookCh on this (the serve) goroutine.
case fn := <-sc.testHookCh:
fn()
}
}
When a frame arrives, the loop runs processFrameFromReader
:
// processFrameFromReader processes the serve loop's read from readFrameCh from the
// frame-reading goroutine.
// processFrameFromReader returns whether the connection should be kept open.
func (sc *serverConn) processFrameFromReader(fg frameAndGate, fgValid bool) bool {
sc.serveG.check()
var clientGone bool
var err error
if !fgValid {
err = <-sc.readFrameErrCh
if err == ErrFrameTooLarge {
sc.goAway(ErrCodeFrameSize)
return true // goAway will close the loop
}
clientGone = err == io.EOF || strings.Contains(err.Error(), "use of closed network connection")
if clientGone {
// TODO: could we also get into this state if
// the peer does a half close
// (e.g. CloseWrite) because they're done
// sending frames but they're still wanting
// our open replies? Investigate.
// TODO: add CloseWrite to crypto/tls.Conn first
// so we have a way to test this? I suppose
// just for testing we could have a non-TLS mode.
return false
}
}
if fgValid {
f := fg.f <-------------------------------- Memory copy
sc.vlogf("got %v: %#v", f.Header(), f)
err = sc.processFrame(f)
fg.g.Done() // unblock the readFrames goroutine
if err == nil {
return true
}
}
switch ev := err.(type) {
case StreamError:
sc.resetStream(ev)
return true
case goAwayFlowError:
sc.goAway(ErrCodeFlowControl)
return true
case ConnectionError:
sc.logf("%v: %v", sc.conn.RemoteAddr(), ev)
sc.goAway(ErrCode(ev))
return true // goAway will handle shutdown
default:
if !fgValid {
sc.logf("disconnecting; error reading frame from client %s: %v", sc.conn.RemoteAddr(), err)
} else {
sc.logf("disconnection due to other error: %v", err)
}
}
return false
}
Each frame is then processed according to its type:
// processFrame validates an incoming frame against the connection state and
// dispatches it to the handler for its concrete type. It must be called on
// the serve goroutine.
//
// It returns ConnectionError(ErrCodeProtocol) when the first frame on the
// connection is not SETTINGS, when a header block is in progress and f is
// not a CONTINUATION frame for that same stream, or when f is a
// PUSH_PROMISE frame. Otherwise it returns the result of the type-specific
// handler; frames of any other type are logged and ignored.
func (sc *serverConn) processFrame(f Frame) error {
	sc.serveG.check()

	// First frame received must be SETTINGS.
	if !sc.sawFirstSettings {
		_, isSettings := f.(*SettingsFrame)
		if !isSettings {
			return ConnectionError(ErrCodeProtocol)
		}
		sc.sawFirstSettings = true
	}

	// While a header block is open on some stream, only CONTINUATION
	// frames for that stream are acceptable.
	if openID := sc.curHeaderStreamID(); openID != 0 {
		cont, isCont := f.(*ContinuationFrame)
		if !isCont || cont.Header().StreamID != openID {
			return ConnectionError(ErrCodeProtocol)
		}
	}

	switch frame := f.(type) {
	case *SettingsFrame:
		return sc.processSettings(frame)
	case *HeadersFrame:
		return sc.processHeaders(frame)
	case *ContinuationFrame:
		return sc.processContinuation(frame)
	case *WindowUpdateFrame:
		return sc.processWindowUpdate(frame)
	case *PingFrame:
		return sc.processPing(frame)
	case *DataFrame:
		return sc.processData(frame)
	case *RSTStreamFrame:
		return sc.processResetStream(frame)
	case *PriorityFrame:
		return sc.processPriority(frame)
	case *PushPromiseFrame:
		// A client cannot push. Thus, servers MUST treat the receipt of a PUSH_PROMISE
		// frame as a connection error (Section 5.4.1) of type PROTOCOL_ERROR.
		return ConnectionError(ErrCodeProtocol)
	default:
		log.Printf("Ignoring frame: %v", frame.Header())
		return nil
	}
}
Now about writes. As we saw at the beginning of serve(),
we use writeFrame
to send the settings to the client:
// writeFrame schedules a frame to write and sends it if there's nothing
// already being written.
//
// The message is first added to the connection's writeScheduler; then
// scheduleFrameWrite is called, which does nothing if a frame is already
// being written.
//
// There is no pushback here (the serve goroutine never blocks). It's
// the http.Handlers that block, waiting for their previous frames to
// make it onto the wire
//
// If you're not on the serve goroutine, use writeFrameFromHandler instead.
func (sc *serverConn) writeFrame(wm frameWriteMsg) {
sc.serveG.check()
sc.writeSched.add(wm)
sc.scheduleFrameWrite()
}
In func (srv *Server) handleConn(hs *http.Server, c net.Conn, h http.Handler),
the writeSched is created together with the serverConn:
sc := &serverConn{
srv: srv,
hs: hs,
conn: c,
...
writeSched: writeScheduler{
maxFrameSize: initialMaxFrameSize,
},
...
where writeScheduler
in writesched.go
:
// writeScheduler tracks pending frames to write, priorities, and decides
// the next one to use. It is not thread-safe.
//
// serverConn.writeFrame queues messages with add, and
// serverConn.scheduleFrameWrite picks the next one with take.
type writeScheduler struct {
// zero are frames not associated with a specific stream.
// They're sent before any stream-specific frames.
zero writeQueue
// maxFrameSize is the maximum size of a DATA frame
// we'll write. Must be non-zero and between 16K-16M.
maxFrameSize uint32
// sq contains the stream-specific queues, keyed by stream ID.
// when a stream is idle, it's deleted from the map.
sq map[uint32]*writeQueue
// canSend is a slice of memory that's reused between frame
// scheduling decisions to hold the list of writeQueues (from sq)
// which have enough flow control data to send. After canSend is
// built, the best is selected.
canSend []*writeQueue
// pool of empty queues for reuse.
queuePool []*writeQueue
}
When we need to write:
// scheduleFrameWrite tickles the frame writing scheduler. It must be called
// on the serve goroutine.
//
// While a frame is being written it does nothing; it is called again once
// that write has finished.
//
// Otherwise it starts at most one write, chosen in this order:
//
//  1. a pending GOAWAY frame,
//  2. a pending SETTINGS ACK,
//  3. unless the connection is in GOAWAY, the next frame selected by the
//     write scheduler,
//  4. a flush of the write buffer, if frames were written since the last
//     flush.
func (sc *serverConn) scheduleFrameWrite() {
	sc.serveG.check()
	if sc.writingFrame {
		return
	}

	switch {
	case sc.needToSendGoAway:
		sc.needToSendGoAway = false
		goAway := &writeGoAway{
			maxStreamID: sc.maxStreamID,
			code:        sc.goAwayCode,
		}
		sc.startFrameWrite(frameWriteMsg{write: goAway})
		return
	case sc.needToSendSettingsAck:
		sc.needToSendSettingsAck = false
		sc.startFrameWrite(frameWriteMsg{write: writeSettingsAck{}})
		return
	}

	if !sc.inGoAway {
		if next, ok := sc.writeSched.take(); ok {
			sc.startFrameWrite(next)
			return
		}
	}

	if !sc.needsFrameFlush {
		return
	}
	sc.startFrameWrite(frameWriteMsg{write: flushFrameWriter{}})
	// Clear the flag only after startFrameWrite, because startFrameWrite
	// itself sets it to true.
	sc.needsFrameFlush = false
}
// startFrameWrite begins the write of wm. The write itself is done by
// writeFrameAsync on a separate goroutine, since it might block on the
// network. Before launching it, the serve goroutine's view of the world
// (writingFrame, needsFrameFlush, and the state of wm's stream) is updated
// from wm.
//
// A frame for a closed stream that was reset (by either side) is skipped;
// in that case a completed write is faked so the scheduler runs again.
//
// It panics on internal inconsistencies: a write already in progress, a
// frame for a half-closed-local stream, a frame for a closed stream that
// was not reset, or a stream-ending frame without a stream.
func (sc *serverConn) startFrameWrite(wm frameWriteMsg) {
	sc.serveG.check()
	if sc.writingFrame {
		panic("internal error: can only be writing one frame at a time")
	}

	strm := wm.stream
	if strm != nil {
		if strm.state == stateHalfClosedLocal {
			panic("internal error: attempt to send frame on half-closed-local stream")
		}
		if strm.state == stateClosed {
			if !strm.sentReset && !strm.gotReset {
				panic(fmt.Sprintf("internal error: attempt to send a write %v on a closed stream", wm))
			}
			// Skip this frame. But fake the frame write to reschedule:
			sc.wroteFrameCh <- struct{}{}
			return
		}
	}

	sc.writingFrame = true
	sc.needsFrameFlush = true

	if endsStream(wm.write) {
		if strm == nil {
			panic("internal error: expecting non-nil stream")
		}
		if strm.state == stateOpen {
			// In theory the stream would now become half-closed
			// (local). But our handler is done, and net/http offers
			// no way to finish writing to a ResponseWriter while
			// still reading data (see possible TODO at top of this
			// file), so we tell the peer we're hanging up and end
			// up in the closed state anyway.
			//
			// The half-closed-local state won't last long, but it is
			// necessary for closeStream via resetStream.
			strm.state = stateHalfClosedLocal
			sc.resetStream(StreamError{strm.id, ErrCodeCancel})
		} else if strm.state == stateHalfClosedRemote {
			sc.closeStream(strm, nil)
		}
	}

	go sc.writeFrameAsync(wm)
}
// writeFrameAsync writes the single frame described by wm and then reports
// completion. It runs in its own goroutine, and at most one such goroutine
// can be running at a time per serverConn.
//
// If wm.done is non-nil, the result of the write is delivered on it without
// blocking; a done channel that cannot accept the value immediately causes
// a panic. Finally the serve goroutine is notified through wroteFrameCh.
func (sc *serverConn) writeFrameAsync(wm frameWriteMsg) {
	err := wm.write.writeFrame(sc)
	if wm.done != nil {
		select {
		case wm.done <- err:
		default:
			panic(fmt.Sprintf("unbuffered done channel passed in for type %T", wm.write))
		}
	}
	// Tickle the frame selection scheduler.
	sc.wroteFrameCh <- struct{}{}
}