Prevent caching of oid in pidbox #1394
Conversation
Marking as draft for now. Let's wait for confirmation of this fix.
This pull request introduces 1 alert when merging d314d73 into 5ef5e22 - view on LGTM.com.
oid is not cached anymore, due to race conditions on oid in celery. Caching oid causes the following exception:

OperationalError("Cannot route message for exchange 'reply.celery.pidbox': Table empty or key no longer exists. Probably the key ('_kombu.binding.reply.celery.pidbox') has been removed from the Redis database.",)

This exception seems to occur when multiple celery workers contain the same oid.

Fixes #1063
Force-pushed from d314d73 to 6394dae (compare)
Looks good.
Hold up, how does this work if the OID changes every time? Each thread generates a new OID, so how is there a clash?
I still have questions about this.
OID won't change every time. The generation of the OID is bound to process, thread and object ID - see Lines 49 to 55 and Lines 39 to 46 in 5ef5e22.

From the Python documentation for uuid.uuid3(namespace, name): it generates a UUID based on the MD5 hash of a namespace identifier and a name. Hence, in the same process and thread, and for the same pidbox object, the generated oid is always the same.
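For illustration, here is a minimal sketch of that scheme (an assumption-laden simplification, not kombu's exact code, which lives in the lines referenced above): the oid is keyed on node, process, thread and object identity.

```python
import os
import threading
import uuid

def generate_oid(instance):
    # Deterministic for the same node/process/thread/object,
    # but different across processes and threads.
    ent = '%x-%x-%x-%x' % (
        uuid.getnode(), os.getpid(), threading.get_ident(), id(instance))
    return str(uuid.uuid3(uuid.NAMESPACE_OID, ent))
```

Calling this twice on the same object from the same thread yields the same UUID; a different thread or process yields a different one.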
The problem is with caching. The cache is stored on the heap, and hence it is replicated across processes during fork() when a new worker is spawned. Even thread-local storage won't help - see the following proof:

```python
import threading
import multiprocessing

mydata = threading.local()
mydata.x = 1

def f():
    print('f: ', mydata.x)

print('main process:', mydata.x)

t1 = threading.Thread(target=f)
t2 = threading.Thread(target=f)
print('starting t1')
t1.start()
print('starting t2')
t2.start()

p = multiprocessing.Process(target=f)
print('starting p')
p.start()
```

Running this script prints 1 for the subprocess and raises AttributeError in each thread. That is, new threads get fresh thread-local storage, while the forked subprocess inherits the parent's thread-local values. The same holds for the pidbox oid - see the example below:

```python
import multiprocessing
from kombu import pidbox

m = pidbox.Mailbox('unittest')
print(m.oid)

def getoid():
    print('getoid: ', m.oid)

p = multiprocessing.Process(target=getoid)
print('starting p')
p.start()
print(m.oid)
```

In this case both the master process and the subprocess share the oid, which is wrong:

```
>>> import multiprocessing
>>> from kombu import pidbox
>>>
>>> m = pidbox.Mailbox('unittest')
>>> print(m.oid)
b8001cb8-1ae5-36f5-87f4-2660008969ec
>>>
>>> def getoid():
...     print('getoid: ', m.oid)
...
>>> p = multiprocessing.Process(target=getoid)
>>> print('starting p')
starting p
>>> p.start()
>>>
>>> print(m.oid)
getoid:  b8001cb8-1ae5-36f5-87f4-2660008969ec
b8001cb8-1ae5-36f5-87f4-2660008969ec
>>>
```
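A hedged sketch of the direction of the fix (illustrative only, not the exact diff of this PR): expose oid as a plain property, so it is recomputed from the current process and thread on every access instead of being a cached value that survives fork().

```python
import os
import threading
import uuid

class Mailbox:
    @property
    def oid(self):
        # Recomputed on every access: a forked child derives its own
        # oid from its own pid instead of inheriting the parent's.
        ent = '%x-%x-%x' % (os.getpid(), threading.get_ident(), id(self))
        return str(uuid.uuid3(uuid.NAMESPACE_OID, ent))
```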
See my explanation above.
Thank you for the analysis.
This is a bug in cached_property's implementation, it seems.
Note that the PyPy integration tests build is failing because we now only support 3.7 and above.
The bug in celery is due to a combination of cached_property and wrong usage of thread-local storage. I will also add unit tests to this PR (99% done).
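A simplified demo of that interaction, using functools.cached_property (Python 3.8+) as a stand-in for the caching helper kombu actually uses, and os.fork(), so POSIX-only:

```python
import os
from functools import cached_property  # stand-in for kombu's own helper

class Box:
    @cached_property
    def oid(self):
        return os.getpid()

b = Box()
print('parent oid:', b.oid)   # computed once, cached with the parent's pid
if os.fork() == 0:
    # The child inherits the cached value and prints the parent's pid too.
    print('child oid: ', b.oid)
    os._exit(0)
os.wait()
```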
Oh, sorry.