virtio-net: use per-CPU-Tx framework
This involves the introduction of an xmit iterator and a worker thread.

Signed-off-by: Vlad Zolotarov <vladz@cloudius-systems.com>
Signed-off-by: Pekka Enberg <penberg@cloudius-systems.com>
Vlad Zolotarov authored and Pekka Enberg committed May 15, 2014
1 parent aa45bbb commit 0296a6c
Showing 2 changed files with 191 additions and 55 deletions.
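In short: if_transmit() no longer serializes every sender behind the global _tx_ring_lock. A packet is handed to osv::xmitter's per-CPU queues, and a dedicated worker thread drains those queues into the vring. A condensed sketch of the new entry path, stitched from the hunks below (only the if_softc lookup is an assumption, since that line sits in unexpanded diff context):

static int if_transmit(struct ifnet* ifp, struct mbuf* m_head)
{
    // Assumed from unexpanded context: recover the driver instance.
    net* vnet = static_cast<net*>(ifp->if_softc);

    // No per-packet ring lock anymore: contention is absorbed by the
    // per-CPU queues inside osv::xmitter.
    return vnet->xmit(m_head);
}

inline int net::xmit(struct mbuf* buff)
{
    // Single Tx queue for now; a proper TXq gets selected here once
    // multi-queue lands.
    return _txq.xmit(buff);
}

inline int net::txq::xmit(mbuf* buff)
{
    // Queue on this CPU's ring; the worker thread started in net::net()
    // posts the packet to the HW ring.
    return _xmitter.xmit(buff);
}

The caller may still sleep on a full per-CPU queue (net::xmit() is documented below as a function that may sleep), but the ring-full polling now happens on the worker thread rather than under a driver-wide lock.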
78 changes: 57 additions & 21 deletions drivers/virtio-net.cc
@@ -123,19 +123,39 @@ static int if_transmit(struct ifnet* ifp, struct mbuf* m_head)

net_d("%s_start", __FUNCTION__);

/* Process packets */
vnet->_tx_ring_lock.lock();
return vnet->xmit(m_head);
}

net_d("*** processing packet! ***");
inline int net::xmit(struct mbuf* buff)
{
//
// We currently have only a single TX queue. Select a proper TXq here when
// we implement a multi-queue.
//
return _txq.xmit(buff);
}

int error = vnet->tx_locked(m_head);
inline int net::txq::xmit(mbuf* buff)
{
return _xmitter.xmit(buff);
}

if (!error)
vnet->kick(1);
inline bool net::txq::kick_hw()
{
return vqueue->kick();
}

vnet->_tx_ring_lock.unlock();
inline void net::txq::kick_pending(u16 thresh)
{
if (_pkts_to_kick >= thresh) {
_pkts_to_kick = 0;
kick_hw();
}
}

return error;
inline void net::txq::wake_worker()
{
worker.wake();
}

static void if_init(void* xsc)
@@ -203,6 +223,7 @@ net::net(pci::device& dev)
_txq(this, get_virt_queue(1))
{
sched::thread* poll_task = &_rxq.poll_task;
sched::thread* tx_worker_task = &_txq.worker;

_driver_name = "virtio-net";
virtio_i("VIRTIO NET INSTANCE");
@@ -266,6 +287,9 @@ net::net(pci::device& dev)
//Start the polling thread before attaching it to the Rx interrupt
poll_task->start();

// TODO: What is if_init() for?
tx_worker_task->start();

ether_ifattach(_ifn, _config.mac);
if (dev.is_msix()) {
_msi.easy_register({
@@ -552,16 +576,23 @@ void net::fill_rx_ring()
vq->kick();
}

inline int net::tx_locked(struct mbuf* m_head)
inline int net::txq::try_xmit_one_locked(void* _req)
{
return _txq.xmit_one_locked(m_head);
net_req* req = static_cast<net_req*>(_req);
int rc = try_xmit_one_locked(req);

if (rc) {
return rc;
}

update_stats(req);
return 0;
}

inline int net::txq::xmit_prep(mbuf* m_head, net_req*& cooky)
inline int net::txq::xmit_prep(mbuf* m_head, void*& cooky)
{
net_req* req = new net_req;
net_req* req = new net_req(m_head);
mbuf* m;
req->um.reset(m_head);

if (m_head->M_dat.MH.MH_pkthdr.csum_flags != 0) {
m = offload(m_head, &req->mhdr.hdr);
@@ -581,10 +612,11 @@ inline int net::txq::xmit_prep(mbuf* m_head, net_req*& cooky)

int net::txq::try_xmit_one_locked(net_req* req)
{
mbuf *m_head = req->um.get(), *m;
mbuf *m_head = req->mb, *m;
u16 vec_sz = 0;
u64 tx_bytes = 0;

DEBUG_ASSERT(!try_lock_running(), "RUNNING lock not taken!\n");

if (_parent->_mergeable_bufs) {
req->mhdr.num_buffers = 0;
@@ -638,16 +670,15 @@ inline void net::txq::update_stats(net_req* req)
stats.tx_tso++;
}

int net::txq::xmit_one_locked(mbuf* m_head)

void net::txq::xmit_one_locked(void* _req)
{
net_req* req;
int rc = xmit_prep(m_head, req);
if (rc) {
return rc;
}
net_req* req = static_cast<net_req*>(_req);

if (try_xmit_one_locked(req)) {
do {
// We are going to poll - flush the pending packets
kick_pending();
if (!vqueue->used_ring_not_empty()) {
do {
sched::thread::yield();
@@ -662,7 +693,11 @@ int net::txq::xmit_one_locked(mbuf* m_head)
// Update the statistics
update_stats(req);

return 0;
//
// It was a good packet - increase the counter of packets pending
// for a kick.
//
_pkts_to_kick++;
}

mbuf* net::txq::offload(mbuf* m, net_hdr* hdr)
@@ -761,6 +796,7 @@ void net::txq::gc()
req = static_cast<net_req*>(vqueue->get_buf_elem(&len));

while(req != nullptr) {
m_freem(req->mb);
delete req;

req_cnt++;
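Taken together, the hunks above give a Tx request a clear lifecycle: the unique_ptr/free_deleter pair is gone, and the raw mbuf chain now travels inside the net_req until gc() reclaims it. A hedged walk-through (txq0 is an illustrative stand-in for the single txq instance; the hand-off between the two calls happens inside osv::xmitter and is not part of this diff):

void* cooky;
int rc = txq0.xmit_prep(m_head, cooky); // allocates net_req(m_head) and
                                        // applies checksum/TSO offload
if (rc == 0) {
    // The cooky travels through a per-CPU queue; eventually the worker
    // thread pops it and the xmit iterator invokes:
    txq0.xmit_one_locked(cooky);        // posts to the vring, polls for
                                        // room if needed, bumps _pkts_to_kick
}
// Once the device consumes the buffer, gc() reclaims both pieces:
//     m_freem(req->mb);                // free the mbuf chain (new here)
//     delete req;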
168 changes: 134 additions & 34 deletions drivers/virtio-net.hh
@@ -13,6 +13,8 @@
#include <bsd/sys/net/if.h>
#include <bsd/sys/sys/mbuf.h>

#include <osv/percpu_xmit.hh>

#include "drivers/virtio.hh"
#include "drivers/pci-device.hh"

@@ -220,17 +222,6 @@ public:

bool ack_irq();

/**
* Transmit a single mbuf.
* @param m_head a buffer to transmits
*
* @note should be called under the _tx_ring_lock.
*
* @return 0 in case of success and an appropriate error code
* otherwise
*/
int tx_locked(struct mbuf* m_head);

static hw_driver* probe(hw_device* dev);

/**
@@ -240,20 +231,25 @@
*/
void fill_stats(struct if_data* out_data) const;

// tx ring lock protects this ring for multiple access
mutex _tx_ring_lock;

/**
* Transmit a single frame.
*
* @note This function may sleep!
* @param buff frame to transmit
*
* @return 0 in case of success, EINVAL in case the frame is not
* well-formed.
*/
int xmit(mbuf* buff);
private:

struct net_req {
struct net::net_hdr_mrg_rxbuf mhdr;
struct free_deleter {
void operator()(struct mbuf* m) {m_freem(m);}
};

std::unique_ptr<struct mbuf, free_deleter> um;
explicit net_req(mbuf *m) : mb(m) {
memset(&mhdr, 0, sizeof(mhdr));
}

net_req() {memset(&mhdr,0,sizeof(mhdr));};
struct net::net_hdr_mrg_rxbuf mhdr;
mbuf* mb;
u64 tx_bytes;
};

@@ -300,22 +296,112 @@ private:
struct rxq_stats stats = { 0 };
};

/* Single Tx queue object */
struct txq;
/**
* @class tx_xmit_iterator
*
* This iterator will be used as an output iterator by the nway_merger
* instance that will merge the per-CPU tx_cpu_queue instances.
*
* Its operator=() will actually send the packet to the (virtual) HW.
*/
class tx_xmit_iterator {
public:
tx_xmit_iterator(txq* txq) : _q(txq) { }

// These ones will do nothing
tx_xmit_iterator& operator *() { return *this; }
tx_xmit_iterator& operator++() { return *this; }

/**
* Push the packet downstream
* @param cooky
*/
void operator=(void* cooky) {
_q->xmit_one_locked(cooky);
}
private:
txq* _q;
};

/**
* @class txq
* A single Tx queue object.
*
* TODO: Make it a class!
*/
struct txq {
friend class tx_xmit_iterator;

txq(net* parent, vring* vq) :
vqueue(vq), _parent(parent) {};
vqueue(vq), _parent(parent), _xmit_it(this),
_kick_thresh(vqueue->size()), _xmitter(this),
worker([this] {
// TODO: implement a proper StopPred when we fix the SP code
_xmitter.poll_until([] { return false; }, _xmit_it);
})
{
//
// Kick at least every full ring of packets (see _kick_thresh
// above).
//
// Otherwise a deadlock is possible:
// 1) We post a full ring of buffers without a kick().
// 2) We block on posting of the next buffer.
// 3) HW doesn't know there is work to do.
// 4) Deadlock.
//
};

/**
* Transmit a single packet. Will wait for completions if there is no
* room on a HW ring.
* Checks the packet and returns the net_req (returned in a "cooky")
* @param m_head
* @param cooky
*
* @param req Tx request handle
* @return 0 if packet is ok and EINVAL if it's not well-formed.
*/
int xmit_prep(mbuf* m_head, void*& cooky);

/**
* Try to transmit a single packet. Don't block on failure.
*
* Must run with "running" lock taken.
* In case of a success this function will update Tx statistics.
* @param m_head
* @param cooky Cooky returned by xmit_prep().
* @param tx_bytes
*
* @return 0 if packet has been successfully sent and EINVAL if it was
* not well-formed.
* @return 0 if packet has been successfully sent and ENOBUFS if there
* was no room on a HW ring to send the packet.
*/
int try_xmit_one_locked(void* cooky);

int xmit_one_locked(mbuf* m_head);
/**
* Kick the vqueue if number of pending packets has reached the given
* threshold.
*
* @param thresh threshold
*/
void kick_pending(u16 thresh = 1);
void kick_pending_with_thresh() {
kick_pending(_kick_thresh);
}

/**
* Kick the underlying vring.
*
* @return TRUE if the vring has been actually indicated.
*/
bool kick_hw();

/**
* Inform the Txq that there is new pending work
*/
void wake_worker();

int xmit(mbuf* m_head);

/* TODO: drain the per-cpu rings in ~txq() and in if_qflush() */

vring* vqueue;
txq_stats stats = { 0 };
Expand All @@ -336,13 +422,13 @@ private:
int try_xmit_one_locked(net_req* req);

/**
* Checks the packet and returns the net_req (returned in a "cooky")
* @param m_head
* @param cooky
* Transmit a single packet. Will wait for completions if there is no
* room on a HW ring.
*
* @return 0 if packet is ok and EINVAL if it's not well-formed.
* Must run with a "running" lock taken.
* @param req Tx request handle
*/
int xmit_prep(mbuf* m_head, net_req*& cooky);
void xmit_one_locked(void* req);

/**
* Free the descriptors for the completed packets.
Expand All @@ -367,6 +453,20 @@ private:
void update_stats(net_req* req);

net* _parent;
tx_xmit_iterator _xmit_it;
const int _kick_thresh;
u16 _pkts_to_kick = 0;
//
// 4096 is the size of the buffer ring of the FreeBSD virtio-net
// driver, so we are using it as a baseline. We may adjust this value
// later (cut it down, maybe?).
//
// Currently this gives us ~16 pages per CPU ring.
//
osv::xmitter<txq, 4096> _xmitter;

public:
sched::thread worker;
};

/**
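A note on the iterator trick behind tx_xmit_iterator above: merge algorithms write their results through *out++ = value, so an output iterator whose assignment operator transmits turns an n-way merge of per-CPU queues into packet scheduling. A self-contained toy showing the same contract, where std::merge stands in for OSv's nway_merger, sink for txq, and int for the void* cooky (none of this is OSv API):

#include <algorithm>
#include <cstdio>
#include <iterator>
#include <vector>

// Stand-in for txq: "transmitting" just records the cooky.
struct sink {
    std::vector<int> sent;
    void xmit_one(int cooky) { sent.push_back(cooky); }
};

// Same shape as tx_xmit_iterator: *it and ++it are no-ops, assignment sends.
class xmit_iterator {
public:
    using iterator_category = std::output_iterator_tag;
    using value_type = void;
    using difference_type = void;
    using pointer = void;
    using reference = void;

    explicit xmit_iterator(sink* s) : _s(s) {}
    xmit_iterator& operator*() { return *this; }        // no-op
    xmit_iterator& operator++() { return *this; }       // no-op
    xmit_iterator operator++(int) { return *this; }     // no-op
    xmit_iterator& operator=(int cooky) {               // "push downstream"
        _s->xmit_one(cooky);
        return *this;
    }
private:
    sink* _s;
};

int main()
{
    // Two already-sorted "per-CPU queues", merged straight into the sink.
    std::vector<int> cpu0{1, 4, 7}, cpu1{2, 3, 9};
    sink hw;
    std::merge(cpu0.begin(), cpu0.end(), cpu1.begin(), cpu1.end(),
               xmit_iterator(&hw));
    for (int c : hw.sent)
        std::printf("%d ", c);                          // 1 2 3 4 7 9
    std::printf("\n");
    return 0;
}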
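And a worked example of why _kick_thresh equals vqueue->size(): posting a full ring of buffers without ever kicking is exactly the four-step deadlock spelled out in the txq constructor comment, so the accounting forces a doorbell at the latest after a ring's worth of good packets. A toy model of that accounting (pure illustration with no OSv types; exactly when kick_pending_with_thresh() is invoked is not visible in these hunks):

#include <cstdio>

int main()
{
    const int kick_thresh = 256;  // plays _kick_thresh == vqueue->size()
    int pkts_to_kick = 0;         // plays _pkts_to_kick
    int kicks = 0;                // counts kick_hw() calls

    for (int pkt = 1; pkt <= 1000; ++pkt) {
        ++pkts_to_kick;                     // xmit_one_locked(): good packet
        if (pkts_to_kick >= kick_thresh) {  // kick_pending(_kick_thresh)
            pkts_to_kick = 0;
            ++kicks;                        // kick_hw(): notify the host
        }
    }

    // 1000 packets through a 256-entry ring: kicks at packets 256, 512
    // and 768, with 232 still pending, so the host is never left staring
    // at a full ring it was not told about.
    std::printf("kicks=%d pending=%d\n", kicks, pkts_to_kick);
    return 0;
}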
