Add process initializer (from Python 3.7) to complete issue 21423 implementation #94

Open: 1 commit to be merged into base: master
1 change: 1 addition & 0 deletions concurrent/futures/__init__.py
@@ -10,6 +10,7 @@
                                       ALL_COMPLETED,
                                       CancelledError,
                                       TimeoutError,
+                                      BrokenExecutor,
                                       Future,
                                       Executor,
                                       wait,
43 changes: 40 additions & 3 deletions concurrent/futures/process.py
@@ -105,7 +105,7 @@ def __init__(self, work_id, fn, args, kwargs):
         self.args = args
         self.kwargs = kwargs

-def _process_worker(call_queue, result_queue):
+def _process_worker(call_queue, result_queue, initializer, initargs):
     """Evaluates calls from call_queue and places the results in result_queue.

     This worker is run in a separate process.
@@ -117,7 +117,17 @@ def _process_worker(call_queue, result_queue):
             to by the worker.
         shutdown: A multiprocessing.Event that will be set as a signal to the
             worker that it should exit when call_queue is empty.
+        initializer: A callable initializer, or None
+        initargs: A tuple of args for the initializer
     """
+    if initializer is not None:
+        try:
+            initializer(*initargs)
+        except BaseException:
+            _base.LOGGER.critical('Exception in initializer:', exc_info=True)
+            # The parent will notice that the process stopped and
+            # mark the pool broken
+            return
     while True:
         call_item = call_queue.get(block=True)
         if call_item is None:
@@ -218,6 +228,14 @@ def shutdown_one_process():
             del work_item
         # Check whether we should start shutting down.
         executor = executor_reference()
+        if executor is not None and not _shutdown:
+            all_alive = all(p.is_alive() for p in processes)
+            if not all_alive:
+                executor._broken = ('A child process terminated '
+                                    'abruptly, the process pool is not '
+                                    'usable anymore')
+                executor._shutdown_thread = True
+                executor = None
         # No more work items can be added if:
         # - The interpreter is shutting down OR
         # - The executor that owns this worker has been collected OR
@@ -263,14 +281,23 @@ def _check_system_limits():
     raise NotImplementedError(_system_limited)


+class BrokenProcessPool(_base.BrokenExecutor):
+    """
+    Raised when a process in a ProcessPoolExecutor terminated abruptly
+    while a future was in the running state.
+    """
+
+
 class ProcessPoolExecutor(_base.Executor):
-    def __init__(self, max_workers=None):
+    def __init__(self, max_workers=None, initializer=None, initargs=()):
         """Initializes a new ProcessPoolExecutor instance.

         Args:
             max_workers: The maximum number of processes that can be used to
                 execute the given calls. If None or not given then as many
                 worker processes will be created as the machine has processors.
+            initializer: A callable used to initialize worker processes.
+            initargs: A tuple of arguments to pass to the initializer.
         """
         _check_system_limits()

@@ -282,6 +309,11 @@ def __init__(self, max_workers=None):

             self._max_workers = max_workers

+        if initializer is not None and not callable(initializer):
+            raise TypeError("initializer must be a callable")
+        self._initializer = initializer
+        self._initargs = initargs
+
         # Make the call queue slightly larger than the number of processes to
         # prevent the worker processes from idling. But don't make it too big
         # because futures in the call queue cannot be cancelled.
@@ -295,6 +327,7 @@ def __init__(self, max_workers=None):
         # Shutdown is a two-step process.
         self._shutdown_thread = False
         self._shutdown_lock = threading.Lock()
+        self._broken = False
         self._queue_count = 0
         self._pending_work_items = {}

@@ -321,12 +354,16 @@ def _adjust_process_count(self):
             p = multiprocessing.Process(
                     target=_process_worker,
                     args=(self._call_queue,
-                          self._result_queue))
+                          self._result_queue,
+                          self._initializer,
+                          self._initargs))
             p.start()
             self._processes.add(p)

     def submit(self, fn, *args, **kwargs):
         with self._shutdown_lock:
+            if self._broken:
+                raise BrokenProcessPool(self._broken)
             if self._shutdown_thread:
                 raise RuntimeError('cannot schedule new futures after shutdown')

55 changes: 52 additions & 3 deletions docs/index.rst
@@ -110,9 +110,16 @@ And:
     executor = ThreadPoolExecutor(max_workers=1)
     executor.submit(wait_on_future)

-.. class:: ThreadPoolExecutor(max_workers)
+.. class:: ThreadPoolExecutor(max_workers, thread_name_prefix='', initializer=None, initargs=())

-   Executes calls asynchronously using a pool of at most *max_workers* threads.
+   An :class:`Executor` subclass that uses a pool of at most *max_workers*
+   threads to execute calls asynchronously.
+
+   *initializer* is an optional callable that is called at the start of
+   each worker thread; *initargs* is a tuple of arguments passed to the
+   initializer. Should *initializer* raise an exception, all currently
+   pending jobs will raise a :exc:`~concurrent.futures.thread.BrokenThreadPool`,
+   as well as any attempt to submit more jobs to the pool.

 .. _threadpoolexecutor-example:

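To illustrate the thread initializer contract documented in this hunk, here is a minimal sketch. It assumes the Python 3.7+ standard-library `ThreadPoolExecutor` signature that this documentation mirrors; `init_worker`, `tag` and `run_demo` are hypothetical names chosen for the example, not part of the patch.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# Per-thread storage that the initializer fills in once per worker thread.
_local = threading.local()


def init_worker(prefix):
    """Hypothetical initializer: runs once at the start of each worker thread."""
    _local.label = "{}-{}".format(prefix, threading.current_thread().name)


def tag(value):
    """Task that relies on the state prepared by init_worker."""
    return (_local.label, value)


def run_demo():
    # initargs must be a tuple, hence the trailing comma.
    with ThreadPoolExecutor(max_workers=2,
                            initializer=init_worker,
                            initargs=("worker",)) as executor:
        # map() yields results in submission order.
        return list(executor.map(tag, range(4)))
```

Thread-local storage is used here so that each worker keeps its own label; a plain module global would be shared by all threads in the pool.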
@@ -156,12 +163,18 @@ only picklable objects can be executed and returned.
 Calling :class:`Executor` or :class:`Future` methods from a callable submitted
 to a :class:`ProcessPoolExecutor` will result in deadlock.

-.. class:: ProcessPoolExecutor(max_workers=None)
+.. class:: ProcessPoolExecutor(max_workers=None, initializer=None, initargs=())

    Executes calls asynchronously using a pool of at most *max_workers*
    processes. If *max_workers* is ``None`` or not given then as many worker
    processes will be created as the machine has processors.

+   *initializer* is an optional callable that is called at the start of
+   each worker process; *initargs* is a tuple of arguments passed to the
+   initializer. Should *initializer* raise an exception, all currently
+   pending jobs will raise a :exc:`~concurrent.futures.process.BrokenProcessPool`,
+   as well as any attempt to submit more jobs to the pool.
+
 .. _processpoolexecutor-example:

 ProcessPoolExecutor Example
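The process initializer described in this hunk could be exercised roughly as follows. This is a sketch only, assuming the `ProcessPoolExecutor(max_workers, initializer, initargs)` signature shown in the diff (the same as in the Python 3.7+ standard library); `init_worker`, `scaled` and `run_demo` are hypothetical names.

```python
from concurrent.futures import ProcessPoolExecutor

# Module-level state; every worker process holds its own copy.
_scale = None


def init_worker(scale):
    """Hypothetical initializer: runs once in every worker process."""
    global _scale
    _scale = scale


def scaled(x):
    """Task that reads the value installed by init_worker."""
    return x * _scale


def run_demo():
    # Call this under an `if __name__ == "__main__":` guard, so that start
    # methods which re-import the main module do not create pools recursively.
    with ProcessPoolExecutor(max_workers=2,
                             initializer=init_worker,
                             initargs=(10,)) as executor:
        return list(executor.map(scaled, [1, 2, 3]))
```

Both the initializer and the task are defined at module level because they may need to be pickled to reach the worker processes, depending on the start method in use.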
@@ -345,3 +358,39 @@ Module Functions
    original call to :func:`as_completed`. *timeout* can be an int or float.
    If *timeout* is not specified or ``None``, there is no limit to the wait
    time.
+
+Exception classes
+-----------------
+
+.. currentmodule:: concurrent.futures
+
+.. exception:: CancelledError
+
+   Raised when a future is cancelled.
+
+.. exception:: TimeoutError
+
+   Raised when a future operation exceeds the given timeout.
+
+.. exception:: BrokenExecutor
+
+   Derived from :exc:`RuntimeError`, this exception class is raised
+   when an executor is broken for some reason, and cannot be used
+   to submit or execute new tasks.
+
+.. currentmodule:: concurrent.futures.thread
+
+.. exception:: BrokenThreadPool
+
+   Derived from :exc:`~concurrent.futures.BrokenExecutor`, this exception
+   class is raised when one of the workers of a :class:`ThreadPoolExecutor`
+   has failed initializing.
+
+.. currentmodule:: concurrent.futures.process
+
+.. exception:: BrokenProcessPool
+
+   Derived from :exc:`~concurrent.futures.BrokenExecutor` (formerly
+   :exc:`RuntimeError`), this exception class is raised when one of the
+   workers of a :class:`ProcessPoolExecutor` has terminated in a non-clean
+   fashion (for example, if it was killed from the outside).
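Because both pool-specific exceptions derive from `BrokenExecutor`, callers can handle a broken thread or process pool with a single `except` clause. The sketch below assumes the Python 3.7+ standard-library behaviour that these docs describe, and uses a thread pool only because it is simpler to break on purpose; `failing_initializer` and `submit_to_broken_pool` are hypothetical names.

```python
from concurrent.futures import BrokenExecutor, ThreadPoolExecutor
from concurrent.futures.process import BrokenProcessPool
from concurrent.futures.thread import BrokenThreadPool


def failing_initializer():
    """Hypothetical initializer that always fails, to break the pool."""
    raise ValueError("initialization failed")


def submit_to_broken_pool():
    """Return the exception raised when work is sent to a broken pool."""
    with ThreadPoolExecutor(max_workers=1,
                            initializer=failing_initializer) as executor:
        try:
            # Either call may raise, depending on when the failure is noticed.
            future = executor.submit(pow, 2, 5)
            future.result(timeout=10)
        except BrokenExecutor as exc:
            # Catches BrokenThreadPool and BrokenProcessPool alike.
            return exc
    return None
```

The initializer's own traceback is logged by the executor rather than re-raised, so the caller sees only the `BrokenExecutor` subclass. Since `BrokenExecutor` derives from `RuntimeError`, existing `except RuntimeError` handlers keep working.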