Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bpo-38119: Fix shmem resource tracking #15989

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 0 additions & 8 deletions Lib/multiprocessing/managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -1309,14 +1309,6 @@ class SharedMemoryManager(BaseManager):
_Server = SharedMemoryServer

def __init__(self, *args, **kwargs):
if os.name == "posix":
# bpo-36867: Ensure the resource_tracker is running before
# launching the manager process, so that concurrent
# shared_memory manipulation both in the manager and in the
# current process does not create two resource_tracker
# processes.
from . import resource_tracker
resource_tracker.ensure_running()
BaseManager.__init__(self, *args, **kwargs)
util.debug(f"{self.__class__.__name__} created by pid {getpid()}")

Expand Down
1 change: 0 additions & 1 deletion Lib/multiprocessing/resource_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@

_CLEANUP_FUNCS.update({
'semaphore': _multiprocessing.sem_unlink,
'shared_memory': _posixshmem.shm_unlink,
})


Expand Down
5 changes: 0 additions & 5 deletions Lib/multiprocessing/shared_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,6 @@ def __init__(self, name=None, create=False, size=0):
self.unlink()
raise

from .resource_tracker import register
register(self._name, "shared_memory")

else:

# Windows Named Shared Memory
Expand Down Expand Up @@ -234,9 +231,7 @@ def unlink(self):
called once (and only once) across all processes which have access
to the shared memory block."""
if _USE_POSIX and self._name:
from .resource_tracker import unregister
_posixshmem.shm_unlink(self._name)
unregister(self._name, "shared_memory")


_encoding = "utf8"
Expand Down
88 changes: 21 additions & 67 deletions Lib/test/_test_multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -3829,6 +3829,27 @@ def test_shared_memory_across_processes(self):

sms.close()

def test_shared_memory_across_independently_started_processes(self):
# A Process may not trigger the same exit path as an independently
# spawned process (for example, resource trackers on a separate
# process). This tests that independently created processes can
# indeed continue to access shared memory and that that shared
# memory persists at least so long as one of the processes with a
# handle on it is alive.
sms = shared_memory.SharedMemory('test04_indep', True, size=256)
self.addCleanup(sms.unlink)

prog = (
"from multiprocessing import shared_memory; "
"sms = shared_memory.SharedMemory('test04_indep'); "
"sms.buf[:4] = b'1234'; "
"sms.close()"
)
rc, out, err = test.support.script_helper.assert_python_ok('-c', prog)

self.assertFalse(err)
self.assertEqual(bytes(sms.buf[:4]), b'1234')

@unittest.skipIf(os.name != "posix", "not feasible in non-posix platforms")
def test_shared_memory_SharedMemoryServer_ignores_sigint(self):
# bpo-36368: protect SharedMemoryManager server process from
Expand All @@ -3853,27 +3874,6 @@ def test_shared_memory_SharedMemoryServer_ignores_sigint(self):

smm.shutdown()

@unittest.skipIf(os.name != "posix", "resource_tracker is posix only")
def test_shared_memory_SharedMemoryManager_reuses_resource_tracker(self):
# bpo-36867: test that a SharedMemoryManager uses the
# same resource_tracker process as its parent.
cmd = '''if 1:
from multiprocessing.managers import SharedMemoryManager


smm = SharedMemoryManager()
smm.start()
sl = smm.ShareableList(range(10))
smm.shutdown()
'''
rc, out, err = test.support.script_helper.assert_python_ok('-c', cmd)

# Before bpo-36867 was fixed, a SharedMemoryManager not using the same
# resource_tracker process as its parent would make the parent's
# tracker complain about sl being leaked even though smm.shutdown()
# properly released sl.
self.assertFalse(err)

def test_shared_memory_SharedMemoryManager_basics(self):
smm1 = multiprocessing.managers.SharedMemoryManager()
with self.assertRaises(ValueError):
Expand Down Expand Up @@ -4012,49 +4012,6 @@ def test_shared_memory_ShareableList_pickling(self):
deserialized_sl.shm.close()
sl.shm.close()

def test_shared_memory_cleaned_after_process_termination(self):
cmd = '''if 1:
import os, time, sys
from multiprocessing import shared_memory

# Create a shared_memory segment, and send the segment name
sm = shared_memory.SharedMemory(create=True, size=10)
sys.stdout.write(sm.name + '\\n')
sys.stdout.flush()
time.sleep(100)
'''
with subprocess.Popen([sys.executable, '-E', '-c', cmd],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE) as p:
name = p.stdout.readline().strip().decode()

# killing abruptly processes holding reference to a shared memory
# segment should not leak the given memory segment.
p.terminate()
p.wait()

deadline = time.monotonic() + 60
t = 0.1
while time.monotonic() < deadline:
time.sleep(t)
t = min(t*2, 5)
try:
smm = shared_memory.SharedMemory(name, create=False)
except FileNotFoundError:
break
else:
raise AssertionError("A SharedMemory segment was leaked after"
" a process was abruptly terminated.")

if os.name == 'posix':
# A warning was emitted by the subprocess' own
# resource_tracker (on Windows, shared memory segments
# are released automatically by the OS).
err = p.stderr.read().decode()
self.assertIn(
"resource_tracker: There appear to be 1 leaked "
"shared_memory objects to clean up at shutdown", err)

#
#
#
Expand Down Expand Up @@ -5006,9 +4963,6 @@ def create_and_register_resource(rtype):
if rtype == "semaphore":
lock = mp.Lock()
return lock, lock._semlock.name
elif rtype == "shared_memory":
sm = SharedMemory(create=True, size=10)
return sm, sm._name
else:
raise ValueError(
"Resource type {{}} not understood".format(rtype))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix resource tracker treatment of shared memory as if it were a semaphore.