Skip to content

Commit

Permalink
Bug 2581, Bug 3081, Bug 2948: various TCP socket connection problems
Browse files Browse the repository at this point in the history
Bug 3081:
  During conversion of listening socket handlers to AsyncCalls a violation
of the AsyncCall API was introduced. Resulting in occasional crashes from
invalid re-use of call objects.


This implements a TcpAcceptor async job which receives a listening socket
and a CallSubscription. For every connection attempt on the listener socket
a new AsyncCall is spawned from the subscription template.

Initial users are the HTTP and HTTPS listening sockets and FTP data channel.

In order to implement this job in FTP the logics surrounding data channel
handling had to be extended and reworked. Fixing bug 2948 and 2581 in the
process.
  • Loading branch information
yadij committed Jan 26, 2011
1 parent 12489dc commit cbff89b
Show file tree
Hide file tree
Showing 14 changed files with 531 additions and 383 deletions.
12 changes: 12 additions & 0 deletions src/CommCalls.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ class CommAcceptCbPtrFun: public CallDialer,
{
public:
typedef CommAcceptCbParams Params;
typedef RefCount<CommAcceptCbPtrFun> Pointer;

CommAcceptCbPtrFun(IOACB *aHandler, const CommAcceptCbParams &aParams);
void dial();
Expand Down Expand Up @@ -259,11 +260,19 @@ template <class Dialer>
class CommCbFunPtrCallT: public AsyncCall
{
public:
typedef RefCount<CommCbFunPtrCallT<Dialer> > Pointer;
typedef typename Dialer::Params Params;

inline CommCbFunPtrCallT(int debugSection, int debugLevel,
const char *callName, const Dialer &aDialer);

inline CommCbFunPtrCallT(const CommCbFunPtrCallT &o) :
AsyncCall(o.debugSection, o.debugLevel, o.name),
dialer(o.dialer)
{}

~CommCbFunPtrCallT() {}

virtual CallDialer* getDialer() { return &dialer; }

public:
Expand All @@ -272,6 +281,9 @@ class CommCbFunPtrCallT: public AsyncCall
protected:
inline virtual bool canFire();
inline virtual void fire();

private:
CommCbFunPtrCallT & operator=(const CommCbFunPtrCallT &); // not defined. not permitted.
};

// Conveninece wrapper: It is often easier to call a templated function than
Expand Down
13 changes: 9 additions & 4 deletions src/ProtoPort.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,28 @@
*/

#include "squid.h"
#include "comm.h"
#include "ProtoPort.h"
#if HAVE_LIMITS
#include <limits>
#endif

http_port_list::http_port_list(const char *aProtocol)
http_port_list::http_port_list(const char *aProtocol) :
listenFd(-1)
#if USE_SSL
:
http(*this), dynamicCertMemCacheSize(std::numeric_limits<size_t>::max())
, http(*this)
, dynamicCertMemCacheSize(std::numeric_limits<size_t>::max())
#endif
{
protocol = xstrdup(aProtocol);
}

http_port_list::~http_port_list()
{
delete listener;
if (listenFd >= 0) {
comm_close(listenFd);
listenFd = -1;
}

safe_free(name);
safe_free(defaultsite);
Expand Down
10 changes: 4 additions & 6 deletions src/ProtoPort.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@
#ifndef SQUID_PROTO_PORT_H
#define SQUID_PROTO_PORT_H

//#include "typedefs.h"
#include "cbdata.h"
#include "comm/ListenStateData.h"

#if USE_SSL
#include "ssl/gadgets.h"
Expand Down Expand Up @@ -43,11 +41,11 @@ struct http_port_list {
} tcp_keepalive;

/**
* The FD listening socket handler.
* If not NULL we are actively listening for client requests.
* delete to close the socket.
* The FD listening socket.
* If >= 0 we are actively listening for client requests.
* use comm_close(listenFd) to stop.
*/
Comm::ListenStateData *listener;
int listenFd;

#if USE_SSL
// XXX: temporary hack to ease move of SSL options to http_port
Expand Down
13 changes: 13 additions & 0 deletions src/base/AsyncCall.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ class AsyncCall: public RefCountable

private:
const char *isCanceled; // set to the cancelation reason by cancel()

// not implemented to prevent nil calls from being passed around and unknowingly scheduled, for now.
AsyncCall();
AsyncCall(const AsyncCall &);
};

inline
Expand Down Expand Up @@ -122,6 +126,12 @@ class AsyncCallT: public AsyncCall
const Dialer &aDialer): AsyncCall(aDebugSection, aDebugLevel, aName),
dialer(aDialer) {}

AsyncCallT(const AsyncCallT<Dialer> &o):
AsyncCall(o.debugSection, o.debugLevel, o.name),
dialer(o.dialer) {}

~AsyncCallT() {}

CallDialer *getDialer() { return &dialer; }

protected:
Expand All @@ -132,6 +142,9 @@ class AsyncCallT: public AsyncCall
virtual void fire() { dialer.dial(*this); }

Dialer dialer;

private:
AsyncCallT & operator=(const AsyncCallT &); // not defined. call assignments not permitted.
};

template <class Dialer>
Expand Down
2 changes: 1 addition & 1 deletion src/base/Subscription.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class CallSubscription: public Subscription
public:
/// Must be passed an object. nil pointers are not permitted.
explicit CallSubscription(const RefCount<Call_> &aCall) : call(aCall) { assert(aCall != NULL); }
virtual AsyncCall::Pointer callback() const { return new Call_(call); }
virtual AsyncCall::Pointer callback() const { return new Call_(*call); }

private:
const RefCount<Call_> call; ///< gets copied to create callback calls
Expand Down
131 changes: 61 additions & 70 deletions src/client_side.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,10 @@
#include "ClientRequestContext.h"
#include "clientStream.h"
#include "comm.h"
#include "comm/Write.h"
#include "comm/ListenStateData.h"
#include "CommCalls.h"
#include "comm/Loops.h"
#include "comm/Write.h"
#include "comm/TcpAcceptor.h"
#include "ConnectionDetail.h"
#include "eui/Config.h"
#include "fde.h"
Expand All @@ -108,6 +109,7 @@
#include "ident/Config.h"
#include "ident/Ident.h"
#include "ip/Intercept.h"
#include "ipc/FdNotes.h"
#include "ipc/StartListening.h"
#include "MemBuf.h"
#include "MemObject.h"
Expand All @@ -134,34 +136,33 @@
#define comm_close comm_lingering_close
#endif

/// dials clientHttpConnectionOpened or clientHttpsConnectionOpened call
/// dials clientListenerConnectionOpened call
class ListeningStartedDialer: public CallDialer, public Ipc::StartListeningCb
{
public:
typedef void (*Handler)(int fd, int errNo, http_port_list *portCfg);
ListeningStartedDialer(Handler aHandler, http_port_list *aPortCfg):
handler(aHandler), portCfg(aPortCfg) {}
typedef void (*Handler)(int fd, int flags, int errNo, http_port_list *portCfg, const Ipc::FdNoteId note, const Subscription::Pointer &sub);
ListeningStartedDialer(Handler aHandler, int openFlags, http_port_list *aPortCfg, const Ipc::FdNoteId note, const Subscription::Pointer &aSub):
handler(aHandler), portCfg(aPortCfg), portTypeNote(note), commOpenListenerFlags(openFlags), sub(aSub) {}

virtual void print(std::ostream &os) const {
startPrint(os) <<
", port=" << (void*)portCfg << ')';
}

virtual bool canDial(AsyncCall &) const { return true; }
virtual void dial(AsyncCall &) { (handler)(fd, errNo, portCfg); }
virtual void dial(AsyncCall &) { (handler)(fd, commOpenListenerFlags, errNo, portCfg, portTypeNote, sub); }

public:
Handler handler;

private:
http_port_list *portCfg; ///< from Config.Sockaddr.http
http_port_list *portCfg; ///< from Config.Sockaddr.http
Ipc::FdNoteId portTypeNote; ///< Type of IPC socket being opened
int commOpenListenerFlags; ///< flags used by comm_open_listener
Subscription::Pointer sub; ///< The handler to be subscribed for this connetion listener
};


static void clientHttpConnectionOpened(int fd, int errNo, http_port_list *s);
#if USE_SSL
static void clientHttpsConnectionOpened(int fd, int errNo, http_port_list *s);
#endif
static void clientListenerConnectionOpened(int fd, int flags, int errNo, http_port_list *s, const Ipc::FdNoteId portTypeNote, const Subscription::Pointer &sub);

/* our socket-related context */

Expand Down Expand Up @@ -3115,14 +3116,14 @@ connStateCreate(const Ip::Address &peer, const Ip::Address &me, int fd, http_por

/** Handle a new connection on HTTP socket. */
void
httpAccept(int sock, int newfd, ConnectionDetail *details,
comm_err_t flag, int xerrno, void *data)
httpAccept(int, int newfd, ConnectionDetail *details, comm_err_t flag, int xerrno, void *data)
{
http_port_list *s = (http_port_list *)data;
ConnStateData *connState = NULL;

if (flag != COMM_OK) {
debugs(33, 1, "httpAccept: FD " << sock << ": accept failure: " << xstrerr(xerrno));
// Its possible the call was still queued when the client disconnected
debugs(33, 2, "httpAccept: FD " << s->listenFd << ": accept failure: " << xstrerr(xerrno));
return;
}

Expand Down Expand Up @@ -3361,15 +3362,14 @@ clientNegotiateSSL(int fd, void *data)

/** handle a new HTTPS connection */
static void
httpsAccept(int sock, int newfd, ConnectionDetail *details,
comm_err_t flag, int xerrno, void *data)
httpsAccept(int, int newfd, ConnectionDetail *details, comm_err_t flag, int xerrno, void *data)
{
https_port_list *s = (https_port_list *)data;
SSL_CTX *sslContext = s->staticSslContext.get();

if (flag != COMM_OK) {
errno = xerrno;
debugs(33, 1, "httpsAccept: FD " << sock << ": accept failure: " << xstrerr(xerrno));
// Its possible the call was still queued when the client disconnected
debugs(33, 2, "httpsAccept: FD " << s->listenFd << ": accept failure: " << xstrerr(xerrno));
return;
}

Expand Down Expand Up @@ -3552,15 +3552,15 @@ ConnStateData::switchToHttps(const char *host)

/// check FD after clientHttp[s]ConnectionOpened, adjust HttpSockets as needed
static bool
OpenedHttpSocket(int fd, const char *msgIfFail)
OpenedHttpSocket(int fd, const Ipc::FdNoteId portType)
{
if (fd < 0) {
Must(NHttpSockets > 0); // we tried to open some
--NHttpSockets; // there will be fewer sockets than planned
Must(HttpSockets[NHttpSockets] < 0); // no extra fds received

if (!NHttpSockets) // we could not open any listen sockets at all
fatal(msgIfFail);
fatalf("Unable to open %s",FdNote(portType));

return false;
}
Expand Down Expand Up @@ -3616,13 +3616,16 @@ clientHttpConnectionsOpen(void)
const int openFlags = COMM_NONBLOCKING |
(s->spoof_client_ip ? COMM_TRANSPARENT : 0);

AsyncCall::Pointer callback = asyncCall(33,2,
"clientHttpConnectionOpened",
ListeningStartedDialer(&clientHttpConnectionOpened, s));
Ipc::StartListening(SOCK_STREAM, IPPROTO_TCP, s->s, openFlags,
Ipc::fdnHttpSocket, callback);
// setup the subscriptions such that new connections accepted by listenConn are handled by HTTP
typedef CommCbFunPtrCallT<CommAcceptCbPtrFun> AcceptCall;
RefCount<AcceptCall> subCall = commCbCall(5, 5, "httpAccept", CommAcceptCbPtrFun(httpAccept, s));
Subscription::Pointer sub = new CallSubscription<AcceptCall>(subCall);

AsyncCall::Pointer listenCall = asyncCall(33,2, "clientListenerConnectionOpened",
ListeningStartedDialer(&clientListenerConnectionOpened, openFlags, s, Ipc::fdnHttpSocket, sub));
Ipc::StartListening(SOCK_STREAM, IPPROTO_TCP, s->s, openFlags, Ipc::fdnHttpSocket, listenCall);

HttpSockets[NHttpSockets++] = -1; // set in clientHttpConnectionOpened
HttpSockets[NHttpSockets++] = -1; // set in clientListenerConnectionOpened
}

#if USE_SSL
Expand All @@ -3635,27 +3638,27 @@ clientHttpConnectionsOpen(void)

/// process clientHttpConnectionsOpen result
static void
clientHttpConnectionOpened(int fd, int, http_port_list *s)
clientListenerConnectionOpened(int fd, int flags, int errNo, http_port_list *s, const Ipc::FdNoteId portTypeNote, const Subscription::Pointer &sub)
{
if (!OpenedHttpSocket(fd, "Cannot open HTTP Port"))
s->listenFd = fd;
if (!OpenedHttpSocket(s->listenFd, portTypeNote))
return;

Must(s);
Must(s->listenFd >= 0);

AsyncCall::Pointer call = commCbCall(5,5, "SomeCommAcceptHandler(httpAccept)",
CommAcceptCbPtrFun(httpAccept, s));
// TCP: setup a job to handle accept() with subscribed handler
AsyncJob::Start(new Comm::TcpAcceptor(s->listenFd, s->s, flags, FdNote(portTypeNote), sub));

s->listener = new Comm::ListenStateData(fd, call, true);

debugs(1, 1, "Accepting " <<
debugs(1, 1, "Accepting" <<
(s->intercepted ? " intercepted" : "") <<
(s->spoof_client_ip ? " spoofing" : "") <<
(s->sslBump ? " bumpy" : "") <<
(s->accel ? " accelerated" : "")
<< " HTTP connections at " << s->s
<< ", FD " << fd << "." );
<< FdNote(portTypeNote) << " connections at "
<< " FD " << s->listenFd << " on " << s->s);

Must(AddOpenedHttpSocket(fd)); // otherwise, we have received a fd we did not ask for
Must(AddOpenedHttpSocket(s->listenFd)); // otherwise, we have received a fd we did not ask for
}

#if USE_SSL
Expand All @@ -3677,35 +3680,23 @@ clientHttpsConnectionsOpen(void)
continue;
}

AsyncCall::Pointer call = asyncCall(33, 2, "clientHttpsConnectionOpened",
ListeningStartedDialer(&clientHttpsConnectionOpened, &s->http));

Ipc::StartListening(SOCK_STREAM, IPPROTO_TCP, s->http.s, COMM_NONBLOCKING,
Ipc::fdnHttpsSocket, call);

HttpSockets[NHttpSockets++] = -1;
}
}

/// process clientHttpsConnectionsOpen result
static void
clientHttpsConnectionOpened(int fd, int, http_port_list *s)
{
if (!OpenedHttpSocket(fd, "Cannot open HTTPS Port"))
return;

Must(s);
const int openFlags = COMM_NONBLOCKING |
(s->spoof_client_ip ? COMM_TRANSPARENT : 0);

AsyncCall::Pointer call = commCbCall(5,5, "SomeCommAcceptHandler(httpsAccept)",
CommAcceptCbPtrFun(httpsAccept, s));
// setup the subscriptions such that new connections accepted by listenConn are handled by HTTPS
typedef CommCbFunPtrCallT<CommAcceptCbPtrFun> AcceptCall;
RefCount<AcceptCall> subCall = commCbCall(5, 5, "httpsAccept", CommAcceptCbPtrFun(httpsAccept, s));
Subscription::Pointer sub = new CallSubscription<AcceptCall>(subCall);

s->listener = new Comm::ListenStateData(fd, call, true);
AsyncCall::Pointer listenCall = asyncCall(33, 2, "clientListenerConnectionOpened",
ListeningStartedDialer(&clientListenerConnectionOpened, openFlags,
&s->http, Ipc::fdnHttpsSocket, sub));

debugs(1, 1, "Accepting HTTPS connections at " << s->s << ", FD " << fd << ".");
Ipc::StartListening(SOCK_STREAM, IPPROTO_TCP, s->s, openFlags, Ipc::fdnHttpsSocket, listenCall);

Must(AddOpenedHttpSocket(fd)); // otherwise, we have received a fd we did not ask for
HttpSockets[NHttpSockets++] = -1;
}
}

#endif

void
Expand All @@ -3724,19 +3715,19 @@ void
clientHttpConnectionsClose(void)
{
for (http_port_list *s = Config.Sockaddr.http; s; s = s->next) {
if (s->listener) {
debugs(1, 1, "FD " << s->listener->fd << " Closing HTTP connection");
delete s->listener;
s->listener = NULL;
if (s->listenFd >= 0) {
debugs(1, 1, "FD " << s->listenFd << " Closing HTTP connection");
comm_close(s->listenFd);
s->listenFd = -1;
}
}

#if USE_SSL
for (http_port_list *s = Config.Sockaddr.https; s; s = s->next) {
if (s->listener) {
debugs(1, 1, "FD " << s->listener->fd << " Closing HTTPS connection");
delete s->listener;
s->listener = NULL;
if (s->listenFd >= 0) {
debugs(1, 1, "FD " << s->listenFd << " Closing HTTPS connection");
comm_close(s->listenFd);
s->listenFd = -1;
}
}
#endif
Expand Down
Loading

0 comments on commit cbff89b

Please sign in to comment.