Skip to content

Commit

Permalink
fix: event handler methods are not thread-safe
Browse files Browse the repository at this point in the history
The _client_handlers dictionary allowed modifications during iteration
without proper concurrency control. I added some reentrant locks to manage
concurrent access to the _global_handlers and _client_handlers data
structures.

See open-feature#326

Signed-off-by: Federico Bond <federicobond@gmail.com>
  • Loading branch information
federicobond committed May 2, 2024
1 parent c3ad697 commit 3427796
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 15 deletions.
43 changes: 28 additions & 15 deletions openfeature/_event_support.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import threading
from collections import defaultdict
from typing import TYPE_CHECKING, Dict, List

Expand All @@ -15,7 +16,10 @@
from openfeature.client import OpenFeatureClient


_global_lock = threading.RLock()
_global_handlers: Dict[ProviderEvent, List[EventHandler]] = defaultdict(list)

_client_lock = threading.RLock()
_client_handlers: Dict[OpenFeatureClient, Dict[ProviderEvent, List[EventHandler]]] = (
defaultdict(lambda: defaultdict(list))
)
Expand All @@ -24,41 +28,47 @@
def run_client_handlers(
client: OpenFeatureClient, event: ProviderEvent, details: EventDetails
) -> None:
for handler in _client_handlers[client][event]:
handler(details)
with _client_lock:
for handler in _client_handlers[client][event]:
handler(details)


def run_global_handlers(event: ProviderEvent, details: EventDetails) -> None:
for handler in _global_handlers[event]:
handler(details)
with _global_lock:
for handler in _global_handlers[event]:
handler(details)


def add_client_handler(
client: OpenFeatureClient, event: ProviderEvent, handler: EventHandler
) -> None:
handlers = _client_handlers[client][event]
handlers.append(handler)
with _client_lock:
handlers = _client_handlers[client][event]
handlers.append(handler)

_run_immediate_handler(client, event, handler)


def remove_client_handler(
client: OpenFeatureClient, event: ProviderEvent, handler: EventHandler
) -> None:
handlers = _client_handlers[client][event]
handlers.remove(handler)
with _client_lock:
handlers = _client_handlers[client][event]
handlers.remove(handler)


def add_global_handler(event: ProviderEvent, handler: EventHandler) -> None:
_global_handlers[event].append(handler)
with _global_lock:
_global_handlers[event].append(handler)

from openfeature.api import get_client

_run_immediate_handler(get_client(), event, handler)


def remove_global_handler(event: ProviderEvent, handler: EventHandler) -> None:
_global_handlers[event].remove(handler)
with _global_lock:
_global_handlers[event].remove(handler)


def run_handlers_for_provider(
Expand All @@ -72,9 +82,10 @@ def run_handlers_for_provider(
# run the global handlers
run_global_handlers(event, details)
# run the handlers for clients associated to this provider
for client in _client_handlers:
if client.provider == provider:
run_client_handlers(client, event, details)
with _client_lock:
for client in _client_handlers:
if client.provider == provider:
run_client_handlers(client, event, details)


def _run_immediate_handler(
Expand All @@ -91,5 +102,7 @@ def _run_immediate_handler(


def clear() -> None:
_global_handlers.clear()
_client_handlers.clear()
with _global_lock:
_global_handlers.clear()
with _client_lock:
_client_handlers.clear()
28 changes: 28 additions & 0 deletions tests/test_client.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import time
import uuid
from concurrent.futures import ThreadPoolExecutor
from unittest.mock import MagicMock

import pytest
Expand Down Expand Up @@ -356,3 +359,28 @@ def test_provider_event_late_binding():

# Then
spy.provider_configuration_changed.assert_called_once_with(details)


def test_client_handlers_thread_safety():
provider = NoOpProvider()
set_provider(provider)

def add_handlers_task():
def handler(*args, **kwargs):
time.sleep(0.005)

for _ in range(10):
time.sleep(0.01)
client = get_client(str(uuid.uuid4()))
client.add_handler(ProviderEvent.PROVIDER_CONFIGURATION_CHANGED, handler)

def emit_events_task():
for _ in range(10):
time.sleep(0.01)
provider.emit_provider_configuration_changed(ProviderEventDetails())

with ThreadPoolExecutor(max_workers=2) as executor:
f1 = executor.submit(add_handlers_task)
f2 = executor.submit(emit_events_task)
f1.result()
f2.result()

0 comments on commit 3427796

Please sign in to comment.