
Anatomy of serve() loop


Init

We start by sending our SETTINGS frame:

func (sc *serverConn) serve() {
       
        ... // some defers

	sc.writeFrame(frameWriteMsg{
		write: writeSettings{
			{SettingMaxFrameSize, sc.srv.maxReadFrameSize()},
			{SettingMaxConcurrentStreams, sc.advMaxStreams},

			// TODO: more actual settings, notably
			// SettingInitialWindowSize, but then we also
			// want to bump up the conn window size the
			// same amount here right after the settings
		},
	})

where frameWriteMsg is defined in writesched.go:

// frameWriteMsg is a request to write a frame.
type frameWriteMsg struct {
	// write is the interface value that does the writing, once the
	// writeScheduler (below) has decided to select this frame
	// to write. The write functions are all defined in write.go.
	write writeFramer

	stream *stream // used for prioritization. nil for non-stream frames.

	// done, if non-nil, must be a buffered channel with space for
	// 1 message and is sent the return value from write (or an
	// earlier error) when the frame has been written.
	done chan error
}

and writeSettings is defined in write.go:

type writeSettings []Setting

func (s writeSettings) writeFrame(ctx writeContext) error {
	return ctx.Framer().WriteSettings([]Setting(s)...)
}

where Framer().WriteSettings is defined in frame.go:

// WriteSettings writes a SETTINGS frame with zero or more settings
// specified and the ACK bit not set.
//
// It will perform exactly one Write to the underlying Writer.
// It is the caller's responsibility to not call other Write methods concurrently.
func (f *Framer) WriteSettings(settings ...Setting) error {
	f.startWrite(FrameSettings, 0, 0)
	for _, s := range settings {
		f.writeUint16(uint16(s.ID))
		f.writeUint32(s.Val)
	}
	return f.endWrite()
}
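
For reference, each setting takes exactly 6 bytes of the SETTINGS payload: a big-endian 2-byte identifier followed by a big-endian 4-byte value (RFC 7540, section 6.5.1). A small standalone sketch (not the library's code) of the layout those writeUint16/writeUint32 calls produce:

package main

import (
	"encoding/binary"
	"fmt"
)

// encodeSetting mirrors what the writeUint16/writeUint32 calls above
// produce for a single setting: 2-byte ID, then 4-byte value.
func encodeSetting(id uint16, val uint32) []byte {
	buf := make([]byte, 6)
	binary.BigEndian.PutUint16(buf[0:2], id)
	binary.BigEndian.PutUint32(buf[2:6], val)
	return buf
}

func main() {
	// SETTINGS_MAX_CONCURRENT_STREAMS has identifier 0x3; 250 is just an
	// example value.
	fmt.Printf("% x\n", encodeSetting(0x3, 250)) // 00 03 00 00 00 fa
}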

Next we wait for the ClientPreface greeting:

	if err := sc.readPreface(); err != nil {
		sc.condlogf(err, "error reading preface from client %v: %v", sc.conn.RemoteAddr(), err)
		return
	}

where readPreface() is:

// readPreface reads the ClientPreface greeting from the peer
// or returns an error on timeout or an invalid greeting.
func (sc *serverConn) readPreface() error {
    ...
}
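
The body is omitted above; conceptually it reads the fixed 24-byte connection preface ("PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n", RFC 7540 section 3.5) and compares it, failing on timeout or mismatch. A simplified, hypothetical stand-in (the real code enforces the timeout with a timer and a helper goroutine rather than a read deadline):

package sketch

import (
	"bytes"
	"fmt"
	"io"
	"net"
	"time"
)

// clientPreface is the fixed greeting every HTTP/2 client must send first.
var clientPreface = []byte("PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n")

// readPrefaceSketch is a simplified stand-in for readPreface: read exactly
// len(clientPreface) bytes under a deadline and compare them.
func readPrefaceSketch(c net.Conn, timeout time.Duration) error {
	if err := c.SetReadDeadline(time.Now().Add(timeout)); err != nil {
		return err
	}
	defer c.SetReadDeadline(time.Time{})

	buf := make([]byte, len(clientPreface))
	if _, err := io.ReadFull(c, buf); err != nil {
		return err
	}
	if !bytes.Equal(buf, clientPreface) {
		return fmt.Errorf("bogus greeting %q", buf)
	}
	return nil
}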

The next important step: we start the goroutine that reads frames from the socket:

go sc.readFrames() // closed by defer sc.conn.Close above

// readFrames is the loop that reads incoming frames.
// It's run on its own goroutine.
func (sc *serverConn) readFrames() {
	g := make(gate, 1)
	for {
		f, err := sc.framer.ReadFrame()
		if err != nil {
			sc.readFrameErrCh <- err
			close(sc.readFrameCh)
			return
		}
		sc.readFrameCh <- frameAndGate{f, g}
		// We can't read another frame until this one is
		// processed, as the ReadFrame interface doesn't copy
		// memory.  The Frame accessor methods access the last
		// frame's (shared) buffer. So we wait for the
		// serve goroutine to tell us it's done:
		g.Wait()
	}
}
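
The gate passed along with each frame is a tiny handoff primitive: the serve goroutine calls Done() once it has finished with the frame's shared buffer, and Wait() blocks the reader until then. Its definition is roughly:

// gate is a buffered channel used as a one-shot handoff between the
// read goroutine and the serve goroutine.
type gate chan struct{}

func (g gate) Done() { g <- struct{}{} }
func (g gate) Wait() { <-g }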

Then we enter the infinite loop that serves the connection:

	for {
		select {
		case wm := <-sc.wantWriteFrameCh:
			sc.writeFrame(wm)
		case pp := <-sc.pushPromiseCh:
			sc.startPushPromise(pp)
		case <-sc.wroteFrameCh:
			sc.writingFrame = false
			sc.scheduleFrameWrite()
		case fg, ok := <-sc.readFrameCh:
			if !ok {
				sc.readFrameCh = nil
			}
			if !sc.processFrameFromReader(fg, ok) {
				return
			}
			if settingsTimer.C != nil {
				settingsTimer.Stop()
				settingsTimer.C = nil
			}
		case m := <-sc.bodyReadCh:
			sc.noteBodyRead(m.st, m.n)
		case <-settingsTimer.C:
			sc.logf("timeout waiting for SETTINGS frames from %v", sc.conn.RemoteAddr())
			return
		case <-sc.shutdownTimerCh:
			sc.vlogf("GOAWAY close timer fired; closing conn from %v", sc.conn.RemoteAddr())
			return
		case fn := <-sc.testHookCh:
			fn()
		}
	}

Read

When we get a frame, we run processFrameFromReader:

// processFrameFromReader processes the serve loop's read from readFrameCh from the
// frame-reading goroutine.
// processFrameFromReader returns whether the connection should be kept open.
func (sc *serverConn) processFrameFromReader(fg frameAndGate, fgValid bool) bool {
	sc.serveG.check()
	var clientGone bool
	var err error
	if !fgValid {
		err = <-sc.readFrameErrCh
		if err == ErrFrameTooLarge {
			sc.goAway(ErrCodeFrameSize)
			return true // goAway will close the loop
		}
		clientGone = err == io.EOF || strings.Contains(err.Error(), "use of closed network connection")
		if clientGone {
			// TODO: could we also get into this state if
			// the peer does a half close
			// (e.g. CloseWrite) because they're done
			// sending frames but they're still wanting
			// our open replies?  Investigate.
			// TODO: add CloseWrite to crypto/tls.Conn first
			// so we have a way to test this? I suppose
			// just for testing we could have a non-TLS mode.
			return false
		}
	}

	if fgValid {
		f := fg.f // <-- the frame is taken here; only the interface value is copied, the buffer stays shared
		sc.vlogf("got %v: %#v", f.Header(), f)
		err = sc.processFrame(f)
		fg.g.Done() // unblock the readFrames goroutine
		if err == nil {
			return true
		}
	}

	switch ev := err.(type) {
	case StreamError:
		sc.resetStream(ev)
		return true
	case goAwayFlowError:
		sc.goAway(ErrCodeFlowControl)
		return true
	case ConnectionError:
		sc.logf("%v: %v", sc.conn.RemoteAddr(), ev)
		sc.goAway(ErrCode(ev))
		return true // goAway will handle shutdown
	default:
		if !fgValid {
			sc.logf("disconnecting; error reading frame from client %s: %v", sc.conn.RemoteAddr(), err)
		} else {
			sc.logf("disconnection due to other error: %v", err)
		}
	}
	return false
}

The frame is then processed according to its type:

func (sc *serverConn) processFrame(f Frame) error {
	sc.serveG.check()

	// First frame received must be SETTINGS.
	if !sc.sawFirstSettings {
		if _, ok := f.(*SettingsFrame); !ok {
			return ConnectionError(ErrCodeProtocol)
		}
		sc.sawFirstSettings = true
	}

	if s := sc.curHeaderStreamID(); s != 0 {
		if cf, ok := f.(*ContinuationFrame); !ok {
			return ConnectionError(ErrCodeProtocol)
		} else if cf.Header().StreamID != s {
			return ConnectionError(ErrCodeProtocol)
		}
	}

	switch f := f.(type) {
	case *SettingsFrame:
		return sc.processSettings(f)
	case *HeadersFrame:
		return sc.processHeaders(f)
	case *ContinuationFrame:
		return sc.processContinuation(f)
	case *WindowUpdateFrame:
		return sc.processWindowUpdate(f)
	case *PingFrame:
		return sc.processPing(f)
	case *DataFrame:
		return sc.processData(f)
	case *RSTStreamFrame:
		return sc.processResetStream(f)
	case *PriorityFrame:
		return sc.processPriority(f)
	case *PushPromiseFrame:
		// A client cannot push. Thus, servers MUST treat the receipt of a PUSH_PROMISE
		// frame as a connection error (Section 5.4.1) of type PROTOCOL_ERROR.
		return ConnectionError(ErrCodeProtocol)
	default:
		log.Printf("Ignoring frame: %v", f.Header())
		return nil
	}
}
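
Note that both functions above start with sc.serveG.check(): the serverConn fields it guards are meant to be touched only from the serve goroutine, and serveG is a debug guard that panics when that rule is broken. A hypothetical, simplified version of the same idea (the names below are illustrative, not the library's):

package sketch

import (
	"bytes"
	"runtime"
	"strconv"
)

// goroutineGuard remembers the ID of the goroutine that owns some state
// and panics if check is called from a different goroutine.
type goroutineGuard uint64

func newGoroutineGuard() goroutineGuard {
	return goroutineGuard(curGoroutineIDSketch())
}

func (g goroutineGuard) check() {
	if curGoroutineIDSketch() != uint64(g) {
		panic("state accessed from the wrong goroutine")
	}
}

// curGoroutineIDSketch parses the current goroutine's ID out of the first
// line of runtime.Stack output, e.g. "goroutine 18 [running]:".
func curGoroutineIDSketch() uint64 {
	buf := make([]byte, 64)
	buf = buf[:runtime.Stack(buf, false)]
	buf = bytes.TrimPrefix(buf, []byte("goroutine "))
	if i := bytes.IndexByte(buf, ' '); i >= 0 {
		buf = buf[:i]
	}
	n, _ := strconv.ParseUint(string(buf), 10, 64)
	return n
}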

Write

Now about writes. As we saw at the beginning of serve(), writeFrame is used to send the settings to the client:

// writeFrame schedules a frame to write and sends it if there's nothing
// already being written.
//
// There is no pushback here (the serve goroutine never blocks). It's
// the http.Handlers that block, waiting for their previous frames to
// make it onto the wire
//
// If you're not on the serve goroutine, use writeFrameFromHandler instead.
func (sc *serverConn) writeFrame(wm frameWriteMsg) {
	sc.serveG.check()
	sc.writeSched.add(wm)
	sc.scheduleFrameWrite()
}
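
The handler-side counterpart, writeFrameFromHandler, does not touch the scheduler directly; instead it hands the frameWriteMsg to the serve goroutine over wantWriteFrameCh, the channel we saw in the select loop. Roughly (the doneServing case is an assumption about how a closed connection is observed):

// Sketch of the handler-side path: hand the frame request to the serve
// goroutine, which owns the writeScheduler.
func (sc *serverConn) writeFrameFromHandler(wm frameWriteMsg) {
	select {
	case sc.wantWriteFrameCh <- wm:
	case <-sc.doneServing:
		// Assumed: the connection is shutting down, so the frame is dropped.
	}
}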

In func (srv *Server) handleConn(hs *http.Server, c net.Conn, h http.Handler), while creating the serverConn, the writeSched field is initialized:

	sc := &serverConn{
		srv:              srv,
		hs:               hs,
		conn:             c,
                ...
		writeSched: writeScheduler{
			maxFrameSize: initialMaxFrameSize,
		},
                ...

where writeScheduler is defined in writesched.go:

// writeScheduler tracks pending frames to write, priorities, and decides
// the next one to use. It is not thread-safe.
type writeScheduler struct {
	// zero are frames not associated with a specific stream.
	// They're sent before any stream-specific frames.
	zero writeQueue

	// maxFrameSize is the maximum size of a DATA frame
	// we'll write. Must be non-zero and between 16K-16M.
	maxFrameSize uint32

	// sq contains the stream-specific queues, keyed by stream ID.
	// when a stream is idle, it's deleted from the map.
	sq map[uint32]*writeQueue

	// canSend is a slice of memory that's reused between frame
	// scheduling decisions to hold the list of writeQueues (from sq)
	// which have enough flow control data to send. After canSend is
	// built, the best is selected.
	canSend []*writeQueue

	// pool of empty queues for reuse.
	queuePool []*writeQueue
}
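
The per-stream writeQueue type itself isn't shown on this page; conceptually it is just a FIFO of frameWriteMsg. A minimal sketch of that shape (not the exact implementation):

// writeQueue, as used in sq and queuePool above: frames are appended at
// the tail and taken from the head in order.
type writeQueue struct {
	s []frameWriteMsg
}

func (q *writeQueue) empty() bool { return len(q.s) == 0 }

func (q *writeQueue) push(wm frameWriteMsg) { q.s = append(q.s, wm) }

func (q *writeQueue) shift() frameWriteMsg {
	if len(q.s) == 0 {
		panic("invalid use of queue")
	}
	wm := q.s[0]
	q.s = q.s[1:] // simple reslice; the real code may recycle the backing array
	return wm
}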

When we need to write:

// scheduleFrameWrite tickles the frame writing scheduler.
//
// If a frame is already being written, nothing happens. This will be called again
// when the frame is done being written.
//
// If a frame isn't being written and we need to send one, the best frame
// to send is selected, preferring first things that aren't
// stream-specific (e.g. ACKing settings), and then finding the
// highest priority stream.
//
// If a frame isn't being written and there's nothing else to send, we
// flush the write buffer.
func (sc *serverConn) scheduleFrameWrite() {
	sc.serveG.check()
	if sc.writingFrame {
		return
	}
	if sc.needToSendGoAway {
		sc.needToSendGoAway = false
		sc.startFrameWrite(frameWriteMsg{
			write: &writeGoAway{
				maxStreamID: sc.maxStreamID,
				code:        sc.goAwayCode,
			},
		})
		return
	}
	if sc.needToSendSettingsAck {
		sc.needToSendSettingsAck = false
		sc.startFrameWrite(frameWriteMsg{write: writeSettingsAck{}})
		return
	}
	if !sc.inGoAway {
		if wm, ok := sc.writeSched.take(); ok {
			sc.startFrameWrite(wm)
			return
		}
	}
	if sc.needsFrameFlush {
		sc.startFrameWrite(frameWriteMsg{write: flushFrameWriter{}})
		sc.needsFrameFlush = false // after startFrameWrite, since it sets this true
		return
	}
}

// startFrameWrite starts a goroutine to write wm (in a separate
// goroutine since that might block on the network), and updates the
// serve goroutine's state about the world, updated from info in wm.
func (sc *serverConn) startFrameWrite(wm frameWriteMsg) {
	sc.serveG.check()
	if sc.writingFrame {
		panic("internal error: can only be writing one frame at a time")
	}

	st := wm.stream
	if st != nil {
		switch st.state {
		case stateHalfClosedLocal:
			panic("internal error: attempt to send frame on half-closed-local stream")
		case stateClosed:
			if st.sentReset || st.gotReset {
				// Skip this frame. But fake the frame write to reschedule:
				sc.wroteFrameCh <- struct{}{}
				return
			}
			panic(fmt.Sprintf("internal error: attempt to send a write %v on a closed stream", wm))
		}
	}

	sc.writingFrame = true
	sc.needsFrameFlush = true
	if endsStream(wm.write) {
		if st == nil {
			panic("internal error: expecting non-nil stream")
		}
		switch st.state {
		case stateOpen:
			// Here we would go to stateHalfClosedLocal in
			// theory, but since our handler is done and
			// the net/http package provides no mechanism
			// for finishing writing to a ResponseWriter
			// while still reading data (see possible TODO
			// at top of this file), we go into closed
			// state here anyway, after telling the peer
			// we're hanging up on them.
			st.state = stateHalfClosedLocal // won't last long, but necessary for closeStream via resetStream
			errCancel := StreamError{st.id, ErrCodeCancel}
			sc.resetStream(errCancel)
		case stateHalfClosedRemote:
			sc.closeStream(st, nil)
		}
	}
	go sc.writeFrameAsync(wm)
}

// writeFrameAsync runs in its own goroutine and writes a single frame
// and then reports when it's done.
// At most one goroutine can be running writeFrameAsync at a time per
// serverConn.
func (sc *serverConn) writeFrameAsync(wm frameWriteMsg) {
	err := wm.write.writeFrame(sc)
	if ch := wm.done; ch != nil {
		select {
		case ch <- err:
		default:
			panic(fmt.Sprintf("unbuffered done channel passed in for type %T", wm.write))
		}
	}
	sc.wroteFrameCh <- struct{}{} // tickle frame selection scheduler
}
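
This panic is why frameWriteMsg.done, when set, must be a buffered channel with room for one message: writeFrameAsync has to be able to report the result without blocking. A hypothetical caller that wants to block until its frame reaches the wire would look like this:

// writeAndWait is a hypothetical helper illustrating the done-channel
// contract: the channel is buffered so writeFrameAsync's send never
// blocks, and the caller waits on it for the write result.
func writeAndWait(sc *serverConn, w writeFramer, st *stream) error {
	done := make(chan error, 1) // must be buffered (see the panic above)
	sc.writeFrameFromHandler(frameWriteMsg{
		write:  w,
		stream: st,
		done:   done,
	})
	return <-done // nil once the frame has been written, or the write error
}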