workqueue: Add pwq->stats[] and a monitoring script
Currently, the only way to peer into workqueue operations is through
tracing. While possible, it isn't easy or convenient to monitor
per-workqueue behaviors over time this way. Let's add pwq->stats[] that
track relevant events and a drgn monitoring script -
tools/workqueue/wq_monitor.py.

It's arguable whether this needs to be configurable. However, it currently
has only a handful of counters and the runtime overhead shouldn't be
noticeable given that they're on pwq's which are per-cpu on per-cpu
workqueues and per-numa-node on unbound ones. Let's keep it simple for the
time being.

v2: Patch reordered to earlier with fewer fields. Fields will be added back
    gradually. Help message improved.

Signed-off-by: Tejun Heo <tj@kernel.org>
Cc: Lai Jiangshan <jiangshanlai@gmail.com>
htejun committed May 18, 2023
1 parent 854f5cc commit 725e8ec
Showing 3 changed files with 205 additions and 1 deletion.
32 changes: 32 additions & 0 deletions Documentation/core-api/workqueue.rst
@@ -348,6 +348,37 @@ Guidelines
level of locality in wq operations and work item execution.


Monitoring
==========

Use tools/workqueue/wq_monitor.py to monitor workqueue operations: ::

  $ tools/workqueue/wq_monitor.py events

                              total  infl  CMwake  mayday rescued
  events                      18545     0       5       -       -
  events_highpri                  8     0       0       -       -
  events_long                     3     0       0       -       -
  events_unbound              38306     0       -       -       -
  events_freezable                0     0       0       -       -
  events_power_efficient      29598     0       0       -       -
  events_freezable_power_        10     0       0       -       -
  sock_diag_events                0     0       0       -       -

                              total  infl  CMwake  mayday rescued
  events                      18548     0       5       -       -
  events_highpri                  8     0       0       -       -
  events_long                     3     0       0       -       -
  events_unbound              38322     0       -       -       -
  events_freezable                0     0       0       -       -
  events_power_efficient      29603     0       0       -       -
  events_freezable_power_        10     0       0       -       -
  sock_diag_events                0     0       0       -       -

  ...

See the command's help message for more info.
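
The counters live in each pool_workqueue's ``stats[]`` array and are summed
over all pwqs of a workqueue. As a minimal sketch (not part of this patch) of
the same walk wq_monitor.py performs, they can also be read from an
interactive drgn session against a kernel that carries these counters: ::

  # Minimal sketch, not part of this patch: sum pwq->stats[] per workqueue.
  # "prog" is provided by drgn; the kernel must already have pwq->stats[].
  from drgn.helpers.linux.list import list_for_each_entry

  nr_stats = int(prog['PWQ_NR_STATS'])
  for wq in list_for_each_entry('struct workqueue_struct',
                                prog['workqueues'].address_of_(), 'list'):
      totals = [0] * nr_stats
      for pwq in list_for_each_entry('struct pool_workqueue',
                                     wq.pwqs.address_of_(), 'pwqs_node'):
          for i in range(nr_stats):
              totals[i] += int(pwq.stats[i])
      print(wq.name.string_().decode(), totals)

wq_monitor.py wraps this walk with name filtering, interval polling and the
table/JSON output shown above.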


Debugging
=========

@@ -387,6 +418,7 @@ the stack trace of the offending worker thread. ::
The work item's function should be trivially visible in the stack
trace.


Non-reentrance Conditions
=========================

24 changes: 23 additions & 1 deletion kernel/workqueue.c
@@ -199,6 +199,20 @@ struct worker_pool {
        struct rcu_head         rcu;
};

/*
 * Per-pool_workqueue statistics. These can be monitored using
 * tools/workqueue/wq_monitor.py.
 */
enum pool_workqueue_stats {
        PWQ_STAT_STARTED,       /* work items started execution */
        PWQ_STAT_COMPLETED,     /* work items completed execution */
        PWQ_STAT_CM_WAKEUP,     /* concurrency-management worker wakeups */
        PWQ_STAT_MAYDAY,        /* maydays to rescuer */
        PWQ_STAT_RESCUED,       /* linked work items executed by rescuer */

        PWQ_NR_STATS,
};

/*
 * The per-pool workqueue. While queued, the lower WORK_STRUCT_FLAG_BITS
 * of work_struct->data are used for flags and the remaining high bits
@@ -236,6 +250,8 @@ struct pool_workqueue {
        struct list_head        pwqs_node;      /* WR: node on wq->pwqs */
        struct list_head        mayday_node;    /* MD: node on wq->maydays */

        u64                     stats[PWQ_NR_STATS];

        /*
         * Release of unbound pwq is punted to system_wq. See put_pwq()
         * and pwq_unbound_release_workfn() for details. pool_workqueue
@@ -929,8 +945,10 @@ void wq_worker_sleeping(struct task_struct *task)
        }

        pool->nr_running--;
        if (need_more_worker(pool))
        if (need_more_worker(pool)) {
                worker->current_pwq->stats[PWQ_STAT_CM_WAKEUP]++;
                wake_up_worker(pool);
        }
        raw_spin_unlock_irq(&pool->lock);
}

@@ -2165,6 +2183,7 @@ static void send_mayday(struct work_struct *work)
                get_pwq(pwq);
                list_add_tail(&pwq->mayday_node, &wq->maydays);
                wake_up_process(wq->rescuer->task);
                pwq->stats[PWQ_STAT_MAYDAY]++;
        }
}

@@ -2403,13 +2422,15 @@ __acquires(&pool->lock)
         * workqueues), so hiding them isn't a problem.
         */
        lockdep_invariant_state(true);
        pwq->stats[PWQ_STAT_STARTED]++;
        trace_workqueue_execute_start(work);
        worker->current_func(work);
        /*
         * While we must be careful to not use "work" after this, the trace
         * point will only record its address.
         */
        trace_workqueue_execute_end(work, worker->current_func);
        pwq->stats[PWQ_STAT_COMPLETED]++;
        lock_map_release(&lockdep_map);
        lock_map_release(&pwq->wq->lockdep_map);

@@ -2653,6 +2674,7 @@ static int rescuer_thread(void *__rescuer)
                                if (first)
                                        pool->watchdog_ts = jiffies;
                                move_linked_works(work, scheduled, &n);
                                pwq->stats[PWQ_STAT_RESCUED]++;
                        }
                        first = false;
                }
150 changes: 150 additions & 0 deletions tools/workqueue/wq_monitor.py
@@ -0,0 +1,150 @@
#!/usr/bin/env drgn
#
# Copyright (C) 2023 Tejun Heo <tj@kernel.org>
# Copyright (C) 2023 Meta Platforms, Inc. and affiliates.

desc = """
This is a drgn script to monitor workqueues. For more info on drgn, visit
https://github.com/osandov/drgn.
total Total number of work items executed by the workqueue.
infl The number of currently in-flight work items.
CMwake The number of concurrency-management wake-ups while executing a
work item of the workqueue.
mayday The number of times the rescuer was requested while waiting for
new worker creation.
rescued The number of work items executed by the rescuer.
"""

import sys
import signal
import os
import re
import time
import json

import drgn
from drgn.helpers.linux.list import list_for_each_entry,list_empty
from drgn.helpers.linux.cpumask import for_each_possible_cpu

import argparse
parser = argparse.ArgumentParser(description=desc,
                                 formatter_class=argparse.RawTextHelpFormatter)
parser.add_argument('workqueue', metavar='REGEX', nargs='*',
                    help='Target workqueue name patterns (all if empty)')
parser.add_argument('-i', '--interval', metavar='SECS', type=float, default=1,
                    help='Monitoring interval (0 to print once and exit)')
parser.add_argument('-j', '--json', action='store_true',
                    help='Output in json')
args = parser.parse_args()

def err(s):
    print(s, file=sys.stderr, flush=True)
    sys.exit(1)

workqueues = prog['workqueues']

WQ_UNBOUND = prog['WQ_UNBOUND']
WQ_MEM_RECLAIM = prog['WQ_MEM_RECLAIM']

PWQ_STAT_STARTED = prog['PWQ_STAT_STARTED'] # work items started execution
PWQ_STAT_COMPLETED = prog['PWQ_STAT_COMPLETED'] # work items completed execution
PWQ_STAT_CM_WAKEUP = prog['PWQ_STAT_CM_WAKEUP'] # concurrency-management worker wakeups
PWQ_STAT_MAYDAY = prog['PWQ_STAT_MAYDAY'] # maydays to rescuer
PWQ_STAT_RESCUED = prog['PWQ_STAT_RESCUED'] # linked work items executed by rescuer
PWQ_NR_STATS = prog['PWQ_NR_STATS']

class WqStats:
    def __init__(self, wq):
        self.name = wq.name.string_().decode()
        self.unbound = wq.flags & WQ_UNBOUND != 0
        self.mem_reclaim = wq.flags & WQ_MEM_RECLAIM != 0
        self.stats = [0] * PWQ_NR_STATS
        for pwq in list_for_each_entry('struct pool_workqueue', wq.pwqs.address_of_(), 'pwqs_node'):
            for i in range(PWQ_NR_STATS):
                self.stats[i] += int(pwq.stats[i])

    def dict(self, now):
        return { 'timestamp'   : now,
                 'name'        : self.name,
                 'unbound'     : self.unbound,
                 'mem_reclaim' : self.mem_reclaim,
                 'started'     : self.stats[PWQ_STAT_STARTED],
                 'completed'   : self.stats[PWQ_STAT_COMPLETED],
                 'cm_wakeup'   : self.stats[PWQ_STAT_CM_WAKEUP],
                 'mayday'      : self.stats[PWQ_STAT_MAYDAY],
                 'rescued'     : self.stats[PWQ_STAT_RESCUED], }

    def table_header_str():
        return f'{"":>24} {"total":>8} {"infl":>5} {"CMwake":>7} {"mayday":>7} {"rescued":>7}'

    def table_row_str(self):
        cm_wakeup = '-'
        mayday = '-'
        rescued = '-'

        if not self.unbound:
            cm_wakeup = str(self.stats[PWQ_STAT_CM_WAKEUP])

        if self.mem_reclaim:
            mayday = str(self.stats[PWQ_STAT_MAYDAY])
            rescued = str(self.stats[PWQ_STAT_RESCUED])

        out = f'{self.name[-24:]:24} ' \
              f'{self.stats[PWQ_STAT_STARTED]:8} ' \
              f'{max(self.stats[PWQ_STAT_STARTED] - self.stats[PWQ_STAT_COMPLETED], 0):5} ' \
              f'{cm_wakeup:>7} ' \
              f'{mayday:>7} ' \
              f'{rescued:>7} '
        return out.rstrip(':')

exit_req = False

def sigint_handler(signr, frame):
    global exit_req
    exit_req = True

def main():
    # handle args
    table_fmt = not args.json
    interval = args.interval

    re_str = None
    if args.workqueue:
        for r in args.workqueue:
            if re_str is None:
                re_str = r
            else:
                re_str += '|' + r

    filter_re = re.compile(re_str) if re_str else None

    # monitoring loop
    signal.signal(signal.SIGINT, sigint_handler)

    while not exit_req:
        now = time.time()

        if table_fmt:
            print()
            print(WqStats.table_header_str())

        for wq in list_for_each_entry('struct workqueue_struct', workqueues.address_of_(), 'list'):
            stats = WqStats(wq)
            if filter_re and not filter_re.search(stats.name):
                continue
            if table_fmt:
                print(stats.table_row_str())
            else:
                print(stats.dict(now))

        if interval == 0:
            break
        time.sleep(interval)

if __name__ == "__main__":
    main()
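
A few example invocations of the script above, using the flags defined by its
argument parser (sample output omitted):

  $ tools/workqueue/wq_monitor.py                     # all workqueues, 1s interval
  $ tools/workqueue/wq_monitor.py -i 5 'events.*'     # 5s interval, names matching events.*
  $ tools/workqueue/wq_monitor.py -j events           # print WqStats.dict() per workqueue

With -j, each sample is emitted as one dict per matching workqueue containing
the timestamp, name, unbound/mem_reclaim flags and the raw counters.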
