diff --git a/include/scoreboard.h b/include/scoreboard.h index b0bdc6f13b0..e2decc65af6 100644 --- a/include/scoreboard.h +++ b/include/scoreboard.h @@ -149,6 +149,7 @@ struct process_score { apr_uint32_t keep_alive; /* async connections in keep alive */ apr_uint32_t suspended; /* connections suspended by some module */ apr_uint32_t read_line; /* async connections doing read line */ + apr_uint32_t pending; /* connections waiting for a worker */ }; /* Scoreboard is now in 'local' memory, since it isn't updated once created, diff --git a/modules/filters/mod_reqtimeout.c b/modules/filters/mod_reqtimeout.c index 70d0ddb6167..693351e1280 100644 --- a/modules/filters/mod_reqtimeout.c +++ b/modules/filters/mod_reqtimeout.c @@ -97,8 +97,8 @@ static void extend_timeout(reqtimeout_con_cfg *ccfg, apr_bucket_brigade *bb) } } -static apr_status_t check_time_left(reqtimeout_con_cfg *ccfg, - apr_time_t now) +static apr_status_t check_and_update_time_left(reqtimeout_con_cfg *ccfg, + apr_time_t now) { if (!now) now = apr_time_now(); @@ -209,11 +209,11 @@ static apr_status_t reqtimeout_filter(ap_filter_t *f, /* set new timeout */ now = apr_time_now(); ccfg->timeout_at = now + apr_time_from_sec(ccfg->cur_stage.timeout); - ccfg->cur_stage.timeout = 0; if (ccfg->cur_stage.max_timeout > 0) { ccfg->max_timeout_at = now + apr_time_from_sec(ccfg->cur_stage.max_timeout); ccfg->cur_stage.max_timeout = 0; } + ccfg->cur_stage.timeout = 0; } else if (ccfg->timeout_at == 0) { /* no timeout set, or in between requests */ @@ -227,25 +227,27 @@ static apr_status_t reqtimeout_filter(ap_filter_t *f, rv = apr_socket_timeout_get(ccfg->socket, &saved_sock_timeout); AP_DEBUG_ASSERT(rv == APR_SUCCESS); - rv = check_time_left(ccfg, now); + rv = check_and_update_time_left(ccfg, now); if (rv != APR_SUCCESS) goto cleanup; if (mode == AP_MODE_GETLINE && block == APR_BLOCK_READ) { + apr_off_t remaining = HUGE_STRING_LEN; +#if APR_MAJOR_VERSION < 2 + apr_int32_t nsds; + apr_interval_time_t poll_timeout; + apr_pollfd_t pollset; + pollset.p = NULL; +#endif + /* * For a blocking AP_MODE_GETLINE read, apr_brigade_split_line() * would loop until a whole line has been read. As this would make it * impossible to enforce a total timeout, we only do non-blocking * reads. */ - apr_off_t remaining = HUGE_STRING_LEN; do { apr_off_t bblen; -#if APR_MAJOR_VERSION < 2 - apr_int32_t nsds; - apr_interval_time_t poll_timeout; - apr_pollfd_t pollset; -#endif rv = ap_get_brigade(f->next, bb, AP_MODE_GETLINE, APR_NONBLOCK_READ, remaining); if (rv != APR_SUCCESS && !APR_STATUS_IS_EAGAIN(rv)) { @@ -282,10 +284,12 @@ static apr_status_t reqtimeout_filter(ap_filter_t *f, /* ...
and wait for more */ #if APR_MAJOR_VERSION < 2 - pollset.p = f->c->pool; - pollset.desc_type = APR_POLL_SOCKET; - pollset.reqevents = APR_POLLIN|APR_POLLHUP; - pollset.desc.s = ccfg->socket; + if (pollset.p == NULL) { + pollset.p = f->c->pool; + pollset.desc_type = APR_POLL_SOCKET; + pollset.reqevents = APR_POLLIN | APR_POLLHUP | APR_POLLERR; + pollset.desc.s = ccfg->socket; + } apr_socket_timeout_get(ccfg->socket, &poll_timeout); rv = apr_poll(&pollset, 1, &nsds, poll_timeout); #else @@ -294,7 +298,7 @@ static apr_status_t reqtimeout_filter(ap_filter_t *f, if (rv != APR_SUCCESS) break; - rv = check_time_left(ccfg, 0); + rv = check_and_update_time_left(ccfg, 0); if (rv != APR_SUCCESS) break; @@ -306,12 +310,14 @@ static apr_status_t reqtimeout_filter(ap_filter_t *f, } else { /* mode != AP_MODE_GETLINE */ rv = ap_get_brigade(f->next, bb, mode, block, readbytes); + /* Don't extend the timeout in speculative mode, wait for * the real (relevant) bytes to be asked later, within the * currently allotted time. */ - if (ccfg->cur_stage.rate_factor && rv == APR_SUCCESS - && mode != AP_MODE_SPECULATIVE) { + if (rv == APR_SUCCESS + && mode != AP_MODE_SPECULATIVE + && ccfg->cur_stage.rate_factor) { extend_timeout(ccfg, bb); } } diff --git a/modules/generators/mod_status.c b/modules/generators/mod_status.c index 8707ebe58b3..5c620892b24 100644 --- a/modules/generators/mod_status.c +++ b/modules/generators/mod_status.c @@ -557,7 +557,8 @@ static int status_handler(request_rec *r) ap_rputs("", r); if (is_async) { - int read_line = 0, write_completion = 0, lingering_close = 0, keep_alive = 0, + int read_line = 0, write_completion = 0, + pending = 0, keep_alive = 0, lingering_close = 0, connections = 0, stopping = 0, procs = 0; /* * These differ from 'busy' and 'ready' in how gracefully finishing @@ -574,13 +575,15 @@ static int status_handler(request_rec *r) "Async connections\n" "totalaccepting" "busyidle" - "readingwritingkeep-aliveclosing\n", r); + "readingwriting" + "pendingkeep-aliveclosing\n", r); for (i = 0; i < server_limit; ++i) { ps_record = ap_get_scoreboard_process(i); if (ps_record->pid) { connections += ps_record->connections; read_line += ps_record->read_line; write_completion += ps_record->write_completion; + pending += ps_record->pending; keep_alive += ps_record->keep_alive; lingering_close += ps_record->lingering_close; busy_workers += thread_busy_buffer[i]; @@ -601,7 +604,8 @@ static int status_handler(request_rec *r) "%s%s" "%u%s" "%u%u" - "%u%u%u%u" + "%u%u" + "%u%u%u" "\n", i, ps_record->pid, dying, old, @@ -611,6 +615,7 @@ static int status_handler(request_rec *r) thread_idle_buffer[i], ps_record->read_line, ps_record->write_completion, + ps_record->pending, ps_record->keep_alive, ps_record->lingering_close); } @@ -621,12 +626,14 @@ static int status_handler(request_rec *r) "%d%d" "%d " "%d%d" - "%d%d%d%d" + "%d%d" + "%d%d%d" "\n\n", procs, stopping, connections, busy_workers, idle_workers, - read_line, write_completion, keep_alive, lingering_close); + read_line, write_completion, + pending, keep_alive, lingering_close); } else { ap_rprintf(r, "Processes: %d\n" @@ -636,12 +643,14 @@ static int status_handler(request_rec *r) "ConnsTotal: %d\n" "ConnsAsyncReading: %d\n" "ConnsAsyncWriting: %d\n" + "ConnsAsyncPending: %d\n" "ConnsAsyncKeepAlive: %d\n" "ConnsAsyncClosing: %d\n", procs, stopping, busy_workers, idle_workers, connections, - read_line, write_completion, keep_alive, lingering_close); + read_line, write_completion, + pending, keep_alive, lingering_close); } } diff --git 
a/modules/lua/lua_request.c b/modules/lua/lua_request.c index 52639291ae1..b2319e370ea 100644 --- a/modules/lua/lua_request.c +++ b/modules/lua/lua_request.c @@ -1264,10 +1264,18 @@ static int lua_ap_scoreboard_process(lua_State *L) lua_pushnumber(L, ps_record->suspended); lua_settable(L, -3); + lua_pushstring(L, "read_line"); + lua_pushnumber(L, ps_record->read_line); + lua_settable(L, -3); + lua_pushstring(L, "write_completion"); lua_pushnumber(L, ps_record->write_completion); lua_settable(L, -3); + lua_pushstring(L, "pending"); + lua_pushnumber(L, ps_record->pending); + lua_settable(L, -3); + lua_pushstring(L, "not_accepting"); lua_pushnumber(L, ps_record->not_accepting); lua_settable(L, -3); diff --git a/server/mpm/event/event.c b/server/mpm/event/event.c index a9f54e555e3..2a6c8b4209e 100644 --- a/server/mpm/event/event.c +++ b/server/mpm/event/event.c @@ -163,6 +163,11 @@ #define USECS_TO_LINGER apr_time_from_sec(SECONDS_TO_LINGER) #define MAX_USECS_TO_LINGER apr_time_from_sec(MAX_SECS_TO_LINGER) +#define BUSY_TIMEOUT apr_time_from_sec(1) + +#define LISTENER_IDLE_TIMEOUT apr_time_from_sec(1) +#define LISTENER_NOWAKEUP_TIMEOUT apr_time_from_msec(100) + #define TIMER_MIN_TIMEOUT apr_time_from_msec(50) /* @@ -172,9 +177,12 @@ #ifndef DEFAULT_ASYNC_FACTOR #define DEFAULT_ASYNC_FACTOR 2 #endif -#define WORKER_FACTOR_SCALE 16 /* scale factor to allow fractional values */ +#define WORKER_FACTOR_SCALE 16 /* scale factor to allow fractional values + for AsyncRequestWorkerFactor */ static apr_uint32_t async_factor = DEFAULT_ASYNC_FACTOR; - /* AsyncRequestWorkerFactor * 16 */ + +#define PENDING_ASYNC_RATIO 8 /* ratio of #threads for max pending events */ +static int max_pending_events = 0; /* Above, stop accepting connections */ static int threads_per_child = 0; /* ThreadsPerChild */ static int ap_daemons_to_start = 0; /* StartServers */ @@ -185,20 +193,20 @@ static int max_workers = 0; /* MaxRequestWorkers */ static int server_limit = 0; /* ServerLimit */ static int thread_limit = 0; /* ThreadLimit */ static int had_healthy_child = 0; -static volatile int dying = 0; -static volatile int workers_may_exit = 0; -static volatile int start_thread_may_exit = 0; -static volatile int listener_may_exit = 0; +static /*atomic*/ apr_uint32_t dying = 0; +static /*atomic*/ apr_uint32_t workers_may_exit = 0; +static /*atomic*/ apr_uint32_t start_thread_may_exit = 0; +static /*atomic*/ apr_uint32_t listener_may_exit = 0; +static unsigned int num_listensocks = 0; static int listener_is_wakeable = 0; /* Pollset supports APR_POLLSET_WAKEABLE */ -static int num_listensocks = 0; -static apr_int32_t conns_this_child; /* MaxConnectionsPerChild, only access +static int conns_this_child; /* MaxConnectionsPerChild, only access in listener thread */ -static apr_uint32_t connection_count = 0; /* Number of open connections */ -static apr_uint32_t lingering_count = 0; /* Number of connections in lingering close */ -static apr_uint32_t suspended_count = 0; /* Number of suspended connections */ -static apr_uint32_t clogged_count = 0; /* Number of threads processing ssl conns */ -static apr_uint32_t threads_shutdown = 0; /* Number of threads that have shutdown - early during graceful termination */ +static /*atomic*/ apr_uint32_t connection_count = 0; /* Number of open connections */ +static /*atomic*/ apr_uint32_t lingering_count = 0; /* Number of connections in lingering close */ +static /*atomic*/ apr_uint32_t suspended_count = 0; /* Number of suspended connections */ +static /*atomic*/ apr_uint32_t clogged_count = 0; /* 
Number of threads processing ssl conns */ +static /*atomic*/ apr_uint32_t threads_shutdown = 0; /* Number of threads that have shutdown + early during graceful termination */ static int resource_shortage = 0; static fd_queue_t *worker_queue; static fd_queue_info_t *worker_queue_info; @@ -210,6 +218,7 @@ module AP_MODULE_DECLARE_DATA mpm_event_module; /* forward declare */ struct event_srv_cfg_s; typedef struct event_srv_cfg_s event_srv_cfg; +struct timeout_queue; static apr_pollfd_t *listener_pollfd; @@ -224,93 +233,123 @@ static apr_pollfd_t *listener_pollfd; */ static apr_pollset_t *event_pollset; +struct pending_timer_event { + timer_event_t te; + fd_queue_event_t qe; +}; +#define te2qe(te) (&((struct pending_timer_event *)(te))->qe) + +struct pending_socket_event { + sock_event_t se; + fd_queue_event_t qe; + struct timeout_queue *q; +}; + typedef struct event_conn_state_t event_conn_state_t; struct event_conn_state_t { /** APR_RING of expiration timeouts */ APR_RING_ENTRY(event_conn_state_t) timeout_list; - /** the time when the entry was queued */ - apr_time_t queue_timestamp; + /** public parts of the connection state */ + conn_state_t pub; + /** memory pool allocated and to allocate from (ptrans) */ + apr_pool_t *p; /** connection record this struct refers to */ conn_rec *c; /** request record (if any) this struct refers to */ request_rec *r; /** server config this struct refers to */ event_srv_cfg *sc; - /** server config this struct refers to during keepalive */ + /** server config this struct refers to as for the keepalive(_q) timeout */ event_srv_cfg *ka_sc; /** scoreboard handle for the conn_rec */ ap_sb_handle_t *sbh; - /** is the current conn_rec suspended? (disassociated with - * a particular MPM thread; for suspend_/resume_connection - * hooks) - */ - int suspended; - /** memory pool to allocate from */ - apr_pool_t *p; /** bucket allocator */ apr_bucket_alloc_t *bucket_alloc; /** poll file descriptor information */ apr_pollfd_t pfd; - /** public parts of the connection state */ - conn_state_t pub; - /** When idle, attached queue or timer event */ + + /* queuing in a timeout_queue */ + /** the time when the entry was queued */ + apr_time_t queue_timestamp; + /** attached queue when in pollset */ struct timeout_queue *q; - timer_event_t *te; - /** In the pending_q? for lingering close? */ - int pending, pending_linger; + /** attached timer event when in pollset */ + timer_event_t *q_te; + + /* queuing/pending in the worker_queue */ + struct pending_socket_event pse; + + /* (fillable) bitfield (of bools) */ + /** is the current conn_rec suspended? (disassociated with + * a particular MPM thread; for suspend_/resume_connection + * hooks) + */ + unsigned int suspended :1; + /** is the connection pending for lingering close? */ + unsigned int linger_pending :1; + /** is lingering close started on the connection? 
*/ + unsigned int linger_started :1; }; static APR_INLINE apr_socket_t *cs_sd(event_conn_state_t *cs) { + ap_assert(cs != NULL); return cs->pfd.desc.s; } -static APR_INLINE apr_sockaddr_t *cs_raddr(event_conn_state_t *cs) -{ - apr_sockaddr_t *addr = NULL; - apr_socket_addr_get(&addr, APR_REMOTE, cs_sd(cs)); - return addr; -} static APR_INLINE int cs_fd(event_conn_state_t *cs) { apr_os_sock_t fd = -1; apr_os_sock_get(&fd, cs_sd(cs)); return fd; } +static APR_INLINE apr_sockaddr_t *cs_raddr(event_conn_state_t *cs) +{ + apr_sockaddr_t *addr = NULL; + apr_socket_addr_get(&addr, APR_REMOTE, cs_sd(cs)); + return addr; +} static APR_INLINE const char *cs_state_str(event_conn_state_t *cs) { switch (cs->pub.state) { case CONN_STATE_CHECK_REQUEST_LINE_READABLE: return "keepalive"; case CONN_STATE_READ_REQUEST_LINE: - return "read_request"; + return "reading"; case CONN_STATE_HANDLER: return "handler"; case CONN_STATE_WRITE_COMPLETION: - return "write_completion"; + return "writing"; case CONN_STATE_SUSPENDED: - return "suspend"; + return "suspended"; case CONN_STATE_LINGER: - return "lingering_close"; + return "closing"; case CONN_STATE_LINGER_NORMAL: - return "normal_close"; + return "linger_close"; case CONN_STATE_LINGER_SHORT: - return "short_close"; + return "linger_short"; default: return "bad_state"; } } #define CS_FMT "pp:%pp:%i:%s" -#define CS_ARG(cs) cs, cs_sd(cs), cs_fd(cs), cs_state_str(cs) +#define CS_ARG(cs) (cs), cs_sd(cs), cs_fd(cs), cs_state_str(cs) -APR_RING_HEAD(timeout_head_t, event_conn_state_t); +struct ap_timer_t { + apr_time_t when; + apr_interval_time_t timeout; +}; +typedef struct ap_timer_t ap_timer_t; +APR_RING_HEAD(timeout_head_t, event_conn_state_t); struct timeout_queue { struct timeout_head_t head; apr_interval_time_t timeout; apr_uint32_t count; /* for this queue */ apr_uint32_t *total; /* for all chained/related queues */ + const char *name; struct timeout_queue *next; /* chaining */ }; + /* * Several timeout queues that use different timeouts, so that we always can * simply append to the end. @@ -319,7 +358,8 @@ struct timeout_queue { * keepalive_q uses vhost's KeepAliveTimeOut * linger_q uses MAX_SECS_TO_LINGER * short_linger_q uses SECONDS_TO_LINGER - * pending_q uses as many subqueues as above's timeouts + * pending_q uses as many subqueues as above's timeouts, + * and worker_queue's mutex rather than timeout_mutex * */ static struct timeout_queue *read_line_q, @@ -327,51 +367,30 @@ static struct timeout_queue *read_line_q, *keepalive_q, *linger_q, *short_linger_q, - *pending_q, /* pending_linger_q and pending_short_linger_q are * actually chained into pending_q, we save their - * pointers globally for push2pending() to access - * them directly. 
- */ - *pending_linger_q, *pending_short_linger_q; + * pointers globally for conn_state_set_pending() + * to access them directly */ + *pending_q, *pending_linger_q, *pending_short_linger_q; static volatile apr_time_t queues_next_expiry; -#ifndef MONOTONIC_CLOCK_IN_USE +#ifndef AP_CLOCK_GETTIME_MONOTONIC #if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC) -#define MONOTONIC_CLOCK_IN_USE CLOCK_MONOTONIC +#define AP_CLOCK_GETTIME_MONOTONIC CLOCK_MONOTONIC #endif #endif static APR_INLINE apr_time_t event_time_now(void) { -#ifdef MONOTONIC_CLOCK_IN_USE +#ifdef AP_CLOCK_GETTIME_MONOTONIC struct timespec ts; - clock_gettime(MONOTONIC_CLOCK_IN_USE, &ts); + clock_gettime(AP_CLOCK_GETTIME_MONOTONIC, &ts); return apr_time_from_sec(ts.tv_sec) + ts.tv_nsec / 1000; #else return apr_time_now(); #endif } -static APR_INLINE int TIMEOUT_EXPIRED(apr_time_t now, - apr_time_t timestamp, - apr_interval_time_t timeout) -{ - if (now >= timestamp + timeout) - return 1; - -#ifndef MONOTONIC_CLOCK_IN_USE - /* No entry should be registered after now + timeout in normal - * circonstances, expire them. This means the clock went in the - * past (i.e. not monotonic). - */ - if (timestamp > now + timeout) - return 1; -#endif - - return 0; -} - /* Prevent extra poll/wakeup calls for timeouts close in the future (queues * have the granularity of a second anyway). * XXX: Wouldn't 0.5s (instead of 0.1s) be "enough"? @@ -387,7 +406,7 @@ static void TO_QUEUE_APPEND(struct timeout_queue *q, event_conn_state_t *el) apr_time_t elem_expiry; apr_time_t next_expiry; - ap_assert(q && el->q == NULL); + ap_assert(q && !el->q); el->q = q; APR_RING_INSERT_TAIL(&q->head, el, event_conn_state_t, timeout_list); @@ -421,6 +440,7 @@ static void TO_QUEUE_REMOVE(struct timeout_queue *q, event_conn_state_t *el) } static struct timeout_queue *TO_QUEUE_MAKE(apr_pool_t *p, + const char *name, apr_interval_time_t t, struct timeout_queue *ref) { @@ -429,12 +449,14 @@ static struct timeout_queue *TO_QUEUE_MAKE(apr_pool_t *p, q = apr_pcalloc(p, sizeof *q); APR_RING_INIT(&q->head, event_conn_state_t, timeout_list); q->total = (ref) ? ref->total : apr_pcalloc(p, sizeof *q->total); + q->name = name; q->timeout = t; return q; } static struct timeout_queue *TO_QUEUE_CHAIN(apr_pool_t *p, + const char *name, apr_interval_time_t t, struct timeout_queue **ref, apr_hash_t *ht, apr_pool_t *hp) @@ -442,7 +464,7 @@ static struct timeout_queue *TO_QUEUE_CHAIN(apr_pool_t *p, struct timeout_queue *q = apr_hash_get(ht, &t, sizeof t); if (!q) { - q = TO_QUEUE_MAKE(p, t, *ref); + q = TO_QUEUE_MAKE(p, name, t, *ref); q->next = *ref; *ref = q; @@ -502,7 +524,6 @@ typedef struct socket_callback_baton apr_array_header_t *pfds; timer_event_t *cancel_event; /* If a timeout was requested, a pointer to the timer event */ struct socket_callback_baton *next; - unsigned int signaled :1; } socket_callback_baton_t; typedef struct event_child_bucket { @@ -532,7 +553,7 @@ typedef struct event_retained_data { * We use this value to optimize routines that have to scan the entire * scoreboard. 
*/ - int max_daemons_limit; + int max_daemons_used; /* * All running workers, active and shutting down, including those that @@ -612,23 +633,30 @@ static int ap_child_slot; /* Current child process slot in scoreboard */ */ static apr_socket_t **worker_sockets; -static volatile apr_uint32_t listensocks_disabled; +static /*atomic*/ apr_uint32_t listensocks_disabled; + +static APR_INLINE int listeners_disabled(void) +{ + return apr_atomic_read32(&listensocks_disabled) != 0; +} static int disable_listensocks(void) { - int i; + unsigned int i; if (apr_atomic_cas32(&listensocks_disabled, 1, 0) != 0) { return 0; } ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf, APLOGNO(10381) - "Backlogging new connections: %u conns, %u idlers, " - "%u pending, %u lingering, %u clogged, %u suspended)", + "Suspend accepting new connections: avail:%i conns:%u " + "rl:%u wc:%u ka:%u lingering:%u suspended:%u clogged:%u", + ap_queue_info_avail(worker_queue_info), apr_atomic_read32(&connection_count), - ap_queue_info_num_idlers(worker_queue_info), - apr_atomic_read32(pending_q->total), + apr_atomic_read32(read_line_q->total), + apr_atomic_read32(write_completion_q->total), + apr_atomic_read32(keepalive_q->total), apr_atomic_read32(&lingering_count), - apr_atomic_read32(&clogged_count), - apr_atomic_read32(&suspended_count)); + apr_atomic_read32(&suspended_count), + apr_atomic_read32(&clogged_count)); if (event_pollset) { for (i = 0; i < num_listensocks; i++) { apr_pollset_remove(event_pollset, &listener_pollfd[i]); @@ -640,20 +668,22 @@ static int disable_listensocks(void) static int enable_listensocks(void) { - int i; - if (listener_may_exit - || apr_atomic_cas32(&listensocks_disabled, 0, 1) != 1) { + unsigned int i; + if (apr_atomic_read32(&dying) + || apr_atomic_cas32(&listensocks_disabled, 0, 1) != 1) { return 0; } ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf, APLOGNO(00457) - "Accepting new connections again: %u conn, %u idlers, " - "%u lingering, %u pending, %u clogged, %u suspended)", + "Resume accepting new connections: avail:%i conns:%u " + "rl:%u wc:%u ka:%u lingering:%u suspended:%u clogged:%u", + ap_queue_info_avail(worker_queue_info), apr_atomic_read32(&connection_count), - ap_queue_info_num_idlers(worker_queue_info), - apr_atomic_read32(pending_q->total), + apr_atomic_read32(read_line_q->total), + apr_atomic_read32(write_completion_q->total), + apr_atomic_read32(keepalive_q->total), apr_atomic_read32(&lingering_count), - apr_atomic_read32(&clogged_count), - apr_atomic_read32(&suspended_count)); + apr_atomic_read32(&suspended_count), + apr_atomic_read32(&clogged_count)); for (i = 0; i < num_listensocks; i++) apr_pollset_add(event_pollset, &listener_pollfd[i]); /* @@ -664,39 +694,25 @@ static int enable_listensocks(void) return 1; } -static APR_INLINE int listeners_disabled(void) +static APR_INLINE int pending_above_limit(int limit) { - return apr_atomic_read32(&listensocks_disabled) != 0; + return ap_queue_info_avail(worker_queue_info) < -limit; } -static APR_INLINE apr_uint32_t u32_min(apr_uint32_t u1, apr_uint32_t u2) +static APR_INLINE int should_disable_listensocks(void) { - return (u1 < u2) ? u1 : u2; -} + if (listeners_disabled()) + return 0; -static int connections_above_limit(int *busy) -{ -#ifndef LISTEN_OFF_SCALE -#define LISTEN_OFF_SCALE 4u -#endif - apr_uint32_t t_p_c = (apr_uint32_t)threads_per_child; - apr_uint32_t i_num = ap_queue_info_num_idlers(worker_queue_info); - apr_uint32_t i_min = listeners_disabled() ? 
t_p_c / LISTEN_OFF_SCALE : 0; - if (i_num > i_min) { /* amortize disable=>enable_listensocks() switch */ - apr_uint32_t t_num = t_p_c + i_num * async_factor; - if (apr_atomic_read32(pending_q->total) <= t_num) { - return 0; - } - } - else if (busy && !i_num) { - *busy = 1; - } - return 1; + return pending_above_limit(max_pending_events); } -static APR_INLINE int should_enable_listensocks(void) +static APR_INLINE int should_reenable_listensocks(void) { - return !dying && listeners_disabled() && !connections_above_limit(NULL); + if (!listeners_disabled() || apr_atomic_read32(&dying)) + return 0; + + return !pending_above_limit(1); } static void close_socket_at(apr_socket_t *csd, @@ -742,9 +758,9 @@ static void shutdown_listener(void) { ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf, "shutting down listener%s", - listener_may_exit ? " again" : ""); + apr_atomic_read32(&listener_may_exit) ? " again" : ""); - listener_may_exit = 1; + apr_atomic_set32(&listener_may_exit, 1); disable_listensocks(); /* Unblock the listener if it's poll()ing */ @@ -801,7 +817,7 @@ static void signal_threads(int mode) * workers to exit once it has stopped accepting new connections */ if (mode == ST_UNGRACEFUL) { - workers_may_exit = 1; + apr_atomic_set32(&workers_may_exit, 1); ap_queue_interrupt_all(worker_queue); close_worker_sockets(); /* forcefully kill all current connections */ } @@ -814,7 +830,7 @@ static int event_query(int query_code, int *result, apr_status_t *rv) *rv = APR_SUCCESS; switch (query_code) { case AP_MPMQ_MAX_DAEMON_USED: - *result = retained->max_daemons_limit; + *result = retained->max_daemons_used; break; case AP_MPMQ_IS_THREADED: *result = AP_MPMQ_STATIC; @@ -960,11 +976,11 @@ static apr_status_t decrement_connection_count(void *cs_) */ is_last_connection = !apr_atomic_dec32(&connection_count); if (listener_is_wakeable - && ((is_last_connection && listener_may_exit) - || should_enable_listensocks())) { + && ((is_last_connection && apr_atomic_read32(&listener_may_exit)) + || should_reenable_listensocks())) { apr_pollset_wakeup(event_pollset); } - if (dying) { + if (apr_atomic_read32(&dying)) { /* Help worker_thread_should_exit_early() */ ap_queue_interrupt_one(worker_queue); } @@ -1031,54 +1047,60 @@ static void kill_connection_at(event_conn_state_t *cs, apr_status_t status, kill_connection_at((cs), (status), __FUNCTION__, __LINE__) /* forward declare */ -static void do_push2pending(event_conn_state_t *cs, apr_time_t now, - int processing_timeout); +static void update_reqevents_from_sense(event_conn_state_t *cs, int sense); +static void push2worker(event_conn_state_t *cs, timer_event_t *te, + apr_time_t now, int *busy); /* Shutdown the connection in case of timeout, error or resources shortage. - * This starts short lingering close if not already there, or directly closes + * This starts lingering close if not already there, or directly closes * the connection otherwise. * Pre-condition: nonblocking, can be called from anywhere provided cs is not - * in any timeout queue or in the pollset. + * in the pollset nor any non-pending timeout queue. 
*/ -static void shutdown_connection(event_conn_state_t *cs, apr_time_t now) +static void shutdown_connection(event_conn_state_t *cs, apr_time_t now, + int was_pending) { + ap_assert(!cs->q && !cs->q_te); + if (cs->c) { - int log_level; - switch (cs->pub.state) { - case CONN_STATE_LINGER: - case CONN_STATE_LINGER_SHORT: - case CONN_STATE_LINGER_NORMAL: - case CONN_STATE_CHECK_REQUEST_LINE_READABLE: - log_level = APLOG_TRACE2; - break; - default: - log_level = APLOG_INFO; - break; + if (APLOGcinfo(cs->c)) { + int log_level; + switch (cs->pub.state) { + case CONN_STATE_LINGER: + case CONN_STATE_LINGER_SHORT: + case CONN_STATE_LINGER_NORMAL: + case CONN_STATE_CHECK_REQUEST_LINE_READABLE: + log_level = APLOG_TRACE2; + break; + default: + log_level = APLOG_INFO; + break; + } + ap_log_cerror(APLOG_MARK, log_level, 0, cs->c, APLOGNO(10380) + "shutting down %s connection in %s", + was_pending ? "pending" : "timed out", + cs_state_str(cs)); } - ap_log_cerror(APLOG_MARK, log_level, 0, cs->c, APLOGNO(10380) - "shutting down %s connection in %s", - cs->pending ? "pending" : "timed out", - cs_state_str(cs)); /* Don't re-schedule connections in lingering close, they had * their chance already so just close them now. */ - if (cs->pub.state < CONN_STATE_LINGER) { - /* Set pending_linger for process_lingering_close() to + if (cs->pub.state >= CONN_STATE_LINGER) { + close_connection(cs); + } + else { + /* linger_pending tells process_lingering_close() to * not increment lingering_count twice. */ - cs->pending_linger = 1; - cs->pub.state = CONN_STATE_LINGER; + cs->linger_pending = 1; + cs->pub.state = CONN_STATE_LINGER_SHORT; apr_atomic_inc32(&lingering_count); - - do_push2pending(cs, now, 1); - } - else { - close_connection(cs); + push2worker(cs, NULL, now, NULL); } } else { /* Never been scheduled/processed, kill it. */ + ap_assert(was_pending); kill_connection(cs, APR_EBUSY); } } @@ -1154,8 +1176,8 @@ static int event_post_read_request(request_rec *r) return OK; } -static int pollset_add_at(event_conn_state_t *cs, - struct timeout_queue *q, timer_event_t *te, +static int pollset_add_at(event_conn_state_t *cs, int sense, + struct timeout_queue *q, timer_event_t *q_te, const char *at, int line) { apr_status_t rv; @@ -1164,31 +1186,36 @@ static int pollset_add_at(event_conn_state_t *cs, "pollset: add %s=%" APR_TIME_T_FMT " events=%x" " for connection %" CS_FMT " at %s:%i", (q) ? "q" : "t", - (q) ? q->timeout : (te) ? te->timeout : -1, + (q) ? q->timeout : (q_te) ? 
q_te->timeout : -1, (int)cs->pfd.reqevents, CS_ARG(cs), at, line); + update_reqevents_from_sense(cs, sense); + if (q) { - ap_assert(te == NULL); + ap_assert(!q_te); apr_thread_mutex_lock(timeout_mutex); TO_QUEUE_APPEND(q, cs); } + else if (q_te) { + ap_assert(!q); + cs->q_te = q_te; + } else { - ap_assert(te != NULL); - cs->te = te; + ap_assert(0); } rv = apr_pollset_add(event_pollset, &cs->pfd); - if (rv != APR_SUCCESS && !APR_STATUS_IS_EEXIST(rv)) { + if (rv != APR_SUCCESS) { if (q) { TO_QUEUE_REMOVE(q, cs); apr_thread_mutex_unlock(timeout_mutex); } else { - cs->te = NULL; - te->canceled = 1; + q_te->canceled = 1; + cs->q_te = NULL; } /* close_worker_sockets() may have closed it already */ - if (workers_may_exit) { + if (apr_atomic_read32(&workers_may_exit)) { AP_DEBUG_ASSERT(APR_STATUS_IS_EBADF(rv)); } else { @@ -1203,45 +1230,38 @@ static int pollset_add_at(event_conn_state_t *cs, if (q) { apr_thread_mutex_unlock(timeout_mutex); } - return 1; } -#define pollset_add(cs, q, te) \ - pollset_add_at((cs), (q), (te), __FUNCTION__, __LINE__) +#define pollset_add(cs, sense, q, q_te) \ + pollset_add_at((cs), (sense), (q), (q_te), __FUNCTION__, __LINE__) static int pollset_del_at(event_conn_state_t *cs, int locked, const char *at, int line) { apr_status_t rv; - struct timeout_queue *q = cs->q; ap_log_cerror(APLOG_MARK, APLOG_TRACE7, 0, cs->c, "pollset: del %s=%" APR_TIME_T_FMT " events=%x" " for connection %" CS_FMT " at %s:%i", - (q) ? "q" : "t", - (q) ? q->timeout : (cs->te) ? cs->te->timeout : -1, + (cs->q) ? "q" : "t", + (cs->q) ? cs->q->timeout : (cs->q_te ? cs->q_te->timeout : -1), (int)cs->pfd.reqevents, CS_ARG(cs), at, line); - if (q) { - ap_assert(cs->te == NULL); + if (cs->q) { + ap_assert(!cs->q_te); if (!locked) { apr_thread_mutex_lock(timeout_mutex); } - TO_QUEUE_REMOVE(q, cs); - } - else { - ap_assert(cs->te != NULL); - } - rv = apr_pollset_remove(event_pollset, &cs->pfd); - if (q) { + TO_QUEUE_REMOVE(cs->q, cs); if (!locked) { apr_thread_mutex_unlock(timeout_mutex); } } else { - cs->te->canceled = 1; - cs->te = NULL; + ap_assert(cs->q_te); + cs->q_te->canceled = 1; + cs->q_te = NULL; } /* @@ -1250,6 +1270,7 @@ static int pollset_del_at(event_conn_state_t *cs, int locked, * therefore, we can accept _SUCCESS or _NOTFOUND, * and we still want to keep going */ + rv = apr_pollset_remove(event_pollset, &cs->pfd); if (rv != APR_SUCCESS && !APR_STATUS_IS_NOTFOUND(rv)) { ap_log_cerror(APLOG_MARK, APLOG_ERR, rv, cs->c, APLOGNO(03094) "pollset remove failed for connection %" CS_FMT " at %s:%i", @@ -1265,11 +1286,11 @@ static int pollset_del_at(event_conn_state_t *cs, int locked, pollset_del_at((cs), (locked), __FUNCTION__, __LINE__) /* Forward declare */ -static timer_event_t *event_get_timer_event(apr_time_t t, - ap_mpm_callback_fn_t *cbfn, - void *baton, - int insert, - apr_array_header_t *pfds); +static timer_event_t *get_timer_event(apr_time_t timeout, + ap_mpm_callback_fn_t *cbfn, + void *baton, + int insert, + apr_array_header_t *pfds); static void process_lingering_close(event_conn_state_t *cs); static void update_reqevents_from_sense(event_conn_state_t *cs, int sense) @@ -1286,6 +1307,9 @@ static void update_reqevents_from_sense(event_conn_state_t *cs, int sense) else if (sense == CONN_SENSE_WANT_WRITE) { cs->pfd.reqevents = APR_POLLOUT; } + else { + ap_assert(0); + } /* POLLERR is usually returned event only, but some pollset * backends may require it in reqevents to do the right thing, @@ -1301,10 +1325,13 @@ static event_conn_state_t *make_conn_state(apr_pool_t *p, apr_socket_t 
*csd) { event_conn_state_t *cs = apr_pcalloc(p, sizeof(*cs)); - cs->p = p; - cs->pfd.desc.s = csd; - cs->pfd.desc_type = APR_POLL_SOCKET; APR_RING_ELEM_INIT(cs, timeout_list); + cs->p = cs->pse.se.p = p; + cs->pfd.desc_type = APR_POLL_SOCKET; + cs->pfd.desc.s = cs->pse.se.sd = csd; + cs->pse.qe.cb_baton = cs->pse.se.baton = cs; + cs->pse.qe.type = FD_QUEUE_EVENT_SOCK; + cs->pse.qe.data.se = &cs->pse.se; cs->sc = cs->ka_sc = ap_get_module_config(ap_server_conf->module_config, &mpm_event_module); @@ -1348,6 +1375,7 @@ static void process_socket(apr_thread_t *thd, apr_pool_t * p, apr_socket_t * soc if (cs == NULL) { cs = make_conn_state(p, sock); } + cs->bucket_alloc = apr_bucket_alloc_create(p); ap_create_sb_handle(&cs->sbh, p, my_child_num, my_thread_num); c = ap_run_create_connection(p, ap_server_conf, sock, @@ -1360,14 +1388,13 @@ static void process_socket(apr_thread_t *thd, apr_pool_t * p, apr_socket_t * soc c->current_thread = thd; c->cs = &cs->pub; cs->c = c; - cs->p = p; - cs->pub.sense = CONN_SENSE_DEFAULT; - update_reqevents_from_sense(cs, CONN_SENSE_WANT_READ); + apr_pool_pre_cleanup_register(p, cs, ptrans_pre_cleanup); + pt = apr_pcalloc(p, sizeof(*pt)); pt->type = PT_CSD; pt->baton = cs; cs->pfd.client_data = pt; - apr_pool_pre_cleanup_register(p, cs, ptrans_pre_cleanup); + update_reqevents_from_sense(cs, CONN_SENSE_WANT_READ); ap_update_vhost_given_ip(c); @@ -1390,13 +1417,13 @@ static void process_socket(apr_thread_t *thd, apr_pool_t * p, apr_socket_t * soc ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, cs->c, "processing connection %" CS_FMT, CS_ARG(cs)); - if (c->aborted) { + if (cs->pub.state >= CONN_STATE_LINGER) { + /* fall through */ + } + else if (c->aborted) { /* do lingering close below */ cs->pub.state = CONN_STATE_LINGER; } - else if (cs->pub.state >= CONN_STATE_LINGER) { - /* fall through */ - } else { if (cs->pub.state == CONN_STATE_READ_REQUEST_LINE /* If we have an input filter which 'clogs' the input stream, @@ -1415,9 +1442,6 @@ static void process_socket(apr_thread_t *thd, apr_pool_t * p, apr_socket_t * soc if (clogging) { apr_atomic_dec32(&clogged_count); } - if (cs->pub.state > CONN_STATE_LINGER) { - cs->pub.state = CONN_STATE_LINGER; - } if (rc == DONE) { rc = OK; } @@ -1492,8 +1516,6 @@ static void process_socket(apr_thread_t *thd, apr_pool_t * p, apr_socket_t * soc cs->queue_timestamp = event_time_now(); notify_suspend(cs); - /* Add work to pollset. */ - update_reqevents_from_sense(cs, CONN_SENSE_WANT_READ); /* If the connection timeout is actually different than the rl_q's, use * a timer event to honor it (e.g. mod_reqtimeout may enforce its own * timeouts per request stage). 
@@ -1504,18 +1526,17 @@ static void process_socket(apr_thread_t *thd, apr_pool_t * p, apr_socket_t * soc if (timeout < TIMER_MIN_TIMEOUT) { timeout = TIMER_MIN_TIMEOUT; } - te = event_get_timer_event(timeout, NULL, cs, 1, NULL); + te = get_timer_event(timeout, NULL, cs, 1, NULL); } else { q = cs->sc->rl_q; } - if (pollset_add(cs, q, te)) { + if (pollset_add(cs, CONN_SENSE_WANT_READ, q, te)) { return; } /* fall through */ - apr_table_setn(cs->c->notes, "short-lingering-close", "1"); - cs->pub.state = CONN_STATE_LINGER; + cs->pub.state = CONN_STATE_LINGER_SHORT; } if (cs->pub.state == CONN_STATE_WRITE_COMPLETION) { @@ -1539,25 +1560,23 @@ static void process_socket(apr_thread_t *thd, apr_pool_t * p, apr_socket_t * soc cs->queue_timestamp = event_time_now(); notify_suspend(cs); - update_reqevents_from_sense(cs, CONN_SENSE_WANT_WRITE); - if (pollset_add(cs, cs->sc->wc_q, NULL)) { + if (pollset_add(cs, CONN_SENSE_WANT_WRITE, cs->sc->wc_q, NULL)) { return; } /* fall through */ - apr_table_setn(cs->c->notes, "short-lingering-close", "1"); - pending = DONE; /* CONN_STATE_LINGER below */ + cs->pub.state = CONN_STATE_LINGER_SHORT; } - if (pending != DECLINED - || c->aborted - || c->keepalive != AP_CONN_KEEPALIVE) { + else if (c->aborted + || c->keepalive != AP_CONN_KEEPALIVE + || pending != DECLINED) { cs->pub.state = CONN_STATE_LINGER; } else if (ap_run_input_pending(c) == OK) { cs->pub.state = CONN_STATE_READ_REQUEST_LINE; goto read_request; } - else if (!listener_may_exit) { + else if (!apr_atomic_read32(&listener_may_exit)) { cs->pub.state = CONN_STATE_CHECK_REQUEST_LINE_READABLE; } else { @@ -1579,14 +1598,12 @@ static void process_socket(apr_thread_t *thd, apr_pool_t * p, apr_socket_t * soc cs->queue_timestamp = event_time_now(); notify_suspend(cs); - update_reqevents_from_sense(cs, CONN_SENSE_WANT_READ); - if (pollset_add(cs, cs->ka_sc->ka_q, NULL)) { + if (pollset_add(cs, CONN_SENSE_WANT_READ, cs->ka_sc->ka_q, NULL)) { return; } /* fall through */ - apr_table_setn(cs->c->notes, "short-lingering-close", "1"); - cs->pub.state = CONN_STATE_LINGER; + cs->pub.state = CONN_STATE_LINGER_SHORT; } if (cs->pub.state == CONN_STATE_SUSPENDED) { @@ -1624,16 +1641,13 @@ static apr_status_t event_resume_suspended (conn_rec *c) cs->pub.state = CONN_STATE_WRITE_COMPLETION; notify_suspend(cs); - update_reqevents_from_sense(cs, CONN_SENSE_WANT_WRITE); - if (pollset_add(cs, cs->sc->wc_q, NULL)) { + if (pollset_add(cs, CONN_SENSE_WANT_WRITE, cs->sc->wc_q, NULL)) { return APR_SUCCESS; } /* fall through */ - apr_table_setn(cs->c->notes, "short-lingering-close", "1"); + cs->pub.state = CONN_STATE_LINGER_SHORT; } - - cs->pub.state = CONN_STATE_LINGER; process_lingering_close(cs); return OK; } @@ -1659,7 +1673,7 @@ static void close_listeners(void) ap_close_listeners_ex(my_bucket->listeners); - dying = 1; + apr_atomic_set32(&dying, 1); ap_scoreboard_image->parent[ap_child_slot].quiescing = 1; for (i = 0; i < threads_per_child; ++i) { ap_update_child_status_from_indexes(ap_child_slot, i, @@ -1737,55 +1751,23 @@ static void init_serf(apr_pool_t *p) } #endif -static apr_status_t push_timer2worker(timer_event_t* te) +static void conn_state_queue_cb(void *baton, int push) { - return ap_queue_push_timer(worker_queue, te); -} + event_conn_state_t *cs = baton; + ap_assert(cs && cs->pse.q); -/* - * Pre-condition: cs is neither in event_pollset nor a timeout queue - * this function may only be called by the listener - */ -static apr_status_t push2worker(event_conn_state_t *cs) -{ - apr_status_t rc; - apr_pool_t *ptrans = NULL; 
- apr_socket_t *csd = NULL; - - if (cs) { - ptrans = cs->p; - csd = cs_sd(cs); + if (push) { + TO_QUEUE_APPEND(cs->pse.q, cs); } - - rc = ap_queue_push_socket(worker_queue, csd, cs, ptrans); - if (rc != APR_SUCCESS) { - ap_log_error(APLOG_MARK, APLOG_CRIT, rc, ap_server_conf, APLOGNO(00471) - "push2worker: ap_queue_push_socket failed"); - /* trash the connection; we couldn't queue the connected - * socket to a worker - */ - if (cs) { - kill_connection(cs, rc); - } - else { - if (csd) { - close_socket(csd); - } - if (ptrans) { - ap_queue_info_push_pool(worker_queue_info, ptrans); - } - } - signal_threads(ST_GRACEFUL); + else { /* pop */ + TO_QUEUE_REMOVE(cs->pse.q, cs); + cs->pse.qe.cb = NULL; + cs->pse.q = NULL; } - - return rc; } -static void do_push2pending(event_conn_state_t *cs, apr_time_t now, - int processing_timeout) +static void conn_state_set_pending(event_conn_state_t *cs, apr_time_t now) { - struct timeout_queue *q = NULL; - if (cs->c) { ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, cs->c, "pushing pending connection %" CS_FMT, CS_ARG(cs)); @@ -1795,95 +1777,92 @@ static void do_push2pending(event_conn_state_t *cs, apr_time_t now, "pushing pending connection %pp", cs); } - /* Kindle the fire by not accepting new connections until - * the situation settles. New idling workers will test for - * should_enable_listensocks() to recover when possible. - */ - if (connections_above_limit(NULL) && disable_listensocks()) { - ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf, - "No idle worker, not accepting new conns " - "in this process"); - } - - cs->pending = 1; cs->queue_timestamp = now; switch (cs->pub.state) { case CONN_STATE_CHECK_REQUEST_LINE_READABLE: - q = cs->sc->pending_ka_q; + cs->pse.q = cs->sc->pending_ka_q; break; case CONN_STATE_READ_REQUEST_LINE: - q = cs->sc->pending_rl_q; + cs->pse.q = cs->sc->pending_rl_q; break; case CONN_STATE_WRITE_COMPLETION: - q = cs->sc->pending_wc_q; + cs->pse.q = cs->sc->pending_wc_q; break; case CONN_STATE_LINGER_NORMAL: - q = pending_linger_q; + cs->pse.q = pending_linger_q; break; case CONN_STATE_LINGER_SHORT: - q = pending_short_linger_q; + cs->pse.q = pending_short_linger_q; break; - case CONN_STATE_LINGER: - /* Well, process_timeout_queue() calls us in CONN_STATE_LINGER - * state but that's an exception, all other cases are of the - * "should not happen" nature. - */ - if (processing_timeout) { - q = pending_linger_q; - break; - } - /* fallthru */ default: ap_assert(0); return; } + cs->pse.qe.cb = conn_state_queue_cb; +} - /* process_timeout_queue() may hold the lock already */ - if (!processing_timeout) { - apr_thread_mutex_lock(timeout_mutex); +/* get_worker: + * If no worker was available immediately, will set *busy to 1. + */ +static int get_worker(int *busy) +{ + if (ap_queue_info_get_idler(worker_queue_info) != APR_EBUSY) { + return 1; } - - /* That simple finally :) */ - TO_QUEUE_APPEND(q, cs); - - if (!processing_timeout) { - apr_thread_mutex_unlock(timeout_mutex); + if (busy) { + *busy = 1; } -} -static APR_INLINE void push2pending(event_conn_state_t *cs, apr_time_t now) -{ - do_push2pending(cs, now, 0); + return 0; } -/* get_worker: - * If *have_idle_worker_p == 0, reserve a worker thread, and set - * *have_idle_worker_p = 1. - * If *have_idle_worker_p is already 1, will do nothing. - * If no worker was available immediately, will set *all_busy to 1. 
+/* + * Pre-condition: cs is neither in event_pollset nor a timeout queue + * this function may only be called by the listener */ -static void get_worker(int *have_idle_worker_p, int *all_busy) +static void push2worker(event_conn_state_t *cs, timer_event_t *te, + apr_time_t now, int *busy) { apr_status_t rc; - if (*have_idle_worker_p) { - /* already reserved a worker thread - must have hit a - * transient error on a previous pass + ap_assert(cs || te); + + if (!get_worker(busy)) { + /* Might need to kindle the fire by not accepting new connections + * until the situation settles. New idling workers will test for + * should_reenable_listensocks() to recover (when suitable). */ - return; - } + if (should_disable_listensocks()) { + disable_listensocks(); + } - rc = ap_queue_info_try_get_idler(worker_queue_info); - if (rc == APR_SUCCESS || APR_STATUS_IS_EOF(rc)) { - *have_idle_worker_p = 1; - } - else if (rc == APR_EAGAIN) { - *all_busy = 1; + if (cs) { + /* The connection will be both in a timeout and the worker queues, + * make sure either one handled first unregisters from the other. + */ + conn_state_set_pending(cs, now); + } } - else { - ap_log_error(APLOG_MARK, APLOG_ERR, rc, ap_server_conf, APLOGNO(00472) - "ap_queue_info_wait_for_idler failed. " - "Attempting to shutdown process gracefully"); - signal_threads(ST_GRACEFUL); + + rc = ap_queue_push_event(worker_queue, (cs) ? &cs->pse.qe : te2qe(te)); + if (rc != APR_SUCCESS) { + int mode = ST_GRACEFUL; + ap_log_error(APLOG_MARK, APLOG_CRIT, rc, ap_server_conf, APLOGNO(00471) + "push2worker: queuing %s failed", cs ? "socker" : "timer"); + + if (cs) { + /* Can't go anywhere, kill (and log). */ + kill_connection(cs, rc); + } + else { + /* Can't block thus call te->cbfunc() here, someone is going + * to miss this event and never release their connection(s), + * so graceful may never complete. + */ + mode = ST_UNGRACEFUL; + } + + AP_DEBUG_ASSERT(0); + signal_threads(mode); } } @@ -1914,14 +1893,14 @@ static int timer_comp(void *a, void *b) static apr_thread_mutex_t *g_timer_skiplist_mtx; -static timer_event_t * event_get_timer_event(apr_time_t t, - ap_mpm_callback_fn_t *cbfn, - void *baton, - int insert, - apr_array_header_t *pfds) +static timer_event_t *get_timer_event(apr_time_t timeout, + ap_mpm_callback_fn_t *cbfn, + void *baton, + int insert, + apr_array_header_t *pfds) { timer_event_t *te; - apr_time_t now = (t < 0) ? 0 : event_time_now(); + apr_time_t now = (timeout < 0) ? 0 : event_time_now(); /* oh yeah, and make locking smarter/fine grained. */ @@ -1932,15 +1911,21 @@ static timer_event_t * event_get_timer_event(apr_time_t t, APR_RING_REMOVE(te, link); } else { - te = apr_skiplist_alloc(timer_skiplist, sizeof(timer_event_t)); - APR_RING_ELEM_INIT(te, link); + struct pending_timer_event *pte; + pte = apr_skiplist_alloc(timer_skiplist, sizeof(*pte)); + te = &pte->te; + + memset(&pte->qe, 0, sizeof(pte->qe)); + pte->qe.type = FD_QUEUE_EVENT_TIMER; + pte->qe.data.te = te; } + APR_RING_ELEM_INIT(te, link); te->cbfunc = cbfn; te->baton = baton; te->canceled = 0; - te->when = now + t; - te->timeout = t; + te->when = now + timeout; + te->timeout = timeout; te->pfds = pfds; if (insert) { @@ -1955,18 +1940,32 @@ static timer_event_t * event_get_timer_event(apr_time_t t, next_expiry = timers_next_expiry; if (!next_expiry || next_expiry > te->when + EVENT_FUDGE_FACTOR) { timers_next_expiry = te->when; - /* Unblock the poll()ing listener for it to update its timeout. */ + /* Wake up the listener to eventually update its poll()ing timeout. 
*/ if (listener_is_wakeable) { apr_pollset_wakeup(event_pollset); } } } + apr_thread_mutex_unlock(g_timer_skiplist_mtx); return te; } -static apr_status_t event_register_timed_callback_ex(apr_time_t t, +static void put_timer_event(timer_event_t *te, int locked) +{ + if (!locked) { + apr_thread_mutex_lock(g_timer_skiplist_mtx); + } + + APR_RING_INSERT_TAIL(&timer_free_ring.link, te, timer_event_t, link); + + if (!locked) { + apr_thread_mutex_unlock(g_timer_skiplist_mtx); + } +} + +static apr_status_t event_register_timed_callback_ex(apr_time_t timeout, ap_mpm_callback_fn_t *cbfn, void *baton, apr_array_header_t *pfds) @@ -1974,15 +1973,15 @@ static apr_status_t event_register_timed_callback_ex(apr_time_t t, if (!cbfn) { return APR_EINVAL; } - event_get_timer_event(t, cbfn, baton, 1, pfds); + get_timer_event(timeout, cbfn, baton, 1, pfds); return APR_SUCCESS; } -static apr_status_t event_register_timed_callback(apr_time_t t, +static apr_status_t event_register_timed_callback(apr_time_t timeout, ap_mpm_callback_fn_t *cbfn, void *baton) { - event_register_timed_callback_ex(t, cbfn, baton, NULL); + event_register_timed_callback_ex(timeout, cbfn, baton, NULL); return APR_SUCCESS; } @@ -2004,6 +2003,10 @@ static apr_status_t event_cleanup_poll_callback(void *data) } } + if (final_rc) { + AP_DEBUG_ASSERT(0); + signal_threads(ST_GRACEFUL); + } return final_rc; } @@ -2053,7 +2056,7 @@ static apr_status_t event_register_poll_callback_ex(apr_pool_t *p, if (timeout < TIMER_MIN_TIMEOUT) { timeout = TIMER_MIN_TIMEOUT; } - scb->cancel_event = event_get_timer_event(timeout, tofn, baton, 1, scb->pfds); + scb->cancel_event = get_timer_event(timeout, tofn, baton, 1, scb->pfds); } for (i = 0; i < scb->pfds->nelts; i++) { apr_pollfd_t *pfd = (apr_pollfd_t *)scb->pfds->elts + i; @@ -2092,26 +2095,32 @@ static apr_status_t event_register_poll_callback(apr_pool_t *p, #define LINGERING_BUF_SIZE (32 * 1024) static void process_lingering_close(event_conn_state_t *cs) { - apr_socket_t *csd = ap_get_conn_socket(cs->c); char dummybuf[LINGERING_BUF_SIZE]; + apr_socket_t *csd = cs_sd(cs); + struct timeout_queue *q; apr_size_t nbytes; apr_status_t rv; - struct timeout_queue *q; ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, cs->c, "lingering close for connection %" CS_FMT, CS_ARG(cs)); - AP_DEBUG_ASSERT(cs->pub.state >= CONN_STATE_LINGER); + ap_assert(cs->pub.state >= CONN_STATE_LINGER); - if (cs->pub.state == CONN_STATE_LINGER) { - /* push2pending() may have bumped lingering_count already */ - if (cs->pending_linger) { - cs->pending_linger = 0; + /* start lingering close once */ + if (!cs->linger_started) { + cs->linger_started = 1; + + /* shutdown_connection() may have bumped lingering_count already */ + if (!cs->linger_pending) { + apr_atomic_inc32(&lingering_count); } else { - apr_atomic_inc32(&lingering_count); + cs->linger_pending = 0; } + /* Shutdown the connection, i.e. pre_connection_close hooks, + * SSL/TLS close notify, flush, etc.. + */ apr_socket_timeout_set(csd, USECS_TO_LINGER); if (ap_start_lingering_close(cs->c)) { notify_suspend(cs); @@ -2120,25 +2129,28 @@ static void process_lingering_close(event_conn_state_t *cs) } cs->queue_timestamp = event_time_now(); - /* Clear APR_INCOMPLETE_READ if it was ever set, we'll do the poll() - * at the listener only from now, if needed. - */ - apr_socket_opt_set(csd, APR_INCOMPLETE_READ, 0); - /* - * If some module requested a shortened waiting period, only wait for - * 2s (SECONDS_TO_LINGER). This is useful for mitigating certain - * DoS attacks. 
- */ - if (apr_table_get(cs->c->notes, "short-lingering-close")) { - cs->pub.state = CONN_STATE_LINGER_SHORT; - } - else { - cs->pub.state = CONN_STATE_LINGER_NORMAL; + if (cs->pub.state == CONN_STATE_LINGER) { + /* If some module requested a shortened waiting period, only wait + * for 2s (SECONDS_TO_LINGER). This is useful for mitigating + * certain DoS attacks. + */ + if (apr_table_get(cs->c->notes, "short-lingering-close")) { + cs->pub.state = CONN_STATE_LINGER_SHORT; + } + else { + cs->pub.state = CONN_STATE_LINGER_NORMAL; + } } + cs->pub.sense = CONN_SENSE_DEFAULT; notify_suspend(cs); + + /* All nonblocking reads from now, we'll poll() in the listener + * if needed so clear APR_INCOMPLETE_READ if it was ever set. + */ + apr_socket_opt_set(csd, APR_INCOMPLETE_READ, 0); + apr_socket_timeout_set(csd, 0); } - apr_socket_timeout_set(csd, 0); do { nbytes = sizeof(dummybuf); rv = apr_socket_recv(csd, dummybuf, &nbytes); @@ -2150,76 +2162,100 @@ static void process_lingering_close(event_conn_state_t *cs) } /* (Re)queue the connection to come back when readable */ - cs->pub.sense = CONN_SENSE_DEFAULT; - update_reqevents_from_sense(cs, CONN_SENSE_WANT_READ); q = (cs->pub.state == CONN_STATE_LINGER_SHORT) ? short_linger_q : linger_q; - if (!pollset_add(cs, q, NULL)) { + if (!pollset_add(cs, CONN_SENSE_WANT_READ, q, NULL)) { close_connection(cs); } } -/* Call shutdown_connection() for the elements of 'q' reaching 'now', - * or for all if 'flush' is asked. +/* Call shutdown_connection() for the elements of 'q' that timed out, or + * for all if the given 'timeout' is zero. * Pre-condition: timeout_mutex must already be locked - * Post-condition: timeout_mutex will be locked again */ -static void do_process_timeout_queue(struct timeout_queue *queue, - apr_time_t now, int flush) +static void process_timeout_queue_ex(struct timeout_queue *queue, + apr_interval_time_t timeout, + apr_time_t now) { struct timeout_queue *q; - if (!*queue->total) { - return; - } - for (q = queue; q; q = q->next) { while (!APR_RING_EMPTY(&q->head, event_conn_state_t, timeout_list)) { event_conn_state_t *cs = APR_RING_FIRST(&q->head); + ap_assert(cs->q == q); - /* Stop if this entry did not expire, no following one will - * thanks to the single timeout per queue (latest entries are - * added to the tail). - */ - if (!flush && !TIMEOUT_EXPIRED(now, cs->queue_timestamp, - q->timeout)) { - /* Since this is the next expiring entry of this queue, update - * the global queues_next_expiry if it expires after this one. + if (timeout) { + apr_time_t elem_expiry; + if (timeout < 0 || timeout > q->timeout) { + elem_expiry = cs->queue_timestamp + q->timeout; + } + else { + elem_expiry = cs->queue_timestamp + timeout; + } + + /* Stop if this entry did not expire, no following one will + * thanks to the single timeout per queue (latest entries are + * added to the tail). */ - apr_time_t elem_expiry = cs->queue_timestamp + q->timeout; - apr_time_t next_expiry = queues_next_expiry; - if (!next_expiry + if (elem_expiry > now) { + /* This is the next expiring entry of this queue, update the + * global queues_next_expiry if it expires after this one. 
+ */ + apr_time_t next_expiry = queues_next_expiry; + if (!next_expiry || next_expiry > elem_expiry + TIMEOUT_FUDGE_FACTOR) { - queues_next_expiry = elem_expiry; + queues_next_expiry = elem_expiry; + } + break; } - break; } - if (!pollset_del(cs, 1)) { + if (cs->pse.q) { + /* Unregister from the worker_queue if the connection is + * pending there (note that the worker_queue is already + * locked by the listener when maintaining the pending_q). + */ + ap_assert(cs->pse.qe.cb && cs->pse.qe.cb_baton == cs); + ap_queue_kill_event_locked(worker_queue, &cs->pse.qe); + shutdown_connection(cs, now, 1); + } + else if (pollset_del(cs, 1)) { + /* Removed from the pollset and timeout queue. */ + shutdown_connection(cs, now, 0); + } + else { + /* Can't go anywhere, kill (and log). */ kill_connection(cs, APR_EGENERAL); - continue; } - - shutdown_connection(cs, now); } } } + static APR_INLINE void process_timeout_queue(struct timeout_queue *queue, apr_time_t now) { - do_process_timeout_queue(queue, now, 0); + if (!*queue->total) { + return; + } + + process_timeout_queue_ex(queue, -1, now); } -static APR_INLINE void kill_keepalive_queue(apr_time_t now) + +static APR_INLINE void shrink_timeout_queue(struct timeout_queue *queue, + apr_interval_time_t timeout, + apr_time_t now) { - /* If all workers are busy, we kill older keep-alive connections so - * that they may connect to another process. - */ - if (*keepalive_q->total) { - ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, ap_server_conf, - "All workers are busy or dying, will shutdown %u " - "keep-alive connections", *keepalive_q->total); + if (!*queue->total) { + return; } - do_process_timeout_queue(keepalive_q, now, 1); + + /* When all workers are busy or dying, apply the given timeout */ + ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, ap_server_conf, + "All workers are %s, shutdown %s queue (%u connections)", + dying ? 
"dying" : "busy", queue->name, + apr_atomic_read32(queue->total)); + + process_timeout_queue_ex(queue, timeout, now); } static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy) @@ -2227,12 +2263,10 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy) apr_status_t rc; proc_info *ti = dummy; int process_slot = ti->pslot; - struct process_score *ps = ap_get_scoreboard_process(process_slot); + process_score *ps = ap_get_scoreboard_process(process_slot); + apr_time_t saved_process_time = 0, last_log = 0, last_shrink = 0; int listeners_closed = 0; - int have_idle_worker = 0; - apr_time_t last_log; - last_log = event_time_now(); free(ti); #if HAVE_SERF @@ -2248,12 +2282,21 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy) for (;;) { timer_event_t *te; event_conn_state_t *cs; - const apr_pollfd_t *out_pfd; - apr_int32_t num = 0; - apr_interval_time_t timeout; + apr_time_t next_expiry = 0; + apr_interval_time_t timeout = -1; + apr_time_t poll_time, process_time; socket_callback_baton_t *user_chain; - apr_time_t time1, time2, expiry = -1; + const apr_pollfd_t *out_pfd; int workers_were_busy = 0; + apr_int32_t num = 0; + + if (!saved_process_time) { + poll_time = process_time = event_time_now(); + } + else { + poll_time = process_time = saved_process_time; + saved_process_time = 0; + } if (conns_this_child <= 0) { /* Signal (eventually) and keep going */ @@ -2261,7 +2304,7 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy) conns_this_child = APR_INT32_MAX; } - if (listener_may_exit) { + if (apr_atomic_read32(&listener_may_exit)) { int first_close = !listeners_closed; if (first_close) { listeners_closed = 1; /* once */ @@ -2277,27 +2320,25 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy) * and shutdown the workers/child faster. */ if (first_close) { - time1 = time2 = event_time_now(); - goto do_maintenance; /* with expiry == -1 */ + goto do_maintenance; /* with next_expiry == 0 */ } } if (APLOGtrace6(ap_server_conf)) { - apr_time_t now = event_time_now(); /* trace log status every second */ - if (now - last_log > apr_time_from_sec(1)) { - last_log = now; + if (poll_time - last_log >= apr_time_from_sec(1)) { apr_thread_mutex_lock(timeout_mutex); ap_log_error(APLOG_MARK, APLOG_TRACE6, 0, ap_server_conf, - "connections: %u (clogged: %u read-line: %d write-completion: %d " - "keep-alive: %d lingering: %d suspended: %u)", + "avail:%i conns:%u rl:%u wc:%u ka:%u " + "lingering:%u suspended:%u clogged:%u", + ap_queue_info_avail(worker_queue_info), apr_atomic_read32(&connection_count), - apr_atomic_read32(&clogged_count), apr_atomic_read32(read_line_q->total), apr_atomic_read32(write_completion_q->total), apr_atomic_read32(keepalive_q->total), apr_atomic_read32(&lingering_count), - apr_atomic_read32(&suspended_count)); + apr_atomic_read32(&suspended_count), + apr_atomic_read32(&clogged_count)); if (dying) { ap_log_error(APLOG_MARK, APLOG_TRACE6, 0, ap_server_conf, "%u/%u workers shutdown", @@ -2305,6 +2346,7 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy) threads_per_child); } apr_thread_mutex_unlock(timeout_mutex); + last_log = poll_time; } } @@ -2312,7 +2354,11 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy) rc = serf_context_prerun(g_serf); if (rc != APR_SUCCESS) { /* TODO: what should we do here? ugh. */ + AP_DEBUG_ASSERT(0); } +#if 0 /* Can serf_context_prerun() block? 
*/ + poll_time = process_time = event_time_now(); +#endif #endif /* Start with an infinite poll() timeout and update it according to @@ -2321,91 +2367,99 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy) * up occurs, otherwise periodic checks (maintenance, shutdown, ...) * must be performed. */ - time1 = event_time_now(); - timeout = -1; - /* Push expired timers to a worker, the first remaining one determines - * the maximum time to poll() below, if any. + /* Push expired timers to a worker, the first remaining one (if any) + * determines the maximum time to poll() below. */ - expiry = timers_next_expiry; - if (expiry && expiry <= time1) { /* XXX: not clock backward safe */ + next_expiry = timers_next_expiry; + if (next_expiry && next_expiry <= poll_time) { apr_thread_mutex_lock(g_timer_skiplist_mtx); while ((te = apr_skiplist_peek(timer_skiplist))) { - if (te->when > time1) { + if (te->when > poll_time) { break; } apr_skiplist_pop(timer_skiplist, NULL); - if (!te->canceled) { - if (te->cbfunc) { - if (te->pfds) { - /* remove all sockets from the pollset */ - apr_pool_cleanup_run(te->pfds->pool, te->pfds, - event_cleanup_poll_callback); - } - push_timer2worker(te); - continue; - } + if (te->canceled) { + put_timer_event(te, 1); + continue; + } + if (!te->cbfunc) { cs = te->baton; - ap_assert(cs && cs->te == te); + ap_assert(cs && cs->q_te == te); + put_timer_event(te, 1); + cs->q_te = NULL; + ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, cs->c, "timed out connection %" CS_FMT, CS_ARG(cs)); - (void)pollset_del(cs, 0); kill_connection(cs, APR_TIMEUP); + continue; } - APR_RING_INSERT_TAIL(&timer_free_ring.link, te, - timer_event_t, link); + if (te->pfds) { + /* remove all sockets from the pollset */ + apr_pool_cleanup_run(te->pfds->pool, te->pfds, + event_cleanup_poll_callback); + } + push2worker(NULL, te, process_time, &workers_were_busy); } if (te) { - expiry = te->when; + next_expiry = te->when; } else { - expiry = 0; + next_expiry = 0; } - timers_next_expiry = expiry; + timers_next_expiry = next_expiry; apr_thread_mutex_unlock(g_timer_skiplist_mtx); } - if (expiry) { - timeout = expiry > time1 ? expiry - time1 : 0; + if (next_expiry) { + timeout = next_expiry > poll_time ? next_expiry - poll_time : 0; } /* Same for queues, use their next expiry, if any. */ - expiry = queues_next_expiry; - if (expiry && (timeout < 0 - || expiry <= time1 - || timeout > expiry - time1)) { - timeout = expiry > time1 ? expiry - time1 : 0; + next_expiry = queues_next_expiry; + if (next_expiry && (timeout < 0 || next_expiry - poll_time < timeout)) { + timeout = next_expiry > poll_time ? next_expiry - poll_time : 0; } /* When non-wakeable, don't wait more than 100 ms, in any case. */ -#define NON_WAKEABLE_POLL_TIMEOUT apr_time_from_msec(100) - if (!listener_is_wakeable - && (timeout < 0 - || timeout > NON_WAKEABLE_POLL_TIMEOUT)) { - timeout = NON_WAKEABLE_POLL_TIMEOUT; - } - else if (timeout > 0) { - /* apr_pollset_poll() might round down the timeout to milliseconds, - * let's forcibly round up here to never return before the timeout. + if (!listener_is_wakeable && (timeout < 0 || + timeout > LISTENER_NOWAKEUP_TIMEOUT)) { + timeout = LISTENER_NOWAKEUP_TIMEOUT; + } +#if 0 + if (timeout < 0 || timeout > LISTENER_IDLE_TIMEOUT) { + timeout = LISTENER_IDLE_TIMEOUT; + } + else +#endif + if (timeout > 0) { + /* apr_pollset_poll() might round down the timeout to + * milliseconds, let's forcibly round up here to never + * return before the timeout. 
*/ timeout = apr_time_from_msec( apr_time_as_msec(timeout + apr_time_from_msec(1) - 1) ); } + /* If needed, reenable listening sockets before poll()ing */ + if (should_reenable_listensocks()) { + enable_listensocks(); + } + ap_log_error(APLOG_MARK, APLOG_TRACE7, 0, ap_server_conf, - "pollset: wait for timeout=%" APR_TIME_T_FMT + "pollset: wait timeout=%" APR_TIME_T_FMT " queues_timeout=%" APR_TIME_T_FMT " timers_timeout=%" APR_TIME_T_FMT - " exit=%d/%d conns=%d", + " conns=%d exit=%d/%d", timeout, - queues_next_expiry ? queues_next_expiry - time1 : -1, - timers_next_expiry ? timers_next_expiry - time1 : -1, - listener_may_exit, dying, - apr_atomic_read32(&connection_count)); + queues_next_expiry ? queues_next_expiry - poll_time : 0, + timers_next_expiry ? timers_next_expiry - poll_time : 0, + apr_atomic_read32(&connection_count), + apr_atomic_read32(&listener_may_exit), dying); rc = apr_pollset_poll(event_pollset, timeout, &num, &out_pfd); if (rc != APR_SUCCESS) { @@ -2414,35 +2468,45 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy) APLOGNO(03267) "apr_pollset_poll failed. Attempting to " "shutdown process gracefully"); + AP_DEBUG_ASSERT(0); signal_threads(ST_GRACEFUL); } num = 0; } - /* Fetch the current time once after polling and use it for - * everything below since it's all non-(indefinitely-)blocking - * code. + /* The processing time(stamp) fetched once after polling and used for + * everything below (which is all non-(indefinitely-)blocking code). + * + * XXX possible optimization: stash this time for use as + * r->request_time for new requests. */ - time2 = event_time_now(); + if (timeout >= 0 && APR_STATUS_IS_TIMEUP(rc)) { + process_time = poll_time + timeout; + saved_process_time = process_time; + } + else { + process_time = event_time_now(); + saved_process_time = 0; + } ap_log_error(APLOG_MARK, APLOG_TRACE7, rc, ap_server_conf, - "pollset: have #%i time=%" APR_TIME_T_FMT "/%" APR_TIME_T_FMT + "pollset: have num=%i" + " elapsed=%" APR_TIME_T_FMT "/%" APR_TIME_T_FMT " queues_timeout=%" APR_TIME_T_FMT " timers_timeout=%" APR_TIME_T_FMT - " exit=%d/%d conns=%d", - (int)num, time2 - time1, timeout, - queues_next_expiry ? queues_next_expiry - time2 : -1, - timers_next_expiry ? timers_next_expiry - time2 : -1, - listener_may_exit, dying, - apr_atomic_read32(&connection_count)); - - /* XXX possible optimization: stash the current time for use as - * r->request_time for new requests or queues maintenance - */ + " conns=%d exit=%d/%d", + (int)num, process_time - poll_time, timeout, + queues_next_expiry ? queues_next_expiry - process_time : 0, + timers_next_expiry ? timers_next_expiry - process_time : 0, + apr_atomic_read32(&connection_count), + apr_atomic_read32(&listener_may_exit), dying); for (user_chain = NULL; num > 0; --num, ++out_pfd) { - listener_poll_type *pt = (listener_poll_type *) out_pfd->client_data; - if (pt->type == PT_CSD) { + listener_poll_type *pt = out_pfd->client_data; + socket_callback_baton_t *baton; + + switch (pt->type) { + case PT_CSD: /* one of the sockets is ready */ cs = (event_conn_state_t *)pt->baton; @@ -2468,40 +2532,21 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy) ap_assert(0); } - if (!pollset_del(cs, 0)) { + if (pollset_del(cs, 0)) { + push2worker(cs, NULL, process_time, &workers_were_busy); + } + else { + /* Can't go anywhere, kill (and log) and next. 
*/ kill_connection(cs, APR_EGENERAL); - continue; } + break; - /* If we can't get a worker immediately (nonblocking), - * push to the pending queue for the next idle worker - * to take it. - */ - get_worker(&have_idle_worker, &workers_were_busy); - if (!have_idle_worker) { - push2pending(cs, time2); - } - else if (push2worker(cs) == APR_SUCCESS) { - have_idle_worker = 0; - } - } - else if (pt->type == PT_ACCEPT && !listeners_disabled()) { + case PT_ACCEPT: /* A Listener Socket is ready for an accept() */ if (workers_were_busy) { - if (disable_listensocks()) - ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf, - APLOGNO(03268) - "All workers busy, not accepting new conns " - "in this process"); - } - else if (connections_above_limit(&workers_were_busy)) { - if (disable_listensocks()) - ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf, - APLOGNO(03269) - "Too many open connections, not accepting " - "new conns in this process"); + continue; } - else if (!listener_may_exit) { + if (!apr_atomic_read32(&listener_may_exit)) { void *csd = NULL; ap_listen_rec *lr = (ap_listen_rec *) pt->baton; apr_pool_t *ptrans; /* Pool for per-transaction stuff */ @@ -2536,23 +2581,7 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy) } rc = lr->accept_func(&csd, lr, ptrans); - - /* later we trash rv and rely on csd to indicate - * success/failure - */ - AP_DEBUG_ASSERT(rc == APR_SUCCESS || !csd); - - if (rc == APR_EGENERAL) { - /* E[NM]FILE, ENOMEM, etc */ - resource_shortage = 1; - signal_threads(ST_GRACEFUL); - } - else if (ap_accept_error_is_nonfatal(rc)) { - ap_log_error(APLOG_MARK, APLOG_DEBUG, rc, ap_server_conf, - "accept() on client socket failed"); - } - - if (csd != NULL) { + if (rc == APR_SUCCESS) { conns_this_child--; /* Create and account for the connection from here, or @@ -2560,49 +2589,49 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy) * would consider it does not exist and could exit the * child too early. */ + ap_assert(csd != NULL); cs = make_conn_state(ptrans, csd); - - /* If we can't get a worker immediately (nonblocking), - * push to the pending queue for the next idle worker - * to take it. - */ - get_worker(&have_idle_worker, &workers_were_busy); - if (!have_idle_worker) { - push2pending(cs, time2); - } - else if (push2worker(cs) == APR_SUCCESS) { - have_idle_worker = 0; - } + push2worker(cs, NULL, process_time, &workers_were_busy); } else { + if (rc == APR_EGENERAL) { + /* E[NM]FILE, ENOMEM, etc */ + resource_shortage = 1; + signal_threads(ST_GRACEFUL); + } + else if (ap_accept_error_is_nonfatal(rc)) { + ap_log_error(APLOG_MARK, APLOG_DEBUG, rc, ap_server_conf, + "accept() on client socket failed"); + } ap_queue_info_push_pool(worker_queue_info, ptrans); } } - } /* if:else on pt->type */ + break; + #if HAVE_SERF - else if (pt->type == PT_SERF) { + case PT_SERF: /* send socket to serf. */ /* XXXX: this doesn't require get_worker() */ serf_event_trigger(g_serf, pt->baton, out_pfd); - } - + break; #endif - else if (pt->type == PT_USER) { - socket_callback_baton_t *baton = pt->baton; - if (baton->cancel_event) { - baton->cancel_event->canceled = 1; - } - /* We only signal once per N sockets with this baton, - * and after this loop to avoid any race/lifetime issue - * with the user callback being called while we handle - * the same baton multiple times here. 
+ case PT_USER: + /* Multiple pfds of the same baton might trigger in this pass + * so chain once here and run the cleanup only after this loop + * to avoid lifetime issues (i.e. pfds's pool cleared and + * pfd->client_data dereferenced here). */ - if (!baton->signaled) { - baton->signaled = 1; + baton = pt->baton; + if (!baton->next) { + if (baton->cancel_event) { + baton->cancel_event->canceled = 1; + baton->cancel_event = NULL; + } baton->next = user_chain; user_chain = baton; } + break; } } /* for processing poll */ @@ -2617,12 +2646,12 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy) event_cleanup_poll_callback); /* masquerade as a timer event that is firing */ - te = event_get_timer_event(-1 /* fake timer */, - baton->cbfunc, - baton->user_baton, - 0, /* don't insert it */ - NULL /* no associated socket callback */); - push_timer2worker(te); + te = get_timer_event(-1 /* fake timer */, + baton->cbfunc, + baton->user_baton, + 0, /* don't insert it */ + NULL /* no associated socket callback */); + push2worker(NULL, te, process_time, &workers_were_busy); } /* We process the timeout queues here only when the global @@ -2631,88 +2660,81 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy) * while latest ones are only taken into account here (in listener) * during queues' processing, with the lock held. This works both * with and without wake-ability. - * Even if time2 drifted a bit since it was fetched and the real + * Even if process_time drifted a bit since it was fetched and the real * "now" went below "expiry" in the meantime, the next poll() will * return immediately so the maintenance will happen then. */ - expiry = queues_next_expiry; + next_expiry = queues_next_expiry; + if (next_expiry && next_expiry <= process_time) { do_maintenance: - if (expiry && expiry <= time2) { + AP_DEBUG_ASSERT(process_time); + ap_log_error(APLOG_MARK, APLOG_TRACE7, 0, ap_server_conf, - "queues maintenance: timeout=%" APR_TIME_T_FMT, - expiry > 0 ? expiry - time2 : -1); + "queues maintenance: expired=%" APR_TIME_T_FMT, + next_expiry ? process_time - next_expiry : -1); apr_thread_mutex_lock(timeout_mutex); - /* Steps below will recompute this. */ + /* Recompute this (under the lock) by walking the timeout queues */ queues_next_expiry = 0; - /* Step 1: keepalive timeouts */ - if (workers_were_busy || dying) { - kill_keepalive_queue(time2); /* kill'em all \m/ */ + if (dying || workers_were_busy) { + /* Shorten the keepalive timeout but don't kill the most recent + * ones (i.e. the hottests) to minimize unexpected disruption, + * unless the child is dying anyway. + */ + apr_interval_time_t busy_timeout = (dying) ? 
0 : BUSY_TIMEOUT; + shrink_timeout_queue(linger_q, busy_timeout, process_time); + shrink_timeout_queue(short_linger_q, busy_timeout, process_time); + shrink_timeout_queue(keepalive_q, busy_timeout, process_time); + last_shrink = process_time; } else { - process_timeout_queue(keepalive_q, time2); + process_timeout_queue(linger_q, process_time); + process_timeout_queue(short_linger_q, process_time); + process_timeout_queue(keepalive_q, process_time); } - /* Step 2: read line timeouts */ - process_timeout_queue(read_line_q, time2); - /* Step 3: write completion timeouts */ - process_timeout_queue(write_completion_q, time2); - /* Step 4: (normal) lingering close completion timeouts */ - if (dying && linger_q->timeout > short_linger_q->timeout) { - /* Dying, force short timeout for normal lingering close */ - linger_q->timeout = short_linger_q->timeout; + process_timeout_queue(read_line_q, process_time); + process_timeout_queue(write_completion_q, process_time); + + /* Pending connections race with the workers (dequeuing) under + * the worker_queue mutex. + */ + if (apr_atomic_read32(pending_q->total)) { + ap_queue_lock(worker_queue); + process_timeout_queue(pending_q, process_time); + ap_queue_unlock(worker_queue); } - process_timeout_queue(linger_q, time2); - /* Step 5: (short) lingering close completion timeouts */ - process_timeout_queue(short_linger_q, time2); - /* Step 6: pending completion timeouts */ - process_timeout_queue(pending_q, time2); - expiry = queues_next_expiry; + next_expiry = queues_next_expiry; apr_thread_mutex_unlock(timeout_mutex); ap_log_error(APLOG_MARK, APLOG_TRACE7, 0, ap_server_conf, - "queues maintained: timeout=%" APR_TIME_T_FMT, - expiry > 0 ? expiry - time2 : -1); + "queues maintained: next timeout=%" APR_TIME_T_FMT, + next_expiry ? next_expiry - process_time : -1); - ps->keep_alive = apr_atomic_read32(keepalive_q->total); ps->read_line = apr_atomic_read32(read_line_q->total); ps->write_completion = apr_atomic_read32(write_completion_q->total); + ps->keep_alive = apr_atomic_read32(keepalive_q->total); ps->connections = apr_atomic_read32(&connection_count); ps->suspended = apr_atomic_read32(&suspended_count); ps->lingering_close = apr_atomic_read32(&lingering_count); } - else if ((workers_were_busy || dying) - && apr_atomic_read32(keepalive_q->total)) { + else if ((dying || (workers_were_busy && + process_time - last_shrink >= BUSY_TIMEOUT / 8)) + && (apr_atomic_read32(keepalive_q->total) + || apr_atomic_read32(short_linger_q->total) + || apr_atomic_read32(linger_q->total))) { + apr_interval_time_t busy_timeout = (dying) ? 0 : BUSY_TIMEOUT; apr_thread_mutex_lock(timeout_mutex); - kill_keepalive_queue(time2); /* kill'em all \m/ */ + shrink_timeout_queue(linger_q, busy_timeout, process_time); + shrink_timeout_queue(short_linger_q, busy_timeout, process_time); + shrink_timeout_queue(keepalive_q, busy_timeout, process_time); apr_thread_mutex_unlock(timeout_mutex); - ps->keep_alive = 0; - } - - /* If there are some pending connections (deferred to worker), schedule - * them now. We might wakeup a worker spuriously if another one empties - * the pending queue in the meantime, but there also may be no active - * or all busy workers for an undefined time. In any case no pending - * connection can't starve if we do that here since the queue is filled - * only above in the listener and it's emptied only in the worker(s); - * thus an empty queue here means it will stay so while the listener - * waits (possibly indefinitely) in poll(). 
It's also not an issue if - * we can't get an idle worker (nonblocking) here since it means that - * there are plenty ones to consume the queue before idling. - */ - if (apr_atomic_read32(pending_q->total)) { - get_worker(&have_idle_worker, &workers_were_busy); - if (have_idle_worker - && apr_atomic_read32(pending_q->total) /* recheck(?) */ - && push2worker(NULL) == APR_SUCCESS) { - have_idle_worker = 0; - } - } + last_shrink = process_time; - if (!workers_were_busy && should_enable_listensocks()) { - enable_listensocks(); + ps->keep_alive = apr_atomic_read32(keepalive_q->total); + ps->lingering_close = apr_atomic_read32(&lingering_count); } } /* listener main loop */ @@ -2734,11 +2756,12 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy) static int worker_thread_should_exit_early(int slot) { for (;;) { + const apr_uint32_t max = threads_per_child; apr_uint32_t conns = apr_atomic_read32(&connection_count); apr_uint32_t deads = apr_atomic_read32(&threads_shutdown); - AP_DEBUG_ASSERT(deads < threads_per_child); - if (conns >= threads_per_child - deads) + AP_DEBUG_ASSERT(deads < max); + if (conns >= max - deads) return 0; if (apr_atomic_cas32(&threads_shutdown, deads + 1, deads) == deads) { @@ -2764,10 +2787,11 @@ static int worker_thread_should_exit_early(int slot) static void *APR_THREAD_FUNC worker_thread(apr_thread_t * thd, void *dummy) { proc_info *ti = dummy; - int process_slot = ti->pslot; int thread_slot = ti->tslot; - apr_status_t rv; + int process_slot = ti->pslot; + process_score *ps = ap_get_scoreboard_process(process_slot); int is_idle = 0; + apr_status_t rv; free(ti); @@ -2778,10 +2802,8 @@ static void *APR_THREAD_FUNC worker_thread(apr_thread_t * thd, void *dummy) SERVER_STARTING, NULL); for (;;) { - apr_socket_t *csd = NULL; - event_conn_state_t *cs; - timer_event_t *te = NULL; - apr_pool_t *ptrans; /* Pool for per-transaction stuff */ + fd_queue_event_t *qe; + apr_int32_t avail; if (!is_idle) { rv = ap_queue_info_set_idle(worker_queue_info, NULL); @@ -2790,35 +2812,40 @@ static void *APR_THREAD_FUNC worker_thread(apr_thread_t * thd, void *dummy) APLOGNO(03270) "ap_queue_info_set_idle failed. Attempting to " "shutdown process gracefully."); + AP_DEBUG_ASSERT(0); signal_threads(ST_GRACEFUL); break; } - /* A new idler may have changed connections_above_limit(), + + /* A new idler may have changed pending_above_limit(), * let the listener know and decide. */ - if (listener_is_wakeable && should_enable_listensocks()) { + if (listener_is_wakeable && should_reenable_listensocks()) { apr_pollset_wakeup(event_pollset); } + is_idle = 1; } + avail = ap_queue_info_avail(worker_queue_info); + apr_atomic_set32(&ps->pending, (avail < 0) ? -avail : 0); ap_update_child_status_from_indexes(process_slot, thread_slot, - dying ? SERVER_GRACEFUL - : SERVER_READY, NULL); - worker_pop: - if (workers_may_exit) { + (apr_atomic_read32(&dying) + ? 
SERVER_GRACEFUL : SERVER_READY), + NULL); + + if (apr_atomic_read32(&workers_may_exit)) { ap_log_error(APLOG_MARK, APLOG_TRACE5, 0, ap_server_conf, "worker thread %i/%i may exit", thread_slot, threads_per_child); break; } - if (dying && worker_thread_should_exit_early(thread_slot)) { + if (apr_atomic_read32(&dying) + && worker_thread_should_exit_early(thread_slot)) { break; } - rv = ap_queue_pop_something(worker_queue, &csd, (void **)&cs, - &ptrans, &te); - + rv = ap_queue_pop_event(worker_queue, &qe); if (rv != APR_SUCCESS) { /* We get APR_EOF during a graceful shutdown once all the * connections accepted by this server process have been handled. @@ -2829,6 +2856,7 @@ static void *APR_THREAD_FUNC worker_thread(apr_thread_t * thd, void *dummy) thread_slot, threads_per_child); break; } + /* We get APR_EINTR whenever ap_queue_pop_*() has been interrupted * from an explicit call to ap_queue_interrupt_all(). This allows * us to unblock threads stuck in ap_queue_pop_*() when a shutdown @@ -2840,72 +2868,60 @@ static void *APR_THREAD_FUNC worker_thread(apr_thread_t * thd, void *dummy) * may have already been cleaned up. Don't log the "error" if * workers_may_exit is set. */ - else if (APR_STATUS_IS_EINTR(rv)) { - goto worker_pop; - } - /* We got some other error. */ - else if (!workers_may_exit) { + if (!APR_STATUS_IS_EINTR(rv) && !apr_atomic_read32(&workers_may_exit)) { ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ap_server_conf, - APLOGNO(03099) "ap_queue_pop_socket failed"); + APLOGNO(03099) "ap_queue_pop_event failed"); + AP_DEBUG_ASSERT(0); + signal_threads(ST_GRACEFUL); } continue; } - if (te != NULL) { - te->cbfunc(te->baton); - { - apr_thread_mutex_lock(g_timer_skiplist_mtx); - APR_RING_INSERT_TAIL(&timer_free_ring.link, te, timer_event_t, link); - apr_thread_mutex_unlock(g_timer_skiplist_mtx); - } - } - else { - is_idle = 0; - if (csd != NULL) { - worker_sockets[thread_slot] = csd; - process_socket(thd, ptrans, csd, cs, process_slot, thread_slot); - worker_sockets[thread_slot] = NULL; - } + is_idle = 0; /* event consumed */ + + if (qe->type == FD_QUEUE_EVENT_SOCK) { + apr_pool_t *p; + apr_socket_t *csd; + event_conn_state_t *cs; + + ap_assert(qe->data.se); + p = qe->data.se->p; + csd = qe->data.se->sd; + cs = qe->data.se->baton; + ap_assert(p && csd && cs && qe == &cs->pse.qe); + + worker_sockets[thread_slot] = csd; + process_socket(thd, p, csd, cs, process_slot, thread_slot); + worker_sockets[thread_slot] = NULL; } + else if (qe->type == FD_QUEUE_EVENT_TIMER) { + timer_event_t *te; + ap_mpm_callback_fn_t *cbfunc; + void *baton; - /* If there are pending connections, handle them now. 
*/ - while (!workers_may_exit && apr_atomic_read32(pending_q->total)) { - struct timeout_queue *q; + te = qe->data.te; + ap_assert(te && qe == te2qe(te)); - cs = NULL; - apr_thread_mutex_lock(timeout_mutex); - for (q = pending_q; q; q = q->next) { - if (!APR_RING_EMPTY(&q->head, event_conn_state_t, - timeout_list)) { - cs = APR_RING_FIRST(&q->head); - TO_QUEUE_REMOVE(q, cs); - break; - } - } - apr_thread_mutex_unlock(timeout_mutex); - if (!cs) { - break; - } - cs->pending = 0; + cbfunc = te->cbfunc; + baton = te->baton; - if (cs->c) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, cs->c, - "pulled pending connection %" CS_FMT, CS_ARG(cs)); - } - else { - ap_log_error(APLOG_MARK, APLOG_TRACE6, 0, ap_server_conf, - "pulled pending connection %pp", cs); - } + /* first recycle the timer event */ + put_timer_event(te, 0); - worker_sockets[thread_slot] = csd = cs_sd(cs); - process_socket(thd, cs->p, csd, cs, process_slot, thread_slot); - worker_sockets[thread_slot] = NULL; + ap_update_child_status_from_indexes(process_slot, thread_slot, + SERVER_BUSY_WRITE, NULL); + ap_assert(cbfunc != NULL); + cbfunc(baton); + } + else { + ap_assert(0); } } ap_update_child_status_from_indexes(process_slot, thread_slot, - dying ? SERVER_DEAD - : SERVER_GRACEFUL, NULL); + (apr_atomic_read32(&dying) + ? SERVER_DEAD : SERVER_GRACEFUL), + NULL); apr_thread_exit(thd, APR_SUCCESS); return NULL; @@ -2952,9 +2968,9 @@ static void setup_threads_runtime(void) APR_POLLSET_PORT, APR_POLLSET_EPOLL }; /* XXX: K-A or lingering close connection included in the async factor */ - const apr_uint32_t pollset_size = (apr_uint32_t)num_listensocks + - (apr_uint32_t)threads_per_child * - (async_factor > 2 ? async_factor : 2); + const apr_size_t num_threads = ((apr_size_t)threads_per_child * + (async_factor > 2 ? async_factor : 2)); + const apr_size_t pollset_size = num_listensocks + num_threads; int pollset_flags; /* Event's skiplist operations will happen concurrently with other modules' @@ -2986,8 +3002,8 @@ static void setup_threads_runtime(void) apr_pool_tag(pruntime, "mpm_runtime"); /* We must create the fd queues before we start up the listener - * and worker threads. */ - rv = ap_queue_create(&worker_queue, threads_per_child, pruntime); + * and worker threads, it's bounded by pending_above_limit(). 
*/ + rv = ap_queue_create(&worker_queue, -1, pruntime); if (rv != APR_SUCCESS) { ap_log_error(APLOG_MARK, APLOG_ALERT, rv, ap_server_conf, APLOGNO(03100) "ap_queue_create() failed"); @@ -3053,13 +3069,13 @@ static void setup_threads_runtime(void) } /* Add listeners to the main pollset */ - listener_pollfd = apr_pcalloc(pruntime, num_listensocks * - sizeof(apr_pollfd_t)); + listener_pollfd = apr_pcalloc(pruntime, + num_listensocks * sizeof(apr_pollfd_t)); for (i = 0, lr = my_bucket->listeners; lr; lr = lr->next, i++) { apr_pollfd_t *pfd; listener_poll_type *pt; - AP_DEBUG_ASSERT(i < num_listensocks); + ap_assert(i < num_listensocks); pfd = &listener_pollfd[i]; pfd->reqevents = APR_POLLIN | APR_POLLHUP | APR_POLLERR; @@ -3075,8 +3091,7 @@ static void setup_threads_runtime(void) pfd->desc_type = APR_POLL_SOCKET; pfd->desc.s = lr->sd; - pt = apr_pcalloc(pruntime, sizeof(*pt)); - pfd->client_data = pt; + pfd->client_data = pt = apr_pcalloc(pruntime, sizeof(*pt)); pt->type = PT_ACCEPT; pt->baton = lr; @@ -3153,7 +3168,8 @@ static void *APR_THREAD_FUNC start_threads(apr_thread_t * thd, void *dummy) } - if (start_thread_may_exit || threads_created == threads_per_child) { + if (apr_atomic_read32(&start_thread_may_exit) + || threads_created == threads_per_child) { break; } /* wait for previous generation to clean up an entry */ @@ -3203,9 +3219,9 @@ static void join_workers(apr_thread_t * listener, apr_thread_t ** threads) */ iter = 0; - while (!dying) { + while (!apr_atomic_read32(&dying)) { apr_sleep(apr_time_from_msec(500)); - if (dying || ++iter > 10) { + if (apr_atomic_read32(&dying) || ++iter > 10) { break; } /* listener has not stopped accepting yet */ @@ -3245,10 +3261,11 @@ static void join_start_thread(apr_thread_t * start_thread_id) { apr_status_t rv, thread_rv; - start_thread_may_exit = 1; /* tell it to give up in case it is still - * trying to take over slots from a - * previous generation - */ + /* tell it to give up in case it is still trying to take over slots + * from a previous generation + */ + apr_atomic_set32(&start_thread_may_exit, 1); + rv = apr_thread_join(&thread_rv, start_thread_id); if (rv != APR_SUCCESS) { ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ap_server_conf, APLOGNO(00478) @@ -3457,8 +3474,8 @@ static int make_child(server_rec * s, int slot, int bucket) { int pid; - if (slot + 1 > retained->max_daemons_limit) { - retained->max_daemons_limit = slot + 1; + if (slot + 1 > retained->max_daemons_used) { + retained->max_daemons_used = slot + 1; } if (ap_scoreboard_image->parent[slot].pid != 0) { @@ -3560,11 +3577,12 @@ static void perform_idle_server_maintenance(int child_bucket, int free_slots[MAX_SPAWN_RATE]; int last_non_dead = -1; int active_thread_count = 0; + int pending_events_count = 0; int i, j; /* We only care about the given child_bucket in this call */ for (i = child_bucket; i < server_limit; i += num_buckets) { - if (i >= *max_daemons_used && + if (i >= retained->max_daemons_used && free_length == retained->idle_spawn_rate[child_bucket]) { /* short cut if all active processes have been examined and * enough empty scoreboard slots have been found @@ -3588,7 +3606,7 @@ static void perform_idle_server_maintenance(int child_bucket, * So we hopefully won't need to fork more if we count it. * This depends on the ordering of SERVER_READY and SERVER_STARTING. 
*/ - if (status <= SERVER_READY && !ps->quiescing && !ps->not_accepting + if (status <= SERVER_READY && !ps->quiescing && ps->generation == retained->mpm->my_generation) { ++idle_thread_count; } @@ -3600,6 +3618,7 @@ static void perform_idle_server_maintenance(int child_bucket, if (child_threads_active == threads_per_child) { had_healthy_child = 1; } + pending_events_count += apr_atomic_read32(&ps->pending); last_non_dead = i; } else if (free_length < retained->idle_spawn_rate[child_bucket]) { @@ -3608,6 +3627,44 @@ static void perform_idle_server_maintenance(int child_bucket, } if (*max_daemons_used < last_non_dead + 1) { *max_daemons_used = last_non_dead + 1; + + /* Below make_child() can grow retained->max_daemons_used, so + * be accurate if the one being computed is higher already. + */ + if (retained->max_daemons_used < *max_daemons_used) { + retained->max_daemons_used = *max_daemons_used; + } + } + + if (APLOGdebug(ap_server_conf)) { + static int s_idle_thread_count; + static int s_active_thread_count; + static int s_pending_events_count; + if (child_bucket == 0) { + s_idle_thread_count = 0; + s_pending_events_count = 0; + s_active_thread_count = 0; + } + s_idle_thread_count += idle_thread_count; + s_pending_events_count += pending_events_count; + s_active_thread_count += active_thread_count; + if (child_bucket == num_buckets - 1) { + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf, + "maintenance: " + "threads avail:%d=%d-%d active:%d/%d max:%d, " + "daemons active:%d/%d used:%d max:%d limit:%d", + s_idle_thread_count - s_pending_events_count, + s_idle_thread_count, s_pending_events_count, + s_active_thread_count, retained->active_daemons * threads_per_child, + max_workers, retained->active_daemons, retained->total_daemons, + *max_daemons_used, active_daemons_limit, server_limit); + } + } + + /* Drop idle threads reserved by pending events already */ + idle_thread_count -= pending_events_count; + if (idle_thread_count < 0) { + idle_thread_count = 0; } if (retained->sick_child_detected) { @@ -3876,7 +3933,7 @@ static void server_main_loop(int remaining_children_to_start) for (i = 0; i < num_buckets; i++) { perform_idle_server_maintenance(i, &max_daemons_used); } - retained->max_daemons_limit = max_daemons_used; + retained->max_daemons_used = max_daemons_used; } } @@ -4115,7 +4172,7 @@ static int event_run(apr_pool_t * _pconf, apr_pool_t * plog, server_rec * s) ap_relieve_child_processes(event_note_child_killed); active_children = 0; - for (index = 0; index < retained->max_daemons_limit; ++index) { + for (index = 0; index < retained->max_daemons_used; ++index) { if (ap_mpm_safe_kill(MPM_CHILD_PID(index), 0) == APR_SUCCESS) { active_children = 1; /* Having just one child is enough to stay around */ @@ -4163,17 +4220,13 @@ static void setup_slave_conn(conn_rec *c, void *csd) mcs = ap_get_module_config(c->master->conn_config, &mpm_event_module); - cs = apr_pcalloc(c->pool, sizeof(*cs)); + cs = make_conn_state(c->pool, csd); cs->c = c; - cs->r = NULL; cs->sc = mcs->sc; cs->suspended = 0; - cs->p = c->pool; cs->bucket_alloc = c->bucket_alloc; cs->pfd = mcs->pfd; cs->pub = mcs->pub; - cs->pub.state = CONN_STATE_READ_REQUEST_LINE; - cs->pub.sense = CONN_SENSE_DEFAULT; c->cs = &(cs->pub); ap_set_module_config(c->conn_config, &mpm_event_module, cs); @@ -4213,6 +4266,7 @@ static int event_open_logs(apr_pool_t * p, apr_pool_t * plog, { int startup = 0; int level_flags = 0; + int num; pconf = p; @@ -4222,13 +4276,14 @@ static int event_open_logs(apr_pool_t * p, apr_pool_t * plog, 
level_flags |= APLOG_STARTUP; } - if ((num_listensocks = ap_setup_listeners(ap_server_conf)) < 1) { + if ((num = ap_setup_listeners(ap_server_conf)) < 1) { ap_log_error(APLOG_MARK, APLOG_ALERT | level_flags, 0, (startup ? NULL : s), APLOGNO(03272) "no listening sockets available, shutting down"); return !OK; } + num_listensocks = num; return OK; } @@ -4346,33 +4401,32 @@ static int event_post_config(apr_pool_t *pconf, apr_pool_t *plog, ka_h = apr_hash_make(ptemp); pd_h = apr_hash_make(ptemp); - linger_q = TO_QUEUE_MAKE(pconf, MAX_USECS_TO_LINGER, NULL); - pending_linger_q = TO_QUEUE_CHAIN(pconf, MAX_USECS_TO_LINGER, + linger_q = TO_QUEUE_MAKE(pconf, "linger", MAX_USECS_TO_LINGER, NULL); + pending_linger_q = TO_QUEUE_CHAIN(pconf, "pending_linger", MAX_USECS_TO_LINGER, &pending_q, pd_h, ptemp); - short_linger_q = TO_QUEUE_MAKE(pconf, USECS_TO_LINGER, NULL); - pending_short_linger_q = TO_QUEUE_CHAIN(pconf, USECS_TO_LINGER, + short_linger_q = TO_QUEUE_MAKE(pconf, "short_linger", USECS_TO_LINGER, NULL); + pending_short_linger_q = TO_QUEUE_CHAIN(pconf, "pending_short_linger", USECS_TO_LINGER, &pending_q, pd_h, ptemp); for (; s; s = s->next) { event_srv_cfg *sc = apr_pcalloc(pconf, sizeof *sc); - ap_set_module_config(s->module_config, &mpm_event_module, sc); sc->s = s; /* backref */ - sc->rl_q = TO_QUEUE_CHAIN(pconf, s->timeout, + sc->rl_q = TO_QUEUE_CHAIN(pconf, "read", s->timeout, &read_line_q, rl_h, ptemp); - sc->pending_rl_q = TO_QUEUE_CHAIN(pconf, s->timeout, + sc->pending_rl_q = TO_QUEUE_CHAIN(pconf, "pending_read", s->timeout, &pending_q, pd_h, ptemp); - sc->wc_q = TO_QUEUE_CHAIN(pconf, s->timeout, + sc->wc_q = TO_QUEUE_CHAIN(pconf, "write", s->timeout, &write_completion_q, wc_h, ptemp); - sc->pending_wc_q = TO_QUEUE_CHAIN(pconf, s->timeout, + sc->pending_wc_q = TO_QUEUE_CHAIN(pconf, "pending_write", s->timeout, &pending_q, pd_h, ptemp); - sc->ka_q = TO_QUEUE_CHAIN(pconf, s->keep_alive_timeout, + sc->ka_q = TO_QUEUE_CHAIN(pconf, "keepalive", s->keep_alive_timeout, &keepalive_q, ka_h, ptemp); - sc->pending_ka_q = TO_QUEUE_CHAIN(pconf, s->keep_alive_timeout, + sc->pending_ka_q = TO_QUEUE_CHAIN(pconf, "pending_keepalive", s->keep_alive_timeout, &pending_q, pd_h, ptemp); } @@ -4591,6 +4645,8 @@ static int event_check_config(apr_pool_t *p, apr_pool_t *plog, * checked in ap_mpm_run() */ + max_pending_events = (threads_per_child * async_factor) / PENDING_ASYNC_RATIO; + return OK; } diff --git a/server/mpm/worker/worker.c b/server/mpm/worker/worker.c index 9f05dbb8196..080213bbc92 100644 --- a/server/mpm/worker/worker.c +++ b/server/mpm/worker/worker.c @@ -690,7 +690,7 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t *thd, void * dummy) accept_mutex_error("unlock", rv, process_slot); } if (csd != NULL) { - rv = ap_queue_push_socket(worker_queue, csd, NULL, ptrans); + rv = ap_queue_push_socket(worker_queue, csd, ptrans); if (rv) { /* trash the connection; we couldn't queue the connected * socket to a worker diff --git a/server/mpm_fdqueue.c b/server/mpm_fdqueue.c index 3697ca722f6..d00cf985908 100644 --- a/server/mpm_fdqueue.c +++ b/server/mpm_fdqueue.c @@ -28,16 +28,28 @@ struct recycled_pool struct recycled_pool *next; }; +struct fd_queue_t +{ + APR_RING_HEAD(fd_queue_ring, fd_queue_elem_t) elts; + apr_uint32_t nelts; + apr_uint32_t bounds; + apr_pool_t *spare_pool; + fd_queue_elem_t *spare_elems; + apr_thread_mutex_t *one_big_mutex; + apr_thread_cond_t *not_empty; + apr_uint32_t num_waiters; + apr_uint32_t num_interrupts; + apr_uint32_t terminated; +}; + struct fd_queue_info_t { - 
apr_uint32_t volatile idlers; /** - * >= zero_pt: number of idle worker threads - * < zero_pt: number of threads blocked, - * waiting for an idle worker - */ + volatile apr_uint32_t idlers; /* >= zero_pt: number of idle worker threads + * < zero_pt: number of pending events + * (waiting for an idle thread) */ apr_thread_mutex_t *idlers_mutex; apr_thread_cond_t *wait_for_idler; - int terminated; + apr_uint32_t terminated; int max_idlers; int max_recycled_pools; apr_uint32_t recycled_pools_count; @@ -46,9 +58,11 @@ struct fd_queue_info_t struct fd_queue_elem_t { - apr_socket_t *sd; - void *sd_baton; - apr_pool_t *p; + APR_RING_ENTRY(fd_queue_elem_t) link; /* in ring */ + struct fd_queue_elem_t *next; /* in spare list */ + sock_event_t my_sock_event; + fd_queue_event_t my_event; + fd_queue_event_t *event; }; static apr_status_t queue_info_cleanup(void *data_) @@ -130,11 +144,20 @@ apr_status_t ap_queue_info_set_idle(fd_queue_info_t *queue_info, return APR_SUCCESS; } +apr_status_t ap_queue_info_get_idler(fd_queue_info_t *queue_info) +{ + /* apr_atomic_add32() returns the previous value */ + if (apr_atomic_add32(&queue_info->idlers, -1) <= zero_pt) { + return APR_EBUSY; + } + return APR_SUCCESS; +} + apr_status_t ap_queue_info_try_get_idler(fd_queue_info_t *queue_info) { /* Don't block if there isn't any idle worker. */ for (;;) { - apr_uint32_t idlers = queue_info->idlers; + apr_uint32_t idlers = apr_atomic_read32(&queue_info->idlers); if (idlers <= zero_pt) { return APR_EAGAIN; } @@ -181,7 +204,11 @@ apr_status_t ap_queue_info_wait_for_idler(fd_queue_info_t *queue_info, * queue_info->idlers tells how many * threads are waiting on an idle worker. */ - if (queue_info->idlers < zero_pt) { + while (queue_info->idlers < zero_pt) { + if (queue_info->terminated) { + apr_thread_mutex_unlock(queue_info->idlers_mutex); + return APR_EOF; + } if (had_to_block) { *had_to_block = 1; } @@ -199,7 +226,7 @@ apr_status_t ap_queue_info_wait_for_idler(fd_queue_info_t *queue_info, } } - if (queue_info->terminated) { + if (apr_atomic_read32(&queue_info->terminated)) { return APR_EOF; } else { @@ -207,10 +234,14 @@ apr_status_t ap_queue_info_wait_for_idler(fd_queue_info_t *queue_info, } } +apr_int32_t ap_queue_info_avail(fd_queue_info_t *queue_info) +{ + return apr_atomic_read32(&queue_info->idlers) - zero_pt; +} + apr_uint32_t ap_queue_info_num_idlers(fd_queue_info_t *queue_info) { - apr_uint32_t val; - val = apr_atomic_read32(&queue_info->idlers); + apr_uint32_t val = apr_atomic_read32(&queue_info->idlers); return (val > zero_pt) ? val - zero_pt : 0; } @@ -305,44 +336,25 @@ apr_status_t ap_queue_info_term(fd_queue_info_t *queue_info) return rv; } - queue_info->terminated = 1; + apr_atomic_set32(&queue_info->terminated, 1); apr_thread_cond_broadcast(queue_info->wait_for_idler); return apr_thread_mutex_unlock(queue_info->idlers_mutex); } -/** +/* * Detects when the fd_queue_t is full. This utility function is expected * to be called from within critical sections, and is not threadsafe. */ #define ap_queue_full(queue) ((queue)->nelts == (queue)->bounds) -/** +/* * Detects when the fd_queue_t is empty. This utility function is expected * to be called from within critical sections, and is not threadsafe. */ -#define ap_queue_empty(queue) ((queue)->nelts == 0 && \ - APR_RING_EMPTY(&queue->timers, \ - timer_event_t, link)) - -/** - * Callback routine that is called to destroy this - * fd_queue_t when its pool is destroyed. 
- */ -static apr_status_t ap_queue_destroy(void *data) -{ - fd_queue_t *queue = data; +#define ap_queue_empty(queue) ((queue)->nelts == 0) - /* Ignore errors here, we can't do anything about them anyway. - * XXX: We should at least try to signal an error here, it is - * indicative of a programmer error. -aaron */ - apr_thread_cond_destroy(queue->not_empty); - apr_thread_mutex_destroy(queue->one_big_mutex); - - return APR_SUCCESS; -} - -/** +/* * Initialize the fd_queue_t. */ apr_status_t ap_queue_create(fd_queue_t **pqueue, int capacity, apr_pool_t *p) @@ -361,139 +373,287 @@ apr_status_t ap_queue_create(fd_queue_t **pqueue, int capacity, apr_pool_t *p) return rv; } - APR_RING_INIT(&queue->timers, timer_event_t, link); - - queue->data = apr_pcalloc(p, capacity * sizeof(fd_queue_elem_t)); - queue->bounds = capacity; + apr_pool_create(&queue->spare_pool, p); + APR_RING_INIT(&queue->elts, fd_queue_elem_t, link); + queue->bounds = (capacity > 0) ? capacity : APR_UINT32_MAX; - apr_pool_cleanup_register(p, queue, ap_queue_destroy, - apr_pool_cleanup_null); *pqueue = queue; - return APR_SUCCESS; } -/** - * Push a new socket onto the queue. - * - * precondition: ap_queue_info_wait_for_idler has already been called - * to reserve an idle worker thread - */ -apr_status_t ap_queue_push_socket(fd_queue_t *queue, - apr_socket_t *sd, void *sd_baton, - apr_pool_t *p) +static fd_queue_elem_t *get_spare_elem(fd_queue_t *queue) +{ + fd_queue_elem_t *elem = queue->spare_elems; + if (elem == NULL) { + elem = apr_pcalloc(queue->spare_pool, sizeof(*elem)); + } + else { + queue->spare_elems = elem->next; + elem->next = NULL; + } + return elem; +} + +static void put_spare_elem(fd_queue_t *queue, fd_queue_elem_t *elem) +{ + elem->next = queue->spare_elems; + queue->spare_elems = elem; + elem->event = NULL; +} + +/* Pushes the last available element to the queue. */ +static void push_elem(fd_queue_t *queue, fd_queue_elem_t **pushed_elem, + fd_queue_event_t *event) { fd_queue_elem_t *elem; + + AP_DEBUG_ASSERT(!ap_queue_full(queue)); + AP_DEBUG_ASSERT(!queue->terminated); + + elem = get_spare_elem(queue); + if (event) { + elem->event = event; + } + else { + elem->event = &elem->my_event; + } + elem->event->elem = elem; + + APR_RING_INSERT_TAIL(&queue->elts, elem, fd_queue_elem_t, link); + queue->nelts++; + + if (pushed_elem) { + *pushed_elem = elem; + } +} + +static void APR_INLINE drop_elem(fd_queue_t *queue, fd_queue_elem_t *elem) +{ + elem->event->elem = NULL; + APR_RING_REMOVE(elem, link); + APR_RING_ELEM_INIT(elem, link); + ap_assert(queue->nelts > 0); + queue->nelts--; +} + +/* + * Retrieves the oldest available element from the queue, waiting until one + * becomes available. + */ +static apr_status_t pop_elem(fd_queue_t *queue, fd_queue_elem_t **popped_elem) +{ + for (;;) { + apr_status_t rv; + + if (queue->terminated) { + return APR_EOF; /* no more elements ever again */ + } + + if (!ap_queue_empty(queue)) { + *popped_elem = APR_RING_FIRST(&queue->elts); + drop_elem(queue, *popped_elem); + return APR_SUCCESS; + } + + queue->num_waiters++; + rv = apr_thread_cond_wait(queue->not_empty, queue->one_big_mutex); + queue->num_waiters--; + if (rv != APR_SUCCESS) { + return rv; + } + + if (queue->num_interrupts) { + queue->num_interrupts--; + return queue->terminated ? 
APR_EOF : APR_EINTR; + } + } +} + +apr_status_t ap_queue_push_event(fd_queue_t *queue, fd_queue_event_t *event) +{ apr_status_t rv; if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) { return rv; } - AP_DEBUG_ASSERT(!queue->terminated); - AP_DEBUG_ASSERT(!ap_queue_full(queue)); + switch (event->type) { + case FD_QUEUE_EVENT_SOCK: + case FD_QUEUE_EVENT_TIMER: + case FD_QUEUE_EVENT_BATON: + push_elem(queue, NULL, event); + if (event->cb) { + event->cb(event->cb_baton, 1); + } + if (queue->num_waiters) { + apr_thread_cond_signal(queue->not_empty); + } + break; - elem = &queue->data[queue->in++]; - if (queue->in >= queue->bounds) - queue->in -= queue->bounds; - elem->sd = sd; - elem->sd_baton = sd_baton; - elem->p = p; - queue->nelts++; + default: + rv = APR_EINVAL; + break; + } + + apr_thread_mutex_unlock(queue->one_big_mutex); + return rv; +} + +apr_status_t ap_queue_pop_event(fd_queue_t *queue, fd_queue_event_t **pevent) +{ + apr_status_t rv; + fd_queue_elem_t *elem; + + *pevent = NULL; + + if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) { + return rv; + } + + rv = pop_elem(queue, &elem); + if (rv == APR_SUCCESS) { + fd_queue_event_t *event = elem->event; + ap_assert(event && event != &elem->my_event); + put_spare_elem(queue, elem); + if (event->cb) { + event->cb(event->cb_baton, 0); + } + *pevent = event; + } + + apr_thread_mutex_unlock(queue->one_big_mutex); + return rv; +} + +void ap_queue_kill_event_locked(fd_queue_t *queue, fd_queue_event_t *event) +{ + fd_queue_elem_t *elem = event->elem; + ap_assert(elem && APR_RING_NEXT(elem, link) != elem); - apr_thread_cond_signal(queue->not_empty); + drop_elem(queue, elem); + put_spare_elem(queue, elem); + if (event->cb) { + event->cb(event->cb_baton, 0); + } +} +apr_status_t ap_queue_lock(fd_queue_t *queue) +{ + return apr_thread_mutex_lock(queue->one_big_mutex); +} + +apr_status_t ap_queue_unlock(fd_queue_t *queue) +{ return apr_thread_mutex_unlock(queue->one_big_mutex); } -apr_status_t ap_queue_push_timer(fd_queue_t *queue, timer_event_t *te) +/** + * Push something onto the queue. + */ +apr_status_t ap_queue_push_something(fd_queue_t *queue, + apr_socket_t *sd, void *baton, + apr_pool_t *p, timer_event_t *te) { apr_status_t rv; + fd_queue_elem_t *elem; + + ap_assert(sd || te); if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) { return rv; } - AP_DEBUG_ASSERT(!queue->terminated); - - APR_RING_INSERT_TAIL(&queue->timers, te, timer_event_t, link); + push_elem(queue, &elem, NULL); + if (te) { + elem->event->type = FD_QUEUE_EVENT_TIMER; + elem->event->data.te = te; + } + else { + elem->event->type = FD_QUEUE_EVENT_SOCK; + elem->event->data.se = &elem->my_sock_event; + elem->event->data.se->sd = sd; + elem->event->data.se->baton = baton; + elem->event->data.se->p = p; + } - apr_thread_cond_signal(queue->not_empty); + if (queue->num_waiters) { + apr_thread_cond_signal(queue->not_empty); + } return apr_thread_mutex_unlock(queue->one_big_mutex); } /** - * Retrieves the next available socket from the queue. If there are no - * sockets available, it will block until one becomes available. - * Once retrieved, the socket is placed into the address specified by - * 'sd'. + * Pop something from the queue. 
*/ apr_status_t ap_queue_pop_something(fd_queue_t *queue, - apr_socket_t **sd, void **sd_baton, - apr_pool_t **p, timer_event_t **te_out) + apr_socket_t **sd, void **baton, + apr_pool_t **p, timer_event_t **te) { - fd_queue_elem_t *elem; - timer_event_t *te; apr_status_t rv; + fd_queue_elem_t *elem; + + ap_assert(sd); + + if (sd) { + *sd = NULL; + } + if (baton) { + *baton = NULL; + } + if (p) { + *p = NULL; + } + if (te) { + *te = NULL; + } if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) { return rv; } - /* Keep waiting until we wake up and find that the queue is not empty. */ - if (ap_queue_empty(queue)) { - if (!queue->terminated) { - apr_thread_cond_wait(queue->not_empty, queue->one_big_mutex); - } - /* If we wake up and it's still empty, then we were interrupted */ - if (ap_queue_empty(queue)) { - rv = apr_thread_mutex_unlock(queue->one_big_mutex); - if (rv != APR_SUCCESS) { - return rv; - } - if (queue->terminated) { - return APR_EOF; /* no more elements ever again */ + rv = pop_elem(queue, &elem); + if (rv == APR_SUCCESS) { + fd_queue_event_t *event = elem->event; + ap_assert(event && event == &elem->my_event); + switch (event->type) { + case FD_QUEUE_EVENT_SOCK: + ap_assert(sd && event->data.se); + *sd = event->data.se->sd; + if (baton) { + *baton = event->data.se->baton; } - else { - return APR_EINTR; + if (p) { + *p = event->data.se->p; } - } - } + break; - te = NULL; - if (te_out) { - if (!APR_RING_EMPTY(&queue->timers, timer_event_t, link)) { - te = APR_RING_FIRST(&queue->timers); - APR_RING_REMOVE(te, link); - } - *te_out = te; - } - if (!te) { - elem = &queue->data[queue->out++]; - if (queue->out >= queue->bounds) - queue->out -= queue->bounds; - queue->nelts--; + case FD_QUEUE_EVENT_TIMER: + ap_assert(te && event->data.te); + *te = event->data.te; + break; - *sd = elem->sd; - if (sd_baton) { - *sd_baton = elem->sd_baton; + case FD_QUEUE_EVENT_BATON: + ap_assert(baton && event->data.baton); + *baton = event->data.baton; + break; + + default: + ap_assert(0); + break; } - *p = elem->p; -#ifdef AP_DEBUG - elem->sd = NULL; - elem->p = NULL; -#endif /* AP_DEBUG */ + put_spare_elem(queue, elem); } - return apr_thread_mutex_unlock(queue->one_big_mutex); + apr_thread_mutex_unlock(queue->one_big_mutex); + return rv; } static apr_status_t queue_interrupt(fd_queue_t *queue, int all, int term) { apr_status_t rv; - if (queue->terminated) { + if (apr_atomic_read32(&queue->terminated)) { return APR_EOF; } @@ -506,12 +666,18 @@ static apr_status_t queue_interrupt(fd_queue_t *queue, int all, int term) * would-be popper checks it but right before they block */ if (term) { - queue->terminated = 1; + apr_atomic_set32(&queue->terminated, 1); + } + if (queue->num_waiters) { + if (all) { + queue->num_interrupts = queue->num_waiters; + apr_thread_cond_broadcast(queue->not_empty); + } + else { + queue->num_interrupts = 1; + apr_thread_cond_signal(queue->not_empty); + } } - if (all) - apr_thread_cond_broadcast(queue->not_empty); - else - apr_thread_cond_signal(queue->not_empty); return apr_thread_mutex_unlock(queue->one_big_mutex); } diff --git a/server/mpm_fdqueue.h b/server/mpm_fdqueue.h index 260e22ab80e..149751af305 100644 --- a/server/mpm_fdqueue.h +++ b/server/mpm_fdqueue.h @@ -40,8 +40,10 @@ #include #include +struct fd_queue_t; /* opaque */ struct fd_queue_info_t; /* opaque */ struct fd_queue_elem_t; /* opaque */ +typedef struct fd_queue_t fd_queue_t; typedef struct fd_queue_info_t fd_queue_info_t; typedef struct fd_queue_elem_t fd_queue_elem_t; @@ -50,9 +52,11 @@ 
AP_DECLARE(apr_status_t) ap_queue_info_create(fd_queue_info_t **queue_info, int max_recycled_pools); AP_DECLARE(apr_status_t) ap_queue_info_set_idle(fd_queue_info_t *queue_info, apr_pool_t *pool_to_recycle); +AP_DECLARE(apr_status_t) ap_queue_info_get_idler(fd_queue_info_t *queue_info); AP_DECLARE(apr_status_t) ap_queue_info_try_get_idler(fd_queue_info_t *queue_info); AP_DECLARE(apr_status_t) ap_queue_info_wait_for_idler(fd_queue_info_t *queue_info, int *had_to_block); +AP_DECLARE(apr_int32_t) ap_queue_info_avail(fd_queue_info_t *queue_info); AP_DECLARE(apr_uint32_t) ap_queue_info_num_idlers(fd_queue_info_t *queue_info); AP_DECLARE(apr_status_t) ap_queue_info_term(fd_queue_info_t *queue_info); @@ -62,6 +66,22 @@ AP_DECLARE(void) ap_queue_info_push_pool(fd_queue_info_t *queue_info, apr_pool_t *pool_to_recycle); AP_DECLARE(void) ap_queue_info_free_idle_pools(fd_queue_info_t *queue_info); +enum fd_queue_event_type_e +{ + FD_QUEUE_EVENT_SOCK, + FD_QUEUE_EVENT_TIMER, + FD_QUEUE_EVENT_BATON, +}; +typedef enum fd_queue_event_type_e fd_queue_event_type_e; + +struct sock_event_t +{ + apr_pool_t *p; + apr_socket_t *sd; + void *baton; +}; +typedef struct sock_event_t sock_event_t; + struct timer_event_t { APR_RING_ENTRY(timer_event_t) link; @@ -74,33 +94,50 @@ struct timer_event_t }; typedef struct timer_event_t timer_event_t; -struct fd_queue_t +struct fd_queue_event_t { - APR_RING_HEAD(timers_t, timer_event_t) timers; - fd_queue_elem_t *data; - unsigned int nelts; - unsigned int bounds; - unsigned int in; - unsigned int out; - apr_thread_mutex_t *one_big_mutex; - apr_thread_cond_t *not_empty; - volatile int terminated; + /* queue container (used internally) */ + fd_queue_elem_t *elem; + + /* called back when (de)queuing (under the queue lock) */ + void (*cb)(void *cb_baton, int push); + void *cb_baton; + + /* event data */ + fd_queue_event_type_e type; + union { + sock_event_t *se; + timer_event_t *te; + void *baton; + } data; }; -typedef struct fd_queue_t fd_queue_t; +typedef struct fd_queue_event_t fd_queue_event_t; + +apr_status_t ap_queue_create(fd_queue_t **queue, int capacity, apr_pool_t *p); + +/* mpm_event API */ +AP_DECLARE(apr_status_t) ap_queue_push_event(fd_queue_t *queue, + fd_queue_event_t *event); +AP_DECLARE(apr_status_t) ap_queue_pop_event(fd_queue_t *queue, + fd_queue_event_t **event); +AP_DECLARE(apr_status_t) ap_queue_lock(fd_queue_t *queue); +AP_DECLARE(apr_status_t) ap_queue_unlock(fd_queue_t *queue); +AP_DECLARE(void) ap_queue_kill_event_locked(fd_queue_t *queue, + fd_queue_event_t *event); -AP_DECLARE(apr_status_t) ap_queue_create(fd_queue_t **pqueue, - int capacity, apr_pool_t *p); -AP_DECLARE(apr_status_t) ap_queue_push_socket(fd_queue_t *queue, - apr_socket_t *sd, void *sd_baton, - apr_pool_t *p); -AP_DECLARE(apr_status_t) ap_queue_push_timer(fd_queue_t *queue, - timer_event_t *te); +/* mpm_worker API */ +AP_DECLARE(apr_status_t) ap_queue_push_something(fd_queue_t *queue, + apr_socket_t *sd, void *sd_baton, + apr_pool_t *p, timer_event_t *te); AP_DECLARE(apr_status_t) ap_queue_pop_something(fd_queue_t *queue, apr_socket_t **sd, void **sd_baton, apr_pool_t **p, timer_event_t **te); -#define ap_queue_pop_socket(q_, s_, p_) \ - ap_queue_pop_something((q_), (s_), NULL, (p_), NULL) +#define ap_queue_push_socket(q_, s_, p_) \ + ap_queue_push_something((q_), (s_), NULL, (p_), NULL) +#define ap_queue_pop_socket(q_, s_, p_) \ + ap_queue_pop_something((q_), (s_), NULL, (p_), NULL) +/* common API */ AP_DECLARE(apr_status_t) ap_queue_interrupt_all(fd_queue_t *queue); 
 AP_DECLARE(apr_status_t) ap_queue_interrupt_one(fd_queue_t *queue);
 AP_DECLARE(apr_status_t) ap_queue_term(fd_queue_t *queue);
diff --git a/support/ab.c b/support/ab.c
index bd44ead6ba0..136392c43a6 100644
--- a/support/ab.c
+++ b/support/ab.c
@@ -1819,7 +1819,7 @@ static void read_connection(struct connection * c)
     }
 
     /* are we done? */
-    if (started >= requests && (c->bread >= c->length)) {
+    if (done >= requests && (c->bread >= c->length)) {
         close_connection(c);
     }
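For reference, a minimal usage sketch of the new mpm_fdqueue event API declared above (ap_queue_create(), ap_queue_push_event(), ap_queue_pop_event() and fd_queue_event_t). This is illustrative only and not part of the patch: the function and variable names are made up, an already-created APR pool is assumed, and error handling is reduced to assertions.

#include "httpd.h"          /* ap_assert() */
#include "mpm_fdqueue.h"

/* Push one caller-owned FD_QUEUE_EVENT_BATON event and pop it back.
 * In the MPM the popper would be a worker thread blocking in
 * ap_queue_pop_event() until the listener pushes an event.
 */
static void demo_fdqueue_roundtrip(apr_pool_t *p)
{
    fd_queue_t *q = NULL;
    fd_queue_event_t ev = { 0 }, *got = NULL;
    const char *payload = "baton";

    /* capacity <= 0 now means "unbounded" (bounds = APR_UINT32_MAX) */
    ap_assert(ap_queue_create(&q, -1, p) == APR_SUCCESS);

    /* The event is caller-owned and must stay valid until it is popped;
     * ev.cb is left NULL so no (de)queue callback runs under the lock.
     */
    ev.type = FD_QUEUE_EVENT_BATON;
    ev.data.baton = (void *)payload;
    ap_assert(ap_queue_push_event(q, &ev) == APR_SUCCESS);

    /* ap_queue_pop_event() hands back the same caller-owned event. */
    ap_assert(ap_queue_pop_event(q, &got) == APR_SUCCESS);
    ap_assert(got == &ev && got->data.baton == payload);
}

When an event->cb is set, ap_queue_push_event() and ap_queue_pop_event() invoke it under the queue mutex (with 1 on push, 0 on pop), so any caller-side accounting stays consistent with the queue contents.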