Prevent caching of oid in pidbox #1394

Merged
merged 1 commit into celery:master on Oct 5, 2021

Conversation

@matusvalo (Member) commented Sep 29, 2021

oid is not cached anymore due to race conditions on the oid in celery. Caching
the oid causes the following exception:

OperationalError("
Cannot route message for exchange 'reply.celery.pidbox': Table empty or key no longer exists.
Probably the key ('_kombu.binding.reply.celery.pidbox') has been removed from the Redis database.
",)

This exception seems to occur when multiple celery workers
contain the same oid.

Fixes #1063
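
Conceptually, the fix makes the oid a plain property that is recomputed on every access instead of being cached on the instance. A minimal sketch of the idea (not the exact diff from this PR; MailboxLike is just an illustrative stand-in for kombu.pidbox.Mailbox):

from kombu.common import oid_from

class MailboxLike:
    @property
    def oid(self):
        # Recomputed on every access: after fork() the child derives an oid
        # from its own pid instead of reusing the parent's cached value.
        return oid_from(self)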

@matusvalo matusvalo marked this pull request as draft September 29, 2021 21:00
@matusvalo (Member, Author)

Marking as draft for now. Let's wait for confirmation of this fix.

@lgtm-com bot commented Sep 29, 2021

This pull request introduces 1 alert when merging d314d73 into 5ef5e22 - view on LGTM.com

new alerts:

  • 1 for Unused import

@auvipy (Member) commented Sep 30, 2021

looks good

@thedrow (Member) commented Oct 4, 2021

Hold up, how does this work if the OID changes every time?
How do multiple workers end up with the same OID?

Each thread generates a new OID, so how is there a clash?

@thedrow (Member) left a review comment

I still have questions about this.

@matusvalo (Member, Author) commented Oct 4, 2021

Hold up, how does this work if the OID changes every time?

The OID won't change every time. Its generation is bound to the node, process, thread and object ID; see:

kombu/kombu/common.py

Lines 49 to 55 in 5ef5e22

def oid_from(instance, threads=True):
    return generate_oid(
        get_node_id(),
        os.getpid(),
        threading.get_ident() if threads else 0,
        instance,
    )

kombu/kombu/common.py

Lines 39 to 46 in 5ef5e22

def generate_oid(node_id, process_id, thread_id, instance):
    ent = '{:x}-{:x}-{:x}-{:x}'.format(
        node_id, process_id, thread_id, id(instance))
    try:
        ret = str(uuid3(NAMESPACE_OID, ent))
    except ValueError:
        ret = str(uuid5(NAMESPACE_OID, ent))
    return ret

From the Python documentation we read:

uuid.uuid3(namespace, name)
    Generate a UUID based on the MD5 hash of a namespace identifier (which is a UUID) and a name (which is a string). [1]

Hence, within the same process and thread, and for the same pidbox object, the oid_from() function will always return the same oid.
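
A quick sanity check (a standard-library-only sketch): uuid3 is deterministic for a given namespace and name, so the same (node, pid, thread, instance) tuple always maps to the same oid:

from uuid import NAMESPACE_OID, uuid3

# Made-up ids just for illustration; the same input always yields the same UUID.
ent = '{:x}-{:x}-{:x}-{:x}'.format(0x1234, 42, 7, 0xdeadbeef)
assert uuid3(NAMESPACE_OID, ent) == uuid3(NAMESPACE_OID, ent)
print(uuid3(NAMESPACE_OID, ent))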

How do multiple workers end up with the same OID?

The problem is the caching. The cache is stored on the heap and hence it is replicated across processes during fork() when a new worker is spawned. Even thread-local storage won't help; see the following proof:

import threading
import multiprocessing
mydata = threading.local()
mydata.x = 1

def f():
    print('f: ', mydata.x)

print('main process:', mydata.x)
t1 = threading.Thread(target=f)
t2 = threading.Thread(target=f)
print('starting t1')
t1.start()
print('starting t2')
t2.start()

p = multiprocessing.Process(target=f)
print('starting p')
p.start()

Running this script prints 1 for the subprocess and raises AttributeError in each thread. In other words, thread-local storage hides the value from other threads, but the forked subprocess still inherits it.
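
The copy-on-fork behaviour is the same for any ordinary heap object. A minimal sketch (assuming the fork start method, i.e. Linux, like the other examples in this thread):

import multiprocessing
import os

cache = {'oid': 'computed-in-parent'}  # ordinary heap-allocated cache

def show():
    # The forked child has a different pid but still sees the parent's cache.
    print(os.getpid(), cache['oid'])

show()
p = multiprocessing.Process(target=show)
p.start()
p.join()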

The same holds for the pidbox oid; see the example below:

import multiprocessing
from kombu import pidbox

m = pidbox.Mailbox('unittest')
print(m.oid)

def getoid():
    print('getoid: ', m.oid)

p = multiprocessing.Process(target=getoid)
print('starting p')
p.start()

print(m.oid)

In this case both the master process and the subprocess share the same oid, which is wrong:

>>> import multiprocessing
>>> from kombu import pidbox
>>>
>>> m = pidbox.Mailbox('unittest')
>>> print(m.oid)
b8001cb8-1ae5-36f5-87f4-2660008969ec
>>>
>>> def getoid():
...     print('getoid: ', m.oid)
...
>>> p = multiprocessing.Process(target=getoid)
>>> print('starting p')
starting p
>>> p.start()
>>>
>>> print(m.oid)getoid:  b8001cb8-1ae5-36f5-87f4-2660008969ec

b8001cb8-1ae5-36f5-87f4-2660008969ec
>>>
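
For contrast, a sketch (not part of this PR, and again assuming fork) showing that recomputing the oid in the child via oid_from() gives a different value, because os.getpid() differs after the fork:

import multiprocessing
from kombu import pidbox
from kombu.common import oid_from

m = pidbox.Mailbox('unittest')
print('parent:', oid_from(m))

def recompute():
    # Different pid in the child, therefore a different oid.
    print('child: ', oid_from(m))

p = multiprocessing.Process(target=recompute)
p.start()
p.join()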

Each thread generates a new OID, so how is there a clash?

See my explanation above.

[1] https://docs.python.org/3/library/uuid.html#uuid.uuid3

@matusvalo matusvalo requested a review from thedrow October 4, 2021 21:24
@thedrow thedrow marked this pull request as ready for review October 5, 2021 10:45
@thedrow (Member) left a review comment

Thank you for the analysis.
This is a bug in cached_property's implementation, it seems.
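
(The generic mechanism can be sketched with functools.cached_property rather than kombu's own helper: the first access stores the computed value in the instance's __dict__, and fork() copies that dict into the child. Assumes Python 3.8+ and the fork start method.)

import functools
import multiprocessing
import os

class Box:
    @functools.cached_property
    def oid(self):
        return f'oid-of-pid-{os.getpid()}'

box = Box()
print('parent:', os.getpid(), box.oid)    # computed once, stored in box.__dict__

def child():
    # fork copied __dict__, so the child reuses the parent's oid despite its new pid.
    print('child: ', os.getpid(), box.oid)

p = multiprocessing.Process(target=child)
p.start()
p.join()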

@thedrow (Member) commented Oct 5, 2021

Note that the PyPy integration tests build is failing because we now only support 3.7 and above.

@thedrow thedrow merged commit 96ca00f into celery:master Oct 5, 2021
@matusvalo (Member, Author)

This is a bug in cached_property's implementation, it seems.

The bug in celery is due to a combination of cached_property and wrong usage of thread-local storage.

I will also add unit tests for this PR (99% done).
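
A hypothetical sketch of such a test (not necessarily the one added later; it assumes the fixed property simply recomputes oid_from() on each access):

from unittest import mock

from kombu import pidbox

def test_oid_is_not_cached_across_processes():
    m = pidbox.Mailbox('unittest')
    oid_parent = m.oid
    # Simulate being in a forked child by pretending the pid has changed.
    with mock.patch('os.getpid', return_value=123456):
        oid_child = m.oid
    assert oid_parent != oid_child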

@thedrow (Member) commented Oct 5, 2021

Oh, sorry.
I already merged the PR.
You can add the unit tests in another PR. 😄

matusvalo added a commit that referenced this pull request Oct 5, 2021
@matusvalo matusvalo deleted the redis_missing_key_fix branch October 5, 2021 12:05
@matusvalo matusvalo mentioned this pull request Nov 3, 2021
keithgg pushed a commit to open-craft/kombu that referenced this pull request Aug 11, 2022
keithgg pushed a commit to open-craft/kombu that referenced this pull request Aug 11, 2022
Successfully merging this pull request may close these issues.

Table empty or key no longer exists