Skip to content

Commit

Permalink
Add flow control
Browse files Browse the repository at this point in the history
  • Loading branch information
indexzero committed Sep 8, 2011
1 parent ca1d12c commit 6a7fd14
Showing 1 changed file with 52 additions and 10 deletions.
62 changes: 52 additions & 10 deletions lib/node-http-proxy/http-proxy.js
Original file line number Diff line number Diff line change
Expand Up @@ -224,12 +224,22 @@ HttpProxy.prototype.proxyRequest = function (req, res, buffer) {
response.on('data', function (chunk) {
if (req.method !== 'HEAD' && res.writable) {
try {
res.write(chunk);
var flushed = res.write(chunk);
}
catch (ex) {
console.error("res.write error: %s", ex.message);

try { res.end() }
catch (ex) { console.error("res.write error: %s", ex.message) }
catch (ex) { console.error("res.end error: %s", ex.message) }

return;
}

if (!flushed) {
response.pause();
res.once('drain', function () {
response.resume();
});
}
}
});
Expand Down Expand Up @@ -265,7 +275,13 @@ HttpProxy.prototype.proxyRequest = function (req, res, buffer) {
//
req.on('data', function (chunk) {
if (!errState) {
reverseProxy.write(chunk);
var flushed = reverseProxy.write(chunk);
if (!flushed) {
req.pause();
reverseProxy.once('drain', function () {
req.resume();
});
}
}
});

Expand Down Expand Up @@ -334,8 +350,14 @@ HttpProxy.prototype._forwardRequest = function (req) {
// the proxied request come in
//
req.on('data', function (chunk) {
forwardProxy.write(chunk);
})
var flushed = forwardProxy.write(chunk);
if (!flushed) {
req.pause();
forwardProxy.once('drain', function () {
req.resume();
});
}
});

//
// At the end of the client request, we are going to
Expand All @@ -357,7 +379,7 @@ HttpProxy.prototype._forwardRequest = function (req) {
//
HttpProxy.prototype.proxyWebSocketRequest = function (req, socket, head, buffer) {
var self = this,
outgoing = new(this.target.base);
outgoing = new(this.target.base),
listeners = {},
errState = false,
CRLF = '\r\n';
Expand Down Expand Up @@ -420,7 +442,13 @@ HttpProxy.prototype.proxyWebSocketRequest = function (req, socket, head, buffer)
if (reverseProxy.incoming.socket.writable) {
try {
self.emit('websocket:outgoing', req, socket, head, data);
reverseProxy.incoming.socket.write(data);
var flushed = reverseProxy.incoming.socket.write(data);
if (!flushed) {
proxySocket.pause();
reverseProxy.incoming.socket.once('drain', function () {
proxySocket.resume();
});
}
}
catch (ex) {
detach();
Expand All @@ -437,7 +465,13 @@ HttpProxy.prototype.proxyWebSocketRequest = function (req, socket, head, buffer)
reverseProxy.incoming.socket.on('data', listeners.onOutgoing = function (data) {
try {
self.emit('websocket:incoming', reverseProxy, reverseProxy.incoming, head, data);
proxySocket.write(data);
var flushed = proxySocket.write(data);
if (!flushed) {
reverseProxy.incoming.socket.pause();
proxySocket.once('drain', function () {
reverseProxy.incoming.socket.resume();
});
}
}
catch (ex) {
detach();
Expand Down Expand Up @@ -595,7 +629,13 @@ HttpProxy.prototype.proxyWebSocketRequest = function (req, socket, head, buffer)
//
self.emit('websocket:handshake', req, socket, head, sdata, data);
socket.write(sdata);
socket.write(data);
var flushed = socket.write(data);
if (!flushed) {
reverseProxy.socket.pause();
socket.once('drain', function () {
reverseProxy.socket.resume();
});
}
}
catch (ex) {
//
Expand All @@ -620,7 +660,9 @@ HttpProxy.prototype.proxyWebSocketRequest = function (req, socket, head, buffer)

try {
//
// Attempt to write the upgrade-head to the reverseProxy request.
// Attempt to write the upgrade-head to the reverseProxy
// request. This is small, and there's only ever one of
// it; no need for pause/resume.
//
reverseProxy.write(head);
}
Expand Down

0 comments on commit 6a7fd14

Please sign in to comment.