From cbff89ba6b249941fe47ad28d4ac2ed50e286b94 Mon Sep 17 00:00:00 2001 From: Amos Jeffries Date: Wed, 26 Jan 2011 16:47:13 +1300 Subject: [PATCH] Bug 2581, Bug 3081, Bug 2948: various TCP socket connection problems Bug 3081: During conversion of listening socket handlers to AsyncCalls a violation of the AsyncCall API was introduced. Resulting in occasional crashes from invalid re-use of call objects. This implements a TcpAcceptor async job which receives a listening socket and a CallSubscription. For every connection attempt on the listener socket a new AsyncCall is spawned from the subscription template. Initial users are the HTTP and HTTPS listening sockets and FTP data channel. In order to implement this job in FTP the logics surrounding data channel handling had to be extended and reworked. Fixing bug 2948 and 2581 in the process. --- src/CommCalls.h | 12 + src/ProtoPort.cc | 13 +- src/ProtoPort.h | 10 +- src/base/AsyncCall.h | 13 + src/base/Subscription.h | 2 +- src/client_side.cc | 131 ++++---- src/comm.cc | 6 +- src/comm/AcceptLimiter.cc | 33 +- src/comm/AcceptLimiter.h | 9 +- src/comm/ListenStateData.h | 54 ---- src/comm/Makefile.am | 4 +- .../{ListenStateData.cc => TcpAcceptor.cc} | 225 +++++++++---- src/comm/TcpAcceptor.h | 99 ++++++ src/ftp.cc | 303 ++++++++---------- 14 files changed, 531 insertions(+), 383 deletions(-) delete mode 100644 src/comm/ListenStateData.h rename src/comm/{ListenStateData.cc => TcpAcceptor.cc} (54%) create mode 100644 src/comm/TcpAcceptor.h diff --git a/src/CommCalls.h b/src/CommCalls.h index 84e4a72875a..7bf0ded84a4 100644 --- a/src/CommCalls.h +++ b/src/CommCalls.h @@ -176,6 +176,7 @@ class CommAcceptCbPtrFun: public CallDialer, { public: typedef CommAcceptCbParams Params; + typedef RefCount Pointer; CommAcceptCbPtrFun(IOACB *aHandler, const CommAcceptCbParams &aParams); void dial(); @@ -259,11 +260,19 @@ template class CommCbFunPtrCallT: public AsyncCall { public: + typedef RefCount > Pointer; typedef typename Dialer::Params Params; inline CommCbFunPtrCallT(int debugSection, int debugLevel, const char *callName, const Dialer &aDialer); + inline CommCbFunPtrCallT(const CommCbFunPtrCallT &o) : + AsyncCall(o.debugSection, o.debugLevel, o.name), + dialer(o.dialer) + {} + + ~CommCbFunPtrCallT() {} + virtual CallDialer* getDialer() { return &dialer; } public: @@ -272,6 +281,9 @@ class CommCbFunPtrCallT: public AsyncCall protected: inline virtual bool canFire(); inline virtual void fire(); + +private: + CommCbFunPtrCallT & operator=(const CommCbFunPtrCallT &); // not defined. not permitted. }; // Conveninece wrapper: It is often easier to call a templated function than diff --git a/src/ProtoPort.cc b/src/ProtoPort.cc index 98d4142a629..de42e8e204a 100644 --- a/src/ProtoPort.cc +++ b/src/ProtoPort.cc @@ -3,15 +3,17 @@ */ #include "squid.h" +#include "comm.h" #include "ProtoPort.h" #if HAVE_LIMITS #include #endif -http_port_list::http_port_list(const char *aProtocol) +http_port_list::http_port_list(const char *aProtocol) : + listenFd(-1) #if USE_SSL - : - http(*this), dynamicCertMemCacheSize(std::numeric_limits::max()) + , http(*this) + , dynamicCertMemCacheSize(std::numeric_limits::max()) #endif { protocol = xstrdup(aProtocol); @@ -19,7 +21,10 @@ http_port_list::http_port_list(const char *aProtocol) http_port_list::~http_port_list() { - delete listener; + if (listenFd >= 0) { + comm_close(listenFd); + listenFd = -1; + } safe_free(name); safe_free(defaultsite); diff --git a/src/ProtoPort.h b/src/ProtoPort.h index 6bee84f5786..323a6d2084b 100644 --- a/src/ProtoPort.h +++ b/src/ProtoPort.h @@ -4,9 +4,7 @@ #ifndef SQUID_PROTO_PORT_H #define SQUID_PROTO_PORT_H -//#include "typedefs.h" #include "cbdata.h" -#include "comm/ListenStateData.h" #if USE_SSL #include "ssl/gadgets.h" @@ -43,11 +41,11 @@ struct http_port_list { } tcp_keepalive; /** - * The FD listening socket handler. - * If not NULL we are actively listening for client requests. - * delete to close the socket. + * The FD listening socket. + * If >= 0 we are actively listening for client requests. + * use comm_close(listenFd) to stop. */ - Comm::ListenStateData *listener; + int listenFd; #if USE_SSL // XXX: temporary hack to ease move of SSL options to http_port diff --git a/src/base/AsyncCall.h b/src/base/AsyncCall.h index 12da25a733d..74f2864baec 100644 --- a/src/base/AsyncCall.h +++ b/src/base/AsyncCall.h @@ -84,6 +84,10 @@ class AsyncCall: public RefCountable private: const char *isCanceled; // set to the cancelation reason by cancel() + + // not implemented to prevent nil calls from being passed around and unknowingly scheduled, for now. + AsyncCall(); + AsyncCall(const AsyncCall &); }; inline @@ -122,6 +126,12 @@ class AsyncCallT: public AsyncCall const Dialer &aDialer): AsyncCall(aDebugSection, aDebugLevel, aName), dialer(aDialer) {} + AsyncCallT(const AsyncCallT &o): + AsyncCall(o.debugSection, o.debugLevel, o.name), + dialer(o.dialer) {} + + ~AsyncCallT() {} + CallDialer *getDialer() { return &dialer; } protected: @@ -132,6 +142,9 @@ class AsyncCallT: public AsyncCall virtual void fire() { dialer.dial(*this); } Dialer dialer; + +private: + AsyncCallT & operator=(const AsyncCallT &); // not defined. call assignments not permitted. }; template diff --git a/src/base/Subscription.h b/src/base/Subscription.h index ec9cc517f72..74e97bba875 100644 --- a/src/base/Subscription.h +++ b/src/base/Subscription.h @@ -42,7 +42,7 @@ class CallSubscription: public Subscription public: /// Must be passed an object. nil pointers are not permitted. explicit CallSubscription(const RefCount &aCall) : call(aCall) { assert(aCall != NULL); } - virtual AsyncCall::Pointer callback() const { return new Call_(call); } + virtual AsyncCall::Pointer callback() const { return new Call_(*call); } private: const RefCount call; ///< gets copied to create callback calls diff --git a/src/client_side.cc b/src/client_side.cc index efa2d5bb1e2..391d58e7818 100644 --- a/src/client_side.cc +++ b/src/client_side.cc @@ -96,9 +96,10 @@ #include "ClientRequestContext.h" #include "clientStream.h" #include "comm.h" -#include "comm/Write.h" -#include "comm/ListenStateData.h" +#include "CommCalls.h" #include "comm/Loops.h" +#include "comm/Write.h" +#include "comm/TcpAcceptor.h" #include "ConnectionDetail.h" #include "eui/Config.h" #include "fde.h" @@ -108,6 +109,7 @@ #include "ident/Config.h" #include "ident/Ident.h" #include "ip/Intercept.h" +#include "ipc/FdNotes.h" #include "ipc/StartListening.h" #include "MemBuf.h" #include "MemObject.h" @@ -134,13 +136,13 @@ #define comm_close comm_lingering_close #endif -/// dials clientHttpConnectionOpened or clientHttpsConnectionOpened call +/// dials clientListenerConnectionOpened call class ListeningStartedDialer: public CallDialer, public Ipc::StartListeningCb { public: - typedef void (*Handler)(int fd, int errNo, http_port_list *portCfg); - ListeningStartedDialer(Handler aHandler, http_port_list *aPortCfg): - handler(aHandler), portCfg(aPortCfg) {} + typedef void (*Handler)(int fd, int flags, int errNo, http_port_list *portCfg, const Ipc::FdNoteId note, const Subscription::Pointer &sub); + ListeningStartedDialer(Handler aHandler, int openFlags, http_port_list *aPortCfg, const Ipc::FdNoteId note, const Subscription::Pointer &aSub): + handler(aHandler), portCfg(aPortCfg), portTypeNote(note), commOpenListenerFlags(openFlags), sub(aSub) {} virtual void print(std::ostream &os) const { startPrint(os) << @@ -148,20 +150,19 @@ class ListeningStartedDialer: public CallDialer, public Ipc::StartListeningCb } virtual bool canDial(AsyncCall &) const { return true; } - virtual void dial(AsyncCall &) { (handler)(fd, errNo, portCfg); } + virtual void dial(AsyncCall &) { (handler)(fd, commOpenListenerFlags, errNo, portCfg, portTypeNote, sub); } public: Handler handler; private: - http_port_list *portCfg; ///< from Config.Sockaddr.http + http_port_list *portCfg; ///< from Config.Sockaddr.http + Ipc::FdNoteId portTypeNote; ///< Type of IPC socket being opened + int commOpenListenerFlags; ///< flags used by comm_open_listener + Subscription::Pointer sub; ///< The handler to be subscribed for this connetion listener }; - -static void clientHttpConnectionOpened(int fd, int errNo, http_port_list *s); -#if USE_SSL -static void clientHttpsConnectionOpened(int fd, int errNo, http_port_list *s); -#endif +static void clientListenerConnectionOpened(int fd, int flags, int errNo, http_port_list *s, const Ipc::FdNoteId portTypeNote, const Subscription::Pointer &sub); /* our socket-related context */ @@ -3115,14 +3116,14 @@ connStateCreate(const Ip::Address &peer, const Ip::Address &me, int fd, http_por /** Handle a new connection on HTTP socket. */ void -httpAccept(int sock, int newfd, ConnectionDetail *details, - comm_err_t flag, int xerrno, void *data) +httpAccept(int, int newfd, ConnectionDetail *details, comm_err_t flag, int xerrno, void *data) { http_port_list *s = (http_port_list *)data; ConnStateData *connState = NULL; if (flag != COMM_OK) { - debugs(33, 1, "httpAccept: FD " << sock << ": accept failure: " << xstrerr(xerrno)); + // Its possible the call was still queued when the client disconnected + debugs(33, 2, "httpAccept: FD " << s->listenFd << ": accept failure: " << xstrerr(xerrno)); return; } @@ -3361,15 +3362,14 @@ clientNegotiateSSL(int fd, void *data) /** handle a new HTTPS connection */ static void -httpsAccept(int sock, int newfd, ConnectionDetail *details, - comm_err_t flag, int xerrno, void *data) +httpsAccept(int, int newfd, ConnectionDetail *details, comm_err_t flag, int xerrno, void *data) { https_port_list *s = (https_port_list *)data; SSL_CTX *sslContext = s->staticSslContext.get(); if (flag != COMM_OK) { - errno = xerrno; - debugs(33, 1, "httpsAccept: FD " << sock << ": accept failure: " << xstrerr(xerrno)); + // Its possible the call was still queued when the client disconnected + debugs(33, 2, "httpsAccept: FD " << s->listenFd << ": accept failure: " << xstrerr(xerrno)); return; } @@ -3552,7 +3552,7 @@ ConnStateData::switchToHttps(const char *host) /// check FD after clientHttp[s]ConnectionOpened, adjust HttpSockets as needed static bool -OpenedHttpSocket(int fd, const char *msgIfFail) +OpenedHttpSocket(int fd, const Ipc::FdNoteId portType) { if (fd < 0) { Must(NHttpSockets > 0); // we tried to open some @@ -3560,7 +3560,7 @@ OpenedHttpSocket(int fd, const char *msgIfFail) Must(HttpSockets[NHttpSockets] < 0); // no extra fds received if (!NHttpSockets) // we could not open any listen sockets at all - fatal(msgIfFail); + fatalf("Unable to open %s",FdNote(portType)); return false; } @@ -3616,13 +3616,16 @@ clientHttpConnectionsOpen(void) const int openFlags = COMM_NONBLOCKING | (s->spoof_client_ip ? COMM_TRANSPARENT : 0); - AsyncCall::Pointer callback = asyncCall(33,2, - "clientHttpConnectionOpened", - ListeningStartedDialer(&clientHttpConnectionOpened, s)); - Ipc::StartListening(SOCK_STREAM, IPPROTO_TCP, s->s, openFlags, - Ipc::fdnHttpSocket, callback); + // setup the subscriptions such that new connections accepted by listenConn are handled by HTTP + typedef CommCbFunPtrCallT AcceptCall; + RefCount subCall = commCbCall(5, 5, "httpAccept", CommAcceptCbPtrFun(httpAccept, s)); + Subscription::Pointer sub = new CallSubscription(subCall); + + AsyncCall::Pointer listenCall = asyncCall(33,2, "clientListenerConnectionOpened", + ListeningStartedDialer(&clientListenerConnectionOpened, openFlags, s, Ipc::fdnHttpSocket, sub)); + Ipc::StartListening(SOCK_STREAM, IPPROTO_TCP, s->s, openFlags, Ipc::fdnHttpSocket, listenCall); - HttpSockets[NHttpSockets++] = -1; // set in clientHttpConnectionOpened + HttpSockets[NHttpSockets++] = -1; // set in clientListenerConnectionOpened } #if USE_SSL @@ -3635,27 +3638,27 @@ clientHttpConnectionsOpen(void) /// process clientHttpConnectionsOpen result static void -clientHttpConnectionOpened(int fd, int, http_port_list *s) +clientListenerConnectionOpened(int fd, int flags, int errNo, http_port_list *s, const Ipc::FdNoteId portTypeNote, const Subscription::Pointer &sub) { - if (!OpenedHttpSocket(fd, "Cannot open HTTP Port")) + s->listenFd = fd; + if (!OpenedHttpSocket(s->listenFd, portTypeNote)) return; Must(s); + Must(s->listenFd >= 0); - AsyncCall::Pointer call = commCbCall(5,5, "SomeCommAcceptHandler(httpAccept)", - CommAcceptCbPtrFun(httpAccept, s)); + // TCP: setup a job to handle accept() with subscribed handler + AsyncJob::Start(new Comm::TcpAcceptor(s->listenFd, s->s, flags, FdNote(portTypeNote), sub)); - s->listener = new Comm::ListenStateData(fd, call, true); - - debugs(1, 1, "Accepting " << + debugs(1, 1, "Accepting" << (s->intercepted ? " intercepted" : "") << (s->spoof_client_ip ? " spoofing" : "") << (s->sslBump ? " bumpy" : "") << (s->accel ? " accelerated" : "") - << " HTTP connections at " << s->s - << ", FD " << fd << "." ); + << FdNote(portTypeNote) << " connections at " + << " FD " << s->listenFd << " on " << s->s); - Must(AddOpenedHttpSocket(fd)); // otherwise, we have received a fd we did not ask for + Must(AddOpenedHttpSocket(s->listenFd)); // otherwise, we have received a fd we did not ask for } #if USE_SSL @@ -3677,35 +3680,23 @@ clientHttpsConnectionsOpen(void) continue; } - AsyncCall::Pointer call = asyncCall(33, 2, "clientHttpsConnectionOpened", - ListeningStartedDialer(&clientHttpsConnectionOpened, &s->http)); - - Ipc::StartListening(SOCK_STREAM, IPPROTO_TCP, s->http.s, COMM_NONBLOCKING, - Ipc::fdnHttpsSocket, call); - - HttpSockets[NHttpSockets++] = -1; - } -} - -/// process clientHttpsConnectionsOpen result -static void -clientHttpsConnectionOpened(int fd, int, http_port_list *s) -{ - if (!OpenedHttpSocket(fd, "Cannot open HTTPS Port")) - return; - - Must(s); + const int openFlags = COMM_NONBLOCKING | + (s->spoof_client_ip ? COMM_TRANSPARENT : 0); - AsyncCall::Pointer call = commCbCall(5,5, "SomeCommAcceptHandler(httpsAccept)", - CommAcceptCbPtrFun(httpsAccept, s)); + // setup the subscriptions such that new connections accepted by listenConn are handled by HTTPS + typedef CommCbFunPtrCallT AcceptCall; + RefCount subCall = commCbCall(5, 5, "httpsAccept", CommAcceptCbPtrFun(httpsAccept, s)); + Subscription::Pointer sub = new CallSubscription(subCall); - s->listener = new Comm::ListenStateData(fd, call, true); + AsyncCall::Pointer listenCall = asyncCall(33, 2, "clientListenerConnectionOpened", + ListeningStartedDialer(&clientListenerConnectionOpened, openFlags, + &s->http, Ipc::fdnHttpsSocket, sub)); - debugs(1, 1, "Accepting HTTPS connections at " << s->s << ", FD " << fd << "."); + Ipc::StartListening(SOCK_STREAM, IPPROTO_TCP, s->s, openFlags, Ipc::fdnHttpsSocket, listenCall); - Must(AddOpenedHttpSocket(fd)); // otherwise, we have received a fd we did not ask for + HttpSockets[NHttpSockets++] = -1; + } } - #endif void @@ -3724,19 +3715,19 @@ void clientHttpConnectionsClose(void) { for (http_port_list *s = Config.Sockaddr.http; s; s = s->next) { - if (s->listener) { - debugs(1, 1, "FD " << s->listener->fd << " Closing HTTP connection"); - delete s->listener; - s->listener = NULL; + if (s->listenFd >= 0) { + debugs(1, 1, "FD " << s->listenFd << " Closing HTTP connection"); + comm_close(s->listenFd); + s->listenFd = -1; } } #if USE_SSL for (http_port_list *s = Config.Sockaddr.https; s; s = s->next) { - if (s->listener) { - debugs(1, 1, "FD " << s->listener->fd << " Closing HTTPS connection"); - delete s->listener; - s->listener = NULL; + if (s->listenFd >= 0) { + debugs(1, 1, "FD " << s->listenFd << " Closing HTTPS connection"); + comm_close(s->listenFd); + s->listenFd = -1; } } #endif diff --git a/src/comm.cc b/src/comm.cc index ad28dd1cea9..f88caff2cb6 100644 --- a/src/comm.cc +++ b/src/comm.cc @@ -40,9 +40,9 @@ #include "comm/AcceptLimiter.h" #include "comm/comm_internal.h" #include "comm/IoCallback.h" -#include "comm/Write.h" -#include "comm/ListenStateData.h" #include "comm/Loops.h" +#include "comm/Write.h" +#include "comm/TcpAcceptor.h" #include "CommIO.h" #include "CommRead.h" #include "ConnectionDetail.h" @@ -144,7 +144,7 @@ fd_debug_t *fdd_table = NULL; bool isOpen(const int fd) { - return fd_table[fd].flags.open != 0; + return fd >= 0 && fd_table[fd].flags.open != 0; } /** diff --git a/src/comm/AcceptLimiter.cc b/src/comm/AcceptLimiter.cc index 7881db0348a..d0f5763e0f0 100644 --- a/src/comm/AcceptLimiter.cc +++ b/src/comm/AcceptLimiter.cc @@ -1,6 +1,6 @@ #include "config.h" #include "comm/AcceptLimiter.h" -#include "comm/ListenStateData.h" +#include "comm/TcpAcceptor.h" #include "fde.h" Comm::AcceptLimiter Comm::AcceptLimiter::Instance_; @@ -11,22 +11,41 @@ Comm::AcceptLimiter &Comm::AcceptLimiter::Instance() } void -Comm::AcceptLimiter::defer(Comm::ListenStateData *afd) +Comm::AcceptLimiter::defer(Comm::TcpAcceptor *afd) { afd->isLimited++; debugs(5, 5, HERE << "FD " << afd->fd << " x" << afd->isLimited); deferred.push_back(afd); } +void +Comm::AcceptLimiter::removeDead(const Comm::TcpAcceptor *afd) +{ + for (unsigned int i = 0; i < deferred.size() && afd->isLimited > 0; i++) { + if (deferred[i] == afd) { + deferred[i]->isLimited--; + deferred[i] = NULL; // fast. kick() will skip empty entries later. + debugs(5, 5, HERE << "FD " << afd->fd << " x" << afd->isLimited); + } + } +} + void Comm::AcceptLimiter::kick() { + // TODO: this could be optimized further with an iterator to search + // looking for first non-NULL, followed by dumping the first N + // with only one shift()/pop_front operation + debugs(5, 5, HERE << " size=" << deferred.size()); - if (deferred.size() > 0 && fdNFree() >= RESERVED_FD) { - debugs(5, 5, HERE << " doing one."); + while (deferred.size() > 0 && fdNFree() >= RESERVED_FD) { /* NP: shift() is equivalent to pop_front(). Giving us a FIFO queue. */ - ListenStateData *temp = deferred.shift(); - temp->isLimited--; - temp->acceptNext(); + TcpAcceptor *temp = deferred.shift(); + if (temp != NULL) { + debugs(5, 5, HERE << " doing one."); + temp->isLimited--; + temp->acceptNext(); + break; + } } } diff --git a/src/comm/AcceptLimiter.h b/src/comm/AcceptLimiter.h index 57313b142c1..3d5540f7e47 100644 --- a/src/comm/AcceptLimiter.h +++ b/src/comm/AcceptLimiter.h @@ -6,7 +6,7 @@ namespace Comm { -class ListenStateData; +class TcpAcceptor; /** * FIFO Queue holding listener socket handlers which have been activated @@ -25,7 +25,10 @@ class AcceptLimiter static AcceptLimiter &Instance(); /** delay accepting a new client connection. */ - void defer(Comm::ListenStateData *afd); + void defer(Comm::TcpAcceptor *afd); + + /** remove all records of an acceptor. Only to be called by the ConnAcceptor::swanSong() */ + void removeDead(const Comm::TcpAcceptor *afd); /** try to accept and begin processing any delayed client connections. */ void kick(); @@ -34,7 +37,7 @@ class AcceptLimiter static AcceptLimiter Instance_; /** FIFO queue */ - Vector deferred; + Vector deferred; }; }; // namepace Comm diff --git a/src/comm/ListenStateData.h b/src/comm/ListenStateData.h deleted file mode 100644 index b5b5872f789..00000000000 --- a/src/comm/ListenStateData.h +++ /dev/null @@ -1,54 +0,0 @@ -#ifndef SQUID_LISTENERSTATEDATA_H -#define SQUID_LISTENERSTATEDATA_H - -#include "base/AsyncCall.h" -#include "comm.h" -#if HAVE_MAP -#include -#endif - -class ConnectionDetail; - -namespace Comm -{ - -class ListenStateData -{ - -public: - ListenStateData(int fd, AsyncCall::Pointer &call, bool accept_many); - ListenStateData(const ListenStateData &r); // not implemented. - ~ListenStateData(); - - void subscribe(AsyncCall::Pointer &call); - void acceptNext(); - void notify(int newfd, comm_err_t flag, const ConnectionDetail &details); - - int fd; - - /// errno code of the last accept() or listen() action if one occurred. - int errcode; - - /// whether this socket is delayed and on the AcceptLimiter queue. - int32_t isLimited; - -private: - /// Method to test if there are enough file escriptors to open a new client connection - /// if not the accept() will be postponed - static bool okToAccept(); - - /// Method callback for whenever an FD is ready to accept a client connection. - static void doAccept(int fd, void *data); - - void acceptOne(); - int oldAccept(ConnectionDetail &details); - - AsyncCall::Pointer theCallback; - bool mayAcceptMore; - - void setListen(); -}; - -} // namespace Comm - -#endif /* SQUID_LISTENERSTATEDATA_H */ diff --git a/src/comm/Makefile.am b/src/comm/Makefile.am index 3ddf7797da2..cdb8f5032a8 100644 --- a/src/comm/Makefile.am +++ b/src/comm/Makefile.am @@ -7,8 +7,6 @@ noinst_LTLIBRARIES = libcomm.la libcomm_la_SOURCES= \ AcceptLimiter.cc \ AcceptLimiter.h \ - ListenStateData.cc \ - ListenStateData.h \ Loops.h \ ModDevPoll.cc \ ModEpoll.cc \ @@ -16,6 +14,8 @@ libcomm_la_SOURCES= \ ModPoll.cc \ ModSelect.cc \ ModSelectWin32.cc \ + TcpAcceptor.cc \ + TcpAcceptor.h \ \ IoCallback.cc \ IoCallback.h \ diff --git a/src/comm/ListenStateData.cc b/src/comm/TcpAcceptor.cc similarity index 54% rename from src/comm/ListenStateData.cc rename to src/comm/TcpAcceptor.cc index 62816145f56..6931a87e397 100644 --- a/src/comm/ListenStateData.cc +++ b/src/comm/TcpAcceptor.cc @@ -33,16 +33,104 @@ */ #include "squid.h" +#include "base/TextException.h" #include "CommCalls.h" #include "comm/AcceptLimiter.h" #include "comm/comm_internal.h" -#include "comm/ListenStateData.h" #include "comm/Loops.h" +#include "comm/TcpAcceptor.h" #include "ConnectionDetail.h" #include "fde.h" #include "protos.h" #include "SquidTime.h" +namespace Comm { + CBDATA_CLASS_INIT(TcpAcceptor); +}; + +Comm::TcpAcceptor::TcpAcceptor(const int listenFd, const Ip::Address &laddr, int flags, + const char *note, const Subscription::Pointer &aSub) : + AsyncJob("Comm::TcpAcceptor"), + errcode(0), + fd(listenFd), + isLimited(0), + theCallSub(aSub), + local_addr(laddr) +{} + +void +Comm::TcpAcceptor::subscribe(const Subscription::Pointer &aSub) +{ + debugs(5, 5, HERE << status() << " AsyncCall Subscription: " << aSub); + unsubscribe("subscription change"); + theCallSub = aSub; +} + +void +Comm::TcpAcceptor::unsubscribe(const char *reason) +{ + debugs(5, 5, HERE << status() << " AsyncCall Subscription " << theCallSub << " removed: " << reason); + theCallSub = NULL; +} + +void +Comm::TcpAcceptor::start() +{ + debugs(5, 5, HERE << status() << " AsyncCall Subscription: " << theCallSub); + + Must(isOpen(fd)); + + setListen(); + + // if no error so far start accepting connections. + if (errcode == 0) + SetSelect(fd, COMM_SELECT_READ, doAccept, this, 0); +} + +bool +Comm::TcpAcceptor::doneAll() const +{ + // stop when FD is closed + if (!isOpen(fd)) { + return AsyncJob::doneAll(); + } + + // stop when handlers are gone + if (theCallSub == NULL) { + return AsyncJob::doneAll(); + } + + // open FD with handlers...keep accepting. + return false; +} + +void +Comm::TcpAcceptor::swanSong() +{ + debugs(5,5, HERE); + unsubscribe("swanSong"); + fd = -1; + AcceptLimiter::Instance().removeDead(this); + AsyncJob::swanSong(); +} + +const char * +Comm::TcpAcceptor::status() const +{ + static char ipbuf[MAX_IPSTRLEN] = {'\0'}; + if (ipbuf[0] == '\0') + local_addr.ToHostname(ipbuf, MAX_IPSTRLEN); + + static MemBuf buf; + buf.reset(); + buf.Printf(" FD %d, %s",fd, ipbuf); + + const char *jobStatus = AsyncJob::status(); + buf.append(jobStatus, strlen(jobStatus)); + + return buf.content(); +} + /** * New-style listen and accept routines * @@ -51,11 +139,11 @@ * accept()ed some time later. */ void -Comm::ListenStateData::setListen() +Comm::TcpAcceptor::setListen() { errcode = 0; // reset local errno copy. if (listen(fd, Squid_MaxFD >> 2) < 0) { - debugs(50, 0, HERE << "listen(FD " << fd << ", " << (Squid_MaxFD >> 2) << "): " << xstrerror()); + debugs(50, DBG_CRITICAL, "ERROR: listen(" << status() << ", " << (Squid_MaxFD >> 2) << "): " << xstrerror()); errcode = errno; return; } @@ -67,37 +155,19 @@ Comm::ListenStateData::setListen() debugs(5, DBG_IMPORTANT, "Installing accept filter '" << Config.accept_filter << "' on FD " << fd); xstrncpy(afa.af_name, Config.accept_filter, sizeof(afa.af_name)); if (setsockopt(fd, SOL_SOCKET, SO_ACCEPTFILTER, &afa, sizeof(afa)) < 0) - debugs(5, DBG_CRITICAL, "SO_ACCEPTFILTER '" << Config.accept_filter << "': '" << xstrerror()); + debugs(5, DBG_CRITICAL, "WARNING: SO_ACCEPTFILTER '" << Config.accept_filter << "': '" << xstrerror()); #elif defined(TCP_DEFER_ACCEPT) int seconds = 30; if (strncmp(Config.accept_filter, "data=", 5) == 0) seconds = atoi(Config.accept_filter + 5); if (setsockopt(fd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &seconds, sizeof(seconds)) < 0) - debugs(5, DBG_CRITICAL, "TCP_DEFER_ACCEPT '" << Config.accept_filter << "': '" << xstrerror()); + debugs(5, DBG_CRITICAL, "WARNING: TCP_DEFER_ACCEPT '" << Config.accept_filter << "': '" << xstrerror()); #else - debugs(5, DBG_CRITICAL, "accept_filter not supported on your OS"); + debugs(5, DBG_CRITICAL, "WARNING: accept_filter not supported on your OS"); #endif } } -Comm::ListenStateData::ListenStateData(int aFd, AsyncCall::Pointer &call, bool accept_many) : - fd(aFd), - theCallback(call), - mayAcceptMore(accept_many) -{ - assert(aFd >= 0); - debugs(5, 5, HERE << "FD " << fd << " AsyncCall: " << call); - assert(isOpen(aFd)); - setListen(); - SetSelect(fd, COMM_SELECT_READ, doAccept, this, 0); -} - -Comm::ListenStateData::~ListenStateData() -{ - comm_close(fd); - fd = -1; -} - /** * This private callback is called whenever a filedescriptor is ready * to dupe itself and fob off an accept()ed connection @@ -108,23 +178,30 @@ Comm::ListenStateData::~ListenStateData() * done later when enough sockets become available. */ void -Comm::ListenStateData::doAccept(int fd, void *data) +Comm::TcpAcceptor::doAccept(int fd, void *data) { - debugs(5, 2, HERE << "New connection on FD " << fd); + try { + debugs(5, 2, HERE << "New connection on FD " << fd); - assert(isOpen(fd)); - ListenStateData *afd = static_cast(data); + Must(isOpen(fd)); + TcpAcceptor *afd = static_cast(data); + + if (!okToAccept()) { + AcceptLimiter::Instance().defer(afd); + } else { + afd->acceptNext(); + } + SetSelect(fd, COMM_SELECT_READ, Comm::TcpAcceptor::doAccept, afd, 0); - if (!okToAccept()) { - AcceptLimiter::Instance().defer(afd); - } else { - afd->acceptNext(); + } catch(const std::exception &e) { + fatalf("FATAL: error while accepting new client connection: %s\n", e.what()); + } catch(...) { + fatal("FATAL: error while accepting new client connection: [unkown]\n"); } - SetSelect(fd, COMM_SELECT_READ, Comm::ListenStateData::doAccept, afd, 0); } bool -Comm::ListenStateData::okToAccept() +Comm::TcpAcceptor::okToAccept() { static time_t last_warn = 0; @@ -140,7 +217,7 @@ Comm::ListenStateData::okToAccept() } void -Comm::ListenStateData::acceptOne() +Comm::TcpAcceptor::acceptOne() { /* * We don't worry about running low on FDs here. Instead, @@ -149,42 +226,45 @@ Comm::ListenStateData::acceptOne() */ /* Accept a new connection */ - ConnectionDetail connDetails; - int newfd = oldAccept(connDetails); + ConnectionDetail newConnDetails; + int newFd = -1; + const comm_err_t flag = oldAccept(newConnDetails, &newFd); /* Check for errors */ - if (newfd < 0) { + if (!isOpen(newFd)) { - if (newfd == COMM_NOMESSAGE) { + if (flag == COMM_NOMESSAGE) { /* register interest again */ - debugs(5, 5, HERE << "try later: FD " << fd << " handler: " << theCallback); + debugs(5, 5, HERE << "try later: FD " << fd << " handler Subscription: " << theCallSub); SetSelect(fd, COMM_SELECT_READ, doAccept, this, 0); return; } // A non-recoverable error; notify the caller */ - debugs(5, 5, HERE << "non-recoverable error: FD " << fd << " handler: " << theCallback); - notify(-1, COMM_ERROR, connDetails); - mayAcceptMore = false; + debugs(5, 5, HERE << "non-recoverable error:" << status() << " handler Subscription: " << theCallSub); + notify(flag, newConnDetails, newFd); + mustStop("Listener socket closed"); return; } - debugs(5, 5, HERE << "accepted: FD " << fd << - " newfd: " << newfd << " from: " << connDetails.peer << - " handler: " << theCallback); - notify(newfd, COMM_OK, connDetails); + debugs(5, 5, HERE << "Listener: FD " << fd << + " accepted new connection from " << newConnDetails.peer << + " handler Subscription: " << theCallSub); + notify(flag, newConnDetails, newFd); } void -Comm::ListenStateData::acceptNext() +Comm::TcpAcceptor::acceptNext() { - assert(isOpen(fd)); + Must(isOpen(fd)); debugs(5, 2, HERE << "connection on FD " << fd); acceptOne(); } +// XXX: obsolete comment? +// NP: can't be a const function because syncWithComm() side effects hit theCallSub->callback(). void -Comm::ListenStateData::notify(int newfd, comm_err_t flag, const ConnectionDetail &connDetails) +Comm::TcpAcceptor::notify(const comm_err_t flag, const ConnectionDetail &connDetails, int newFd) const { // listener socket handlers just abandon the port with COMM_ERR_CLOSING // it should only happen when this object is deleted... @@ -192,26 +272,29 @@ Comm::ListenStateData::notify(int newfd, comm_err_t flag, const ConnectionDetail return; } - if (theCallback != NULL) { - typedef CommAcceptCbParams Params; - Params ¶ms = GetCommParams(theCallback); + if (theCallSub != NULL) { + AsyncCall::Pointer call = theCallSub->callback(); + CommAcceptCbParams ¶ms = GetCommParams(call); params.fd = fd; - params.nfd = newfd; + params.nfd = newFd; params.details = connDetails; params.flag = flag; params.xerrno = errcode; - ScheduleCallHere(theCallback); - if (!mayAcceptMore) - theCallback = NULL; + ScheduleCallHere(call); } } /** * accept() and process - * Wait for an incoming connection on FD. + * Wait for an incoming connection on our listener socket. + * + * \retval COMM_OK success. details parameter filled. + * \retval COMM_NOMESSAGE attempted accept() but nothing useful came in. + * \retval COMM_ERROR an outright failure occured. + * Or if this client has too many connections already. */ -int -Comm::ListenStateData::oldAccept(ConnectionDetail &details) +comm_err_t +Comm::TcpAcceptor::oldAccept(ConnectionDetail &details, int *newFd) { PROF_start(comm_accept); statCounter.syscalls.sock.accepts++; @@ -228,17 +311,19 @@ Comm::ListenStateData::oldAccept(ConnectionDetail &details) PROF_stop(comm_accept); if (ignoreErrno(errno)) { - debugs(50, 5, HERE << "FD " << fd << ": " << xstrerror()); + debugs(50, 5, HERE << status() << ": " << xstrerror()); return COMM_NOMESSAGE; } else if (ENFILE == errno || EMFILE == errno) { - debugs(50, 3, HERE << "FD " << fd << ": " << xstrerror()); + debugs(50, 3, HERE << status() << ": " << xstrerror()); return COMM_ERROR; } else { - debugs(50, 1, HERE << "FD " << fd << ": " << xstrerror()); + debugs(50, 1, HERE << status() << ": " << xstrerror()); return COMM_ERROR; } } + Must(sock >= 0); + *newFd = sock; details.peer = *gai; if ( Config.client_ip_max_connections >= 0) { @@ -249,15 +334,16 @@ Comm::ListenStateData::oldAccept(ConnectionDetail &details) } } + // lookup the local-end details of this new connection details.me.InitAddrInfo(gai); - details.me.SetEmpty(); getsockname(sock, gai->ai_addr, &gai->ai_addrlen); details.me = *gai; - - commSetCloseOnExec(sock); + details.me.FreeAddrInfo(gai); /* fdstat update */ + // XXX : these are not all HTTP requests. use a note about type and ip:port details-> + // so we end up with a uniform "(HTTP|FTP-data|HTTPS|...) remote-ip:remote-port" fd_open(sock, FD_SOCKET, "HTTP Request"); fdd_table[sock].close_file = NULL; @@ -266,15 +352,16 @@ Comm::ListenStateData::oldAccept(ConnectionDetail &details) fde *F = &fd_table[sock]; details.peer.NtoA(F->ipaddr,MAX_IPSTRLEN); F->remote_port = details.peer.GetPort(); - F->local_addr.SetPort(details.me.GetPort()); + F->local_addr = details.me; F->sock_family = details.me.IsIPv6()?AF_INET6:AF_INET; - details.me.FreeAddrInfo(gai); + // set socket flags + commSetCloseOnExec(sock); commSetNonBlocking(sock); /* IFF the socket is (tproxy) transparent, pass the flag down to allow spoofing */ F->flags.transparent = fd_table[fd].flags.transparent; PROF_stop(comm_accept); - return sock; + return COMM_OK; } diff --git a/src/comm/TcpAcceptor.h b/src/comm/TcpAcceptor.h new file mode 100644 index 00000000000..1c3a12fb03b --- /dev/null +++ b/src/comm/TcpAcceptor.h @@ -0,0 +1,99 @@ +#ifndef SQUID_COMM_TCPACCEPTOR_H +#define SQUID_COMM_TCPACCEPTOR_H + +#include "base/AsyncCall.h" +#include "base/Subscription.h" +#include "CommCalls.h" +#include "comm_err_t.h" +#include "comm/TcpAcceptor.h" +#include "ip/Address.h" + +#if HAVE_MAP +#include +#endif + +namespace Comm +{ + +class AcceptLimiter; + +/** + * Listens on an FD for new incoming connections and + * emits an active FD descriptor for the new client. + * + * Handles all event limiting required to quash inbound connection + * floods within the global FD limits of available Squid_MaxFD and + * client_ip_max_connections. + * + * Fills the emitted connection with all connection details able to + * be looked up. Currently these are the local/remote IP:port details + * and the listening socket transparent-mode flag. + */ +class TcpAcceptor : public AsyncJob +{ +private: + virtual void start(); + virtual bool doneAll() const; + virtual void swanSong(); + virtual const char *status() const; + + TcpAcceptor(const TcpAcceptor &); // not implemented. + +public: + TcpAcceptor(const int listenFd, const Ip::Address &laddr, int flags, + const char *note, const Subscription::Pointer &aSub); + + /** Subscribe a handler to receive calls back about new connections. + * Unsubscribes any existing subscribed handler. + */ + void subscribe(const Subscription::Pointer &aSub); + + /** Remove the currently waiting callback subscription. + * Already scheduled callbacks remain scheduled. + */ + void unsubscribe(const char *reason); + + /** Try and accept another connection (synchronous). + * If one is pending already the subscribed callback handler will be scheduled + * to handle it before this method returns. + */ + void acceptNext(); + + /// Call the subscribed callback handler with details about a new connection. + void notify(const comm_err_t flags, const ConnectionDetail &newConnDetails, const int newFd) const; + + /// errno code of the last accept() or listen() action if one occurred. + int errcode; + + /// conn being listened on for new connections + /// Reserved for read-only use. + // NP: public only until we can hide it behind connection handles + int fd; + +protected: + friend class AcceptLimiter; + int32_t isLimited; ///< whether this socket is delayed and on the AcceptLimiter queue. + +private: + Subscription::Pointer theCallSub; ///< used to generate AsyncCalls handling our events. + + /// IP Address and port being listened on + Ip::Address local_addr; + + /// Method to test if there are enough file descriptors to open a new client connection + /// if not the accept() will be postponed + static bool okToAccept(); + + /// Method callback for whenever an FD is ready to accept a client connection. + static void doAccept(int fd, void *data); + + void acceptOne(); + comm_err_t oldAccept(ConnectionDetail &newConnDetails, int *fd); + void setListen(); + + CBDATA_CLASS2(TcpAcceptor); +}; + +} // namespace Comm + +#endif /* SQUID_COMM_TCPACCEPTOR_H */ diff --git a/src/ftp.cc b/src/ftp.cc index 9a1c3d50e1b..752c2ebf600 100644 --- a/src/ftp.cc +++ b/src/ftp.cc @@ -34,8 +34,9 @@ #include "squid.h" #include "comm.h" +#include "CommCalls.h" +#include "comm/TcpAcceptor.h" #include "comm/Write.h" -#include "comm/ListenStateData.h" #include "compat/strtoll.h" #include "ConnectionDetail.h" #include "errorpage.h" @@ -153,13 +154,11 @@ class FtpChannel void clear(); /// just resets fd and close handler. does not close active connections. - int fd; /// channel descriptor; \todo: remove because the closer has it + int fd; /// channel descriptor - /** Current listening socket handler. delete on shutdown or abort. - * FTP stores a copy of the FD in the field fd above. - * Use close() to properly close the channel. - */ - Comm::ListenStateData *listener; + Ip::Address local; ///< The local IP address:port this channel is using + + int flags; ///< socket flags used when opening. private: AsyncCall::Pointer closer; /// Comm close handler callback @@ -245,6 +244,12 @@ class FtpStateData : public ServerStateData void completedListing(void); void dataComplete(); void dataRead(const CommIoCbParams &io); + + /// ignore timeout on CTRL channel. set read timeout on DATA channel. + void switchTimeoutToDataChannel(); + /// create a data channel acceptor and start listening. + void listenForDataChannel(const int fd, const char *note); + int checkAuth(const HttpHeader * req_hdr); void checkUrlpath(); void buildTitleUrl(); @@ -443,6 +448,7 @@ FTPSM *FTP_SM_FUNCS[] = { void FtpStateData::ctrlClosed(const CommCloseCbParams &io) { + debugs(9, 4, HERE); ctrl.clear(); deleteThis("FtpStateData::ctrlClosed"); } @@ -451,10 +457,10 @@ FtpStateData::ctrlClosed(const CommCloseCbParams &io) void FtpStateData::dataClosed(const CommCloseCbParams &io) { - if (data.listener) { - delete data.listener; - data.listener = NULL; - data.fd = -1; + debugs(9, 4, HERE); + if (data.fd >= 0) { + comm_close(data.fd); + // NP clear() does the: data.fd = -1; } data.clear(); failed(ERR_FTP_FAILURE, 0); @@ -605,6 +611,46 @@ FtpStateData::loginParser(const char *login, int escaped) debugs(9, 9, HERE << ": OUT: login='" << login << "', escaped=" << escaped << ", user=" << user << ", password=" << password); } +void +FtpStateData::switchTimeoutToDataChannel() +{ + commSetTimeout(ctrl.fd, -1, NULL, NULL); + + typedef CommCbMemFunT TimeoutDialer; + AsyncCall::Pointer timeoutCall = JobCallback(9, 5, TimeoutDialer, this, FtpStateData::ftpTimeout); + commSetTimeout(data.fd, Config.Timeout.read, timeoutCall); +} + +void +FtpStateData::listenForDataChannel(const int fd, const char *note) +{ + assert(data.fd < 0); + + typedef CommCbMemFunT AcceptDialer; + typedef AsyncCallT AcceptCall; + RefCount call = static_cast(JobCallback(11, 5, AcceptDialer, this, FtpStateData::ftpAcceptDataConnection)); + Subscription::Pointer sub = new CallSubscription(call); + + /* open the conn if its not already open */ + int newFd = fd; + if (newFd < 0) { + newFd = comm_open_listener(SOCK_STREAM, IPPROTO_TCP, data.local, data.flags, note); + if (newFd < 0) { + debugs(5, DBG_CRITICAL, HERE << "comm_open_listener failed:" << data.local << " error: " << errno); + return; + } + debugs(9, 3, HERE << "Unconnected data socket created on FD " << newFd << ", " << data.local); + } + + assert(newFd >= 0); + Comm::TcpAcceptor *tmp = new Comm::TcpAcceptor(newFd, data.local, data.flags, note, sub); + AsyncJob::Start(tmp); + + // Ensure we have a copy of the FD opened for listening and a close handler on it. + data.opened(newFd, dataCloser()); + switchTimeoutToDataChannel(); +} + void FtpStateData::ftpTimeout(const CommTimeoutCbParams &io) { @@ -1066,10 +1112,16 @@ FtpStateData::parseListing() usable = end - sbuf; - debugs(9, 3, HERE << "usable = " << usable); + debugs(9, 3, HERE << "usable = " << usable << " of " << len << " bytes."); if (usable == 0) { - debugs(9, 3, HERE << "didn't find end for " << entry->url() ); + if (buf[0] == '\0' && len == 1) { + debugs(9, 3, HERE << "NIL ends data from " << entry->url() << " transfer problem?"); + data.readBuf->consume(len); + } else { + debugs(9, 3, HERE << "didn't find end for " << entry->url()); + debugs(9, 3, HERE << "buffer remains (" << len << " bytes) '" << rfc1738_do_escape(buf,0) << "'"); + } xfree(sbuf); return; } @@ -1138,7 +1190,14 @@ FtpStateData::dataComplete() * status code after the data command. FtpStateData was being * deleted in the middle of dataRead(). */ - scheduleReadControlReply(0); + /* AYJ: 2011-01-13: Bug 2581. + * 226 status is possibly waiting in the ctrl buffer. + * The connection will hang if we DONT send buffered_ok. + * This happens on all transfers which can be completly sent by the + * server before the 150 started status message is read in by Squid. + * ie all transfers of about one packet hang. + */ + scheduleReadControlReply(1); } void @@ -1674,7 +1733,7 @@ FtpStateData::scheduleReadControlReply(int buffered_ok) * establish one on the control socket. */ - if (data.fd > -1) { + if (data.fd >= 0) { AsyncCall::Pointer nullCall = NULL; commSetTimeout(data.fd, -1, nullCall); } @@ -2722,27 +2781,24 @@ FtpStateData::ftpPasvCallback(int fd, const DnsLookupDetails &dns, comm_err_t st static int ftpOpenListenSocket(FtpStateData * ftpState, int fallback) { - int fd; - Ip::Address addr; struct addrinfo *AI = NULL; - int on = 1; int x = 0; /// Close old data channels, if any. We may open a new one below. - ftpState->data.close(); + if ((ftpState->data.flags & COMM_REUSEADDR)) + // NP: in fact it points to the control channel. just clear it. + ftpState->data.clear(); + else + ftpState->data.close(); /* * Set up a listen socket on the same local address as the * control connection. */ - - addr.InitAddrInfo(AI); - + ftpState->data.local.InitAddrInfo(AI); x = getsockname(ftpState->ctrl.fd, AI->ai_addr, &AI->ai_addrlen); - - addr = *AI; - - addr.FreeAddrInfo(AI); + ftpState->data.local = *AI; + ftpState->data.local.FreeAddrInfo(AI); if (x) { debugs(9, DBG_CRITICAL, HERE << "getsockname(" << ftpState->ctrl.fd << ",..): " << xstrerror()); @@ -2754,38 +2810,18 @@ ftpOpenListenSocket(FtpStateData * ftpState, int fallback) * used for both control and data. */ if (fallback) { + int on = 1; setsockopt(ftpState->ctrl.fd, SOL_SOCKET, SO_REUSEADDR, (char *) &on, sizeof(on)); + ftpState->ctrl.flags |= COMM_REUSEADDR; + ftpState->data.flags |= COMM_REUSEADDR; } else { /* if not running in fallback mode a new port needs to be retrieved */ - addr.SetPort(0); - } - - fd = comm_open(SOCK_STREAM, - IPPROTO_TCP, - addr, - COMM_NONBLOCKING | (fallback ? COMM_REUSEADDR : 0), - ftpState->entry->url()); - debugs(9, 3, HERE << "Unconnected data socket created on FD " << fd ); - - if (fd < 0) { - debugs(9, DBG_CRITICAL, HERE << "comm_open failed"); - return -1; - } - - typedef CommCbMemFunT acceptDialer; - AsyncCall::Pointer acceptCall = JobCallback(11, 5, - acceptDialer, ftpState, FtpStateData::ftpAcceptDataConnection); - ftpState->data.listener = new Comm::ListenStateData(fd, acceptCall, false); - - if (!ftpState->data.listener || ftpState->data.listener->errcode != 0) { - comm_close(fd); - return -1; + ftpState->data.local.SetPort(0); + ftpState->data.flags = COMM_NONBLOCKING; } - ftpState->data.opened(fd, ftpState->dataCloser()); - ftpState->data.port = comm_local_port(fd); - ftpState->data.host = NULL; - return fd; + ftpState->listenForDataChannel((fallback?ftpState->ctrl.fd:-1), ftpState->entry->url()); + return ftpState->data.fd; } /// \ingroup ServerProtocolFTPInternal @@ -2881,6 +2917,7 @@ ftpSendEPRT(FtpStateData * ftpState) debugs(9, 3, HERE); ftpState->flags.pasv_supported = 0; fd = ftpOpenListenSocket(ftpState, 0); + debugs(9, 3, "Listening for FTP data connection with FD " << fd); Ip::Address::InitAddrInfo(AI); @@ -2933,77 +2970,68 @@ ftpReadEPRT(FtpStateData * ftpState) */ void FtpStateData::ftpAcceptDataConnection(const CommAcceptCbParams &io) { - char ntoapeer[MAX_IPSTRLEN]; - debugs(9, 3, "ftpAcceptDataConnection"); - - // one connection accepted. the handler has stopped listening. drop our local pointer to it. - data.listener = NULL; + debugs(9, 3, HERE); if (EBIT_TEST(entry->flags, ENTRY_ABORTED)) { abortTransaction("entry aborted when accepting data conn"); return; } + if (io.flag != COMM_OK) { + data.close(); + debugs(9, DBG_IMPORTANT, "FTP AcceptDataConnection: FD " << io.fd << ": " << xstrerr(io.xerrno)); + /** \todo Need to send error message on control channel*/ + ftpFail(this); + return; + } + + /* data listening conn is no longer even open. abort. */ + if (data.fd <= 0 || fd_table[data.fd].flags.open == 0) { + data.clear(); // ensure that it's cleared and not just closed. + return; + } + /** \par * When squid.conf ftp_sanitycheck is enabled, check the new connection is actually being * made by the remote client which is connected to the FTP control socket. + * Or the one which we were told to listen for by control channel messages (may differ under NAT). * This prevents third-party hacks, but also third-party load balancing handshakes. */ if (Config.Ftp.sanitycheck) { + char ntoapeer[MAX_IPSTRLEN]; io.details.peer.NtoA(ntoapeer,MAX_IPSTRLEN); - if (strcmp(fd_table[ctrl.fd].ipaddr, ntoapeer) != 0) { + if (strcmp(fd_table[ctrl.fd].ipaddr, ntoapeer) != 0 && + strcmp(fd_table[data.fd].ipaddr, ntoapeer) != 0) { debugs(9, DBG_IMPORTANT, "FTP data connection from unexpected server (" << io.details.peer << "), expecting " << - fd_table[ctrl.fd].ipaddr); + fd_table[ctrl.fd].ipaddr << " or " << fd_table[data.fd].ipaddr); - /* close the bad soures connection down ASAP. */ + /* close the bad sources connection down ASAP. */ comm_close(io.nfd); - /* we are ony accepting once, so need to re-open the listener socket. */ - typedef CommCbMemFunT acceptDialer; - AsyncCall::Pointer acceptCall = JobCallback(11, 5, - acceptDialer, this, FtpStateData::ftpAcceptDataConnection); - data.listener = new Comm::ListenStateData(data.fd, acceptCall, false); + /* drop the bad connection (io) by ignoring the attempt. */ return; } } - if (io.flag != COMM_OK) { - debugs(9, DBG_IMPORTANT, "ftpHandleDataAccept: FD " << io.nfd << ": " << xstrerr(io.xerrno)); - /** \todo XXX Need to set error message */ - ftpFail(this); - return; - } - /**\par - * Replace the Listen socket with the accepted data socket */ + * Replace the Listening socket with the accepted data socket */ data.close(); data.opened(io.nfd, dataCloser()); data.port = io.details.peer.GetPort(); - io.details.peer.NtoA(data.host,SQUIDHOSTNAMELEN); + data.host = xstrdup(fd_table[io.nfd].ipaddr); debugs(9, 3, "ftpAcceptDataConnection: Connected data socket on " << "FD " << io.nfd << " to " << io.details.peer << " FD table says: " << "ctrl-peer= " << fd_table[ctrl.fd].ipaddr << ", " << "data-peer= " << fd_table[data.fd].ipaddr); + assert(haveControlChannel("ftpAcceptDataConnection")); + assert(ctrl.message == NULL); - AsyncCall::Pointer nullCall = NULL; - commSetTimeout(ctrl.fd, -1, nullCall); - - typedef CommCbMemFunT TimeoutDialer; - AsyncCall::Pointer timeoutCall = JobCallback(9, 5, - TimeoutDialer, this, FtpStateData::ftpTimeout); - commSetTimeout(data.fd, Config.Timeout.read, timeoutCall); - - /*\todo XXX We should have a flag to track connect state... - * host NULL -> not connected, port == local port - * host set -> connected, port == remote port - */ - /* Restart state (SENT_NLST/LIST/RETR) */ - FTP_SM_FUNCS[state] (this); + // Ctrl channel operations will determine what happens to this data connection } /// \ingroup ServerProtocolFTPInternal @@ -3075,34 +3103,17 @@ void FtpStateData::readStor() return; } - /*\par - * When client status is 125, or 150 without a hostname, Begin data transfer. */ + /* When client status is 125, or 150 without a hostname, Begin data transfer. */ debugs(9, 3, HERE << "starting data transfer"); + switchTimeoutToDataChannel(); sendMoreRequestBody(); - /** \par - * Cancel the timeout on the Control socket and - * establish one on the data socket. - */ - AsyncCall::Pointer nullCall = NULL; - commSetTimeout(ctrl.fd, -1, nullCall); - - typedef CommCbMemFunT TimeoutDialer; - AsyncCall::Pointer timeoutCall = JobCallback(9, 5, - TimeoutDialer, this, FtpStateData::ftpTimeout); - - commSetTimeout(data.fd, Config.Timeout.read, timeoutCall); - state = WRITING_DATA; debugs(9, 3, HERE << "writing data channel"); } else if (code == 150) { /*\par - * When client code is 150 with a hostname, Accept data channel. */ + * When client code is 150 without a hostname, Accept data channel. */ debugs(9, 3, "ftpReadStor: accepting data channel"); - typedef CommCbMemFunT acceptDialer; - AsyncCall::Pointer acceptCall = JobCallback(11, 5, - acceptDialer, this, FtpStateData::ftpAcceptDataConnection); - - data.listener = new Comm::ListenStateData(data.fd, acceptCall, false); + listenForDataChannel(data.fd, data.host); } else { debugs(9, DBG_IMPORTANT, HERE << "Unexpected reply code "<< std::setfill('0') << std::setw(3) << code); ftpFail(this); @@ -3222,34 +3233,15 @@ ftpReadList(FtpStateData * ftpState) if (code == 125 || (code == 150 && ftpState->data.host)) { /* Begin data transfer */ - /* XXX what about Config.Timeout.read? */ + debugs(9, 3, HERE << "begin data transfer from " << ftpState->data.host << " (" << ftpState->data.local << ")"); + ftpState->switchTimeoutToDataChannel(); ftpState->maybeReadVirginBody(); ftpState->state = READING_DATA; - /* - * Cancel the timeout on the Control socket and establish one - * on the data socket - */ - AsyncCall::Pointer nullCall = NULL; - commSetTimeout(ftpState->ctrl.fd, -1, nullCall); return; } else if (code == 150) { /* Accept data channel */ - typedef CommCbMemFunT acceptDialer; - AsyncCall::Pointer acceptCall = JobCallback(11, 5, - acceptDialer, ftpState, FtpStateData::ftpAcceptDataConnection); - - ftpState->data.listener = new Comm::ListenStateData(ftpState->data.fd, acceptCall, false); - /* - * Cancel the timeout on the Control socket and establish one - * on the data socket - */ - AsyncCall::Pointer nullCall = NULL; - commSetTimeout(ftpState->ctrl.fd, -1, nullCall); - - typedef CommCbMemFunT TimeoutDialer; - AsyncCall::Pointer timeoutCall = JobCallback(9, 5, - TimeoutDialer, ftpState,FtpStateData::ftpTimeout); - commSetTimeout(ftpState->data.fd, Config.Timeout.read, timeoutCall); + debugs(9, 3, HERE << "accept data channel from " << ftpState->data.host << " (" << ftpState->data.local << ")"); + ftpState->listenForDataChannel(ftpState->data.fd, ftpState->data.host); return; } else if (!ftpState->flags.tried_nlst && code > 300) { ftpSendNlst(ftpState); @@ -3285,32 +3277,12 @@ ftpReadRetr(FtpStateData * ftpState) if (code == 125 || (code == 150 && ftpState->data.host)) { /* Begin data transfer */ debugs(9, 3, HERE << "reading data channel"); - /* XXX what about Config.Timeout.read? */ + ftpState->switchTimeoutToDataChannel(); ftpState->maybeReadVirginBody(); ftpState->state = READING_DATA; - /* - * Cancel the timeout on the Control socket and establish one - * on the data socket - */ - AsyncCall::Pointer nullCall = NULL; - commSetTimeout(ftpState->ctrl.fd, -1, nullCall); } else if (code == 150) { /* Accept data channel */ - typedef CommCbMemFunT acceptDialer; - AsyncCall::Pointer acceptCall = JobCallback(11, 5, - acceptDialer, ftpState, FtpStateData::ftpAcceptDataConnection); - ftpState->data.listener = new Comm::ListenStateData(ftpState->data.fd, acceptCall, false); - /* - * Cancel the timeout on the Control socket and establish one - * on the data socket - */ - AsyncCall::Pointer nullCall = NULL; - commSetTimeout(ftpState->ctrl.fd, -1, nullCall); - - typedef CommCbMemFunT TimeoutDialer; - AsyncCall::Pointer timeoutCall = JobCallback(9, 5, - TimeoutDialer, ftpState,FtpStateData::ftpTimeout); - commSetTimeout(ftpState->data.fd, Config.Timeout.read, timeoutCall); + ftpState->listenForDataChannel(ftpState->data.fd, ftpState->data.host); } else if (code >= 300) { if (!ftpState->flags.try_slash_hack) { /* Try this as a directory missing trailing slash... */ @@ -3965,6 +3937,13 @@ FtpChannel::opened(int aFd, const AsyncCall::Pointer &aCloser) fd = aFd; closer = aCloser; comm_add_close_handler(fd, closer); + + // grab the local IP address:port details for this connection + struct addrinfo *AI = NULL; + local.InitAddrInfo(AI); + getsockname(aFd, AI->ai_addr, &AI->ai_addrlen); + local = *AI; + local.FreeAddrInfo(AI); } /// planned close: removes the close handler and calls comm_close @@ -3972,15 +3951,11 @@ void FtpChannel::close() { // channels with active listeners will be closed when the listener handler dies. - if (listener) { - delete listener; - listener = NULL; - comm_remove_close_handler(fd, closer); - closer = NULL; - fd = -1; - } else if (fd >= 0) { - comm_remove_close_handler(fd, closer); - closer = NULL; + if (fd >= 0) { + if (closer != NULL) { + comm_remove_close_handler(fd, closer); + closer = NULL; + } comm_close(fd); // we do not expect to be called back fd = -1; }